#include "spdk/blobfs.h"
#include "spdk/conf.h"
-#include "blobfs_internal.h"
+#include "tree.h"
#include "spdk/queue.h"
#include "spdk/thread.h"
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
+#define SPDK_BLOBFS_SIGNATURE "BLOBFS"
+
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
+static struct spdk_poller *g_cache_pool_mgmt_poller;
+static struct spdk_thread *g_cache_pool_thread;
+#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_spinlock_t g_caches_lock;
#define TRACE_GROUP_BLOBFS 0x7
#define TRACE_BLOBFS_XATTR_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
}
void
-spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
+cache_buffer_free(struct cache_buffer *cache_buffer)
{
spdk_mempool_put(g_cache_pool, cache_buffer->buf);
free(cache_buffer);
} op;
};
-static void cache_free_buffers(struct spdk_file *file);
-static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
-static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
+static void file_free(struct spdk_file *file);
+static void fs_io_device_unregister(struct spdk_filesystem *fs);
+static void fs_free_io_channels(struct spdk_filesystem *fs);
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
+static int _blobfs_cache_pool_reclaim(void *arg);
+
+static bool
+blobfs_cache_pool_need_reclaim(void)
+{
+ size_t count;
+
+ count = spdk_mempool_count(g_cache_pool);
+ /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
+ * when the number of available cache buffer is less than 1/5 of total buffers.
+ */
+ if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
+ return false;
+ }
+
+ return true;
+}
+
static void
-__initialize_cache(void)
+__start_cache_pool_mgmt(void *ctx)
{
assert(g_cache_pool == NULL);
assert(false);
}
TAILQ_INIT(&g_caches);
- pthread_spin_init(&g_caches_lock, 0);
+
+ assert(g_cache_pool_mgmt_poller == NULL);
+ g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
+ BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
static void
-__free_cache(void)
+__stop_cache_pool_mgmt(void *ctx)
{
- assert(g_cache_pool != NULL);
+ spdk_poller_unregister(&g_cache_pool_mgmt_poller);
+ assert(g_cache_pool != NULL);
+ assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
spdk_mempool_free(g_cache_pool);
g_cache_pool = NULL;
+
+ spdk_thread_exit(g_cache_pool_thread);
+}
+
+static void
+initialize_global_cache(void)
+{
+ pthread_mutex_lock(&g_cache_init_lock);
+ if (g_fs_count == 0) {
+ g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
+ assert(g_cache_pool_thread != NULL);
+ spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
+ }
+ g_fs_count++;
+ pthread_mutex_unlock(&g_cache_init_lock);
+}
+
+static void
+free_global_cache(void)
+{
+ pthread_mutex_lock(&g_cache_init_lock);
+ g_fs_count--;
+ if (g_fs_count == 0) {
+ spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
+ }
+ pthread_mutex_unlock(&g_cache_init_lock);
}
static uint64_t
}
static int
-_spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
- uint32_t max_ops)
+fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
+ uint32_t max_ops)
{
uint32_t i;
}
static int
-_spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
+fs_md_channel_create(void *io_device, void *ctx_buf)
{
struct spdk_filesystem *fs;
struct spdk_fs_channel *channel = ctx_buf;
fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
- return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
+ return fs_channel_create(fs, channel, fs->md_target.max_ops);
}
static int
-_spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
+fs_sync_channel_create(void *io_device, void *ctx_buf)
{
struct spdk_filesystem *fs;
struct spdk_fs_channel *channel = ctx_buf;
fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
- return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
+ return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}
static int
-_spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
+fs_io_channel_create(void *io_device, void *ctx_buf)
{
struct spdk_filesystem *fs;
struct spdk_fs_channel *channel = ctx_buf;
fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
- return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
+ return fs_channel_create(fs, channel, fs->io_target.max_ops);
}
static void
-_spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
+fs_channel_destroy(void *io_device, void *ctx_buf)
{
struct spdk_fs_channel *channel = ctx_buf;
fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
- pthread_mutex_lock(&g_cache_init_lock);
- if (g_fs_count == 0) {
- __initialize_cache();
- }
- g_fs_count++;
- pthread_mutex_unlock(&g_cache_init_lock);
+ initialize_global_cache();
}
static void
fs_conf_parse(void)
{
struct spdk_conf_section *sp;
+ int cache_buffer_shift;
sp = spdk_conf_find_section(NULL, "Blobfs");
if (sp == NULL) {
return;
}
- g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
- if (g_fs_cache_buffer_shift <= 0) {
+ cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
+ if (cache_buffer_shift <= 0) {
g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
+ } else {
+ g_fs_cache_buffer_shift = cache_buffer_shift;
}
}
TAILQ_INIT(&fs->files);
fs->md_target.max_ops = 512;
- spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
+ spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
sizeof(struct spdk_fs_channel), "blobfs_md");
fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
fs->sync_target.max_ops = 512;
- spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
+ spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
sizeof(struct spdk_fs_channel), "blobfs_sync");
fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
fs->io_target.max_ops = 512;
- spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
+ spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
sizeof(struct spdk_fs_channel), "blobfs_io");
return fs;
req = alloc_fs_request(fs->md_target.md_fs_channel);
if (req == NULL) {
- spdk_fs_free_io_channels(fs);
- spdk_fs_io_device_unregister(fs);
+ fs_free_io_channels(fs);
+ fs_io_device_unregister(fs);
cb_fn(cb_arg, NULL, -ENOMEM);
return;
}
args->fs = fs;
spdk_bs_opts_init(&opts);
- snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
+ snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
if (opt) {
opts.cluster_sz = opt->cluster_sz;
}
return NULL;
}
+ if (pthread_spin_init(&file->lock, 0)) {
+ free(file->tree);
+ free(file);
+ return NULL;
+ }
+
file->fs = fs;
TAILQ_INIT(&file->open_requests);
TAILQ_INIT(&file->sync_requests);
- pthread_spin_init(&file->lock, 0);
TAILQ_INSERT_TAIL(&fs->files, file, tailq);
file->priority = SPDK_FILE_PRIORITY_LOW;
return file;
struct spdk_fs_cb_args *args = &req->args;
struct spdk_filesystem *fs = args->fs;
struct spdk_bs_type bstype;
- static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
+ static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
static const struct spdk_bs_type zeros;
if (bserrno != 0) {
args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
free_fs_request(req);
- free(fs);
+ fs_free_io_channels(fs);
+ fs_io_device_unregister(fs);
return;
}
bstype = spdk_bs_get_bstype(bs);
if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
- SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
+ SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
spdk_bs_set_bstype(bs, blobfs_type);
} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
- SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
- SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
- args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
+ SPDK_ERRLOG("not blobfs\n");
+ SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
+ args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
free_fs_request(req);
- free(fs);
+ fs_free_io_channels(fs);
+ fs_io_device_unregister(fs);
return;
}
}
static void
-spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
+fs_io_device_unregister(struct spdk_filesystem *fs)
{
assert(fs != NULL);
spdk_io_device_unregister(&fs->md_target, NULL);
}
static void
-spdk_fs_free_io_channels(struct spdk_filesystem *fs)
+fs_free_io_channels(struct spdk_filesystem *fs)
{
assert(fs != NULL);
spdk_fs_free_io_channel(fs->md_target.md_io_channel);
req = alloc_fs_request(fs->md_target.md_fs_channel);
if (req == NULL) {
- spdk_fs_free_io_channels(fs);
- spdk_fs_io_device_unregister(fs);
+ fs_free_io_channels(fs);
+ fs_io_device_unregister(fs);
cb_fn(cb_arg, NULL, -ENOMEM);
return;
}
TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
TAILQ_REMOVE(&fs->files, file, tailq);
- cache_free_buffers(file);
- free(file->name);
- free(file->tree);
- free(file);
+ file_free(file);
}
- pthread_mutex_lock(&g_cache_init_lock);
- g_fs_count--;
- if (g_fs_count == 0) {
- __free_cache();
- }
- pthread_mutex_unlock(&g_cache_init_lock);
+ free_global_cache();
args->fn.fs_op(args->arg, bserrno);
free(req);
- spdk_fs_io_device_unregister(fs);
+ fs_io_device_unregister(fs);
}
void
args->arg = cb_arg;
args->fs = fs;
- spdk_fs_free_io_channels(fs);
+ fs_free_io_channels(fs);
spdk_bs_unload(fs->bs, unload_cb, req);
}
struct spdk_fs_request *req = arg;
struct spdk_fs_cb_args *args = &req->args;
- args->rc = fserrno;
- sem_post(args->sem);
+ __wake_caller(args, fserrno);
SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
}
}
static void
-__spdk_fs_md_rename_file(struct spdk_fs_request *req)
+_fs_md_rename_file(struct spdk_fs_request *req)
{
struct spdk_fs_cb_args *args = &req->args;
struct spdk_file *f;
static void
fs_rename_delete_done(void *arg, int fserrno)
{
- __spdk_fs_md_rename_file(arg);
+ _fs_md_rename_file(arg);
}
void
f = fs_find_file(fs, new_name);
if (f == NULL) {
- __spdk_fs_md_rename_file(req);
+ _fs_md_rename_file(req);
return;
}
return;
}
+ blobid = f->blobid;
TAILQ_REMOVE(&fs->files, f, tailq);
- cache_free_buffers(f);
-
- blobid = f->blobid;
-
- free(f->name);
- free(f->tree);
- free(f);
+ file_free(f);
spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}
free_fs_request(req);
}
/* Gather the contents of an iovec array into a flat buffer, copying at most
 * buf_len bytes in total.  A trailing iovec is truncated when the buffer
 * fills; further iovecs are not touched.
 */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	/* Use a byte cursor: arithmetic on void * is a GCC extension, not ISO C. */
	uint8_t *dst = buf;
	size_t len;
	int i;

	/* Stop as soon as the destination is full instead of iterating the
	 * remaining iovecs with zero-length copies.
	 */
	for (i = 0; i < iovcnt && buf_len > 0; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		memcpy(dst, iovs[i].iov_base, len);
		dst += len;
		buf_len -= len;
	}
}
+
/* Scatter a flat buffer into an iovec array, copying at most buf_len bytes
 * in total.  A trailing iovec receives a partial fill when the source runs
 * out; further iovecs are left untouched.
 */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	/* Use a byte cursor: arithmetic on void * is a GCC extension, not ISO C. */
	const uint8_t *src = buf;
	size_t len;
	int i;

	/* Stop as soon as the source is exhausted instead of iterating the
	 * remaining iovecs with zero-length copies.
	 */
	for (i = 0; i < iovcnt && buf_len > 0; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		memcpy(iovs[i].iov_base, src, len);
		src += len;
		buf_len -= len;
	}
}
+
static void
__read_done(void *ctx, int bserrno)
{
struct spdk_fs_request *req = ctx;
struct spdk_fs_cb_args *args = &req->args;
+ void *buf;
assert(req != NULL);
+ buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
if (args->op.rw.is_read) {
- memcpy(args->iovs[0].iov_base,
- args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
- args->iovs[0].iov_len);
+ _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
__rw_done(req, 0);
} else {
- memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
- args->iovs[0].iov_base,
- args->iovs[0].iov_len);
+ _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
spdk_blob_io_write(args->file->blob, args->op.rw.channel,
args->op.rw.pin_buf,
args->op.rw.start_lba, args->op.rw.num_lba,
*num_lba = (end_lba - *start_lba + 1);
}
+static bool
+__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
+{
+ uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
+
+ if ((offset % lba_size == 0) && (length % lba_size == 0)) {
+ return true;
+ }
+
+ return false;
+}
+
static void
-__readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
- void *payload, uint64_t offset, uint64_t length,
- spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
+_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
+{
+ uint32_t i;
+
+ for (i = 0; i < iovcnt; i++) {
+ req->args.iovs[i].iov_base = iovs[i].iov_base;
+ req->args.iovs[i].iov_len = iovs[i].iov_len;
+ }
+}
+
+static void
+__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
+ struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
+ spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
struct spdk_fs_request *req;
struct spdk_fs_cb_args *args;
return;
}
- req = alloc_fs_request_with_iov(channel, 1);
+ req = alloc_fs_request_with_iov(channel, iovcnt);
if (req == NULL) {
cb_fn(cb_arg, -ENOMEM);
return;
args->arg = cb_arg;
args->file = file;
args->op.rw.channel = channel->bs_channel;
- args->iovs[0].iov_base = payload;
- args->iovs[0].iov_len = (size_t)length;
+ _fs_request_setup_iovs(req, iovs, iovcnt);
args->op.rw.is_read = is_read;
args->op.rw.offset = offset;
args->op.rw.blocklen = lba_size;
pin_buf_length = num_lba * lba_size;
+ args->op.rw.length = pin_buf_length;
args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
if (args->op.rw.pin_buf == NULL) {
if (!is_read && file->length < offset + length) {
spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
+ } else if (!is_read && __is_lba_aligned(file, offset, length)) {
+ _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
+ spdk_blob_io_write(args->file->blob, args->op.rw.channel,
+ args->op.rw.pin_buf,
+ args->op.rw.start_lba, args->op.rw.num_lba,
+ __rw_done, req);
} else {
__do_blob_read(req, 0);
}
}
+static void
+__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
+ void *payload, uint64_t offset, uint64_t length,
+ spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
+{
+ struct iovec iov;
+
+ iov.iov_base = payload;
+ iov.iov_len = (size_t)length;
+
+ __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
+}
+
void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
void *payload, uint64_t offset, uint64_t length,
__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}
+void
+spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
+ struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
+ spdk_file_op_complete cb_fn, void *cb_arg)
+{
+ SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
+ file->name, offset, length);
+
+ __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
+}
+
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
void *payload, uint64_t offset, uint64_t length,
__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
+void
+spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
+ struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
+ spdk_file_op_complete cb_fn, void *cb_arg)
+{
+ SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
+ file->name, offset, length);
+
+ __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
+}
+
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
return NULL;
}
- _spdk_fs_channel_create(fs, &ctx->ch, 512);
+ if (pthread_spin_init(&ctx->ch.lock, 0)) {
+ free(ctx);
+ return NULL;
+ }
+
+ fs_channel_create(fs, &ctx->ch, 512);
ctx->ch.send_request = fs->send_request;
ctx->ch.sync = 1;
- pthread_spin_init(&ctx->ch.lock, 0);
return ctx;
}
usleep(1000);
}
- _spdk_fs_channel_destroy(NULL, &ctx->ch);
+ fs_channel_destroy(NULL, &ctx->ch);
free(ctx);
}
-void
+int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
+ /* setting g_fs_cache_size is only permitted if cache pool
+ * is already freed or hasn't been initialized
+ */
+ if (g_cache_pool != NULL) {
+ return -EPERM;
+ }
+
g_fs_cache_size = size_in_mb * 1024 * 1024;
+
+ return 0;
}
uint64_t
static void __file_flush(void *ctx);
-static void *
-alloc_cache_memory_buffer(struct spdk_file *context)
+/* Try to free some cache buffers from this file.
+ */
+static int
+reclaim_cache_buffers(struct spdk_file *file)
{
- struct spdk_file *file;
- void *buf;
+ int rc;
+
+ BLOBFS_TRACE(file, "free=%s\n", file->name);
- buf = spdk_mempool_get(g_cache_pool);
- if (buf != NULL) {
- return buf;
+ /* The function is safe to be called with any threads, while the file
+ * lock maybe locked by other thread for now, so try to get the file
+ * lock here.
+ */
+ rc = pthread_spin_trylock(&file->lock);
+ if (rc != 0) {
+ return -1;
}
- pthread_spin_lock(&g_caches_lock);
- TAILQ_FOREACH(file, &g_caches, cache_tailq) {
- if (!file->open_for_writing &&
- file->priority == SPDK_FILE_PRIORITY_LOW &&
- file != context) {
- break;
- }
+ if (file->tree->present_mask == 0) {
+ pthread_spin_unlock(&file->lock);
+ return -1;
}
- pthread_spin_unlock(&g_caches_lock);
- if (file != NULL) {
- cache_free_buffers(file);
- buf = spdk_mempool_get(g_cache_pool);
- if (buf != NULL) {
- return buf;
- }
+ tree_free_buffers(file->tree);
+
+ TAILQ_REMOVE(&g_caches, file, cache_tailq);
+ /* If not freed, put it in the end of the queue */
+ if (file->tree->present_mask != 0) {
+ TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
+ } else {
+ file->last = NULL;
}
+ pthread_spin_unlock(&file->lock);
- pthread_spin_lock(&g_caches_lock);
- TAILQ_FOREACH(file, &g_caches, cache_tailq) {
- if (!file->open_for_writing && file != context) {
- break;
- }
+ return 0;
+}
+
+static int
+_blobfs_cache_pool_reclaim(void *arg)
+{
+ struct spdk_file *file, *tmp;
+ int rc;
+
+ if (!blobfs_cache_pool_need_reclaim()) {
+ return SPDK_POLLER_IDLE;
}
- pthread_spin_unlock(&g_caches_lock);
- if (file != NULL) {
- cache_free_buffers(file);
- buf = spdk_mempool_get(g_cache_pool);
- if (buf != NULL) {
- return buf;
+
+ TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
+ if (!file->open_for_writing &&
+ file->priority == SPDK_FILE_PRIORITY_LOW) {
+ rc = reclaim_cache_buffers(file);
+ if (rc < 0) {
+ continue;
+ }
+ if (!blobfs_cache_pool_need_reclaim()) {
+ return SPDK_POLLER_BUSY;
+ }
+ break;
}
}
- pthread_spin_lock(&g_caches_lock);
- TAILQ_FOREACH(file, &g_caches, cache_tailq) {
- if (file != context) {
+ TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
+ if (!file->open_for_writing) {
+ rc = reclaim_cache_buffers(file);
+ if (rc < 0) {
+ continue;
+ }
+ if (!blobfs_cache_pool_need_reclaim()) {
+ return SPDK_POLLER_BUSY;
+ }
break;
}
}
- pthread_spin_unlock(&g_caches_lock);
- if (file != NULL) {
- cache_free_buffers(file);
- buf = spdk_mempool_get(g_cache_pool);
- if (buf != NULL) {
- return buf;
+
+ TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
+ rc = reclaim_cache_buffers(file);
+ if (rc < 0) {
+ continue;
}
+ break;
}
- return NULL;
+ return SPDK_POLLER_BUSY;
+}
+
+static void
+_add_file_to_cache_pool(void *ctx)
+{
+ struct spdk_file *file = ctx;
+
+ TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
+}
+
+static void
+_remove_file_from_cache_pool(void *ctx)
+{
+ struct spdk_file *file = ctx;
+
+ TAILQ_REMOVE(&g_caches, file, cache_tailq);
}
static struct cache_buffer *
{
struct cache_buffer *buf;
int count = 0;
+ bool need_update = false;
buf = calloc(1, sizeof(*buf));
if (buf == NULL) {
return NULL;
}
- buf->buf = alloc_cache_memory_buffer(file);
- while (buf->buf == NULL) {
- /*
- * TODO: alloc_cache_memory_buffer() should eventually free
- * some buffers. Need a more sophisticated check here, instead
- * of just bailing if 100 tries does not result in getting a
- * free buffer. This will involve using the sync channel's
- * semaphore to block until a buffer becomes available.
- */
+ do {
+ buf->buf = spdk_mempool_get(g_cache_pool);
+ if (buf->buf) {
+ break;
+ }
if (count++ == 100) {
SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
file, offset);
free(buf);
return NULL;
}
- buf->buf = alloc_cache_memory_buffer(file);
- }
+ usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
+ } while (true);
buf->buf_size = CACHE_BUFFER_SIZE;
buf->offset = offset;
- pthread_spin_lock(&g_caches_lock);
if (file->tree->present_mask == 0) {
- TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
+ need_update = true;
+ }
+ file->tree = tree_insert_buffer(file->tree, buf);
+
+ if (need_update) {
+ spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
}
- file->tree = spdk_tree_insert_buffer(file->tree, buf);
- pthread_spin_unlock(&g_caches_lock);
return buf;
}
pthread_spin_unlock(&file->lock);
sync_args->fn.file_op(sync_args->arg, bserrno);
- pthread_spin_lock(&file->lock);
- free_fs_request(sync_req);
- pthread_spin_unlock(&file->lock);
+ free_fs_request(sync_req);
__check_sync_reqs(file);
}
}
if (next->bytes_flushed == next->buf_size) {
BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
- next = spdk_tree_find_buffer(file->tree, file->length_flushed);
+ next = tree_find_buffer(file->tree, file->length_flushed);
}
/*
uint32_t lba_size;
pthread_spin_lock(&file->lock);
- next = spdk_tree_find_buffer(file->tree, file->length_flushed);
+ next = tree_find_buffer(file->tree, file->length_flushed);
if (next == NULL || next->in_progress ||
((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
/*
__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
next->in_progress = true;
- BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
+ BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
offset, length, start_lba, num_lba);
pthread_spin_unlock(&file->lock);
spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
struct spdk_fs_cb_args *args;
offset = __next_cache_buffer_offset(offset);
- if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
+ if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
return;
}
file->fs->send_request(__readahead, req);
}
-static int
-__file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
- struct spdk_fs_channel *channel)
-{
- struct cache_buffer *buf;
- int rc;
-
- buf = spdk_tree_find_filled_buffer(file->tree, offset);
- if (buf == NULL) {
- pthread_spin_unlock(&file->lock);
- rc = __send_rw_from_file(file, payload, offset, length, true, channel);
- pthread_spin_lock(&file->lock);
- return rc;
- }
-
- if ((offset + length) > (buf->offset + buf->bytes_filled)) {
- length = buf->offset + buf->bytes_filled - offset;
- }
- BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
- memcpy(payload, &buf->buf[offset - buf->offset], length);
- if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
- pthread_spin_lock(&g_caches_lock);
- spdk_tree_remove_buffer(file->tree, buf);
- if (file->tree->present_mask == 0) {
- TAILQ_REMOVE(&g_caches, file, cache_tailq);
- }
- pthread_spin_unlock(&g_caches_lock);
- }
-
- sem_post(&channel->sem);
- return 0;
-}
-
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
void *payload, uint64_t offset, uint64_t length)
struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
uint64_t final_offset, final_length;
uint32_t sub_reads = 0;
+ struct cache_buffer *buf;
+ uint64_t read_len;
int rc = 0;
pthread_spin_lock(&file->lock);
length = final_offset - offset;
}
- sub_reads++;
- rc = __file_read(file, payload, offset, length, channel);
+ buf = tree_find_filled_buffer(file->tree, offset);
+ if (buf == NULL) {
+ pthread_spin_unlock(&file->lock);
+ rc = __send_rw_from_file(file, payload, offset, length, true, channel);
+ pthread_spin_lock(&file->lock);
+ if (rc == 0) {
+ sub_reads++;
+ }
+ } else {
+ read_len = length;
+ if ((offset + length) > (buf->offset + buf->bytes_filled)) {
+ read_len = buf->offset + buf->bytes_filled - offset;
+ }
+ BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
+ memcpy(payload, &buf->buf[offset - buf->offset], read_len);
+ if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
+ tree_remove_buffer(file->tree, buf);
+ if (file->tree->present_mask == 0) {
+ spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
+ }
+ }
+ }
+
if (rc == 0) {
final_length += length;
} else {
offset += length;
}
pthread_spin_unlock(&file->lock);
- while (sub_reads-- > 0) {
+ while (sub_reads > 0) {
sem_wait(&channel->sem);
+ sub_reads--;
}
if (rc == 0) {
return final_length;
flush_req = alloc_fs_request(channel);
if (!flush_req) {
SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
+ free_fs_request(sync_req);
pthread_spin_unlock(&file->lock);
cb_fn(cb_arg, -ENOMEM);
return;
}
static void
-cache_free_buffers(struct spdk_file *file)
+_file_free(void *ctx)
+{
+ struct spdk_file *file = ctx;
+
+ TAILQ_REMOVE(&g_caches, file, cache_tailq);
+
+ free(file->name);
+ free(file->tree);
+ free(file);
+}
+
+static void
+file_free(struct spdk_file *file)
{
BLOBFS_TRACE(file, "free=%s\n", file->name);
pthread_spin_lock(&file->lock);
- pthread_spin_lock(&g_caches_lock);
if (file->tree->present_mask == 0) {
- pthread_spin_unlock(&g_caches_lock);
pthread_spin_unlock(&file->lock);
+ free(file->name);
+ free(file->tree);
+ free(file);
return;
}
- spdk_tree_free_buffers(file->tree);
- TAILQ_REMOVE(&g_caches, file, cache_tailq);
- /* If not freed, put it in the end of the queue */
- if (file->tree->present_mask != 0) {
- TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
- }
- file->last = NULL;
- pthread_spin_unlock(&g_caches_lock);
+ tree_free_buffers(file->tree);
+ assert(file->tree->present_mask == 0);
+ spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
pthread_spin_unlock(&file->lock);
}