]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/lib/blob/blobstore.c
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / spdk / lib / blob / blobstore.c
CommitLineData
7c673cae
FG
1/*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
11fdf7f2 34#include "spdk/stdinc.h"
7c673cae
FG
35
36#include "spdk/blob.h"
11fdf7f2 37#include "spdk/crc32.h"
7c673cae
FG
38#include "spdk/env.h"
39#include "spdk/queue.h"
11fdf7f2 40#include "spdk/thread.h"
7c673cae 41#include "spdk/bit_array.h"
11fdf7f2 42#include "spdk/likely.h"
7c673cae 43
11fdf7f2 44#include "spdk_internal/assert.h"
7c673cae
FG
45#include "spdk_internal/log.h"
46
47#include "blobstore.h"
11fdf7f2
TL
48
49#define BLOB_CRC32C_INITIAL 0xffffffffUL
50
51static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
52static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
53static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
54static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
55 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);
56
57static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
58 uint16_t value_len, bool internal);
59static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
60 const void **value, size_t *value_len, bool internal);
61static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);
62
63static void
64_spdk_blob_verify_md_op(struct spdk_blob *blob)
65{
66 assert(blob != NULL);
67 assert(spdk_get_thread() == blob->bs->md_thread);
68 assert(blob->state != SPDK_BLOB_STATE_LOADING);
69}
7c673cae
FG
70
/*
 * Return num / divisor rounded up to the next whole number.
 *
 * The result is built from the quotient plus a remainder test instead of
 * (num + divisor - 1) / divisor, because that sum wraps around when num is
 * close to SIZE_MAX and then produces a result that is far too small.
 *
 * divisor must be non-zero.
 */
static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return num / divisor + ((num % divisor != 0) ? 1 : 0);
}
76
77static void
78_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
79{
80 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
81 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
82 assert(bs->num_free_clusters > 0);
83
11fdf7f2 84 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
7c673cae
FG
85
86 spdk_bit_array_set(bs->used_clusters, cluster_num);
87 bs->num_free_clusters--;
88}
89
11fdf7f2
TL
90static int
91_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
92{
93 uint64_t *cluster_lba = &blob->active.clusters[cluster_num];
94
95 _spdk_blob_verify_md_op(blob);
96
97 if (*cluster_lba != 0) {
98 return -EEXIST;
99 }
100
101 *cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
102 return 0;
103}
104
105static int
106_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
107 uint64_t *lowest_free_cluster, bool update_map)
108{
109 pthread_mutex_lock(&blob->bs->used_clusters_mutex);
110 *lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
111 *lowest_free_cluster);
112 if (*lowest_free_cluster == UINT32_MAX) {
113 /* No more free clusters. Cannot satisfy the request */
114 pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
115 return -ENOSPC;
116 }
117
118 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
119 _spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
120 pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
121
122 if (update_map) {
123 _spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
124 }
125
126 return 0;
127}
128
7c673cae
FG
129static void
130_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
131{
132 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
133 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
134 assert(bs->num_free_clusters < bs->total_clusters);
135
11fdf7f2 136 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
7c673cae 137
11fdf7f2 138 pthread_mutex_lock(&bs->used_clusters_mutex);
7c673cae
FG
139 spdk_bit_array_clear(bs->used_clusters, cluster_num);
140 bs->num_free_clusters++;
11fdf7f2
TL
141 pthread_mutex_unlock(&bs->used_clusters_mutex);
142}
143
144static void
145_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
146{
147 xattrs->count = 0;
148 xattrs->names = NULL;
149 xattrs->ctx = NULL;
150 xattrs->get_value = NULL;
151}
152
153void
154spdk_blob_opts_init(struct spdk_blob_opts *opts)
155{
156 opts->num_clusters = 0;
157 opts->thin_provision = false;
158 _spdk_blob_xattrs_init(&opts->xattrs);
7c673cae
FG
159}
160
161static struct spdk_blob *
162_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
163{
164 struct spdk_blob *blob;
165
166 blob = calloc(1, sizeof(*blob));
167 if (!blob) {
168 return NULL;
169 }
170
171 blob->id = id;
172 blob->bs = bs;
173
11fdf7f2
TL
174 blob->parent_id = SPDK_BLOBID_INVALID;
175
7c673cae
FG
176 blob->state = SPDK_BLOB_STATE_DIRTY;
177 blob->active.num_pages = 1;
178 blob->active.pages = calloc(1, sizeof(*blob->active.pages));
179 if (!blob->active.pages) {
180 free(blob);
181 return NULL;
182 }
183
184 blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
185
186 TAILQ_INIT(&blob->xattrs);
11fdf7f2 187 TAILQ_INIT(&blob->xattrs_internal);
7c673cae
FG
188
189 return blob;
190}
191
192static void
11fdf7f2 193_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
7c673cae 194{
11fdf7f2
TL
195 struct spdk_xattr *xattr, *xattr_tmp;
196
197 TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
198 TAILQ_REMOVE(xattrs, xattr, link);
199 free(xattr->name);
200 free(xattr->value);
201 free(xattr);
202 }
203}
7c673cae 204
11fdf7f2
TL
205static void
206_spdk_blob_free(struct spdk_blob *blob)
207{
7c673cae
FG
208 assert(blob != NULL);
209
210 free(blob->active.clusters);
211 free(blob->clean.clusters);
212 free(blob->active.pages);
213 free(blob->clean.pages);
214
11fdf7f2
TL
215 _spdk_xattrs_free(&blob->xattrs);
216 _spdk_xattrs_free(&blob->xattrs_internal);
217
218 if (blob->back_bs_dev) {
219 blob->back_bs_dev->destroy(blob->back_bs_dev);
7c673cae
FG
220 }
221
222 free(blob);
223}
224
11fdf7f2
TL
225struct freeze_io_ctx {
226 struct spdk_bs_cpl cpl;
227 struct spdk_blob *blob;
228};
229
/*
 * Per-channel step used when freezing a blob: does no work on the channel
 * and immediately continues the iteration with status 0.
 */
static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
235
236static void
237_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
238{
239 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
240 struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
241 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
242 struct spdk_bs_request_set *set;
243 struct spdk_bs_user_op_args *args;
244 spdk_bs_user_op_t *op, *tmp;
245
246 TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
247 set = (struct spdk_bs_request_set *)op;
248 args = &set->u.user_op;
249
250 if (args->blob == ctx->blob) {
251 TAILQ_REMOVE(&ch->queued_io, op, link);
252 spdk_bs_user_op_execute(op);
253 }
254 }
255
256 spdk_for_each_channel_continue(i, 0);
257}
258
259static void
260_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
261{
262 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
263
264 ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);
265
266 free(ctx);
267}
268
269static void
270_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
271{
272 struct freeze_io_ctx *ctx;
273
274 ctx = calloc(1, sizeof(*ctx));
275 if (!ctx) {
276 cb_fn(cb_arg, -ENOMEM);
277 return;
278 }
279
280 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
281 ctx->cpl.u.blob_basic.cb_fn = cb_fn;
282 ctx->cpl.u.blob_basic.cb_arg = cb_arg;
283 ctx->blob = blob;
284
285 /* Freeze I/O on blob */
286 blob->frozen_refcnt++;
287
288 if (blob->frozen_refcnt == 1) {
289 spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
290 } else {
291 cb_fn(cb_arg, 0);
292 free(ctx);
293 }
294}
295
296static void
297_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
298{
299 struct freeze_io_ctx *ctx;
300
301 ctx = calloc(1, sizeof(*ctx));
302 if (!ctx) {
303 cb_fn(cb_arg, -ENOMEM);
304 return;
305 }
306
307 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
308 ctx->cpl.u.blob_basic.cb_fn = cb_fn;
309 ctx->cpl.u.blob_basic.cb_arg = cb_arg;
310 ctx->blob = blob;
311
312 assert(blob->frozen_refcnt > 0);
313
314 blob->frozen_refcnt--;
315
316 if (blob->frozen_refcnt == 0) {
317 spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
318 } else {
319 cb_fn(cb_arg, 0);
320 free(ctx);
321 }
322}
323
7c673cae
FG
324static int
325_spdk_blob_mark_clean(struct spdk_blob *blob)
326{
327 uint64_t *clusters = NULL;
328 uint32_t *pages = NULL;
329
330 assert(blob != NULL);
7c673cae
FG
331
332 if (blob->active.num_clusters) {
333 assert(blob->active.clusters);
334 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
335 if (!clusters) {
11fdf7f2 336 return -ENOMEM;
7c673cae
FG
337 }
338 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
339 }
340
341 if (blob->active.num_pages) {
342 assert(blob->active.pages);
343 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
344 if (!pages) {
345 free(clusters);
11fdf7f2 346 return -ENOMEM;
7c673cae
FG
347 }
348 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
349 }
350
351 free(blob->clean.clusters);
352 free(blob->clean.pages);
353
354 blob->clean.num_clusters = blob->active.num_clusters;
355 blob->clean.clusters = blob->active.clusters;
356 blob->clean.num_pages = blob->active.num_pages;
357 blob->clean.pages = blob->active.pages;
358
359 blob->active.clusters = clusters;
360 blob->active.pages = pages;
361
11fdf7f2
TL
362 /* If the metadata was dirtied again while the metadata was being written to disk,
363 * we do not want to revert the DIRTY state back to CLEAN here.
364 */
365 if (blob->state == SPDK_BLOB_STATE_LOADING) {
366 blob->state = SPDK_BLOB_STATE_CLEAN;
367 }
7c673cae
FG
368
369 return 0;
370}
371
11fdf7f2
TL
372static int
373_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
374 struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
375{
376 struct spdk_xattr *xattr;
377
378 if (desc_xattr->length != sizeof(desc_xattr->name_length) +
379 sizeof(desc_xattr->value_length) +
380 desc_xattr->name_length + desc_xattr->value_length) {
381 return -EINVAL;
382 }
383
384 xattr = calloc(1, sizeof(*xattr));
385 if (xattr == NULL) {
386 return -ENOMEM;
387 }
388
389 xattr->name = malloc(desc_xattr->name_length + 1);
390 if (xattr->name == NULL) {
391 free(xattr);
392 return -ENOMEM;
393 }
394 memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
395 xattr->name[desc_xattr->name_length] = '\0';
396
397 xattr->value = malloc(desc_xattr->value_length);
398 if (xattr->value == NULL) {
399 free(xattr->name);
400 free(xattr);
401 return -ENOMEM;
402 }
403 xattr->value_len = desc_xattr->value_length;
404 memcpy(xattr->value,
405 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
406 desc_xattr->value_length);
407
408 TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);
409
410 return 0;
411}
412
413
414static int
7c673cae
FG
415_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
416{
417 struct spdk_blob_md_descriptor *desc;
418 size_t cur_desc = 0;
419 void *tmp;
420
421 desc = (struct spdk_blob_md_descriptor *)page->descriptors;
422 while (cur_desc < sizeof(page->descriptors)) {
423 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
424 if (desc->length == 0) {
425 /* If padding and length are 0, this terminates the page */
426 break;
427 }
11fdf7f2
TL
428 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
429 struct spdk_blob_md_descriptor_flags *desc_flags;
430
431 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
432
433 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
434 return -EINVAL;
435 }
436
437 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
438 SPDK_BLOB_INVALID_FLAGS_MASK) {
439 return -EINVAL;
440 }
441
442 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
443 SPDK_BLOB_DATA_RO_FLAGS_MASK) {
444 blob->data_ro = true;
445 blob->md_ro = true;
446 }
447
448 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
449 SPDK_BLOB_MD_RO_FLAGS_MASK) {
450 blob->md_ro = true;
451 }
452
453 if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
454 blob->data_ro = true;
455 blob->md_ro = true;
456 }
457
458 blob->invalid_flags = desc_flags->invalid_flags;
459 blob->data_ro_flags = desc_flags->data_ro_flags;
460 blob->md_ro_flags = desc_flags->md_ro_flags;
461
7c673cae
FG
462 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
463 struct spdk_blob_md_descriptor_extent *desc_extent;
464 unsigned int i, j;
465 unsigned int cluster_count = blob->active.num_clusters;
466
467 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
468
11fdf7f2
TL
469 if (desc_extent->length == 0 ||
470 (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
471 return -EINVAL;
472 }
7c673cae
FG
473
474 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
475 for (j = 0; j < desc_extent->extents[i].length; j++) {
11fdf7f2
TL
476 if (desc_extent->extents[i].cluster_idx != 0) {
477 if (!spdk_bit_array_get(blob->bs->used_clusters,
478 desc_extent->extents[i].cluster_idx + j)) {
479 return -EINVAL;
480 }
481 }
7c673cae
FG
482 cluster_count++;
483 }
484 }
485
11fdf7f2
TL
486 if (cluster_count == 0) {
487 return -EINVAL;
488 }
7c673cae 489 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
11fdf7f2
TL
490 if (tmp == NULL) {
491 return -ENOMEM;
492 }
7c673cae
FG
493 blob->active.clusters = tmp;
494 blob->active.cluster_array_size = cluster_count;
495
496 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
497 for (j = 0; j < desc_extent->extents[i].length; j++) {
11fdf7f2
TL
498 if (desc_extent->extents[i].cluster_idx != 0) {
499 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
500 desc_extent->extents[i].cluster_idx + j);
501 } else if (spdk_blob_is_thin_provisioned(blob)) {
502 blob->active.clusters[blob->active.num_clusters++] = 0;
503 } else {
504 return -EINVAL;
505 }
7c673cae
FG
506 }
507 }
508
509 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
11fdf7f2 510 int rc;
7c673cae 511
11fdf7f2
TL
512 rc = _spdk_blob_deserialize_xattr(blob,
513 (struct spdk_blob_md_descriptor_xattr *) desc, false);
514 if (rc != 0) {
515 return rc;
516 }
517 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
518 int rc;
7c673cae 519
11fdf7f2
TL
520 rc = _spdk_blob_deserialize_xattr(blob,
521 (struct spdk_blob_md_descriptor_xattr *) desc, true);
522 if (rc != 0) {
523 return rc;
524 }
7c673cae 525 } else {
11fdf7f2
TL
526 /* Unrecognized descriptor type. Do not fail - just continue to the
527 * next descriptor. If this descriptor is associated with some feature
528 * defined in a newer version of blobstore, that version of blobstore
529 * should create and set an associated feature flag to specify if this
530 * blob can be loaded or not.
531 */
7c673cae
FG
532 }
533
534 /* Advance to the next descriptor */
535 cur_desc += sizeof(*desc) + desc->length;
536 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
537 break;
538 }
539 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
540 }
11fdf7f2
TL
541
542 return 0;
7c673cae
FG
543}
544
545static int
546_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
547 struct spdk_blob *blob)
548{
549 const struct spdk_blob_md_page *page;
550 uint32_t i;
11fdf7f2 551 int rc;
7c673cae
FG
552
553 assert(page_count > 0);
554 assert(pages[0].sequence_num == 0);
555 assert(blob != NULL);
556 assert(blob->state == SPDK_BLOB_STATE_LOADING);
557 assert(blob->active.clusters == NULL);
11fdf7f2
TL
558
559 /* The blobid provided doesn't match what's in the MD, this can
560 * happen for example if a bogus blobid is passed in through open.
561 */
562 if (blob->id != pages[0].id) {
563 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
564 blob->id, pages[0].id);
565 return -ENOENT;
566 }
7c673cae
FG
567
568 for (i = 0; i < page_count; i++) {
569 page = &pages[i];
570
571 assert(page->id == blob->id);
572 assert(page->sequence_num == i);
573
11fdf7f2
TL
574 rc = _spdk_blob_parse_page(page, blob);
575 if (rc != 0) {
576 return rc;
577 }
7c673cae
FG
578 }
579
580 return 0;
581}
582
583static int
584_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
585 struct spdk_blob_md_page **pages,
586 uint32_t *page_count,
587 struct spdk_blob_md_page **last_page)
588{
589 struct spdk_blob_md_page *page;
590
591 assert(pages != NULL);
592 assert(page_count != NULL);
593
594 if (*page_count == 0) {
595 assert(*pages == NULL);
596 *page_count = 1;
11fdf7f2
TL
597 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
598 SPDK_BS_PAGE_SIZE,
599 NULL);
7c673cae
FG
600 } else {
601 assert(*pages != NULL);
602 (*page_count)++;
11fdf7f2
TL
603 *pages = spdk_dma_realloc(*pages,
604 SPDK_BS_PAGE_SIZE * (*page_count),
605 SPDK_BS_PAGE_SIZE,
606 NULL);
7c673cae
FG
607 }
608
609 if (*pages == NULL) {
610 *page_count = 0;
611 *last_page = NULL;
612 return -ENOMEM;
613 }
614
615 page = &(*pages)[*page_count - 1];
616 memset(page, 0, sizeof(*page));
617 page->id = blob->id;
618 page->sequence_num = *page_count - 1;
619 page->next = SPDK_INVALID_MD_PAGE;
620 *last_page = page;
621
622 return 0;
623}
624
625/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
626 * Update required_sz on both success and failure.
627 *
628 */
629static int
630_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
631 uint8_t *buf, size_t buf_sz,
11fdf7f2 632 size_t *required_sz, bool internal)
7c673cae
FG
633{
634 struct spdk_blob_md_descriptor_xattr *desc;
635
636 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
637 strlen(xattr->name) +
638 xattr->value_len;
639
640 if (buf_sz < *required_sz) {
641 return -1;
642 }
643
644 desc = (struct spdk_blob_md_descriptor_xattr *)buf;
645
11fdf7f2 646 desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
7c673cae
FG
647 desc->length = sizeof(desc->name_length) +
648 sizeof(desc->value_length) +
649 strlen(xattr->name) +
650 xattr->value_len;
651 desc->name_length = strlen(xattr->name);
652 desc->value_length = xattr->value_len;
653
654 memcpy(desc->name, xattr->name, desc->name_length);
655 memcpy((void *)((uintptr_t)desc->name + desc->name_length),
656 xattr->value,
657 desc->value_length);
658
659 return 0;
660}
661
662static void
663_spdk_blob_serialize_extent(const struct spdk_blob *blob,
664 uint64_t start_cluster, uint64_t *next_cluster,
665 uint8_t *buf, size_t buf_sz)
666{
667 struct spdk_blob_md_descriptor_extent *desc;
668 size_t cur_sz;
669 uint64_t i, extent_idx;
11fdf7f2 670 uint64_t lba, lba_per_cluster, lba_count;
7c673cae
FG
671
672 /* The buffer must have room for at least one extent */
673 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
674 if (buf_sz < cur_sz) {
675 *next_cluster = start_cluster;
676 return;
677 }
678
679 desc = (struct spdk_blob_md_descriptor_extent *)buf;
680 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
681
682 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
683
684 lba = blob->active.clusters[start_cluster];
685 lba_count = lba_per_cluster;
686 extent_idx = 0;
687 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
688 if ((lba + lba_count) == blob->active.clusters[i]) {
689 lba_count += lba_per_cluster;
690 continue;
11fdf7f2
TL
691 } else if (lba == 0 && blob->active.clusters[i] == 0) {
692 lba_count += lba_per_cluster;
693 continue;
7c673cae
FG
694 }
695 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
696 desc->extents[extent_idx].length = lba_count / lba_per_cluster;
697 extent_idx++;
698
699 cur_sz += sizeof(desc->extents[extent_idx]);
700
701 if (buf_sz < cur_sz) {
702 /* If we ran out of buffer space, return */
703 desc->length = sizeof(desc->extents[0]) * extent_idx;
704 *next_cluster = i;
705 return;
706 }
707
708 lba = blob->active.clusters[i];
709 lba_count = lba_per_cluster;
710 }
711
712 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
713 desc->extents[extent_idx].length = lba_count / lba_per_cluster;
714 extent_idx++;
715
716 desc->length = sizeof(desc->extents[0]) * extent_idx;
717 *next_cluster = blob->active.num_clusters;
718
719 return;
720}
721
11fdf7f2
TL
722static void
723_spdk_blob_serialize_flags(const struct spdk_blob *blob,
724 uint8_t *buf, size_t *buf_sz)
7c673cae 725{
11fdf7f2 726 struct spdk_blob_md_descriptor_flags *desc;
7c673cae 727
11fdf7f2
TL
728 /*
729 * Flags get serialized first, so we should always have room for the flags
730 * descriptor.
731 */
732 assert(*buf_sz >= sizeof(*desc));
7c673cae 733
11fdf7f2
TL
734 desc = (struct spdk_blob_md_descriptor_flags *)buf;
735 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
736 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
737 desc->invalid_flags = blob->invalid_flags;
738 desc->data_ro_flags = blob->data_ro_flags;
739 desc->md_ro_flags = blob->md_ro_flags;
7c673cae 740
11fdf7f2
TL
741 *buf_sz -= sizeof(*desc);
742}
7c673cae 743
11fdf7f2
TL
744static int
745_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
746 const struct spdk_xattr_tailq *xattrs, bool internal,
747 struct spdk_blob_md_page **pages,
748 struct spdk_blob_md_page *cur_page,
749 uint32_t *page_count, uint8_t **buf,
750 size_t *remaining_sz)
751{
752 const struct spdk_xattr *xattr;
753 int rc;
7c673cae 754
11fdf7f2 755 TAILQ_FOREACH(xattr, xattrs, link) {
7c673cae 756 size_t required_sz = 0;
11fdf7f2 757
7c673cae 758 rc = _spdk_blob_serialize_xattr(xattr,
11fdf7f2
TL
759 *buf, *remaining_sz,
760 &required_sz, internal);
7c673cae
FG
761 if (rc < 0) {
762 /* Need to add a new page to the chain */
763 rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
764 &cur_page);
765 if (rc < 0) {
11fdf7f2 766 spdk_dma_free(*pages);
7c673cae
FG
767 *pages = NULL;
768 *page_count = 0;
769 return rc;
770 }
771
11fdf7f2
TL
772 *buf = (uint8_t *)cur_page->descriptors;
773 *remaining_sz = sizeof(cur_page->descriptors);
7c673cae
FG
774
775 /* Try again */
776 required_sz = 0;
777 rc = _spdk_blob_serialize_xattr(xattr,
11fdf7f2
TL
778 *buf, *remaining_sz,
779 &required_sz, internal);
7c673cae
FG
780
781 if (rc < 0) {
11fdf7f2 782 spdk_dma_free(*pages);
7c673cae
FG
783 *pages = NULL;
784 *page_count = 0;
11fdf7f2 785 return rc;
7c673cae
FG
786 }
787 }
788
11fdf7f2
TL
789 *remaining_sz -= required_sz;
790 *buf += required_sz;
791 }
792
793 return 0;
794}
795
796static int
797_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
798 uint32_t *page_count)
799{
800 struct spdk_blob_md_page *cur_page;
801 int rc;
802 uint8_t *buf;
803 size_t remaining_sz;
804 uint64_t last_cluster;
805
806 assert(pages != NULL);
807 assert(page_count != NULL);
808 assert(blob != NULL);
809 assert(blob->state == SPDK_BLOB_STATE_DIRTY);
810
811 *pages = NULL;
812 *page_count = 0;
813
814 /* A blob always has at least 1 page, even if it has no descriptors */
815 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
816 if (rc < 0) {
817 return rc;
818 }
819
820 buf = (uint8_t *)cur_page->descriptors;
821 remaining_sz = sizeof(cur_page->descriptors);
822
823 /* Serialize flags */
824 _spdk_blob_serialize_flags(blob, buf, &remaining_sz);
825 buf += sizeof(struct spdk_blob_md_descriptor_flags);
826
827 /* Serialize xattrs */
828 rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
829 pages, cur_page, page_count, &buf, &remaining_sz);
830 if (rc < 0) {
831 return rc;
832 }
833
834 /* Serialize internal xattrs */
835 rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
836 pages, cur_page, page_count, &buf, &remaining_sz);
837 if (rc < 0) {
838 return rc;
7c673cae
FG
839 }
840
841 /* Serialize extents */
11fdf7f2 842 last_cluster = 0;
7c673cae
FG
843 while (last_cluster < blob->active.num_clusters) {
844 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
845 buf, remaining_sz);
846
847 if (last_cluster == blob->active.num_clusters) {
848 break;
849 }
850
851 rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
852 &cur_page);
853 if (rc < 0) {
854 return rc;
855 }
856
857 buf = (uint8_t *)cur_page->descriptors;
858 remaining_sz = sizeof(cur_page->descriptors);
859 }
860
861 return 0;
862}
863
864struct spdk_blob_load_ctx {
11fdf7f2 865 struct spdk_blob *blob;
7c673cae
FG
866
867 struct spdk_blob_md_page *pages;
868 uint32_t num_pages;
11fdf7f2 869 spdk_bs_sequence_t *seq;
7c673cae
FG
870
871 spdk_bs_sequence_cpl cb_fn;
872 void *cb_arg;
873};
874
11fdf7f2
TL
875static uint32_t
876_spdk_blob_md_page_calc_crc(void *page)
7c673cae 877{
11fdf7f2 878 uint32_t crc;
7c673cae 879
11fdf7f2
TL
880 crc = BLOB_CRC32C_INITIAL;
881 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
882 crc ^= BLOB_CRC32C_INITIAL;
7c673cae 883
11fdf7f2 884 return crc;
7c673cae 885
11fdf7f2 886}
7c673cae 887
11fdf7f2
TL
888static void
889_spdk_blob_load_final(void *cb_arg, int bserrno)
890{
891 struct spdk_blob_load_ctx *ctx = cb_arg;
892 struct spdk_blob *blob = ctx->blob;
7c673cae 893
11fdf7f2
TL
894 _spdk_blob_mark_clean(blob);
895
896 ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);
897
898 /* Free the memory */
899 spdk_dma_free(ctx->pages);
900 free(ctx);
901}
902
903static void
904_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
905{
906 struct spdk_blob_load_ctx *ctx = cb_arg;
907 struct spdk_blob *blob = ctx->blob;
908
909 if (bserrno != 0) {
910 goto error;
911 }
912
913 blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
914
915 if (blob->back_bs_dev == NULL) {
916 bserrno = -ENOMEM;
917 goto error;
918 }
919
920 _spdk_blob_load_final(ctx, bserrno);
921 return;
922
923error:
924 SPDK_ERRLOG("Snapshot fail\n");
925 _spdk_blob_free(blob);
926 ctx->cb_fn(ctx->seq, NULL, bserrno);
927 spdk_dma_free(ctx->pages);
928 free(ctx);
929}
930
931static void
932_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
933{
934 struct spdk_blob_load_ctx *ctx = cb_arg;
935 struct spdk_blob *blob = ctx->blob;
936 struct spdk_blob_md_page *page;
937 const void *value;
938 size_t len;
939 int rc;
940 uint32_t crc;
941
942 page = &ctx->pages[ctx->num_pages - 1];
943 crc = _spdk_blob_md_page_calc_crc(page);
944 if (crc != page->crc) {
945 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
946 _spdk_blob_free(blob);
947 ctx->cb_fn(seq, NULL, -EINVAL);
948 spdk_dma_free(ctx->pages);
949 free(ctx);
950 return;
951 }
952
953 if (page->next != SPDK_INVALID_MD_PAGE) {
954 uint32_t next_page = page->next;
955 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
956
957
958 assert(next_lba < (blob->bs->md_start + blob->bs->md_len));
959
960 /* Read the next page */
961 ctx->num_pages++;
962 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
963 sizeof(*page), NULL);
964 if (ctx->pages == NULL) {
965 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
7c673cae
FG
966 free(ctx);
967 return;
968 }
969
11fdf7f2
TL
970 spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
971 next_lba,
972 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
973 _spdk_blob_load_cpl, ctx);
7c673cae
FG
974 return;
975 }
976
977 /* Parse the pages */
978 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
11fdf7f2
TL
979 if (rc) {
980 _spdk_blob_free(blob);
981 ctx->cb_fn(seq, NULL, rc);
982 spdk_dma_free(ctx->pages);
983 free(ctx);
984 return;
985 }
986 ctx->seq = seq;
987
988
989 if (spdk_blob_is_thin_provisioned(blob)) {
990 rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
991 if (rc == 0) {
992 if (len != sizeof(spdk_blob_id)) {
993 _spdk_blob_free(blob);
994 ctx->cb_fn(seq, NULL, -EINVAL);
995 spdk_dma_free(ctx->pages);
996 free(ctx);
997 return;
998 }
999 /* open snapshot blob and continue in the callback function */
1000 blob->parent_id = *(spdk_blob_id *)value;
1001 spdk_bs_open_blob(blob->bs, blob->parent_id,
1002 _spdk_blob_load_snapshot_cpl, ctx);
1003 return;
1004 } else {
1005 /* add zeroes_dev for thin provisioned blob */
1006 blob->back_bs_dev = spdk_bs_create_zeroes_dev();
1007 }
1008 } else {
1009 /* standard blob */
1010 blob->back_bs_dev = NULL;
1011 }
1012 _spdk_blob_load_final(ctx, bserrno);
7c673cae
FG
1013}
1014
1015/* Load a blob from disk given a blobid */
1016static void
1017_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1018 spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1019{
1020 struct spdk_blob_load_ctx *ctx;
1021 struct spdk_blob_store *bs;
1022 uint32_t page_num;
1023 uint64_t lba;
1024
11fdf7f2 1025 _spdk_blob_verify_md_op(blob);
7c673cae
FG
1026
1027 bs = blob->bs;
1028
1029 ctx = calloc(1, sizeof(*ctx));
1030 if (!ctx) {
1031 cb_fn(seq, cb_arg, -ENOMEM);
1032 return;
1033 }
1034
1035 ctx->blob = blob;
11fdf7f2
TL
1036 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
1037 SPDK_BS_PAGE_SIZE, NULL);
7c673cae
FG
1038 if (!ctx->pages) {
1039 free(ctx);
1040 cb_fn(seq, cb_arg, -ENOMEM);
1041 return;
1042 }
1043 ctx->num_pages = 1;
1044 ctx->cb_fn = cb_fn;
1045 ctx->cb_arg = cb_arg;
1046
1047 page_num = _spdk_bs_blobid_to_page(blob->id);
1048 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
1049
1050 blob->state = SPDK_BLOB_STATE_LOADING;
1051
11fdf7f2
TL
1052 spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
1053 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
1054 _spdk_blob_load_cpl, ctx);
7c673cae
FG
1055}
1056
1057struct spdk_blob_persist_ctx {
11fdf7f2
TL
1058 struct spdk_blob *blob;
1059
1060 struct spdk_bs_super_block *super;
7c673cae
FG
1061
1062 struct spdk_blob_md_page *pages;
1063
1064 uint64_t idx;
1065
11fdf7f2 1066 spdk_bs_sequence_t *seq;
7c673cae
FG
1067 spdk_bs_sequence_cpl cb_fn;
1068 void *cb_arg;
1069};
1070
1071static void
1072_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1073{
11fdf7f2
TL
1074 struct spdk_blob_persist_ctx *ctx = cb_arg;
1075 struct spdk_blob *blob = ctx->blob;
7c673cae
FG
1076
1077 if (bserrno == 0) {
1078 _spdk_blob_mark_clean(blob);
1079 }
1080
1081 /* Call user callback */
1082 ctx->cb_fn(seq, ctx->cb_arg, bserrno);
1083
1084 /* Free the memory */
11fdf7f2 1085 spdk_dma_free(ctx->pages);
7c673cae
FG
1086 free(ctx);
1087}
1088
1089static void
1090_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1091{
11fdf7f2
TL
1092 struct spdk_blob_persist_ctx *ctx = cb_arg;
1093 struct spdk_blob *blob = ctx->blob;
7c673cae
FG
1094 struct spdk_blob_store *bs = blob->bs;
1095 void *tmp;
1096 size_t i;
1097
1098 /* Release all clusters that were truncated */
1099 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
1100 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
1101
11fdf7f2
TL
1102 /* Nothing to release if it was not allocated */
1103 if (blob->active.clusters[i] != 0) {
1104 _spdk_bs_release_cluster(bs, cluster_num);
1105 }
7c673cae
FG
1106 }
1107
1108 if (blob->active.num_clusters == 0) {
1109 free(blob->active.clusters);
1110 blob->active.clusters = NULL;
1111 blob->active.cluster_array_size = 0;
1112 } else {
1113 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
1114 assert(tmp != NULL);
1115 blob->active.clusters = tmp;
1116 blob->active.cluster_array_size = blob->active.num_clusters;
1117 }
1118
1119 _spdk_blob_persist_complete(seq, ctx, bserrno);
1120}
1121
1122static void
1123_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1124{
11fdf7f2
TL
1125 struct spdk_blob_persist_ctx *ctx = cb_arg;
1126 struct spdk_blob *blob = ctx->blob;
7c673cae
FG
1127 struct spdk_blob_store *bs = blob->bs;
1128 spdk_bs_batch_t *batch;
1129 size_t i;
11fdf7f2
TL
1130 uint64_t lba;
1131 uint32_t lba_count;
7c673cae
FG
1132
1133 /* Clusters don't move around in blobs. The list shrinks or grows
1134 * at the end, but no changes ever occur in the middle of the list.
1135 */
1136
1137 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
1138
1139 /* Unmap all clusters that were truncated */
11fdf7f2
TL
1140 lba = 0;
1141 lba_count = 0;
7c673cae 1142 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
11fdf7f2
TL
1143 uint64_t next_lba = blob->active.clusters[i];
1144 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
1145
1146 if (next_lba > 0 && (lba + lba_count) == next_lba) {
1147 /* This cluster is contiguous with the previous one. */
1148 lba_count += next_lba_count;
1149 continue;
1150 }
1151
1152 /* This cluster is not contiguous with the previous one. */
7c673cae 1153
11fdf7f2
TL
1154 /* If a run of LBAs previously existing, send them
1155 * as an unmap.
1156 */
1157 if (lba_count > 0) {
1158 spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1159 }
1160
1161 /* Start building the next batch */
1162 lba = next_lba;
1163 if (next_lba > 0) {
1164 lba_count = next_lba_count;
1165 } else {
1166 lba_count = 0;
1167 }
1168 }
1169
1170 /* If we ended with a contiguous set of LBAs, send the unmap now */
1171 if (lba_count > 0) {
1172 spdk_bs_batch_unmap_dev(batch, lba, lba_count);
7c673cae
FG
1173 }
1174
1175 spdk_bs_batch_close(batch);
1176}
1177
1178static void
11fdf7f2 1179_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae
FG
1180{
1181 struct spdk_blob_persist_ctx *ctx = cb_arg;
11fdf7f2 1182 struct spdk_blob *blob = ctx->blob;
7c673cae
FG
1183 struct spdk_blob_store *bs = blob->bs;
1184 size_t i;
1185
1186 /* This loop starts at 1 because the first page is special and handled
1187 * below. The pages (except the first) are never written in place,
11fdf7f2 1188 * so any pages in the clean list must be zeroed.
7c673cae
FG
1189 */
1190 for (i = 1; i < blob->clean.num_pages; i++) {
1191 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
1192 }
1193
1194 if (blob->active.num_pages == 0) {
1195 uint32_t page_num;
1196
1197 page_num = _spdk_bs_blobid_to_page(blob->id);
1198 spdk_bit_array_clear(bs->used_md_pages, page_num);
1199 }
1200
1201 /* Move on to unmapping clusters */
1202 _spdk_blob_persist_unmap_clusters(seq, ctx, 0);
1203}
1204
1205static void
11fdf7f2 1206_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae 1207{
11fdf7f2
TL
1208 struct spdk_blob_persist_ctx *ctx = cb_arg;
1209 struct spdk_blob *blob = ctx->blob;
7c673cae
FG
1210 struct spdk_blob_store *bs = blob->bs;
1211 uint64_t lba;
1212 uint32_t lba_count;
1213 spdk_bs_batch_t *batch;
1214 size_t i;
1215
11fdf7f2 1216 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
7c673cae 1217
11fdf7f2 1218 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
7c673cae
FG
1219
1220 /* This loop starts at 1 because the first page is special and handled
1221 * below. The pages (except the first) are never written in place,
11fdf7f2 1222 * so any pages in the clean list must be zeroed.
7c673cae
FG
1223 */
1224 for (i = 1; i < blob->clean.num_pages; i++) {
1225 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
1226
11fdf7f2 1227 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
7c673cae
FG
1228 }
1229
11fdf7f2 1230 /* The first page will only be zeroed if this is a delete. */
7c673cae
FG
1231 if (blob->active.num_pages == 0) {
1232 uint32_t page_num;
1233
1234 /* The first page in the metadata goes where the blobid indicates */
1235 page_num = _spdk_bs_blobid_to_page(blob->id);
1236 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
1237
11fdf7f2 1238 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
7c673cae
FG
1239 }
1240
1241 spdk_bs_batch_close(batch);
1242}
1243
1244static void
1245_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1246{
1247 struct spdk_blob_persist_ctx *ctx = cb_arg;
1248 struct spdk_blob *blob = ctx->blob;
1249 struct spdk_blob_store *bs = blob->bs;
1250 uint64_t lba;
1251 uint32_t lba_count;
1252 struct spdk_blob_md_page *page;
1253
1254 if (blob->active.num_pages == 0) {
1255 /* Move on to the next step */
11fdf7f2 1256 _spdk_blob_persist_zero_pages(seq, ctx, 0);
7c673cae
FG
1257 return;
1258 }
1259
1260 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1261
1262 page = &ctx->pages[0];
1263 /* The first page in the metadata goes where the blobid indicates */
1264 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
1265
11fdf7f2
TL
1266 spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
1267 _spdk_blob_persist_zero_pages, ctx);
7c673cae
FG
1268}
1269
1270static void
1271_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1272{
11fdf7f2
TL
1273 struct spdk_blob_persist_ctx *ctx = cb_arg;
1274 struct spdk_blob *blob = ctx->blob;
7c673cae 1275 struct spdk_blob_store *bs = blob->bs;
11fdf7f2 1276 uint64_t lba;
7c673cae
FG
1277 uint32_t lba_count;
1278 struct spdk_blob_md_page *page;
1279 spdk_bs_batch_t *batch;
1280 size_t i;
1281
1282 /* Clusters don't move around in blobs. The list shrinks or grows
1283 * at the end, but no changes ever occur in the middle of the list.
1284 */
1285
1286 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1287
1288 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
1289
1290 /* This starts at 1. The root page is not written until
1291 * all of the others are finished
1292 */
1293 for (i = 1; i < blob->active.num_pages; i++) {
1294 page = &ctx->pages[i];
1295 assert(page->sequence_num == i);
1296
1297 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
1298
11fdf7f2 1299 spdk_bs_batch_write_dev(batch, page, lba, lba_count);
7c673cae
FG
1300 }
1301
1302 spdk_bs_batch_close(batch);
1303}
1304
1305static int
11fdf7f2 1306_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
7c673cae
FG
1307{
1308 uint64_t i;
1309 uint64_t *tmp;
1310 uint64_t lfc; /* lowest free cluster */
11fdf7f2 1311 uint64_t num_clusters;
7c673cae
FG
1312 struct spdk_blob_store *bs;
1313
1314 bs = blob->bs;
1315
11fdf7f2 1316 _spdk_blob_verify_md_op(blob);
7c673cae
FG
1317
1318 if (blob->active.num_clusters == sz) {
1319 return 0;
1320 }
1321
1322 if (blob->active.num_clusters < blob->active.cluster_array_size) {
1323 /* If this blob was resized to be larger, then smaller, then
1324 * larger without syncing, then the cluster array already
1325 * contains spare assigned clusters we can use.
1326 */
11fdf7f2
TL
1327 num_clusters = spdk_min(blob->active.cluster_array_size,
1328 sz);
1329 } else {
1330 num_clusters = blob->active.num_clusters;
7c673cae
FG
1331 }
1332
7c673cae
FG
1333 /* Do two passes - one to verify that we can obtain enough clusters
1334 * and another to actually claim them.
1335 */
1336
11fdf7f2
TL
1337 if (spdk_blob_is_thin_provisioned(blob) == false) {
1338 lfc = 0;
1339 for (i = num_clusters; i < sz; i++) {
1340 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1341 if (lfc == UINT32_MAX) {
1342 /* No more free clusters. Cannot satisfy the request */
1343 return -ENOSPC;
1344 }
1345 lfc++;
7c673cae 1346 }
7c673cae
FG
1347 }
1348
11fdf7f2 1349 if (sz > num_clusters) {
7c673cae
FG
1350 /* Expand the cluster array if necessary.
1351 * We only shrink the array when persisting.
1352 */
1353 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
1354 if (sz > 0 && tmp == NULL) {
11fdf7f2 1355 return -ENOMEM;
7c673cae 1356 }
11fdf7f2
TL
1357 memset(tmp + blob->active.cluster_array_size, 0,
1358 sizeof(uint64_t) * (sz - blob->active.cluster_array_size));
7c673cae
FG
1359 blob->active.clusters = tmp;
1360 blob->active.cluster_array_size = sz;
1361 }
1362
11fdf7f2
TL
1363 blob->state = SPDK_BLOB_STATE_DIRTY;
1364
1365 if (spdk_blob_is_thin_provisioned(blob) == false) {
1366 lfc = 0;
1367 for (i = num_clusters; i < sz; i++) {
1368 _spdk_bs_allocate_cluster(blob, i, &lfc, true);
1369 lfc++;
1370 }
7c673cae
FG
1371 }
1372
1373 blob->active.num_clusters = sz;
1374
1375 return 0;
1376}
1377
7c673cae 1378static void
11fdf7f2 1379_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
7c673cae 1380{
11fdf7f2
TL
1381 spdk_bs_sequence_t *seq = ctx->seq;
1382 struct spdk_blob *blob = ctx->blob;
1383 struct spdk_blob_store *bs = blob->bs;
7c673cae
FG
1384 uint64_t i;
1385 uint32_t page_num;
11fdf7f2
TL
1386 void *tmp;
1387 int rc;
7c673cae
FG
1388
1389 if (blob->active.num_pages == 0) {
1390 /* This is the signal that the blob should be deleted.
1391 * Immediately jump to the clean up routine. */
1392 assert(blob->clean.num_pages > 0);
1393 ctx->idx = blob->clean.num_pages - 1;
11fdf7f2
TL
1394 blob->state = SPDK_BLOB_STATE_CLEAN;
1395 _spdk_blob_persist_zero_pages(seq, ctx, 0);
7c673cae
FG
1396 return;
1397
1398 }
1399
1400 /* Generate the new metadata */
1401 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1402 if (rc < 0) {
11fdf7f2 1403 _spdk_blob_persist_complete(seq, ctx, rc);
7c673cae
FG
1404 return;
1405 }
1406
1407 assert(blob->active.num_pages >= 1);
1408
1409 /* Resize the cache of page indices */
11fdf7f2
TL
1410 tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
1411 if (!tmp) {
1412 _spdk_blob_persist_complete(seq, ctx, -ENOMEM);
7c673cae
FG
1413 return;
1414 }
11fdf7f2 1415 blob->active.pages = tmp;
7c673cae
FG
1416
1417 /* Assign this metadata to pages. This requires two passes -
1418 * one to verify that there are enough pages and a second
1419 * to actually claim them. */
1420 page_num = 0;
1421 /* Note that this loop starts at one. The first page location is fixed by the blobid. */
1422 for (i = 1; i < blob->active.num_pages; i++) {
1423 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
11fdf7f2
TL
1424 if (page_num == UINT32_MAX) {
1425 _spdk_blob_persist_complete(seq, ctx, -ENOMEM);
7c673cae
FG
1426 return;
1427 }
1428 page_num++;
1429 }
1430
1431 page_num = 0;
1432 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1433 for (i = 1; i < blob->active.num_pages; i++) {
1434 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1435 ctx->pages[i - 1].next = page_num;
11fdf7f2
TL
1436 /* Now that previous metadata page is complete, calculate the crc for it. */
1437 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
7c673cae
FG
1438 blob->active.pages[i] = page_num;
1439 spdk_bit_array_set(bs->used_md_pages, page_num);
11fdf7f2 1440 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
7c673cae
FG
1441 page_num++;
1442 }
11fdf7f2 1443 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
7c673cae
FG
1444 /* Start writing the metadata from last page to first */
1445 ctx->idx = blob->active.num_pages - 1;
11fdf7f2 1446 blob->state = SPDK_BLOB_STATE_CLEAN;
7c673cae
FG
1447 _spdk_blob_persist_write_page_chain(seq, ctx, 0);
1448}
1449
1450static void
11fdf7f2 1451_spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae 1452{
11fdf7f2 1453 struct spdk_blob_persist_ctx *ctx = cb_arg;
7c673cae 1454
11fdf7f2 1455 ctx->blob->bs->clean = 0;
7c673cae 1456
11fdf7f2
TL
1457 spdk_dma_free(ctx->super);
1458
1459 _spdk_blob_persist_start(ctx);
1460}
1461
1462static void
1463_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
1464 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);
1465
1466
1467static void
1468_spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1469{
1470 struct spdk_blob_persist_ctx *ctx = cb_arg;
1471
1472 ctx->super->clean = 0;
1473 if (ctx->super->size == 0) {
1474 ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
7c673cae
FG
1475 }
1476
11fdf7f2
TL
1477 _spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
1478}
7c673cae 1479
11fdf7f2
TL
1480
1481/* Write a blob to disk */
1482static void
1483_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1484 spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1485{
1486 struct spdk_blob_persist_ctx *ctx;
1487
1488 _spdk_blob_verify_md_op(blob);
1489
1490 if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1491 cb_fn(seq, cb_arg, 0);
7c673cae
FG
1492 return;
1493 }
1494
11fdf7f2
TL
1495 ctx = calloc(1, sizeof(*ctx));
1496 if (!ctx) {
1497 cb_fn(seq, cb_arg, -ENOMEM);
1498 return;
1499 }
1500 ctx->blob = blob;
1501 ctx->seq = seq;
1502 ctx->cb_fn = cb_fn;
1503 ctx->cb_arg = cb_arg;
7c673cae 1504
11fdf7f2
TL
1505 if (blob->bs->clean) {
1506 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1507 if (!ctx->super) {
1508 cb_fn(seq, cb_arg, -ENOMEM);
1509 free(ctx);
1510 return;
7c673cae
FG
1511 }
1512
11fdf7f2
TL
1513 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
1514 _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
1515 _spdk_blob_persist_dirty, ctx);
1516 } else {
1517 _spdk_blob_persist_start(ctx);
7c673cae 1518 }
7c673cae
FG
1519}
1520
11fdf7f2 1521struct spdk_blob_copy_cluster_ctx {
7c673cae 1522 struct spdk_blob *blob;
11fdf7f2
TL
1523 uint8_t *buf;
1524 uint64_t page;
1525 uint64_t new_cluster;
1526 spdk_bs_sequence_t *seq;
1527};
7c673cae 1528
11fdf7f2
TL
1529static void
1530_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
1531{
1532 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1533 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
1534 TAILQ_HEAD(, spdk_bs_request_set) requests;
1535 spdk_bs_user_op_t *op;
1536
1537 TAILQ_INIT(&requests);
1538 TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);
1539
1540 while (!TAILQ_EMPTY(&requests)) {
1541 op = TAILQ_FIRST(&requests);
1542 TAILQ_REMOVE(&requests, op, link);
1543 if (bserrno == 0) {
1544 spdk_bs_user_op_execute(op);
1545 } else {
1546 spdk_bs_user_op_abort(op);
7c673cae
FG
1547 }
1548 }
1549
11fdf7f2
TL
1550 spdk_dma_free(ctx->buf);
1551 free(ctx);
7c673cae
FG
1552}
1553
11fdf7f2
TL
1554static void
1555_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
7c673cae 1556{
11fdf7f2 1557 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
7c673cae 1558
11fdf7f2
TL
1559 if (bserrno) {
1560 uint32_t cluster_number;
7c673cae 1561
11fdf7f2
TL
1562 if (bserrno == -EEXIST) {
1563 /* The metadata insert failed because another thread
1564 * allocated the cluster first. Free our cluster
1565 * but continue without error. */
1566 bserrno = 0;
1567 }
7c673cae 1568
11fdf7f2
TL
1569 cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
1570 _spdk_bs_release_cluster(ctx->blob->bs, cluster_number);
7c673cae
FG
1571 }
1572
11fdf7f2 1573 spdk_bs_sequence_finish(ctx->seq, bserrno);
7c673cae
FG
1574}
1575
1576static void
11fdf7f2 1577_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae 1578{
11fdf7f2
TL
1579 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1580 uint32_t cluster_number;
7c673cae 1581
11fdf7f2
TL
1582 if (bserrno) {
1583 /* The write failed, so jump to the final completion handler */
1584 spdk_bs_sequence_finish(seq, bserrno);
1585 return;
1586 }
1587
1588 cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
1589
1590 _spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
1591 _spdk_blob_insert_cluster_cpl, ctx);
7c673cae
FG
1592}
1593
1594static void
11fdf7f2 1595_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae 1596{
11fdf7f2 1597 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
7c673cae 1598
11fdf7f2
TL
1599 if (bserrno != 0) {
1600 /* The read failed, so jump to the final completion handler */
1601 spdk_bs_sequence_finish(seq, bserrno);
1602 return;
7c673cae
FG
1603 }
1604
11fdf7f2
TL
1605 /* Write whole cluster */
1606 spdk_bs_sequence_write_dev(seq, ctx->buf,
1607 _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
1608 _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
1609 _spdk_blob_write_copy_cpl, ctx);
7c673cae
FG
1610}
1611
11fdf7f2
TL
1612static void
1613_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
1614 struct spdk_io_channel *_ch,
1615 uint64_t io_unit, spdk_bs_user_op_t *op)
7c673cae 1616{
11fdf7f2
TL
1617 struct spdk_bs_cpl cpl;
1618 struct spdk_bs_channel *ch;
1619 struct spdk_blob_copy_cluster_ctx *ctx;
1620 uint32_t cluster_start_page;
1621 uint32_t cluster_number;
1622 int rc;
7c673cae 1623
11fdf7f2 1624 ch = spdk_io_channel_get_ctx(_ch);
7c673cae 1625
11fdf7f2
TL
1626 if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
1627 /* There are already operations pending. Queue this user op
1628 * and return because it will be re-executed when the outstanding
1629 * cluster allocation completes. */
1630 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
1631 return;
7c673cae
FG
1632 }
1633
11fdf7f2
TL
1634 /* Round the io_unit offset down to the first page in the cluster */
1635 cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);
7c673cae 1636
11fdf7f2
TL
1637 /* Calculate which index in the metadata cluster array the corresponding
1638 * cluster is supposed to be at. */
1639 cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);
1640
1641 ctx = calloc(1, sizeof(*ctx));
1642 if (!ctx) {
1643 spdk_bs_user_op_abort(op);
1644 return;
7c673cae
FG
1645 }
1646
11fdf7f2 1647 assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);
7c673cae 1648
11fdf7f2
TL
1649 ctx->blob = blob;
1650 ctx->page = cluster_start_page;
7c673cae 1651
11fdf7f2
TL
1652 if (blob->parent_id != SPDK_BLOBID_INVALID) {
1653 ctx->buf = spdk_dma_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, NULL);
1654 if (!ctx->buf) {
1655 SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
1656 blob->bs->cluster_sz);
1657 free(ctx);
1658 spdk_bs_user_op_abort(op);
1659 return;
1660 }
1661 }
1662
1663 rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
1664 if (rc != 0) {
1665 spdk_dma_free(ctx->buf);
1666 free(ctx);
1667 spdk_bs_user_op_abort(op);
1668 return;
1669 }
1670
1671 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1672 cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
1673 cpl.u.blob_basic.cb_arg = ctx;
1674
1675 ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
1676 if (!ctx->seq) {
1677 _spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
1678 spdk_dma_free(ctx->buf);
1679 free(ctx);
1680 spdk_bs_user_op_abort(op);
1681 return;
1682 }
7c673cae 1683
11fdf7f2
TL
1684 /* Queue the user op to block other incoming operations */
1685 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
1686
1687 if (blob->parent_id != SPDK_BLOBID_INVALID) {
1688 /* Read cluster from backing device */
1689 spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
1690 _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
1691 _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
1692 _spdk_blob_write_copy, ctx);
1693 } else {
1694 _spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
1695 _spdk_blob_insert_cluster_cpl, ctx);
1696 }
7c673cae
FG
1697}
1698
11fdf7f2
TL
1699static void
1700_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
1701 uint64_t *lba, uint32_t *lba_count)
1702{
1703 *lba_count = length;
7c673cae 1704
11fdf7f2
TL
1705 if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
1706 assert(blob->back_bs_dev != NULL);
1707 *lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
1708 *lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
1709 } else {
1710 *lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
1711 }
1712}
7c673cae 1713
11fdf7f2
TL
1714struct op_split_ctx {
1715 struct spdk_blob *blob;
1716 struct spdk_io_channel *channel;
1717 uint64_t io_unit_offset;
1718 uint64_t io_units_remaining;
1719 void *curr_payload;
1720 enum spdk_blob_op_type op_type;
1721 spdk_bs_sequence_t *seq;
7c673cae
FG
1722};
1723
1724static void
11fdf7f2 1725_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
7c673cae 1726{
11fdf7f2
TL
1727 struct op_split_ctx *ctx = cb_arg;
1728 struct spdk_blob *blob = ctx->blob;
1729 struct spdk_io_channel *ch = ctx->channel;
1730 enum spdk_blob_op_type op_type = ctx->op_type;
1731 uint8_t *buf = ctx->curr_payload;
1732 uint64_t offset = ctx->io_unit_offset;
1733 uint64_t length = ctx->io_units_remaining;
1734 uint64_t op_length;
1735
1736 if (bserrno != 0 || ctx->io_units_remaining == 0) {
1737 spdk_bs_sequence_finish(ctx->seq, bserrno);
1738 free(ctx);
1739 return;
1740 }
7c673cae 1741
11fdf7f2
TL
1742 op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
1743 offset));
7c673cae 1744
11fdf7f2
TL
1745 /* Update length and payload for next operation */
1746 ctx->io_units_remaining -= op_length;
1747 ctx->io_unit_offset += op_length;
1748 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
1749 ctx->curr_payload += op_length * blob->bs->io_unit_size;
1750 }
1751
1752 switch (op_type) {
1753 case SPDK_BLOB_READ:
1754 spdk_blob_io_read(blob, ch, buf, offset, op_length,
1755 _spdk_blob_request_submit_op_split_next, ctx);
1756 break;
1757 case SPDK_BLOB_WRITE:
1758 spdk_blob_io_write(blob, ch, buf, offset, op_length,
1759 _spdk_blob_request_submit_op_split_next, ctx);
1760 break;
1761 case SPDK_BLOB_UNMAP:
1762 spdk_blob_io_unmap(blob, ch, offset, op_length,
1763 _spdk_blob_request_submit_op_split_next, ctx);
1764 break;
1765 case SPDK_BLOB_WRITE_ZEROES:
1766 spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
1767 _spdk_blob_request_submit_op_split_next, ctx);
1768 break;
1769 case SPDK_BLOB_READV:
1770 case SPDK_BLOB_WRITEV:
1771 SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
1772 spdk_bs_sequence_finish(ctx->seq, -EINVAL);
1773 free(ctx);
1774 break;
1775 }
1776}
1777
1778static void
1779_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
1780 void *payload, uint64_t offset, uint64_t length,
1781 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1782{
1783 struct op_split_ctx *ctx;
1784 spdk_bs_sequence_t *seq;
1785 struct spdk_bs_cpl cpl;
1786
1787 assert(blob != NULL);
1788
1789 ctx = calloc(1, sizeof(struct op_split_ctx));
1790 if (ctx == NULL) {
1791 cb_fn(cb_arg, -ENOMEM);
1792 return;
1793 }
1794
1795 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1796 cpl.u.blob_basic.cb_fn = cb_fn;
1797 cpl.u.blob_basic.cb_arg = cb_arg;
1798
1799 seq = spdk_bs_sequence_start(ch, &cpl);
1800 if (!seq) {
7c673cae 1801 free(ctx);
11fdf7f2 1802 cb_fn(cb_arg, -ENOMEM);
7c673cae
FG
1803 return;
1804 }
1805
11fdf7f2
TL
1806 ctx->blob = blob;
1807 ctx->channel = ch;
1808 ctx->curr_payload = payload;
1809 ctx->io_unit_offset = offset;
1810 ctx->io_units_remaining = length;
1811 ctx->op_type = op_type;
1812 ctx->seq = seq;
1813
1814 _spdk_blob_request_submit_op_split_next(ctx, 0);
1815}
1816
1817static void
1818_spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
1819 void *payload, uint64_t offset, uint64_t length,
1820 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1821{
1822 struct spdk_bs_cpl cpl;
1823 uint64_t lba;
1824 uint32_t lba_count;
1825
1826 assert(blob != NULL);
1827
1828 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1829 cpl.u.blob_basic.cb_fn = cb_fn;
1830 cpl.u.blob_basic.cb_arg = cb_arg;
1831
1832 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
1833
1834 if (blob->frozen_refcnt) {
1835 /* This blob I/O is frozen */
1836 spdk_bs_user_op_t *op;
1837 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);
1838
1839 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
1840 if (!op) {
1841 cb_fn(cb_arg, -ENOMEM);
1842 return;
1843 }
1844
1845 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
1846
1847 return;
1848 }
1849
1850 switch (op_type) {
1851 case SPDK_BLOB_READ: {
1852 spdk_bs_batch_t *batch;
1853
1854 batch = spdk_bs_batch_open(_ch, &cpl);
1855 if (!batch) {
1856 cb_fn(cb_arg, -ENOMEM);
1857 return;
1858 }
1859
1860 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
1861 /* Read from the blob */
1862 spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
1863 } else {
1864 /* Read from the backing block device */
1865 spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
1866 }
1867
1868 spdk_bs_batch_close(batch);
1869 break;
1870 }
1871 case SPDK_BLOB_WRITE:
1872 case SPDK_BLOB_WRITE_ZEROES: {
1873 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
1874 /* Write to the blob */
1875 spdk_bs_batch_t *batch;
1876
1877 if (lba_count == 0) {
1878 cb_fn(cb_arg, 0);
1879 return;
7c673cae 1880 }
11fdf7f2
TL
1881
1882 batch = spdk_bs_batch_open(_ch, &cpl);
1883 if (!batch) {
1884 cb_fn(cb_arg, -ENOMEM);
1885 return;
1886 }
1887
1888 if (op_type == SPDK_BLOB_WRITE) {
1889 spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
1890 } else {
1891 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1892 }
1893
1894 spdk_bs_batch_close(batch);
1895 } else {
1896 /* Queue this operation and allocate the cluster */
1897 spdk_bs_user_op_t *op;
1898
1899 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
1900 if (!op) {
1901 cb_fn(cb_arg, -ENOMEM);
1902 return;
1903 }
1904
1905 _spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
7c673cae 1906 }
11fdf7f2 1907 break;
7c673cae 1908 }
11fdf7f2
TL
1909 case SPDK_BLOB_UNMAP: {
1910 spdk_bs_batch_t *batch;
7c673cae 1911
11fdf7f2
TL
1912 batch = spdk_bs_batch_open(_ch, &cpl);
1913 if (!batch) {
1914 cb_fn(cb_arg, -ENOMEM);
1915 return;
1916 }
7c673cae 1917
11fdf7f2
TL
1918 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
1919 spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1920 }
1921
1922 spdk_bs_batch_close(batch);
1923 break;
1924 }
1925 case SPDK_BLOB_READV:
1926 case SPDK_BLOB_WRITEV:
1927 SPDK_ERRLOG("readv/write not valid\n");
1928 cb_fn(cb_arg, -EINVAL);
1929 break;
1930 }
7c673cae
FG
1931}
1932
1933static void
11fdf7f2
TL
1934_spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
1935 void *payload, uint64_t offset, uint64_t length,
1936 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
7c673cae 1937{
11fdf7f2 1938 assert(blob != NULL);
7c673cae 1939
11fdf7f2
TL
1940 if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1941 cb_fn(cb_arg, -EPERM);
1942 return;
1943 }
7c673cae 1944
11fdf7f2
TL
1945 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
1946 cb_fn(cb_arg, -EINVAL);
1947 return;
1948 }
1949 if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) {
1950 _spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
1951 cb_fn, cb_arg, op_type);
1952 } else {
1953 _spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
1954 cb_fn, cb_arg, op_type);
1955 }
1956}
1957
1958struct rw_iov_ctx {
1959 struct spdk_blob *blob;
1960 struct spdk_io_channel *channel;
1961 spdk_blob_op_complete cb_fn;
1962 void *cb_arg;
1963 bool read;
1964 int iovcnt;
1965 struct iovec *orig_iov;
1966 uint64_t io_unit_offset;
1967 uint64_t io_units_remaining;
1968 uint64_t io_units_done;
1969 struct iovec iov[0];
1970};
1971
1972static void
1973_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1974{
1975 assert(cb_arg == NULL);
1976 spdk_bs_sequence_finish(seq, bserrno);
1977}
1978
1979static void
1980_spdk_rw_iov_split_next(void *cb_arg, int bserrno)
1981{
1982 struct rw_iov_ctx *ctx = cb_arg;
1983 struct spdk_blob *blob = ctx->blob;
1984 struct iovec *iov, *orig_iov;
1985 int iovcnt;
1986 size_t orig_iovoff;
1987 uint64_t io_units_count, io_units_to_boundary, io_unit_offset;
1988 uint64_t byte_count;
1989
1990 if (bserrno != 0 || ctx->io_units_remaining == 0) {
1991 ctx->cb_fn(ctx->cb_arg, bserrno);
7c673cae 1992 free(ctx);
7c673cae
FG
1993 return;
1994 }
11fdf7f2
TL
1995
1996 io_unit_offset = ctx->io_unit_offset;
1997 io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset);
1998 io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary);
1999 /*
2000 * Get index and offset into the original iov array for our current position in the I/O sequence.
2001 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
2002 * point to the current position in the I/O sequence.
2003 */
2004 byte_count = ctx->io_units_done * blob->bs->io_unit_size;
2005 orig_iov = &ctx->orig_iov[0];
2006 orig_iovoff = 0;
2007 while (byte_count > 0) {
2008 if (byte_count >= orig_iov->iov_len) {
2009 byte_count -= orig_iov->iov_len;
2010 orig_iov++;
2011 } else {
2012 orig_iovoff = byte_count;
2013 byte_count = 0;
2014 }
2015 }
2016
2017 /*
2018 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
2019 * bytes of this next I/O remain to be accounted for in the new iov array.
2020 */
2021 byte_count = io_units_count * blob->bs->io_unit_size;
2022 iov = &ctx->iov[0];
2023 iovcnt = 0;
2024 while (byte_count > 0) {
2025 assert(iovcnt < ctx->iovcnt);
2026 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
2027 iov->iov_base = orig_iov->iov_base + orig_iovoff;
2028 byte_count -= iov->iov_len;
2029 orig_iovoff = 0;
2030 orig_iov++;
2031 iov++;
2032 iovcnt++;
2033 }
2034
2035 ctx->io_unit_offset += io_units_count;
2036 ctx->io_units_remaining -= io_units_count;
2037 ctx->io_units_done += io_units_count;
2038 iov = &ctx->iov[0];
2039
2040 if (ctx->read) {
2041 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
2042 io_units_count, _spdk_rw_iov_split_next, ctx);
2043 } else {
2044 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
2045 io_units_count, _spdk_rw_iov_split_next, ctx);
2046 }
2047}
2048
2049static void
2050_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
2051 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2052 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
2053{
2054 struct spdk_bs_cpl cpl;
2055
2056 assert(blob != NULL);
2057
2058 if (!read && blob->data_ro) {
2059 cb_fn(cb_arg, -EPERM);
2060 return;
2061 }
2062
2063 if (length == 0) {
2064 cb_fn(cb_arg, 0);
2065 return;
2066 }
2067
2068 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
2069 cb_fn(cb_arg, -EINVAL);
2070 return;
2071 }
2072
2073 /*
2074 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
2075 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
2076 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster
2077 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
2078 * to allocate a separate iov array and split the I/O such that none of the resulting
2079 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel)
2080 * but since this case happens very infrequently, any performance impact will be negligible.
2081 *
2082 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
2083 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
2084 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
2085 * when the batch was completed, to allow for freeing the memory for the iov arrays.
2086 */
2087 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) {
2088 uint32_t lba_count;
2089 uint64_t lba;
2090
2091 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
2092
2093 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2094 cpl.u.blob_basic.cb_fn = cb_fn;
2095 cpl.u.blob_basic.cb_arg = cb_arg;
2096 if (blob->frozen_refcnt) {
2097 /* This blob I/O is frozen */
2098 spdk_bs_user_op_t *op;
2099 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel);
2100
2101 op = spdk_bs_user_op_alloc(_channel, &cpl, read, blob, iov, iovcnt, offset, length);
2102 if (!op) {
2103 cb_fn(cb_arg, -ENOMEM);
2104 return;
2105 }
2106
2107 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
2108
2109 return;
2110 }
2111
2112 if (read) {
2113 spdk_bs_sequence_t *seq;
2114
2115 seq = spdk_bs_sequence_start(_channel, &cpl);
2116 if (!seq) {
2117 cb_fn(cb_arg, -ENOMEM);
2118 return;
2119 }
2120
2121 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2122 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
2123 } else {
2124 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
2125 _spdk_rw_iov_done, NULL);
2126 }
2127 } else {
2128 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2129 spdk_bs_sequence_t *seq;
2130
2131 seq = spdk_bs_sequence_start(_channel, &cpl);
2132 if (!seq) {
2133 cb_fn(cb_arg, -ENOMEM);
2134 return;
2135 }
2136
2137 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
2138 } else {
2139 /* Queue this operation and allocate the cluster */
2140 spdk_bs_user_op_t *op;
2141
2142 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset,
2143 length);
2144 if (!op) {
2145 cb_fn(cb_arg, -ENOMEM);
2146 return;
2147 }
2148
2149 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
2150 }
2151 }
2152 } else {
2153 struct rw_iov_ctx *ctx;
2154
2155 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
2156 if (ctx == NULL) {
2157 cb_fn(cb_arg, -ENOMEM);
2158 return;
2159 }
2160
2161 ctx->blob = blob;
2162 ctx->channel = _channel;
2163 ctx->cb_fn = cb_fn;
2164 ctx->cb_arg = cb_arg;
2165 ctx->read = read;
2166 ctx->orig_iov = iov;
2167 ctx->iovcnt = iovcnt;
2168 ctx->io_unit_offset = offset;
2169 ctx->io_units_remaining = length;
2170 ctx->io_units_done = 0;
2171
2172 _spdk_rw_iov_split_next(ctx, 0);
2173 }
2174}
2175
2176static struct spdk_blob *
2177_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
2178{
2179 struct spdk_blob *blob;
2180
2181 TAILQ_FOREACH(blob, &bs->blobs, link) {
2182 if (blob->id == blobid) {
2183 return blob;
2184 }
2185 }
2186
2187 return NULL;
2188}
2189
2190static int
2191_spdk_bs_channel_create(void *io_device, void *ctx_buf)
2192{
2193 struct spdk_blob_store *bs = io_device;
2194 struct spdk_bs_channel *channel = ctx_buf;
2195 struct spdk_bs_dev *dev;
2196 uint32_t max_ops = bs->max_channel_ops;
2197 uint32_t i;
2198
2199 dev = bs->dev;
2200
2201 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
2202 if (!channel->req_mem) {
2203 return -1;
2204 }
2205
2206 TAILQ_INIT(&channel->reqs);
2207
2208 for (i = 0; i < max_ops; i++) {
2209 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
2210 }
2211
2212 channel->bs = bs;
2213 channel->dev = dev;
2214 channel->dev_channel = dev->create_channel(dev);
2215
2216 if (!channel->dev_channel) {
2217 SPDK_ERRLOG("Failed to create device channel.\n");
2218 free(channel->req_mem);
2219 return -1;
2220 }
2221
2222 TAILQ_INIT(&channel->need_cluster_alloc);
2223 TAILQ_INIT(&channel->queued_io);
2224
2225 return 0;
2226}
2227
2228static void
2229_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
2230{
2231 struct spdk_bs_channel *channel = ctx_buf;
2232 spdk_bs_user_op_t *op;
2233
2234 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
2235 op = TAILQ_FIRST(&channel->need_cluster_alloc);
2236 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
2237 spdk_bs_user_op_abort(op);
2238 }
2239
2240 while (!TAILQ_EMPTY(&channel->queued_io)) {
2241 op = TAILQ_FIRST(&channel->queued_io);
2242 TAILQ_REMOVE(&channel->queued_io, op, link);
2243 spdk_bs_user_op_abort(op);
2244 }
2245
2246 free(channel->req_mem);
2247 channel->dev->destroy_channel(channel->dev, channel->dev_channel);
2248}
2249
2250static void
2251_spdk_bs_dev_destroy(void *io_device)
2252{
2253 struct spdk_blob_store *bs = io_device;
2254 struct spdk_blob *blob, *blob_tmp;
2255
2256 bs->dev->destroy(bs->dev);
2257
2258 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2259 TAILQ_REMOVE(&bs->blobs, blob, link);
2260 _spdk_blob_free(blob);
2261 }
2262
2263 pthread_mutex_destroy(&bs->used_clusters_mutex);
2264
2265 spdk_bit_array_free(&bs->used_blobids);
2266 spdk_bit_array_free(&bs->used_md_pages);
2267 spdk_bit_array_free(&bs->used_clusters);
2268 /*
2269 * If this function is called for any reason except a successful unload,
2270 * the unload_cpl type will be NONE and this will be a nop.
2271 */
2272 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2273
2274 free(bs);
2275}
2276
2277static int
2278_spdk_bs_blob_list_add(struct spdk_blob *blob)
2279{
2280 spdk_blob_id snapshot_id;
2281 struct spdk_blob_list *snapshot_entry = NULL;
2282 struct spdk_blob_list *clone_entry = NULL;
2283
2284 assert(blob != NULL);
2285
2286 snapshot_id = blob->parent_id;
2287 if (snapshot_id == SPDK_BLOBID_INVALID) {
2288 return 0;
2289 }
2290
2291 TAILQ_FOREACH(snapshot_entry, &blob->bs->snapshots, link) {
2292 if (snapshot_entry->id == snapshot_id) {
2293 break;
2294 }
2295 }
2296
2297 if (snapshot_entry == NULL) {
2298 /* Snapshot not found */
2299 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list));
2300 if (snapshot_entry == NULL) {
2301 return -ENOMEM;
2302 }
2303 snapshot_entry->id = snapshot_id;
2304 TAILQ_INIT(&snapshot_entry->clones);
2305 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link);
2306 } else {
2307 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
2308 if (clone_entry->id == blob->id) {
2309 break;
2310 }
2311 }
2312 }
2313
2314 if (clone_entry == NULL) {
2315 /* Clone not found */
2316 clone_entry = calloc(1, sizeof(struct spdk_blob_list));
2317 if (clone_entry == NULL) {
2318 return -ENOMEM;
2319 }
2320 clone_entry->id = blob->id;
2321 TAILQ_INIT(&clone_entry->clones);
2322 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link);
2323 snapshot_entry->clone_count++;
2324 }
2325
2326 return 0;
2327}
2328
2329static int
2330_spdk_bs_blob_list_remove(struct spdk_blob *blob)
2331{
2332 struct spdk_blob_list *snapshot_entry = NULL;
2333 struct spdk_blob_list *clone_entry = NULL;
2334 spdk_blob_id snapshot_id;
2335
2336 assert(blob != NULL);
2337
2338 snapshot_id = blob->parent_id;
2339 if (snapshot_id == SPDK_BLOBID_INVALID) {
2340 return 0;
2341 }
2342
2343 TAILQ_FOREACH(snapshot_entry, &blob->bs->snapshots, link) {
2344 if (snapshot_entry->id == snapshot_id) {
2345 break;
2346 }
2347 }
2348
2349 assert(snapshot_entry != NULL);
2350
2351 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
2352 if (clone_entry->id == blob->id) {
2353 break;
2354 }
2355 }
2356
2357 assert(clone_entry != NULL);
2358
2359 blob->parent_id = SPDK_BLOBID_INVALID;
2360 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2361 free(clone_entry);
2362
2363 snapshot_entry->clone_count--;
2364 if (snapshot_entry->clone_count == 0) {
2365 /* Snapshot have no more clones */
2366 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link);
2367 free(snapshot_entry);
2368 }
2369
2370 return 0;
2371}
2372
2373static int
2374_spdk_bs_blob_list_free(struct spdk_blob_store *bs)
2375{
2376 struct spdk_blob_list *snapshot_entry;
2377 struct spdk_blob_list *snapshot_entry_tmp;
2378 struct spdk_blob_list *clone_entry;
2379 struct spdk_blob_list *clone_entry_tmp;
2380
2381 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) {
2382 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) {
2383 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2384 free(clone_entry);
2385 }
2386 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link);
2387 free(snapshot_entry);
2388 }
2389
2390 return 0;
2391}
2392
2393static void
2394_spdk_bs_free(struct spdk_blob_store *bs)
2395{
2396 _spdk_bs_blob_list_free(bs);
2397
2398 spdk_bs_unregister_md_thread(bs);
2399 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
2400}
2401
2402void
2403spdk_bs_opts_init(struct spdk_bs_opts *opts)
2404{
2405 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2406 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2407 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2408 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2409 memset(&opts->bstype, 0, sizeof(opts->bstype));
2410 opts->iter_cb_fn = NULL;
2411 opts->iter_cb_arg = NULL;
2412}
2413
2414static int
2415_spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2416{
2417 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2418 opts->max_channel_ops == 0) {
2419 SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2420 return -1;
2421 }
2422
2423 return 0;
2424}
2425
2426static int
2427_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs)
2428{
2429 struct spdk_blob_store *bs;
2430 uint64_t dev_size;
2431 int rc;
2432
2433 dev_size = dev->blocklen * dev->blockcnt;
2434 if (dev_size < opts->cluster_sz) {
2435 /* Device size cannot be smaller than cluster size of blobstore */
2436 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
2437 dev_size, opts->cluster_sz);
2438 return -ENOSPC;
2439 }
2440 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
2441 /* Cluster size cannot be smaller than page size */
2442 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
2443 opts->cluster_sz, SPDK_BS_PAGE_SIZE);
2444 return -EINVAL;
2445 }
2446 bs = calloc(1, sizeof(struct spdk_blob_store));
2447 if (!bs) {
2448 return -ENOMEM;
2449 }
2450
2451 TAILQ_INIT(&bs->blobs);
2452 TAILQ_INIT(&bs->snapshots);
2453 bs->dev = dev;
2454 bs->md_thread = spdk_get_thread();
2455 assert(bs->md_thread != NULL);
2456
2457 /*
2458 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
2459 * even multiple of the cluster size.
2460 */
2461 bs->cluster_sz = opts->cluster_sz;
2462 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
2463 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2464 bs->num_free_clusters = bs->total_clusters;
2465 bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
2466 bs->io_unit_size = dev->blocklen;
2467 if (bs->used_clusters == NULL) {
2468 free(bs);
2469 return -ENOMEM;
2470 }
2471
2472 bs->max_channel_ops = opts->max_channel_ops;
2473 bs->super_blob = SPDK_BLOBID_INVALID;
2474 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
2475
2476 /* The metadata is assumed to be at least 1 page */
2477 bs->used_md_pages = spdk_bit_array_create(1);
2478 bs->used_blobids = spdk_bit_array_create(0);
2479
2480 pthread_mutex_init(&bs->used_clusters_mutex, NULL);
2481
2482 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
2483 sizeof(struct spdk_bs_channel), "blobstore");
2484 rc = spdk_bs_register_md_thread(bs);
2485 if (rc == -1) {
2486 spdk_io_device_unregister(bs, NULL);
2487 pthread_mutex_destroy(&bs->used_clusters_mutex);
2488 spdk_bit_array_free(&bs->used_blobids);
2489 spdk_bit_array_free(&bs->used_md_pages);
2490 spdk_bit_array_free(&bs->used_clusters);
2491 free(bs);
2492 /* FIXME: this is a lie but don't know how to get a proper error code here */
2493 return -ENOMEM;
2494 }
2495
2496 *_bs = bs;
2497 return 0;
2498}
2499
/* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload. */
2501
/*
 * State shared by the chain of asynchronous callbacks that load (and
 * unload) a blobstore.
 */
struct spdk_bs_load_ctx {
	/* Blobstore being operated on. */
	struct spdk_blob_store		*bs;
	/* Super block buffer; released with spdk_dma_free(). */
	struct spdk_bs_super_block	*super;

	/* Buffer for the mask currently being read or written. */
	struct spdk_bs_md_mask		*mask;
	/* Set while metadata replay is following a page's 'next' link. */
	bool				in_page_chain;
	/* Position of the linear scan over metadata pages during replay. */
	uint32_t			page_index;
	/* Metadata page currently being read during replay. */
	uint32_t			cur_page;
	/* Buffer holding the metadata page read during replay. */
	struct spdk_blob_md_page	*page;
	/* When true, a failure also frees the blobstore. */
	bool				is_load;

	/* Sequence saved when the load completes, finished after blob iteration. */
	spdk_bs_sequence_t			*seq;
	/* Optional callback invoked for each blob found while iterating after load. */
	spdk_blob_op_with_handle_complete	iter_cb_fn;
	/* Argument passed to iter_cb_fn. */
	void					*iter_cb_arg;
};
2517
2518static void
2519_spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2520{
2521 assert(bserrno != 0);
2522
2523 spdk_dma_free(ctx->super);
2524 spdk_bs_sequence_finish(seq, bserrno);
2525 /*
2526 * Only free the blobstore when a load fails. If an unload fails (for some reason)
2527 * we want to keep the blobstore in case the caller wants to try again.
2528 */
2529 if (ctx->is_load) {
2530 _spdk_bs_free(ctx->bs);
2531 }
2532 free(ctx);
2533}
2534
2535static void
2536_spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2537{
2538 uint32_t i = 0;
2539
2540 while (true) {
2541 i = spdk_bit_array_find_first_set(array, i);
2542 if (i >= mask->length) {
2543 break;
2544 }
2545 mask->mask[i / 8] |= 1U << (i % 8);
2546 i++;
2547 }
2548}
2549
2550static int
2551_spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask)
2552{
2553 struct spdk_bit_array *array;
2554 uint32_t i;
2555
2556 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) {
2557 return -ENOMEM;
2558 }
2559
2560 array = *array_ptr;
2561 for (i = 0; i < mask->length; i++) {
2562 if (mask->mask[i / 8] & (1U << (i % 8))) {
2563 spdk_bit_array_set(array, i);
2564 }
2565 }
2566
2567 return 0;
2568}
2569
2570static void
2571_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
2572 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
2573{
2574 /* Update the values in the super block */
2575 super->super_blob = bs->super_blob;
2576 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
2577 super->crc = _spdk_blob_md_page_calc_crc(super);
2578 spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
2579 _spdk_bs_byte_to_lba(bs, sizeof(*super)),
2580 cb_fn, cb_arg);
2581}
2582
2583static void
2584_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2585{
2586 struct spdk_bs_load_ctx *ctx = arg;
2587 uint64_t mask_size, lba, lba_count;
2588
2589 /* Write out the used clusters mask */
2590 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2591 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2592 if (!ctx->mask) {
2593 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2594 return;
2595 }
2596
2597 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
2598 ctx->mask->length = ctx->bs->total_clusters;
2599 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
2600
2601 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
2602 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2603 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2604 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2605}
2606
2607static void
2608_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2609{
2610 struct spdk_bs_load_ctx *ctx = arg;
2611 uint64_t mask_size, lba, lba_count;
2612
2613 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2614 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2615 if (!ctx->mask) {
2616 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2617 return;
2618 }
2619
2620 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
2621 ctx->mask->length = ctx->super->md_len;
2622 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
2623
2624 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
2625 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2626 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2627 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2628}
2629
2630static void
2631_spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2632{
2633 struct spdk_bs_load_ctx *ctx = arg;
2634 uint64_t mask_size, lba, lba_count;
2635
2636 if (ctx->super->used_blobid_mask_len == 0) {
2637 /*
2638 * This is a pre-v3 on-disk format where the blobid mask does not get
2639 * written to disk.
2640 */
2641 cb_fn(seq, arg, 0);
2642 return;
2643 }
2644
2645 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2646 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2647 if (!ctx->mask) {
2648 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2649 return;
2650 }
2651
2652 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
2653 ctx->mask->length = ctx->super->md_len;
2654 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
2655
2656 _spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
2657 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2658 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2659 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2660}
2661
2662static void
2663_spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
2664{
2665 struct spdk_bs_load_ctx *ctx = arg;
2666
2667 if (bserrno == 0) {
2668 if (ctx->iter_cb_fn) {
2669 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
2670 }
2671 _spdk_bs_blob_list_add(blob);
2672 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
2673 return;
2674 }
2675
2676 if (bserrno == -ENOENT) {
2677 bserrno = 0;
2678 } else {
2679 /*
2680 * This case needs to be looked at further. Same problem
2681 * exists with applications that rely on explicit blob
2682 * iteration. We should just skip the blob that failed
2683 * to load and continue on to the next one.
2684 */
2685 SPDK_ERRLOG("Error in iterating blobs\n");
2686 }
2687
2688 ctx->iter_cb_fn = NULL;
2689
2690 spdk_dma_free(ctx->super);
2691 spdk_dma_free(ctx->mask);
2692 spdk_bs_sequence_finish(ctx->seq, bserrno);
2693 free(ctx);
2694}
2695
2696static void
2697_spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2698{
2699 ctx->seq = seq;
2700 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
2701}
2702
2703static void
2704_spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2705{
2706 struct spdk_bs_load_ctx *ctx = cb_arg;
2707 int rc;
2708
2709 /* The type must be correct */
2710 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
2711
2712 /* The length of the mask (in bits) must not be greater than
2713 * the length of the buffer (converted to bits) */
2714 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
2715
2716 /* The length of the mask must be exactly equal to the size
2717 * (in pages) of the metadata region */
2718 assert(ctx->mask->length == ctx->super->md_len);
2719
2720 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask);
2721 if (rc < 0) {
2722 spdk_dma_free(ctx->mask);
2723 _spdk_bs_load_ctx_fail(seq, ctx, rc);
2724 return;
2725 }
2726
2727 _spdk_bs_load_complete(seq, ctx, bserrno);
2728}
2729
2730static void
2731_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2732{
2733 struct spdk_bs_load_ctx *ctx = cb_arg;
2734 uint64_t lba, lba_count, mask_size;
2735 int rc;
2736
2737 /* The type must be correct */
2738 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
2739 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2740 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
2741 struct spdk_blob_md_page) * 8));
2742 /* The length of the mask must be exactly equal to the total number of clusters */
2743 assert(ctx->mask->length == ctx->bs->total_clusters);
2744
2745 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask);
2746 if (rc < 0) {
2747 spdk_dma_free(ctx->mask);
2748 _spdk_bs_load_ctx_fail(seq, ctx, rc);
2749 return;
2750 }
2751
2752 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters);
2753 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters);
2754
2755 spdk_dma_free(ctx->mask);
2756
2757 /* Read the used blobids mask */
2758 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2759 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2760 if (!ctx->mask) {
2761 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2762 return;
2763 }
2764 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2765 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2766 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2767 _spdk_bs_load_used_blobids_cpl, ctx);
2768}
2769
2770static void
2771_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2772{
2773 struct spdk_bs_load_ctx *ctx = cb_arg;
2774 uint64_t lba, lba_count, mask_size;
2775 int rc;
2776
2777 /* The type must be correct */
2778 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
2779 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2780 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
2781 8));
2782 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
2783 assert(ctx->mask->length == ctx->super->md_len);
2784
2785 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask);
2786 if (rc < 0) {
2787 spdk_dma_free(ctx->mask);
2788 _spdk_bs_load_ctx_fail(seq, ctx, rc);
2789 return;
2790 }
2791
2792 spdk_dma_free(ctx->mask);
2793
2794 /* Read the used clusters mask */
2795 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2796 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2797 if (!ctx->mask) {
2798 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2799 return;
2800 }
2801 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2802 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2803 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2804 _spdk_bs_load_used_clusters_cpl, ctx);
2805}
2806
2807static void
2808_spdk_bs_load_read_used_pages(spdk_bs_sequence_t *seq, void *cb_arg)
2809{
2810 struct spdk_bs_load_ctx *ctx = cb_arg;
2811 uint64_t lba, lba_count, mask_size;
2812
2813 /* Read the used pages mask */
2814 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2815 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2816 if (!ctx->mask) {
2817 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2818 return;
2819 }
2820
2821 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2822 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2823 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2824 _spdk_bs_load_used_pages_cpl, ctx);
2825}
2826
2827static int
2828_spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
2829{
2830 struct spdk_blob_md_descriptor *desc;
2831 size_t cur_desc = 0;
2832
2833 desc = (struct spdk_blob_md_descriptor *)page->descriptors;
2834 while (cur_desc < sizeof(page->descriptors)) {
2835 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
2836 if (desc->length == 0) {
2837 /* If padding and length are 0, this terminates the page */
2838 break;
2839 }
2840 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
2841 struct spdk_blob_md_descriptor_extent *desc_extent;
2842 unsigned int i, j;
2843 unsigned int cluster_count = 0;
2844 uint32_t cluster_idx;
2845
2846 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
2847
2848 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
2849 for (j = 0; j < desc_extent->extents[i].length; j++) {
2850 cluster_idx = desc_extent->extents[i].cluster_idx;
2851 /*
2852 * cluster_idx = 0 means an unallocated cluster - don't mark that
2853 * in the used cluster map.
2854 */
2855 if (cluster_idx != 0) {
2856 spdk_bit_array_set(bs->used_clusters, cluster_idx + j);
2857 if (bs->num_free_clusters == 0) {
2858 return -ENOSPC;
2859 }
2860 bs->num_free_clusters--;
2861 }
2862 cluster_count++;
2863 }
2864 }
2865 if (cluster_count == 0) {
2866 return -EINVAL;
2867 }
2868 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
2869 /* Skip this item */
2870 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
2871 /* Skip this item */
2872 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
2873 /* Skip this item */
2874 } else {
2875 /* Error */
2876 return -EINVAL;
2877 }
2878 /* Advance to the next descriptor */
2879 cur_desc += sizeof(*desc) + desc->length;
2880 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
2881 break;
2882 }
2883 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
2884 }
2885 return 0;
2886}
2887
2888static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
2889{
2890 uint32_t crc;
2891
2892 crc = _spdk_blob_md_page_calc_crc(ctx->page);
2893 if (crc != ctx->page->crc) {
2894 return false;
2895 }
2896
2897 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
2898 return false;
2899 }
2900 return true;
2901}
2902
/*
 * Forward declaration: the replay completion callback below issues the next
 * page read through this function, which in turn names that callback as its
 * completion.
 */
static void
_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
2905
2906static void
2907_spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2908{
2909 struct spdk_bs_load_ctx *ctx = cb_arg;
2910
2911 _spdk_bs_load_complete(seq, ctx, bserrno);
2912}
2913
2914static void
2915_spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2916{
2917 struct spdk_bs_load_ctx *ctx = cb_arg;
2918
2919 spdk_dma_free(ctx->mask);
2920 ctx->mask = NULL;
2921
2922 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
2923}
2924
2925static void
2926_spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2927{
2928 struct spdk_bs_load_ctx *ctx = cb_arg;
2929
2930 spdk_dma_free(ctx->mask);
2931 ctx->mask = NULL;
2932
2933 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
2934}
2935
2936static void
2937_spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2938{
2939 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
2940}
2941
2942static void
2943_spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2944{
2945 struct spdk_bs_load_ctx *ctx = cb_arg;
2946 uint64_t num_md_clusters;
2947 uint64_t i;
2948 uint32_t page_num;
2949
2950 if (bserrno != 0) {
2951 _spdk_bs_load_ctx_fail(seq, ctx, bserrno);
2952 return;
2953 }
2954
2955 page_num = ctx->cur_page;
2956 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
2957 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
2958 spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
2959 if (ctx->page->sequence_num == 0) {
2960 spdk_bit_array_set(ctx->bs->used_blobids, page_num);
2961 }
2962 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
2963 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2964 return;
2965 }
2966 if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
2967 ctx->in_page_chain = true;
2968 ctx->cur_page = ctx->page->next;
2969 _spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2970 return;
2971 }
2972 }
2973 }
2974
2975 ctx->in_page_chain = false;
2976
2977 do {
2978 ctx->page_index++;
2979 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2980
2981 if (ctx->page_index < ctx->super->md_len) {
2982 ctx->cur_page = ctx->page_index;
2983 _spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2984 } else {
2985 /* Claim all of the clusters used by the metadata */
2986 num_md_clusters = divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
2987 for (i = 0; i < num_md_clusters; i++) {
2988 _spdk_bs_claim_cluster(ctx->bs, i);
2989 }
2990 spdk_dma_free(ctx->page);
2991 _spdk_bs_load_write_used_md(seq, ctx, bserrno);
2992 }
2993}
2994
2995static void
2996_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2997{
2998 struct spdk_bs_load_ctx *ctx = cb_arg;
2999 uint64_t lba;
3000
3001 assert(ctx->cur_page < ctx->super->md_len);
3002 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
3003 spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3004 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3005 _spdk_bs_load_replay_md_cpl, ctx);
3006}
3007
3008static void
3009_spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
3010{
3011 struct spdk_bs_load_ctx *ctx = cb_arg;
3012
3013 ctx->page_index = 0;
3014 ctx->cur_page = 0;
3015 ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
3016 SPDK_BS_PAGE_SIZE,
3017 NULL);
3018 if (!ctx->page) {
3019 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3020 return;
3021 }
3022 _spdk_bs_load_replay_cur_md_page(seq, cb_arg);
3023}
3024
3025static void
3026_spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
3027{
3028 struct spdk_bs_load_ctx *ctx = cb_arg;
3029 int rc;
3030
3031 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
3032 if (rc < 0) {
3033 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3034 return;
3035 }
3036
3037 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
3038 if (rc < 0) {
3039 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3040 return;
3041 }
3042
3043 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3044 if (rc < 0) {
3045 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3046 return;
3047 }
3048
3049 ctx->bs->num_free_clusters = ctx->bs->total_clusters;
3050 _spdk_bs_load_replay_md(seq, cb_arg);
3051}
3052
3053static void
3054_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3055{
3056 struct spdk_bs_load_ctx *ctx = cb_arg;
3057 uint32_t crc;
3058 int rc;
3059 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
3060
3061 if (ctx->super->version > SPDK_BS_VERSION ||
3062 ctx->super->version < SPDK_BS_INITIAL_VERSION) {
3063 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3064 return;
3065 }
3066
3067 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3068 sizeof(ctx->super->signature)) != 0) {
3069 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3070 return;
3071 }
3072
3073 crc = _spdk_blob_md_page_calc_crc(ctx->super);
3074 if (crc != ctx->super->crc) {
3075 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3076 return;
3077 }
3078
3079 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3080 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
3081 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3082 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n");
3083 } else {
3084 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
3085 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3086 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3087 _spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
3088 return;
3089 }
3090
3091 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) {
3092 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n",
3093 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size);
3094 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3095 return;
3096 }
3097
3098 if (ctx->super->size == 0) {
3099 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen;
3100 }
3101
3102 if (ctx->super->io_unit_size == 0) {
3103 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE;
3104 }
3105
3106 /* Parse the super block */
3107 ctx->bs->clean = 1;
3108 ctx->bs->cluster_sz = ctx->super->cluster_size;
3109 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size;
3110 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
3111 ctx->bs->io_unit_size = ctx->super->io_unit_size;
3112 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3113 if (rc < 0) {
3114 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3115 return;
3116 }
3117 ctx->bs->md_start = ctx->super->md_start;
3118 ctx->bs->md_len = ctx->super->md_len;
3119 ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
3120 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
3121 ctx->bs->super_blob = ctx->super->super_blob;
3122 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
3123
3124 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) {
3125 _spdk_bs_recover(seq, ctx);
3126 } else {
3127 _spdk_bs_load_read_used_pages(seq, ctx);
3128 }
3129}
3130
3131void
3132spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
3133 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
3134{
3135 struct spdk_blob_store *bs;
3136 struct spdk_bs_cpl cpl;
3137 spdk_bs_sequence_t *seq;
3138 struct spdk_bs_load_ctx *ctx;
3139 struct spdk_bs_opts opts = {};
3140 int err;
3141
3142 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
3143
3144 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
3145 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen);
3146 dev->destroy(dev);
3147 cb_fn(cb_arg, NULL, -EINVAL);
3148 return;
3149 }
3150
3151 if (o) {
3152 opts = *o;
3153 } else {
3154 spdk_bs_opts_init(&opts);
3155 }
3156
3157 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
3158 dev->destroy(dev);
3159 cb_fn(cb_arg, NULL, -EINVAL);
3160 return;
3161 }
3162
3163 err = _spdk_bs_alloc(dev, &opts, &bs);
3164 if (err) {
3165 dev->destroy(dev);
3166 cb_fn(cb_arg, NULL, err);
3167 return;
3168 }
3169
3170 ctx = calloc(1, sizeof(*ctx));
3171 if (!ctx) {
3172 _spdk_bs_free(bs);
3173 cb_fn(cb_arg, NULL, -ENOMEM);
3174 return;
3175 }
3176
3177 ctx->bs = bs;
3178 ctx->is_load = true;
3179 ctx->iter_cb_fn = opts.iter_cb_fn;
3180 ctx->iter_cb_arg = opts.iter_cb_arg;
3181
3182 /* Allocate memory for the super block */
3183 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3184 if (!ctx->super) {
3185 free(ctx);
3186 _spdk_bs_free(bs);
3187 cb_fn(cb_arg, NULL, -ENOMEM);
3188 return;
3189 }
3190
3191 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3192 cpl.u.bs_handle.cb_fn = cb_fn;
3193 cpl.u.bs_handle.cb_arg = cb_arg;
3194 cpl.u.bs_handle.bs = bs;
3195
3196 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3197 if (!seq) {
3198 spdk_dma_free(ctx->super);
3199 free(ctx);
3200 _spdk_bs_free(bs);
3201 cb_fn(cb_arg, NULL, -ENOMEM);
3202 return;
3203 }
3204
3205 /* Read the super block */
3206 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3207 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3208 _spdk_bs_load_super_cpl, ctx);
3209}
3210
3211/* END spdk_bs_load */
3212
3213/* START spdk_bs_dump */
3214
3215struct spdk_bs_dump_ctx {
3216 struct spdk_blob_store *bs;
3217 struct spdk_bs_super_block *super;
3218 uint32_t cur_page;
3219 struct spdk_blob_md_page *page;
3220 spdk_bs_sequence_t *seq;
3221 FILE *fp;
3222 spdk_bs_dump_print_xattr print_xattr_fn;
3223 char xattr_name[4096];
3224};
3225
3226static void
3227_spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno)
3228{
3229 spdk_dma_free(ctx->super);
3230
3231 /*
3232 * We need to defer calling spdk_bs_call_cpl() until after
3233 * dev destruction, so tuck these away for later use.
3234 */
3235 ctx->bs->unload_err = bserrno;
3236 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3237 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3238
3239 spdk_bs_sequence_finish(seq, 0);
3240 _spdk_bs_free(ctx->bs);
3241 free(ctx);
3242}
3243
3244static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
3245
3246static void
3247_spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx)
3248{
3249 uint32_t page_idx = ctx->cur_page;
3250 struct spdk_blob_md_page *page = ctx->page;
3251 struct spdk_blob_md_descriptor *desc;
3252 size_t cur_desc = 0;
3253 uint32_t crc;
3254
3255 fprintf(ctx->fp, "=========\n");
3256 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx);
3257 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id);
3258
3259 crc = _spdk_blob_md_page_calc_crc(page);
3260 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? "OK" : "Mismatch");
3261
3262 desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3263 while (cur_desc < sizeof(page->descriptors)) {
3264 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3265 if (desc->length == 0) {
3266 /* If padding and length are 0, this terminates the page */
3267 break;
3268 }
3269 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
3270 struct spdk_blob_md_descriptor_extent *desc_extent;
3271 unsigned int i;
3272
3273 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
3274
3275 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
3276 if (desc_extent->extents[i].cluster_idx != 0) {
3277 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32,
3278 desc_extent->extents[i].cluster_idx);
3279 } else {
3280 fprintf(ctx->fp, "Unallocated Extent - ");
3281 }
3282 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent->extents[i].length);
3283 fprintf(ctx->fp, "\n");
3284 }
3285 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3286 struct spdk_blob_md_descriptor_xattr *desc_xattr;
3287 uint32_t i;
3288
3289 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
3290
3291 if (desc_xattr->length !=
3292 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) +
3293 desc_xattr->name_length + desc_xattr->value_length) {
3294 }
3295
3296 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length);
3297 ctx->xattr_name[desc_xattr->name_length] = '\0';
3298 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name);
3299 fprintf(ctx->fp, " value = \"");
3300 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name,
3301 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
3302 desc_xattr->value_length);
3303 fprintf(ctx->fp, "\"\n");
3304 for (i = 0; i < desc_xattr->value_length; i++) {
3305 if (i % 16 == 0) {
3306 fprintf(ctx->fp, " ");
3307 }
3308 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i));
3309 if ((i + 1) % 16 == 0) {
3310 fprintf(ctx->fp, "\n");
3311 }
3312 }
3313 if (i % 16 != 0) {
3314 fprintf(ctx->fp, "\n");
3315 }
3316 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3317 /* TODO */
3318 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3319 /* TODO */
3320 } else {
3321 /* Error */
3322 }
3323 /* Advance to the next descriptor */
3324 cur_desc += sizeof(*desc) + desc->length;
3325 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3326 break;
3327 }
3328 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3329 }
3330}
3331
3332static void
3333_spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3334{
3335 struct spdk_bs_dump_ctx *ctx = cb_arg;
3336
3337 if (bserrno != 0) {
3338 _spdk_bs_dump_finish(seq, ctx, bserrno);
3339 return;
3340 }
3341
3342 if (ctx->page->id != 0) {
3343 _spdk_bs_dump_print_md_page(ctx);
3344 }
3345
3346 ctx->cur_page++;
3347
3348 if (ctx->cur_page < ctx->super->md_len) {
3349 _spdk_bs_dump_read_md_page(seq, cb_arg);
3350 } else {
3351 spdk_dma_free(ctx->page);
3352 _spdk_bs_dump_finish(seq, ctx, 0);
3353 }
3354}
3355
3356static void
3357_spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
3358{
3359 struct spdk_bs_dump_ctx *ctx = cb_arg;
3360 uint64_t lba;
3361
3362 assert(ctx->cur_page < ctx->super->md_len);
3363 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
3364 spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3365 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3366 _spdk_bs_dump_read_md_page_cpl, ctx);
3367}
3368
3369static void
3370_spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3371{
3372 struct spdk_bs_dump_ctx *ctx = cb_arg;
3373
3374 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature);
3375 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3376 sizeof(ctx->super->signature)) != 0) {
3377 fprintf(ctx->fp, "(Mismatch)\n");
3378 _spdk_bs_dump_finish(seq, ctx, bserrno);
3379 return;
3380 } else {
3381 fprintf(ctx->fp, "(OK)\n");
3382 }
3383 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version);
3384 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc,
3385 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch");
3386 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype);
3387 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size);
3388 fprintf(ctx->fp, "Super Blob ID: ");
3389 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) {
3390 fprintf(ctx->fp, "(None)\n");
3391 } else {
3392 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob);
3393 }
3394 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean);
3395 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start);
3396 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len);
3397 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start);
3398 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len);
3399 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start);
3400 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len);
3401 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start);
3402 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len);
3403
3404 ctx->cur_page = 0;
3405 ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
3406 SPDK_BS_PAGE_SIZE,
3407 NULL);
3408 if (!ctx->page) {
3409 _spdk_bs_dump_finish(seq, ctx, -ENOMEM);
3410 return;
3411 }
3412 _spdk_bs_dump_read_md_page(seq, cb_arg);
3413}
3414
3415void
3416spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn,
3417 spdk_bs_op_complete cb_fn, void *cb_arg)
3418{
3419 struct spdk_blob_store *bs;
3420 struct spdk_bs_cpl cpl;
3421 spdk_bs_sequence_t *seq;
3422 struct spdk_bs_dump_ctx *ctx;
3423 struct spdk_bs_opts opts = {};
3424 int err;
3425
3426 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev);
3427
3428 spdk_bs_opts_init(&opts);
3429
3430 err = _spdk_bs_alloc(dev, &opts, &bs);
3431 if (err) {
3432 dev->destroy(dev);
3433 cb_fn(cb_arg, err);
3434 return;
3435 }
3436
3437 ctx = calloc(1, sizeof(*ctx));
3438 if (!ctx) {
3439 _spdk_bs_free(bs);
3440 cb_fn(cb_arg, -ENOMEM);
3441 return;
3442 }
3443
3444 ctx->bs = bs;
3445 ctx->fp = fp;
3446 ctx->print_xattr_fn = print_xattr_fn;
3447
3448 /* Allocate memory for the super block */
3449 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3450 if (!ctx->super) {
3451 free(ctx);
3452 _spdk_bs_free(bs);
3453 cb_fn(cb_arg, -ENOMEM);
3454 return;
3455 }
3456
3457 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3458 cpl.u.bs_basic.cb_fn = cb_fn;
3459 cpl.u.bs_basic.cb_arg = cb_arg;
3460
3461 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3462 if (!seq) {
3463 spdk_dma_free(ctx->super);
3464 free(ctx);
3465 _spdk_bs_free(bs);
3466 cb_fn(cb_arg, -ENOMEM);
3467 return;
3468 }
3469
3470 /* Read the super block */
3471 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3472 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3473 _spdk_bs_dump_super_cpl, ctx);
3474}
3475
3476/* END spdk_bs_dump */
3477
3478/* START spdk_bs_init */
3479
/* State carried through spdk_bs_init(); also reused by spdk_bs_destroy(). */
struct spdk_bs_init_ctx {
	/* Blobstore being initialized or destroyed */
	struct spdk_blob_store		*bs;
	/* DMA buffer holding the super block to be written (init only) */
	struct spdk_bs_super_block	*super;
};
3484
3485static void
3486_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3487{
3488 struct spdk_bs_init_ctx *ctx = cb_arg;
3489
3490 spdk_dma_free(ctx->super);
3491 free(ctx);
3492
3493 spdk_bs_sequence_finish(seq, bserrno);
3494}
3495
3496static void
3497_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3498{
3499 struct spdk_bs_init_ctx *ctx = cb_arg;
3500
3501 /* Write super block */
3502 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
3503 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
3504 _spdk_bs_init_persist_super_cpl, ctx);
3505}
3506
3507void
3508spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
3509 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
3510{
3511 struct spdk_bs_init_ctx *ctx;
3512 struct spdk_blob_store *bs;
3513 struct spdk_bs_cpl cpl;
3514 spdk_bs_sequence_t *seq;
3515 spdk_bs_batch_t *batch;
3516 uint64_t num_md_lba;
3517 uint64_t num_md_pages;
3518 uint64_t num_md_clusters;
3519 uint32_t i;
3520 struct spdk_bs_opts opts = {};
3521 int rc;
3522
3523 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
3524
3525 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
3526 SPDK_ERRLOG("unsupported dev block length of %d\n",
3527 dev->blocklen);
3528 dev->destroy(dev);
3529 cb_fn(cb_arg, NULL, -EINVAL);
3530 return;
3531 }
3532
3533 if (o) {
3534 opts = *o;
3535 } else {
3536 spdk_bs_opts_init(&opts);
3537 }
3538
3539 if (_spdk_bs_opts_verify(&opts) != 0) {
3540 dev->destroy(dev);
3541 cb_fn(cb_arg, NULL, -EINVAL);
3542 return;
3543 }
3544
3545 rc = _spdk_bs_alloc(dev, &opts, &bs);
3546 if (rc) {
3547 dev->destroy(dev);
3548 cb_fn(cb_arg, NULL, rc);
3549 return;
3550 }
3551
3552 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
3553 /* By default, allocate 1 page per cluster.
3554 * Technically, this over-allocates metadata
3555 * because more metadata will reduce the number
3556 * of usable clusters. This can be addressed with
3557 * more complex math in the future.
3558 */
3559 bs->md_len = bs->total_clusters;
3560 } else {
3561 bs->md_len = opts.num_md_pages;
3562 }
3563 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
3564 if (rc < 0) {
3565 _spdk_bs_free(bs);
3566 cb_fn(cb_arg, NULL, -ENOMEM);
3567 return;
3568 }
3569
3570 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
3571 if (rc < 0) {
3572 _spdk_bs_free(bs);
3573 cb_fn(cb_arg, NULL, -ENOMEM);
3574 return;
3575 }
3576
3577 ctx = calloc(1, sizeof(*ctx));
3578 if (!ctx) {
3579 _spdk_bs_free(bs);
3580 cb_fn(cb_arg, NULL, -ENOMEM);
3581 return;
3582 }
3583
3584 ctx->bs = bs;
3585
3586 /* Allocate memory for the super block */
3587 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3588 if (!ctx->super) {
3589 free(ctx);
3590 _spdk_bs_free(bs);
3591 cb_fn(cb_arg, NULL, -ENOMEM);
3592 return;
3593 }
3594 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3595 sizeof(ctx->super->signature));
3596 ctx->super->version = SPDK_BS_VERSION;
3597 ctx->super->length = sizeof(*ctx->super);
3598 ctx->super->super_blob = bs->super_blob;
3599 ctx->super->clean = 0;
3600 ctx->super->cluster_size = bs->cluster_sz;
3601 ctx->super->io_unit_size = bs->io_unit_size;
3602 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
3603
3604 /* Calculate how many pages the metadata consumes at the front
3605 * of the disk.
3606 */
3607
3608 /* The super block uses 1 page */
3609 num_md_pages = 1;
3610
3611 /* The used_md_pages mask requires 1 bit per metadata page, rounded
3612 * up to the nearest page, plus a header.
3613 */
3614 ctx->super->used_page_mask_start = num_md_pages;
3615 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3616 divide_round_up(bs->md_len, 8),
3617 SPDK_BS_PAGE_SIZE);
3618 num_md_pages += ctx->super->used_page_mask_len;
3619
3620 /* The used_clusters mask requires 1 bit per cluster, rounded
3621 * up to the nearest page, plus a header.
3622 */
3623 ctx->super->used_cluster_mask_start = num_md_pages;
3624 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3625 divide_round_up(bs->total_clusters, 8),
3626 SPDK_BS_PAGE_SIZE);
3627 num_md_pages += ctx->super->used_cluster_mask_len;
3628
3629 /* The used_blobids mask requires 1 bit per metadata page, rounded
3630 * up to the nearest page, plus a header.
3631 */
3632 ctx->super->used_blobid_mask_start = num_md_pages;
3633 ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3634 divide_round_up(bs->md_len, 8),
3635 SPDK_BS_PAGE_SIZE);
3636 num_md_pages += ctx->super->used_blobid_mask_len;
3637
3638 /* The metadata region size was chosen above */
3639 ctx->super->md_start = bs->md_start = num_md_pages;
3640 ctx->super->md_len = bs->md_len;
3641 num_md_pages += bs->md_len;
3642
3643 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
3644
3645 ctx->super->size = dev->blockcnt * dev->blocklen;
3646
3647 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
3648
3649 num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
3650 if (num_md_clusters > bs->total_clusters) {
3651 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
3652 "please decrease number of pages reserved for metadata "
3653 "or increase cluster size.\n");
3654 spdk_dma_free(ctx->super);
3655 free(ctx);
3656 _spdk_bs_free(bs);
3657 cb_fn(cb_arg, NULL, -ENOMEM);
3658 return;
3659 }
3660 /* Claim all of the clusters used by the metadata */
3661 for (i = 0; i < num_md_clusters; i++) {
3662 _spdk_bs_claim_cluster(bs, i);
3663 }
3664
3665 bs->total_data_clusters = bs->num_free_clusters;
3666
3667 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3668 cpl.u.bs_handle.cb_fn = cb_fn;
3669 cpl.u.bs_handle.cb_arg = cb_arg;
3670 cpl.u.bs_handle.bs = bs;
3671
3672 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3673 if (!seq) {
3674 spdk_dma_free(ctx->super);
3675 free(ctx);
3676 _spdk_bs_free(bs);
3677 cb_fn(cb_arg, NULL, -ENOMEM);
3678 return;
3679 }
3680
3681 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
3682
3683 /* Clear metadata space */
3684 spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
3685 /* Trim data clusters */
3686 spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
3687
3688 spdk_bs_batch_close(batch);
3689}
3690
3691/* END spdk_bs_init */
3692
3693/* START spdk_bs_destroy */
3694
3695static void
3696_spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3697{
3698 struct spdk_bs_init_ctx *ctx = cb_arg;
3699 struct spdk_blob_store *bs = ctx->bs;
3700
3701 /*
3702 * We need to defer calling spdk_bs_call_cpl() until after
3703 * dev destruction, so tuck these away for later use.
3704 */
3705 bs->unload_err = bserrno;
3706 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3707 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3708
3709 spdk_bs_sequence_finish(seq, bserrno);
3710
3711 _spdk_bs_free(bs);
3712 free(ctx);
3713}
3714
3715void
3716spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
3717 void *cb_arg)
3718{
3719 struct spdk_bs_cpl cpl;
3720 spdk_bs_sequence_t *seq;
3721 struct spdk_bs_init_ctx *ctx;
3722
3723 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
3724
3725 if (!TAILQ_EMPTY(&bs->blobs)) {
3726 SPDK_ERRLOG("Blobstore still has open blobs\n");
3727 cb_fn(cb_arg, -EBUSY);
3728 return;
3729 }
3730
3731 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3732 cpl.u.bs_basic.cb_fn = cb_fn;
3733 cpl.u.bs_basic.cb_arg = cb_arg;
3734
3735 ctx = calloc(1, sizeof(*ctx));
3736 if (!ctx) {
3737 cb_fn(cb_arg, -ENOMEM);
3738 return;
3739 }
3740
3741 ctx->bs = bs;
3742
3743 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3744 if (!seq) {
3745 free(ctx);
3746 cb_fn(cb_arg, -ENOMEM);
3747 return;
3748 }
3749
3750 /* Write zeroes to the super block */
3751 spdk_bs_sequence_write_zeroes_dev(seq,
3752 _spdk_bs_page_to_lba(bs, 0),
3753 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
3754 _spdk_bs_destroy_trim_cpl, ctx);
3755}
3756
3757/* END spdk_bs_destroy */
3758
3759/* START spdk_bs_unload */
3760
3761static void
3762_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3763{
3764 struct spdk_bs_load_ctx *ctx = cb_arg;
3765
3766 spdk_dma_free(ctx->super);
3767
3768 /*
3769 * We need to defer calling spdk_bs_call_cpl() until after
3770 * dev destruction, so tuck these away for later use.
3771 */
3772 ctx->bs->unload_err = bserrno;
3773 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3774 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3775
3776 spdk_bs_sequence_finish(seq, bserrno);
3777
3778 _spdk_bs_free(ctx->bs);
3779 free(ctx);
3780}
3781
3782static void
3783_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3784{
3785 struct spdk_bs_load_ctx *ctx = cb_arg;
3786
3787 spdk_dma_free(ctx->mask);
3788 ctx->super->clean = 1;
3789
3790 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
3791}
3792
3793static void
3794_spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3795{
3796 struct spdk_bs_load_ctx *ctx = cb_arg;
3797
3798 spdk_dma_free(ctx->mask);
3799 ctx->mask = NULL;
3800
3801 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
3802}
3803
3804static void
3805_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3806{
3807 struct spdk_bs_load_ctx *ctx = cb_arg;
3808
3809 spdk_dma_free(ctx->mask);
3810 ctx->mask = NULL;
3811
3812 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
3813}
3814
3815static void
3816_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3817{
3818 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
3819}
3820
3821void
3822spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
3823{
3824 struct spdk_bs_cpl cpl;
3825 spdk_bs_sequence_t *seq;
3826 struct spdk_bs_load_ctx *ctx;
3827
3828 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
3829
3830 if (!TAILQ_EMPTY(&bs->blobs)) {
3831 SPDK_ERRLOG("Blobstore still has open blobs\n");
3832 cb_fn(cb_arg, -EBUSY);
3833 return;
3834 }
3835
3836 ctx = calloc(1, sizeof(*ctx));
3837 if (!ctx) {
3838 cb_fn(cb_arg, -ENOMEM);
3839 return;
3840 }
3841
3842 ctx->bs = bs;
3843 ctx->is_load = false;
3844
3845 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3846 if (!ctx->super) {
3847 free(ctx);
3848 cb_fn(cb_arg, -ENOMEM);
3849 return;
3850 }
3851
3852 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3853 cpl.u.bs_basic.cb_fn = cb_fn;
3854 cpl.u.bs_basic.cb_arg = cb_arg;
3855
3856 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3857 if (!seq) {
3858 spdk_dma_free(ctx->super);
3859 free(ctx);
3860 cb_fn(cb_arg, -ENOMEM);
3861 return;
3862 }
3863
3864 /* Read super block */
3865 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3866 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3867 _spdk_bs_unload_read_super_cpl, ctx);
3868}
3869
3870/* END spdk_bs_unload */
3871
3872/* START spdk_bs_set_super */
3873
/* State carried through the asynchronous spdk_bs_set_super() operation. */
struct spdk_bs_set_super_ctx {
	/* Blobstore whose super blob id is being updated */
	struct spdk_blob_store		*bs;
	/* DMA buffer holding the super block that is read and rewritten */
	struct spdk_bs_super_block	*super;
};
3878
3879static void
3880_spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3881{
3882 struct spdk_bs_set_super_ctx *ctx = cb_arg;
3883
3884 if (bserrno != 0) {
3885 SPDK_ERRLOG("Unable to write to super block of blobstore\n");
3886 }
3887
3888 spdk_dma_free(ctx->super);
3889
3890 spdk_bs_sequence_finish(seq, bserrno);
3891
3892 free(ctx);
3893}
3894
3895static void
3896_spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3897{
3898 struct spdk_bs_set_super_ctx *ctx = cb_arg;
3899
3900 if (bserrno != 0) {
3901 SPDK_ERRLOG("Unable to read super block of blobstore\n");
3902 spdk_dma_free(ctx->super);
3903 spdk_bs_sequence_finish(seq, bserrno);
3904 free(ctx);
3905 return;
3906 }
3907
3908 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
3909}
3910
3911void
3912spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
3913 spdk_bs_op_complete cb_fn, void *cb_arg)
3914{
3915 struct spdk_bs_cpl cpl;
3916 spdk_bs_sequence_t *seq;
3917 struct spdk_bs_set_super_ctx *ctx;
3918
3919 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
3920
3921 ctx = calloc(1, sizeof(*ctx));
3922 if (!ctx) {
3923 cb_fn(cb_arg, -ENOMEM);
3924 return;
3925 }
3926
3927 ctx->bs = bs;
3928
3929 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3930 if (!ctx->super) {
3931 free(ctx);
3932 cb_fn(cb_arg, -ENOMEM);
3933 return;
3934 }
3935
3936 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3937 cpl.u.bs_basic.cb_fn = cb_fn;
3938 cpl.u.bs_basic.cb_arg = cb_arg;
3939
3940 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3941 if (!seq) {
3942 spdk_dma_free(ctx->super);
3943 free(ctx);
3944 cb_fn(cb_arg, -ENOMEM);
3945 return;
3946 }
3947
3948 bs->super_blob = blobid;
3949
3950 /* Read super block */
3951 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3952 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3953 _spdk_bs_set_super_read_cpl, ctx);
3954}
3955
3956/* END spdk_bs_set_super */
3957
3958void
3959spdk_bs_get_super(struct spdk_blob_store *bs,
3960 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3961{
3962 if (bs->super_blob == SPDK_BLOBID_INVALID) {
3963 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
3964 } else {
3965 cb_fn(cb_arg, bs->super_blob, 0);
3966 }
3967}
3968
3969uint64_t
3970spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
3971{
3972 return bs->cluster_sz;
3973}
3974
3975uint64_t
3976spdk_bs_get_page_size(struct spdk_blob_store *bs)
3977{
3978 return SPDK_BS_PAGE_SIZE;
3979}
3980
3981uint64_t
3982spdk_bs_get_io_unit_size(struct spdk_blob_store *bs)
3983{
3984 return bs->io_unit_size;
3985}
3986
3987uint64_t
3988spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
3989{
3990 return bs->num_free_clusters;
3991}
3992
3993uint64_t
3994spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
3995{
3996 return bs->total_data_clusters;
3997}
3998
3999static int
4000spdk_bs_register_md_thread(struct spdk_blob_store *bs)
4001{
4002 bs->md_channel = spdk_get_io_channel(bs);
4003 if (!bs->md_channel) {
4004 SPDK_ERRLOG("Failed to get IO channel.\n");
4005 return -1;
4006 }
4007
4008 return 0;
4009}
4010
4011static int
4012spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
4013{
4014 spdk_put_io_channel(bs->md_channel);
4015
4016 return 0;
4017}
4018
4019spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
4020{
4021 assert(blob != NULL);
4022
4023 return blob->id;
4024}
4025
4026uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
4027{
4028 assert(blob != NULL);
4029
4030 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
4031}
4032
4033uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob)
4034{
4035 assert(blob != NULL);
4036
4037 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs);
4038}
4039
4040uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
4041{
4042 assert(blob != NULL);
4043
4044 return blob->active.num_clusters;
4045}
4046
4047/* START spdk_bs_create_blob */
4048
4049static void
4050_spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4051{
4052 struct spdk_blob *blob = cb_arg;
4053
4054 _spdk_blob_free(blob);
4055
4056 spdk_bs_sequence_finish(seq, bserrno);
4057}
4058
4059static int
4060_spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
4061 bool internal)
4062{
4063 uint64_t i;
4064 size_t value_len = 0;
4065 int rc;
4066 const void *value = NULL;
4067 if (xattrs->count > 0 && xattrs->get_value == NULL) {
4068 return -EINVAL;
4069 }
4070 for (i = 0; i < xattrs->count; i++) {
4071 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
4072 if (value == NULL || value_len == 0) {
4073 return -EINVAL;
4074 }
4075 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
4076 if (rc < 0) {
4077 return rc;
7c673cae
FG
4078 }
4079 }
11fdf7f2
TL
4080 return 0;
4081}
7c673cae 4082
11fdf7f2
TL
4083static void
4084_spdk_blob_set_thin_provision(struct spdk_blob *blob)
4085{
4086 _spdk_blob_verify_md_op(blob);
4087 blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
4088 blob->state = SPDK_BLOB_STATE_DIRTY;
7c673cae
FG
4089}
4090
4091static void
11fdf7f2
TL
4092_spdk_bs_create_blob(struct spdk_blob_store *bs,
4093 const struct spdk_blob_opts *opts,
4094 const struct spdk_blob_xattr_opts *internal_xattrs,
4095 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
7c673cae 4096{
11fdf7f2
TL
4097 struct spdk_blob *blob;
4098 uint32_t page_idx;
4099 struct spdk_bs_cpl cpl;
4100 struct spdk_blob_opts opts_default;
4101 struct spdk_blob_xattr_opts internal_xattrs_default;
4102 spdk_bs_sequence_t *seq;
4103 spdk_blob_id id;
4104 int rc;
7c673cae 4105
11fdf7f2 4106 assert(spdk_get_thread() == bs->md_thread);
7c673cae 4107
11fdf7f2
TL
4108 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
4109 if (page_idx == UINT32_MAX) {
4110 cb_fn(cb_arg, 0, -ENOMEM);
7c673cae
FG
4111 return;
4112 }
11fdf7f2
TL
4113 spdk_bit_array_set(bs->used_blobids, page_idx);
4114 spdk_bit_array_set(bs->used_md_pages, page_idx);
7c673cae 4115
11fdf7f2 4116 id = _spdk_bs_page_to_blobid(page_idx);
7c673cae 4117
11fdf7f2 4118 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
7c673cae 4119
11fdf7f2
TL
4120 blob = _spdk_blob_alloc(bs, id);
4121 if (!blob) {
4122 cb_fn(cb_arg, 0, -ENOMEM);
7c673cae
FG
4123 return;
4124 }
7c673cae 4125
11fdf7f2
TL
4126 if (!opts) {
4127 spdk_blob_opts_init(&opts_default);
4128 opts = &opts_default;
4129 }
4130 if (!internal_xattrs) {
4131 _spdk_blob_xattrs_init(&internal_xattrs_default);
4132 internal_xattrs = &internal_xattrs_default;
4133 }
7c673cae 4134
11fdf7f2
TL
4135 rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
4136 if (rc < 0) {
4137 _spdk_blob_free(blob);
4138 cb_fn(cb_arg, 0, rc);
7c673cae
FG
4139 return;
4140 }
4141
11fdf7f2
TL
4142 rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
4143 if (rc < 0) {
4144 _spdk_blob_free(blob);
4145 cb_fn(cb_arg, 0, rc);
7c673cae
FG
4146 return;
4147 }
4148
11fdf7f2
TL
4149 if (opts->thin_provision) {
4150 _spdk_blob_set_thin_provision(blob);
4151 }
7c673cae 4152
11fdf7f2
TL
4153 rc = _spdk_blob_resize(blob, opts->num_clusters);
4154 if (rc < 0) {
4155 _spdk_blob_free(blob);
4156 cb_fn(cb_arg, 0, rc);
7c673cae
FG
4157 return;
4158 }
11fdf7f2
TL
4159 cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4160 cpl.u.blobid.cb_fn = cb_fn;
4161 cpl.u.blobid.cb_arg = cb_arg;
4162 cpl.u.blobid.blobid = blob->id;
7c673cae
FG
4163
4164 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4165 if (!seq) {
11fdf7f2
TL
4166 _spdk_blob_free(blob);
4167 cb_fn(cb_arg, 0, -ENOMEM);
7c673cae
FG
4168 return;
4169 }
4170
11fdf7f2 4171 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
7c673cae
FG
4172}
4173
11fdf7f2
TL
4174void spdk_bs_create_blob(struct spdk_blob_store *bs,
4175 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4176{
4177 _spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
4178}
7c673cae 4179
11fdf7f2
TL
4180void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
4181 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4182{
4183 _spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
4184}
7c673cae 4185
11fdf7f2
TL
4186/* END spdk_bs_create_blob */
4187
4188/* START blob_cleanup */
4189
4190struct spdk_clone_snapshot_ctx {
4191 struct spdk_bs_cpl cpl;
4192 int bserrno;
4193 bool frozen;
4194
4195 struct spdk_io_channel *channel;
4196
4197 /* Current cluster for inflate operation */
4198 uint64_t cluster;
4199
4200 /* For inflation force allocation of all unallocated clusters and remove
4201 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */
4202 bool allocate_all;
4203
4204 struct {
4205 spdk_blob_id id;
4206 struct spdk_blob *blob;
4207 } original;
4208 struct {
4209 spdk_blob_id id;
4210 struct spdk_blob *blob;
4211 } new;
4212
4213 /* xattrs specified for snapshot/clones only. They have no impact on
4214 * the original blobs xattrs. */
4215 const struct spdk_blob_xattr_opts *xattrs;
7c673cae
FG
4216};
4217
4218static void
11fdf7f2 4219_spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
7c673cae 4220{
11fdf7f2
TL
4221 struct spdk_clone_snapshot_ctx *ctx = cb_arg;
4222 struct spdk_bs_cpl *cpl = &ctx->cpl;
7c673cae 4223
11fdf7f2
TL
4224 if (bserrno != 0) {
4225 if (ctx->bserrno != 0) {
4226 SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4227 } else {
4228 ctx->bserrno = bserrno;
4229 }
4230 }
7c673cae 4231
11fdf7f2
TL
4232 switch (cpl->type) {
4233 case SPDK_BS_CPL_TYPE_BLOBID:
4234 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno);
4235 break;
4236 case SPDK_BS_CPL_TYPE_BLOB_BASIC:
4237 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno);
4238 break;
4239 default:
4240 SPDK_UNREACHABLE();
4241 break;
4242 }
4243
4244 free(ctx);
7c673cae
FG
4245}
4246
4247static void
11fdf7f2 4248_spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
7c673cae 4249{
11fdf7f2
TL
4250 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4251 struct spdk_blob *origblob = ctx->original.blob;
7c673cae 4252
11fdf7f2
TL
4253 if (bserrno != 0) {
4254 if (ctx->bserrno != 0) {
4255 SPDK_ERRLOG("Unfreeze error %d\n", bserrno);
4256 } else {
4257 ctx->bserrno = bserrno;
4258 }
4259 }
4260
4261 ctx->original.id = origblob->id;
4262 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
7c673cae
FG
4263}
4264
11fdf7f2
TL
4265static void
4266_spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
7c673cae 4267{
11fdf7f2
TL
4268 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4269 struct spdk_blob *origblob = ctx->original.blob;
7c673cae 4270
11fdf7f2
TL
4271 if (bserrno != 0) {
4272 if (ctx->bserrno != 0) {
4273 SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4274 } else {
4275 ctx->bserrno = bserrno;
4276 }
4277 }
7c673cae 4278
11fdf7f2
TL
4279 if (ctx->frozen) {
4280 /* Unfreeze any outstanding I/O */
4281 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx);
7c673cae 4282 } else {
11fdf7f2 4283 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0);
7c673cae
FG
4284 }
4285
11fdf7f2 4286}
7c673cae 4287
11fdf7f2
TL
4288static void
4289_spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno)
4290{
4291 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4292 struct spdk_blob *newblob = ctx->new.blob;
7c673cae 4293
11fdf7f2
TL
4294 if (bserrno != 0) {
4295 if (ctx->bserrno != 0) {
4296 SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4297 } else {
4298 ctx->bserrno = bserrno;
4299 }
7c673cae
FG
4300 }
4301
11fdf7f2
TL
4302 ctx->new.id = newblob->id;
4303 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4304}
4305
4306/* END blob_cleanup */
4307
4308/* START spdk_bs_create_snapshot */
4309
4310static void
4311_spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno)
4312{
4313 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4314 struct spdk_blob *newblob = ctx->new.blob;
4315
4316 if (bserrno != 0) {
4317 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
7c673cae
FG
4318 return;
4319 }
4320
11fdf7f2
TL
4321 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
4322 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
4323 if (bserrno != 0) {
4324 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
7c673cae
FG
4325 return;
4326 }
7c673cae 4327
11fdf7f2 4328 _spdk_bs_blob_list_add(ctx->original.blob);
7c673cae 4329
11fdf7f2 4330 spdk_blob_set_read_only(newblob);
7c673cae 4331
11fdf7f2
TL
4332 /* sync snapshot metadata */
4333 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg);
4334}
7c673cae 4335
11fdf7f2
TL
4336static void
4337_spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
4338{
4339 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4340 struct spdk_blob *origblob = ctx->original.blob;
4341 struct spdk_blob *newblob = ctx->new.blob;
7c673cae 4342
11fdf7f2
TL
4343 if (bserrno != 0) {
4344 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4345 return;
4346 }
7c673cae 4347
11fdf7f2
TL
4348 /* Set internal xattr for snapshot id */
4349 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
4350 if (bserrno != 0) {
4351 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4352 return;
7c673cae
FG
4353 }
4354
11fdf7f2
TL
4355 _spdk_bs_blob_list_remove(origblob);
4356 origblob->parent_id = newblob->id;
7c673cae 4357
11fdf7f2
TL
4358 /* Create new back_bs_dev for snapshot */
4359 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob);
4360 if (origblob->back_bs_dev == NULL) {
4361 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
7c673cae
FG
4362 return;
4363 }
4364
11fdf7f2
TL
4365 /* set clone blob as thin provisioned */
4366 _spdk_blob_set_thin_provision(origblob);
4367
4368 _spdk_bs_blob_list_add(newblob);
4369
4370 /* Zero out origblob cluster map */
4371 memset(origblob->active.clusters, 0,
4372 origblob->active.num_clusters * sizeof(origblob->active.clusters));
4373
4374 /* sync clone metadata */
4375 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
7c673cae
FG
4376}
4377
11fdf7f2
TL
4378static void
4379_spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc)
4380{
4381 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4382 struct spdk_blob *origblob = ctx->original.blob;
4383 struct spdk_blob *newblob = ctx->new.blob;
4384 int bserrno;
7c673cae 4385
11fdf7f2
TL
4386 if (rc != 0) {
4387 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc);
4388 return;
4389 }
7c673cae 4390
11fdf7f2
TL
4391 ctx->frozen = true;
4392
4393 /* set new back_bs_dev for snapshot */
4394 newblob->back_bs_dev = origblob->back_bs_dev;
4395 /* Set invalid flags from origblob */
4396 newblob->invalid_flags = origblob->invalid_flags;
4397
4398 /* inherit parent from original blob if set */
4399 newblob->parent_id = origblob->parent_id;
4400 if (origblob->parent_id != SPDK_BLOBID_INVALID) {
4401 /* Set internal xattr for snapshot id */
4402 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT,
4403 &origblob->parent_id, sizeof(spdk_blob_id), true);
4404 if (bserrno != 0) {
4405 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4406 return;
4407 }
4408 }
7c673cae 4409
11fdf7f2
TL
4410 /* Copy cluster map to snapshot */
4411 memcpy(newblob->active.clusters, origblob->active.clusters,
4412 origblob->active.num_clusters * sizeof(origblob->active.clusters));
4413
4414 /* sync snapshot metadata */
4415 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
4416}
7c673cae
FG
4417
4418static void
11fdf7f2 4419_spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
7c673cae 4420{
11fdf7f2
TL
4421 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4422 struct spdk_blob *origblob = ctx->original.blob;
4423 struct spdk_blob *newblob = _blob;
7c673cae 4424
11fdf7f2
TL
4425 if (bserrno != 0) {
4426 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4427 return;
4428 }
7c673cae 4429
11fdf7f2 4430 ctx->new.blob = newblob;
7c673cae 4431
11fdf7f2 4432 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx);
7c673cae
FG
4433}
4434
4435static void
11fdf7f2 4436_spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
7c673cae 4437{
11fdf7f2
TL
4438 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4439 struct spdk_blob *origblob = ctx->original.blob;
7c673cae 4440
11fdf7f2
TL
4441 if (bserrno != 0) {
4442 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4443 return;
4444 }
7c673cae 4445
11fdf7f2
TL
4446 ctx->new.id = blobid;
4447 ctx->cpl.u.blobid.blobid = blobid;
7c673cae 4448
11fdf7f2 4449 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx);
7c673cae
FG
4450}
4451
11fdf7f2 4452
7c673cae 4453static void
11fdf7f2
TL
4454_spdk_bs_xattr_snapshot(void *arg, const char *name,
4455 const void **value, size_t *value_len)
7c673cae 4456{
11fdf7f2 4457 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0);
7c673cae 4458
11fdf7f2
TL
4459 struct spdk_blob *blob = (struct spdk_blob *)arg;
4460 *value = &blob->id;
4461 *value_len = sizeof(blob->id);
4462}
7c673cae 4463
11fdf7f2
TL
4464static void
4465_spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4466{
4467 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4468 struct spdk_blob_opts opts;
4469 struct spdk_blob_xattr_opts internal_xattrs;
4470 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS };
4471
4472 if (bserrno != 0) {
4473 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
7c673cae
FG
4474 return;
4475 }
4476
11fdf7f2 4477 ctx->original.blob = _blob;
7c673cae 4478
11fdf7f2
TL
4479 if (_blob->data_ro || _blob->md_ro) {
4480 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n",
4481 _blob->id);
4482 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
4483 return;
7c673cae
FG
4484 }
4485
11fdf7f2
TL
4486 spdk_blob_opts_init(&opts);
4487 _spdk_blob_xattrs_init(&internal_xattrs);
4488
4489 /* Change the size of new blob to the same as in original blob,
4490 * but do not allocate clusters */
4491 opts.thin_provision = true;
4492 opts.num_clusters = spdk_blob_get_num_clusters(_blob);
4493
4494 /* If there are any xattrs specified for snapshot, set them now */
4495 if (ctx->xattrs) {
4496 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
4497 }
4498 /* Set internal xattr SNAPSHOT_IN_PROGRESS */
4499 internal_xattrs.count = 1;
4500 internal_xattrs.ctx = _blob;
4501 internal_xattrs.names = xattrs_names;
4502 internal_xattrs.get_value = _spdk_bs_xattr_snapshot;
4503
4504 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
4505 _spdk_bs_snapshot_newblob_create_cpl, ctx);
7c673cae
FG
4506}
4507
11fdf7f2
TL
4508void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
4509 const struct spdk_blob_xattr_opts *snapshot_xattrs,
4510 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
7c673cae 4511{
11fdf7f2 4512 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
7c673cae 4513
11fdf7f2
TL
4514 if (!ctx) {
4515 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
7c673cae
FG
4516 return;
4517 }
11fdf7f2
TL
4518 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4519 ctx->cpl.u.blobid.cb_fn = cb_fn;
4520 ctx->cpl.u.blobid.cb_arg = cb_arg;
4521 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
4522 ctx->bserrno = 0;
4523 ctx->frozen = false;
4524 ctx->original.id = blobid;
4525 ctx->xattrs = snapshot_xattrs;
4526
4527 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
4528}
4529/* END spdk_bs_create_snapshot */
7c673cae 4530
11fdf7f2 4531/* START spdk_bs_create_clone */
7c673cae 4532
11fdf7f2
TL
4533static void
4534_spdk_bs_xattr_clone(void *arg, const char *name,
4535 const void **value, size_t *value_len)
4536{
4537 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0);
7c673cae 4538
11fdf7f2
TL
4539 struct spdk_blob *blob = (struct spdk_blob *)arg;
4540 *value = &blob->id;
4541 *value_len = sizeof(blob->id);
7c673cae
FG
4542}
4543
11fdf7f2
TL
4544static void
4545_spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
7c673cae 4546{
11fdf7f2
TL
4547 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4548 struct spdk_blob *clone = _blob;
7c673cae 4549
11fdf7f2
TL
4550 ctx->new.blob = clone;
4551 _spdk_bs_blob_list_add(clone);
7c673cae 4552
11fdf7f2
TL
4553 spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4554}
7c673cae 4555
11fdf7f2
TL
4556static void
4557_spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
4558{
4559 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
7c673cae 4560
11fdf7f2
TL
4561 ctx->cpl.u.blobid.blobid = blobid;
4562 spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx);
4563}
4564
4565static void
4566_spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4567{
4568 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4569 struct spdk_blob_opts opts;
4570 struct spdk_blob_xattr_opts internal_xattrs;
4571 char *xattr_names[] = { BLOB_SNAPSHOT };
4572
4573 if (bserrno != 0) {
4574 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
7c673cae
FG
4575 return;
4576 }
4577
11fdf7f2 4578 ctx->original.blob = _blob;
7c673cae 4579
11fdf7f2
TL
4580 if (!_blob->data_ro || !_blob->md_ro) {
4581 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n");
4582 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
7c673cae
FG
4583 return;
4584 }
4585
11fdf7f2
TL
4586 spdk_blob_opts_init(&opts);
4587 _spdk_blob_xattrs_init(&internal_xattrs);
7c673cae 4588
11fdf7f2
TL
4589 opts.thin_provision = true;
4590 opts.num_clusters = spdk_blob_get_num_clusters(_blob);
4591 if (ctx->xattrs) {
4592 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
4593 }
7c673cae 4594
11fdf7f2
TL
4595 /* Set internal xattr BLOB_SNAPSHOT */
4596 internal_xattrs.count = 1;
4597 internal_xattrs.ctx = _blob;
4598 internal_xattrs.names = xattr_names;
4599 internal_xattrs.get_value = _spdk_bs_xattr_clone;
7c673cae 4600
11fdf7f2
TL
4601 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
4602 _spdk_bs_clone_newblob_create_cpl, ctx);
7c673cae
FG
4603}
4604
11fdf7f2
TL
4605void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid,
4606 const struct spdk_blob_xattr_opts *clone_xattrs,
4607 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
7c673cae 4608{
11fdf7f2
TL
4609 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
4610
4611 if (!ctx) {
4612 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
4613 return;
7c673cae 4614 }
7c673cae 4615
11fdf7f2
TL
4616 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4617 ctx->cpl.u.blobid.cb_fn = cb_fn;
4618 ctx->cpl.u.blobid.cb_arg = cb_arg;
4619 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
4620 ctx->bserrno = 0;
4621 ctx->xattrs = clone_xattrs;
4622 ctx->original.id = blobid;
7c673cae 4623
11fdf7f2 4624 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx);
7c673cae
FG
4625}
4626
11fdf7f2
TL
4627/* END spdk_bs_create_clone */
4628
4629/* START spdk_bs_inflate_blob */
7c673cae 4630
11fdf7f2
TL
4631static void
4632_spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno)
7c673cae 4633{
11fdf7f2
TL
4634 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4635 struct spdk_blob *_blob = ctx->original.blob;
7c673cae 4636
11fdf7f2
TL
4637 if (bserrno != 0) {
4638 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4639 return;
4640 }
7c673cae 4641
11fdf7f2 4642 assert(_parent != NULL);
7c673cae 4643
11fdf7f2
TL
4644 _spdk_bs_blob_list_remove(_blob);
4645 _blob->parent_id = _parent->id;
4646 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id,
4647 sizeof(spdk_blob_id), true);
7c673cae 4648
11fdf7f2
TL
4649 _blob->back_bs_dev->destroy(_blob->back_bs_dev);
4650 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent);
4651 _spdk_bs_blob_list_add(_blob);
7c673cae 4652
11fdf7f2 4653 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
7c673cae
FG
4654}
4655
11fdf7f2
TL
4656static void
4657_spdk_bs_inflate_blob_done(void *cb_arg, int bserrno)
7c673cae 4658{
11fdf7f2
TL
4659 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4660 struct spdk_blob *_blob = ctx->original.blob;
4661 struct spdk_blob *_parent;
7c673cae 4662
11fdf7f2
TL
4663 if (bserrno != 0) {
4664 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4665 return;
4666 }
4667
4668 if (ctx->allocate_all) {
4669 /* remove thin provisioning */
4670 _spdk_bs_blob_list_remove(_blob);
4671 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
4672 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV;
4673 _blob->back_bs_dev->destroy(_blob->back_bs_dev);
4674 _blob->back_bs_dev = NULL;
4675 _blob->parent_id = SPDK_BLOBID_INVALID;
4676 } else {
4677 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob;
4678 if (_parent->parent_id != SPDK_BLOBID_INVALID) {
4679 /* We must change the parent of the inflated blob */
4680 spdk_bs_open_blob(_blob->bs, _parent->parent_id,
4681 _spdk_bs_inflate_blob_set_parent_cpl, ctx);
4682 return;
4683 }
4684
4685 _spdk_bs_blob_list_remove(_blob);
4686 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
4687 _blob->parent_id = SPDK_BLOBID_INVALID;
4688 _blob->back_bs_dev->destroy(_blob->back_bs_dev);
4689 _blob->back_bs_dev = spdk_bs_create_zeroes_dev();
4690 }
4691
4692 _blob->state = SPDK_BLOB_STATE_DIRTY;
4693 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
7c673cae
FG
4694}
4695
11fdf7f2
TL
4696/* Check if cluster needs allocation */
4697static inline bool
4698_spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all)
7c673cae 4699{
11fdf7f2
TL
4700 struct spdk_blob_bs_dev *b;
4701
7c673cae
FG
4702 assert(blob != NULL);
4703
11fdf7f2
TL
4704 if (blob->active.clusters[cluster] != 0) {
4705 /* Cluster is already allocated */
4706 return false;
4707 }
4708
4709 if (blob->parent_id == SPDK_BLOBID_INVALID) {
4710 /* Blob have no parent blob */
4711 return allocate_all;
4712 }
7c673cae 4713
11fdf7f2
TL
4714 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev;
4715 return (allocate_all || b->blob->active.clusters[cluster] != 0);
4716}
7c673cae
FG
4717
4718static void
11fdf7f2 4719_spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
7c673cae 4720{
11fdf7f2
TL
4721 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4722 struct spdk_blob *_blob = ctx->original.blob;
4723 uint64_t offset;
7c673cae 4724
11fdf7f2
TL
4725 if (bserrno != 0) {
4726 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4727 return;
4728 }
7c673cae 4729
11fdf7f2
TL
4730 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) {
4731 if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) {
4732 break;
4733 }
4734 }
4735
4736 if (ctx->cluster < _blob->active.num_clusters) {
4737 offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster);
4738
4739 /* We may safely increment a cluster before write */
4740 ctx->cluster++;
4741
4742 /* Use zero length write to touch a cluster */
4743 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0,
4744 _spdk_bs_inflate_blob_touch_next, ctx);
4745 } else {
4746 _spdk_bs_inflate_blob_done(cb_arg, bserrno);
4747 }
7c673cae
FG
4748}
4749
11fdf7f2
TL
4750static void
4751_spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
7c673cae 4752{
11fdf7f2
TL
4753 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4754 uint64_t lfc; /* lowest free cluster */
4755 uint64_t i;
7c673cae 4756
11fdf7f2
TL
4757 if (bserrno != 0) {
4758 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
7c673cae
FG
4759 return;
4760 }
11fdf7f2 4761 ctx->original.blob = _blob;
7c673cae 4762
11fdf7f2
TL
4763 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) {
4764 /* This blob have no parent, so we cannot decouple it. */
4765 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n");
4766 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
4767 return;
4768 }
4769
4770 if (spdk_blob_is_thin_provisioned(_blob) == false) {
4771 /* This is not thin provisioned blob. No need to inflate. */
4772 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0);
4773 return;
4774 }
4775
4776 /* Do two passes - one to verify that we can obtain enough clusters
4777 * and another to actually claim them.
7c673cae 4778 */
11fdf7f2
TL
4779 lfc = 0;
4780 for (i = 0; i < _blob->active.num_clusters; i++) {
4781 if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) {
4782 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc);
4783 if (lfc == UINT32_MAX) {
4784 /* No more free clusters. Cannot satisfy the request */
4785 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC);
4786 return;
4787 }
4788 lfc++;
4789 }
4790 }
7c673cae 4791
11fdf7f2
TL
4792 ctx->cluster = 0;
4793 _spdk_bs_inflate_blob_touch_next(ctx, 0);
4794}
7c673cae 4795
11fdf7f2
TL
4796static void
4797_spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
4798 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg)
4799{
4800 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
4801
4802 if (!ctx) {
4803 cb_fn(cb_arg, -ENOMEM);
7c673cae
FG
4804 return;
4805 }
11fdf7f2
TL
4806 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
4807 ctx->cpl.u.bs_basic.cb_fn = cb_fn;
4808 ctx->cpl.u.bs_basic.cb_arg = cb_arg;
4809 ctx->bserrno = 0;
4810 ctx->original.id = blobid;
4811 ctx->channel = channel;
4812 ctx->allocate_all = allocate_all;
4813
4814 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx);
4815}
7c673cae 4816
11fdf7f2
TL
4817void
4818spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
4819 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
4820{
4821 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg);
4822}
7c673cae 4823
11fdf7f2
TL
4824void
4825spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
4826 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
4827{
4828 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg);
4829}
4830/* END spdk_bs_inflate_blob */
4831
4832/* START spdk_blob_resize */
4833struct spdk_bs_resize_ctx {
4834 spdk_blob_op_complete cb_fn;
4835 void *cb_arg;
4836 struct spdk_blob *blob;
4837 uint64_t sz;
4838 int rc;
4839};
4840
4841static void
4842_spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc)
4843{
4844 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;
4845
4846 if (rc != 0) {
4847 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc);
4848 }
4849
4850 if (ctx->rc != 0) {
4851 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc);
4852 rc = ctx->rc;
7c673cae
FG
4853 }
4854
11fdf7f2
TL
4855 ctx->blob->resize_in_progress = false;
4856
4857 ctx->cb_fn(ctx->cb_arg, rc);
4858 free(ctx);
7c673cae
FG
4859}
4860
11fdf7f2
TL
4861static void
4862_spdk_bs_resize_freeze_cpl(void *cb_arg, int rc)
4863{
4864 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;
7c673cae 4865
11fdf7f2
TL
4866 if (rc != 0) {
4867 ctx->blob->resize_in_progress = false;
4868 ctx->cb_fn(ctx->cb_arg, rc);
4869 free(ctx);
4870 return;
4871 }
4872
4873 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz);
4874
4875 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx);
4876}
4877
4878void
4879spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae 4880{
11fdf7f2 4881 struct spdk_bs_resize_ctx *ctx;
7c673cae 4882
11fdf7f2
TL
4883 _spdk_blob_verify_md_op(blob);
4884
4885 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
7c673cae 4886
11fdf7f2
TL
4887 if (blob->md_ro) {
4888 cb_fn(cb_arg, -EPERM);
4889 return;
4890 }
7c673cae
FG
4891
4892 if (sz == blob->active.num_clusters) {
11fdf7f2
TL
4893 cb_fn(cb_arg, 0);
4894 return;
7c673cae
FG
4895 }
4896
11fdf7f2
TL
4897 if (blob->resize_in_progress) {
4898 cb_fn(cb_arg, -EBUSY);
4899 return;
7c673cae
FG
4900 }
4901
11fdf7f2
TL
4902 ctx = calloc(1, sizeof(*ctx));
4903 if (!ctx) {
4904 cb_fn(cb_arg, -ENOMEM);
4905 return;
4906 }
4907
4908 blob->resize_in_progress = true;
4909 ctx->cb_fn = cb_fn;
4910 ctx->cb_arg = cb_arg;
4911 ctx->blob = blob;
4912 ctx->sz = sz;
4913 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx);
7c673cae
FG
4914}
4915
11fdf7f2 4916/* END spdk_blob_resize */
7c673cae
FG
4917
4918
11fdf7f2 4919/* START spdk_bs_delete_blob */
7c673cae
FG
4920
4921static void
11fdf7f2 4922_spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
7c673cae 4923{
11fdf7f2 4924 spdk_bs_sequence_t *seq = cb_arg;
7c673cae
FG
4925
4926 spdk_bs_sequence_finish(seq, bserrno);
4927}
4928
4929static void
11fdf7f2 4930_spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae
FG
4931{
4932 struct spdk_blob *blob = cb_arg;
4933
11fdf7f2
TL
4934 if (bserrno != 0) {
4935 /*
4936 * We already removed this blob from the blobstore tailq, so
4937 * we need to free it here since this is the last reference
4938 * to it.
4939 */
4940 _spdk_blob_free(blob);
4941 _spdk_bs_delete_close_cpl(seq, bserrno);
4942 return;
4943 }
4944
4945 /*
4946 * This will immediately decrement the ref_count and call
4947 * the completion routine since the metadata state is clean.
4948 * By calling spdk_blob_close, we reduce the number of call
4949 * points into code that touches the blob->open_ref count
4950 * and the blobstore's blob list.
4951 */
4952 spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
4953}
4954
4955static void
4956_spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
4957{
4958 spdk_bs_sequence_t *seq = cb_arg;
4959 uint32_t page_num;
4960
4961 if (bserrno != 0) {
4962 spdk_bs_sequence_finish(seq, bserrno);
4963 return;
4964 }
4965
4966 _spdk_blob_verify_md_op(blob);
4967
4968 if (blob->open_ref > 1) {
4969 /*
4970 * Someone has this blob open (besides this delete context).
4971 * Decrement the ref count directly and return -EBUSY.
4972 */
4973 blob->open_ref--;
4974 spdk_bs_sequence_finish(seq, -EBUSY);
4975 return;
4976 }
4977
4978 bserrno = _spdk_bs_blob_list_remove(blob);
4979 if (bserrno != 0) {
4980 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Remove blob #%" PRIu64 " from a list\n", blob->id);
4981 spdk_bs_sequence_finish(seq, bserrno);
4982 return;
4983 }
4984
4985 /*
4986 * Remove the blob from the blob_store list now, to ensure it does not
4987 * get returned after this point by _spdk_blob_lookup().
4988 */
4989 TAILQ_REMOVE(&blob->bs->blobs, blob, link);
4990 page_num = _spdk_bs_blobid_to_page(blob->id);
4991 spdk_bit_array_clear(blob->bs->used_blobids, page_num);
7c673cae
FG
4992 blob->state = SPDK_BLOB_STATE_DIRTY;
4993 blob->active.num_pages = 0;
11fdf7f2 4994 _spdk_blob_resize(blob, 0);
7c673cae 4995
11fdf7f2 4996 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
7c673cae
FG
4997}
4998
4999void
11fdf7f2
TL
5000spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5001 spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae 5002{
7c673cae 5003 struct spdk_bs_cpl cpl;
11fdf7f2
TL
5004 spdk_bs_sequence_t *seq;
5005 struct spdk_blob_list *snapshot_entry = NULL;
7c673cae 5006
11fdf7f2 5007 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
7c673cae 5008
11fdf7f2 5009 assert(spdk_get_thread() == bs->md_thread);
7c673cae 5010
11fdf7f2
TL
5011 /* Check if this is a snapshot with clones */
5012 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
5013 if (snapshot_entry->id == blobid) {
5014 break;
5015 }
5016 }
5017 if (snapshot_entry != NULL) {
5018 /* If snapshot have clones, we cannot remove it */
5019 if (!TAILQ_EMPTY(&snapshot_entry->clones)) {
5020 SPDK_ERRLOG("Cannot remove snapshot with clones\n");
5021 cb_fn(cb_arg, -EBUSY);
5022 return;
5023 }
7c673cae
FG
5024 }
5025
5026 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5027 cpl.u.blob_basic.cb_fn = cb_fn;
5028 cpl.u.blob_basic.cb_arg = cb_arg;
5029
5030 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
5031 if (!seq) {
7c673cae
FG
5032 cb_fn(cb_arg, -ENOMEM);
5033 return;
5034 }
5035
11fdf7f2 5036 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
7c673cae
FG
5037}
5038
11fdf7f2 5039/* END spdk_bs_delete_blob */
7c673cae 5040
11fdf7f2 5041/* START spdk_bs_open_blob */
7c673cae
FG
5042
5043static void
11fdf7f2 5044_spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae
FG
5045{
5046 struct spdk_blob *blob = cb_arg;
5047
11fdf7f2
TL
5048 /* If the blob have crc error, we just return NULL. */
5049 if (blob == NULL) {
5050 seq->cpl.u.blob_handle.blob = NULL;
5051 spdk_bs_sequence_finish(seq, bserrno);
5052 return;
5053 }
5054
7c673cae
FG
5055 blob->open_ref++;
5056
5057 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
5058
5059 spdk_bs_sequence_finish(seq, bserrno);
5060}
5061
11fdf7f2
TL
5062void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5063 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
7c673cae
FG
5064{
5065 struct spdk_blob *blob;
5066 struct spdk_bs_cpl cpl;
5067 spdk_bs_sequence_t *seq;
5068 uint32_t page_num;
5069
11fdf7f2
TL
5070 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
5071 assert(spdk_get_thread() == bs->md_thread);
5072
5073 page_num = _spdk_bs_blobid_to_page(blobid);
5074 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
5075 /* Invalid blobid */
5076 cb_fn(cb_arg, NULL, -ENOENT);
5077 return;
5078 }
7c673cae
FG
5079
5080 blob = _spdk_blob_lookup(bs, blobid);
5081 if (blob) {
5082 blob->open_ref++;
5083 cb_fn(cb_arg, blob, 0);
5084 return;
5085 }
5086
7c673cae
FG
5087 blob = _spdk_blob_alloc(bs, blobid);
5088 if (!blob) {
5089 cb_fn(cb_arg, NULL, -ENOMEM);
5090 return;
5091 }
5092
5093 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
5094 cpl.u.blob_handle.cb_fn = cb_fn;
5095 cpl.u.blob_handle.cb_arg = cb_arg;
5096 cpl.u.blob_handle.blob = blob;
5097
5098 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
5099 if (!seq) {
5100 _spdk_blob_free(blob);
5101 cb_fn(cb_arg, NULL, -ENOMEM);
5102 return;
5103 }
5104
11fdf7f2
TL
5105 _spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
5106}
5107/* END spdk_bs_open_blob */
5108
5109/* START spdk_blob_set_read_only */
5110int spdk_blob_set_read_only(struct spdk_blob *blob)
5111{
5112 _spdk_blob_verify_md_op(blob);
5113
5114 blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
5115
5116 blob->state = SPDK_BLOB_STATE_DIRTY;
5117 return 0;
7c673cae 5118}
11fdf7f2
TL
5119/* END spdk_blob_set_read_only */
5120
5121/* START spdk_blob_sync_md */
7c673cae 5122
7c673cae 5123static void
11fdf7f2 5124_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
7c673cae 5125{
11fdf7f2
TL
5126 struct spdk_blob *blob = cb_arg;
5127
5128 if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
5129 blob->data_ro = true;
5130 blob->md_ro = true;
5131 }
5132
7c673cae
FG
5133 spdk_bs_sequence_finish(seq, bserrno);
5134}
5135
11fdf7f2
TL
5136static void
5137_spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae
FG
5138{
5139 struct spdk_bs_cpl cpl;
5140 spdk_bs_sequence_t *seq;
5141
11fdf7f2
TL
5142 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5143 cpl.u.blob_basic.cb_fn = cb_fn;
5144 cpl.u.blob_basic.cb_arg = cb_arg;
5145
5146 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
5147 if (!seq) {
5148 cb_fn(cb_arg, -ENOMEM);
5149 return;
5150 }
7c673cae 5151
11fdf7f2
TL
5152 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
5153}
7c673cae 5154
11fdf7f2
TL
5155void
5156spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5157{
5158 _spdk_blob_verify_md_op(blob);
7c673cae 5159
11fdf7f2
TL
5160 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
5161
5162 if (blob->md_ro) {
5163 assert(blob->state == SPDK_BLOB_STATE_CLEAN);
7c673cae
FG
5164 cb_fn(cb_arg, 0);
5165 return;
5166 }
5167
11fdf7f2
TL
5168 _spdk_blob_sync_md(blob, cb_fn, cb_arg);
5169}
7c673cae 5170
11fdf7f2
TL
5171/* END spdk_blob_sync_md */
5172
5173struct spdk_blob_insert_cluster_ctx {
5174 struct spdk_thread *thread;
5175 struct spdk_blob *blob;
5176 uint32_t cluster_num; /* cluster index in blob */
5177 uint32_t cluster; /* cluster on disk */
5178 int rc;
5179 spdk_blob_op_complete cb_fn;
5180 void *cb_arg;
5181};
5182
5183static void
5184_spdk_blob_insert_cluster_msg_cpl(void *arg)
5185{
5186 struct spdk_blob_insert_cluster_ctx *ctx = arg;
5187
5188 ctx->cb_fn(ctx->cb_arg, ctx->rc);
5189 free(ctx);
5190}
5191
5192static void
5193_spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
5194{
5195 struct spdk_blob_insert_cluster_ctx *ctx = arg;
5196
5197 ctx->rc = bserrno;
5198 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
5199}
5200
5201static void
5202_spdk_blob_insert_cluster_msg(void *arg)
5203{
5204 struct spdk_blob_insert_cluster_ctx *ctx = arg;
5205
5206 ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
5207 if (ctx->rc != 0) {
5208 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
7c673cae
FG
5209 return;
5210 }
5211
11fdf7f2
TL
5212 ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
5213 _spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
7c673cae
FG
5214}
5215
11fdf7f2
TL
5216static void
5217_spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
5218 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
5219{
5220 struct spdk_blob_insert_cluster_ctx *ctx;
5221
5222 ctx = calloc(1, sizeof(*ctx));
5223 if (ctx == NULL) {
5224 cb_fn(cb_arg, -ENOMEM);
5225 return;
5226 }
5227
5228 ctx->thread = spdk_get_thread();
5229 ctx->blob = blob;
5230 ctx->cluster_num = cluster_num;
5231 ctx->cluster = cluster;
5232 ctx->cb_fn = cb_fn;
5233 ctx->cb_arg = cb_arg;
5234
5235 spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
5236}
7c673cae 5237
11fdf7f2 5238/* START spdk_blob_close */
7c673cae
FG
5239
5240static void
5241_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5242{
11fdf7f2 5243 struct spdk_blob *blob = cb_arg;
7c673cae 5244
11fdf7f2
TL
5245 if (bserrno == 0) {
5246 blob->open_ref--;
5247 if (blob->open_ref == 0) {
5248 /*
5249 * Blobs with active.num_pages == 0 are deleted blobs.
5250 * these blobs are removed from the blob_store list
5251 * when the deletion process starts - so don't try to
5252 * remove them again.
5253 */
5254 if (blob->active.num_pages > 0) {
5255 TAILQ_REMOVE(&blob->bs->blobs, blob, link);
5256 }
5257 _spdk_blob_free(blob);
5258 }
7c673cae
FG
5259 }
5260
7c673cae
FG
5261 spdk_bs_sequence_finish(seq, bserrno);
5262}
5263
11fdf7f2 5264void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae
FG
5265{
5266 struct spdk_bs_cpl cpl;
7c673cae
FG
5267 spdk_bs_sequence_t *seq;
5268
11fdf7f2 5269 _spdk_blob_verify_md_op(blob);
7c673cae 5270
11fdf7f2 5271 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
7c673cae
FG
5272
5273 if (blob->open_ref == 0) {
5274 cb_fn(cb_arg, -EBADF);
5275 return;
5276 }
5277
7c673cae
FG
5278 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5279 cpl.u.blob_basic.cb_fn = cb_fn;
5280 cpl.u.blob_basic.cb_arg = cb_arg;
5281
5282 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
5283 if (!seq) {
5284 cb_fn(cb_arg, -ENOMEM);
5285 return;
5286 }
5287
7c673cae 5288 /* Sync metadata */
11fdf7f2 5289 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
7c673cae
FG
5290}
5291
11fdf7f2 5292/* END spdk_blob_close */
7c673cae 5293
/* Obtain an I/O channel for the blobstore by passing it to spdk_get_io_channel(). */
struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	struct spdk_io_channel *channel = spdk_get_io_channel(bs);

	return channel;
}
5298
/* Release an I/O channel by handing it to spdk_put_io_channel(). */
void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
5303
11fdf7f2
TL
5304void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
5305 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
5306{
5307 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
5308 SPDK_BLOB_UNMAP);
5309}
5310
5311void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
5312 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
5313{
5314 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
5315 SPDK_BLOB_WRITE_ZEROES);
5316}
5317
5318void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
5319 void *payload, uint64_t offset, uint64_t length,
5320 spdk_blob_op_complete cb_fn, void *cb_arg)
5321{
5322 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
5323 SPDK_BLOB_WRITE);
5324}
5325
5326void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
5327 void *payload, uint64_t offset, uint64_t length,
5328 spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae 5329{
11fdf7f2
TL
5330 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
5331 SPDK_BLOB_READ);
7c673cae
FG
5332}
5333
11fdf7f2
TL
5334void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
5335 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
5336 spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae 5337{
11fdf7f2 5338 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
7c673cae
FG
5339}
5340
11fdf7f2
TL
5341void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
5342 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
5343 spdk_blob_op_complete cb_fn, void *cb_arg)
7c673cae 5344{
11fdf7f2 5345 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
7c673cae
FG
5346}
5347
5348struct spdk_bs_iter_ctx {
5349 int64_t page_num;
5350 struct spdk_blob_store *bs;
5351
5352 spdk_blob_op_with_handle_complete cb_fn;
5353 void *cb_arg;
5354};
5355
5356static void
11fdf7f2 5357_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
7c673cae
FG
5358{
5359 struct spdk_bs_iter_ctx *ctx = cb_arg;
5360 struct spdk_blob_store *bs = ctx->bs;
5361 spdk_blob_id id;
5362
5363 if (bserrno == 0) {
11fdf7f2 5364 ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
7c673cae
FG
5365 free(ctx);
5366 return;
5367 }
5368
5369 ctx->page_num++;
11fdf7f2
TL
5370 ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
5371 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
7c673cae
FG
5372 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
5373 free(ctx);
5374 return;
5375 }
5376
11fdf7f2 5377 id = _spdk_bs_page_to_blobid(ctx->page_num);
7c673cae 5378
11fdf7f2 5379 spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
7c673cae
FG
5380}
5381
5382void
11fdf7f2
TL
5383spdk_bs_iter_first(struct spdk_blob_store *bs,
5384 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
7c673cae
FG
5385{
5386 struct spdk_bs_iter_ctx *ctx;
5387
5388 ctx = calloc(1, sizeof(*ctx));
5389 if (!ctx) {
5390 cb_fn(cb_arg, NULL, -ENOMEM);
5391 return;
5392 }
5393
5394 ctx->page_num = -1;
5395 ctx->bs = bs;
5396 ctx->cb_fn = cb_fn;
5397 ctx->cb_arg = cb_arg;
5398
5399 _spdk_bs_iter_cpl(ctx, NULL, -1);
5400}
5401
/*
 * Close completion used by spdk_bs_iter_next(). The close status is not
 * inspected; iteration always continues with the next blob.
 */
static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *iter = cb_arg;

	_spdk_bs_iter_cpl(iter, NULL, -1);
}
5409
5410void
11fdf7f2
TL
5411spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
5412 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
7c673cae
FG
5413{
5414 struct spdk_bs_iter_ctx *ctx;
7c673cae 5415
7c673cae
FG
5416 assert(blob != NULL);
5417
5418 ctx = calloc(1, sizeof(*ctx));
5419 if (!ctx) {
5420 cb_fn(cb_arg, NULL, -ENOMEM);
5421 return;
5422 }
5423
5424 ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
5425 ctx->bs = bs;
5426 ctx->cb_fn = cb_fn;
5427 ctx->cb_arg = cb_arg;
5428
5429 /* Close the existing blob */
11fdf7f2 5430 spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
7c673cae
FG
5431}
5432
11fdf7f2
TL
5433static int
5434_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
5435 uint16_t value_len, bool internal)
7c673cae 5436{
11fdf7f2
TL
5437 struct spdk_xattr_tailq *xattrs;
5438 struct spdk_xattr *xattr;
7c673cae 5439
11fdf7f2
TL
5440 _spdk_blob_verify_md_op(blob);
5441
5442 if (blob->md_ro) {
5443 return -EPERM;
5444 }
7c673cae 5445
11fdf7f2
TL
5446 if (internal) {
5447 xattrs = &blob->xattrs_internal;
5448 blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
5449 } else {
5450 xattrs = &blob->xattrs;
5451 }
7c673cae 5452
11fdf7f2 5453 TAILQ_FOREACH(xattr, xattrs, link) {
7c673cae
FG
5454 if (!strcmp(name, xattr->name)) {
5455 free(xattr->value);
5456 xattr->value_len = value_len;
5457 xattr->value = malloc(value_len);
5458 memcpy(xattr->value, value, value_len);
5459
5460 blob->state = SPDK_BLOB_STATE_DIRTY;
5461
5462 return 0;
5463 }
5464 }
5465
5466 xattr = calloc(1, sizeof(*xattr));
5467 if (!xattr) {
11fdf7f2 5468 return -ENOMEM;
7c673cae
FG
5469 }
5470 xattr->name = strdup(name);
5471 xattr->value_len = value_len;
5472 xattr->value = malloc(value_len);
5473 memcpy(xattr->value, value, value_len);
11fdf7f2 5474 TAILQ_INSERT_TAIL(xattrs, xattr, link);
7c673cae
FG
5475
5476 blob->state = SPDK_BLOB_STATE_DIRTY;
5477
5478 return 0;
5479}
5480
/*
 * Public entry point for setting a user (non-internal) extended attribute.
 * Returns whatever _spdk_blob_set_xattr() returns.
 */
int
spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		    uint16_t value_len)
{
	const bool internal = false;

	return _spdk_blob_set_xattr(blob, name, value, value_len, internal);
}
5487
5488static int
5489_spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
7c673cae 5490{
11fdf7f2 5491 struct spdk_xattr_tailq *xattrs;
7c673cae
FG
5492 struct spdk_xattr *xattr;
5493
11fdf7f2 5494 _spdk_blob_verify_md_op(blob);
7c673cae 5495
11fdf7f2
TL
5496 if (blob->md_ro) {
5497 return -EPERM;
5498 }
5499 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
7c673cae 5500
11fdf7f2 5501 TAILQ_FOREACH(xattr, xattrs, link) {
7c673cae 5502 if (!strcmp(name, xattr->name)) {
11fdf7f2 5503 TAILQ_REMOVE(xattrs, xattr, link);
7c673cae
FG
5504 free(xattr->value);
5505 free(xattr->name);
5506 free(xattr);
5507
11fdf7f2
TL
5508 if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
5509 blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
5510 }
7c673cae
FG
5511 blob->state = SPDK_BLOB_STATE_DIRTY;
5512
5513 return 0;
5514 }
5515 }
5516
5517 return -ENOENT;
5518}
5519
/*
 * Public entry point for removing a user (non-internal) extended attribute.
 * Returns whatever _spdk_blob_remove_xattr() returns.
 */
int
spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
{
	const bool internal = false;

	return _spdk_blob_remove_xattr(blob, name, internal);
}
5525
5526static int
5527_spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
5528 const void **value, size_t *value_len, bool internal)
7c673cae
FG
5529{
5530 struct spdk_xattr *xattr;
11fdf7f2
TL
5531 struct spdk_xattr_tailq *xattrs;
5532
5533 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
7c673cae 5534
11fdf7f2 5535 TAILQ_FOREACH(xattr, xattrs, link) {
7c673cae
FG
5536 if (!strcmp(name, xattr->name)) {
5537 *value = xattr->value;
5538 *value_len = xattr->value_len;
5539 return 0;
5540 }
5541 }
7c673cae
FG
5542 return -ENOENT;
5543}
5544
11fdf7f2
TL
/*
 * Public entry point for reading a user (non-internal) extended attribute.
 * Returns whatever _spdk_blob_get_xattr_value() returns.
 */
int
spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			  const void **value, size_t *value_len)
{
	const bool internal = false;

	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_value(blob, name, value, value_len, internal);
}
5553
7c673cae
FG
/*
 * List of xattr names returned to callers. Allocated as a single block:
 * the header followed by `count` name pointers.
 */
struct spdk_xattr_names {
	uint32_t	count;		/* number of valid entries in names[] */
	const char	*names[];	/* C99 flexible array member (was the GNU names[0] form) */
};
5558
11fdf7f2
TL
5559static int
5560_spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
7c673cae
FG
5561{
5562 struct spdk_xattr *xattr;
5563 int count = 0;
5564
11fdf7f2 5565 TAILQ_FOREACH(xattr, xattrs, link) {
7c673cae
FG
5566 count++;
5567 }
5568
5569 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
5570 if (*names == NULL) {
5571 return -ENOMEM;
5572 }
5573
11fdf7f2 5574 TAILQ_FOREACH(xattr, xattrs, link) {
7c673cae
FG
5575 (*names)->names[(*names)->count++] = xattr->name;
5576 }
5577
5578 return 0;
5579}
5580
11fdf7f2
TL
5581int
5582spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
5583{
5584 _spdk_blob_verify_md_op(blob);
5585
5586 return _spdk_blob_get_xattr_names(&blob->xattrs, names);
5587}
5588
7c673cae
FG
5589uint32_t
5590spdk_xattr_names_get_count(struct spdk_xattr_names *names)
5591{
5592 assert(names != NULL);
5593
5594 return names->count;
5595}
5596
5597const char *
5598spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
5599{
5600 if (index >= names->count) {
5601 return NULL;
5602 }
5603
5604 return names->names[index];
5605}
5606
/*
 * Release a name list. Only the list itself is freed; the name strings it
 * points to are left alone.
 */
void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}
5612
11fdf7f2
TL
5613struct spdk_bs_type
5614spdk_bs_get_bstype(struct spdk_blob_store *bs)
5615{
5616 return bs->bstype;
5617}
5618
5619void
5620spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
5621{
5622 memcpy(&bs->bstype, &bstype, sizeof(bstype));
5623}
5624
5625bool
5626spdk_blob_is_read_only(struct spdk_blob *blob)
5627{
5628 assert(blob != NULL);
5629 return (blob->data_ro || blob->md_ro);
5630}
5631
5632bool
5633spdk_blob_is_snapshot(struct spdk_blob *blob)
5634{
5635 struct spdk_blob_list *snapshot_entry;
5636
5637 assert(blob != NULL);
5638
5639 TAILQ_FOREACH(snapshot_entry, &blob->bs->snapshots, link) {
5640 if (snapshot_entry->id == blob->id) {
5641 break;
5642 }
5643 }
5644
5645 if (snapshot_entry == NULL) {
5646 return false;
5647 }
5648
5649 return true;
5650}
5651
5652bool
5653spdk_blob_is_clone(struct spdk_blob *blob)
5654{
5655 assert(blob != NULL);
5656
5657 if (blob->parent_id != SPDK_BLOBID_INVALID) {
5658 assert(spdk_blob_is_thin_provisioned(blob));
5659 return true;
5660 }
5661
5662 return false;
5663}
5664
5665bool
5666spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
5667{
5668 assert(blob != NULL);
5669 return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
5670}
5671
5672spdk_blob_id
5673spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
5674{
5675 struct spdk_blob_list *snapshot_entry = NULL;
5676 struct spdk_blob_list *clone_entry = NULL;
5677
5678 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
5679 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
5680 if (clone_entry->id == blob_id) {
5681 return snapshot_entry->id;
5682 }
5683 }
5684 }
5685
5686 return SPDK_BLOBID_INVALID;
5687}
5688
5689int
5690spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
5691 size_t *count)
5692{
5693 struct spdk_blob_list *snapshot_entry, *clone_entry;
5694 size_t n;
5695
5696 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
5697 if (snapshot_entry->id == blobid) {
5698 break;
5699 }
5700 }
5701 if (snapshot_entry == NULL) {
5702 *count = 0;
5703 return 0;
5704 }
5705
5706 if (ids == NULL || *count < snapshot_entry->clone_count) {
5707 *count = snapshot_entry->clone_count;
5708 return -ENOMEM;
5709 }
5710 *count = snapshot_entry->clone_count;
5711
5712 n = 0;
5713 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
5714 ids[n++] = clone_entry->id;
5715 }
5716
5717 return 0;
5718}
5719
5720SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)