]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/lib/bdev/bdev.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / lib / bdev / bdev.c
index ab82fffd7be61b8659e348377ac6dc4fd7c01a97..4678e4a34913bdcc8f81931e11ba41188aba8c21 100644 (file)
@@ -44,6 +44,7 @@
 #include "spdk/queue.h"
 #include "spdk/nvme_spec.h"
 #include "spdk/scsi_spec.h"
+#include "spdk/notify.h"
 #include "spdk/util.h"
 #include "spdk/trace.h"
 
 int __itt_init_ittlib(const char *, __itt_group_id);
 #endif
 
-#define SPDK_BDEV_IO_POOL_SIZE                 (64 * 1024)
+#define SPDK_BDEV_IO_POOL_SIZE                 (64 * 1024 - 1)
 #define SPDK_BDEV_IO_CACHE_SIZE                        256
-#define BUF_SMALL_POOL_SIZE                    8192
-#define BUF_LARGE_POOL_SIZE                    1024
+#define BUF_SMALL_POOL_SIZE                    8191
+#define BUF_LARGE_POOL_SIZE                    1023
 #define NOMEM_THRESHOLD_COUNT                  8
 #define ZERO_BUFFER_SIZE                       0x100000
 
@@ -75,12 +76,18 @@ int __itt_init_ittlib(const char *, __itt_group_id);
 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC                1000
 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE     1
 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE   512
-#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC          10000
-#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC                (10 * 1024 * 1024)
+#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC          1000
+#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC                (1024 * 1024)
 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED                UINT64_MAX
 
-static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
-static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"};
+#define SPDK_BDEV_POOL_ALIGNMENT 512
+
+static const char *qos_conf_type[] = {"Limit_IOPS",
+                                     "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
+                                    };
+static const char *qos_rpc_type[] = {"rw_ios_per_sec",
+                                    "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
+                                   };
 
 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
 
@@ -139,6 +146,12 @@ struct spdk_bdev_qos_limit {
 
        /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
        uint32_t max_per_timeslice;
+
+       /** Function to check whether to queue the IO. */
+       bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
+
+       /** Function to update for the submitted IO. */
+       void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
 };
 
 struct spdk_bdev_qos {
@@ -242,6 +255,8 @@ struct spdk_bdev_channel {
 
        uint32_t                flags;
 
+       struct spdk_histogram_data *histogram;
+
 #ifdef SPDK_CONFIG_VTUNE
        uint64_t                start_tsc;
        uint64_t                interval_tsc;
@@ -268,6 +283,12 @@ struct spdk_bdev_iostat_ctx {
        void *cb_arg;
 };
 
+struct set_qos_limit_ctx {
+       void (*cb_fn)(void *cb_arg, int status);
+       void *cb_arg;
+       struct spdk_bdev *bdev;
+};
+
 #define __bdev_to_io_dev(bdev)         (((char *)bdev) + 1)
 #define __bdev_from_io_dev(io_dev)     ((struct spdk_bdev *)(((char *)io_dev) - 1))
 
@@ -275,6 +296,19 @@ static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool
                void *cb_arg);
 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
 
+static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
+static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);
+
+static int
+_spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                               struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
+                               uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
+static int
+_spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                                struct iovec *iov, int iovcnt, void *md_buf,
+                                uint64_t offset_blocks, uint64_t num_blocks,
+                                spdk_bdev_io_completion_cb cb, void *cb_arg);
+
 void
 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
 {
@@ -400,6 +434,11 @@ spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
 {
        struct iovec *iovs;
 
+       if (bdev_io->u.bdev.iovs == NULL) {
+               bdev_io->u.bdev.iovs = &bdev_io->iov;
+               bdev_io->u.bdev.iovcnt = 1;
+       }
+
        iovs = bdev_io->u.bdev.iovs;
 
        assert(iovs != NULL);
@@ -409,23 +448,158 @@ spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
        iovs[0].iov_len = len;
 }
 
+void
+spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
+{
+       assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
+       bdev_io->u.bdev.md_buf = md_buf;
+}
+
+static bool
+_is_buf_allocated(const struct iovec *iovs)
+{
+       if (iovs == NULL) {
+               return false;
+       }
+
+       return iovs[0].iov_base != NULL;
+}
+
+static bool
+_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
+{
+       int i;
+       uintptr_t iov_base;
+
+       if (spdk_likely(alignment == 1)) {
+               return true;
+       }
+
+       for (i = 0; i < iovcnt; i++) {
+               iov_base = (uintptr_t)iovs[i].iov_base;
+               if ((iov_base & (alignment - 1)) != 0) {
+                       return false;
+               }
+       }
+
+       return true;
+}
+
+static void
+_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
+{
+       int i;
+       size_t len;
+
+       for (i = 0; i < iovcnt; i++) {
+               len = spdk_min(iovs[i].iov_len, buf_len);
+               memcpy(buf, iovs[i].iov_base, len);
+               buf += len;
+               buf_len -= len;
+       }
+}
+
+static void
+_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
+{
+       int i;
+       size_t len;
+
+       for (i = 0; i < iovcnt; i++) {
+               len = spdk_min(iovs[i].iov_len, buf_len);
+               memcpy(iovs[i].iov_base, buf, len);
+               buf += len;
+               buf_len -= len;
+       }
+}
+
+static void
+_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
+{
+       /* save original iovec */
+       bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
+       bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
+       /* set bounce iov */
+       bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
+       bdev_io->u.bdev.iovcnt = 1;
+       /* set bounce buffer for this operation */
+       bdev_io->u.bdev.iovs[0].iov_base = buf;
+       bdev_io->u.bdev.iovs[0].iov_len = len;
+       /* if this is write path, copy data from original buffer to bounce buffer */
+       if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
+               _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
+       }
+}
+
+static void
+_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
+{
+       /* save original md_buf */
+       bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
+       /* set bounce md_buf */
+       bdev_io->u.bdev.md_buf = md_buf;
+
+       if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
+               memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
+       }
+}
+
+static void
+_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
+{
+       struct spdk_bdev *bdev = bdev_io->bdev;
+       bool buf_allocated;
+       uint64_t md_len, alignment;
+       void *aligned_buf;
+
+       alignment = spdk_bdev_get_buf_align(bdev);
+       buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
+       aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));
+
+       if (buf_allocated) {
+               _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
+       } else {
+               spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
+       }
+
+       if (spdk_bdev_is_md_separate(bdev)) {
+               aligned_buf = (char *)aligned_buf + len;
+               md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;
+
+               assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);
+
+               if (bdev_io->u.bdev.md_buf != NULL) {
+                       _bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
+               } else {
+                       spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
+               }
+       }
+
+       bdev_io->internal.buf = buf;
+       bdev_io->internal.get_buf_cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
+}
+
 static void
 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
 {
+       struct spdk_bdev *bdev = bdev_io->bdev;
        struct spdk_mempool *pool;
        struct spdk_bdev_io *tmp;
-       void *buf, *aligned_buf;
        bdev_io_stailq_t *stailq;
        struct spdk_bdev_mgmt_channel *ch;
-
-       assert(bdev_io->u.bdev.iovcnt == 1);
+       uint64_t buf_len, md_len, alignment;
+       void *buf;
 
        buf = bdev_io->internal.buf;
+       buf_len = bdev_io->internal.buf_len;
+       md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
+       alignment = spdk_bdev_get_buf_align(bdev);
        ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
 
        bdev_io->internal.buf = NULL;
 
-       if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
+       if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
+           SPDK_BDEV_POOL_ALIGNMENT) {
                pool = g_bdev_mgr.buf_small_pool;
                stailq = &ch->need_buf_small;
        } else {
@@ -437,39 +611,88 @@ spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
                spdk_mempool_put(pool, buf);
        } else {
                tmp = STAILQ_FIRST(stailq);
+               STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
+               _bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
+       }
+}
+
+static void
+_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
+{
+       if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
+               assert(bdev_io->internal.orig_md_buf == NULL);
+               return;
+       }
 
-               aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
-               spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
+       /* if this is read path, copy data from bounce buffer to original buffer */
+       if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
+           bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
+               _copy_buf_to_iovs(bdev_io->internal.orig_iovs,
+                                 bdev_io->internal.orig_iovcnt,
+                                 bdev_io->internal.bounce_iov.iov_base,
+                                 bdev_io->internal.bounce_iov.iov_len);
+       }
+       /* set orignal buffer for this io */
+       bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
+       bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
+       /* disable bouncing buffer for this io */
+       bdev_io->internal.orig_iovcnt = 0;
+       bdev_io->internal.orig_iovs = NULL;
+
+       /* do the same for metadata buffer */
+       if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
+               assert(spdk_bdev_is_md_separate(bdev_io->bdev));
+
+               if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
+                   bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
+                       memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
+                              bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
+               }
 
-               STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
-               tmp->internal.buf = buf;
-               tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
+               bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
+               bdev_io->internal.orig_md_buf = NULL;
        }
+
+       spdk_bdev_io_put_buf(bdev_io);
 }
 
 void
 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
 {
+       struct spdk_bdev *bdev = bdev_io->bdev;
        struct spdk_mempool *pool;
        bdev_io_stailq_t *stailq;
-       void *buf, *aligned_buf;
        struct spdk_bdev_mgmt_channel *mgmt_ch;
+       uint64_t alignment, md_len;
+       void *buf;
 
        assert(cb != NULL);
-       assert(bdev_io->u.bdev.iovs != NULL);
 
-       if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
-               /* Buffer already present */
-               cb(bdev_io->internal.ch->channel, bdev_io);
+       alignment = spdk_bdev_get_buf_align(bdev);
+       md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
+
+       if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
+           _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
+               /* Buffer already present and aligned */
+               cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
+               return;
+       }
+
+       if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
+           SPDK_BDEV_POOL_ALIGNMENT) {
+               SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
+                           len + alignment);
+               cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, false);
                return;
        }
 
-       assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
        mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
 
        bdev_io->internal.buf_len = len;
        bdev_io->internal.get_buf_cb = cb;
-       if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
+
+       if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
+           SPDK_BDEV_POOL_ALIGNMENT) {
                pool = g_bdev_mgr.buf_small_pool;
                stailq = &mgmt_ch->need_buf_small;
        } else {
@@ -478,15 +701,10 @@ spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, u
        }
 
        buf = spdk_mempool_get(pool);
-
        if (!buf) {
                STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
        } else {
-               aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
-               spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
-
-               bdev_io->internal.buf = buf;
-               bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
+               _bdev_io_set_buf(bdev_io, buf, len);
        }
 }
 
@@ -532,9 +750,8 @@ spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
 
        spdk_json_write_object_begin(w);
        spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");
-       spdk_json_write_name(w, "params");
 
-       spdk_json_write_object_begin(w);
+       spdk_json_write_named_object_begin(w, "params");
        spdk_json_write_named_string(w, "name", bdev->name);
        for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
                if (limits[i] > 0) {
@@ -558,8 +775,7 @@ spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
 
        spdk_json_write_object_begin(w);
        spdk_json_write_named_string(w, "method", "set_bdev_options");
-       spdk_json_write_name(w, "params");
-       spdk_json_write_object_begin(w);
+       spdk_json_write_named_object_begin(w, "params");
        spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
        spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
        spdk_json_write_object_end(w);
@@ -572,11 +788,11 @@ spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
        }
 
        TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
-               spdk_bdev_qos_config_json(bdev, w);
-
                if (bdev->fn_table->write_config_json) {
                        bdev->fn_table->write_config_json(bdev, w);
                }
+
+               spdk_bdev_qos_config_json(bdev, w);
        }
 
        spdk_json_write_array_end(w);
@@ -601,7 +817,7 @@ spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
                bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
                assert(bdev_io != NULL);
                ch->per_thread_cache_count++;
-               STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
+               STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
        }
 
        TAILQ_INIT(&ch->shared_resources);
@@ -724,6 +940,9 @@ spdk_bdev_modules_init(void)
 
        TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
                g_resume_bdev_module = module;
+               if (module->async_init) {
+                       module->internal.action_in_progress = 1;
+               }
                rc = module->module_init();
                if (rc != 0) {
                        return rc;
@@ -734,17 +953,10 @@ spdk_bdev_modules_init(void)
        return 0;
 }
 
-
-static void
-spdk_bdev_init_failed_complete(void *cb_arg)
-{
-       spdk_bdev_init_complete(-1);
-}
-
 static void
 spdk_bdev_init_failed(void *cb_arg)
 {
-       spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
+       spdk_bdev_init_complete(-1);
 }
 
 void
@@ -784,6 +996,9 @@ spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
        g_init_cb_fn = cb_fn;
        g_init_cb_arg = cb_arg;
 
+       spdk_notify_type_register("bdev_register");
+       spdk_notify_type_register("bdev_unregister");
+
        snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
 
        g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
@@ -809,7 +1024,8 @@ spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
 
        g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
                                    BUF_SMALL_POOL_SIZE,
-                                   SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
+                                   SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
+                                   SPDK_BDEV_POOL_ALIGNMENT,
                                    cache_size,
                                    SPDK_ENV_SOCKET_ID_ANY);
        if (!g_bdev_mgr.buf_small_pool) {
@@ -823,7 +1039,8 @@ spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
 
        g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
                                    BUF_LARGE_POOL_SIZE,
-                                   SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
+                                   SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
+                                   SPDK_BDEV_POOL_ALIGNMENT,
                                    cache_size,
                                    SPDK_ENV_SOCKET_ID_ANY);
        if (!g_bdev_mgr.buf_large_pool) {
@@ -832,8 +1049,8 @@ spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
                return;
        }
 
-       g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
-                                NULL);
+       g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
+                                             NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
        if (!g_bdev_mgr.zero_buffer) {
                SPDK_ERRLOG("create bdev zero buffer failed\n");
                spdk_bdev_init_complete(-1);
@@ -888,7 +1105,7 @@ spdk_bdev_mgr_unregister_cb(void *io_device)
        spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
        spdk_mempool_free(g_bdev_mgr.buf_small_pool);
        spdk_mempool_free(g_bdev_mgr.buf_large_pool);
-       spdk_dma_free(g_bdev_mgr.zero_buffer);
+       spdk_free(g_bdev_mgr.zero_buffer);
 
        cb_fn(g_fini_cb_arg);
        g_fini_cb_fn = NULL;
@@ -966,7 +1183,7 @@ _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
        if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
                SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
                /*
-                * Bdev module finish need to be deffered as we might be in the middle of some context
+                * Bdev module finish need to be deferred as we might be in the middle of some context
                 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
                 * after returning.
                 */
@@ -975,12 +1192,38 @@ _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
        }
 
        /*
-        * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
-        * that has no bdevs that depend on it.
+        * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
+        * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
+        * to detect clean shutdown as opposed to run-time hot removal of the underlying
+        * base bdevs.
+        *
+        * Also, walk the list in the reverse order.
         */
-       bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
-       SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
-       spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
+       for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
+            bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
+               if (bdev->internal.claim_module != NULL) {
+                       SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
+                                     bdev->name, bdev->internal.claim_module->name);
+                       continue;
+               }
+
+               SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
+               spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
+               return;
+       }
+
+       /*
+        * If any bdev fails to unclaim underlying bdev properly, we may face the
+        * case of bdev list consisting of claimed bdevs only (if claims are managed
+        * correctly, this would mean there's a loop in the claims graph which is
+        * clearly impossible). Warn and unregister last bdev on the list then.
+        */
+       for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
+            bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
+               SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
+               spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
+               return;
+       }
 }
 
 void
@@ -1030,18 +1273,20 @@ spdk_bdev_get_io(struct spdk_bdev_channel *channel)
 void
 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
 {
-       struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
+       struct spdk_bdev_mgmt_channel *ch;
 
        assert(bdev_io != NULL);
        assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
 
+       ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
+
        if (bdev_io->internal.buf != NULL) {
                spdk_bdev_io_put_buf(bdev_io);
        }
 
        if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
                ch->per_thread_cache_count++;
-               STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
+               STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
                while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
                        struct spdk_bdev_io_wait_entry *entry;
 
@@ -1065,6 +1310,8 @@ _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
        case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
                return true;
        case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
+       case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
+       case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
                return false;
        case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
        default:
@@ -1080,8 +1327,25 @@ _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
        case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
        case SPDK_BDEV_IO_TYPE_READ:
        case SPDK_BDEV_IO_TYPE_WRITE:
-       case SPDK_BDEV_IO_TYPE_UNMAP:
-       case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
+               return true;
+       default:
+               return false;
+       }
+}
+
+static bool
+_spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io)
+{
+       switch (bdev_io->type) {
+       case SPDK_BDEV_IO_TYPE_NVME_IO:
+       case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
+               /* Bit 1 (0x2) set for read operation */
+               if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
+                       return true;
+               } else {
+                       return false;
+               }
+       case SPDK_BDEV_IO_TYPE_READ:
                return true;
        default:
                return false;
@@ -1099,67 +1363,160 @@ _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
                return bdev_io->u.nvme_passthru.nbytes;
        case SPDK_BDEV_IO_TYPE_READ:
        case SPDK_BDEV_IO_TYPE_WRITE:
-       case SPDK_BDEV_IO_TYPE_UNMAP:
-       case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
                return bdev_io->u.bdev.num_blocks * bdev->blocklen;
        default:
                return 0;
        }
 }
 
+static bool
+_spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
+               return true;
+       } else {
+               return false;
+       }
+}
+
+static bool
+_spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       if (_spdk_bdev_is_read_io(io) == false) {
+               return false;
+       }
+
+       return _spdk_bdev_qos_rw_queue_io(limit, io);
+}
+
+static bool
+_spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       if (_spdk_bdev_is_read_io(io) == true) {
+               return false;
+       }
+
+       return _spdk_bdev_qos_rw_queue_io(limit, io);
+}
+
+static void
+_spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       limit->remaining_this_timeslice--;
+}
+
+static void
+_spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io);
+}
+
+static void
+_spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       if (_spdk_bdev_is_read_io(io) == false) {
+               return;
+       }
+
+       return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
+}
+
+static void
+_spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
+{
+       if (_spdk_bdev_is_read_io(io) == true) {
+               return;
+       }
+
+       return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
+}
+
 static void
-_spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
+_spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos)
 {
        int i;
 
        for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
                if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
+                       qos->rate_limits[i].queue_io = NULL;
+                       qos->rate_limits[i].update_quota = NULL;
                        continue;
                }
 
                switch (i) {
                case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
-                       qos->rate_limits[i].remaining_this_timeslice--;
+                       qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
+                       qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota;
                        break;
                case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
-                       qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
+                       qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
+                       qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota;
+                       break;
+               case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
+                       qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io;
+                       qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota;
+                       break;
+               case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
+                       qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io;
+                       qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota;
                        break;
-               case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
                default:
                        break;
                }
        }
 }
 
-static void
+static inline void
+_spdk_bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
+{
+       struct spdk_bdev *bdev = bdev_io->bdev;
+       struct spdk_io_channel *ch = bdev_ch->channel;
+       struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
+
+       if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
+               bdev_ch->io_outstanding++;
+               shared_resource->io_outstanding++;
+               bdev_io->internal.in_submit_request = true;
+               bdev->fn_table->submit_request(ch, bdev_io);
+               bdev_io->internal.in_submit_request = false;
+       } else {
+               TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
+       }
+}
+
+static int
 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
 {
-       struct spdk_bdev_io             *bdev_io = NULL;
-       struct spdk_bdev                *bdev = ch->bdev;
-       struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
-       int                             i;
-       bool                            to_limit_io;
-       uint64_t                        io_size_in_byte;
+       struct spdk_bdev_io             *bdev_io = NULL, *tmp = NULL;
+       int                             i, submitted_ios = 0;
 
-       while (!TAILQ_EMPTY(&qos->queued)) {
-               for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
-                       if (qos->rate_limits[i].max_per_timeslice > 0 &&
-                           (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
-                               return;
+       TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
+               if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) {
+                       for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
+                               if (!qos->rate_limits[i].queue_io) {
+                                       continue;
+                               }
+
+                               if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
+                                                                bdev_io) == true) {
+                                       return submitted_ios;
+                               }
+                       }
+                       for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
+                               if (!qos->rate_limits[i].update_quota) {
+                                       continue;
+                               }
+
+                               qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
                        }
                }
 
-               bdev_io = TAILQ_FIRST(&qos->queued);
                TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
-               ch->io_outstanding++;
-               shared_resource->io_outstanding++;
-               to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
-               if (to_limit_io == true) {
-                       io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
-                       _spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
-               }
-               bdev->fn_table->submit_request(ch->channel, bdev_io);
+               _spdk_bdev_io_do_submit(ch, bdev_io);
+               submitted_ios++;
        }
+
+       return submitted_ios;
 }
 
 static void
@@ -1243,6 +1600,7 @@ _spdk_bdev_io_split_with_payload(void *_bdev_io)
        struct iovec *parent_iov, *iov;
        uint64_t parent_iov_offset, iov_len;
        uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
+       void *md_buf = NULL;
        int rc;
 
        remaining = bdev_io->u.bdev.split_remaining_num_blocks;
@@ -1266,6 +1624,13 @@ _spdk_bdev_io_split_with_payload(void *_bdev_io)
                to_next_boundary_bytes = to_next_boundary * blocklen;
                iov = &bdev_io->child_iov[child_iovcnt];
                iovcnt = 0;
+
+               if (bdev_io->u.bdev.md_buf) {
+                       assert((parent_iov_offset % blocklen) > 0);
+                       md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
+                                spdk_bdev_get_md_size(bdev_io->bdev);
+               }
+
                while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
                       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
                        parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
@@ -1305,15 +1670,17 @@ _spdk_bdev_io_split_with_payload(void *_bdev_io)
                bdev_io->u.bdev.split_outstanding++;
 
                if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
-                       rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
-                                                   spdk_io_channel_from_ctx(bdev_io->internal.ch),
-                                                   iov, iovcnt, current_offset, to_next_boundary,
-                                                   _spdk_bdev_io_split_done, bdev_io);
+                       rc = _spdk_bdev_readv_blocks_with_md(bdev_io->internal.desc,
+                                                            spdk_io_channel_from_ctx(bdev_io->internal.ch),
+                                                            iov, iovcnt, md_buf, current_offset,
+                                                            to_next_boundary,
+                                                            _spdk_bdev_io_split_done, bdev_io);
                } else {
-                       rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
-                                                    spdk_io_channel_from_ctx(bdev_io->internal.ch),
-                                                    iov, iovcnt, current_offset, to_next_boundary,
-                                                    _spdk_bdev_io_split_done, bdev_io);
+                       rc = _spdk_bdev_writev_blocks_with_md(bdev_io->internal.desc,
+                                                             spdk_io_channel_from_ctx(bdev_io->internal.ch),
+                                                             iov, iovcnt, md_buf, current_offset,
+                                                             to_next_boundary,
+                                                             _spdk_bdev_io_split_done, bdev_io);
                }
 
                if (rc == 0) {
@@ -1357,11 +1724,9 @@ _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_ar
        }
 
        /*
-        * Parent I/O finishes when all blocks are consumed or there is any failure of
-        * child I/O and no outstanding child I/O.
+        * Parent I/O finishes when all blocks are consumed.
         */
-       if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
-           parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
+       if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
                parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
                                       parent_io->internal.caller_ctx);
                return;
@@ -1388,30 +1753,42 @@ _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 }
 
 static void
+_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
+                              bool success)
+{
+       if (!success) {
+               spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
+               return;
+       }
+
+       _spdk_bdev_io_split(ch, bdev_io);
+}
+
+/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
+ *  be inlined, at least on some compilers.
+ */
+static inline void
 _spdk_bdev_io_submit(void *ctx)
 {
        struct spdk_bdev_io *bdev_io = ctx;
        struct spdk_bdev *bdev = bdev_io->bdev;
        struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
-       struct spdk_io_channel *ch = bdev_ch->channel;
        struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
        uint64_t tsc;
 
        tsc = spdk_get_ticks();
        bdev_io->internal.submit_tsc = tsc;
        spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
+
+       if (spdk_likely(bdev_ch->flags == 0)) {
+               _spdk_bdev_io_do_submit(bdev_ch, bdev_io);
+               return;
+       }
+
        bdev_ch->io_outstanding++;
        shared_resource->io_outstanding++;
        bdev_io->internal.in_submit_request = true;
-       if (spdk_likely(bdev_ch->flags == 0)) {
-               if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
-                       bdev->fn_table->submit_request(ch, bdev_io);
-               } else {
-                       bdev_ch->io_outstanding--;
-                       shared_resource->io_outstanding--;
-                       TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
-               }
-       } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
+       if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
                spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
        } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
                bdev_ch->io_outstanding--;
@@ -1429,14 +1806,14 @@ static void
 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
 {
        struct spdk_bdev *bdev = bdev_io->bdev;
-       struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
+       struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io);
 
        assert(thread != NULL);
        assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
 
        if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
                if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
-                       spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
+                       spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb,
                                             bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
                } else {
                        _spdk_bdev_io_split(NULL, bdev_io);
@@ -1483,6 +1860,9 @@ spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
        bdev_io->internal.in_submit_request = false;
        bdev_io->internal.buf = NULL;
        bdev_io->internal.io_submit_ch = NULL;
+       bdev_io->internal.orig_iovs = NULL;
+       bdev_io->internal.orig_iovcnt = 0;
+       bdev_io->internal.orig_md_buf = NULL;
 }
 
 static bool
@@ -1504,6 +1884,11 @@ spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_ty
                        /* The bdev layer will emulate write zeroes as long as write is supported. */
                        supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
                        break;
+               case SPDK_BDEV_IO_TYPE_ZCOPY:
+                       /* Zero copy can be emulated with regular read and write */
+                       supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) &&
+                                   _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
+                       break;
                default:
                        break;
                }
@@ -1542,6 +1927,8 @@ spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
 
                qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
        }
+
+       _spdk_bdev_qos_set_ops(qos);
 }
 
 static int
@@ -1580,9 +1967,7 @@ spdk_bdev_channel_poll_qos(void *arg)
                }
        }
 
-       _spdk_bdev_qos_io_submit(qos->ch, qos);
-
-       return -1;
+       return _spdk_bdev_qos_io_submit(qos->ch, qos);
 }
 
 static void
@@ -1590,27 +1975,18 @@ _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
 {
        struct spdk_bdev_shared_resource *shared_resource;
 
-       if (!ch) {
-               return;
-       }
+       spdk_put_io_channel(ch->channel);
 
-       if (ch->channel) {
-               spdk_put_io_channel(ch->channel);
-       }
+       shared_resource = ch->shared_resource;
 
        assert(ch->io_outstanding == 0);
-
-       shared_resource = ch->shared_resource;
-       if (shared_resource) {
-               assert(ch->io_outstanding == 0);
-               assert(shared_resource->ref > 0);
-               shared_resource->ref--;
-               if (shared_resource->ref == 0) {
-                       assert(shared_resource->io_outstanding == 0);
-                       TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
-                       spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
-                       free(shared_resource);
-               }
+       assert(shared_resource->ref > 0);
+       shared_resource->ref--;
+       if (shared_resource->ref == 0) {
+               assert(shared_resource->io_outstanding == 0);
+               TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
+               spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
+               free(shared_resource);
        }
 }
 
@@ -1633,6 +2009,7 @@ _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
 
                        /* Take another reference to ch */
                        io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
+                       assert(io_ch != NULL);
                        qos->ch = ch;
 
                        qos->thread = spdk_io_channel_get_thread(io_ch);
@@ -1680,8 +2057,17 @@ spdk_bdev_channel_create(void *io_device, void *ctx_buf)
                return -1;
        }
 
+       assert(ch->histogram == NULL);
+       if (bdev->internal.histogram_enabled) {
+               ch->histogram = spdk_histogram_data_alloc();
+               if (ch->histogram == NULL) {
+                       SPDK_ERRLOG("Could not allocate histogram\n");
+               }
+       }
+
        mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
        if (!mgmt_io_ch) {
+               spdk_put_io_channel(ch->channel);
                return -1;
        }
 
@@ -1697,6 +2083,7 @@ spdk_bdev_channel_create(void *io_device, void *ctx_buf)
        if (shared_resource == NULL) {
                shared_resource = calloc(1, sizeof(*shared_resource));
                if (shared_resource == NULL) {
+                       spdk_put_io_channel(ch->channel);
                        spdk_put_io_channel(mgmt_io_ch);
                        return -1;
                }
@@ -1872,8 +2259,11 @@ _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat
        total->num_read_ops += add->num_read_ops;
        total->bytes_written += add->bytes_written;
        total->num_write_ops += add->num_write_ops;
+       total->bytes_unmapped += add->bytes_unmapped;
+       total->num_unmap_ops += add->num_unmap_ops;
        total->read_latency_ticks += add->read_latency_ticks;
        total->write_latency_ticks += add->write_latency_ticks;
+       total->unmap_latency_ticks += add->unmap_latency_ticks;
 }
 
 static void
@@ -1898,6 +2288,10 @@ spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
        _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
        _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
 
+       if (ch->histogram) {
+               spdk_histogram_data_free(ch->histogram);
+       }
+
        _spdk_bdev_channel_destroy_resource(ch);
 }
 
@@ -2033,12 +2427,7 @@ spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
 size_t
 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
 {
-       /* TODO: push this logic down to the bdev modules */
-       if (bdev->need_aligned_buffer) {
-               return bdev->blocklen;
-       }
-
-       return 1;
+       return 1 << bdev->required_alignment;
 }
 
 uint32_t
@@ -2059,6 +2448,73 @@ spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
        return &bdev->uuid;
 }
 
+uint32_t
+spdk_bdev_get_md_size(const struct spdk_bdev *bdev)
+{
+       return bdev->md_len;
+}
+
+bool
+spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev)
+{
+       return (bdev->md_len != 0) && bdev->md_interleave;
+}
+
+bool
+spdk_bdev_is_md_separate(const struct spdk_bdev *bdev)
+{
+       return (bdev->md_len != 0) && !bdev->md_interleave;
+}
+
+uint32_t
+spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev)
+{
+       if (spdk_bdev_is_md_interleaved(bdev)) {
+               return bdev->blocklen - bdev->md_len;
+       } else {
+               return bdev->blocklen;
+       }
+}
+
+enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev)
+{
+       if (bdev->md_len != 0) {
+               return bdev->dif_type;
+       } else {
+               return SPDK_DIF_DISABLE;
+       }
+}
+
+bool
+spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev)
+{
+       if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
+               return bdev->dif_is_head_of_md;
+       } else {
+               return false;
+       }
+}
+
+bool
+spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev,
+                              enum spdk_dif_check_type check_type)
+{
+       if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) {
+               return false;
+       }
+
+       switch (check_type) {
+       case SPDK_DIF_CHECK_TYPE_REFTAG:
+               return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0;
+       case SPDK_DIF_CHECK_TYPE_APPTAG:
+               return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0;
+       case SPDK_DIF_CHECK_TYPE_GUARD:
+               return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0;
+       default:
+               return false;
+       }
+}
+
 uint64_t
 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
 {
@@ -2164,11 +2620,20 @@ spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_
                          uint64_t num_bytes, uint64_t *num_blocks)
 {
        uint32_t block_size = bdev->blocklen;
-
-       *offset_blocks = offset_bytes / block_size;
-       *num_blocks = num_bytes / block_size;
-
-       return (offset_bytes % block_size) | (num_bytes % block_size);
+       uint8_t shift_cnt;
+
+       /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
+       if (spdk_likely(spdk_u32_is_pow2(block_size))) {
+               shift_cnt = spdk_u32log2(block_size);
+               *offset_blocks = offset_bytes >> shift_cnt;
+               *num_blocks = num_bytes >> shift_cnt;
+               return (offset_bytes - (*offset_blocks << shift_cnt)) |
+                      (num_bytes - (*num_blocks << shift_cnt));
+       } else {
+               *offset_blocks = offset_bytes / block_size;
+               *num_blocks = num_bytes / block_size;
+               return (offset_bytes % block_size) | (num_bytes % block_size);
+       }
 }
 
 static bool
@@ -2188,24 +2653,16 @@ spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64
        return true;
 }
 
-int
-spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
-              void *buf, uint64_t offset, uint64_t nbytes,
-              spdk_bdev_io_completion_cb cb, void *cb_arg)
+static bool
+_bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf)
 {
-       uint64_t offset_blocks, num_blocks;
-
-       if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
-               return -EINVAL;
-       }
-
-       return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
+       return _is_buf_allocated(iovs) == (md_buf != NULL);
 }
 
-int
-spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
-                     void *buf, uint64_t offset_blocks, uint64_t num_blocks,
-                     spdk_bdev_io_completion_cb cb, void *cb_arg)
+static int
+_spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf,
+                              void *md_buf, int64_t offset_blocks, uint64_t num_blocks,
+                              spdk_bdev_io_completion_cb cb, void *cb_arg)
 {
        struct spdk_bdev *bdev = desc->bdev;
        struct spdk_bdev_io *bdev_io;
@@ -2227,6 +2684,7 @@ spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
        bdev_io->u.bdev.iovs[0].iov_base = buf;
        bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
        bdev_io->u.bdev.iovcnt = 1;
+       bdev_io->u.bdev.md_buf = md_buf;
        bdev_io->u.bdev.num_blocks = num_blocks;
        bdev_io->u.bdev.offset_blocks = offset_blocks;
        spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
@@ -2235,6 +2693,50 @@ spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
        return 0;
 }
 
+int
+spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+              void *buf, uint64_t offset, uint64_t nbytes,
+              spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       uint64_t offset_blocks, num_blocks;
+
+       if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
+               return -EINVAL;
+       }
+
+       return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
+}
+
+int
+spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                     void *buf, uint64_t offset_blocks, uint64_t num_blocks,
+                     spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       return _spdk_bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks,
+                                             cb, cb_arg);
+}
+
+int
+spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                             void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks,
+                             spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       struct iovec iov = {
+               .iov_base = buf,
+       };
+
+       if (!spdk_bdev_is_md_separate(desc->bdev)) {
+               return -EINVAL;
+       }
+
+       if (!_bdev_io_check_md_buf(&iov, md_buf)) {
+               return -EINVAL;
+       }
+
+       return _spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks,
+                                             cb, cb_arg);
+}
+
 int
 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
                struct iovec *iov, int iovcnt,
@@ -2250,15 +2752,78 @@ spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
        return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
 }
 
+static int
+_spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                               struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
+                               uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       struct spdk_bdev *bdev = desc->bdev;
+       struct spdk_bdev_io *bdev_io;
+       struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
+
+       if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
+               return -EINVAL;
+       }
+
+       bdev_io = spdk_bdev_get_io(channel);
+       if (!bdev_io) {
+               return -ENOMEM;
+       }
+
+       bdev_io->internal.ch = channel;
+       bdev_io->internal.desc = desc;
+       bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
+       bdev_io->u.bdev.iovs = iov;
+       bdev_io->u.bdev.iovcnt = iovcnt;
+       bdev_io->u.bdev.md_buf = md_buf;
+       bdev_io->u.bdev.num_blocks = num_blocks;
+       bdev_io->u.bdev.offset_blocks = offset_blocks;
+       spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
+
+       spdk_bdev_io_submit(bdev_io);
+       return 0;
+}
+
 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
                           struct iovec *iov, int iovcnt,
                           uint64_t offset_blocks, uint64_t num_blocks,
                           spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks,
+                                              num_blocks, cb, cb_arg);
+}
+
+int
+spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                              struct iovec *iov, int iovcnt, void *md_buf,
+                              uint64_t offset_blocks, uint64_t num_blocks,
+                              spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       if (!spdk_bdev_is_md_separate(desc->bdev)) {
+               return -EINVAL;
+       }
+
+       if (!_bdev_io_check_md_buf(iov, md_buf)) {
+               return -EINVAL;
+       }
+
+       return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks,
+                                              num_blocks, cb, cb_arg);
+}
+
+static int
+_spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                               void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks,
+                               spdk_bdev_io_completion_cb cb, void *cb_arg)
 {
        struct spdk_bdev *bdev = desc->bdev;
        struct spdk_bdev_io *bdev_io;
        struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
 
+       if (!desc->write) {
+               return -EBADF;
+       }
+
        if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
                return -EINVAL;
        }
@@ -2270,9 +2835,12 @@ int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *
 
        bdev_io->internal.ch = channel;
        bdev_io->internal.desc = desc;
-       bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
-       bdev_io->u.bdev.iovs = iov;
-       bdev_io->u.bdev.iovcnt = iovcnt;
+       bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
+       bdev_io->u.bdev.iovs = &bdev_io->iov;
+       bdev_io->u.bdev.iovs[0].iov_base = buf;
+       bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
+       bdev_io->u.bdev.iovcnt = 1;
+       bdev_io->u.bdev.md_buf = md_buf;
        bdev_io->u.bdev.num_blocks = num_blocks;
        bdev_io->u.bdev.offset_blocks = offset_blocks;
        spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
@@ -2299,6 +2867,37 @@ int
 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
                       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
                       spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       return _spdk_bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks,
+                                              cb, cb_arg);
+}
+
+int
+spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                              void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks,
+                              spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       struct iovec iov = {
+               .iov_base = buf,
+       };
+
+       if (!spdk_bdev_is_md_separate(desc->bdev)) {
+               return -EINVAL;
+       }
+
+       if (!_bdev_io_check_md_buf(&iov, md_buf)) {
+               return -EINVAL;
+       }
+
+       return _spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks,
+                                              cb, cb_arg);
+}
+
+static int
+_spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                                struct iovec *iov, int iovcnt, void *md_buf,
+                                uint64_t offset_blocks, uint64_t num_blocks,
+                                spdk_bdev_io_completion_cb cb, void *cb_arg)
 {
        struct spdk_bdev *bdev = desc->bdev;
        struct spdk_bdev_io *bdev_io;
@@ -2320,10 +2919,9 @@ spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
        bdev_io->internal.ch = channel;
        bdev_io->internal.desc = desc;
        bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
-       bdev_io->u.bdev.iovs = &bdev_io->iov;
-       bdev_io->u.bdev.iovs[0].iov_base = buf;
-       bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
-       bdev_io->u.bdev.iovcnt = 1;
+       bdev_io->u.bdev.iovs = iov;
+       bdev_io->u.bdev.iovcnt = iovcnt;
+       bdev_io->u.bdev.md_buf = md_buf;
        bdev_io->u.bdev.num_blocks = num_blocks;
        bdev_io->u.bdev.offset_blocks = offset_blocks;
        spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
@@ -2352,6 +2950,57 @@ spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
                        struct iovec *iov, int iovcnt,
                        uint64_t offset_blocks, uint64_t num_blocks,
                        spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks,
+                                               num_blocks, cb, cb_arg);
+}
+
+int
+spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                               struct iovec *iov, int iovcnt, void *md_buf,
+                               uint64_t offset_blocks, uint64_t num_blocks,
+                               spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       if (!spdk_bdev_is_md_separate(desc->bdev)) {
+               return -EINVAL;
+       }
+
+       if (!_bdev_io_check_md_buf(iov, md_buf)) {
+               return -EINVAL;
+       }
+
+       return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks,
+                                               num_blocks, cb, cb_arg);
+}
+
+static void
+bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
+{
+       if (!success) {
+               /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */
+               bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM;
+               bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx);
+               return;
+       }
+
+       if (bdev_io->u.bdev.zcopy.populate) {
+               /* Read the real data into the buffer */
+               bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
+               bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
+               spdk_bdev_io_submit(bdev_io);
+               return;
+       }
+
+       /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */
+       bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
+       bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx);
+}
+
+int
+spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
+                     uint64_t offset_blocks, uint64_t num_blocks,
+                     bool populate,
+                     spdk_bdev_io_completion_cb cb, void *cb_arg)
 {
        struct spdk_bdev *bdev = desc->bdev;
        struct spdk_bdev_io *bdev_io;
@@ -2365,21 +3014,77 @@ spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
                return -EINVAL;
        }
 
-       bdev_io = spdk_bdev_get_io(channel);
-       if (!bdev_io) {
-               return -ENOMEM;
+       if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
+               return -ENOTSUP;
+       }
+
+       bdev_io = spdk_bdev_get_io(channel);
+       if (!bdev_io) {
+               return -ENOMEM;
+       }
+
+       bdev_io->internal.ch = channel;
+       bdev_io->internal.desc = desc;
+       bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY;
+       bdev_io->u.bdev.num_blocks = num_blocks;
+       bdev_io->u.bdev.offset_blocks = offset_blocks;
+       bdev_io->u.bdev.iovs = NULL;
+       bdev_io->u.bdev.iovcnt = 0;
+       bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0;
+       bdev_io->u.bdev.zcopy.commit = 0;
+       bdev_io->u.bdev.zcopy.start = 1;
+       spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
+
+       if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
+               spdk_bdev_io_submit(bdev_io);
+       } else {
+               /* Emulate zcopy by allocating a buffer */
+               spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf,
+                                    bdev_io->u.bdev.num_blocks * bdev->blocklen);
+       }
+
+       return 0;
+}
+
+int
+spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit,
+                   spdk_bdev_io_completion_cb cb, void *cb_arg)
+{
+       struct spdk_bdev *bdev = bdev_io->bdev;
+
+       if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
+               /* This can happen if the zcopy was emulated in start */
+               if (bdev_io->u.bdev.zcopy.start != 1) {
+                       return -EINVAL;
+               }
+               bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY;
+       }
+
+       if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) {
+               return -EINVAL;
+       }
+
+       bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0;
+       bdev_io->u.bdev.zcopy.start = 0;
+       bdev_io->internal.caller_ctx = cb_arg;
+       bdev_io->internal.cb = cb;
+       bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
+
+       if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
+               spdk_bdev_io_submit(bdev_io);
+               return 0;
+       }
+
+       if (!bdev_io->u.bdev.zcopy.commit) {
+               /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */
+               bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
+               bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx);
+               return 0;
        }
 
-       bdev_io->internal.ch = channel;
-       bdev_io->internal.desc = desc;
        bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
-       bdev_io->u.bdev.iovs = iov;
-       bdev_io->u.bdev.iovcnt = iovcnt;
-       bdev_io->u.bdev.num_blocks = num_blocks;
-       bdev_io->u.bdev.offset_blocks = offset_blocks;
-       spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
-
        spdk_bdev_io_submit(bdev_io);
+
        return 0;
 }
 
@@ -2414,6 +3119,11 @@ spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channe
                return -EINVAL;
        }
 
+       if (!_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) &&
+           !_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
+               return -ENOTSUP;
+       }
+
        bdev_io = spdk_bdev_get_io(channel);
 
        if (!bdev_io) {
@@ -2430,16 +3140,15 @@ spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channe
        if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
                spdk_bdev_io_submit(bdev_io);
                return 0;
-       } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
-               assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
-               bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
-               bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
-               _spdk_bdev_write_zero_buffer_next(bdev_io);
-               return 0;
-       } else {
-               spdk_bdev_free_io(bdev_io);
-               return -ENOTSUP;
        }
+
+       assert(_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE));
+       assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
+       bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
+       bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
+       _spdk_bdev_write_zero_buffer_next(bdev_io);
+
+       return 0;
 }
 
 int
@@ -2877,7 +3586,7 @@ _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
                bdev_io->internal.ch->io_outstanding++;
                shared_resource->io_outstanding++;
                bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
-               bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
+               bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io);
                if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
                        break;
                }
@@ -2888,7 +3597,7 @@ static inline void
 _spdk_bdev_io_complete(void *ctx)
 {
        struct spdk_bdev_io *bdev_io = ctx;
-       uint64_t tsc;
+       uint64_t tsc, tsc_diff;
 
        if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
                /*
@@ -2904,26 +3613,35 @@ _spdk_bdev_io_complete(void *ctx)
                 * Defer completion to avoid potential infinite recursion if the
                 * user's completion callback issues a new I/O.
                 */
-               spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
+               spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io),
                                     _spdk_bdev_io_complete, bdev_io);
                return;
        }
 
        tsc = spdk_get_ticks();
+       tsc_diff = tsc - bdev_io->internal.submit_tsc;
        spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
 
+       if (bdev_io->internal.ch->histogram) {
+               spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff);
+       }
+
        if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
                switch (bdev_io->type) {
                case SPDK_BDEV_IO_TYPE_READ:
                        bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
                        bdev_io->internal.ch->stat.num_read_ops++;
-                       bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
+                       bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff;
                        break;
                case SPDK_BDEV_IO_TYPE_WRITE:
                        bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
                        bdev_io->internal.ch->stat.num_write_ops++;
-                       bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
+                       bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff;
                        break;
+               case SPDK_BDEV_IO_TYPE_UNMAP:
+                       bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
+                       bdev_io->internal.ch->stat.num_unmap_ops++;
+                       bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff;
                default:
                        break;
                }
@@ -2939,7 +3657,7 @@ _spdk_bdev_io_complete(void *ctx)
                data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
                data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
                data[4] = bdev_io->bdev->fn_table->get_spin_time ?
-                         bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
+                         bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0;
 
                __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
                                   __itt_metadata_u64, 5, data);
@@ -2950,7 +3668,7 @@ _spdk_bdev_io_complete(void *ctx)
 #endif
 
        assert(bdev_io->internal.cb != NULL);
-       assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
+       assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io));
 
        bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
                             bdev_io->internal.caller_ctx);
@@ -3011,6 +3729,8 @@ spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status sta
                        return;
                }
        } else {
+               _bdev_io_unset_bounce_buf(bdev_io);
+
                assert(bdev_ch->io_outstanding > 0);
                assert(shared_resource->io_outstanding > 0);
                bdev_ch->io_outstanding--;
@@ -3126,6 +3846,12 @@ spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
        return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
 }
 
+struct spdk_io_channel *
+spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io)
+{
+       return bdev_io->internal.ch->channel;
+}
+
 static void
 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits)
 {
@@ -3262,6 +3988,16 @@ spdk_bdev_init(struct spdk_bdev *bdev)
        bdev->internal.qd_poller = NULL;
        bdev->internal.qos = NULL;
 
+       if (spdk_bdev_get_buf_align(bdev) > 1) {
+               if (bdev->split_on_optimal_io_boundary) {
+                       bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary,
+                                                            SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen);
+               } else {
+                       bdev->split_on_optimal_io_boundary = true;
+                       bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen;
+               }
+       }
+
        TAILQ_INIT(&bdev->internal.open_descs);
 
        TAILQ_INIT(&bdev->aliases);
@@ -3336,6 +4072,10 @@ spdk_bdev_start(struct spdk_bdev *bdev)
        }
 
        if (bdev->internal.claim_module) {
+               if (bdev->internal.claim_module->examine_disk) {
+                       bdev->internal.claim_module->internal.action_in_progress++;
+                       bdev->internal.claim_module->examine_disk(bdev);
+               }
                return;
        }
 
@@ -3356,21 +4096,15 @@ spdk_bdev_register(struct spdk_bdev *bdev)
                spdk_bdev_start(bdev);
        }
 
+       spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev));
        return rc;
 }
 
 int
 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
 {
-       int rc;
-
-       rc = spdk_bdev_init(vbdev);
-       if (rc) {
-               return rc;
-       }
-
-       spdk_bdev_start(vbdev);
-       return 0;
+       SPDK_ERRLOG("This function is deprecated.  Use spdk_bdev_register() instead.\n");
+       return spdk_bdev_register(vbdev);
 }
 
 void
@@ -3395,33 +4129,19 @@ _remove_notify(void *arg)
        }
 }
 
-void
-spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
+/* Must be called while holding bdev->internal.mutex.
+ * returns: 0 - bdev removed and ready to be destructed.
+ *          -EBUSY - bdev can't be destructed yet.  */
+static int
+spdk_bdev_unregister_unsafe(struct spdk_bdev *bdev)
 {
        struct spdk_bdev_desc   *desc, *tmp;
-       bool                    do_destruct = true;
-       struct spdk_thread      *thread;
-
-       SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
-
-       thread = spdk_get_thread();
-       if (!thread) {
-               /* The user called this from a non-SPDK thread. */
-               if (cb_fn != NULL) {
-                       cb_fn(cb_arg, -ENOTSUP);
-               }
-               return;
-       }
-
-       pthread_mutex_lock(&bdev->internal.mutex);
-
-       bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
-       bdev->internal.unregister_cb = cb_fn;
-       bdev->internal.unregister_ctx = cb_arg;
+       int                     rc = 0;
 
+       /* Notify each descriptor about hotremoval */
        TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
+               rc = -EBUSY;
                if (desc->remove_cb) {
-                       do_destruct = false;
                        /*
                         * Defer invocation of the remove_cb to a separate message that will
                         *  run later on its thread.  This ensures this context unwinds and
@@ -3436,15 +4156,53 @@ spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void
                }
        }
 
-       if (!do_destruct) {
+       /* If there are no descriptors, proceed removing the bdev */
+       if (rc == 0) {
+               TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
+               SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name);
+               spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev));
+       }
+
+       return rc;
+}
+
+void
+spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
+{
+       struct spdk_thread      *thread;
+       int                     rc;
+
+       SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
+
+       thread = spdk_get_thread();
+       if (!thread) {
+               /* The user called this from a non-SPDK thread. */
+               if (cb_fn != NULL) {
+                       cb_fn(cb_arg, -ENOTSUP);
+               }
+               return;
+       }
+
+       pthread_mutex_lock(&bdev->internal.mutex);
+       if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) {
                pthread_mutex_unlock(&bdev->internal.mutex);
+               if (cb_fn) {
+                       cb_fn(cb_arg, -EBUSY);
+               }
                return;
        }
 
-       TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
+       bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
+       bdev->internal.unregister_cb = cb_fn;
+       bdev->internal.unregister_ctx = cb_arg;
+
+       /* Call under lock. */
+       rc = spdk_bdev_unregister_unsafe(bdev);
        pthread_mutex_unlock(&bdev->internal.mutex);
 
-       spdk_bdev_fini(bdev);
+       if (rc == 0) {
+               spdk_bdev_fini(bdev);
+       }
 }
 
 int
@@ -3453,6 +4211,7 @@ spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_
 {
        struct spdk_bdev_desc *desc;
        struct spdk_thread *thread;
+       struct set_qos_limit_ctx *ctx;
 
        thread = spdk_get_thread();
        if (!thread) {
@@ -3469,24 +4228,41 @@ spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_
        SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
                      spdk_get_thread());
 
+       desc->bdev = bdev;
+       desc->thread = thread;
+       desc->remove_cb = remove_cb;
+       desc->remove_ctx = remove_ctx;
+       desc->write = write;
+       *_desc = desc;
+
        pthread_mutex_lock(&bdev->internal.mutex);
 
        if (write && bdev->internal.claim_module) {
                SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
                            bdev->name, bdev->internal.claim_module->name);
-               free(desc);
                pthread_mutex_unlock(&bdev->internal.mutex);
+               free(desc);
+               *_desc = NULL;
                return -EPERM;
        }
 
-       TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
+       /* Enable QoS */
+       if (bdev->internal.qos && bdev->internal.qos->thread == NULL) {
+               ctx = calloc(1, sizeof(*ctx));
+               if (ctx == NULL) {
+                       SPDK_ERRLOG("Failed to allocate memory for QoS context\n");
+                       pthread_mutex_unlock(&bdev->internal.mutex);
+                       free(desc);
+                       *_desc = NULL;
+                       return -ENOMEM;
+               }
+               ctx->bdev = bdev;
+               spdk_for_each_channel(__bdev_to_io_dev(bdev),
+                                     _spdk_bdev_enable_qos_msg, ctx,
+                                     _spdk_bdev_enable_qos_done);
+       }
 
-       desc->bdev = bdev;
-       desc->thread = thread;
-       desc->remove_cb = remove_cb;
-       desc->remove_ctx = remove_ctx;
-       desc->write = write;
-       *_desc = desc;
+       TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
 
        pthread_mutex_unlock(&bdev->internal.mutex);
 
@@ -3497,12 +4273,15 @@ void
 spdk_bdev_close(struct spdk_bdev_desc *desc)
 {
        struct spdk_bdev *bdev = desc->bdev;
-       bool do_unregister = false;
+       int rc;
 
        SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
                      spdk_get_thread());
 
-       assert(desc->thread == spdk_get_thread());
+       if (desc->thread != spdk_get_thread()) {
+               SPDK_ERRLOG("Descriptor %p for bdev %s closed on wrong thread (%p, expected %p)\n",
+                           desc, bdev->name, spdk_get_thread(), desc->thread);
+       }
 
        pthread_mutex_lock(&bdev->internal.mutex);
 
@@ -3530,12 +4309,14 @@ spdk_bdev_close(struct spdk_bdev_desc *desc)
        spdk_bdev_set_qd_sampling_period(bdev, 0);
 
        if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
-               do_unregister = true;
-       }
-       pthread_mutex_unlock(&bdev->internal.mutex);
+               rc = spdk_bdev_unregister_unsafe(bdev);
+               pthread_mutex_unlock(&bdev->internal.mutex);
 
-       if (do_unregister == true) {
-               spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
+               if (rc == 0) {
+                       spdk_bdev_fini(bdev);
+               }
+       } else {
+               pthread_mutex_unlock(&bdev->internal.mutex);
        }
 }
 
@@ -3582,10 +4363,8 @@ spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *i
 
        switch (bdev_io->type) {
        case SPDK_BDEV_IO_TYPE_READ:
-               iovs = bdev_io->u.bdev.iovs;
-               iovcnt = bdev_io->u.bdev.iovcnt;
-               break;
        case SPDK_BDEV_IO_TYPE_WRITE:
+       case SPDK_BDEV_IO_TYPE_ZCOPY:
                iovs = bdev_io->u.bdev.iovs;
                iovcnt = bdev_io->u.bdev.iovcnt;
                break;
@@ -3603,6 +4382,25 @@ spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *i
        }
 }
 
+void *
+spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io)
+{
+       if (bdev_io == NULL) {
+               return NULL;
+       }
+
+       if (!spdk_bdev_is_md_separate(bdev_io->bdev)) {
+               return NULL;
+       }
+
+       if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ ||
+           bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
+               return bdev_io->u.bdev.md_buf;
+       }
+
+       return NULL;
+}
+
 void
 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
 {
@@ -3612,10 +4410,6 @@ spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
                assert(false);
        }
 
-       if (bdev_module->async_init) {
-               bdev_module->internal.action_in_progress = 1;
-       }
-
        /*
         * Modules with examine callbacks must be initialized first, so they are
         *  ready to handle examine callbacks from later modules that will
@@ -3692,12 +4486,6 @@ _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, vo
        _spdk_bdev_write_zero_buffer_next(parent_io);
 }
 
-struct set_qos_limit_ctx {
-       void (*cb_fn)(void *cb_arg, int status);
-       void *cb_arg;
-       struct spdk_bdev *bdev;
-};
-
 static void
 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
 {
@@ -3705,7 +4493,9 @@ _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
        ctx->bdev->internal.qos_mod_in_progress = false;
        pthread_mutex_unlock(&ctx->bdev->internal.mutex);
 
-       ctx->cb_fn(ctx->cb_arg, status);
+       if (ctx->cb_fn) {
+               ctx->cb_fn(ctx->cb_arg, status);
+       }
        free(ctx);
 }
 
@@ -3736,12 +4526,14 @@ _spdk_bdev_disable_qos_done(void *cb_arg)
                        bdev_io->internal.io_submit_ch = NULL;
                }
 
-               spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
+               spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io),
                                     _spdk_bdev_io_submit, bdev_io);
        }
 
-       spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
-       spdk_poller_unregister(&qos->poller);
+       if (qos->thread != NULL) {
+               spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
+               spdk_poller_unregister(&qos->poller);
+       }
 
        free(qos);
 
@@ -3760,7 +4552,11 @@ _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
        thread = bdev->internal.qos->thread;
        pthread_mutex_unlock(&bdev->internal.mutex);
 
-       spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
+       if (thread != NULL) {
+               spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
+       } else {
+               _spdk_bdev_disable_qos_done(ctx);
+       }
 }
 
 static void
@@ -3897,7 +4693,6 @@ spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
 
        if (disable_rate_limit == false) {
                if (bdev->internal.qos == NULL) {
-                       /* Enabling */
                        bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
                        if (!bdev->internal.qos) {
                                pthread_mutex_unlock(&bdev->internal.mutex);
@@ -3906,7 +4701,10 @@ spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
                                cb_fn(cb_arg, -ENOMEM);
                                return;
                        }
+               }
 
+               if (bdev->internal.qos->thread == NULL) {
+                       /* Enabling */
                        _spdk_bdev_set_qos_rate_limits(bdev, limits);
 
                        spdk_for_each_channel(__bdev_to_io_dev(bdev),
@@ -3937,14 +4735,179 @@ spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
        pthread_mutex_unlock(&bdev->internal.mutex);
 }
 
+struct spdk_bdev_histogram_ctx {
+       spdk_bdev_histogram_status_cb cb_fn;
+       void *cb_arg;
+       struct spdk_bdev *bdev;
+       int status;
+};
+
+static void
+_spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status)
+{
+       struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
+
+       pthread_mutex_lock(&ctx->bdev->internal.mutex);
+       ctx->bdev->internal.histogram_in_progress = false;
+       pthread_mutex_unlock(&ctx->bdev->internal.mutex);
+       ctx->cb_fn(ctx->cb_arg, ctx->status);
+       free(ctx);
+}
+
+static void
+_spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i)
+{
+       struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
+       struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
+
+       if (ch->histogram != NULL) {
+               spdk_histogram_data_free(ch->histogram);
+               ch->histogram = NULL;
+       }
+       spdk_for_each_channel_continue(i, 0);
+}
+
+static void
+_spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status)
+{
+       struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
+
+       if (status != 0) {
+               ctx->status = status;
+               ctx->bdev->internal.histogram_enabled = false;
+               spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx,
+                                     _spdk_bdev_histogram_disable_channel_cb);
+       } else {
+               pthread_mutex_lock(&ctx->bdev->internal.mutex);
+               ctx->bdev->internal.histogram_in_progress = false;
+               pthread_mutex_unlock(&ctx->bdev->internal.mutex);
+               ctx->cb_fn(ctx->cb_arg, ctx->status);
+               free(ctx);
+       }
+}
+
+static void
+_spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i)
+{
+       struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
+       struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
+       int status = 0;
+
+       if (ch->histogram == NULL) {
+               ch->histogram = spdk_histogram_data_alloc();
+               if (ch->histogram == NULL) {
+                       status = -ENOMEM;
+               }
+       }
+
+       spdk_for_each_channel_continue(i, status);
+}
+
+void
+spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn,
+                          void *cb_arg, bool enable)
+{
+       struct spdk_bdev_histogram_ctx *ctx;
+
+       ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx));
+       if (ctx == NULL) {
+               cb_fn(cb_arg, -ENOMEM);
+               return;
+       }
+
+       ctx->bdev = bdev;
+       ctx->status = 0;
+       ctx->cb_fn = cb_fn;
+       ctx->cb_arg = cb_arg;
+
+       pthread_mutex_lock(&bdev->internal.mutex);
+       if (bdev->internal.histogram_in_progress) {
+               pthread_mutex_unlock(&bdev->internal.mutex);
+               free(ctx);
+               cb_fn(cb_arg, -EAGAIN);
+               return;
+       }
+
+       bdev->internal.histogram_in_progress = true;
+       pthread_mutex_unlock(&bdev->internal.mutex);
+
+       bdev->internal.histogram_enabled = enable;
+
+       if (enable) {
+               /* Allocate histogram for each channel */
+               spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx,
+                                     _spdk_bdev_histogram_enable_channel_cb);
+       } else {
+               spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx,
+                                     _spdk_bdev_histogram_disable_channel_cb);
+       }
+}
+
+struct spdk_bdev_histogram_data_ctx {
+       spdk_bdev_histogram_data_cb cb_fn;
+       void *cb_arg;
+       struct spdk_bdev *bdev;
+       /** merged histogram data from all channels */
+       struct spdk_histogram_data      *histogram;
+};
+
+static void
+_spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status)
+{
+       struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
+
+       ctx->cb_fn(ctx->cb_arg, status, ctx->histogram);
+       free(ctx);
+}
+
+static void
+_spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i)
+{
+       struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
+       struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
+       struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
+       int status = 0;
+
+       if (ch->histogram == NULL) {
+               status = -EFAULT;
+       } else {
+               spdk_histogram_data_merge(ctx->histogram, ch->histogram);
+       }
+
+       spdk_for_each_channel_continue(i, status);
+}
+
+void
+spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
+                       spdk_bdev_histogram_data_cb cb_fn,
+                       void *cb_arg)
+{
+       struct spdk_bdev_histogram_data_ctx *ctx;
+
+       ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
+       if (ctx == NULL) {
+               cb_fn(cb_arg, -ENOMEM, NULL);
+               return;
+       }
+
+       ctx->bdev = bdev;
+       ctx->cb_fn = cb_fn;
+       ctx->cb_arg = cb_arg;
+
+       ctx->histogram = histogram;
+
+       spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx,
+                             _spdk_bdev_histogram_get_channel_cb);
+}
+
 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
 
-SPDK_TRACE_REGISTER_FN(bdev_trace)
+SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
 {
        spdk_trace_register_owner(OWNER_BDEV, 'b');
        spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
-       spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
+       spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV,
                                        OBJECT_BDEV_IO, 1, 0, "type:   ");
-       spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
+       spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV,
                                        OBJECT_BDEV_IO, 0, 0, "");
 }