/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdbool.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#include "spdk/blob.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"

#include "spdk_internal/log.h"

#include "blobstore.h"
#include "request.h"

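/*
 * Integer division, rounded up; for example, divide_round_up(10, 4) == 3.
 * Used throughout to size bitmasks and metadata regions in whole pages.
 */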
static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

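/*
 * A blob's in-memory metadata moves through four states: CLEAN (matches the
 * on-disk copy), DIRTY (modified in memory), LOADING (being read from disk),
 * and SYNCING (being written to disk). _spdk_blob_alloc() starts a blob out
 * DIRTY, since nothing has been persisted yet.
 */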
static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

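/*
 * A metadata page holds a packed sequence of variable-length descriptors
 * (extents, xattrs, or padding). Parsing walks the descriptor area until it
 * hits a zero-length padding descriptor or runs out of room in the page.
 */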
static void
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			assert(desc_extent->length > 0);
			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
					cluster_count++;
				}
			}

			assert(cluster_count > 0);
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			assert(tmp != NULL);
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			xattr = calloc(1, sizeof(*xattr));
			assert(xattr != NULL);

			xattr->name = malloc(desc_xattr->name_length + 1);
			assert(xattr->name);
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			assert(xattr->value != NULL);
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Unknown descriptor type - stop parsing */
			break;
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);
	assert(blob->id == pages[0].id);

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		_spdk_blob_parse_page(page, blob);
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_malloc(sizeof(struct spdk_blob_md_page),
				     sizeof(struct spdk_blob_md_page),
				     NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_realloc(*pages,
				      sizeof(struct spdk_blob_md_page) * (*page_count),
				      sizeof(struct spdk_blob_md_page),
				      NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

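/*
 * Extents are run-length encoded: each (cluster_idx, length) pair in the
 * descriptor covers a run of physically contiguous clusters. As an
 * illustrative example, a blob whose clusters sit at cluster indices 10, 11,
 * 12, and 40 serializes to the two pairs {10, 3} and {40, 1}.
 */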
static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	uint64_t last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

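/*
 * Blob metadata is stored as a chain of pages: the first page lives at the
 * location derived from the blob id, and each page's 'next' field names the
 * following metadata page (SPDK_INVALID_MD_PAGE terminates the chain). The
 * load completion below follows that chain one page at a time.
 */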
struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;

	page = &ctx->pages[ctx->num_pages - 1];

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					  sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page),
				  sizeof(struct spdk_blob_md_page), NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)),
			      _spdk_blob_load_cpl, ctx);
}

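/*
 * Persisting a blob runs as a pipeline of completions: the new metadata is
 * written first (chain pages, then the root page), stale metadata pages from
 * the clean list are unmapped next, and truncated clusters are unmapped and
 * released last. Deleting a blob (active.num_pages == 0) skips straight to
 * the unmap steps.
 */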
struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t lba = blob->active.clusters[i];
		uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so the used_md_pages bits for everything in the clean list can be
	 * released here.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page));

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	/* The first page will only be unmapped if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_unmap_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

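/*
 * Resizing only adjusts the in-memory cluster list; nothing is written to
 * disk until the blob is persisted. Growth runs two passes over the free
 * cluster bitmap - one to verify that enough free clusters exist and one to
 * actually claim them - so a failed resize leaves the blobstore unchanged.
 */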
static int
_spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}

	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

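/*
 * Blob I/O is issued in units of pages. Consecutive pages of a blob may live
 * in non-contiguous clusters on disk, so each request is split at cluster
 * boundaries. As an illustrative example, with 256 pages per cluster, a
 * 300-page read starting at page 0 becomes one 256-page operation followed
 * by one 44-page operation, submitted together as a single batch.
 */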
static void
_spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_batch_t *batch;
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;
	uint8_t *buf;
	uint64_t page;

	assert(blob != NULL);

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
							  _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		if (read) {
			spdk_bs_batch_read(batch, buf, lba, lba_count);
		} else {
			spdk_bs_batch_write(batch, buf, lba, lba_count);
		}

		length -= lba_count;
		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static struct spdk_blob *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_bs_dev *dev = bs->dev;
	struct spdk_bs_channel *channel = ctx_buf;
	uint32_t max_ops = *(uint32_t *)unique_ctx;
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		free(channel);
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	return 0;
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	struct spdk_blob *blob, *blob_tmp;

	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(bs);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);

	bs->dev->destroy(bs->dev);
	free(bs);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
}

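/*
 * Callers normally initialize the defaults and then override individual
 * fields before creating a blobstore. An illustrative sketch (the 1 MiB
 * cluster size and the init_done callback are hypothetical):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 1024 * 1024;
 *	spdk_bs_init(dev, &opts, init_done, init_ctx);
 */
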
static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store *bs;

	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 * even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page);
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}

	bs->max_md_ops = opts->max_md_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	spdk_bs_register_md_thread(bs);

	return bs;
}

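/*
 * Loading an existing blobstore proceeds as a chain of read completions:
 * the super block is read and validated first, then the used-metadata-pages
 * mask, then the used-clusters mask. A failure at any step tears down the
 * partially constructed blobstore before completing the sequence.
 */
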
/* START spdk_bs_load */

struct spdk_bs_load_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_free(ctx->super);
		spdk_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_free(ctx->super);
	spdk_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) *
				     8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_free(ctx->super);
		spdk_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_free(ctx->mask);

	/* Read the used clusters mask */
	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
				 0x1000, NULL);
	if (!ctx->mask) {
		spdk_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count;

	if (ctx->super->version != SPDK_BS_VERSION) {
		spdk_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (ctx->super->clean != 1) {
		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
		 * All of the necessary data to recover is available
		 * on disk - the code just has not been written yet.
		 */
		assert(false);
		spdk_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}
	ctx->super->clean = 0;

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page);
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;

	/* Read the used pages mask */
	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000,
				 NULL);
	if (!ctx->mask) {
		spdk_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

void
spdk_bs_load(struct spdk_bs_dev *dev,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts opts = {};

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);

	spdk_bs_opts_init(&opts);

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

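/*
 * On-disk layout produced below, in metadata pages from the start of the
 * device: page 0 holds the super block, followed by the used_md_pages mask,
 * then the used_clusters mask, then the metadata region itself. Every
 * cluster touched by those regions is claimed before any I/O is issued.
 */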
void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint64_t num_md_pages;
	uint32_t i;
	struct spdk_bs_opts opts = {};
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == UINT32_MAX) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 sizeof(struct spdk_blob_md_page));
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    sizeof(struct spdk_blob_md_page));
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* TRIM the entire device */
	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
}

/* END spdk_bs_init */

/* START spdk_bs_unload */

struct spdk_bs_unload_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_free(ctx->super);

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_free(ctx->mask);

	/* Update the values in the super block */
	ctx->super->super_blob = ctx->bs->super_blob;
	ctx->super->clean = 1;

	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count;

	spdk_free(ctx->mask);

	/* Write out the used clusters mask */
	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
				 0x1000, NULL);
	if (!ctx->mask) {
		spdk_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
}

static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count;

	/* Write out the used page mask */
	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page),
				 0x1000, NULL);
	if (!ctx->mask) {
		spdk_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_pages_cpl, ctx);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_unload_ctx *ctx;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	assert(TAILQ_EMPTY(&bs->blobs));

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return sizeof(struct spdk_blob_md_page);
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_channel = spdk_get_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, true,
					     (void *)&bs->max_md_ops);

	return 0;
}

int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_channel);

	return 0;
}

spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->id;
}

uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->active.num_clusters;
}

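/*
 * A typical metadata call flow, sketched for illustration (the callback
 * names and the resize size are hypothetical):
 *
 *	spdk_bs_md_create_blob(bs, create_done, ctx);
 *	// create_done receives the new blob id
 *	spdk_bs_md_open_blob(bs, blobid, open_done, ctx);
 *	// open_done receives the blob handle
 *	spdk_bs_md_resize_blob(blob, 10);
 *	spdk_bs_md_sync_blob(blob, sync_done, ctx);
 *	spdk_bs_md_close_blob(&blob, close_done, ctx);
 */
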
/* START spdk_bs_md_create_blob */

static void
_spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob *blob;
	uint32_t page_idx;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	spdk_blob_id id;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
	 * code assumes blob id == page_idx.
	 */
	id = (1ULL << 32) | page_idx;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
}

/* END spdk_bs_md_create_blob */

/* START spdk_bs_md_resize_blob */
int
spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	int rc;

	assert(blob != NULL);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_bs_md_resize_blob */

/* START spdk_bs_md_delete_blob */

static void
_spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}

void
spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
}

/* END spdk_bs_md_delete_blob */

/* START spdk_bs_md_open_blob */

static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint32_t page_num;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
}

/* END spdk_bs_md_open_blob */

1977 /* START spdk_bs_md_sync_blob */
1978 static void
1979 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1980 {
1981 spdk_bs_sequence_finish(seq, bserrno);
1982 }
1983
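/*
 * Syncing a clean blob completes immediately; otherwise the dirty
 * in-memory metadata (size, xattrs, state) is persisted to disk.
 * Blobs that are mid-load or mid-sync must not be synced again, as
 * the asserts below enforce.
 */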
void
spdk_bs_md_sync_blob(struct spdk_blob *blob,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	assert(blob != NULL);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(cb_arg, 0);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
}

/* END spdk_bs_md_sync_blob */

/* START spdk_bs_md_close_blob */

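/*
 * Closing drops one reference. Dirty metadata is persisted even if
 * other references remain; the in-memory blob is removed from the
 * blobstore's open list and freed only when the last reference goes
 * away. The caller's handle is cleared in either case.
 */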
static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob **blob = cb_arg;

	if ((*blob)->open_ref == 0) {
		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
		_spdk_blob_free((*blob));
	}

	*blob = NULL;

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_bs_md_close_blob(struct spdk_blob **b,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	struct spdk_blob *blob;
	spdk_bs_sequence_t *seq;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	blob->open_ref--;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		_spdk_blob_close_cpl(seq, b, 0);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
}

/* END spdk_bs_md_close_blob */

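/*
 * I/O channel helpers. spdk_get_io_channel() is called with unique set
 * to true, which creates a fresh channel and (in this SPDK version)
 * invokes the channel create callback before returning; the code relies
 * on that so taking the address of the stack variable max_ops is safe.
 *
 * A minimal usage sketch (priority 0, queue depth 512, and the
 * page-granular offset and length are illustrative; error handling
 * omitted):
 *
 *	struct spdk_io_channel *ch;
 *
 *	ch = spdk_bs_alloc_io_channel(bs, 0, 512);
 *	spdk_bs_io_read_blob(blob, ch, buf, 0, 1, read_done, NULL);
 *	spdk_bs_free_io_channel(ch);
 */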
struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs,
			 uint32_t priority, uint32_t max_ops)
{
	return spdk_get_io_channel(bs, priority, true, (void *)&max_ops);
}

void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
			 spdk_blob_op_complete cb_fn, void *cb_arg)
{
	/* Flush is synchronous right now */
	cb_fn(cb_arg, 0);
}

void
spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
}

void
spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
}

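/*
 * Blob iteration walks the used metadata pages bitmap rather than the
 * in-memory blob list, so it visits every blob on the blobstore. Each
 * visited blob is returned opened; the caller hands it back through
 * spdk_bs_md_iter_next(), which closes it before advancing.
 */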
struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		/* An open completed; deliver the blob to the caller. */
		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
		/* No more metadata pages in use - iteration is done. */
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	/* Blob ids are the metadata page index with bit 32 set; this is
	 * the inverse of _spdk_bs_blobid_to_page(). */
	id = (1ULL << 32) | ctx->page_num;

	blob = _spdk_blob_lookup(bs, id);
	if (blob) {
		/* Already open - just take another reference. */
		blob->open_ref++;
		ctx->cb_fn(ctx->cb_arg, blob, 0);
		free(ctx);
		return;
	}

	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

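/*
 * Both iterator entry points seed the walk by invoking the completion
 * callback directly with a nonzero bserrno, which drives it into the
 * "advance to the next used metadata page" path above. iter_first
 * starts from page_num == -1; iter_next starts from the page of the
 * blob just visited.
 */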
void
spdk_bs_md_iter_first(struct spdk_blob_store *bs,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob *blob;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
}

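/*
 * A minimal iteration sketch, assuming the caller defines it_done() and
 * passes the blobstore handle as its argument (error handling omitted):
 *
 *	static void
 *	it_done(void *arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = arg;
 *
 *		if (bserrno == -ENOENT) {
 *			return;
 *		}
 *		... inspect blob ...
 *		spdk_bs_md_iter_next(bs, &blob, it_done, bs);
 *	}
 *
 *	spdk_bs_md_iter_first(bs, it_done, bs);
 */

/*
 * Extended attributes live on an in-memory list attached to the blob
 * and reach the disk only when the metadata is next persisted; each
 * mutation below therefore just marks the blob dirty for the next sync.
 */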
int
spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		       uint16_t value_len)
{
	struct spdk_xattr *xattr;
	void *tmp;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/* Allocate the new value before freeing the old
			 * one so the xattr stays intact on failure. */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);

			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
{
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len)
{
	struct spdk_xattr *xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

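/*
 * Container for xattr name enumeration. The name pointers reference the
 * blob's in-memory xattr entries directly, so they are only valid while
 * those entries (and the blob itself) stay alive.
 */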
struct spdk_xattr_names {
	uint32_t count;
	const char *names[0];
};

int
spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
			   struct spdk_xattr_names **names)
{
	struct spdk_xattr *xattr;
	int count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

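/*
 * A minimal enumeration sketch (error handling omitted):
 *
 *	struct spdk_xattr_names *names;
 *	uint32_t i;
 *
 *	spdk_bs_md_get_xattr_names(blob, &names);
 *	for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *		printf("%s\n", spdk_xattr_names_get_name(names, i));
 *	}
 *	spdk_xattr_names_free(names);
 */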
uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);