]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/lib/blobfs/blobfs.c
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / lib / blobfs / blobfs.c
index 666cfa9693b9e02083c7203e82996f8428d9c49b..3af6b0639f6976a7f97d07b3910d460aeaf70556 100644 (file)
@@ -35,7 +35,7 @@
 
 #include "spdk/blobfs.h"
 #include "spdk/conf.h"
-#include "blobfs_internal.h"
+#include "tree.h"
 
 #include "spdk/queue.h"
 #include "spdk/thread.h"
 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
 
+#define SPDK_BLOBFS_SIGNATURE  "BLOBFS"
+
 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
 static struct spdk_mempool *g_cache_pool;
 static TAILQ_HEAD(, spdk_file) g_caches;
+static struct spdk_poller *g_cache_pool_mgmt_poller;
+static struct spdk_thread *g_cache_pool_thread;
+#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
 static int g_fs_count = 0;
 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_spinlock_t g_caches_lock;
 
 #define TRACE_GROUP_BLOBFS     0x7
 #define TRACE_BLOBFS_XATTR_START       SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
@@ -104,7 +108,7 @@ SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
 }
 
 void
-spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
+cache_buffer_free(struct cache_buffer *cache_buffer)
 {
        spdk_mempool_put(g_cache_pool, cache_buffer->buf);
        free(cache_buffer);
@@ -245,9 +249,9 @@ struct spdk_fs_cb_args {
        } op;
 };
 
-static void cache_free_buffers(struct spdk_file *file);
-static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
-static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
+static void file_free(struct spdk_file *file);
+static void fs_io_device_unregister(struct spdk_filesystem *fs);
+static void fs_free_io_channels(struct spdk_filesystem *fs);
 
 void
 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
@@ -255,8 +259,26 @@ spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
        opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
 }
 
+static int _blobfs_cache_pool_reclaim(void *arg);
+
+static bool
+blobfs_cache_pool_need_reclaim(void)
+{
+       size_t count;
+
+       count = spdk_mempool_count(g_cache_pool);
+       /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
+        *  when the number of available cache buffer is less than 1/5 of total buffers.
+        */
+       if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
+               return false;
+       }
+
+       return true;
+}
+
 static void
-__initialize_cache(void)
+__start_cache_pool_mgmt(void *ctx)
 {
        assert(g_cache_pool == NULL);
 
@@ -271,16 +293,47 @@ __initialize_cache(void)
                assert(false);
        }
        TAILQ_INIT(&g_caches);
-       pthread_spin_init(&g_caches_lock, 0);
+
+       assert(g_cache_pool_mgmt_poller == NULL);
+       g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
+                                  BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
 }
 
 static void
-__free_cache(void)
+__stop_cache_pool_mgmt(void *ctx)
 {
-       assert(g_cache_pool != NULL);
+       spdk_poller_unregister(&g_cache_pool_mgmt_poller);
 
+       assert(g_cache_pool != NULL);
+       assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
        spdk_mempool_free(g_cache_pool);
        g_cache_pool = NULL;
+
+       spdk_thread_exit(g_cache_pool_thread);
+}
+
+static void
+initialize_global_cache(void)
+{
+       pthread_mutex_lock(&g_cache_init_lock);
+       if (g_fs_count == 0) {
+               g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
+               assert(g_cache_pool_thread != NULL);
+               spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
+       }
+       g_fs_count++;
+       pthread_mutex_unlock(&g_cache_init_lock);
+}
+
+static void
+free_global_cache(void)
+{
+       pthread_mutex_lock(&g_cache_init_lock);
+       g_fs_count--;
+       if (g_fs_count == 0) {
+               spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
+       }
+       pthread_mutex_unlock(&g_cache_init_lock);
 }
 
 static uint64_t
@@ -388,8 +441,8 @@ free_fs_request(struct spdk_fs_request *req)
 }
 
 static int
-_spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
-                       uint32_t max_ops)
+fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
+                 uint32_t max_ops)
 {
        uint32_t i;
 
@@ -412,40 +465,40 @@ _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *chan
 }
 
 static int
-_spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
+fs_md_channel_create(void *io_device, void *ctx_buf)
 {
        struct spdk_filesystem          *fs;
        struct spdk_fs_channel          *channel = ctx_buf;
 
        fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
 
-       return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
+       return fs_channel_create(fs, channel, fs->md_target.max_ops);
 }
 
 static int
-_spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
+fs_sync_channel_create(void *io_device, void *ctx_buf)
 {
        struct spdk_filesystem          *fs;
        struct spdk_fs_channel          *channel = ctx_buf;
 
        fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
 
-       return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
+       return fs_channel_create(fs, channel, fs->sync_target.max_ops);
 }
 
 static int
-_spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
+fs_io_channel_create(void *io_device, void *ctx_buf)
 {
        struct spdk_filesystem          *fs;
        struct spdk_fs_channel          *channel = ctx_buf;
 
        fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
 
-       return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
+       return fs_channel_create(fs, channel, fs->io_target.max_ops);
 }
 
 static void
-_spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
+fs_channel_destroy(void *io_device, void *ctx_buf)
 {
        struct spdk_fs_channel *channel = ctx_buf;
 
@@ -476,12 +529,7 @@ common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
        fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
        fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
 
-       pthread_mutex_lock(&g_cache_init_lock);
-       if (g_fs_count == 0) {
-               __initialize_cache();
-       }
-       g_fs_count++;
-       pthread_mutex_unlock(&g_cache_init_lock);
+       initialize_global_cache();
 }
 
 static void
@@ -506,6 +554,7 @@ static void
 fs_conf_parse(void)
 {
        struct spdk_conf_section *sp;
+       int cache_buffer_shift;
 
        sp = spdk_conf_find_section(NULL, "Blobfs");
        if (sp == NULL) {
@@ -513,9 +562,11 @@ fs_conf_parse(void)
                return;
        }
 
-       g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
-       if (g_fs_cache_buffer_shift <= 0) {
+       cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
+       if (cache_buffer_shift <= 0) {
                g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
+       } else {
+               g_fs_cache_buffer_shift = cache_buffer_shift;
        }
 }
 
@@ -534,19 +585,19 @@ fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
        TAILQ_INIT(&fs->files);
 
        fs->md_target.max_ops = 512;
-       spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
+       spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
                                sizeof(struct spdk_fs_channel), "blobfs_md");
        fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
        fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
 
        fs->sync_target.max_ops = 512;
-       spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
+       spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
                                sizeof(struct spdk_fs_channel), "blobfs_sync");
        fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
        fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
 
        fs->io_target.max_ops = 512;
-       spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
+       spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
                                sizeof(struct spdk_fs_channel), "blobfs_io");
 
        return fs;
@@ -581,8 +632,8 @@ spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
 
        req = alloc_fs_request(fs->md_target.md_fs_channel);
        if (req == NULL) {
-               spdk_fs_free_io_channels(fs);
-               spdk_fs_io_device_unregister(fs);
+               fs_free_io_channels(fs);
+               fs_io_device_unregister(fs);
                cb_fn(cb_arg, NULL, -ENOMEM);
                return;
        }
@@ -593,7 +644,7 @@ spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
        args->fs = fs;
 
        spdk_bs_opts_init(&opts);
-       snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
+       snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
        if (opt) {
                opts.cluster_sz = opt->cluster_sz;
        }
@@ -616,10 +667,15 @@ file_alloc(struct spdk_filesystem *fs)
                return NULL;
        }
 
+       if (pthread_spin_init(&file->lock, 0)) {
+               free(file->tree);
+               free(file);
+               return NULL;
+       }
+
        file->fs = fs;
        TAILQ_INIT(&file->open_requests);
        TAILQ_INIT(&file->sync_requests);
-       pthread_spin_init(&file->lock, 0);
        TAILQ_INSERT_TAIL(&fs->files, file, tailq);
        file->priority = SPDK_FILE_PRIORITY_LOW;
        return file;
@@ -753,27 +809,29 @@ load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
        struct spdk_fs_cb_args *args = &req->args;
        struct spdk_filesystem *fs = args->fs;
        struct spdk_bs_type bstype;
-       static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
+       static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
        static const struct spdk_bs_type zeros;
 
        if (bserrno != 0) {
                args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
                free_fs_request(req);
-               free(fs);
+               fs_free_io_channels(fs);
+               fs_io_device_unregister(fs);
                return;
        }
 
        bstype = spdk_bs_get_bstype(bs);
 
        if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
-               SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
+               SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
                spdk_bs_set_bstype(bs, blobfs_type);
        } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
-               SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
-               SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
-               args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
+               SPDK_ERRLOG("not blobfs\n");
+               SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
+               args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
                free_fs_request(req);
-               free(fs);
+               fs_free_io_channels(fs);
+               fs_io_device_unregister(fs);
                return;
        }
 
@@ -782,7 +840,7 @@ load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
 }
 
 static void
-spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
+fs_io_device_unregister(struct spdk_filesystem *fs)
 {
        assert(fs != NULL);
        spdk_io_device_unregister(&fs->md_target, NULL);
@@ -792,7 +850,7 @@ spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
 }
 
 static void
-spdk_fs_free_io_channels(struct spdk_filesystem *fs)
+fs_free_io_channels(struct spdk_filesystem *fs)
 {
        assert(fs != NULL);
        spdk_fs_free_io_channel(fs->md_target.md_io_channel);
@@ -818,8 +876,8 @@ spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
 
        req = alloc_fs_request(fs->md_target.md_fs_channel);
        if (req == NULL) {
-               spdk_fs_free_io_channels(fs);
-               spdk_fs_io_device_unregister(fs);
+               fs_free_io_channels(fs);
+               fs_io_device_unregister(fs);
                cb_fn(cb_arg, NULL, -ENOMEM);
                return;
        }
@@ -845,23 +903,15 @@ unload_cb(void *ctx, int bserrno)
 
        TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
                TAILQ_REMOVE(&fs->files, file, tailq);
-               cache_free_buffers(file);
-               free(file->name);
-               free(file->tree);
-               free(file);
+               file_free(file);
        }
 
-       pthread_mutex_lock(&g_cache_init_lock);
-       g_fs_count--;
-       if (g_fs_count == 0) {
-               __free_cache();
-       }
-       pthread_mutex_unlock(&g_cache_init_lock);
+       free_global_cache();
 
        args->fn.fs_op(args->arg, bserrno);
        free(req);
 
-       spdk_fs_io_device_unregister(fs);
+       fs_io_device_unregister(fs);
 }
 
 void
@@ -885,7 +935,7 @@ spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_a
        args->arg = cb_arg;
        args->fs = fs;
 
-       spdk_fs_free_io_channels(fs);
+       fs_free_io_channels(fs);
        spdk_bs_unload(fs->bs, unload_cb, req);
 }
 
@@ -1092,8 +1142,7 @@ __fs_create_file_done(void *arg, int fserrno)
        struct spdk_fs_request *req = arg;
        struct spdk_fs_cb_args *args = &req->args;
 
-       args->rc = fserrno;
-       sem_post(args->sem);
+       __wake_caller(args, fserrno);
        SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
 }
 
@@ -1312,7 +1361,7 @@ fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
 }
 
 static void
-__spdk_fs_md_rename_file(struct spdk_fs_request *req)
+_fs_md_rename_file(struct spdk_fs_request *req)
 {
        struct spdk_fs_cb_args *args = &req->args;
        struct spdk_file *f;
@@ -1334,7 +1383,7 @@ __spdk_fs_md_rename_file(struct spdk_fs_request *req)
 static void
 fs_rename_delete_done(void *arg, int fserrno)
 {
-       __spdk_fs_md_rename_file(arg);
+       _fs_md_rename_file(arg);
 }
 
 void
@@ -1369,7 +1418,7 @@ spdk_fs_rename_file_async(struct spdk_filesystem *fs,
 
        f = fs_find_file(fs, new_name);
        if (f == NULL) {
-               __spdk_fs_md_rename_file(req);
+               _fs_md_rename_file(req);
                return;
        }
 
@@ -1479,15 +1528,10 @@ spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
                return;
        }
 
+       blobid = f->blobid;
        TAILQ_REMOVE(&fs->files, f, tailq);
 
-       cache_free_buffers(f);
-
-       blobid = f->blobid;
-
-       free(f->name);
-       free(f->tree);
-       free(f);
+       file_free(f);
 
        spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
 }
@@ -1710,22 +1754,50 @@ __rw_done(void *ctx, int bserrno)
        free_fs_request(req);
 }
 
+static void
+_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
+{
+       int i;
+       size_t len;
+
+       for (i = 0; i < iovcnt; i++) {
+               len = spdk_min(iovs[i].iov_len, buf_len);
+               memcpy(buf, iovs[i].iov_base, len);
+               buf += len;
+               assert(buf_len >= len);
+               buf_len -= len;
+       }
+}
+
+static void
+_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
+{
+       int i;
+       size_t len;
+
+       for (i = 0; i < iovcnt; i++) {
+               len = spdk_min(iovs[i].iov_len, buf_len);
+               memcpy(iovs[i].iov_base, buf, len);
+               buf += len;
+               assert(buf_len >= len);
+               buf_len -= len;
+       }
+}
+
 static void
 __read_done(void *ctx, int bserrno)
 {
        struct spdk_fs_request *req = ctx;
        struct spdk_fs_cb_args *args = &req->args;
+       void *buf;
 
        assert(req != NULL);
+       buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
        if (args->op.rw.is_read) {
-               memcpy(args->iovs[0].iov_base,
-                      args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
-                      args->iovs[0].iov_len);
+               _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
                __rw_done(req, 0);
        } else {
-               memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
-                      args->iovs[0].iov_base,
-                      args->iovs[0].iov_len);
+               _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
                spdk_blob_io_write(args->file->blob, args->op.rw.channel,
                                   args->op.rw.pin_buf,
                                   args->op.rw.start_lba, args->op.rw.num_lba,
@@ -1761,10 +1833,33 @@ __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
        *num_lba = (end_lba - *start_lba + 1);
 }
 
+static bool
+__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
+{
+       uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
+
+       if ((offset % lba_size == 0) && (length % lba_size == 0)) {
+               return true;
+       }
+
+       return false;
+}
+
 static void
-__readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
-           void *payload, uint64_t offset, uint64_t length,
-           spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
+_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
+{
+       uint32_t i;
+
+       for (i = 0; i < iovcnt; i++) {
+               req->args.iovs[i].iov_base = iovs[i].iov_base;
+               req->args.iovs[i].iov_len = iovs[i].iov_len;
+       }
+}
+
+static void
+__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
+             struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
+             spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
 {
        struct spdk_fs_request *req;
        struct spdk_fs_cb_args *args;
@@ -1777,7 +1872,7 @@ __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
                return;
        }
 
-       req = alloc_fs_request_with_iov(channel, 1);
+       req = alloc_fs_request_with_iov(channel, iovcnt);
        if (req == NULL) {
                cb_fn(cb_arg, -ENOMEM);
                return;
@@ -1790,13 +1885,13 @@ __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
        args->arg = cb_arg;
        args->file = file;
        args->op.rw.channel = channel->bs_channel;
-       args->iovs[0].iov_base = payload;
-       args->iovs[0].iov_len = (size_t)length;
+       _fs_request_setup_iovs(req, iovs, iovcnt);
        args->op.rw.is_read = is_read;
        args->op.rw.offset = offset;
        args->op.rw.blocklen = lba_size;
 
        pin_buf_length = num_lba * lba_size;
+       args->op.rw.length = pin_buf_length;
        args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
                                          SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
        if (args->op.rw.pin_buf == NULL) {
@@ -1812,11 +1907,30 @@ __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
 
        if (!is_read && file->length < offset + length) {
                spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
+       } else if (!is_read && __is_lba_aligned(file, offset, length)) {
+               _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
+               spdk_blob_io_write(args->file->blob, args->op.rw.channel,
+                                  args->op.rw.pin_buf,
+                                  args->op.rw.start_lba, args->op.rw.num_lba,
+                                  __rw_done, req);
        } else {
                __do_blob_read(req, 0);
        }
 }
 
+static void
+__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
+           void *payload, uint64_t offset, uint64_t length,
+           spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
+{
+       struct iovec iov;
+
+       iov.iov_base = payload;
+       iov.iov_len = (size_t)length;
+
+       __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
+}
+
 void
 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
                      void *payload, uint64_t offset, uint64_t length,
@@ -1825,6 +1939,17 @@ spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
        __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
 }
 
+void
+spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
+                      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
+                      spdk_file_op_complete cb_fn, void *cb_arg)
+{
+       SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
+                     file->name, offset, length);
+
+       __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
+}
+
 void
 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
                     void *payload, uint64_t offset, uint64_t length,
@@ -1835,6 +1960,17 @@ spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
        __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
 }
 
+void
+spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
+                     struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
+                     spdk_file_op_complete cb_fn, void *cb_arg)
+{
+       SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
+                     file->name, offset, length);
+
+       __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
+}
+
 struct spdk_io_channel *
 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
 {
@@ -1865,11 +2001,15 @@ spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
                return NULL;
        }
 
-       _spdk_fs_channel_create(fs, &ctx->ch, 512);
+       if (pthread_spin_init(&ctx->ch.lock, 0)) {
+               free(ctx);
+               return NULL;
+       }
+
+       fs_channel_create(fs, &ctx->ch, 512);
 
        ctx->ch.send_request = fs->send_request;
        ctx->ch.sync = 1;
-       pthread_spin_init(&ctx->ch.lock, 0);
 
        return ctx;
 }
@@ -1890,14 +2030,23 @@ spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
                usleep(1000);
        }
 
-       _spdk_fs_channel_destroy(NULL, &ctx->ch);
+       fs_channel_destroy(NULL, &ctx->ch);
        free(ctx);
 }
 
-void
+int
 spdk_fs_set_cache_size(uint64_t size_in_mb)
 {
+       /* setting g_fs_cache_size is only permitted if cache pool
+        * is already freed or hasn't been initialized
+        */
+       if (g_cache_pool != NULL) {
+               return -EPERM;
+       }
+
        g_fs_cache_size = size_in_mb * 1024 * 1024;
+
+       return 0;
 }
 
 uint64_t
@@ -1908,65 +2057,104 @@ spdk_fs_get_cache_size(void)
 
 static void __file_flush(void *ctx);
 
-static void *
-alloc_cache_memory_buffer(struct spdk_file *context)
+/* Try to free some cache buffers from this file.
+ */
+static int
+reclaim_cache_buffers(struct spdk_file *file)
 {
-       struct spdk_file *file;
-       void *buf;
+       int rc;
+
+       BLOBFS_TRACE(file, "free=%s\n", file->name);
 
-       buf = spdk_mempool_get(g_cache_pool);
-       if (buf != NULL) {
-               return buf;
+       /* The function is safe to be called with any threads, while the file
+        * lock maybe locked by other thread for now, so try to get the file
+        * lock here.
+        */
+       rc = pthread_spin_trylock(&file->lock);
+       if (rc != 0) {
+               return -1;
        }
 
-       pthread_spin_lock(&g_caches_lock);
-       TAILQ_FOREACH(file, &g_caches, cache_tailq) {
-               if (!file->open_for_writing &&
-                   file->priority == SPDK_FILE_PRIORITY_LOW &&
-                   file != context) {
-                       break;
-               }
+       if (file->tree->present_mask == 0) {
+               pthread_spin_unlock(&file->lock);
+               return -1;
        }
-       pthread_spin_unlock(&g_caches_lock);
-       if (file != NULL) {
-               cache_free_buffers(file);
-               buf = spdk_mempool_get(g_cache_pool);
-               if (buf != NULL) {
-                       return buf;
-               }
+       tree_free_buffers(file->tree);
+
+       TAILQ_REMOVE(&g_caches, file, cache_tailq);
+       /* If not freed, put it in the end of the queue */
+       if (file->tree->present_mask != 0) {
+               TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
+       } else {
+               file->last = NULL;
        }
+       pthread_spin_unlock(&file->lock);
 
-       pthread_spin_lock(&g_caches_lock);
-       TAILQ_FOREACH(file, &g_caches, cache_tailq) {
-               if (!file->open_for_writing && file != context) {
-                       break;
-               }
+       return 0;
+}
+
+static int
+_blobfs_cache_pool_reclaim(void *arg)
+{
+       struct spdk_file *file, *tmp;
+       int rc;
+
+       if (!blobfs_cache_pool_need_reclaim()) {
+               return SPDK_POLLER_IDLE;
        }
-       pthread_spin_unlock(&g_caches_lock);
-       if (file != NULL) {
-               cache_free_buffers(file);
-               buf = spdk_mempool_get(g_cache_pool);
-               if (buf != NULL) {
-                       return buf;
+
+       TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
+               if (!file->open_for_writing &&
+                   file->priority == SPDK_FILE_PRIORITY_LOW) {
+                       rc = reclaim_cache_buffers(file);
+                       if (rc < 0) {
+                               continue;
+                       }
+                       if (!blobfs_cache_pool_need_reclaim()) {
+                               return SPDK_POLLER_BUSY;
+                       }
+                       break;
                }
        }
 
-       pthread_spin_lock(&g_caches_lock);
-       TAILQ_FOREACH(file, &g_caches, cache_tailq) {
-               if (file != context) {
+       TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
+               if (!file->open_for_writing) {
+                       rc = reclaim_cache_buffers(file);
+                       if (rc < 0) {
+                               continue;
+                       }
+                       if (!blobfs_cache_pool_need_reclaim()) {
+                               return SPDK_POLLER_BUSY;
+                       }
                        break;
                }
        }
-       pthread_spin_unlock(&g_caches_lock);
-       if (file != NULL) {
-               cache_free_buffers(file);
-               buf = spdk_mempool_get(g_cache_pool);
-               if (buf != NULL) {
-                       return buf;
+
+       TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
+               rc = reclaim_cache_buffers(file);
+               if (rc < 0) {
+                       continue;
                }
+               break;
        }
 
-       return NULL;
+       return SPDK_POLLER_BUSY;
+}
+
+static void
+_add_file_to_cache_pool(void *ctx)
+{
+       struct spdk_file *file = ctx;
+
+       TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
+}
+
+static void
+_remove_file_from_cache_pool(void *ctx)
+{
+       struct spdk_file *file = ctx;
+
+       TAILQ_REMOVE(&g_caches, file, cache_tailq);
 }
 
 static struct cache_buffer *
@@ -1974,6 +2162,7 @@ cache_insert_buffer(struct spdk_file *file, uint64_t offset)
 {
        struct cache_buffer *buf;
        int count = 0;
+       bool need_update = false;
 
        buf = calloc(1, sizeof(*buf));
        if (buf == NULL) {
@@ -1981,33 +2170,31 @@ cache_insert_buffer(struct spdk_file *file, uint64_t offset)
                return NULL;
        }
 
-       buf->buf = alloc_cache_memory_buffer(file);
-       while (buf->buf == NULL) {
-               /*
-                * TODO: alloc_cache_memory_buffer() should eventually free
-                *  some buffers.  Need a more sophisticated check here, instead
-                *  of just bailing if 100 tries does not result in getting a
-                *  free buffer.  This will involve using the sync channel's
-                *  semaphore to block until a buffer becomes available.
-                */
+       do {
+               buf->buf = spdk_mempool_get(g_cache_pool);
+               if (buf->buf) {
+                       break;
+               }
                if (count++ == 100) {
                        SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
                                    file, offset);
                        free(buf);
                        return NULL;
                }
-               buf->buf = alloc_cache_memory_buffer(file);
-       }
+               usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
+       } while (true);
 
        buf->buf_size = CACHE_BUFFER_SIZE;
        buf->offset = offset;
 
-       pthread_spin_lock(&g_caches_lock);
        if (file->tree->present_mask == 0) {
-               TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
+               need_update = true;
+       }
+       file->tree = tree_insert_buffer(file->tree, buf);
+
+       if (need_update) {
+               spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
        }
-       file->tree = spdk_tree_insert_buffer(file->tree, buf);
-       pthread_spin_unlock(&g_caches_lock);
 
        return buf;
 }
@@ -2052,10 +2239,8 @@ __file_cache_finish_sync(void *ctx, int bserrno)
        pthread_spin_unlock(&file->lock);
 
        sync_args->fn.file_op(sync_args->arg, bserrno);
-       pthread_spin_lock(&file->lock);
-       free_fs_request(sync_req);
-       pthread_spin_unlock(&file->lock);
 
+       free_fs_request(sync_req);
        __check_sync_reqs(file);
 }
 
@@ -2107,7 +2292,7 @@ __file_flush_done(void *ctx, int bserrno)
        }
        if (next->bytes_flushed == next->buf_size) {
                BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
-               next = spdk_tree_find_buffer(file->tree, file->length_flushed);
+               next = tree_find_buffer(file->tree, file->length_flushed);
        }
 
        /*
@@ -2135,7 +2320,7 @@ __file_flush(void *ctx)
        uint32_t lba_size;
 
        pthread_spin_lock(&file->lock);
-       next = spdk_tree_find_buffer(file->tree, file->length_flushed);
+       next = tree_find_buffer(file->tree, file->length_flushed);
        if (next == NULL || next->in_progress ||
            ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
                /*
@@ -2185,7 +2370,7 @@ __file_flush(void *ctx)
        __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
 
        next->in_progress = true;
-       BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
+       BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
                     offset, length, start_lba, num_lba);
        pthread_spin_unlock(&file->lock);
        spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
@@ -2435,7 +2620,7 @@ check_readahead(struct spdk_file *file, uint64_t offset,
        struct spdk_fs_cb_args *args;
 
        offset = __next_cache_buffer_offset(offset);
-       if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
+       if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
                return;
        }
 
@@ -2465,39 +2650,6 @@ check_readahead(struct spdk_file *file, uint64_t offset,
        file->fs->send_request(__readahead, req);
 }
 
-static int
-__file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
-           struct spdk_fs_channel *channel)
-{
-       struct cache_buffer *buf;
-       int rc;
-
-       buf = spdk_tree_find_filled_buffer(file->tree, offset);
-       if (buf == NULL) {
-               pthread_spin_unlock(&file->lock);
-               rc = __send_rw_from_file(file, payload, offset, length, true, channel);
-               pthread_spin_lock(&file->lock);
-               return rc;
-       }
-
-       if ((offset + length) > (buf->offset + buf->bytes_filled)) {
-               length = buf->offset + buf->bytes_filled - offset;
-       }
-       BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
-       memcpy(payload, &buf->buf[offset - buf->offset], length);
-       if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
-               pthread_spin_lock(&g_caches_lock);
-               spdk_tree_remove_buffer(file->tree, buf);
-               if (file->tree->present_mask == 0) {
-                       TAILQ_REMOVE(&g_caches, file, cache_tailq);
-               }
-               pthread_spin_unlock(&g_caches_lock);
-       }
-
-       sem_post(&channel->sem);
-       return 0;
-}
-
 int64_t
 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
               void *payload, uint64_t offset, uint64_t length)
@@ -2505,6 +2657,8 @@ spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
        struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
        uint64_t final_offset, final_length;
        uint32_t sub_reads = 0;
+       struct cache_buffer *buf;
+       uint64_t read_len;
        int rc = 0;
 
        pthread_spin_lock(&file->lock);
@@ -2540,8 +2694,29 @@ spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
                        length = final_offset - offset;
                }
 
-               sub_reads++;
-               rc = __file_read(file, payload, offset, length, channel);
+               buf = tree_find_filled_buffer(file->tree, offset);
+               if (buf == NULL) {
+                       pthread_spin_unlock(&file->lock);
+                       rc = __send_rw_from_file(file, payload, offset, length, true, channel);
+                       pthread_spin_lock(&file->lock);
+                       if (rc == 0) {
+                               sub_reads++;
+                       }
+               } else {
+                       read_len = length;
+                       if ((offset + length) > (buf->offset + buf->bytes_filled)) {
+                               read_len = buf->offset + buf->bytes_filled - offset;
+                       }
+                       BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
+                       memcpy(payload, &buf->buf[offset - buf->offset], read_len);
+                       if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
+                               tree_remove_buffer(file->tree, buf);
+                               if (file->tree->present_mask == 0) {
+                                       spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
+                               }
+                       }
+               }
+
                if (rc == 0) {
                        final_length += length;
                } else {
@@ -2551,8 +2726,9 @@ spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
                offset += length;
        }
        pthread_spin_unlock(&file->lock);
-       while (sub_reads-- > 0) {
+       while (sub_reads > 0) {
                sem_wait(&channel->sem);
+               sub_reads--;
        }
        if (rc == 0) {
                return final_length;
@@ -2592,6 +2768,7 @@ _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
        flush_req = alloc_fs_request(channel);
        if (!flush_req) {
                SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
+               free_fs_request(sync_req);
                pthread_spin_unlock(&file->lock);
                cb_fn(cb_arg, -ENOMEM);
                return;
@@ -2769,25 +2946,33 @@ spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
 }
 
 static void
-cache_free_buffers(struct spdk_file *file)
+_file_free(void *ctx)
+{
+       struct spdk_file *file = ctx;
+
+       TAILQ_REMOVE(&g_caches, file, cache_tailq);
+
+       free(file->name);
+       free(file->tree);
+       free(file);
+}
+
+static void
+file_free(struct spdk_file *file)
 {
        BLOBFS_TRACE(file, "free=%s\n", file->name);
        pthread_spin_lock(&file->lock);
-       pthread_spin_lock(&g_caches_lock);
        if (file->tree->present_mask == 0) {
-               pthread_spin_unlock(&g_caches_lock);
                pthread_spin_unlock(&file->lock);
+               free(file->name);
+               free(file->tree);
+               free(file);
                return;
        }
-       spdk_tree_free_buffers(file->tree);
 
-       TAILQ_REMOVE(&g_caches, file, cache_tailq);
-       /* If not freed, put it in the end of the queue */
-       if (file->tree->present_mask != 0) {
-               TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
-       }
-       file->last = NULL;
-       pthread_spin_unlock(&g_caches_lock);
+       tree_free_buffers(file->tree);
+       assert(file->tree->present_mask == 0);
+       spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
        pthread_spin_unlock(&file->lock);
 }