]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/blobfs/blobfs.c
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / spdk / lib / blobfs / blobfs.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "spdk/stdinc.h"
35
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46
/*
 * Debug-log helper for per-file messages: emits the message under the
 * SPDK_LOG_BLOBFS flag, prefixed with "file=<name> ".
 */
#define BLOBFS_TRACE(file, str, ...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##__VA_ARGS__)

/* Same as BLOBFS_TRACE, but emitted under the SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, ...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##__VA_ARGS__)
52
53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
55
56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
57 static struct spdk_mempool *g_cache_pool;
58 static TAILQ_HEAD(, spdk_file) g_caches;
59 static int g_fs_count = 0;
60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_spinlock_t g_caches_lock;
62
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 free(cache_buffer);
68 }
69
70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024)
71
72 struct spdk_file {
73 struct spdk_filesystem *fs;
74 struct spdk_blob *blob;
75 char *name;
76 uint64_t length;
77 bool is_deleted;
78 bool open_for_writing;
79 uint64_t length_flushed;
80 uint64_t append_pos;
81 uint64_t seq_byte_count;
82 uint64_t next_seq_offset;
83 uint32_t priority;
84 TAILQ_ENTRY(spdk_file) tailq;
85 spdk_blob_id blobid;
86 uint32_t ref_count;
87 pthread_spinlock_t lock;
88 struct cache_buffer *last;
89 struct cache_tree *tree;
90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
92 TAILQ_ENTRY(spdk_file) cache_tailq;
93 };
94
95 struct spdk_deleted_file {
96 spdk_blob_id id;
97 TAILQ_ENTRY(spdk_deleted_file) tailq;
98 };
99
100 struct spdk_filesystem {
101 struct spdk_blob_store *bs;
102 TAILQ_HEAD(, spdk_file) files;
103 struct spdk_bs_opts bs_opts;
104 struct spdk_bs_dev *bdev;
105 fs_send_request_fn send_request;
106
107 struct {
108 uint32_t max_ops;
109 struct spdk_io_channel *sync_io_channel;
110 struct spdk_fs_channel *sync_fs_channel;
111 } sync_target;
112
113 struct {
114 uint32_t max_ops;
115 struct spdk_io_channel *md_io_channel;
116 struct spdk_fs_channel *md_fs_channel;
117 } md_target;
118
119 struct {
120 uint32_t max_ops;
121 } io_target;
122 };
123
124 struct spdk_fs_cb_args {
125 union {
126 spdk_fs_op_with_handle_complete fs_op_with_handle;
127 spdk_fs_op_complete fs_op;
128 spdk_file_op_with_handle_complete file_op_with_handle;
129 spdk_file_op_complete file_op;
130 spdk_file_stat_op_complete stat_op;
131 } fn;
132 void *arg;
133 sem_t *sem;
134 struct spdk_filesystem *fs;
135 struct spdk_file *file;
136 int rc;
137 bool from_request;
138 union {
139 struct {
140 TAILQ_HEAD(, spdk_deleted_file) deleted_files;
141 } fs_load;
142 struct {
143 uint64_t length;
144 } truncate;
145 struct {
146 struct spdk_io_channel *channel;
147 void *user_buf;
148 void *pin_buf;
149 int is_read;
150 off_t offset;
151 size_t length;
152 uint64_t start_lba;
153 uint64_t num_lba;
154 uint32_t blocklen;
155 } rw;
156 struct {
157 const char *old_name;
158 const char *new_name;
159 } rename;
160 struct {
161 struct cache_buffer *cache_buffer;
162 uint64_t length;
163 } flush;
164 struct {
165 struct cache_buffer *cache_buffer;
166 uint64_t length;
167 uint64_t offset;
168 } readahead;
169 struct {
170 uint64_t offset;
171 TAILQ_ENTRY(spdk_fs_request) tailq;
172 bool xattr_in_progress;
173 } sync;
174 struct {
175 uint32_t num_clusters;
176 } resize;
177 struct {
178 const char *name;
179 uint32_t flags;
180 TAILQ_ENTRY(spdk_fs_request) tailq;
181 } open;
182 struct {
183 const char *name;
184 struct spdk_blob *blob;
185 } create;
186 struct {
187 const char *name;
188 } delete;
189 struct {
190 const char *name;
191 } stat;
192 } op;
193 };
194
195 static void cache_free_buffers(struct spdk_file *file);
196
197 void
198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
199 {
200 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
201 }
202
203 static void
204 __initialize_cache(void)
205 {
206 assert(g_cache_pool == NULL);
207
208 g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 g_fs_cache_size / CACHE_BUFFER_SIZE,
210 CACHE_BUFFER_SIZE,
211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 SPDK_ENV_SOCKET_ID_ANY);
213 if (!g_cache_pool) {
214 SPDK_ERRLOG("Create mempool failed, you may "
215 "increase the memory and try again\n");
216 assert(false);
217 }
218 TAILQ_INIT(&g_caches);
219 pthread_spin_init(&g_caches_lock, 0);
220 }
221
222 static void
223 __free_cache(void)
224 {
225 assert(g_cache_pool != NULL);
226
227 spdk_mempool_free(g_cache_pool);
228 g_cache_pool = NULL;
229 }
230
231 static uint64_t
232 __file_get_blob_size(struct spdk_file *file)
233 {
234 uint64_t cluster_sz;
235
236 cluster_sz = file->fs->bs_opts.cluster_sz;
237 return cluster_sz * spdk_blob_get_num_clusters(file->blob);
238 }
239
240 struct spdk_fs_request {
241 struct spdk_fs_cb_args args;
242 TAILQ_ENTRY(spdk_fs_request) link;
243 struct spdk_fs_channel *channel;
244 };
245
246 struct spdk_fs_channel {
247 struct spdk_fs_request *req_mem;
248 TAILQ_HEAD(, spdk_fs_request) reqs;
249 sem_t sem;
250 struct spdk_filesystem *fs;
251 struct spdk_io_channel *bs_channel;
252 fs_send_request_fn send_request;
253 bool sync;
254 pthread_spinlock_t lock;
255 };
256
257 static struct spdk_fs_request *
258 alloc_fs_request(struct spdk_fs_channel *channel)
259 {
260 struct spdk_fs_request *req;
261
262 if (channel->sync) {
263 pthread_spin_lock(&channel->lock);
264 }
265
266 req = TAILQ_FIRST(&channel->reqs);
267 if (req) {
268 TAILQ_REMOVE(&channel->reqs, req, link);
269 }
270
271 if (channel->sync) {
272 pthread_spin_unlock(&channel->lock);
273 }
274
275 if (req == NULL) {
276 return NULL;
277 }
278 memset(req, 0, sizeof(*req));
279 req->channel = channel;
280 req->args.from_request = true;
281
282 return req;
283 }
284
285 static void
286 free_fs_request(struct spdk_fs_request *req)
287 {
288 struct spdk_fs_channel *channel = req->channel;
289
290 if (channel->sync) {
291 pthread_spin_lock(&channel->lock);
292 }
293
294 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
295
296 if (channel->sync) {
297 pthread_spin_unlock(&channel->lock);
298 }
299 }
300
301 static int
302 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
303 uint32_t max_ops)
304 {
305 uint32_t i;
306
307 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
308 if (!channel->req_mem) {
309 return -1;
310 }
311
312 TAILQ_INIT(&channel->reqs);
313 sem_init(&channel->sem, 0, 0);
314
315 for (i = 0; i < max_ops; i++) {
316 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
317 }
318
319 channel->fs = fs;
320
321 return 0;
322 }
323
324 static int
325 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
326 {
327 struct spdk_filesystem *fs;
328 struct spdk_fs_channel *channel = ctx_buf;
329
330 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
331
332 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
333 }
334
335 static int
336 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
337 {
338 struct spdk_filesystem *fs;
339 struct spdk_fs_channel *channel = ctx_buf;
340
341 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
342
343 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
344 }
345
346 static int
347 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
348 {
349 struct spdk_filesystem *fs;
350 struct spdk_fs_channel *channel = ctx_buf;
351
352 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
353
354 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
355 }
356
357 static void
358 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
359 {
360 struct spdk_fs_channel *channel = ctx_buf;
361
362 free(channel->req_mem);
363 if (channel->bs_channel != NULL) {
364 spdk_bs_free_io_channel(channel->bs_channel);
365 }
366 }
367
368 static void
369 __send_request_direct(fs_request_fn fn, void *arg)
370 {
371 fn(arg);
372 }
373
374 static void
375 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
376 {
377 fs->bs = bs;
378 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
379 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
380 fs->md_target.md_fs_channel->send_request = __send_request_direct;
381 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
382 fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
383
384 pthread_mutex_lock(&g_cache_init_lock);
385 if (g_fs_count == 0) {
386 __initialize_cache();
387 }
388 g_fs_count++;
389 pthread_mutex_unlock(&g_cache_init_lock);
390 }
391
392 static void
393 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
394 {
395 struct spdk_fs_request *req = ctx;
396 struct spdk_fs_cb_args *args = &req->args;
397 struct spdk_filesystem *fs = args->fs;
398
399 if (bserrno == 0) {
400 common_fs_bs_init(fs, bs);
401 } else {
402 free(fs);
403 fs = NULL;
404 }
405
406 args->fn.fs_op_with_handle(args->arg, fs, bserrno);
407 free_fs_request(req);
408 }
409
410 static void
411 fs_conf_parse(void)
412 {
413 struct spdk_conf_section *sp;
414
415 sp = spdk_conf_find_section(NULL, "Blobfs");
416 if (sp == NULL) {
417 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
418 return;
419 }
420
421 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
422 if (g_fs_cache_buffer_shift <= 0) {
423 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
424 }
425 }
426
427 static struct spdk_filesystem *
428 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
429 {
430 struct spdk_filesystem *fs;
431
432 fs = calloc(1, sizeof(*fs));
433 if (fs == NULL) {
434 return NULL;
435 }
436
437 fs->bdev = dev;
438 fs->send_request = send_request_fn;
439 TAILQ_INIT(&fs->files);
440
441 fs->md_target.max_ops = 512;
442 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
443 sizeof(struct spdk_fs_channel), "blobfs_md");
444 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
445 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
446
447 fs->sync_target.max_ops = 512;
448 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
449 sizeof(struct spdk_fs_channel), "blobfs_sync");
450 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
451 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
452
453 fs->io_target.max_ops = 512;
454 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
455 sizeof(struct spdk_fs_channel), "blobfs_io");
456
457 return fs;
458 }
459
460 static void
461 __wake_caller(void *arg, int fserrno)
462 {
463 struct spdk_fs_cb_args *args = arg;
464
465 args->rc = fserrno;
466 sem_post(args->sem);
467 }
468
469 void
470 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
471 fs_send_request_fn send_request_fn,
472 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
473 {
474 struct spdk_filesystem *fs;
475 struct spdk_fs_request *req;
476 struct spdk_fs_cb_args *args;
477 struct spdk_bs_opts opts = {};
478
479 fs = fs_alloc(dev, send_request_fn);
480 if (fs == NULL) {
481 cb_fn(cb_arg, NULL, -ENOMEM);
482 return;
483 }
484
485 fs_conf_parse();
486
487 req = alloc_fs_request(fs->md_target.md_fs_channel);
488 if (req == NULL) {
489 spdk_put_io_channel(fs->md_target.md_io_channel);
490 spdk_io_device_unregister(&fs->md_target, NULL);
491 spdk_put_io_channel(fs->sync_target.sync_io_channel);
492 spdk_io_device_unregister(&fs->sync_target, NULL);
493 spdk_io_device_unregister(&fs->io_target, NULL);
494 free(fs);
495 cb_fn(cb_arg, NULL, -ENOMEM);
496 return;
497 }
498
499 args = &req->args;
500 args->fn.fs_op_with_handle = cb_fn;
501 args->arg = cb_arg;
502 args->fs = fs;
503
504 spdk_bs_opts_init(&opts);
505 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
506 if (opt) {
507 opts.cluster_sz = opt->cluster_sz;
508 }
509 spdk_bs_init(dev, &opts, init_cb, req);
510 }
511
512 static struct spdk_file *
513 file_alloc(struct spdk_filesystem *fs)
514 {
515 struct spdk_file *file;
516
517 file = calloc(1, sizeof(*file));
518 if (file == NULL) {
519 return NULL;
520 }
521
522 file->tree = calloc(1, sizeof(*file->tree));
523 if (file->tree == NULL) {
524 free(file);
525 return NULL;
526 }
527
528 file->fs = fs;
529 TAILQ_INIT(&file->open_requests);
530 TAILQ_INIT(&file->sync_requests);
531 pthread_spin_init(&file->lock, 0);
532 TAILQ_INSERT_TAIL(&fs->files, file, tailq);
533 file->priority = SPDK_FILE_PRIORITY_LOW;
534 return file;
535 }
536
537 static void fs_load_done(void *ctx, int bserrno);
538
539 static int
540 _handle_deleted_files(struct spdk_fs_request *req)
541 {
542 struct spdk_fs_cb_args *args = &req->args;
543 struct spdk_filesystem *fs = args->fs;
544
545 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
546 struct spdk_deleted_file *deleted_file;
547
548 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
549 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
550 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
551 free(deleted_file);
552 return 0;
553 }
554
555 return 1;
556 }
557
558 static void
559 fs_load_done(void *ctx, int bserrno)
560 {
561 struct spdk_fs_request *req = ctx;
562 struct spdk_fs_cb_args *args = &req->args;
563 struct spdk_filesystem *fs = args->fs;
564
565 /* The filesystem has been loaded. Now check if there are any files that
566 * were marked for deletion before last unload. Do not complete the
567 * fs_load callback until all of them have been deleted on disk.
568 */
569 if (_handle_deleted_files(req) == 0) {
570 /* We found a file that's been marked for deleting but not actually
571 * deleted yet. This function will get called again once the delete
572 * operation is completed.
573 */
574 return;
575 }
576
577 args->fn.fs_op_with_handle(args->arg, fs, 0);
578 free_fs_request(req);
579
580 }
581
582 static void
583 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
584 {
585 struct spdk_fs_request *req = ctx;
586 struct spdk_fs_cb_args *args = &req->args;
587 struct spdk_filesystem *fs = args->fs;
588 uint64_t *length;
589 const char *name;
590 uint32_t *is_deleted;
591 size_t value_len;
592
593 if (rc < 0) {
594 args->fn.fs_op_with_handle(args->arg, fs, rc);
595 free_fs_request(req);
596 return;
597 }
598
599 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
600 if (rc < 0) {
601 args->fn.fs_op_with_handle(args->arg, fs, rc);
602 free_fs_request(req);
603 return;
604 }
605
606 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
607 if (rc < 0) {
608 args->fn.fs_op_with_handle(args->arg, fs, rc);
609 free_fs_request(req);
610 return;
611 }
612
613 assert(value_len == 8);
614
615 /* This file could be deleted last time without close it, then app crashed, so we delete it now */
616 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
617 if (rc < 0) {
618 struct spdk_file *f;
619
620 f = file_alloc(fs);
621 if (f == NULL) {
622 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
623 free_fs_request(req);
624 return;
625 }
626
627 f->name = strdup(name);
628 f->blobid = spdk_blob_get_id(blob);
629 f->length = *length;
630 f->length_flushed = *length;
631 f->append_pos = *length;
632 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
633 } else {
634 struct spdk_deleted_file *deleted_file;
635
636 deleted_file = calloc(1, sizeof(*deleted_file));
637 if (deleted_file == NULL) {
638 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
639 free_fs_request(req);
640 return;
641 }
642 deleted_file->id = spdk_blob_get_id(blob);
643 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
644 }
645 }
646
647 static void
648 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
649 {
650 struct spdk_fs_request *req = ctx;
651 struct spdk_fs_cb_args *args = &req->args;
652 struct spdk_filesystem *fs = args->fs;
653 struct spdk_bs_type bstype;
654 static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
655 static const struct spdk_bs_type zeros;
656
657 if (bserrno != 0) {
658 args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
659 free_fs_request(req);
660 free(fs);
661 return;
662 }
663
664 bstype = spdk_bs_get_bstype(bs);
665
666 if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
667 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
668 spdk_bs_set_bstype(bs, blobfs_type);
669 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
670 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
671 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
672 args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
673 free_fs_request(req);
674 free(fs);
675 return;
676 }
677
678 common_fs_bs_init(fs, bs);
679 fs_load_done(req, 0);
680 }
681
682 static void
683 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
684 {
685 assert(fs != NULL);
686 spdk_io_device_unregister(&fs->md_target, NULL);
687 spdk_io_device_unregister(&fs->sync_target, NULL);
688 spdk_io_device_unregister(&fs->io_target, NULL);
689 free(fs);
690 }
691
692 static void
693 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
694 {
695 assert(fs != NULL);
696 spdk_fs_free_io_channel(fs->md_target.md_io_channel);
697 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
698 }
699
700 void
701 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
702 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
703 {
704 struct spdk_filesystem *fs;
705 struct spdk_fs_cb_args *args;
706 struct spdk_fs_request *req;
707 struct spdk_bs_opts bs_opts;
708
709 fs = fs_alloc(dev, send_request_fn);
710 if (fs == NULL) {
711 cb_fn(cb_arg, NULL, -ENOMEM);
712 return;
713 }
714
715 fs_conf_parse();
716
717 req = alloc_fs_request(fs->md_target.md_fs_channel);
718 if (req == NULL) {
719 spdk_fs_free_io_channels(fs);
720 spdk_fs_io_device_unregister(fs);
721 cb_fn(cb_arg, NULL, -ENOMEM);
722 return;
723 }
724
725 args = &req->args;
726 args->fn.fs_op_with_handle = cb_fn;
727 args->arg = cb_arg;
728 args->fs = fs;
729 TAILQ_INIT(&args->op.fs_load.deleted_files);
730 spdk_bs_opts_init(&bs_opts);
731 bs_opts.iter_cb_fn = iter_cb;
732 bs_opts.iter_cb_arg = req;
733 spdk_bs_load(dev, &bs_opts, load_cb, req);
734 }
735
736 static void
737 unload_cb(void *ctx, int bserrno)
738 {
739 struct spdk_fs_request *req = ctx;
740 struct spdk_fs_cb_args *args = &req->args;
741 struct spdk_filesystem *fs = args->fs;
742 struct spdk_file *file, *tmp;
743
744 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
745 TAILQ_REMOVE(&fs->files, file, tailq);
746 cache_free_buffers(file);
747 free(file->name);
748 free(file->tree);
749 free(file);
750 }
751
752 pthread_mutex_lock(&g_cache_init_lock);
753 g_fs_count--;
754 if (g_fs_count == 0) {
755 __free_cache();
756 }
757 pthread_mutex_unlock(&g_cache_init_lock);
758
759 args->fn.fs_op(args->arg, bserrno);
760 free(req);
761
762 spdk_fs_io_device_unregister(fs);
763 }
764
765 void
766 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
767 {
768 struct spdk_fs_request *req;
769 struct spdk_fs_cb_args *args;
770
771 /*
772 * We must free the md_channel before unloading the blobstore, so just
773 * allocate this request from the general heap.
774 */
775 req = calloc(1, sizeof(*req));
776 if (req == NULL) {
777 cb_fn(cb_arg, -ENOMEM);
778 return;
779 }
780
781 args = &req->args;
782 args->fn.fs_op = cb_fn;
783 args->arg = cb_arg;
784 args->fs = fs;
785
786 spdk_fs_free_io_channels(fs);
787 spdk_bs_unload(fs->bs, unload_cb, req);
788 }
789
790 static struct spdk_file *
791 fs_find_file(struct spdk_filesystem *fs, const char *name)
792 {
793 struct spdk_file *file;
794
795 TAILQ_FOREACH(file, &fs->files, tailq) {
796 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
797 return file;
798 }
799 }
800
801 return NULL;
802 }
803
804 void
805 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
806 spdk_file_stat_op_complete cb_fn, void *cb_arg)
807 {
808 struct spdk_file_stat stat;
809 struct spdk_file *f = NULL;
810
811 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
812 cb_fn(cb_arg, NULL, -ENAMETOOLONG);
813 return;
814 }
815
816 f = fs_find_file(fs, name);
817 if (f != NULL) {
818 stat.blobid = f->blobid;
819 stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
820 cb_fn(cb_arg, &stat, 0);
821 return;
822 }
823
824 cb_fn(cb_arg, NULL, -ENOENT);
825 }
826
827 static void
828 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
829 {
830 struct spdk_fs_request *req = arg;
831 struct spdk_fs_cb_args *args = &req->args;
832
833 args->rc = fserrno;
834 if (fserrno == 0) {
835 memcpy(args->arg, stat, sizeof(*stat));
836 }
837 sem_post(args->sem);
838 }
839
840 static void
841 __file_stat(void *arg)
842 {
843 struct spdk_fs_request *req = arg;
844 struct spdk_fs_cb_args *args = &req->args;
845
846 spdk_fs_file_stat_async(args->fs, args->op.stat.name,
847 args->fn.stat_op, req);
848 }
849
850 int
851 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
852 const char *name, struct spdk_file_stat *stat)
853 {
854 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
855 struct spdk_fs_request *req;
856 int rc;
857
858 req = alloc_fs_request(channel);
859 if (req == NULL) {
860 return -ENOMEM;
861 }
862
863 req->args.fs = fs;
864 req->args.op.stat.name = name;
865 req->args.fn.stat_op = __copy_stat;
866 req->args.arg = stat;
867 req->args.sem = &channel->sem;
868 channel->send_request(__file_stat, req);
869 sem_wait(&channel->sem);
870
871 rc = req->args.rc;
872 free_fs_request(req);
873
874 return rc;
875 }
876
877 static void
878 fs_create_blob_close_cb(void *ctx, int bserrno)
879 {
880 int rc;
881 struct spdk_fs_request *req = ctx;
882 struct spdk_fs_cb_args *args = &req->args;
883
884 rc = args->rc ? args->rc : bserrno;
885 args->fn.file_op(args->arg, rc);
886 free_fs_request(req);
887 }
888
889 static void
890 fs_create_blob_resize_cb(void *ctx, int bserrno)
891 {
892 struct spdk_fs_request *req = ctx;
893 struct spdk_fs_cb_args *args = &req->args;
894 struct spdk_file *f = args->file;
895 struct spdk_blob *blob = args->op.create.blob;
896 uint64_t length = 0;
897
898 args->rc = bserrno;
899 if (bserrno) {
900 spdk_blob_close(blob, fs_create_blob_close_cb, args);
901 return;
902 }
903
904 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
905 spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
906
907 spdk_blob_close(blob, fs_create_blob_close_cb, args);
908 }
909
910 static void
911 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
912 {
913 struct spdk_fs_request *req = ctx;
914 struct spdk_fs_cb_args *args = &req->args;
915
916 if (bserrno) {
917 args->fn.file_op(args->arg, bserrno);
918 free_fs_request(req);
919 return;
920 }
921
922 args->op.create.blob = blob;
923 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
924 }
925
926 static void
927 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
928 {
929 struct spdk_fs_request *req = ctx;
930 struct spdk_fs_cb_args *args = &req->args;
931 struct spdk_file *f = args->file;
932
933 if (bserrno) {
934 args->fn.file_op(args->arg, bserrno);
935 free_fs_request(req);
936 return;
937 }
938
939 f->blobid = blobid;
940 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
941 }
942
943 void
944 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
945 spdk_file_op_complete cb_fn, void *cb_arg)
946 {
947 struct spdk_file *file;
948 struct spdk_fs_request *req;
949 struct spdk_fs_cb_args *args;
950
951 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
952 cb_fn(cb_arg, -ENAMETOOLONG);
953 return;
954 }
955
956 file = fs_find_file(fs, name);
957 if (file != NULL) {
958 cb_fn(cb_arg, -EEXIST);
959 return;
960 }
961
962 file = file_alloc(fs);
963 if (file == NULL) {
964 cb_fn(cb_arg, -ENOMEM);
965 return;
966 }
967
968 req = alloc_fs_request(fs->md_target.md_fs_channel);
969 if (req == NULL) {
970 cb_fn(cb_arg, -ENOMEM);
971 return;
972 }
973
974 args = &req->args;
975 args->file = file;
976 args->fn.file_op = cb_fn;
977 args->arg = cb_arg;
978
979 file->name = strdup(name);
980 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
981 }
982
983 static void
984 __fs_create_file_done(void *arg, int fserrno)
985 {
986 struct spdk_fs_request *req = arg;
987 struct spdk_fs_cb_args *args = &req->args;
988
989 args->rc = fserrno;
990 sem_post(args->sem);
991 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
992 }
993
994 static void
995 __fs_create_file(void *arg)
996 {
997 struct spdk_fs_request *req = arg;
998 struct spdk_fs_cb_args *args = &req->args;
999
1000 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1001 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1002 }
1003
1004 int
1005 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
1006 {
1007 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1008 struct spdk_fs_request *req;
1009 struct spdk_fs_cb_args *args;
1010 int rc;
1011
1012 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1013
1014 req = alloc_fs_request(channel);
1015 if (req == NULL) {
1016 return -ENOMEM;
1017 }
1018
1019 args = &req->args;
1020 args->fs = fs;
1021 args->op.create.name = name;
1022 args->sem = &channel->sem;
1023 fs->send_request(__fs_create_file, req);
1024 sem_wait(&channel->sem);
1025 rc = args->rc;
1026 free_fs_request(req);
1027
1028 return rc;
1029 }
1030
1031 static void
1032 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1033 {
1034 struct spdk_fs_request *req = ctx;
1035 struct spdk_fs_cb_args *args = &req->args;
1036 struct spdk_file *f = args->file;
1037
1038 f->blob = blob;
1039 while (!TAILQ_EMPTY(&f->open_requests)) {
1040 req = TAILQ_FIRST(&f->open_requests);
1041 args = &req->args;
1042 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1043 args->fn.file_op_with_handle(args->arg, f, bserrno);
1044 free_fs_request(req);
1045 }
1046 }
1047
1048 static void
1049 fs_open_blob_create_cb(void *ctx, int bserrno)
1050 {
1051 struct spdk_fs_request *req = ctx;
1052 struct spdk_fs_cb_args *args = &req->args;
1053 struct spdk_file *file = args->file;
1054 struct spdk_filesystem *fs = args->fs;
1055
1056 if (file == NULL) {
1057 /*
1058 * This is from an open with CREATE flag - the file
1059 * is now created so look it up in the file list for this
1060 * filesystem.
1061 */
1062 file = fs_find_file(fs, args->op.open.name);
1063 assert(file != NULL);
1064 args->file = file;
1065 }
1066
1067 file->ref_count++;
1068 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1069 if (file->ref_count == 1) {
1070 assert(file->blob == NULL);
1071 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1072 } else if (file->blob != NULL) {
1073 fs_open_blob_done(req, file->blob, 0);
1074 } else {
1075 /*
1076 * The blob open for this file is in progress due to a previous
1077 * open request. When that open completes, it will invoke the
1078 * open callback for this request.
1079 */
1080 }
1081 }
1082
1083 void
1084 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1085 spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1086 {
1087 struct spdk_file *f = NULL;
1088 struct spdk_fs_request *req;
1089 struct spdk_fs_cb_args *args;
1090
1091 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1092 cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1093 return;
1094 }
1095
1096 f = fs_find_file(fs, name);
1097 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1098 cb_fn(cb_arg, NULL, -ENOENT);
1099 return;
1100 }
1101
1102 if (f != NULL && f->is_deleted == true) {
1103 cb_fn(cb_arg, NULL, -ENOENT);
1104 return;
1105 }
1106
1107 req = alloc_fs_request(fs->md_target.md_fs_channel);
1108 if (req == NULL) {
1109 cb_fn(cb_arg, NULL, -ENOMEM);
1110 return;
1111 }
1112
1113 args = &req->args;
1114 args->fn.file_op_with_handle = cb_fn;
1115 args->arg = cb_arg;
1116 args->file = f;
1117 args->fs = fs;
1118 args->op.open.name = name;
1119
1120 if (f == NULL) {
1121 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1122 } else {
1123 fs_open_blob_create_cb(req, 0);
1124 }
1125 }
1126
1127 static void
1128 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1129 {
1130 struct spdk_fs_request *req = arg;
1131 struct spdk_fs_cb_args *args = &req->args;
1132
1133 args->file = file;
1134 __wake_caller(args, bserrno);
1135 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1136 }
1137
1138 static void
1139 __fs_open_file(void *arg)
1140 {
1141 struct spdk_fs_request *req = arg;
1142 struct spdk_fs_cb_args *args = &req->args;
1143
1144 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1145 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1146 __fs_open_file_done, req);
1147 }
1148
1149 int
1150 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1151 const char *name, uint32_t flags, struct spdk_file **file)
1152 {
1153 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1154 struct spdk_fs_request *req;
1155 struct spdk_fs_cb_args *args;
1156 int rc;
1157
1158 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1159
1160 req = alloc_fs_request(channel);
1161 if (req == NULL) {
1162 return -ENOMEM;
1163 }
1164
1165 args = &req->args;
1166 args->fs = fs;
1167 args->op.open.name = name;
1168 args->op.open.flags = flags;
1169 args->sem = &channel->sem;
1170 fs->send_request(__fs_open_file, req);
1171 sem_wait(&channel->sem);
1172 rc = args->rc;
1173 if (rc == 0) {
1174 *file = args->file;
1175 } else {
1176 *file = NULL;
1177 }
1178 free_fs_request(req);
1179
1180 return rc;
1181 }
1182
1183 static void
1184 fs_rename_blob_close_cb(void *ctx, int bserrno)
1185 {
1186 struct spdk_fs_request *req = ctx;
1187 struct spdk_fs_cb_args *args = &req->args;
1188
1189 args->fn.fs_op(args->arg, bserrno);
1190 free_fs_request(req);
1191 }
1192
1193 static void
1194 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1195 {
1196 struct spdk_fs_request *req = ctx;
1197 struct spdk_fs_cb_args *args = &req->args;
1198 const char *new_name = args->op.rename.new_name;
1199
1200 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1201 spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1202 }
1203
1204 static void
1205 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1206 {
1207 struct spdk_fs_cb_args *args = &req->args;
1208 struct spdk_file *f;
1209
1210 f = fs_find_file(args->fs, args->op.rename.old_name);
1211 if (f == NULL) {
1212 args->fn.fs_op(args->arg, -ENOENT);
1213 free_fs_request(req);
1214 return;
1215 }
1216
1217 free(f->name);
1218 f->name = strdup(args->op.rename.new_name);
1219 args->file = f;
1220 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1221 }
1222
1223 static void
1224 fs_rename_delete_done(void *arg, int fserrno)
1225 {
1226 __spdk_fs_md_rename_file(arg);
1227 }
1228
1229 void
1230 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1231 const char *old_name, const char *new_name,
1232 spdk_file_op_complete cb_fn, void *cb_arg)
1233 {
1234 struct spdk_file *f;
1235 struct spdk_fs_request *req;
1236 struct spdk_fs_cb_args *args;
1237
1238 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1239 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1240 cb_fn(cb_arg, -ENAMETOOLONG);
1241 return;
1242 }
1243
1244 req = alloc_fs_request(fs->md_target.md_fs_channel);
1245 if (req == NULL) {
1246 cb_fn(cb_arg, -ENOMEM);
1247 return;
1248 }
1249
1250 args = &req->args;
1251 args->fn.fs_op = cb_fn;
1252 args->fs = fs;
1253 args->arg = cb_arg;
1254 args->op.rename.old_name = old_name;
1255 args->op.rename.new_name = new_name;
1256
1257 f = fs_find_file(fs, new_name);
1258 if (f == NULL) {
1259 __spdk_fs_md_rename_file(req);
1260 return;
1261 }
1262
1263 /*
1264 * The rename overwrites an existing file. So delete the existing file, then
1265 * do the actual rename.
1266 */
1267 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1268 }
1269
1270 static void
1271 __fs_rename_file_done(void *arg, int fserrno)
1272 {
1273 struct spdk_fs_request *req = arg;
1274 struct spdk_fs_cb_args *args = &req->args;
1275
1276 __wake_caller(args, fserrno);
1277 }
1278
1279 static void
1280 __fs_rename_file(void *arg)
1281 {
1282 struct spdk_fs_request *req = arg;
1283 struct spdk_fs_cb_args *args = &req->args;
1284
1285 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1286 __fs_rename_file_done, req);
1287 }
1288
1289 int
1290 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1291 const char *old_name, const char *new_name)
1292 {
1293 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1294 struct spdk_fs_request *req;
1295 struct spdk_fs_cb_args *args;
1296 int rc;
1297
1298 req = alloc_fs_request(channel);
1299 if (req == NULL) {
1300 return -ENOMEM;
1301 }
1302
1303 args = &req->args;
1304
1305 args->fs = fs;
1306 args->op.rename.old_name = old_name;
1307 args->op.rename.new_name = new_name;
1308 args->sem = &channel->sem;
1309 fs->send_request(__fs_rename_file, req);
1310 sem_wait(&channel->sem);
1311 rc = args->rc;
1312 free_fs_request(req);
1313 return rc;
1314 }
1315
1316 static void
1317 blob_delete_cb(void *ctx, int bserrno)
1318 {
1319 struct spdk_fs_request *req = ctx;
1320 struct spdk_fs_cb_args *args = &req->args;
1321
1322 args->fn.file_op(args->arg, bserrno);
1323 free_fs_request(req);
1324 }
1325
1326 void
1327 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1328 spdk_file_op_complete cb_fn, void *cb_arg)
1329 {
1330 struct spdk_file *f;
1331 spdk_blob_id blobid;
1332 struct spdk_fs_request *req;
1333 struct spdk_fs_cb_args *args;
1334
1335 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1336
1337 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1338 cb_fn(cb_arg, -ENAMETOOLONG);
1339 return;
1340 }
1341
1342 f = fs_find_file(fs, name);
1343 if (f == NULL) {
1344 cb_fn(cb_arg, -ENOENT);
1345 return;
1346 }
1347
1348 req = alloc_fs_request(fs->md_target.md_fs_channel);
1349 if (req == NULL) {
1350 cb_fn(cb_arg, -ENOMEM);
1351 return;
1352 }
1353
1354 args = &req->args;
1355 args->fn.file_op = cb_fn;
1356 args->arg = cb_arg;
1357
1358 if (f->ref_count > 0) {
1359 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1360 f->is_deleted = true;
1361 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1362 spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1363 return;
1364 }
1365
1366 TAILQ_REMOVE(&fs->files, f, tailq);
1367
1368 cache_free_buffers(f);
1369
1370 blobid = f->blobid;
1371
1372 free(f->name);
1373 free(f->tree);
1374 free(f);
1375
1376 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1377 }
1378
1379 static void
1380 __fs_delete_file_done(void *arg, int fserrno)
1381 {
1382 struct spdk_fs_request *req = arg;
1383 struct spdk_fs_cb_args *args = &req->args;
1384
1385 __wake_caller(args, fserrno);
1386 }
1387
1388 static void
1389 __fs_delete_file(void *arg)
1390 {
1391 struct spdk_fs_request *req = arg;
1392 struct spdk_fs_cb_args *args = &req->args;
1393
1394 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1395 }
1396
1397 int
1398 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1399 const char *name)
1400 {
1401 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1402 struct spdk_fs_request *req;
1403 struct spdk_fs_cb_args *args;
1404 int rc;
1405
1406 req = alloc_fs_request(channel);
1407 if (req == NULL) {
1408 return -ENOMEM;
1409 }
1410
1411 args = &req->args;
1412 args->fs = fs;
1413 args->op.delete.name = name;
1414 args->sem = &channel->sem;
1415 fs->send_request(__fs_delete_file, req);
1416 sem_wait(&channel->sem);
1417 rc = args->rc;
1418 free_fs_request(req);
1419
1420 return rc;
1421 }
1422
1423 spdk_fs_iter
1424 spdk_fs_iter_first(struct spdk_filesystem *fs)
1425 {
1426 struct spdk_file *f;
1427
1428 f = TAILQ_FIRST(&fs->files);
1429 return f;
1430 }
1431
1432 spdk_fs_iter
1433 spdk_fs_iter_next(spdk_fs_iter iter)
1434 {
1435 struct spdk_file *f = iter;
1436
1437 if (f == NULL) {
1438 return NULL;
1439 }
1440
1441 f = TAILQ_NEXT(f, tailq);
1442 return f;
1443 }
1444
1445 const char *
1446 spdk_file_get_name(struct spdk_file *file)
1447 {
1448 return file->name;
1449 }
1450
1451 uint64_t
1452 spdk_file_get_length(struct spdk_file *file)
1453 {
1454 assert(file != NULL);
1455 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1456 return file->length;
1457 }
1458
1459 static void
1460 fs_truncate_complete_cb(void *ctx, int bserrno)
1461 {
1462 struct spdk_fs_request *req = ctx;
1463 struct spdk_fs_cb_args *args = &req->args;
1464
1465 args->fn.file_op(args->arg, bserrno);
1466 free_fs_request(req);
1467 }
1468
1469 static void
1470 fs_truncate_resize_cb(void *ctx, int bserrno)
1471 {
1472 struct spdk_fs_request *req = ctx;
1473 struct spdk_fs_cb_args *args = &req->args;
1474 struct spdk_file *file = args->file;
1475 uint64_t *length = &args->op.truncate.length;
1476
1477 if (bserrno) {
1478 args->fn.file_op(args->arg, bserrno);
1479 free_fs_request(req);
1480 return;
1481 }
1482
1483 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1484
1485 file->length = *length;
1486 if (file->append_pos > file->length) {
1487 file->append_pos = file->length;
1488 }
1489
1490 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1491 }
1492
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes,
 * rounding up.  Computed without adding to length, so it cannot wrap for
 * lengths near UINT64_MAX.  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1498
1499 void
1500 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1501 spdk_file_op_complete cb_fn, void *cb_arg)
1502 {
1503 struct spdk_filesystem *fs;
1504 size_t num_clusters;
1505 struct spdk_fs_request *req;
1506 struct spdk_fs_cb_args *args;
1507
1508 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1509 if (length == file->length) {
1510 cb_fn(cb_arg, 0);
1511 return;
1512 }
1513
1514 req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1515 if (req == NULL) {
1516 cb_fn(cb_arg, -ENOMEM);
1517 return;
1518 }
1519
1520 args = &req->args;
1521 args->fn.file_op = cb_fn;
1522 args->arg = cb_arg;
1523 args->file = file;
1524 args->op.truncate.length = length;
1525 fs = file->fs;
1526
1527 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1528
1529 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1530 }
1531
1532 static void
1533 __truncate(void *arg)
1534 {
1535 struct spdk_fs_request *req = arg;
1536 struct spdk_fs_cb_args *args = &req->args;
1537
1538 spdk_file_truncate_async(args->file, args->op.truncate.length,
1539 args->fn.file_op, args);
1540 }
1541
1542 int
1543 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1544 uint64_t length)
1545 {
1546 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1547 struct spdk_fs_request *req;
1548 struct spdk_fs_cb_args *args;
1549 int rc;
1550
1551 req = alloc_fs_request(channel);
1552 if (req == NULL) {
1553 return -ENOMEM;
1554 }
1555
1556 args = &req->args;
1557
1558 args->file = file;
1559 args->op.truncate.length = length;
1560 args->fn.file_op = __wake_caller;
1561 args->sem = &channel->sem;
1562
1563 channel->send_request(__truncate, req);
1564 sem_wait(&channel->sem);
1565 rc = args->rc;
1566 free_fs_request(req);
1567
1568 return rc;
1569 }
1570
1571 static void
1572 __rw_done(void *ctx, int bserrno)
1573 {
1574 struct spdk_fs_request *req = ctx;
1575 struct spdk_fs_cb_args *args = &req->args;
1576
1577 spdk_dma_free(args->op.rw.pin_buf);
1578 args->fn.file_op(args->arg, bserrno);
1579 free_fs_request(req);
1580 }
1581
1582 static void
1583 __read_done(void *ctx, int bserrno)
1584 {
1585 struct spdk_fs_request *req = ctx;
1586 struct spdk_fs_cb_args *args = &req->args;
1587
1588 assert(req != NULL);
1589 if (args->op.rw.is_read) {
1590 memcpy(args->op.rw.user_buf,
1591 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1592 args->op.rw.length);
1593 __rw_done(req, 0);
1594 } else {
1595 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1596 args->op.rw.user_buf,
1597 args->op.rw.length);
1598 spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1599 args->op.rw.pin_buf,
1600 args->op.rw.start_lba, args->op.rw.num_lba,
1601 __rw_done, req);
1602 }
1603 }
1604
1605 static void
1606 __do_blob_read(void *ctx, int fserrno)
1607 {
1608 struct spdk_fs_request *req = ctx;
1609 struct spdk_fs_cb_args *args = &req->args;
1610
1611 if (fserrno) {
1612 __rw_done(req, fserrno);
1613 return;
1614 }
1615 spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1616 args->op.rw.pin_buf,
1617 args->op.rw.start_lba, args->op.rw.num_lba,
1618 __read_done, req);
1619 }
1620
1621 static void
1622 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1623 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1624 {
1625 uint64_t end_lba;
1626
1627 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1628 *start_lba = offset / *lba_size;
1629 end_lba = (offset + length - 1) / *lba_size;
1630 *num_lba = (end_lba - *start_lba + 1);
1631 }
1632
1633 static void
1634 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1635 void *payload, uint64_t offset, uint64_t length,
1636 spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1637 {
1638 struct spdk_fs_request *req;
1639 struct spdk_fs_cb_args *args;
1640 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1641 uint64_t start_lba, num_lba, pin_buf_length;
1642 uint32_t lba_size;
1643
1644 if (is_read && offset + length > file->length) {
1645 cb_fn(cb_arg, -EINVAL);
1646 return;
1647 }
1648
1649 req = alloc_fs_request(channel);
1650 if (req == NULL) {
1651 cb_fn(cb_arg, -ENOMEM);
1652 return;
1653 }
1654
1655 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1656
1657 args = &req->args;
1658 args->fn.file_op = cb_fn;
1659 args->arg = cb_arg;
1660 args->file = file;
1661 args->op.rw.channel = channel->bs_channel;
1662 args->op.rw.user_buf = payload;
1663 args->op.rw.is_read = is_read;
1664 args->op.rw.offset = offset;
1665 args->op.rw.length = length;
1666 args->op.rw.blocklen = lba_size;
1667
1668 pin_buf_length = num_lba * lba_size;
1669 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, lba_size, NULL);
1670 if (args->op.rw.pin_buf == NULL) {
1671 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1672 file->name, offset, length);
1673 free_fs_request(req);
1674 cb_fn(cb_arg, -ENOMEM);
1675 return;
1676 }
1677
1678 args->op.rw.start_lba = start_lba;
1679 args->op.rw.num_lba = num_lba;
1680
1681 if (!is_read && file->length < offset + length) {
1682 spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1683 } else {
1684 __do_blob_read(req, 0);
1685 }
1686 }
1687
1688 void
1689 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1690 void *payload, uint64_t offset, uint64_t length,
1691 spdk_file_op_complete cb_fn, void *cb_arg)
1692 {
1693 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1694 }
1695
1696 void
1697 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1698 void *payload, uint64_t offset, uint64_t length,
1699 spdk_file_op_complete cb_fn, void *cb_arg)
1700 {
1701 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1702 file->name, offset, length);
1703 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1704 }
1705
1706 struct spdk_io_channel *
1707 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1708 {
1709 struct spdk_io_channel *io_channel;
1710 struct spdk_fs_channel *fs_channel;
1711
1712 io_channel = spdk_get_io_channel(&fs->io_target);
1713 fs_channel = spdk_io_channel_get_ctx(io_channel);
1714 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1715 fs_channel->send_request = __send_request_direct;
1716
1717 return io_channel;
1718 }
1719
1720 struct spdk_io_channel *
1721 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1722 {
1723 struct spdk_io_channel *io_channel;
1724 struct spdk_fs_channel *fs_channel;
1725
1726 io_channel = spdk_get_io_channel(&fs->io_target);
1727 fs_channel = spdk_io_channel_get_ctx(io_channel);
1728 fs_channel->send_request = fs->send_request;
1729 fs_channel->sync = 1;
1730 pthread_spin_init(&fs_channel->lock, 0);
1731
1732 return io_channel;
1733 }
1734
/* Releases a filesystem I/O channel by handing it to spdk_put_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *released = channel;

	spdk_put_io_channel(released);
}
1740
1741 void
1742 spdk_fs_set_cache_size(uint64_t size_in_mb)
1743 {
1744 g_fs_cache_size = size_in_mb * 1024 * 1024;
1745 }
1746
1747 uint64_t
1748 spdk_fs_get_cache_size(void)
1749 {
1750 return g_fs_cache_size / (1024 * 1024);
1751 }
1752
1753 static void __file_flush(void *_args);
1754
1755 static void *
1756 alloc_cache_memory_buffer(struct spdk_file *context)
1757 {
1758 struct spdk_file *file;
1759 void *buf;
1760
1761 buf = spdk_mempool_get(g_cache_pool);
1762 if (buf != NULL) {
1763 return buf;
1764 }
1765
1766 pthread_spin_lock(&g_caches_lock);
1767 TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1768 if (!file->open_for_writing &&
1769 file->priority == SPDK_FILE_PRIORITY_LOW &&
1770 file != context) {
1771 break;
1772 }
1773 }
1774 pthread_spin_unlock(&g_caches_lock);
1775 if (file != NULL) {
1776 cache_free_buffers(file);
1777 buf = spdk_mempool_get(g_cache_pool);
1778 if (buf != NULL) {
1779 return buf;
1780 }
1781 }
1782
1783 pthread_spin_lock(&g_caches_lock);
1784 TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1785 if (!file->open_for_writing && file != context) {
1786 break;
1787 }
1788 }
1789 pthread_spin_unlock(&g_caches_lock);
1790 if (file != NULL) {
1791 cache_free_buffers(file);
1792 buf = spdk_mempool_get(g_cache_pool);
1793 if (buf != NULL) {
1794 return buf;
1795 }
1796 }
1797
1798 pthread_spin_lock(&g_caches_lock);
1799 TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1800 if (file != context) {
1801 break;
1802 }
1803 }
1804 pthread_spin_unlock(&g_caches_lock);
1805 if (file != NULL) {
1806 cache_free_buffers(file);
1807 buf = spdk_mempool_get(g_cache_pool);
1808 if (buf != NULL) {
1809 return buf;
1810 }
1811 }
1812
1813 return NULL;
1814 }
1815
1816 static struct cache_buffer *
1817 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1818 {
1819 struct cache_buffer *buf;
1820 int count = 0;
1821
1822 buf = calloc(1, sizeof(*buf));
1823 if (buf == NULL) {
1824 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1825 return NULL;
1826 }
1827
1828 buf->buf = alloc_cache_memory_buffer(file);
1829 while (buf->buf == NULL) {
1830 /*
1831 * TODO: alloc_cache_memory_buffer() should eventually free
1832 * some buffers. Need a more sophisticated check here, instead
1833 * of just bailing if 100 tries does not result in getting a
1834 * free buffer. This will involve using the sync channel's
1835 * semaphore to block until a buffer becomes available.
1836 */
1837 if (count++ == 100) {
1838 SPDK_ERRLOG("could not allocate cache buffer\n");
1839 assert(false);
1840 free(buf);
1841 return NULL;
1842 }
1843 buf->buf = alloc_cache_memory_buffer(file);
1844 }
1845
1846 buf->buf_size = CACHE_BUFFER_SIZE;
1847 buf->offset = offset;
1848
1849 pthread_spin_lock(&g_caches_lock);
1850 if (file->tree->present_mask == 0) {
1851 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1852 }
1853 file->tree = spdk_tree_insert_buffer(file->tree, buf);
1854 pthread_spin_unlock(&g_caches_lock);
1855
1856 return buf;
1857 }
1858
1859 static struct cache_buffer *
1860 cache_append_buffer(struct spdk_file *file)
1861 {
1862 struct cache_buffer *last;
1863
1864 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1865 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1866
1867 last = cache_insert_buffer(file, file->append_pos);
1868 if (last == NULL) {
1869 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1870 return NULL;
1871 }
1872
1873 file->last = last;
1874
1875 return last;
1876 }
1877
1878 static void __check_sync_reqs(struct spdk_file *file);
1879
1880 static void
1881 __file_cache_finish_sync(void *ctx, int bserrno)
1882 {
1883 struct spdk_file *file = ctx;
1884 struct spdk_fs_request *sync_req;
1885 struct spdk_fs_cb_args *sync_args;
1886
1887 pthread_spin_lock(&file->lock);
1888 sync_req = TAILQ_FIRST(&file->sync_requests);
1889 sync_args = &sync_req->args;
1890 assert(sync_args->op.sync.offset <= file->length_flushed);
1891 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1892 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1893 pthread_spin_unlock(&file->lock);
1894
1895 sync_args->fn.file_op(sync_args->arg, bserrno);
1896 __check_sync_reqs(file);
1897
1898 pthread_spin_lock(&file->lock);
1899 free_fs_request(sync_req);
1900 pthread_spin_unlock(&file->lock);
1901 }
1902
1903 static void
1904 __free_args(struct spdk_fs_cb_args *args)
1905 {
1906 struct spdk_fs_request *req;
1907
1908 if (!args->from_request) {
1909 free(args);
1910 } else {
1911 /* Depends on args being at the start of the spdk_fs_request structure. */
1912 req = (struct spdk_fs_request *)args;
1913 free_fs_request(req);
1914 }
1915 }
1916
1917 static void
1918 __check_sync_reqs(struct spdk_file *file)
1919 {
1920 struct spdk_fs_request *sync_req;
1921
1922 pthread_spin_lock(&file->lock);
1923
1924 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1925 if (sync_req->args.op.sync.offset <= file->length_flushed) {
1926 break;
1927 }
1928 }
1929
1930 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1931 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1932 sync_req->args.op.sync.xattr_in_progress = true;
1933 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1934 sizeof(file->length_flushed));
1935
1936 pthread_spin_unlock(&file->lock);
1937 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1938 } else {
1939 pthread_spin_unlock(&file->lock);
1940 }
1941 }
1942
1943 static void
1944 __file_flush_done(void *arg, int bserrno)
1945 {
1946 struct spdk_fs_cb_args *args = arg;
1947 struct spdk_file *file = args->file;
1948 struct cache_buffer *next = args->op.flush.cache_buffer;
1949
1950 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1951
1952 pthread_spin_lock(&file->lock);
1953 next->in_progress = false;
1954 next->bytes_flushed += args->op.flush.length;
1955 file->length_flushed += args->op.flush.length;
1956 if (file->length_flushed > file->length) {
1957 file->length = file->length_flushed;
1958 }
1959 if (next->bytes_flushed == next->buf_size) {
1960 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1961 next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1962 }
1963
1964 /*
1965 * Assert that there is no cached data that extends past the end of the underlying
1966 * blob.
1967 */
1968 assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1969 next->bytes_filled == 0);
1970
1971 pthread_spin_unlock(&file->lock);
1972
1973 __check_sync_reqs(file);
1974
1975 __file_flush(args);
1976 }
1977
1978 static void
1979 __file_flush(void *_args)
1980 {
1981 struct spdk_fs_cb_args *args = _args;
1982 struct spdk_file *file = args->file;
1983 struct cache_buffer *next;
1984 uint64_t offset, length, start_lba, num_lba;
1985 uint32_t lba_size;
1986
1987 pthread_spin_lock(&file->lock);
1988 next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1989 if (next == NULL || next->in_progress) {
1990 /*
1991 * There is either no data to flush, or a flush I/O is already in
1992 * progress. So return immediately - if a flush I/O is in
1993 * progress we will flush more data after that is completed.
1994 */
1995 __free_args(args);
1996 if (next == NULL) {
1997 /*
1998 * For cases where a file's cache was evicted, and then the
1999 * file was later appended, we will write the data directly
2000 * to disk and bypass cache. So just update length_flushed
2001 * here to reflect that all data was already written to disk.
2002 */
2003 file->length_flushed = file->append_pos;
2004 }
2005 pthread_spin_unlock(&file->lock);
2006 if (next == NULL) {
2007 /*
2008 * There is no data to flush, but we still need to check for any
2009 * outstanding sync requests to make sure metadata gets updated.
2010 */
2011 __check_sync_reqs(file);
2012 }
2013 return;
2014 }
2015
2016 offset = next->offset + next->bytes_flushed;
2017 length = next->bytes_filled - next->bytes_flushed;
2018 if (length == 0) {
2019 __free_args(args);
2020 pthread_spin_unlock(&file->lock);
2021 return;
2022 }
2023 args->op.flush.length = length;
2024 args->op.flush.cache_buffer = next;
2025
2026 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2027
2028 next->in_progress = true;
2029 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2030 offset, length, start_lba, num_lba);
2031 pthread_spin_unlock(&file->lock);
2032 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2033 next->buf + (start_lba * lba_size) - next->offset,
2034 start_lba, num_lba, __file_flush_done, args);
2035 }
2036
/*
 * Completion of the metadata sync that follows a blob extend: forwards the
 * status to __wake_caller().  arg is the extend's spdk_fs_cb_args.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2044
2045 static void
2046 __file_extend_resize_cb(void *_args, int bserrno)
2047 {
2048 struct spdk_fs_cb_args *args = _args;
2049 struct spdk_file *file = args->file;
2050
2051 if (bserrno) {
2052 __wake_caller(args, bserrno);
2053 return;
2054 }
2055
2056 spdk_blob_sync_md(file->blob, __file_extend_done, args);
2057 }
2058
2059 static void
2060 __file_extend_blob(void *_args)
2061 {
2062 struct spdk_fs_cb_args *args = _args;
2063 struct spdk_file *file = args->file;
2064
2065 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2066 }
2067
/*
 * Completion of a direct read/write issued by __rw_from_file(): forwards the
 * status to __wake_caller(), then releases the args.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args, bserrno);
	__free_args(rw_args);
}
2076
2077 static void
2078 __rw_from_file(void *_args)
2079 {
2080 struct spdk_fs_cb_args *args = _args;
2081 struct spdk_file *file = args->file;
2082
2083 if (args->op.rw.is_read) {
2084 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2085 args->op.rw.offset, args->op.rw.length,
2086 __rw_from_file_done, args);
2087 } else {
2088 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2089 args->op.rw.offset, args->op.rw.length,
2090 __rw_from_file_done, args);
2091 }
2092 }
2093
2094 static int
2095 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2096 uint64_t offset, uint64_t length, bool is_read)
2097 {
2098 struct spdk_fs_cb_args *args;
2099
2100 args = calloc(1, sizeof(*args));
2101 if (args == NULL) {
2102 sem_post(sem);
2103 return -ENOMEM;
2104 }
2105
2106 args->file = file;
2107 args->sem = sem;
2108 args->op.rw.user_buf = payload;
2109 args->op.rw.offset = offset;
2110 args->op.rw.length = length;
2111 args->op.rw.is_read = is_read;
2112 file->fs->send_request(__rw_from_file, args);
2113 return 0;
2114 }
2115
2116 int
2117 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2118 void *payload, uint64_t offset, uint64_t length)
2119 {
2120 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2121 struct spdk_fs_cb_args *args;
2122 uint64_t rem_length, copy, blob_size, cluster_sz;
2123 uint32_t cache_buffers_filled = 0;
2124 uint8_t *cur_payload;
2125 struct cache_buffer *last;
2126
2127 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2128
2129 if (length == 0) {
2130 return 0;
2131 }
2132
2133 if (offset != file->append_pos) {
2134 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2135 return -EINVAL;
2136 }
2137
2138 pthread_spin_lock(&file->lock);
2139 file->open_for_writing = true;
2140
2141 if (file->last == NULL) {
2142 if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2143 cache_append_buffer(file);
2144 } else {
2145 int rc;
2146
2147 file->append_pos += length;
2148 pthread_spin_unlock(&file->lock);
2149 rc = __send_rw_from_file(file, &channel->sem, payload,
2150 offset, length, false);
2151 sem_wait(&channel->sem);
2152 return rc;
2153 }
2154 }
2155
2156 blob_size = __file_get_blob_size(file);
2157
2158 if ((offset + length) > blob_size) {
2159 struct spdk_fs_cb_args extend_args = {};
2160
2161 cluster_sz = file->fs->bs_opts.cluster_sz;
2162 extend_args.sem = &channel->sem;
2163 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2164 extend_args.file = file;
2165 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2166 pthread_spin_unlock(&file->lock);
2167 file->fs->send_request(__file_extend_blob, &extend_args);
2168 sem_wait(&channel->sem);
2169 if (extend_args.rc) {
2170 return extend_args.rc;
2171 }
2172 }
2173
2174 last = file->last;
2175 rem_length = length;
2176 cur_payload = payload;
2177 while (rem_length > 0) {
2178 copy = last->buf_size - last->bytes_filled;
2179 if (copy > rem_length) {
2180 copy = rem_length;
2181 }
2182 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy);
2183 memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2184 file->append_pos += copy;
2185 if (file->length < file->append_pos) {
2186 file->length = file->append_pos;
2187 }
2188 cur_payload += copy;
2189 last->bytes_filled += copy;
2190 rem_length -= copy;
2191 if (last->bytes_filled == last->buf_size) {
2192 cache_buffers_filled++;
2193 last = cache_append_buffer(file);
2194 if (last == NULL) {
2195 BLOBFS_TRACE(file, "nomem\n");
2196 pthread_spin_unlock(&file->lock);
2197 return -ENOMEM;
2198 }
2199 }
2200 }
2201
2202 pthread_spin_unlock(&file->lock);
2203
2204 if (cache_buffers_filled == 0) {
2205 return 0;
2206 }
2207
2208 args = calloc(1, sizeof(*args));
2209 if (args == NULL) {
2210 return -ENOMEM;
2211 }
2212
2213 args->file = file;
2214 file->fs->send_request(__file_flush, args);
2215 return 0;
2216 }
2217
2218 static void
2219 __readahead_done(void *arg, int bserrno)
2220 {
2221 struct spdk_fs_cb_args *args = arg;
2222 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2223 struct spdk_file *file = args->file;
2224
2225 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2226
2227 pthread_spin_lock(&file->lock);
2228 cache_buffer->bytes_filled = args->op.readahead.length;
2229 cache_buffer->bytes_flushed = args->op.readahead.length;
2230 cache_buffer->in_progress = false;
2231 pthread_spin_unlock(&file->lock);
2232
2233 __free_args(args);
2234 }
2235
2236 static void
2237 __readahead(void *_args)
2238 {
2239 struct spdk_fs_cb_args *args = _args;
2240 struct spdk_file *file = args->file;
2241 uint64_t offset, length, start_lba, num_lba;
2242 uint32_t lba_size;
2243
2244 offset = args->op.readahead.offset;
2245 length = args->op.readahead.length;
2246 assert(length > 0);
2247
2248 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2249
2250 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2251 offset, length, start_lba, num_lba);
2252 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2253 args->op.readahead.cache_buffer->buf,
2254 start_lba, num_lba, __readahead_done, args);
2255 }
2256
2257 static uint64_t
2258 __next_cache_buffer_offset(uint64_t offset)
2259 {
2260 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2261 }
2262
2263 static void
2264 check_readahead(struct spdk_file *file, uint64_t offset)
2265 {
2266 struct spdk_fs_cb_args *args;
2267
2268 offset = __next_cache_buffer_offset(offset);
2269 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2270 return;
2271 }
2272
2273 args = calloc(1, sizeof(*args));
2274 if (args == NULL) {
2275 return;
2276 }
2277
2278 BLOBFS_TRACE(file, "offset=%jx\n", offset);
2279
2280 args->file = file;
2281 args->op.readahead.offset = offset;
2282 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2283 if (!args->op.readahead.cache_buffer) {
2284 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2285 free(args);
2286 return;
2287 }
2288
2289 args->op.readahead.cache_buffer->in_progress = true;
2290 if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2291 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2292 } else {
2293 args->op.readahead.length = CACHE_BUFFER_SIZE;
2294 }
2295 file->fs->send_request(__readahead, args);
2296 }
2297
2298 static int
2299 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2300 {
2301 struct cache_buffer *buf;
2302 int rc;
2303
2304 buf = spdk_tree_find_filled_buffer(file->tree, offset);
2305 if (buf == NULL) {
2306 pthread_spin_unlock(&file->lock);
2307 rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2308 pthread_spin_lock(&file->lock);
2309 return rc;
2310 }
2311
2312 if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2313 length = buf->offset + buf->bytes_filled - offset;
2314 }
2315 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2316 memcpy(payload, &buf->buf[offset - buf->offset], length);
2317 if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2318 pthread_spin_lock(&g_caches_lock);
2319 spdk_tree_remove_buffer(file->tree, buf);
2320 if (file->tree->present_mask == 0) {
2321 TAILQ_REMOVE(&g_caches, file, cache_tailq);
2322 }
2323 pthread_spin_unlock(&g_caches_lock);
2324 }
2325
2326 sem_post(sem);
2327 return 0;
2328 }
2329
2330 int64_t
2331 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2332 void *payload, uint64_t offset, uint64_t length)
2333 {
2334 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2335 uint64_t final_offset, final_length;
2336 uint32_t sub_reads = 0;
2337 int rc = 0;
2338
2339 pthread_spin_lock(&file->lock);
2340
2341 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2342
2343 file->open_for_writing = false;
2344
2345 if (length == 0 || offset >= file->append_pos) {
2346 pthread_spin_unlock(&file->lock);
2347 return 0;
2348 }
2349
2350 if (offset + length > file->append_pos) {
2351 length = file->append_pos - offset;
2352 }
2353
2354 if (offset != file->next_seq_offset) {
2355 file->seq_byte_count = 0;
2356 }
2357 file->seq_byte_count += length;
2358 file->next_seq_offset = offset + length;
2359 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2360 check_readahead(file, offset);
2361 check_readahead(file, offset + CACHE_BUFFER_SIZE);
2362 }
2363
2364 final_length = 0;
2365 final_offset = offset + length;
2366 while (offset < final_offset) {
2367 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2368 if (length > (final_offset - offset)) {
2369 length = final_offset - offset;
2370 }
2371 rc = __file_read(file, payload, offset, length, &channel->sem);
2372 if (rc == 0) {
2373 final_length += length;
2374 } else {
2375 break;
2376 }
2377 payload += length;
2378 offset += length;
2379 sub_reads++;
2380 }
2381 pthread_spin_unlock(&file->lock);
2382 while (sub_reads-- > 0) {
2383 sem_wait(&channel->sem);
2384 }
2385 if (rc == 0) {
2386 return final_length;
2387 } else {
2388 return rc;
2389 }
2390 }
2391
2392 static void
2393 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2394 spdk_file_op_complete cb_fn, void *cb_arg)
2395 {
2396 struct spdk_fs_request *sync_req;
2397 struct spdk_fs_request *flush_req;
2398 struct spdk_fs_cb_args *sync_args;
2399 struct spdk_fs_cb_args *flush_args;
2400
2401 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2402
2403 pthread_spin_lock(&file->lock);
2404 if (file->append_pos <= file->length_flushed) {
2405 BLOBFS_TRACE(file, "done - no data to flush\n");
2406 pthread_spin_unlock(&file->lock);
2407 cb_fn(cb_arg, 0);
2408 return;
2409 }
2410
2411 sync_req = alloc_fs_request(channel);
2412 if (!sync_req) {
2413 pthread_spin_unlock(&file->lock);
2414 cb_fn(cb_arg, -ENOMEM);
2415 return;
2416 }
2417 sync_args = &sync_req->args;
2418
2419 flush_req = alloc_fs_request(channel);
2420 if (!flush_req) {
2421 pthread_spin_unlock(&file->lock);
2422 cb_fn(cb_arg, -ENOMEM);
2423 return;
2424 }
2425 flush_args = &flush_req->args;
2426
2427 sync_args->file = file;
2428 sync_args->fn.file_op = cb_fn;
2429 sync_args->arg = cb_arg;
2430 sync_args->op.sync.offset = file->append_pos;
2431 sync_args->op.sync.xattr_in_progress = false;
2432 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2433 pthread_spin_unlock(&file->lock);
2434
2435 flush_args->file = file;
2436 channel->send_request(__file_flush, flush_args);
2437 }
2438
2439 int
2440 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2441 {
2442 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2443 struct spdk_fs_cb_args args = {};
2444
2445 args.sem = &channel->sem;
2446 _file_sync(file, channel, __wake_caller, &args);
2447 sem_wait(&channel->sem);
2448
2449 return args.rc;
2450 }
2451
2452 void
2453 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2454 spdk_file_op_complete cb_fn, void *cb_arg)
2455 {
2456 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2457
2458 _file_sync(file, channel, cb_fn, cb_arg);
2459 }
2460
2461 void
2462 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2463 {
2464 BLOBFS_TRACE(file, "priority=%u\n", priority);
2465 file->priority = priority;
2466
2467 }
2468
2469 /*
2470 * Close routines
2471 */
2472
2473 static void
2474 __file_close_async_done(void *ctx, int bserrno)
2475 {
2476 struct spdk_fs_request *req = ctx;
2477 struct spdk_fs_cb_args *args = &req->args;
2478 struct spdk_file *file = args->file;
2479
2480 if (file->is_deleted) {
2481 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2482 return;
2483 }
2484
2485 args->fn.file_op(args->arg, bserrno);
2486 free_fs_request(req);
2487 }
2488
2489 static void
2490 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2491 {
2492 struct spdk_blob *blob;
2493
2494 pthread_spin_lock(&file->lock);
2495 if (file->ref_count == 0) {
2496 pthread_spin_unlock(&file->lock);
2497 __file_close_async_done(req, -EBADF);
2498 return;
2499 }
2500
2501 file->ref_count--;
2502 if (file->ref_count > 0) {
2503 pthread_spin_unlock(&file->lock);
2504 req->args.fn.file_op(req->args.arg, 0);
2505 free_fs_request(req);
2506 return;
2507 }
2508
2509 pthread_spin_unlock(&file->lock);
2510
2511 blob = file->blob;
2512 file->blob = NULL;
2513 spdk_blob_close(blob, __file_close_async_done, req);
2514 }
2515
2516 static void
2517 __file_close_async__sync_done(void *arg, int fserrno)
2518 {
2519 struct spdk_fs_request *req = arg;
2520 struct spdk_fs_cb_args *args = &req->args;
2521
2522 __file_close_async(args->file, req);
2523 }
2524
2525 void
2526 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2527 {
2528 struct spdk_fs_request *req;
2529 struct spdk_fs_cb_args *args;
2530
2531 req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2532 if (req == NULL) {
2533 cb_fn(cb_arg, -ENOMEM);
2534 return;
2535 }
2536
2537 args = &req->args;
2538 args->file = file;
2539 args->fn.file_op = cb_fn;
2540 args->arg = cb_arg;
2541
2542 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2543 }
2544
2545 static void
2546 __file_close(void *arg)
2547 {
2548 struct spdk_fs_request *req = arg;
2549 struct spdk_fs_cb_args *args = &req->args;
2550 struct spdk_file *file = args->file;
2551
2552 __file_close_async(file, req);
2553 }
2554
2555 int
2556 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2557 {
2558 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2559 struct spdk_fs_request *req;
2560 struct spdk_fs_cb_args *args;
2561
2562 req = alloc_fs_request(channel);
2563 if (req == NULL) {
2564 return -ENOMEM;
2565 }
2566
2567 args = &req->args;
2568
2569 spdk_file_sync(file, _channel);
2570 BLOBFS_TRACE(file, "name=%s\n", file->name);
2571 args->file = file;
2572 args->sem = &channel->sem;
2573 args->fn.file_op = __wake_caller;
2574 args->arg = req;
2575 channel->send_request(__file_close, req);
2576 sem_wait(&channel->sem);
2577
2578 return args->rc;
2579 }
2580
2581 int
2582 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2583 {
2584 if (size < sizeof(spdk_blob_id)) {
2585 return -EINVAL;
2586 }
2587
2588 memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2589
2590 return sizeof(spdk_blob_id);
2591 }
2592
2593 static void
2594 cache_free_buffers(struct spdk_file *file)
2595 {
2596 BLOBFS_TRACE(file, "free=%s\n", file->name);
2597 pthread_spin_lock(&file->lock);
2598 pthread_spin_lock(&g_caches_lock);
2599 if (file->tree->present_mask == 0) {
2600 pthread_spin_unlock(&g_caches_lock);
2601 pthread_spin_unlock(&file->lock);
2602 return;
2603 }
2604 spdk_tree_free_buffers(file->tree);
2605
2606 TAILQ_REMOVE(&g_caches, file, cache_tailq);
2607 /* If not freed, put it in the end of the queue */
2608 if (file->tree->present_mask != 0) {
2609 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2610 }
2611 file->last = NULL;
2612 pthread_spin_unlock(&g_caches_lock);
2613 pthread_spin_unlock(&file->lock);
2614 }
2615
/*
 * Register the "blobfs" and "blobfs_rw" log components, bound to the
 * SPDK_LOG_BLOBFS and SPDK_LOG_BLOBFS_RW flags respectively.
 */
2616 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2617 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)