4 * Copyright (c) Intel Corporation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "spdk/stdinc.h"
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 #include "spdk/util.h"
45 #include "spdk_internal/assert.h"
46 #include "spdk_internal/log.h"
48 #include "blobstore.h"
50 #define BLOB_CRC32C_INITIAL 0xffffffffUL
52 static int spdk_bs_register_md_thread(struct spdk_blob_store
*bs
);
53 static int spdk_bs_unregister_md_thread(struct spdk_blob_store
*bs
);
54 static void _spdk_blob_close_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
);
55 static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob
*blob
, uint32_t cluster_num
,
56 uint64_t cluster
, spdk_blob_op_complete cb_fn
, void *cb_arg
);
58 static int _spdk_blob_set_xattr(struct spdk_blob
*blob
, const char *name
, const void *value
,
59 uint16_t value_len
, bool internal
);
60 static int _spdk_blob_get_xattr_value(struct spdk_blob
*blob
, const char *name
,
61 const void **value
, size_t *value_len
, bool internal
);
62 static int _spdk_blob_remove_xattr(struct spdk_blob
*blob
, const char *name
, bool internal
);
65 _spdk_blob_verify_md_op(struct spdk_blob
*blob
)
68 assert(spdk_get_thread() == blob
->bs
->md_thread
);
69 assert(blob
->state
!= SPDK_BLOB_STATE_LOADING
);
72 static struct spdk_blob_list
*
73 _spdk_bs_get_snapshot_entry(struct spdk_blob_store
*bs
, spdk_blob_id blobid
)
75 struct spdk_blob_list
*snapshot_entry
= NULL
;
77 TAILQ_FOREACH(snapshot_entry
, &bs
->snapshots
, link
) {
78 if (snapshot_entry
->id
== blobid
) {
83 return snapshot_entry
;
87 _spdk_bs_claim_cluster(struct spdk_blob_store
*bs
, uint32_t cluster_num
)
89 assert(cluster_num
< spdk_bit_array_capacity(bs
->used_clusters
));
90 assert(spdk_bit_array_get(bs
->used_clusters
, cluster_num
) == false);
91 assert(bs
->num_free_clusters
> 0);
93 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Claiming cluster %u\n", cluster_num
);
95 spdk_bit_array_set(bs
->used_clusters
, cluster_num
);
96 bs
->num_free_clusters
--;
100 _spdk_blob_insert_cluster(struct spdk_blob
*blob
, uint32_t cluster_num
, uint64_t cluster
)
102 uint64_t *cluster_lba
= &blob
->active
.clusters
[cluster_num
];
104 _spdk_blob_verify_md_op(blob
);
106 if (*cluster_lba
!= 0) {
110 *cluster_lba
= _spdk_bs_cluster_to_lba(blob
->bs
, cluster
);
115 _spdk_bs_allocate_cluster(struct spdk_blob
*blob
, uint32_t cluster_num
,
116 uint64_t *lowest_free_cluster
, bool update_map
)
118 pthread_mutex_lock(&blob
->bs
->used_clusters_mutex
);
119 *lowest_free_cluster
= spdk_bit_array_find_first_clear(blob
->bs
->used_clusters
,
120 *lowest_free_cluster
);
121 if (*lowest_free_cluster
== UINT32_MAX
) {
122 /* No more free clusters. Cannot satisfy the request */
123 pthread_mutex_unlock(&blob
->bs
->used_clusters_mutex
);
127 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster
, blob
->id
);
128 _spdk_bs_claim_cluster(blob
->bs
, *lowest_free_cluster
);
129 pthread_mutex_unlock(&blob
->bs
->used_clusters_mutex
);
132 _spdk_blob_insert_cluster(blob
, cluster_num
, *lowest_free_cluster
);
139 _spdk_bs_release_cluster(struct spdk_blob_store
*bs
, uint32_t cluster_num
)
141 assert(cluster_num
< spdk_bit_array_capacity(bs
->used_clusters
));
142 assert(spdk_bit_array_get(bs
->used_clusters
, cluster_num
) == true);
143 assert(bs
->num_free_clusters
< bs
->total_clusters
);
145 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Releasing cluster %u\n", cluster_num
);
147 pthread_mutex_lock(&bs
->used_clusters_mutex
);
148 spdk_bit_array_clear(bs
->used_clusters
, cluster_num
);
149 bs
->num_free_clusters
++;
150 pthread_mutex_unlock(&bs
->used_clusters_mutex
);
154 _spdk_blob_xattrs_init(struct spdk_blob_xattr_opts
*xattrs
)
157 xattrs
->names
= NULL
;
159 xattrs
->get_value
= NULL
;
163 spdk_blob_opts_init(struct spdk_blob_opts
*opts
)
165 opts
->num_clusters
= 0;
166 opts
->thin_provision
= false;
167 _spdk_blob_xattrs_init(&opts
->xattrs
);
171 spdk_blob_open_opts_init(struct spdk_blob_open_opts
*opts
)
173 opts
->clear_method
= BLOB_CLEAR_WITH_UNMAP
;
176 static struct spdk_blob
*
177 _spdk_blob_alloc(struct spdk_blob_store
*bs
, spdk_blob_id id
)
179 struct spdk_blob
*blob
;
181 blob
= calloc(1, sizeof(*blob
));
189 blob
->parent_id
= SPDK_BLOBID_INVALID
;
191 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
192 blob
->active
.num_pages
= 1;
193 blob
->active
.pages
= calloc(1, sizeof(*blob
->active
.pages
));
194 if (!blob
->active
.pages
) {
199 blob
->active
.pages
[0] = _spdk_bs_blobid_to_page(id
);
201 TAILQ_INIT(&blob
->xattrs
);
202 TAILQ_INIT(&blob
->xattrs_internal
);
208 _spdk_xattrs_free(struct spdk_xattr_tailq
*xattrs
)
210 struct spdk_xattr
*xattr
, *xattr_tmp
;
212 TAILQ_FOREACH_SAFE(xattr
, xattrs
, link
, xattr_tmp
) {
213 TAILQ_REMOVE(xattrs
, xattr
, link
);
221 _spdk_blob_free(struct spdk_blob
*blob
)
223 assert(blob
!= NULL
);
225 free(blob
->active
.clusters
);
226 free(blob
->clean
.clusters
);
227 free(blob
->active
.pages
);
228 free(blob
->clean
.pages
);
230 _spdk_xattrs_free(&blob
->xattrs
);
231 _spdk_xattrs_free(&blob
->xattrs_internal
);
233 if (blob
->back_bs_dev
) {
234 blob
->back_bs_dev
->destroy(blob
->back_bs_dev
);
240 struct freeze_io_ctx
{
241 struct spdk_bs_cpl cpl
;
242 struct spdk_blob
*blob
;
/* Per-channel step of the freeze iteration: nothing to do on the channel
 * itself, simply advance the iterator.
 */
static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
252 _spdk_blob_execute_queued_io(struct spdk_io_channel_iter
*i
)
254 struct spdk_io_channel
*_ch
= spdk_io_channel_iter_get_channel(i
);
255 struct spdk_bs_channel
*ch
= spdk_io_channel_get_ctx(_ch
);
256 struct freeze_io_ctx
*ctx
= spdk_io_channel_iter_get_ctx(i
);
257 struct spdk_bs_request_set
*set
;
258 struct spdk_bs_user_op_args
*args
;
259 spdk_bs_user_op_t
*op
, *tmp
;
261 TAILQ_FOREACH_SAFE(op
, &ch
->queued_io
, link
, tmp
) {
262 set
= (struct spdk_bs_request_set
*)op
;
263 args
= &set
->u
.user_op
;
265 if (args
->blob
== ctx
->blob
) {
266 TAILQ_REMOVE(&ch
->queued_io
, op
, link
);
267 spdk_bs_user_op_execute(op
);
271 spdk_for_each_channel_continue(i
, 0);
275 _spdk_blob_io_cpl(struct spdk_io_channel_iter
*i
, int status
)
277 struct freeze_io_ctx
*ctx
= spdk_io_channel_iter_get_ctx(i
);
279 ctx
->cpl
.u
.blob_basic
.cb_fn(ctx
->cpl
.u
.blob_basic
.cb_arg
, 0);
285 _spdk_blob_freeze_io(struct spdk_blob
*blob
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
287 struct freeze_io_ctx
*ctx
;
289 ctx
= calloc(1, sizeof(*ctx
));
291 cb_fn(cb_arg
, -ENOMEM
);
295 ctx
->cpl
.type
= SPDK_BS_CPL_TYPE_BS_BASIC
;
296 ctx
->cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
297 ctx
->cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
300 /* Freeze I/O on blob */
301 blob
->frozen_refcnt
++;
303 if (blob
->frozen_refcnt
== 1) {
304 spdk_for_each_channel(blob
->bs
, _spdk_blob_io_sync
, ctx
, _spdk_blob_io_cpl
);
312 _spdk_blob_unfreeze_io(struct spdk_blob
*blob
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
314 struct freeze_io_ctx
*ctx
;
316 ctx
= calloc(1, sizeof(*ctx
));
318 cb_fn(cb_arg
, -ENOMEM
);
322 ctx
->cpl
.type
= SPDK_BS_CPL_TYPE_BS_BASIC
;
323 ctx
->cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
324 ctx
->cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
327 assert(blob
->frozen_refcnt
> 0);
329 blob
->frozen_refcnt
--;
331 if (blob
->frozen_refcnt
== 0) {
332 spdk_for_each_channel(blob
->bs
, _spdk_blob_execute_queued_io
, ctx
, _spdk_blob_io_cpl
);
340 _spdk_blob_mark_clean(struct spdk_blob
*blob
)
342 uint64_t *clusters
= NULL
;
343 uint32_t *pages
= NULL
;
345 assert(blob
!= NULL
);
347 if (blob
->active
.num_clusters
) {
348 assert(blob
->active
.clusters
);
349 clusters
= calloc(blob
->active
.num_clusters
, sizeof(*blob
->active
.clusters
));
353 memcpy(clusters
, blob
->active
.clusters
, blob
->active
.num_clusters
* sizeof(*clusters
));
356 if (blob
->active
.num_pages
) {
357 assert(blob
->active
.pages
);
358 pages
= calloc(blob
->active
.num_pages
, sizeof(*blob
->active
.pages
));
363 memcpy(pages
, blob
->active
.pages
, blob
->active
.num_pages
* sizeof(*pages
));
366 free(blob
->clean
.clusters
);
367 free(blob
->clean
.pages
);
369 blob
->clean
.num_clusters
= blob
->active
.num_clusters
;
370 blob
->clean
.clusters
= blob
->active
.clusters
;
371 blob
->clean
.num_pages
= blob
->active
.num_pages
;
372 blob
->clean
.pages
= blob
->active
.pages
;
374 blob
->active
.clusters
= clusters
;
375 blob
->active
.pages
= pages
;
377 /* If the metadata was dirtied again while the metadata was being written to disk,
378 * we do not want to revert the DIRTY state back to CLEAN here.
380 if (blob
->state
== SPDK_BLOB_STATE_LOADING
) {
381 blob
->state
= SPDK_BLOB_STATE_CLEAN
;
388 _spdk_blob_deserialize_xattr(struct spdk_blob
*blob
,
389 struct spdk_blob_md_descriptor_xattr
*desc_xattr
, bool internal
)
391 struct spdk_xattr
*xattr
;
393 if (desc_xattr
->length
!= sizeof(desc_xattr
->name_length
) +
394 sizeof(desc_xattr
->value_length
) +
395 desc_xattr
->name_length
+ desc_xattr
->value_length
) {
399 xattr
= calloc(1, sizeof(*xattr
));
404 xattr
->name
= malloc(desc_xattr
->name_length
+ 1);
405 if (xattr
->name
== NULL
) {
409 memcpy(xattr
->name
, desc_xattr
->name
, desc_xattr
->name_length
);
410 xattr
->name
[desc_xattr
->name_length
] = '\0';
412 xattr
->value
= malloc(desc_xattr
->value_length
);
413 if (xattr
->value
== NULL
) {
418 xattr
->value_len
= desc_xattr
->value_length
;
420 (void *)((uintptr_t)desc_xattr
->name
+ desc_xattr
->name_length
),
421 desc_xattr
->value_length
);
423 TAILQ_INSERT_TAIL(internal
? &blob
->xattrs_internal
: &blob
->xattrs
, xattr
, link
);
430 _spdk_blob_parse_page(const struct spdk_blob_md_page
*page
, struct spdk_blob
*blob
)
432 struct spdk_blob_md_descriptor
*desc
;
436 desc
= (struct spdk_blob_md_descriptor
*)page
->descriptors
;
437 while (cur_desc
< sizeof(page
->descriptors
)) {
438 if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_PADDING
) {
439 if (desc
->length
== 0) {
440 /* If padding and length are 0, this terminates the page */
443 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_FLAGS
) {
444 struct spdk_blob_md_descriptor_flags
*desc_flags
;
446 desc_flags
= (struct spdk_blob_md_descriptor_flags
*)desc
;
448 if (desc_flags
->length
!= sizeof(*desc_flags
) - sizeof(*desc
)) {
452 if ((desc_flags
->invalid_flags
| SPDK_BLOB_INVALID_FLAGS_MASK
) !=
453 SPDK_BLOB_INVALID_FLAGS_MASK
) {
457 if ((desc_flags
->data_ro_flags
| SPDK_BLOB_DATA_RO_FLAGS_MASK
) !=
458 SPDK_BLOB_DATA_RO_FLAGS_MASK
) {
459 blob
->data_ro
= true;
463 if ((desc_flags
->md_ro_flags
| SPDK_BLOB_MD_RO_FLAGS_MASK
) !=
464 SPDK_BLOB_MD_RO_FLAGS_MASK
) {
468 if ((desc_flags
->data_ro_flags
& SPDK_BLOB_READ_ONLY
)) {
469 blob
->data_ro
= true;
473 blob
->invalid_flags
= desc_flags
->invalid_flags
;
474 blob
->data_ro_flags
= desc_flags
->data_ro_flags
;
475 blob
->md_ro_flags
= desc_flags
->md_ro_flags
;
477 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_EXTENT
) {
478 struct spdk_blob_md_descriptor_extent
*desc_extent
;
480 unsigned int cluster_count
= blob
->active
.num_clusters
;
482 desc_extent
= (struct spdk_blob_md_descriptor_extent
*)desc
;
484 if (desc_extent
->length
== 0 ||
485 (desc_extent
->length
% sizeof(desc_extent
->extents
[0]) != 0)) {
489 for (i
= 0; i
< desc_extent
->length
/ sizeof(desc_extent
->extents
[0]); i
++) {
490 for (j
= 0; j
< desc_extent
->extents
[i
].length
; j
++) {
491 if (desc_extent
->extents
[i
].cluster_idx
!= 0) {
492 if (!spdk_bit_array_get(blob
->bs
->used_clusters
,
493 desc_extent
->extents
[i
].cluster_idx
+ j
)) {
501 if (cluster_count
== 0) {
504 tmp
= realloc(blob
->active
.clusters
, cluster_count
* sizeof(uint64_t));
508 blob
->active
.clusters
= tmp
;
509 blob
->active
.cluster_array_size
= cluster_count
;
511 for (i
= 0; i
< desc_extent
->length
/ sizeof(desc_extent
->extents
[0]); i
++) {
512 for (j
= 0; j
< desc_extent
->extents
[i
].length
; j
++) {
513 if (desc_extent
->extents
[i
].cluster_idx
!= 0) {
514 blob
->active
.clusters
[blob
->active
.num_clusters
++] = _spdk_bs_cluster_to_lba(blob
->bs
,
515 desc_extent
->extents
[i
].cluster_idx
+ j
);
516 } else if (spdk_blob_is_thin_provisioned(blob
)) {
517 blob
->active
.clusters
[blob
->active
.num_clusters
++] = 0;
524 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_XATTR
) {
527 rc
= _spdk_blob_deserialize_xattr(blob
,
528 (struct spdk_blob_md_descriptor_xattr
*) desc
, false);
532 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL
) {
535 rc
= _spdk_blob_deserialize_xattr(blob
,
536 (struct spdk_blob_md_descriptor_xattr
*) desc
, true);
541 /* Unrecognized descriptor type. Do not fail - just continue to the
542 * next descriptor. If this descriptor is associated with some feature
543 * defined in a newer version of blobstore, that version of blobstore
544 * should create and set an associated feature flag to specify if this
545 * blob can be loaded or not.
549 /* Advance to the next descriptor */
550 cur_desc
+= sizeof(*desc
) + desc
->length
;
551 if (cur_desc
+ sizeof(*desc
) > sizeof(page
->descriptors
)) {
554 desc
= (struct spdk_blob_md_descriptor
*)((uintptr_t)page
->descriptors
+ cur_desc
);
561 _spdk_blob_parse(const struct spdk_blob_md_page
*pages
, uint32_t page_count
,
562 struct spdk_blob
*blob
)
564 const struct spdk_blob_md_page
*page
;
568 assert(page_count
> 0);
569 assert(pages
[0].sequence_num
== 0);
570 assert(blob
!= NULL
);
571 assert(blob
->state
== SPDK_BLOB_STATE_LOADING
);
572 assert(blob
->active
.clusters
== NULL
);
574 /* The blobid provided doesn't match what's in the MD, this can
575 * happen for example if a bogus blobid is passed in through open.
577 if (blob
->id
!= pages
[0].id
) {
578 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
579 blob
->id
, pages
[0].id
);
583 for (i
= 0; i
< page_count
; i
++) {
586 assert(page
->id
== blob
->id
);
587 assert(page
->sequence_num
== i
);
589 rc
= _spdk_blob_parse_page(page
, blob
);
599 _spdk_blob_serialize_add_page(const struct spdk_blob
*blob
,
600 struct spdk_blob_md_page
**pages
,
601 uint32_t *page_count
,
602 struct spdk_blob_md_page
**last_page
)
604 struct spdk_blob_md_page
*page
;
606 assert(pages
!= NULL
);
607 assert(page_count
!= NULL
);
609 if (*page_count
== 0) {
610 assert(*pages
== NULL
);
612 *pages
= spdk_malloc(SPDK_BS_PAGE_SIZE
, SPDK_BS_PAGE_SIZE
,
613 NULL
, SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
615 assert(*pages
!= NULL
);
617 *pages
= spdk_realloc(*pages
,
618 SPDK_BS_PAGE_SIZE
* (*page_count
),
622 if (*pages
== NULL
) {
628 page
= &(*pages
)[*page_count
- 1];
629 memset(page
, 0, sizeof(*page
));
631 page
->sequence_num
= *page_count
- 1;
632 page
->next
= SPDK_INVALID_MD_PAGE
;
638 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
639 * Update required_sz on both success and failure.
643 _spdk_blob_serialize_xattr(const struct spdk_xattr
*xattr
,
644 uint8_t *buf
, size_t buf_sz
,
645 size_t *required_sz
, bool internal
)
647 struct spdk_blob_md_descriptor_xattr
*desc
;
649 *required_sz
= sizeof(struct spdk_blob_md_descriptor_xattr
) +
650 strlen(xattr
->name
) +
653 if (buf_sz
< *required_sz
) {
657 desc
= (struct spdk_blob_md_descriptor_xattr
*)buf
;
659 desc
->type
= internal
? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL
: SPDK_MD_DESCRIPTOR_TYPE_XATTR
;
660 desc
->length
= sizeof(desc
->name_length
) +
661 sizeof(desc
->value_length
) +
662 strlen(xattr
->name
) +
664 desc
->name_length
= strlen(xattr
->name
);
665 desc
->value_length
= xattr
->value_len
;
667 memcpy(desc
->name
, xattr
->name
, desc
->name_length
);
668 memcpy((void *)((uintptr_t)desc
->name
+ desc
->name_length
),
676 _spdk_blob_serialize_extent(const struct spdk_blob
*blob
,
677 uint64_t start_cluster
, uint64_t *next_cluster
,
678 uint8_t *buf
, size_t buf_sz
)
680 struct spdk_blob_md_descriptor_extent
*desc
;
682 uint64_t i
, extent_idx
;
683 uint64_t lba
, lba_per_cluster
, lba_count
;
685 /* The buffer must have room for at least one extent */
686 cur_sz
= sizeof(struct spdk_blob_md_descriptor
) + sizeof(desc
->extents
[0]);
687 if (buf_sz
< cur_sz
) {
688 *next_cluster
= start_cluster
;
692 desc
= (struct spdk_blob_md_descriptor_extent
*)buf
;
693 desc
->type
= SPDK_MD_DESCRIPTOR_TYPE_EXTENT
;
695 lba_per_cluster
= _spdk_bs_cluster_to_lba(blob
->bs
, 1);
697 lba
= blob
->active
.clusters
[start_cluster
];
698 lba_count
= lba_per_cluster
;
700 for (i
= start_cluster
+ 1; i
< blob
->active
.num_clusters
; i
++) {
701 if ((lba
+ lba_count
) == blob
->active
.clusters
[i
]) {
702 lba_count
+= lba_per_cluster
;
704 } else if (lba
== 0 && blob
->active
.clusters
[i
] == 0) {
705 lba_count
+= lba_per_cluster
;
708 desc
->extents
[extent_idx
].cluster_idx
= lba
/ lba_per_cluster
;
709 desc
->extents
[extent_idx
].length
= lba_count
/ lba_per_cluster
;
712 cur_sz
+= sizeof(desc
->extents
[extent_idx
]);
714 if (buf_sz
< cur_sz
) {
715 /* If we ran out of buffer space, return */
716 desc
->length
= sizeof(desc
->extents
[0]) * extent_idx
;
721 lba
= blob
->active
.clusters
[i
];
722 lba_count
= lba_per_cluster
;
725 desc
->extents
[extent_idx
].cluster_idx
= lba
/ lba_per_cluster
;
726 desc
->extents
[extent_idx
].length
= lba_count
/ lba_per_cluster
;
729 desc
->length
= sizeof(desc
->extents
[0]) * extent_idx
;
730 *next_cluster
= blob
->active
.num_clusters
;
736 _spdk_blob_serialize_flags(const struct spdk_blob
*blob
,
737 uint8_t *buf
, size_t *buf_sz
)
739 struct spdk_blob_md_descriptor_flags
*desc
;
742 * Flags get serialized first, so we should always have room for the flags
745 assert(*buf_sz
>= sizeof(*desc
));
747 desc
= (struct spdk_blob_md_descriptor_flags
*)buf
;
748 desc
->type
= SPDK_MD_DESCRIPTOR_TYPE_FLAGS
;
749 desc
->length
= sizeof(*desc
) - sizeof(struct spdk_blob_md_descriptor
);
750 desc
->invalid_flags
= blob
->invalid_flags
;
751 desc
->data_ro_flags
= blob
->data_ro_flags
;
752 desc
->md_ro_flags
= blob
->md_ro_flags
;
754 *buf_sz
-= sizeof(*desc
);
758 _spdk_blob_serialize_xattrs(const struct spdk_blob
*blob
,
759 const struct spdk_xattr_tailq
*xattrs
, bool internal
,
760 struct spdk_blob_md_page
**pages
,
761 struct spdk_blob_md_page
*cur_page
,
762 uint32_t *page_count
, uint8_t **buf
,
763 size_t *remaining_sz
)
765 const struct spdk_xattr
*xattr
;
768 TAILQ_FOREACH(xattr
, xattrs
, link
) {
769 size_t required_sz
= 0;
771 rc
= _spdk_blob_serialize_xattr(xattr
,
773 &required_sz
, internal
);
775 /* Need to add a new page to the chain */
776 rc
= _spdk_blob_serialize_add_page(blob
, pages
, page_count
,
785 *buf
= (uint8_t *)cur_page
->descriptors
;
786 *remaining_sz
= sizeof(cur_page
->descriptors
);
790 rc
= _spdk_blob_serialize_xattr(xattr
,
792 &required_sz
, internal
);
802 *remaining_sz
-= required_sz
;
810 _spdk_blob_serialize(const struct spdk_blob
*blob
, struct spdk_blob_md_page
**pages
,
811 uint32_t *page_count
)
813 struct spdk_blob_md_page
*cur_page
;
817 uint64_t last_cluster
;
819 assert(pages
!= NULL
);
820 assert(page_count
!= NULL
);
821 assert(blob
!= NULL
);
822 assert(blob
->state
== SPDK_BLOB_STATE_DIRTY
);
827 /* A blob always has at least 1 page, even if it has no descriptors */
828 rc
= _spdk_blob_serialize_add_page(blob
, pages
, page_count
, &cur_page
);
833 buf
= (uint8_t *)cur_page
->descriptors
;
834 remaining_sz
= sizeof(cur_page
->descriptors
);
836 /* Serialize flags */
837 _spdk_blob_serialize_flags(blob
, buf
, &remaining_sz
);
838 buf
+= sizeof(struct spdk_blob_md_descriptor_flags
);
840 /* Serialize xattrs */
841 rc
= _spdk_blob_serialize_xattrs(blob
, &blob
->xattrs
, false,
842 pages
, cur_page
, page_count
, &buf
, &remaining_sz
);
847 /* Serialize internal xattrs */
848 rc
= _spdk_blob_serialize_xattrs(blob
, &blob
->xattrs_internal
, true,
849 pages
, cur_page
, page_count
, &buf
, &remaining_sz
);
854 /* Serialize extents */
856 while (last_cluster
< blob
->active
.num_clusters
) {
857 _spdk_blob_serialize_extent(blob
, last_cluster
, &last_cluster
,
860 if (last_cluster
== blob
->active
.num_clusters
) {
864 rc
= _spdk_blob_serialize_add_page(blob
, pages
, page_count
,
870 buf
= (uint8_t *)cur_page
->descriptors
;
871 remaining_sz
= sizeof(cur_page
->descriptors
);
877 struct spdk_blob_load_ctx
{
878 struct spdk_blob
*blob
;
880 struct spdk_blob_md_page
*pages
;
882 spdk_bs_sequence_t
*seq
;
884 spdk_bs_sequence_cpl cb_fn
;
889 _spdk_blob_md_page_calc_crc(void *page
)
893 crc
= BLOB_CRC32C_INITIAL
;
894 crc
= spdk_crc32c_update(page
, SPDK_BS_PAGE_SIZE
- 4, crc
);
895 crc
^= BLOB_CRC32C_INITIAL
;
902 _spdk_blob_load_final(void *cb_arg
, int bserrno
)
904 struct spdk_blob_load_ctx
*ctx
= cb_arg
;
905 struct spdk_blob
*blob
= ctx
->blob
;
907 _spdk_blob_mark_clean(blob
);
909 ctx
->cb_fn(ctx
->seq
, ctx
->cb_arg
, bserrno
);
911 /* Free the memory */
912 spdk_free(ctx
->pages
);
917 _spdk_blob_load_snapshot_cpl(void *cb_arg
, struct spdk_blob
*snapshot
, int bserrno
)
919 struct spdk_blob_load_ctx
*ctx
= cb_arg
;
920 struct spdk_blob
*blob
= ctx
->blob
;
926 blob
->back_bs_dev
= spdk_bs_create_blob_bs_dev(snapshot
);
928 if (blob
->back_bs_dev
== NULL
) {
933 _spdk_blob_load_final(ctx
, bserrno
);
937 SPDK_ERRLOG("Snapshot fail\n");
938 _spdk_blob_free(blob
);
939 ctx
->cb_fn(ctx
->seq
, NULL
, bserrno
);
940 spdk_free(ctx
->pages
);
945 _spdk_blob_load_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
947 struct spdk_blob_load_ctx
*ctx
= cb_arg
;
948 struct spdk_blob
*blob
= ctx
->blob
;
949 struct spdk_blob_md_page
*page
;
956 SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno
);
957 _spdk_blob_free(blob
);
958 ctx
->cb_fn(seq
, NULL
, bserrno
);
959 spdk_free(ctx
->pages
);
964 page
= &ctx
->pages
[ctx
->num_pages
- 1];
965 crc
= _spdk_blob_md_page_calc_crc(page
);
966 if (crc
!= page
->crc
) {
967 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx
->num_pages
);
968 _spdk_blob_free(blob
);
969 ctx
->cb_fn(seq
, NULL
, -EINVAL
);
970 spdk_free(ctx
->pages
);
975 if (page
->next
!= SPDK_INVALID_MD_PAGE
) {
976 uint32_t next_page
= page
->next
;
977 uint64_t next_lba
= _spdk_bs_page_to_lba(blob
->bs
, blob
->bs
->md_start
+ next_page
);
980 assert(next_lba
< (blob
->bs
->md_start
+ blob
->bs
->md_len
));
982 /* Read the next page */
984 ctx
->pages
= spdk_realloc(ctx
->pages
, (sizeof(*page
) * ctx
->num_pages
),
986 if (ctx
->pages
== NULL
) {
987 ctx
->cb_fn(seq
, ctx
->cb_arg
, -ENOMEM
);
992 spdk_bs_sequence_read_dev(seq
, &ctx
->pages
[ctx
->num_pages
- 1],
994 _spdk_bs_byte_to_lba(blob
->bs
, sizeof(*page
)),
995 _spdk_blob_load_cpl
, ctx
);
999 /* Parse the pages */
1000 rc
= _spdk_blob_parse(ctx
->pages
, ctx
->num_pages
, blob
);
1002 _spdk_blob_free(blob
);
1003 ctx
->cb_fn(seq
, NULL
, rc
);
1004 spdk_free(ctx
->pages
);
1011 if (spdk_blob_is_thin_provisioned(blob
)) {
1012 rc
= _spdk_blob_get_xattr_value(blob
, BLOB_SNAPSHOT
, &value
, &len
, true);
1014 if (len
!= sizeof(spdk_blob_id
)) {
1015 _spdk_blob_free(blob
);
1016 ctx
->cb_fn(seq
, NULL
, -EINVAL
);
1017 spdk_free(ctx
->pages
);
1021 /* open snapshot blob and continue in the callback function */
1022 blob
->parent_id
= *(spdk_blob_id
*)value
;
1023 spdk_bs_open_blob(blob
->bs
, blob
->parent_id
,
1024 _spdk_blob_load_snapshot_cpl
, ctx
);
1027 /* add zeroes_dev for thin provisioned blob */
1028 blob
->back_bs_dev
= spdk_bs_create_zeroes_dev();
1032 blob
->back_bs_dev
= NULL
;
1034 _spdk_blob_load_final(ctx
, bserrno
);
1037 /* Load a blob from disk given a blobid */
1039 _spdk_blob_load(spdk_bs_sequence_t
*seq
, struct spdk_blob
*blob
,
1040 spdk_bs_sequence_cpl cb_fn
, void *cb_arg
)
1042 struct spdk_blob_load_ctx
*ctx
;
1043 struct spdk_blob_store
*bs
;
1047 _spdk_blob_verify_md_op(blob
);
1051 ctx
= calloc(1, sizeof(*ctx
));
1053 cb_fn(seq
, cb_arg
, -ENOMEM
);
1058 ctx
->pages
= spdk_realloc(ctx
->pages
, SPDK_BS_PAGE_SIZE
, SPDK_BS_PAGE_SIZE
);
1061 cb_fn(seq
, cb_arg
, -ENOMEM
);
1066 ctx
->cb_arg
= cb_arg
;
1068 page_num
= _spdk_bs_blobid_to_page(blob
->id
);
1069 lba
= _spdk_bs_page_to_lba(blob
->bs
, bs
->md_start
+ page_num
);
1071 blob
->state
= SPDK_BLOB_STATE_LOADING
;
1073 spdk_bs_sequence_read_dev(seq
, &ctx
->pages
[0], lba
,
1074 _spdk_bs_byte_to_lba(bs
, SPDK_BS_PAGE_SIZE
),
1075 _spdk_blob_load_cpl
, ctx
);
1078 struct spdk_blob_persist_ctx
{
1079 struct spdk_blob
*blob
;
1081 struct spdk_bs_super_block
*super
;
1083 struct spdk_blob_md_page
*pages
;
1087 spdk_bs_sequence_t
*seq
;
1088 spdk_bs_sequence_cpl cb_fn
;
1093 spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx
*ctx
, spdk_bs_batch_t
*batch
, uint64_t lba
,
1096 if (ctx
->blob
->clear_method
== BLOB_CLEAR_WITH_DEFAULT
||
1097 ctx
->blob
->clear_method
== BLOB_CLEAR_WITH_UNMAP
) {
1098 spdk_bs_batch_unmap_dev(batch
, lba
, lba_count
);
1099 } else if (ctx
->blob
->clear_method
== BLOB_CLEAR_WITH_WRITE_ZEROES
) {
1100 spdk_bs_batch_write_zeroes_dev(batch
, lba
, lba_count
);
1105 _spdk_blob_persist_complete(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1107 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1108 struct spdk_blob
*blob
= ctx
->blob
;
1111 _spdk_blob_mark_clean(blob
);
1114 /* Call user callback */
1115 ctx
->cb_fn(seq
, ctx
->cb_arg
, bserrno
);
1117 /* Free the memory */
1118 spdk_free(ctx
->pages
);
1123 _spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1125 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1126 struct spdk_blob
*blob
= ctx
->blob
;
1127 struct spdk_blob_store
*bs
= blob
->bs
;
1131 /* Release all clusters that were truncated */
1132 for (i
= blob
->active
.num_clusters
; i
< blob
->active
.cluster_array_size
; i
++) {
1133 uint32_t cluster_num
= _spdk_bs_lba_to_cluster(bs
, blob
->active
.clusters
[i
]);
1135 /* Nothing to release if it was not allocated */
1136 if (blob
->active
.clusters
[i
] != 0) {
1137 _spdk_bs_release_cluster(bs
, cluster_num
);
1141 if (blob
->active
.num_clusters
== 0) {
1142 free(blob
->active
.clusters
);
1143 blob
->active
.clusters
= NULL
;
1144 blob
->active
.cluster_array_size
= 0;
1145 } else if (blob
->active
.num_clusters
!= blob
->active
.cluster_array_size
) {
1146 tmp
= realloc(blob
->active
.clusters
, sizeof(uint64_t) * blob
->active
.num_clusters
);
1147 assert(tmp
!= NULL
);
1148 blob
->active
.clusters
= tmp
;
1149 blob
->active
.cluster_array_size
= blob
->active
.num_clusters
;
1152 _spdk_blob_persist_complete(seq
, ctx
, bserrno
);
1156 _spdk_blob_persist_clear_clusters(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1158 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1159 struct spdk_blob
*blob
= ctx
->blob
;
1160 struct spdk_blob_store
*bs
= blob
->bs
;
1161 spdk_bs_batch_t
*batch
;
1166 /* Clusters don't move around in blobs. The list shrinks or grows
1167 * at the end, but no changes ever occur in the middle of the list.
1170 batch
= spdk_bs_sequence_to_batch(seq
, _spdk_blob_persist_clear_clusters_cpl
, ctx
);
1172 /* Clear all clusters that were truncated */
1175 for (i
= blob
->active
.num_clusters
; i
< blob
->active
.cluster_array_size
; i
++) {
1176 uint64_t next_lba
= blob
->active
.clusters
[i
];
1177 uint32_t next_lba_count
= _spdk_bs_cluster_to_lba(bs
, 1);
1179 if (next_lba
> 0 && (lba
+ lba_count
) == next_lba
) {
1180 /* This cluster is contiguous with the previous one. */
1181 lba_count
+= next_lba_count
;
1185 /* This cluster is not contiguous with the previous one. */
1187 /* If a run of LBAs previously existing, clear them now */
1188 if (lba_count
> 0) {
1189 spdk_bs_batch_clear_dev(ctx
, batch
, lba
, lba_count
);
1192 /* Start building the next batch */
1195 lba_count
= next_lba_count
;
1201 /* If we ended with a contiguous set of LBAs, clear them now */
1202 if (lba_count
> 0) {
1203 spdk_bs_batch_clear_dev(ctx
, batch
, lba
, lba_count
);
1206 spdk_bs_batch_close(batch
);
1210 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1212 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1213 struct spdk_blob
*blob
= ctx
->blob
;
1214 struct spdk_blob_store
*bs
= blob
->bs
;
1217 /* This loop starts at 1 because the first page is special and handled
1218 * below. The pages (except the first) are never written in place,
1219 * so any pages in the clean list must be zeroed.
1221 for (i
= 1; i
< blob
->clean
.num_pages
; i
++) {
1222 spdk_bit_array_clear(bs
->used_md_pages
, blob
->clean
.pages
[i
]);
1225 if (blob
->active
.num_pages
== 0) {
1228 page_num
= _spdk_bs_blobid_to_page(blob
->id
);
1229 spdk_bit_array_clear(bs
->used_md_pages
, page_num
);
1232 /* Move on to clearing clusters */
1233 _spdk_blob_persist_clear_clusters(seq
, ctx
, 0);
1237 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1239 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1240 struct spdk_blob
*blob
= ctx
->blob
;
1241 struct spdk_blob_store
*bs
= blob
->bs
;
1244 spdk_bs_batch_t
*batch
;
1247 batch
= spdk_bs_sequence_to_batch(seq
, _spdk_blob_persist_zero_pages_cpl
, ctx
);
1249 lba_count
= _spdk_bs_byte_to_lba(bs
, SPDK_BS_PAGE_SIZE
);
1251 /* This loop starts at 1 because the first page is special and handled
1252 * below. The pages (except the first) are never written in place,
1253 * so any pages in the clean list must be zeroed.
1255 for (i
= 1; i
< blob
->clean
.num_pages
; i
++) {
1256 lba
= _spdk_bs_page_to_lba(bs
, bs
->md_start
+ blob
->clean
.pages
[i
]);
1258 spdk_bs_batch_write_zeroes_dev(batch
, lba
, lba_count
);
1261 /* The first page will only be zeroed if this is a delete. */
1262 if (blob
->active
.num_pages
== 0) {
1265 /* The first page in the metadata goes where the blobid indicates */
1266 page_num
= _spdk_bs_blobid_to_page(blob
->id
);
1267 lba
= _spdk_bs_page_to_lba(bs
, bs
->md_start
+ page_num
);
1269 spdk_bs_batch_write_zeroes_dev(batch
, lba
, lba_count
);
1272 spdk_bs_batch_close(batch
);
1276 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1278 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1279 struct spdk_blob
*blob
= ctx
->blob
;
1280 struct spdk_blob_store
*bs
= blob
->bs
;
1283 struct spdk_blob_md_page
*page
;
1285 if (blob
->active
.num_pages
== 0) {
1286 /* Move on to the next step */
1287 _spdk_blob_persist_zero_pages(seq
, ctx
, 0);
1291 lba_count
= _spdk_bs_byte_to_lba(bs
, sizeof(*page
));
1293 page
= &ctx
->pages
[0];
1294 /* The first page in the metadata goes where the blobid indicates */
1295 lba
= _spdk_bs_page_to_lba(bs
, bs
->md_start
+ _spdk_bs_blobid_to_page(blob
->id
));
1297 spdk_bs_sequence_write_dev(seq
, page
, lba
, lba_count
,
1298 _spdk_blob_persist_zero_pages
, ctx
);
1302 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1304 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1305 struct spdk_blob
*blob
= ctx
->blob
;
1306 struct spdk_blob_store
*bs
= blob
->bs
;
1309 struct spdk_blob_md_page
*page
;
1310 spdk_bs_batch_t
*batch
;
1313 /* Clusters don't move around in blobs. The list shrinks or grows
1314 * at the end, but no changes ever occur in the middle of the list.
1317 lba_count
= _spdk_bs_byte_to_lba(bs
, sizeof(*page
));
1319 batch
= spdk_bs_sequence_to_batch(seq
, _spdk_blob_persist_write_page_root
, ctx
);
1321 /* This starts at 1. The root page is not written until
1322 * all of the others are finished
1324 for (i
= 1; i
< blob
->active
.num_pages
; i
++) {
1325 page
= &ctx
->pages
[i
];
1326 assert(page
->sequence_num
== i
);
1328 lba
= _spdk_bs_page_to_lba(bs
, bs
->md_start
+ blob
->active
.pages
[i
]);
1330 spdk_bs_batch_write_dev(batch
, page
, lba
, lba_count
);
1333 spdk_bs_batch_close(batch
);
1337 _spdk_blob_resize(struct spdk_blob
*blob
, uint64_t sz
)
1341 uint64_t lfc
; /* lowest free cluster */
1342 uint64_t num_clusters
;
1343 struct spdk_blob_store
*bs
;
1347 _spdk_blob_verify_md_op(blob
);
1349 if (blob
->active
.num_clusters
== sz
) {
1353 if (blob
->active
.num_clusters
< blob
->active
.cluster_array_size
) {
1354 /* If this blob was resized to be larger, then smaller, then
1355 * larger without syncing, then the cluster array already
1356 * contains spare assigned clusters we can use.
1358 num_clusters
= spdk_min(blob
->active
.cluster_array_size
,
1361 num_clusters
= blob
->active
.num_clusters
;
1364 /* Do two passes - one to verify that we can obtain enough clusters
1365 * and another to actually claim them.
1368 if (spdk_blob_is_thin_provisioned(blob
) == false) {
1370 for (i
= num_clusters
; i
< sz
; i
++) {
1371 lfc
= spdk_bit_array_find_first_clear(bs
->used_clusters
, lfc
);
1372 if (lfc
== UINT32_MAX
) {
1373 /* No more free clusters. Cannot satisfy the request */
1380 if (sz
> num_clusters
) {
1381 /* Expand the cluster array if necessary.
1382 * We only shrink the array when persisting.
1384 tmp
= realloc(blob
->active
.clusters
, sizeof(uint64_t) * sz
);
1385 if (sz
> 0 && tmp
== NULL
) {
1388 memset(tmp
+ blob
->active
.cluster_array_size
, 0,
1389 sizeof(uint64_t) * (sz
- blob
->active
.cluster_array_size
));
1390 blob
->active
.clusters
= tmp
;
1391 blob
->active
.cluster_array_size
= sz
;
1394 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
1396 if (spdk_blob_is_thin_provisioned(blob
) == false) {
1398 for (i
= num_clusters
; i
< sz
; i
++) {
1399 _spdk_bs_allocate_cluster(blob
, i
, &lfc
, true);
1404 blob
->active
.num_clusters
= sz
;
1410 _spdk_blob_persist_start(struct spdk_blob_persist_ctx
*ctx
)
1412 spdk_bs_sequence_t
*seq
= ctx
->seq
;
1413 struct spdk_blob
*blob
= ctx
->blob
;
1414 struct spdk_blob_store
*bs
= blob
->bs
;
1420 if (blob
->active
.num_pages
== 0) {
1421 /* This is the signal that the blob should be deleted.
1422 * Immediately jump to the clean up routine. */
1423 assert(blob
->clean
.num_pages
> 0);
1424 ctx
->idx
= blob
->clean
.num_pages
- 1;
1425 blob
->state
= SPDK_BLOB_STATE_CLEAN
;
1426 _spdk_blob_persist_zero_pages(seq
, ctx
, 0);
1431 /* Generate the new metadata */
1432 rc
= _spdk_blob_serialize(blob
, &ctx
->pages
, &blob
->active
.num_pages
);
1434 _spdk_blob_persist_complete(seq
, ctx
, rc
);
1438 assert(blob
->active
.num_pages
>= 1);
1440 /* Resize the cache of page indices */
1441 tmp
= realloc(blob
->active
.pages
, blob
->active
.num_pages
* sizeof(*blob
->active
.pages
));
1443 _spdk_blob_persist_complete(seq
, ctx
, -ENOMEM
);
1446 blob
->active
.pages
= tmp
;
1448 /* Assign this metadata to pages. This requires two passes -
1449 * one to verify that there are enough pages and a second
1450 * to actually claim them. */
1452 /* Note that this loop starts at one. The first page location is fixed by the blobid. */
1453 for (i
= 1; i
< blob
->active
.num_pages
; i
++) {
1454 page_num
= spdk_bit_array_find_first_clear(bs
->used_md_pages
, page_num
);
1455 if (page_num
== UINT32_MAX
) {
1456 _spdk_blob_persist_complete(seq
, ctx
, -ENOMEM
);
1463 blob
->active
.pages
[0] = _spdk_bs_blobid_to_page(blob
->id
);
1464 for (i
= 1; i
< blob
->active
.num_pages
; i
++) {
1465 page_num
= spdk_bit_array_find_first_clear(bs
->used_md_pages
, page_num
);
1466 ctx
->pages
[i
- 1].next
= page_num
;
1467 /* Now that previous metadata page is complete, calculate the crc for it. */
1468 ctx
->pages
[i
- 1].crc
= _spdk_blob_md_page_calc_crc(&ctx
->pages
[i
- 1]);
1469 blob
->active
.pages
[i
] = page_num
;
1470 spdk_bit_array_set(bs
->used_md_pages
, page_num
);
1471 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Claiming page %u for blob %lu\n", page_num
, blob
->id
);
1474 ctx
->pages
[i
- 1].crc
= _spdk_blob_md_page_calc_crc(&ctx
->pages
[i
- 1]);
1475 /* Start writing the metadata from last page to first */
1476 ctx
->idx
= blob
->active
.num_pages
- 1;
1477 blob
->state
= SPDK_BLOB_STATE_CLEAN
;
1478 _spdk_blob_persist_write_page_chain(seq
, ctx
, 0);
1482 _spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1484 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1486 ctx
->blob
->bs
->clean
= 0;
1488 spdk_free(ctx
->super
);
1490 _spdk_blob_persist_start(ctx
);
1494 _spdk_bs_write_super(spdk_bs_sequence_t
*seq
, struct spdk_blob_store
*bs
,
1495 struct spdk_bs_super_block
*super
, spdk_bs_sequence_cpl cb_fn
, void *cb_arg
);
1499 _spdk_blob_persist_dirty(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1501 struct spdk_blob_persist_ctx
*ctx
= cb_arg
;
1503 ctx
->super
->clean
= 0;
1504 if (ctx
->super
->size
== 0) {
1505 ctx
->super
->size
= ctx
->blob
->bs
->dev
->blockcnt
* ctx
->blob
->bs
->dev
->blocklen
;
1508 _spdk_bs_write_super(seq
, ctx
->blob
->bs
, ctx
->super
, _spdk_blob_persist_dirty_cpl
, ctx
);
1512 /* Write a blob to disk */
1514 _spdk_blob_persist(spdk_bs_sequence_t
*seq
, struct spdk_blob
*blob
,
1515 spdk_bs_sequence_cpl cb_fn
, void *cb_arg
)
1517 struct spdk_blob_persist_ctx
*ctx
;
1519 _spdk_blob_verify_md_op(blob
);
1521 if (blob
->state
== SPDK_BLOB_STATE_CLEAN
) {
1522 cb_fn(seq
, cb_arg
, 0);
1526 ctx
= calloc(1, sizeof(*ctx
));
1528 cb_fn(seq
, cb_arg
, -ENOMEM
);
1534 ctx
->cb_arg
= cb_arg
;
1536 if (blob
->bs
->clean
) {
1537 ctx
->super
= spdk_zmalloc(sizeof(*ctx
->super
), 0x1000, NULL
,
1538 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
1540 cb_fn(seq
, cb_arg
, -ENOMEM
);
1545 spdk_bs_sequence_read_dev(seq
, ctx
->super
, _spdk_bs_page_to_lba(blob
->bs
, 0),
1546 _spdk_bs_byte_to_lba(blob
->bs
, sizeof(*ctx
->super
)),
1547 _spdk_blob_persist_dirty
, ctx
);
1549 _spdk_blob_persist_start(ctx
);
1553 struct spdk_blob_copy_cluster_ctx
{
1554 struct spdk_blob
*blob
;
1557 uint64_t new_cluster
;
1558 spdk_bs_sequence_t
*seq
;
1562 _spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg
, int bserrno
)
1564 struct spdk_blob_copy_cluster_ctx
*ctx
= cb_arg
;
1565 struct spdk_bs_request_set
*set
= (struct spdk_bs_request_set
*)ctx
->seq
;
1566 TAILQ_HEAD(, spdk_bs_request_set
) requests
;
1567 spdk_bs_user_op_t
*op
;
1569 TAILQ_INIT(&requests
);
1570 TAILQ_SWAP(&set
->channel
->need_cluster_alloc
, &requests
, spdk_bs_request_set
, link
);
1572 while (!TAILQ_EMPTY(&requests
)) {
1573 op
= TAILQ_FIRST(&requests
);
1574 TAILQ_REMOVE(&requests
, op
, link
);
1576 spdk_bs_user_op_execute(op
);
1578 spdk_bs_user_op_abort(op
);
1582 spdk_free(ctx
->buf
);
1587 _spdk_blob_insert_cluster_cpl(void *cb_arg
, int bserrno
)
1589 struct spdk_blob_copy_cluster_ctx
*ctx
= cb_arg
;
1592 if (bserrno
== -EEXIST
) {
1593 /* The metadata insert failed because another thread
1594 * allocated the cluster first. Free our cluster
1595 * but continue without error. */
1598 _spdk_bs_release_cluster(ctx
->blob
->bs
, ctx
->new_cluster
);
1601 spdk_bs_sequence_finish(ctx
->seq
, bserrno
);
1605 _spdk_blob_write_copy_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1607 struct spdk_blob_copy_cluster_ctx
*ctx
= cb_arg
;
1608 uint32_t cluster_number
;
1611 /* The write failed, so jump to the final completion handler */
1612 spdk_bs_sequence_finish(seq
, bserrno
);
1616 cluster_number
= _spdk_bs_page_to_cluster(ctx
->blob
->bs
, ctx
->page
);
1618 _spdk_blob_insert_cluster_on_md_thread(ctx
->blob
, cluster_number
, ctx
->new_cluster
,
1619 _spdk_blob_insert_cluster_cpl
, ctx
);
1623 _spdk_blob_write_copy(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
1625 struct spdk_blob_copy_cluster_ctx
*ctx
= cb_arg
;
1628 /* The read failed, so jump to the final completion handler */
1629 spdk_bs_sequence_finish(seq
, bserrno
);
1633 /* Write whole cluster */
1634 spdk_bs_sequence_write_dev(seq
, ctx
->buf
,
1635 _spdk_bs_cluster_to_lba(ctx
->blob
->bs
, ctx
->new_cluster
),
1636 _spdk_bs_cluster_to_lba(ctx
->blob
->bs
, 1),
1637 _spdk_blob_write_copy_cpl
, ctx
);
1641 _spdk_bs_allocate_and_copy_cluster(struct spdk_blob
*blob
,
1642 struct spdk_io_channel
*_ch
,
1643 uint64_t io_unit
, spdk_bs_user_op_t
*op
)
1645 struct spdk_bs_cpl cpl
;
1646 struct spdk_bs_channel
*ch
;
1647 struct spdk_blob_copy_cluster_ctx
*ctx
;
1648 uint32_t cluster_start_page
;
1649 uint32_t cluster_number
;
1652 ch
= spdk_io_channel_get_ctx(_ch
);
1654 if (!TAILQ_EMPTY(&ch
->need_cluster_alloc
)) {
1655 /* There are already operations pending. Queue this user op
1656 * and return because it will be re-executed when the outstanding
1657 * cluster allocation completes. */
1658 TAILQ_INSERT_TAIL(&ch
->need_cluster_alloc
, op
, link
);
1662 /* Round the io_unit offset down to the first page in the cluster */
1663 cluster_start_page
= _spdk_bs_io_unit_to_cluster_start(blob
, io_unit
);
1665 /* Calculate which index in the metadata cluster array the corresponding
1666 * cluster is supposed to be at. */
1667 cluster_number
= _spdk_bs_io_unit_to_cluster_number(blob
, io_unit
);
1669 ctx
= calloc(1, sizeof(*ctx
));
1671 spdk_bs_user_op_abort(op
);
1675 assert(blob
->bs
->cluster_sz
% blob
->back_bs_dev
->blocklen
== 0);
1678 ctx
->page
= cluster_start_page
;
1680 if (blob
->parent_id
!= SPDK_BLOBID_INVALID
) {
1681 ctx
->buf
= spdk_malloc(blob
->bs
->cluster_sz
, blob
->back_bs_dev
->blocklen
,
1682 NULL
, SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
1684 SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32
" failed.\n",
1685 blob
->bs
->cluster_sz
);
1687 spdk_bs_user_op_abort(op
);
1692 rc
= _spdk_bs_allocate_cluster(blob
, cluster_number
, &ctx
->new_cluster
, false);
1694 spdk_free(ctx
->buf
);
1696 spdk_bs_user_op_abort(op
);
1700 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
1701 cpl
.u
.blob_basic
.cb_fn
= _spdk_blob_allocate_and_copy_cluster_cpl
;
1702 cpl
.u
.blob_basic
.cb_arg
= ctx
;
1704 ctx
->seq
= spdk_bs_sequence_start(_ch
, &cpl
);
1706 _spdk_bs_release_cluster(blob
->bs
, ctx
->new_cluster
);
1707 spdk_free(ctx
->buf
);
1709 spdk_bs_user_op_abort(op
);
1713 /* Queue the user op to block other incoming operations */
1714 TAILQ_INSERT_TAIL(&ch
->need_cluster_alloc
, op
, link
);
1716 if (blob
->parent_id
!= SPDK_BLOBID_INVALID
) {
1717 /* Read cluster from backing device */
1718 spdk_bs_sequence_read_bs_dev(ctx
->seq
, blob
->back_bs_dev
, ctx
->buf
,
1719 _spdk_bs_dev_page_to_lba(blob
->back_bs_dev
, cluster_start_page
),
1720 _spdk_bs_dev_byte_to_lba(blob
->back_bs_dev
, blob
->bs
->cluster_sz
),
1721 _spdk_blob_write_copy
, ctx
);
1723 _spdk_blob_insert_cluster_on_md_thread(ctx
->blob
, cluster_number
, ctx
->new_cluster
,
1724 _spdk_blob_insert_cluster_cpl
, ctx
);
1729 _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob
*blob
, uint64_t io_unit
, uint64_t length
,
1730 uint64_t *lba
, uint32_t *lba_count
)
1732 *lba_count
= length
;
1734 if (!_spdk_bs_io_unit_is_allocated(blob
, io_unit
)) {
1735 assert(blob
->back_bs_dev
!= NULL
);
1736 *lba
= _spdk_bs_io_unit_to_back_dev_lba(blob
, io_unit
);
1737 *lba_count
= _spdk_bs_io_unit_to_back_dev_lba(blob
, *lba_count
);
1739 *lba
= _spdk_bs_blob_io_unit_to_lba(blob
, io_unit
);
1743 struct op_split_ctx
{
1744 struct spdk_blob
*blob
;
1745 struct spdk_io_channel
*channel
;
1746 uint64_t io_unit_offset
;
1747 uint64_t io_units_remaining
;
1749 enum spdk_blob_op_type op_type
;
1750 spdk_bs_sequence_t
*seq
;
1754 _spdk_blob_request_submit_op_split_next(void *cb_arg
, int bserrno
)
1756 struct op_split_ctx
*ctx
= cb_arg
;
1757 struct spdk_blob
*blob
= ctx
->blob
;
1758 struct spdk_io_channel
*ch
= ctx
->channel
;
1759 enum spdk_blob_op_type op_type
= ctx
->op_type
;
1760 uint8_t *buf
= ctx
->curr_payload
;
1761 uint64_t offset
= ctx
->io_unit_offset
;
1762 uint64_t length
= ctx
->io_units_remaining
;
1765 if (bserrno
!= 0 || ctx
->io_units_remaining
== 0) {
1766 spdk_bs_sequence_finish(ctx
->seq
, bserrno
);
1771 op_length
= spdk_min(length
, _spdk_bs_num_io_units_to_cluster_boundary(blob
,
1774 /* Update length and payload for next operation */
1775 ctx
->io_units_remaining
-= op_length
;
1776 ctx
->io_unit_offset
+= op_length
;
1777 if (op_type
== SPDK_BLOB_WRITE
|| op_type
== SPDK_BLOB_READ
) {
1778 ctx
->curr_payload
+= op_length
* blob
->bs
->io_unit_size
;
1782 case SPDK_BLOB_READ
:
1783 spdk_blob_io_read(blob
, ch
, buf
, offset
, op_length
,
1784 _spdk_blob_request_submit_op_split_next
, ctx
);
1786 case SPDK_BLOB_WRITE
:
1787 spdk_blob_io_write(blob
, ch
, buf
, offset
, op_length
,
1788 _spdk_blob_request_submit_op_split_next
, ctx
);
1790 case SPDK_BLOB_UNMAP
:
1791 spdk_blob_io_unmap(blob
, ch
, offset
, op_length
,
1792 _spdk_blob_request_submit_op_split_next
, ctx
);
1794 case SPDK_BLOB_WRITE_ZEROES
:
1795 spdk_blob_io_write_zeroes(blob
, ch
, offset
, op_length
,
1796 _spdk_blob_request_submit_op_split_next
, ctx
);
1798 case SPDK_BLOB_READV
:
1799 case SPDK_BLOB_WRITEV
:
1800 SPDK_ERRLOG("readv/write not valid for %s\n", __func__
);
1801 spdk_bs_sequence_finish(ctx
->seq
, -EINVAL
);
1808 _spdk_blob_request_submit_op_split(struct spdk_io_channel
*ch
, struct spdk_blob
*blob
,
1809 void *payload
, uint64_t offset
, uint64_t length
,
1810 spdk_blob_op_complete cb_fn
, void *cb_arg
, enum spdk_blob_op_type op_type
)
1812 struct op_split_ctx
*ctx
;
1813 spdk_bs_sequence_t
*seq
;
1814 struct spdk_bs_cpl cpl
;
1816 assert(blob
!= NULL
);
1818 ctx
= calloc(1, sizeof(struct op_split_ctx
));
1820 cb_fn(cb_arg
, -ENOMEM
);
1824 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
1825 cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
1826 cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
1828 seq
= spdk_bs_sequence_start(ch
, &cpl
);
1831 cb_fn(cb_arg
, -ENOMEM
);
1837 ctx
->curr_payload
= payload
;
1838 ctx
->io_unit_offset
= offset
;
1839 ctx
->io_units_remaining
= length
;
1840 ctx
->op_type
= op_type
;
1843 _spdk_blob_request_submit_op_split_next(ctx
, 0);
1847 _spdk_blob_request_submit_op_single(struct spdk_io_channel
*_ch
, struct spdk_blob
*blob
,
1848 void *payload
, uint64_t offset
, uint64_t length
,
1849 spdk_blob_op_complete cb_fn
, void *cb_arg
, enum spdk_blob_op_type op_type
)
1851 struct spdk_bs_cpl cpl
;
1855 assert(blob
!= NULL
);
1857 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
1858 cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
1859 cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
1861 _spdk_blob_calculate_lba_and_lba_count(blob
, offset
, length
, &lba
, &lba_count
);
1863 if (blob
->frozen_refcnt
) {
1864 /* This blob I/O is frozen */
1865 spdk_bs_user_op_t
*op
;
1866 struct spdk_bs_channel
*bs_channel
= spdk_io_channel_get_ctx(_ch
);
1868 op
= spdk_bs_user_op_alloc(_ch
, &cpl
, op_type
, blob
, payload
, 0, offset
, length
);
1870 cb_fn(cb_arg
, -ENOMEM
);
1874 TAILQ_INSERT_TAIL(&bs_channel
->queued_io
, op
, link
);
1880 case SPDK_BLOB_READ
: {
1881 spdk_bs_batch_t
*batch
;
1883 batch
= spdk_bs_batch_open(_ch
, &cpl
);
1885 cb_fn(cb_arg
, -ENOMEM
);
1889 if (_spdk_bs_io_unit_is_allocated(blob
, offset
)) {
1890 /* Read from the blob */
1891 spdk_bs_batch_read_dev(batch
, payload
, lba
, lba_count
);
1893 /* Read from the backing block device */
1894 spdk_bs_batch_read_bs_dev(batch
, blob
->back_bs_dev
, payload
, lba
, lba_count
);
1897 spdk_bs_batch_close(batch
);
1900 case SPDK_BLOB_WRITE
:
1901 case SPDK_BLOB_WRITE_ZEROES
: {
1902 if (_spdk_bs_io_unit_is_allocated(blob
, offset
)) {
1903 /* Write to the blob */
1904 spdk_bs_batch_t
*batch
;
1906 if (lba_count
== 0) {
1911 batch
= spdk_bs_batch_open(_ch
, &cpl
);
1913 cb_fn(cb_arg
, -ENOMEM
);
1917 if (op_type
== SPDK_BLOB_WRITE
) {
1918 spdk_bs_batch_write_dev(batch
, payload
, lba
, lba_count
);
1920 spdk_bs_batch_write_zeroes_dev(batch
, lba
, lba_count
);
1923 spdk_bs_batch_close(batch
);
1925 /* Queue this operation and allocate the cluster */
1926 spdk_bs_user_op_t
*op
;
1928 op
= spdk_bs_user_op_alloc(_ch
, &cpl
, op_type
, blob
, payload
, 0, offset
, length
);
1930 cb_fn(cb_arg
, -ENOMEM
);
1934 _spdk_bs_allocate_and_copy_cluster(blob
, _ch
, offset
, op
);
1938 case SPDK_BLOB_UNMAP
: {
1939 spdk_bs_batch_t
*batch
;
1941 batch
= spdk_bs_batch_open(_ch
, &cpl
);
1943 cb_fn(cb_arg
, -ENOMEM
);
1947 if (_spdk_bs_io_unit_is_allocated(blob
, offset
)) {
1948 spdk_bs_batch_unmap_dev(batch
, lba
, lba_count
);
1951 spdk_bs_batch_close(batch
);
1954 case SPDK_BLOB_READV
:
1955 case SPDK_BLOB_WRITEV
:
1956 SPDK_ERRLOG("readv/write not valid\n");
1957 cb_fn(cb_arg
, -EINVAL
);
1963 _spdk_blob_request_submit_op(struct spdk_blob
*blob
, struct spdk_io_channel
*_channel
,
1964 void *payload
, uint64_t offset
, uint64_t length
,
1965 spdk_blob_op_complete cb_fn
, void *cb_arg
, enum spdk_blob_op_type op_type
)
1967 assert(blob
!= NULL
);
1969 if (blob
->data_ro
&& op_type
!= SPDK_BLOB_READ
) {
1970 cb_fn(cb_arg
, -EPERM
);
1974 if (offset
+ length
> _spdk_bs_cluster_to_lba(blob
->bs
, blob
->active
.num_clusters
)) {
1975 cb_fn(cb_arg
, -EINVAL
);
1978 if (length
<= _spdk_bs_num_io_units_to_cluster_boundary(blob
, offset
)) {
1979 _spdk_blob_request_submit_op_single(_channel
, blob
, payload
, offset
, length
,
1980 cb_fn
, cb_arg
, op_type
);
1982 _spdk_blob_request_submit_op_split(_channel
, blob
, payload
, offset
, length
,
1983 cb_fn
, cb_arg
, op_type
);
1988 struct spdk_blob
*blob
;
1989 struct spdk_io_channel
*channel
;
1990 spdk_blob_op_complete cb_fn
;
1994 struct iovec
*orig_iov
;
1995 uint64_t io_unit_offset
;
1996 uint64_t io_units_remaining
;
1997 uint64_t io_units_done
;
1998 struct iovec iov
[0];
2002 _spdk_rw_iov_done(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
2004 assert(cb_arg
== NULL
);
2005 spdk_bs_sequence_finish(seq
, bserrno
);
2009 _spdk_rw_iov_split_next(void *cb_arg
, int bserrno
)
2011 struct rw_iov_ctx
*ctx
= cb_arg
;
2012 struct spdk_blob
*blob
= ctx
->blob
;
2013 struct iovec
*iov
, *orig_iov
;
2016 uint64_t io_units_count
, io_units_to_boundary
, io_unit_offset
;
2017 uint64_t byte_count
;
2019 if (bserrno
!= 0 || ctx
->io_units_remaining
== 0) {
2020 ctx
->cb_fn(ctx
->cb_arg
, bserrno
);
2025 io_unit_offset
= ctx
->io_unit_offset
;
2026 io_units_to_boundary
= _spdk_bs_num_io_units_to_cluster_boundary(blob
, io_unit_offset
);
2027 io_units_count
= spdk_min(ctx
->io_units_remaining
, io_units_to_boundary
);
2029 * Get index and offset into the original iov array for our current position in the I/O sequence.
2030 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
2031 * point to the current position in the I/O sequence.
2033 byte_count
= ctx
->io_units_done
* blob
->bs
->io_unit_size
;
2034 orig_iov
= &ctx
->orig_iov
[0];
2036 while (byte_count
> 0) {
2037 if (byte_count
>= orig_iov
->iov_len
) {
2038 byte_count
-= orig_iov
->iov_len
;
2041 orig_iovoff
= byte_count
;
2047 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
2048 * bytes of this next I/O remain to be accounted for in the new iov array.
2050 byte_count
= io_units_count
* blob
->bs
->io_unit_size
;
2053 while (byte_count
> 0) {
2054 assert(iovcnt
< ctx
->iovcnt
);
2055 iov
->iov_len
= spdk_min(byte_count
, orig_iov
->iov_len
- orig_iovoff
);
2056 iov
->iov_base
= orig_iov
->iov_base
+ orig_iovoff
;
2057 byte_count
-= iov
->iov_len
;
2064 ctx
->io_unit_offset
+= io_units_count
;
2065 ctx
->io_units_remaining
-= io_units_count
;
2066 ctx
->io_units_done
+= io_units_count
;
2070 spdk_blob_io_readv(ctx
->blob
, ctx
->channel
, iov
, iovcnt
, io_unit_offset
,
2071 io_units_count
, _spdk_rw_iov_split_next
, ctx
);
2073 spdk_blob_io_writev(ctx
->blob
, ctx
->channel
, iov
, iovcnt
, io_unit_offset
,
2074 io_units_count
, _spdk_rw_iov_split_next
, ctx
);
2079 _spdk_blob_request_submit_rw_iov(struct spdk_blob
*blob
, struct spdk_io_channel
*_channel
,
2080 struct iovec
*iov
, int iovcnt
, uint64_t offset
, uint64_t length
,
2081 spdk_blob_op_complete cb_fn
, void *cb_arg
, bool read
)
2083 struct spdk_bs_cpl cpl
;
2085 assert(blob
!= NULL
);
2087 if (!read
&& blob
->data_ro
) {
2088 cb_fn(cb_arg
, -EPERM
);
2097 if (offset
+ length
> _spdk_bs_cluster_to_lba(blob
->bs
, blob
->active
.num_clusters
)) {
2098 cb_fn(cb_arg
, -EINVAL
);
2103 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
2104 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
2105 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster
2106 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
2107 * to allocate a separate iov array and split the I/O such that none of the resulting
2108 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel)
2109 * but since this case happens very infrequently, any performance impact will be negligible.
2111 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
2112 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
2113 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
2114 * when the batch was completed, to allow for freeing the memory for the iov arrays.
2116 if (spdk_likely(length
<= _spdk_bs_num_io_units_to_cluster_boundary(blob
, offset
))) {
2120 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
2121 cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
2122 cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
2124 if (blob
->frozen_refcnt
) {
2125 /* This blob I/O is frozen */
2126 enum spdk_blob_op_type op_type
;
2127 spdk_bs_user_op_t
*op
;
2128 struct spdk_bs_channel
*bs_channel
= spdk_io_channel_get_ctx(_channel
);
2130 op_type
= read
? SPDK_BLOB_READV
: SPDK_BLOB_WRITEV
;
2131 op
= spdk_bs_user_op_alloc(_channel
, &cpl
, op_type
, blob
, iov
, iovcnt
, offset
, length
);
2133 cb_fn(cb_arg
, -ENOMEM
);
2137 TAILQ_INSERT_TAIL(&bs_channel
->queued_io
, op
, link
);
2142 _spdk_blob_calculate_lba_and_lba_count(blob
, offset
, length
, &lba
, &lba_count
);
2145 spdk_bs_sequence_t
*seq
;
2147 seq
= spdk_bs_sequence_start(_channel
, &cpl
);
2149 cb_fn(cb_arg
, -ENOMEM
);
2153 if (_spdk_bs_io_unit_is_allocated(blob
, offset
)) {
2154 spdk_bs_sequence_readv_dev(seq
, iov
, iovcnt
, lba
, lba_count
, _spdk_rw_iov_done
, NULL
);
2156 spdk_bs_sequence_readv_bs_dev(seq
, blob
->back_bs_dev
, iov
, iovcnt
, lba
, lba_count
,
2157 _spdk_rw_iov_done
, NULL
);
2160 if (_spdk_bs_io_unit_is_allocated(blob
, offset
)) {
2161 spdk_bs_sequence_t
*seq
;
2163 seq
= spdk_bs_sequence_start(_channel
, &cpl
);
2165 cb_fn(cb_arg
, -ENOMEM
);
2169 spdk_bs_sequence_writev_dev(seq
, iov
, iovcnt
, lba
, lba_count
, _spdk_rw_iov_done
, NULL
);
2171 /* Queue this operation and allocate the cluster */
2172 spdk_bs_user_op_t
*op
;
2174 op
= spdk_bs_user_op_alloc(_channel
, &cpl
, SPDK_BLOB_WRITEV
, blob
, iov
, iovcnt
, offset
,
2177 cb_fn(cb_arg
, -ENOMEM
);
2181 _spdk_bs_allocate_and_copy_cluster(blob
, _channel
, offset
, op
);
2185 struct rw_iov_ctx
*ctx
;
2187 ctx
= calloc(1, sizeof(struct rw_iov_ctx
) + iovcnt
* sizeof(struct iovec
));
2189 cb_fn(cb_arg
, -ENOMEM
);
2194 ctx
->channel
= _channel
;
2196 ctx
->cb_arg
= cb_arg
;
2198 ctx
->orig_iov
= iov
;
2199 ctx
->iovcnt
= iovcnt
;
2200 ctx
->io_unit_offset
= offset
;
2201 ctx
->io_units_remaining
= length
;
2202 ctx
->io_units_done
= 0;
2204 _spdk_rw_iov_split_next(ctx
, 0);
2208 static struct spdk_blob
*
2209 _spdk_blob_lookup(struct spdk_blob_store
*bs
, spdk_blob_id blobid
)
2211 struct spdk_blob
*blob
;
2213 TAILQ_FOREACH(blob
, &bs
->blobs
, link
) {
2214 if (blob
->id
== blobid
) {
2223 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob
*blob
,
2224 struct spdk_blob_list
**snapshot_entry
, struct spdk_blob_list
**clone_entry
)
2226 assert(blob
!= NULL
);
2227 *snapshot_entry
= NULL
;
2228 *clone_entry
= NULL
;
2230 if (blob
->parent_id
== SPDK_BLOBID_INVALID
) {
2234 TAILQ_FOREACH(*snapshot_entry
, &blob
->bs
->snapshots
, link
) {
2235 if ((*snapshot_entry
)->id
== blob
->parent_id
) {
2240 if (*snapshot_entry
!= NULL
) {
2241 TAILQ_FOREACH(*clone_entry
, &(*snapshot_entry
)->clones
, link
) {
2242 if ((*clone_entry
)->id
== blob
->id
) {
2247 assert(clone_entry
!= NULL
);
2252 _spdk_bs_channel_create(void *io_device
, void *ctx_buf
)
2254 struct spdk_blob_store
*bs
= io_device
;
2255 struct spdk_bs_channel
*channel
= ctx_buf
;
2256 struct spdk_bs_dev
*dev
;
2257 uint32_t max_ops
= bs
->max_channel_ops
;
2262 channel
->req_mem
= calloc(max_ops
, sizeof(struct spdk_bs_request_set
));
2263 if (!channel
->req_mem
) {
2267 TAILQ_INIT(&channel
->reqs
);
2269 for (i
= 0; i
< max_ops
; i
++) {
2270 TAILQ_INSERT_TAIL(&channel
->reqs
, &channel
->req_mem
[i
], link
);
2275 channel
->dev_channel
= dev
->create_channel(dev
);
2277 if (!channel
->dev_channel
) {
2278 SPDK_ERRLOG("Failed to create device channel.\n");
2279 free(channel
->req_mem
);
2283 TAILQ_INIT(&channel
->need_cluster_alloc
);
2284 TAILQ_INIT(&channel
->queued_io
);
2290 _spdk_bs_channel_destroy(void *io_device
, void *ctx_buf
)
2292 struct spdk_bs_channel
*channel
= ctx_buf
;
2293 spdk_bs_user_op_t
*op
;
2295 while (!TAILQ_EMPTY(&channel
->need_cluster_alloc
)) {
2296 op
= TAILQ_FIRST(&channel
->need_cluster_alloc
);
2297 TAILQ_REMOVE(&channel
->need_cluster_alloc
, op
, link
);
2298 spdk_bs_user_op_abort(op
);
2301 while (!TAILQ_EMPTY(&channel
->queued_io
)) {
2302 op
= TAILQ_FIRST(&channel
->queued_io
);
2303 TAILQ_REMOVE(&channel
->queued_io
, op
, link
);
2304 spdk_bs_user_op_abort(op
);
2307 free(channel
->req_mem
);
2308 channel
->dev
->destroy_channel(channel
->dev
, channel
->dev_channel
);
2312 _spdk_bs_dev_destroy(void *io_device
)
2314 struct spdk_blob_store
*bs
= io_device
;
2315 struct spdk_blob
*blob
, *blob_tmp
;
2317 bs
->dev
->destroy(bs
->dev
);
2319 TAILQ_FOREACH_SAFE(blob
, &bs
->blobs
, link
, blob_tmp
) {
2320 TAILQ_REMOVE(&bs
->blobs
, blob
, link
);
2321 _spdk_blob_free(blob
);
2324 pthread_mutex_destroy(&bs
->used_clusters_mutex
);
2326 spdk_bit_array_free(&bs
->used_blobids
);
2327 spdk_bit_array_free(&bs
->used_md_pages
);
2328 spdk_bit_array_free(&bs
->used_clusters
);
2330 * If this function is called for any reason except a successful unload,
2331 * the unload_cpl type will be NONE and this will be a nop.
2333 spdk_bs_call_cpl(&bs
->unload_cpl
, bs
->unload_err
);
2339 _spdk_bs_blob_list_add(struct spdk_blob
*blob
)
2341 spdk_blob_id snapshot_id
;
2342 struct spdk_blob_list
*snapshot_entry
= NULL
;
2343 struct spdk_blob_list
*clone_entry
= NULL
;
2345 assert(blob
!= NULL
);
2347 snapshot_id
= blob
->parent_id
;
2348 if (snapshot_id
== SPDK_BLOBID_INVALID
) {
2352 snapshot_entry
= _spdk_bs_get_snapshot_entry(blob
->bs
, snapshot_id
);
2353 if (snapshot_entry
== NULL
) {
2354 /* Snapshot not found */
2355 snapshot_entry
= calloc(1, sizeof(struct spdk_blob_list
));
2356 if (snapshot_entry
== NULL
) {
2359 snapshot_entry
->id
= snapshot_id
;
2360 TAILQ_INIT(&snapshot_entry
->clones
);
2361 TAILQ_INSERT_TAIL(&blob
->bs
->snapshots
, snapshot_entry
, link
);
2363 TAILQ_FOREACH(clone_entry
, &snapshot_entry
->clones
, link
) {
2364 if (clone_entry
->id
== blob
->id
) {
2370 if (clone_entry
== NULL
) {
2371 /* Clone not found */
2372 clone_entry
= calloc(1, sizeof(struct spdk_blob_list
));
2373 if (clone_entry
== NULL
) {
2376 clone_entry
->id
= blob
->id
;
2377 TAILQ_INIT(&clone_entry
->clones
);
2378 TAILQ_INSERT_TAIL(&snapshot_entry
->clones
, clone_entry
, link
);
2379 snapshot_entry
->clone_count
++;
2386 _spdk_bs_blob_list_remove(struct spdk_blob
*blob
)
2388 struct spdk_blob_list
*snapshot_entry
= NULL
;
2389 struct spdk_blob_list
*clone_entry
= NULL
;
2391 _spdk_blob_get_snapshot_and_clone_entries(blob
, &snapshot_entry
, &clone_entry
);
2393 if (snapshot_entry
== NULL
) {
2397 blob
->parent_id
= SPDK_BLOBID_INVALID
;
2398 TAILQ_REMOVE(&snapshot_entry
->clones
, clone_entry
, link
);
2401 snapshot_entry
->clone_count
--;
2405 _spdk_bs_blob_list_free(struct spdk_blob_store
*bs
)
2407 struct spdk_blob_list
*snapshot_entry
;
2408 struct spdk_blob_list
*snapshot_entry_tmp
;
2409 struct spdk_blob_list
*clone_entry
;
2410 struct spdk_blob_list
*clone_entry_tmp
;
2412 TAILQ_FOREACH_SAFE(snapshot_entry
, &bs
->snapshots
, link
, snapshot_entry_tmp
) {
2413 TAILQ_FOREACH_SAFE(clone_entry
, &snapshot_entry
->clones
, link
, clone_entry_tmp
) {
2414 TAILQ_REMOVE(&snapshot_entry
->clones
, clone_entry
, link
);
2417 TAILQ_REMOVE(&bs
->snapshots
, snapshot_entry
, link
);
2418 free(snapshot_entry
);
2425 _spdk_bs_free(struct spdk_blob_store
*bs
)
2427 _spdk_bs_blob_list_free(bs
);
2429 spdk_bs_unregister_md_thread(bs
);
2430 spdk_io_device_unregister(bs
, _spdk_bs_dev_destroy
);
2434 spdk_bs_opts_init(struct spdk_bs_opts
*opts
)
2436 opts
->cluster_sz
= SPDK_BLOB_OPTS_CLUSTER_SZ
;
2437 opts
->num_md_pages
= SPDK_BLOB_OPTS_NUM_MD_PAGES
;
2438 opts
->max_md_ops
= SPDK_BLOB_OPTS_MAX_MD_OPS
;
2439 opts
->max_channel_ops
= SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS
;
2440 opts
->clear_method
= BS_CLEAR_WITH_UNMAP
;
2441 memset(&opts
->bstype
, 0, sizeof(opts
->bstype
));
2442 opts
->iter_cb_fn
= NULL
;
2443 opts
->iter_cb_arg
= NULL
;
2447 _spdk_bs_opts_verify(struct spdk_bs_opts
*opts
)
2449 if (opts
->cluster_sz
== 0 || opts
->num_md_pages
== 0 || opts
->max_md_ops
== 0 ||
2450 opts
->max_channel_ops
== 0) {
2451 SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2459 _spdk_bs_alloc(struct spdk_bs_dev
*dev
, struct spdk_bs_opts
*opts
, struct spdk_blob_store
**_bs
)
2461 struct spdk_blob_store
*bs
;
2465 dev_size
= dev
->blocklen
* dev
->blockcnt
;
2466 if (dev_size
< opts
->cluster_sz
) {
2467 /* Device size cannot be smaller than cluster size of blobstore */
2468 SPDK_INFOLOG(SPDK_LOG_BLOB
, "Device size %" PRIu64
" is smaller than cluster size %" PRIu32
"\n",
2469 dev_size
, opts
->cluster_sz
);
2472 if (opts
->cluster_sz
< SPDK_BS_PAGE_SIZE
) {
2473 /* Cluster size cannot be smaller than page size */
2474 SPDK_ERRLOG("Cluster size %" PRIu32
" is smaller than page size %d\n",
2475 opts
->cluster_sz
, SPDK_BS_PAGE_SIZE
);
2478 bs
= calloc(1, sizeof(struct spdk_blob_store
));
2483 TAILQ_INIT(&bs
->blobs
);
2484 TAILQ_INIT(&bs
->snapshots
);
2486 bs
->md_thread
= spdk_get_thread();
2487 assert(bs
->md_thread
!= NULL
);
2490 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
2491 * even multiple of the cluster size.
2493 bs
->cluster_sz
= opts
->cluster_sz
;
2494 bs
->total_clusters
= dev
->blockcnt
/ (bs
->cluster_sz
/ dev
->blocklen
);
2495 bs
->pages_per_cluster
= bs
->cluster_sz
/ SPDK_BS_PAGE_SIZE
;
2496 bs
->num_free_clusters
= bs
->total_clusters
;
2497 bs
->used_clusters
= spdk_bit_array_create(bs
->total_clusters
);
2498 bs
->io_unit_size
= dev
->blocklen
;
2499 if (bs
->used_clusters
== NULL
) {
2504 bs
->max_channel_ops
= opts
->max_channel_ops
;
2505 bs
->super_blob
= SPDK_BLOBID_INVALID
;
2506 memcpy(&bs
->bstype
, &opts
->bstype
, sizeof(opts
->bstype
));
2508 /* The metadata is assumed to be at least 1 page */
2509 bs
->used_md_pages
= spdk_bit_array_create(1);
2510 bs
->used_blobids
= spdk_bit_array_create(0);
2512 pthread_mutex_init(&bs
->used_clusters_mutex
, NULL
);
2514 spdk_io_device_register(bs
, _spdk_bs_channel_create
, _spdk_bs_channel_destroy
,
2515 sizeof(struct spdk_bs_channel
), "blobstore");
2516 rc
= spdk_bs_register_md_thread(bs
);
2518 spdk_io_device_unregister(bs
, NULL
);
2519 pthread_mutex_destroy(&bs
->used_clusters_mutex
);
2520 spdk_bit_array_free(&bs
->used_blobids
);
2521 spdk_bit_array_free(&bs
->used_md_pages
);
2522 spdk_bit_array_free(&bs
->used_clusters
);
2524 /* FIXME: this is a lie but don't know how to get a proper error code here */
2532 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */
2534 struct spdk_bs_load_ctx
{
2535 struct spdk_blob_store
*bs
;
2536 struct spdk_bs_super_block
*super
;
2538 struct spdk_bs_md_mask
*mask
;
2540 uint32_t page_index
;
2542 struct spdk_blob_md_page
*page
;
2544 spdk_bs_sequence_t
*seq
;
2545 spdk_blob_op_with_handle_complete iter_cb_fn
;
2547 struct spdk_blob
*blob
;
2548 spdk_blob_id blobid
;
2552 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t
*seq
, struct spdk_bs_load_ctx
*ctx
, int bserrno
)
2554 assert(bserrno
!= 0);
2556 spdk_free(ctx
->super
);
2557 spdk_bs_sequence_finish(seq
, bserrno
);
2558 _spdk_bs_free(ctx
->bs
);
2563 _spdk_bs_set_mask(struct spdk_bit_array
*array
, struct spdk_bs_md_mask
*mask
)
2568 i
= spdk_bit_array_find_first_set(array
, i
);
2569 if (i
>= mask
->length
) {
2572 mask
->mask
[i
/ 8] |= 1U << (i
% 8);
2578 _spdk_bs_load_mask(struct spdk_bit_array
**array_ptr
, struct spdk_bs_md_mask
*mask
)
2580 struct spdk_bit_array
*array
;
2583 if (spdk_bit_array_resize(array_ptr
, mask
->length
) < 0) {
2588 for (i
= 0; i
< mask
->length
; i
++) {
2589 if (mask
->mask
[i
/ 8] & (1U << (i
% 8))) {
2590 spdk_bit_array_set(array
, i
);
2598 _spdk_bs_write_super(spdk_bs_sequence_t
*seq
, struct spdk_blob_store
*bs
,
2599 struct spdk_bs_super_block
*super
, spdk_bs_sequence_cpl cb_fn
, void *cb_arg
)
2601 /* Update the values in the super block */
2602 super
->super_blob
= bs
->super_blob
;
2603 memcpy(&super
->bstype
, &bs
->bstype
, sizeof(bs
->bstype
));
2604 super
->crc
= _spdk_blob_md_page_calc_crc(super
);
2605 spdk_bs_sequence_write_dev(seq
, super
, _spdk_bs_page_to_lba(bs
, 0),
2606 _spdk_bs_byte_to_lba(bs
, sizeof(*super
)),
2611 _spdk_bs_write_used_clusters(spdk_bs_sequence_t
*seq
, void *arg
, spdk_bs_sequence_cpl cb_fn
)
2613 struct spdk_bs_load_ctx
*ctx
= arg
;
2614 uint64_t mask_size
, lba
, lba_count
;
2616 /* Write out the used clusters mask */
2617 mask_size
= ctx
->super
->used_cluster_mask_len
* SPDK_BS_PAGE_SIZE
;
2618 ctx
->mask
= spdk_zmalloc(mask_size
, 0x1000, NULL
,
2619 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
2621 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
2625 ctx
->mask
->type
= SPDK_MD_MASK_TYPE_USED_CLUSTERS
;
2626 ctx
->mask
->length
= ctx
->bs
->total_clusters
;
2627 assert(ctx
->mask
->length
== spdk_bit_array_capacity(ctx
->bs
->used_clusters
));
2629 _spdk_bs_set_mask(ctx
->bs
->used_clusters
, ctx
->mask
);
2630 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_cluster_mask_start
);
2631 lba_count
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_cluster_mask_len
);
2632 spdk_bs_sequence_write_dev(seq
, ctx
->mask
, lba
, lba_count
, cb_fn
, arg
);
2636 _spdk_bs_write_used_md(spdk_bs_sequence_t
*seq
, void *arg
, spdk_bs_sequence_cpl cb_fn
)
2638 struct spdk_bs_load_ctx
*ctx
= arg
;
2639 uint64_t mask_size
, lba
, lba_count
;
2642 _spdk_bs_load_ctx_fail(seq
, ctx
, seq
->bserrno
);
2646 mask_size
= ctx
->super
->used_page_mask_len
* SPDK_BS_PAGE_SIZE
;
2647 ctx
->mask
= spdk_zmalloc(mask_size
, 0x1000, NULL
,
2648 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
2650 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
2654 ctx
->mask
->type
= SPDK_MD_MASK_TYPE_USED_PAGES
;
2655 ctx
->mask
->length
= ctx
->super
->md_len
;
2656 assert(ctx
->mask
->length
== spdk_bit_array_capacity(ctx
->bs
->used_md_pages
));
2658 _spdk_bs_set_mask(ctx
->bs
->used_md_pages
, ctx
->mask
);
2659 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_page_mask_start
);
2660 lba_count
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_page_mask_len
);
2661 spdk_bs_sequence_write_dev(seq
, ctx
->mask
, lba
, lba_count
, cb_fn
, arg
);
2665 _spdk_bs_write_used_blobids(spdk_bs_sequence_t
*seq
, void *arg
, spdk_bs_sequence_cpl cb_fn
)
2667 struct spdk_bs_load_ctx
*ctx
= arg
;
2668 uint64_t mask_size
, lba
, lba_count
;
2670 if (ctx
->super
->used_blobid_mask_len
== 0) {
2672 * This is a pre-v3 on-disk format where the blobid mask does not get
2679 mask_size
= ctx
->super
->used_blobid_mask_len
* SPDK_BS_PAGE_SIZE
;
2680 ctx
->mask
= spdk_zmalloc(mask_size
, 0x1000, NULL
, SPDK_ENV_SOCKET_ID_ANY
,
2683 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
2687 ctx
->mask
->type
= SPDK_MD_MASK_TYPE_USED_BLOBIDS
;
2688 ctx
->mask
->length
= ctx
->super
->md_len
;
2689 assert(ctx
->mask
->length
== spdk_bit_array_capacity(ctx
->bs
->used_blobids
));
2691 _spdk_bs_set_mask(ctx
->bs
->used_blobids
, ctx
->mask
);
2692 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_blobid_mask_start
);
2693 lba_count
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_blobid_mask_len
);
2694 spdk_bs_sequence_write_dev(seq
, ctx
->mask
, lba
, lba_count
, cb_fn
, arg
);
2698 _spdk_blob_set_thin_provision(struct spdk_blob
*blob
)
2700 _spdk_blob_verify_md_op(blob
);
2701 blob
->invalid_flags
|= SPDK_BLOB_THIN_PROV
;
2702 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
2705 static void _spdk_bs_load_iter(void *arg
, struct spdk_blob
*blob
, int bserrno
);
2708 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg
, int bserrno
)
2710 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2714 /* Iterate to next blob (we can't use spdk_bs_iter_next function as our
2715 * last blob has been removed */
2716 page_num
= _spdk_bs_blobid_to_page(ctx
->blobid
);
2718 page_num
= spdk_bit_array_find_first_set(ctx
->bs
->used_blobids
, page_num
);
2719 if (page_num
>= spdk_bit_array_capacity(ctx
->bs
->used_blobids
)) {
2720 _spdk_bs_load_iter(ctx
, NULL
, -ENOENT
);
2724 id
= _spdk_bs_page_to_blobid(page_num
);
2726 spdk_bs_open_blob(ctx
->bs
, id
, _spdk_bs_load_iter
, ctx
);
2730 _spdk_bs_delete_corrupted_close_cb(void *cb_arg
, int bserrno
)
2732 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2735 SPDK_ERRLOG("Failed to close corrupted blob\n");
2736 spdk_bs_iter_next(ctx
->bs
, ctx
->blob
, _spdk_bs_load_iter
, ctx
);
2740 spdk_bs_delete_blob(ctx
->bs
, ctx
->blobid
, _spdk_bs_delete_corrupted_blob_cpl
, ctx
);
2744 _spdk_bs_delete_corrupted_blob(void *cb_arg
, int bserrno
)
2746 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2750 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
2751 spdk_bs_iter_next(ctx
->bs
, ctx
->blob
, _spdk_bs_load_iter
, ctx
);
2755 /* Snapshot and clone have the same copy of cluster map at this point.
2756 * Let's clear cluster map for snpashot now so that it won't be cleared
2757 * for clone later when we remove snapshot. Also set thin provision to
2758 * pass data corruption check */
2759 for (i
= 0; i
< ctx
->blob
->active
.num_clusters
; i
++) {
2760 ctx
->blob
->active
.clusters
[i
] = 0;
2763 ctx
->blob
->md_ro
= false;
2765 _spdk_blob_set_thin_provision(ctx
->blob
);
2767 ctx
->blobid
= ctx
->blob
->id
;
2769 spdk_blob_close(ctx
->blob
, _spdk_bs_delete_corrupted_close_cb
, ctx
);
2773 _spdk_bs_update_corrupted_blob(void *cb_arg
, int bserrno
)
2775 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2778 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
2779 spdk_bs_iter_next(ctx
->bs
, ctx
->blob
, _spdk_bs_load_iter
, ctx
);
2783 ctx
->blob
->md_ro
= false;
2784 _spdk_blob_remove_xattr(ctx
->blob
, SNAPSHOT_PENDING_REMOVAL
, true);
2785 spdk_blob_set_read_only(ctx
->blob
);
2787 if (ctx
->iter_cb_fn
) {
2788 ctx
->iter_cb_fn(ctx
->iter_cb_arg
, ctx
->blob
, 0);
2790 _spdk_bs_blob_list_add(ctx
->blob
);
2792 spdk_bs_iter_next(ctx
->bs
, ctx
->blob
, _spdk_bs_load_iter
, ctx
);
2796 _spdk_bs_examine_clone(void *cb_arg
, struct spdk_blob
*blob
, int bserrno
)
2798 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2801 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n");
2802 spdk_bs_iter_next(ctx
->bs
, ctx
->blob
, _spdk_bs_load_iter
, ctx
);
2806 if (blob
->parent_id
== ctx
->blob
->id
) {
2807 /* Power failure occured before updating clone - keep snapshot */
2808 spdk_blob_close(blob
, _spdk_bs_update_corrupted_blob
, ctx
);
2810 /* Power failure occured after updating clone - remove snapshot */
2811 spdk_blob_close(blob
, _spdk_bs_delete_corrupted_blob
, ctx
);
2816 _spdk_bs_load_iter(void *arg
, struct spdk_blob
*blob
, int bserrno
)
2818 struct spdk_bs_load_ctx
*ctx
= arg
;
2824 /* Examine blob if it is corrupted after power failure. Fix
2825 * the ones that can be fixed and remove any other corrupted
2826 * ones. If it is not corrupted just process it */
2827 rc
= _spdk_blob_get_xattr_value(blob
, SNAPSHOT_PENDING_REMOVAL
, &value
, &len
, true);
2829 /* Not corrupted - process it and continue with iterating through blobs */
2830 if (ctx
->iter_cb_fn
) {
2831 ctx
->iter_cb_fn(ctx
->iter_cb_arg
, blob
, 0);
2833 _spdk_bs_blob_list_add(blob
);
2834 spdk_bs_iter_next(ctx
->bs
, blob
, _spdk_bs_load_iter
, ctx
);
2838 assert(len
== sizeof(spdk_blob_id
));
2842 /* Open clone to check if we are able to fix this blob or should we remove it */
2843 spdk_bs_open_blob(ctx
->bs
, *(spdk_blob_id
*)value
, _spdk_bs_examine_clone
, ctx
);
2845 } else if (bserrno
== -ENOENT
) {
2849 * This case needs to be looked at further. Same problem
2850 * exists with applications that rely on explicit blob
2851 * iteration. We should just skip the blob that failed
2852 * to load and continue on to the next one.
2854 SPDK_ERRLOG("Error in iterating blobs\n");
2857 ctx
->iter_cb_fn
= NULL
;
2859 spdk_free(ctx
->super
);
2860 spdk_free(ctx
->mask
);
2861 spdk_bs_sequence_finish(ctx
->seq
, bserrno
);
2866 _spdk_bs_load_complete(spdk_bs_sequence_t
*seq
, struct spdk_bs_load_ctx
*ctx
, int bserrno
)
2869 spdk_bs_iter_first(ctx
->bs
, _spdk_bs_load_iter
, ctx
);
2873 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
2875 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2878 /* The type must be correct */
2879 assert(ctx
->mask
->type
== SPDK_MD_MASK_TYPE_USED_BLOBIDS
);
2881 /* The length of the mask (in bits) must not be greater than
2882 * the length of the buffer (converted to bits) */
2883 assert(ctx
->mask
->length
<= (ctx
->super
->used_blobid_mask_len
* SPDK_BS_PAGE_SIZE
* 8));
2885 /* The length of the mask must be exactly equal to the size
2886 * (in pages) of the metadata region */
2887 assert(ctx
->mask
->length
== ctx
->super
->md_len
);
2889 rc
= _spdk_bs_load_mask(&ctx
->bs
->used_blobids
, ctx
->mask
);
2891 spdk_free(ctx
->mask
);
2892 _spdk_bs_load_ctx_fail(seq
, ctx
, rc
);
2896 _spdk_bs_load_complete(seq
, ctx
, bserrno
);
2900 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
2902 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2903 uint64_t lba
, lba_count
, mask_size
;
2906 /* The type must be correct */
2907 assert(ctx
->mask
->type
== SPDK_MD_MASK_TYPE_USED_CLUSTERS
);
2908 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2909 assert(ctx
->mask
->length
<= (ctx
->super
->used_cluster_mask_len
* sizeof(
2910 struct spdk_blob_md_page
) * 8));
2911 /* The length of the mask must be exactly equal to the total number of clusters */
2912 assert(ctx
->mask
->length
== ctx
->bs
->total_clusters
);
2914 rc
= _spdk_bs_load_mask(&ctx
->bs
->used_clusters
, ctx
->mask
);
2916 spdk_free(ctx
->mask
);
2917 _spdk_bs_load_ctx_fail(seq
, ctx
, rc
);
2921 ctx
->bs
->num_free_clusters
= spdk_bit_array_count_clear(ctx
->bs
->used_clusters
);
2922 assert(ctx
->bs
->num_free_clusters
<= ctx
->bs
->total_clusters
);
2924 spdk_free(ctx
->mask
);
2926 /* Read the used blobids mask */
2927 mask_size
= ctx
->super
->used_blobid_mask_len
* SPDK_BS_PAGE_SIZE
;
2928 ctx
->mask
= spdk_zmalloc(mask_size
, 0x1000, NULL
, SPDK_ENV_SOCKET_ID_ANY
,
2931 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
2934 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_blobid_mask_start
);
2935 lba_count
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_blobid_mask_len
);
2936 spdk_bs_sequence_read_dev(seq
, ctx
->mask
, lba
, lba_count
,
2937 _spdk_bs_load_used_blobids_cpl
, ctx
);
2941 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
2943 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2944 uint64_t lba
, lba_count
, mask_size
;
2947 /* The type must be correct */
2948 assert(ctx
->mask
->type
== SPDK_MD_MASK_TYPE_USED_PAGES
);
2949 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2950 assert(ctx
->mask
->length
<= (ctx
->super
->used_page_mask_len
* SPDK_BS_PAGE_SIZE
*
2952 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
2953 assert(ctx
->mask
->length
== ctx
->super
->md_len
);
2955 rc
= _spdk_bs_load_mask(&ctx
->bs
->used_md_pages
, ctx
->mask
);
2957 spdk_free(ctx
->mask
);
2958 _spdk_bs_load_ctx_fail(seq
, ctx
, rc
);
2962 spdk_free(ctx
->mask
);
2964 /* Read the used clusters mask */
2965 mask_size
= ctx
->super
->used_cluster_mask_len
* SPDK_BS_PAGE_SIZE
;
2966 ctx
->mask
= spdk_zmalloc(mask_size
, 0x1000, NULL
, SPDK_ENV_SOCKET_ID_ANY
,
2969 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
2972 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_cluster_mask_start
);
2973 lba_count
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_cluster_mask_len
);
2974 spdk_bs_sequence_read_dev(seq
, ctx
->mask
, lba
, lba_count
,
2975 _spdk_bs_load_used_clusters_cpl
, ctx
);
2979 _spdk_bs_load_read_used_pages(spdk_bs_sequence_t
*seq
, void *cb_arg
)
2981 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
2982 uint64_t lba
, lba_count
, mask_size
;
2984 /* Read the used pages mask */
2985 mask_size
= ctx
->super
->used_page_mask_len
* SPDK_BS_PAGE_SIZE
;
2986 ctx
->mask
= spdk_zmalloc(mask_size
, 0x1000, NULL
,
2987 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
2989 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
2993 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_page_mask_start
);
2994 lba_count
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->used_page_mask_len
);
2995 spdk_bs_sequence_read_dev(seq
, ctx
->mask
, lba
, lba_count
,
2996 _spdk_bs_load_used_pages_cpl
, ctx
);
3000 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page
*page
, struct spdk_blob_store
*bs
)
3002 struct spdk_blob_md_descriptor
*desc
;
3003 size_t cur_desc
= 0;
3005 desc
= (struct spdk_blob_md_descriptor
*)page
->descriptors
;
3006 while (cur_desc
< sizeof(page
->descriptors
)) {
3007 if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_PADDING
) {
3008 if (desc
->length
== 0) {
3009 /* If padding and length are 0, this terminates the page */
3012 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_EXTENT
) {
3013 struct spdk_blob_md_descriptor_extent
*desc_extent
;
3015 unsigned int cluster_count
= 0;
3016 uint32_t cluster_idx
;
3018 desc_extent
= (struct spdk_blob_md_descriptor_extent
*)desc
;
3020 for (i
= 0; i
< desc_extent
->length
/ sizeof(desc_extent
->extents
[0]); i
++) {
3021 for (j
= 0; j
< desc_extent
->extents
[i
].length
; j
++) {
3022 cluster_idx
= desc_extent
->extents
[i
].cluster_idx
;
3024 * cluster_idx = 0 means an unallocated cluster - don't mark that
3025 * in the used cluster map.
3027 if (cluster_idx
!= 0) {
3028 spdk_bit_array_set(bs
->used_clusters
, cluster_idx
+ j
);
3029 if (bs
->num_free_clusters
== 0) {
3032 bs
->num_free_clusters
--;
3037 if (cluster_count
== 0) {
3040 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_XATTR
) {
3041 /* Skip this item */
3042 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL
) {
3043 /* Skip this item */
3044 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_FLAGS
) {
3045 /* Skip this item */
3050 /* Advance to the next descriptor */
3051 cur_desc
+= sizeof(*desc
) + desc
->length
;
3052 if (cur_desc
+ sizeof(*desc
) > sizeof(page
->descriptors
)) {
3055 desc
= (struct spdk_blob_md_descriptor
*)((uintptr_t)page
->descriptors
+ cur_desc
);
3060 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx
*ctx
)
3064 crc
= _spdk_blob_md_page_calc_crc(ctx
->page
);
3065 if (crc
!= ctx
->page
->crc
) {
3069 if (_spdk_bs_page_to_blobid(ctx
->cur_page
) != ctx
->page
->id
) {
3076 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t
*seq
, void *cb_arg
);
3079 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3081 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3083 _spdk_bs_load_complete(seq
, ctx
, bserrno
);
3087 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3089 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3091 spdk_free(ctx
->mask
);
3094 _spdk_bs_write_used_clusters(seq
, cb_arg
, _spdk_bs_load_write_used_clusters_cpl
);
3098 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3100 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3102 spdk_free(ctx
->mask
);
3105 _spdk_bs_write_used_blobids(seq
, cb_arg
, _spdk_bs_load_write_used_blobids_cpl
);
3109 _spdk_bs_load_write_used_md(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3111 _spdk_bs_write_used_md(seq
, cb_arg
, _spdk_bs_load_write_used_pages_cpl
);
3115 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3117 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3118 uint64_t num_md_clusters
;
3123 _spdk_bs_load_ctx_fail(seq
, ctx
, bserrno
);
3127 page_num
= ctx
->cur_page
;
3128 if (_spdk_bs_load_cur_md_page_valid(ctx
) == true) {
3129 if (ctx
->page
->sequence_num
== 0 || ctx
->in_page_chain
== true) {
3130 spdk_bit_array_set(ctx
->bs
->used_md_pages
, page_num
);
3131 if (ctx
->page
->sequence_num
== 0) {
3132 spdk_bit_array_set(ctx
->bs
->used_blobids
, page_num
);
3134 if (_spdk_bs_load_replay_md_parse_page(ctx
->page
, ctx
->bs
)) {
3135 _spdk_bs_load_ctx_fail(seq
, ctx
, -EILSEQ
);
3138 if (ctx
->page
->next
!= SPDK_INVALID_MD_PAGE
) {
3139 ctx
->in_page_chain
= true;
3140 ctx
->cur_page
= ctx
->page
->next
;
3141 _spdk_bs_load_replay_cur_md_page(seq
, cb_arg
);
3147 ctx
->in_page_chain
= false;
3151 } while (spdk_bit_array_get(ctx
->bs
->used_md_pages
, ctx
->page_index
) == true);
3153 if (ctx
->page_index
< ctx
->super
->md_len
) {
3154 ctx
->cur_page
= ctx
->page_index
;
3155 _spdk_bs_load_replay_cur_md_page(seq
, cb_arg
);
3157 /* Claim all of the clusters used by the metadata */
3158 num_md_clusters
= spdk_divide_round_up(ctx
->super
->md_len
, ctx
->bs
->pages_per_cluster
);
3159 for (i
= 0; i
< num_md_clusters
; i
++) {
3160 _spdk_bs_claim_cluster(ctx
->bs
, i
);
3162 spdk_free(ctx
->page
);
3163 _spdk_bs_load_write_used_md(seq
, ctx
, bserrno
);
3168 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t
*seq
, void *cb_arg
)
3170 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3173 assert(ctx
->cur_page
< ctx
->super
->md_len
);
3174 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->md_start
+ ctx
->cur_page
);
3175 spdk_bs_sequence_read_dev(seq
, ctx
->page
, lba
,
3176 _spdk_bs_byte_to_lba(ctx
->bs
, SPDK_BS_PAGE_SIZE
),
3177 _spdk_bs_load_replay_md_cpl
, ctx
);
3181 _spdk_bs_load_replay_md(spdk_bs_sequence_t
*seq
, void *cb_arg
)
3183 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3185 ctx
->page_index
= 0;
3187 ctx
->page
= spdk_zmalloc(SPDK_BS_PAGE_SIZE
, SPDK_BS_PAGE_SIZE
,
3188 NULL
, SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
3190 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
3193 _spdk_bs_load_replay_cur_md_page(seq
, cb_arg
);
3197 _spdk_bs_recover(spdk_bs_sequence_t
*seq
, void *cb_arg
)
3199 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3202 rc
= spdk_bit_array_resize(&ctx
->bs
->used_md_pages
, ctx
->super
->md_len
);
3204 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
3208 rc
= spdk_bit_array_resize(&ctx
->bs
->used_blobids
, ctx
->super
->md_len
);
3210 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
3214 rc
= spdk_bit_array_resize(&ctx
->bs
->used_clusters
, ctx
->bs
->total_clusters
);
3216 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
3220 ctx
->bs
->num_free_clusters
= ctx
->bs
->total_clusters
;
3221 _spdk_bs_load_replay_md(seq
, cb_arg
);
3225 _spdk_bs_load_super_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3227 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3230 static const char zeros
[SPDK_BLOBSTORE_TYPE_LENGTH
];
3232 if (ctx
->super
->version
> SPDK_BS_VERSION
||
3233 ctx
->super
->version
< SPDK_BS_INITIAL_VERSION
) {
3234 _spdk_bs_load_ctx_fail(seq
, ctx
, -EILSEQ
);
3238 if (memcmp(ctx
->super
->signature
, SPDK_BS_SUPER_BLOCK_SIG
,
3239 sizeof(ctx
->super
->signature
)) != 0) {
3240 _spdk_bs_load_ctx_fail(seq
, ctx
, -EILSEQ
);
3244 crc
= _spdk_blob_md_page_calc_crc(ctx
->super
);
3245 if (crc
!= ctx
->super
->crc
) {
3246 _spdk_bs_load_ctx_fail(seq
, ctx
, -EILSEQ
);
3250 if (memcmp(&ctx
->bs
->bstype
, &ctx
->super
->bstype
, SPDK_BLOBSTORE_TYPE_LENGTH
) == 0) {
3251 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Bstype matched - loading blobstore\n");
3252 } else if (memcmp(&ctx
->bs
->bstype
, zeros
, SPDK_BLOBSTORE_TYPE_LENGTH
) == 0) {
3253 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Bstype wildcard used - loading blobstore regardless bstype\n");
3255 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Unexpected bstype\n");
3256 SPDK_LOGDUMP(SPDK_LOG_BLOB
, "Expected:", ctx
->bs
->bstype
.bstype
, SPDK_BLOBSTORE_TYPE_LENGTH
);
3257 SPDK_LOGDUMP(SPDK_LOG_BLOB
, "Found:", ctx
->super
->bstype
.bstype
, SPDK_BLOBSTORE_TYPE_LENGTH
);
3258 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENXIO
);
3262 if (ctx
->super
->size
> ctx
->bs
->dev
->blockcnt
* ctx
->bs
->dev
->blocklen
) {
3263 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n",
3264 ctx
->bs
->dev
->blockcnt
* ctx
->bs
->dev
->blocklen
, ctx
->super
->size
);
3265 _spdk_bs_load_ctx_fail(seq
, ctx
, -EILSEQ
);
3269 if (ctx
->super
->size
== 0) {
3270 ctx
->super
->size
= ctx
->bs
->dev
->blockcnt
* ctx
->bs
->dev
->blocklen
;
3273 if (ctx
->super
->io_unit_size
== 0) {
3274 ctx
->super
->io_unit_size
= SPDK_BS_PAGE_SIZE
;
3277 /* Parse the super block */
3279 ctx
->bs
->cluster_sz
= ctx
->super
->cluster_size
;
3280 ctx
->bs
->total_clusters
= ctx
->super
->size
/ ctx
->super
->cluster_size
;
3281 ctx
->bs
->pages_per_cluster
= ctx
->bs
->cluster_sz
/ SPDK_BS_PAGE_SIZE
;
3282 ctx
->bs
->io_unit_size
= ctx
->super
->io_unit_size
;
3283 rc
= spdk_bit_array_resize(&ctx
->bs
->used_clusters
, ctx
->bs
->total_clusters
);
3285 _spdk_bs_load_ctx_fail(seq
, ctx
, -ENOMEM
);
3288 ctx
->bs
->md_start
= ctx
->super
->md_start
;
3289 ctx
->bs
->md_len
= ctx
->super
->md_len
;
3290 ctx
->bs
->total_data_clusters
= ctx
->bs
->total_clusters
- spdk_divide_round_up(
3291 ctx
->bs
->md_start
+ ctx
->bs
->md_len
, ctx
->bs
->pages_per_cluster
);
3292 ctx
->bs
->super_blob
= ctx
->super
->super_blob
;
3293 memcpy(&ctx
->bs
->bstype
, &ctx
->super
->bstype
, sizeof(ctx
->super
->bstype
));
3295 if (ctx
->super
->used_blobid_mask_len
== 0 || ctx
->super
->clean
== 0) {
3296 _spdk_bs_recover(seq
, ctx
);
3298 _spdk_bs_load_read_used_pages(seq
, ctx
);
3303 spdk_bs_load(struct spdk_bs_dev
*dev
, struct spdk_bs_opts
*o
,
3304 spdk_bs_op_with_handle_complete cb_fn
, void *cb_arg
)
3306 struct spdk_blob_store
*bs
;
3307 struct spdk_bs_cpl cpl
;
3308 spdk_bs_sequence_t
*seq
;
3309 struct spdk_bs_load_ctx
*ctx
;
3310 struct spdk_bs_opts opts
= {};
3313 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Loading blobstore from dev %p\n", dev
);
3315 if ((SPDK_BS_PAGE_SIZE
% dev
->blocklen
) != 0) {
3316 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "unsupported dev block length of %d\n", dev
->blocklen
);
3318 cb_fn(cb_arg
, NULL
, -EINVAL
);
3325 spdk_bs_opts_init(&opts
);
3328 if (opts
.max_md_ops
== 0 || opts
.max_channel_ops
== 0) {
3330 cb_fn(cb_arg
, NULL
, -EINVAL
);
3334 err
= _spdk_bs_alloc(dev
, &opts
, &bs
);
3337 cb_fn(cb_arg
, NULL
, err
);
3341 ctx
= calloc(1, sizeof(*ctx
));
3344 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3349 ctx
->iter_cb_fn
= opts
.iter_cb_fn
;
3350 ctx
->iter_cb_arg
= opts
.iter_cb_arg
;
3352 /* Allocate memory for the super block */
3353 ctx
->super
= spdk_zmalloc(sizeof(*ctx
->super
), 0x1000, NULL
,
3354 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
3358 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3362 cpl
.type
= SPDK_BS_CPL_TYPE_BS_HANDLE
;
3363 cpl
.u
.bs_handle
.cb_fn
= cb_fn
;
3364 cpl
.u
.bs_handle
.cb_arg
= cb_arg
;
3365 cpl
.u
.bs_handle
.bs
= bs
;
3367 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
3369 spdk_free(ctx
->super
);
3372 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3376 /* Read the super block */
3377 spdk_bs_sequence_read_dev(seq
, ctx
->super
, _spdk_bs_page_to_lba(bs
, 0),
3378 _spdk_bs_byte_to_lba(bs
, sizeof(*ctx
->super
)),
3379 _spdk_bs_load_super_cpl
, ctx
);
3382 /* END spdk_bs_load */
3384 /* START spdk_bs_dump */
3386 struct spdk_bs_dump_ctx
{
3387 struct spdk_blob_store
*bs
;
3388 struct spdk_bs_super_block
*super
;
3390 struct spdk_blob_md_page
*page
;
3391 spdk_bs_sequence_t
*seq
;
3393 spdk_bs_dump_print_xattr print_xattr_fn
;
3394 char xattr_name
[4096];
3398 _spdk_bs_dump_finish(spdk_bs_sequence_t
*seq
, struct spdk_bs_dump_ctx
*ctx
, int bserrno
)
3400 spdk_free(ctx
->super
);
3403 * We need to defer calling spdk_bs_call_cpl() until after
3404 * dev destruction, so tuck these away for later use.
3406 ctx
->bs
->unload_err
= bserrno
;
3407 memcpy(&ctx
->bs
->unload_cpl
, &seq
->cpl
, sizeof(struct spdk_bs_cpl
));
3408 seq
->cpl
.type
= SPDK_BS_CPL_TYPE_NONE
;
3410 spdk_bs_sequence_finish(seq
, 0);
3411 _spdk_bs_free(ctx
->bs
);
3415 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t
*seq
, void *cb_arg
);
3418 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx
*ctx
)
3420 uint32_t page_idx
= ctx
->cur_page
;
3421 struct spdk_blob_md_page
*page
= ctx
->page
;
3422 struct spdk_blob_md_descriptor
*desc
;
3423 size_t cur_desc
= 0;
3426 fprintf(ctx
->fp
, "=========\n");
3427 fprintf(ctx
->fp
, "Metadata Page Index: %" PRIu32
" (0x%" PRIx32
")\n", page_idx
, page_idx
);
3428 fprintf(ctx
->fp
, "Blob ID: 0x%" PRIx64
"\n", page
->id
);
3430 crc
= _spdk_blob_md_page_calc_crc(page
);
3431 fprintf(ctx
->fp
, "CRC: 0x%" PRIx32
" (%s)\n", page
->crc
, crc
== page
->crc
? "OK" : "Mismatch");
3433 desc
= (struct spdk_blob_md_descriptor
*)page
->descriptors
;
3434 while (cur_desc
< sizeof(page
->descriptors
)) {
3435 if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_PADDING
) {
3436 if (desc
->length
== 0) {
3437 /* If padding and length are 0, this terminates the page */
3440 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_EXTENT
) {
3441 struct spdk_blob_md_descriptor_extent
*desc_extent
;
3444 desc_extent
= (struct spdk_blob_md_descriptor_extent
*)desc
;
3446 for (i
= 0; i
< desc_extent
->length
/ sizeof(desc_extent
->extents
[0]); i
++) {
3447 if (desc_extent
->extents
[i
].cluster_idx
!= 0) {
3448 fprintf(ctx
->fp
, "Allocated Extent - Start: %" PRIu32
,
3449 desc_extent
->extents
[i
].cluster_idx
);
3451 fprintf(ctx
->fp
, "Unallocated Extent - ");
3453 fprintf(ctx
->fp
, " Length: %" PRIu32
, desc_extent
->extents
[i
].length
);
3454 fprintf(ctx
->fp
, "\n");
3456 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_XATTR
) {
3457 struct spdk_blob_md_descriptor_xattr
*desc_xattr
;
3460 desc_xattr
= (struct spdk_blob_md_descriptor_xattr
*)desc
;
3462 if (desc_xattr
->length
!=
3463 sizeof(desc_xattr
->name_length
) + sizeof(desc_xattr
->value_length
) +
3464 desc_xattr
->name_length
+ desc_xattr
->value_length
) {
3467 memcpy(ctx
->xattr_name
, desc_xattr
->name
, desc_xattr
->name_length
);
3468 ctx
->xattr_name
[desc_xattr
->name_length
] = '\0';
3469 fprintf(ctx
->fp
, "XATTR: name = \"%s\"\n", ctx
->xattr_name
);
3470 fprintf(ctx
->fp
, " value = \"");
3471 ctx
->print_xattr_fn(ctx
->fp
, ctx
->super
->bstype
.bstype
, ctx
->xattr_name
,
3472 (void *)((uintptr_t)desc_xattr
->name
+ desc_xattr
->name_length
),
3473 desc_xattr
->value_length
);
3474 fprintf(ctx
->fp
, "\"\n");
3475 for (i
= 0; i
< desc_xattr
->value_length
; i
++) {
3477 fprintf(ctx
->fp
, " ");
3479 fprintf(ctx
->fp
, "%02" PRIx8
" ", *((uint8_t *)desc_xattr
->name
+ desc_xattr
->name_length
+ i
));
3480 if ((i
+ 1) % 16 == 0) {
3481 fprintf(ctx
->fp
, "\n");
3485 fprintf(ctx
->fp
, "\n");
3487 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL
) {
3489 } else if (desc
->type
== SPDK_MD_DESCRIPTOR_TYPE_FLAGS
) {
3494 /* Advance to the next descriptor */
3495 cur_desc
+= sizeof(*desc
) + desc
->length
;
3496 if (cur_desc
+ sizeof(*desc
) > sizeof(page
->descriptors
)) {
3499 desc
= (struct spdk_blob_md_descriptor
*)((uintptr_t)page
->descriptors
+ cur_desc
);
3504 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3506 struct spdk_bs_dump_ctx
*ctx
= cb_arg
;
3509 _spdk_bs_dump_finish(seq
, ctx
, bserrno
);
3513 if (ctx
->page
->id
!= 0) {
3514 _spdk_bs_dump_print_md_page(ctx
);
3519 if (ctx
->cur_page
< ctx
->super
->md_len
) {
3520 _spdk_bs_dump_read_md_page(seq
, cb_arg
);
3522 spdk_free(ctx
->page
);
3523 _spdk_bs_dump_finish(seq
, ctx
, 0);
3528 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t
*seq
, void *cb_arg
)
3530 struct spdk_bs_dump_ctx
*ctx
= cb_arg
;
3533 assert(ctx
->cur_page
< ctx
->super
->md_len
);
3534 lba
= _spdk_bs_page_to_lba(ctx
->bs
, ctx
->super
->md_start
+ ctx
->cur_page
);
3535 spdk_bs_sequence_read_dev(seq
, ctx
->page
, lba
,
3536 _spdk_bs_byte_to_lba(ctx
->bs
, SPDK_BS_PAGE_SIZE
),
3537 _spdk_bs_dump_read_md_page_cpl
, ctx
);
3541 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3543 struct spdk_bs_dump_ctx
*ctx
= cb_arg
;
3545 fprintf(ctx
->fp
, "Signature: \"%.8s\" ", ctx
->super
->signature
);
3546 if (memcmp(ctx
->super
->signature
, SPDK_BS_SUPER_BLOCK_SIG
,
3547 sizeof(ctx
->super
->signature
)) != 0) {
3548 fprintf(ctx
->fp
, "(Mismatch)\n");
3549 _spdk_bs_dump_finish(seq
, ctx
, bserrno
);
3552 fprintf(ctx
->fp
, "(OK)\n");
3554 fprintf(ctx
->fp
, "Version: %" PRIu32
"\n", ctx
->super
->version
);
3555 fprintf(ctx
->fp
, "CRC: 0x%x (%s)\n", ctx
->super
->crc
,
3556 (ctx
->super
->crc
== _spdk_blob_md_page_calc_crc(ctx
->super
)) ? "OK" : "Mismatch");
3557 fprintf(ctx
->fp
, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH
, ctx
->super
->bstype
.bstype
);
3558 fprintf(ctx
->fp
, "Cluster Size: %" PRIu32
"\n", ctx
->super
->cluster_size
);
3559 fprintf(ctx
->fp
, "Super Blob ID: ");
3560 if (ctx
->super
->super_blob
== SPDK_BLOBID_INVALID
) {
3561 fprintf(ctx
->fp
, "(None)\n");
3563 fprintf(ctx
->fp
, "%" PRIu64
"\n", ctx
->super
->super_blob
);
3565 fprintf(ctx
->fp
, "Clean: %" PRIu32
"\n", ctx
->super
->clean
);
3566 fprintf(ctx
->fp
, "Used Metadata Page Mask Start: %" PRIu32
"\n", ctx
->super
->used_page_mask_start
);
3567 fprintf(ctx
->fp
, "Used Metadata Page Mask Length: %" PRIu32
"\n", ctx
->super
->used_page_mask_len
);
3568 fprintf(ctx
->fp
, "Used Cluster Mask Start: %" PRIu32
"\n", ctx
->super
->used_cluster_mask_start
);
3569 fprintf(ctx
->fp
, "Used Cluster Mask Length: %" PRIu32
"\n", ctx
->super
->used_cluster_mask_len
);
3570 fprintf(ctx
->fp
, "Used Blob ID Mask Start: %" PRIu32
"\n", ctx
->super
->used_blobid_mask_start
);
3571 fprintf(ctx
->fp
, "Used Blob ID Mask Length: %" PRIu32
"\n", ctx
->super
->used_blobid_mask_len
);
3572 fprintf(ctx
->fp
, "Metadata Start: %" PRIu32
"\n", ctx
->super
->md_start
);
3573 fprintf(ctx
->fp
, "Metadata Length: %" PRIu32
"\n", ctx
->super
->md_len
);
3576 ctx
->page
= spdk_zmalloc(SPDK_BS_PAGE_SIZE
, SPDK_BS_PAGE_SIZE
,
3577 NULL
, SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
3579 _spdk_bs_dump_finish(seq
, ctx
, -ENOMEM
);
3582 _spdk_bs_dump_read_md_page(seq
, cb_arg
);
3586 spdk_bs_dump(struct spdk_bs_dev
*dev
, FILE *fp
, spdk_bs_dump_print_xattr print_xattr_fn
,
3587 spdk_bs_op_complete cb_fn
, void *cb_arg
)
3589 struct spdk_blob_store
*bs
;
3590 struct spdk_bs_cpl cpl
;
3591 spdk_bs_sequence_t
*seq
;
3592 struct spdk_bs_dump_ctx
*ctx
;
3593 struct spdk_bs_opts opts
= {};
3596 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Dumping blobstore from dev %p\n", dev
);
3598 spdk_bs_opts_init(&opts
);
3600 err
= _spdk_bs_alloc(dev
, &opts
, &bs
);
3607 ctx
= calloc(1, sizeof(*ctx
));
3610 cb_fn(cb_arg
, -ENOMEM
);
3616 ctx
->print_xattr_fn
= print_xattr_fn
;
3618 /* Allocate memory for the super block */
3619 ctx
->super
= spdk_zmalloc(sizeof(*ctx
->super
), 0x1000, NULL
,
3620 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
3624 cb_fn(cb_arg
, -ENOMEM
);
3628 cpl
.type
= SPDK_BS_CPL_TYPE_BS_BASIC
;
3629 cpl
.u
.bs_basic
.cb_fn
= cb_fn
;
3630 cpl
.u
.bs_basic
.cb_arg
= cb_arg
;
3632 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
3634 spdk_free(ctx
->super
);
3637 cb_fn(cb_arg
, -ENOMEM
);
3641 /* Read the super block */
3642 spdk_bs_sequence_read_dev(seq
, ctx
->super
, _spdk_bs_page_to_lba(bs
, 0),
3643 _spdk_bs_byte_to_lba(bs
, sizeof(*ctx
->super
)),
3644 _spdk_bs_dump_super_cpl
, ctx
);
3647 /* END spdk_bs_dump */
3649 /* START spdk_bs_init */
/* Context carried through the async init/destroy sequences: the blobstore
 * being initialized and the DMA-able super block being written out. */
struct spdk_bs_init_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;
};
3657 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3659 struct spdk_bs_init_ctx
*ctx
= cb_arg
;
3661 spdk_free(ctx
->super
);
3664 spdk_bs_sequence_finish(seq
, bserrno
);
3668 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3670 struct spdk_bs_init_ctx
*ctx
= cb_arg
;
3672 /* Write super block */
3673 spdk_bs_sequence_write_dev(seq
, ctx
->super
, _spdk_bs_page_to_lba(ctx
->bs
, 0),
3674 _spdk_bs_byte_to_lba(ctx
->bs
, sizeof(*ctx
->super
)),
3675 _spdk_bs_init_persist_super_cpl
, ctx
);
3679 spdk_bs_init(struct spdk_bs_dev
*dev
, struct spdk_bs_opts
*o
,
3680 spdk_bs_op_with_handle_complete cb_fn
, void *cb_arg
)
3682 struct spdk_bs_init_ctx
*ctx
;
3683 struct spdk_blob_store
*bs
;
3684 struct spdk_bs_cpl cpl
;
3685 spdk_bs_sequence_t
*seq
;
3686 spdk_bs_batch_t
*batch
;
3687 uint64_t num_md_lba
;
3688 uint64_t num_md_pages
;
3689 uint64_t num_md_clusters
;
3691 struct spdk_bs_opts opts
= {};
3694 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Initializing blobstore on dev %p\n", dev
);
3696 if ((SPDK_BS_PAGE_SIZE
% dev
->blocklen
) != 0) {
3697 SPDK_ERRLOG("unsupported dev block length of %d\n",
3700 cb_fn(cb_arg
, NULL
, -EINVAL
);
3707 spdk_bs_opts_init(&opts
);
3710 if (_spdk_bs_opts_verify(&opts
) != 0) {
3712 cb_fn(cb_arg
, NULL
, -EINVAL
);
3716 rc
= _spdk_bs_alloc(dev
, &opts
, &bs
);
3719 cb_fn(cb_arg
, NULL
, rc
);
3723 if (opts
.num_md_pages
== SPDK_BLOB_OPTS_NUM_MD_PAGES
) {
3724 /* By default, allocate 1 page per cluster.
3725 * Technically, this over-allocates metadata
3726 * because more metadata will reduce the number
3727 * of usable clusters. This can be addressed with
3728 * more complex math in the future.
3730 bs
->md_len
= bs
->total_clusters
;
3732 bs
->md_len
= opts
.num_md_pages
;
3734 rc
= spdk_bit_array_resize(&bs
->used_md_pages
, bs
->md_len
);
3737 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3741 rc
= spdk_bit_array_resize(&bs
->used_blobids
, bs
->md_len
);
3744 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3748 ctx
= calloc(1, sizeof(*ctx
));
3751 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3757 /* Allocate memory for the super block */
3758 ctx
->super
= spdk_zmalloc(sizeof(*ctx
->super
), 0x1000, NULL
,
3759 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
3763 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3766 memcpy(ctx
->super
->signature
, SPDK_BS_SUPER_BLOCK_SIG
,
3767 sizeof(ctx
->super
->signature
));
3768 ctx
->super
->version
= SPDK_BS_VERSION
;
3769 ctx
->super
->length
= sizeof(*ctx
->super
);
3770 ctx
->super
->super_blob
= bs
->super_blob
;
3771 ctx
->super
->clean
= 0;
3772 ctx
->super
->cluster_size
= bs
->cluster_sz
;
3773 ctx
->super
->io_unit_size
= bs
->io_unit_size
;
3774 memcpy(&ctx
->super
->bstype
, &bs
->bstype
, sizeof(bs
->bstype
));
3776 /* Calculate how many pages the metadata consumes at the front
3780 /* The super block uses 1 page */
3783 /* The used_md_pages mask requires 1 bit per metadata page, rounded
3784 * up to the nearest page, plus a header.
3786 ctx
->super
->used_page_mask_start
= num_md_pages
;
3787 ctx
->super
->used_page_mask_len
= spdk_divide_round_up(sizeof(struct spdk_bs_md_mask
) +
3788 spdk_divide_round_up(bs
->md_len
, 8),
3790 num_md_pages
+= ctx
->super
->used_page_mask_len
;
3792 /* The used_clusters mask requires 1 bit per cluster, rounded
3793 * up to the nearest page, plus a header.
3795 ctx
->super
->used_cluster_mask_start
= num_md_pages
;
3796 ctx
->super
->used_cluster_mask_len
= spdk_divide_round_up(sizeof(struct spdk_bs_md_mask
) +
3797 spdk_divide_round_up(bs
->total_clusters
, 8),
3799 num_md_pages
+= ctx
->super
->used_cluster_mask_len
;
3801 /* The used_blobids mask requires 1 bit per metadata page, rounded
3802 * up to the nearest page, plus a header.
3804 ctx
->super
->used_blobid_mask_start
= num_md_pages
;
3805 ctx
->super
->used_blobid_mask_len
= spdk_divide_round_up(sizeof(struct spdk_bs_md_mask
) +
3806 spdk_divide_round_up(bs
->md_len
, 8),
3808 num_md_pages
+= ctx
->super
->used_blobid_mask_len
;
3810 /* The metadata region size was chosen above */
3811 ctx
->super
->md_start
= bs
->md_start
= num_md_pages
;
3812 ctx
->super
->md_len
= bs
->md_len
;
3813 num_md_pages
+= bs
->md_len
;
3815 num_md_lba
= _spdk_bs_page_to_lba(bs
, num_md_pages
);
3817 ctx
->super
->size
= dev
->blockcnt
* dev
->blocklen
;
3819 ctx
->super
->crc
= _spdk_blob_md_page_calc_crc(ctx
->super
);
3821 num_md_clusters
= spdk_divide_round_up(num_md_pages
, bs
->pages_per_cluster
);
3822 if (num_md_clusters
> bs
->total_clusters
) {
3823 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
3824 "please decrease number of pages reserved for metadata "
3825 "or increase cluster size.\n");
3826 spdk_free(ctx
->super
);
3829 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3832 /* Claim all of the clusters used by the metadata */
3833 for (i
= 0; i
< num_md_clusters
; i
++) {
3834 _spdk_bs_claim_cluster(bs
, i
);
3837 bs
->total_data_clusters
= bs
->num_free_clusters
;
3839 cpl
.type
= SPDK_BS_CPL_TYPE_BS_HANDLE
;
3840 cpl
.u
.bs_handle
.cb_fn
= cb_fn
;
3841 cpl
.u
.bs_handle
.cb_arg
= cb_arg
;
3842 cpl
.u
.bs_handle
.bs
= bs
;
3844 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
3846 spdk_free(ctx
->super
);
3849 cb_fn(cb_arg
, NULL
, -ENOMEM
);
3853 batch
= spdk_bs_sequence_to_batch(seq
, _spdk_bs_init_trim_cpl
, ctx
);
3855 /* Clear metadata space */
3856 spdk_bs_batch_write_zeroes_dev(batch
, 0, num_md_lba
);
3858 if (opts
.clear_method
== BS_CLEAR_WITH_UNMAP
) {
3859 /* Trim data clusters */
3860 spdk_bs_batch_unmap_dev(batch
, num_md_lba
, ctx
->bs
->dev
->blockcnt
- num_md_lba
);
3861 } else if (opts
.clear_method
== BS_CLEAR_WITH_WRITE_ZEROES
) {
3862 /* Write_zeroes to data clusters */
3863 spdk_bs_batch_write_zeroes_dev(batch
, num_md_lba
, ctx
->bs
->dev
->blockcnt
- num_md_lba
);
3866 spdk_bs_batch_close(batch
);
3869 /* END spdk_bs_init */
3871 /* START spdk_bs_destroy */
3874 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3876 struct spdk_bs_init_ctx
*ctx
= cb_arg
;
3877 struct spdk_blob_store
*bs
= ctx
->bs
;
3880 * We need to defer calling spdk_bs_call_cpl() until after
3881 * dev destruction, so tuck these away for later use.
3883 bs
->unload_err
= bserrno
;
3884 memcpy(&bs
->unload_cpl
, &seq
->cpl
, sizeof(struct spdk_bs_cpl
));
3885 seq
->cpl
.type
= SPDK_BS_CPL_TYPE_NONE
;
3887 spdk_bs_sequence_finish(seq
, bserrno
);
3894 spdk_bs_destroy(struct spdk_blob_store
*bs
, spdk_bs_op_complete cb_fn
,
3897 struct spdk_bs_cpl cpl
;
3898 spdk_bs_sequence_t
*seq
;
3899 struct spdk_bs_init_ctx
*ctx
;
3901 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Destroying blobstore\n");
3903 if (!TAILQ_EMPTY(&bs
->blobs
)) {
3904 SPDK_ERRLOG("Blobstore still has open blobs\n");
3905 cb_fn(cb_arg
, -EBUSY
);
3909 cpl
.type
= SPDK_BS_CPL_TYPE_BS_BASIC
;
3910 cpl
.u
.bs_basic
.cb_fn
= cb_fn
;
3911 cpl
.u
.bs_basic
.cb_arg
= cb_arg
;
3913 ctx
= calloc(1, sizeof(*ctx
));
3915 cb_fn(cb_arg
, -ENOMEM
);
3921 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
3924 cb_fn(cb_arg
, -ENOMEM
);
3928 /* Write zeroes to the super block */
3929 spdk_bs_sequence_write_zeroes_dev(seq
,
3930 _spdk_bs_page_to_lba(bs
, 0),
3931 _spdk_bs_byte_to_lba(bs
, sizeof(struct spdk_bs_super_block
)),
3932 _spdk_bs_destroy_trim_cpl
, ctx
);
3935 /* END spdk_bs_destroy */
3937 /* START spdk_bs_unload */
3940 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3942 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3944 spdk_free(ctx
->super
);
3947 * We need to defer calling spdk_bs_call_cpl() until after
3948 * dev destruction, so tuck these away for later use.
3950 ctx
->bs
->unload_err
= bserrno
;
3951 memcpy(&ctx
->bs
->unload_cpl
, &seq
->cpl
, sizeof(struct spdk_bs_cpl
));
3952 seq
->cpl
.type
= SPDK_BS_CPL_TYPE_NONE
;
3954 spdk_bs_sequence_finish(seq
, bserrno
);
3956 _spdk_bs_free(ctx
->bs
);
3961 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3963 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3965 spdk_free(ctx
->mask
);
3966 ctx
->super
->clean
= 1;
3968 _spdk_bs_write_super(seq
, ctx
->bs
, ctx
->super
, _spdk_bs_unload_write_super_cpl
, ctx
);
3972 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3974 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3976 spdk_free(ctx
->mask
);
3979 _spdk_bs_write_used_clusters(seq
, cb_arg
, _spdk_bs_unload_write_used_clusters_cpl
);
3983 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3985 struct spdk_bs_load_ctx
*ctx
= cb_arg
;
3987 spdk_free(ctx
->mask
);
3990 _spdk_bs_write_used_blobids(seq
, cb_arg
, _spdk_bs_unload_write_used_blobids_cpl
);
3994 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
3996 _spdk_bs_write_used_md(seq
, cb_arg
, _spdk_bs_unload_write_used_pages_cpl
);
4000 spdk_bs_unload(struct spdk_blob_store
*bs
, spdk_bs_op_complete cb_fn
, void *cb_arg
)
4002 struct spdk_bs_cpl cpl
;
4003 spdk_bs_sequence_t
*seq
;
4004 struct spdk_bs_load_ctx
*ctx
;
4006 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Syncing blobstore\n");
4008 if (!TAILQ_EMPTY(&bs
->blobs
)) {
4009 SPDK_ERRLOG("Blobstore still has open blobs\n");
4010 cb_fn(cb_arg
, -EBUSY
);
4014 ctx
= calloc(1, sizeof(*ctx
));
4016 cb_fn(cb_arg
, -ENOMEM
);
4022 ctx
->super
= spdk_zmalloc(sizeof(*ctx
->super
), 0x1000, NULL
,
4023 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
4026 cb_fn(cb_arg
, -ENOMEM
);
4030 cpl
.type
= SPDK_BS_CPL_TYPE_BS_BASIC
;
4031 cpl
.u
.bs_basic
.cb_fn
= cb_fn
;
4032 cpl
.u
.bs_basic
.cb_arg
= cb_arg
;
4034 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
4036 spdk_free(ctx
->super
);
4038 cb_fn(cb_arg
, -ENOMEM
);
4042 /* Read super block */
4043 spdk_bs_sequence_read_dev(seq
, ctx
->super
, _spdk_bs_page_to_lba(bs
, 0),
4044 _spdk_bs_byte_to_lba(bs
, sizeof(*ctx
->super
)),
4045 _spdk_bs_unload_read_super_cpl
, ctx
);
4048 /* END spdk_bs_unload */
4050 /* START spdk_bs_set_super */
/* Context for the spdk_bs_set_super read-modify-write of the super block. */
struct spdk_bs_set_super_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;
};
4058 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
4060 struct spdk_bs_set_super_ctx
*ctx
= cb_arg
;
4063 SPDK_ERRLOG("Unable to write to super block of blobstore\n");
4066 spdk_free(ctx
->super
);
4068 spdk_bs_sequence_finish(seq
, bserrno
);
4074 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
4076 struct spdk_bs_set_super_ctx
*ctx
= cb_arg
;
4079 SPDK_ERRLOG("Unable to read super block of blobstore\n");
4080 spdk_free(ctx
->super
);
4081 spdk_bs_sequence_finish(seq
, bserrno
);
4086 _spdk_bs_write_super(seq
, ctx
->bs
, ctx
->super
, _spdk_bs_set_super_write_cpl
, ctx
);
4090 spdk_bs_set_super(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
4091 spdk_bs_op_complete cb_fn
, void *cb_arg
)
4093 struct spdk_bs_cpl cpl
;
4094 spdk_bs_sequence_t
*seq
;
4095 struct spdk_bs_set_super_ctx
*ctx
;
4097 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Setting super blob id on blobstore\n");
4099 ctx
= calloc(1, sizeof(*ctx
));
4101 cb_fn(cb_arg
, -ENOMEM
);
4107 ctx
->super
= spdk_zmalloc(sizeof(*ctx
->super
), 0x1000, NULL
,
4108 SPDK_ENV_SOCKET_ID_ANY
, SPDK_MALLOC_DMA
);
4111 cb_fn(cb_arg
, -ENOMEM
);
4115 cpl
.type
= SPDK_BS_CPL_TYPE_BS_BASIC
;
4116 cpl
.u
.bs_basic
.cb_fn
= cb_fn
;
4117 cpl
.u
.bs_basic
.cb_arg
= cb_arg
;
4119 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
4121 spdk_free(ctx
->super
);
4123 cb_fn(cb_arg
, -ENOMEM
);
4127 bs
->super_blob
= blobid
;
4129 /* Read super block */
4130 spdk_bs_sequence_read_dev(seq
, ctx
->super
, _spdk_bs_page_to_lba(bs
, 0),
4131 _spdk_bs_byte_to_lba(bs
, sizeof(*ctx
->super
)),
4132 _spdk_bs_set_super_read_cpl
, ctx
);
4135 /* END spdk_bs_set_super */
4138 spdk_bs_get_super(struct spdk_blob_store
*bs
,
4139 spdk_blob_op_with_id_complete cb_fn
, void *cb_arg
)
4141 if (bs
->super_blob
== SPDK_BLOBID_INVALID
) {
4142 cb_fn(cb_arg
, SPDK_BLOBID_INVALID
, -ENOENT
);
4144 cb_fn(cb_arg
, bs
->super_blob
, 0);
4149 spdk_bs_get_cluster_size(struct spdk_blob_store
*bs
)
4151 return bs
->cluster_sz
;
4155 spdk_bs_get_page_size(struct spdk_blob_store
*bs
)
4157 return SPDK_BS_PAGE_SIZE
;
4161 spdk_bs_get_io_unit_size(struct spdk_blob_store
*bs
)
4163 return bs
->io_unit_size
;
4167 spdk_bs_free_cluster_count(struct spdk_blob_store
*bs
)
4169 return bs
->num_free_clusters
;
4173 spdk_bs_total_data_cluster_count(struct spdk_blob_store
*bs
)
4175 return bs
->total_data_clusters
;
4179 spdk_bs_register_md_thread(struct spdk_blob_store
*bs
)
4181 bs
->md_channel
= spdk_get_io_channel(bs
);
4182 if (!bs
->md_channel
) {
4183 SPDK_ERRLOG("Failed to get IO channel.\n");
4191 spdk_bs_unregister_md_thread(struct spdk_blob_store
*bs
)
4193 spdk_put_io_channel(bs
->md_channel
);
4198 spdk_blob_id
spdk_blob_get_id(struct spdk_blob
*blob
)
4200 assert(blob
!= NULL
);
4205 uint64_t spdk_blob_get_num_pages(struct spdk_blob
*blob
)
4207 assert(blob
!= NULL
);
4209 return _spdk_bs_cluster_to_page(blob
->bs
, blob
->active
.num_clusters
);
4212 uint64_t spdk_blob_get_num_io_units(struct spdk_blob
*blob
)
4214 assert(blob
!= NULL
);
4216 return spdk_blob_get_num_pages(blob
) * _spdk_bs_io_unit_per_page(blob
->bs
);
4219 uint64_t spdk_blob_get_num_clusters(struct spdk_blob
*blob
)
4221 assert(blob
!= NULL
);
4223 return blob
->active
.num_clusters
;
4226 /* START spdk_bs_create_blob */
4229 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
4231 struct spdk_blob
*blob
= cb_arg
;
4233 _spdk_blob_free(blob
);
4235 spdk_bs_sequence_finish(seq
, bserrno
);
4239 _spdk_blob_set_xattrs(struct spdk_blob
*blob
, const struct spdk_blob_xattr_opts
*xattrs
,
4243 size_t value_len
= 0;
4245 const void *value
= NULL
;
4246 if (xattrs
->count
> 0 && xattrs
->get_value
== NULL
) {
4249 for (i
= 0; i
< xattrs
->count
; i
++) {
4250 xattrs
->get_value(xattrs
->ctx
, xattrs
->names
[i
], &value
, &value_len
);
4251 if (value
== NULL
|| value_len
== 0) {
4254 rc
= _spdk_blob_set_xattr(blob
, xattrs
->names
[i
], value
, value_len
, internal
);
4263 _spdk_bs_create_blob(struct spdk_blob_store
*bs
,
4264 const struct spdk_blob_opts
*opts
,
4265 const struct spdk_blob_xattr_opts
*internal_xattrs
,
4266 spdk_blob_op_with_id_complete cb_fn
, void *cb_arg
)
4268 struct spdk_blob
*blob
;
4270 struct spdk_bs_cpl cpl
;
4271 struct spdk_blob_opts opts_default
;
4272 struct spdk_blob_xattr_opts internal_xattrs_default
;
4273 spdk_bs_sequence_t
*seq
;
4277 assert(spdk_get_thread() == bs
->md_thread
);
4279 page_idx
= spdk_bit_array_find_first_clear(bs
->used_md_pages
, 0);
4280 if (page_idx
== UINT32_MAX
) {
4281 cb_fn(cb_arg
, 0, -ENOMEM
);
4284 spdk_bit_array_set(bs
->used_blobids
, page_idx
);
4285 spdk_bit_array_set(bs
->used_md_pages
, page_idx
);
4287 id
= _spdk_bs_page_to_blobid(page_idx
);
4289 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Creating blob with id %lu at page %u\n", id
, page_idx
);
4291 blob
= _spdk_blob_alloc(bs
, id
);
4293 cb_fn(cb_arg
, 0, -ENOMEM
);
4298 spdk_blob_opts_init(&opts_default
);
4299 opts
= &opts_default
;
4301 if (!internal_xattrs
) {
4302 _spdk_blob_xattrs_init(&internal_xattrs_default
);
4303 internal_xattrs
= &internal_xattrs_default
;
4306 rc
= _spdk_blob_set_xattrs(blob
, &opts
->xattrs
, false);
4308 _spdk_blob_free(blob
);
4309 cb_fn(cb_arg
, 0, rc
);
4313 rc
= _spdk_blob_set_xattrs(blob
, internal_xattrs
, true);
4315 _spdk_blob_free(blob
);
4316 cb_fn(cb_arg
, 0, rc
);
4320 if (opts
->thin_provision
) {
4321 _spdk_blob_set_thin_provision(blob
);
4324 rc
= _spdk_blob_resize(blob
, opts
->num_clusters
);
4326 _spdk_blob_free(blob
);
4327 cb_fn(cb_arg
, 0, rc
);
4330 cpl
.type
= SPDK_BS_CPL_TYPE_BLOBID
;
4331 cpl
.u
.blobid
.cb_fn
= cb_fn
;
4332 cpl
.u
.blobid
.cb_arg
= cb_arg
;
4333 cpl
.u
.blobid
.blobid
= blob
->id
;
4335 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
4337 _spdk_blob_free(blob
);
4338 cb_fn(cb_arg
, 0, -ENOMEM
);
4342 _spdk_blob_persist(seq
, blob
, _spdk_bs_create_blob_cpl
, blob
);
4345 void spdk_bs_create_blob(struct spdk_blob_store
*bs
,
4346 spdk_blob_op_with_id_complete cb_fn
, void *cb_arg
)
4348 _spdk_bs_create_blob(bs
, NULL
, NULL
, cb_fn
, cb_arg
);
4351 void spdk_bs_create_blob_ext(struct spdk_blob_store
*bs
, const struct spdk_blob_opts
*opts
,
4352 spdk_blob_op_with_id_complete cb_fn
, void *cb_arg
)
4354 _spdk_bs_create_blob(bs
, opts
, NULL
, cb_fn
, cb_arg
);
4357 /* END spdk_bs_create_blob */
4359 /* START blob_cleanup */
4361 struct spdk_clone_snapshot_ctx
{
4362 struct spdk_bs_cpl cpl
;
4366 struct spdk_io_channel
*channel
;
4368 /* Current cluster for inflate operation */
4371 /* For inflation force allocation of all unallocated clusters and remove
4372 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */
4377 struct spdk_blob
*blob
;
4381 struct spdk_blob
*blob
;
4384 /* xattrs specified for snapshot/clones only. They have no impact on
4385 * the original blobs xattrs. */
4386 const struct spdk_blob_xattr_opts
*xattrs
;
4390 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg
, int bserrno
)
4392 struct spdk_clone_snapshot_ctx
*ctx
= cb_arg
;
4393 struct spdk_bs_cpl
*cpl
= &ctx
->cpl
;
4396 if (ctx
->bserrno
!= 0) {
4397 SPDK_ERRLOG("Cleanup error %d\n", bserrno
);
4399 ctx
->bserrno
= bserrno
;
4403 switch (cpl
->type
) {
4404 case SPDK_BS_CPL_TYPE_BLOBID
:
4405 cpl
->u
.blobid
.cb_fn(cpl
->u
.blobid
.cb_arg
, cpl
->u
.blobid
.blobid
, ctx
->bserrno
);
4407 case SPDK_BS_CPL_TYPE_BLOB_BASIC
:
4408 cpl
->u
.blob_basic
.cb_fn(cpl
->u
.blob_basic
.cb_arg
, ctx
->bserrno
);
4419 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg
, int bserrno
)
4421 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4422 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4425 if (ctx
->bserrno
!= 0) {
4426 SPDK_ERRLOG("Unfreeze error %d\n", bserrno
);
4428 ctx
->bserrno
= bserrno
;
4432 ctx
->original
.id
= origblob
->id
;
4433 origblob
->locked_operation_in_progress
= false;
4435 spdk_blob_close(origblob
, _spdk_bs_clone_snapshot_cleanup_finish
, ctx
);
4439 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg
, int bserrno
)
4441 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4442 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4445 if (ctx
->bserrno
!= 0) {
4446 SPDK_ERRLOG("Cleanup error %d\n", bserrno
);
4448 ctx
->bserrno
= bserrno
;
4453 /* Unfreeze any outstanding I/O */
4454 _spdk_blob_unfreeze_io(origblob
, _spdk_bs_snapshot_unfreeze_cpl
, ctx
);
4456 _spdk_bs_snapshot_unfreeze_cpl(ctx
, 0);
4462 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg
, int bserrno
)
4464 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4465 struct spdk_blob
*newblob
= ctx
->new.blob
;
4468 if (ctx
->bserrno
!= 0) {
4469 SPDK_ERRLOG("Cleanup error %d\n", bserrno
);
4471 ctx
->bserrno
= bserrno
;
4475 ctx
->new.id
= newblob
->id
;
4476 spdk_blob_close(newblob
, _spdk_bs_clone_snapshot_origblob_cleanup
, ctx
);
4479 /* END blob_cleanup */
4481 /* START spdk_bs_create_snapshot */
4484 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob
*blob1
, struct spdk_blob
*blob2
)
4486 uint64_t *cluster_temp
;
4488 cluster_temp
= blob1
->active
.clusters
;
4489 blob1
->active
.clusters
= blob2
->active
.clusters
;
4490 blob2
->active
.clusters
= cluster_temp
;
4494 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg
, int bserrno
)
4496 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4497 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4498 struct spdk_blob
*newblob
= ctx
->new.blob
;
4501 _spdk_bs_snapshot_swap_cluster_maps(newblob
, origblob
);
4502 _spdk_bs_clone_snapshot_newblob_cleanup(ctx
, bserrno
);
4506 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
4507 bserrno
= _spdk_blob_remove_xattr(newblob
, SNAPSHOT_IN_PROGRESS
, true);
4509 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, bserrno
);
4513 _spdk_bs_blob_list_add(ctx
->original
.blob
);
4515 spdk_blob_set_read_only(newblob
);
4517 /* sync snapshot metadata */
4518 spdk_blob_sync_md(newblob
, _spdk_bs_clone_snapshot_origblob_cleanup
, cb_arg
);
4522 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg
, int bserrno
)
4524 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4525 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4526 struct spdk_blob
*newblob
= ctx
->new.blob
;
4529 /* return cluster map back to original */
4530 _spdk_bs_snapshot_swap_cluster_maps(newblob
, origblob
);
4531 _spdk_bs_clone_snapshot_newblob_cleanup(ctx
, bserrno
);
4535 /* Set internal xattr for snapshot id */
4536 bserrno
= _spdk_blob_set_xattr(origblob
, BLOB_SNAPSHOT
, &newblob
->id
, sizeof(spdk_blob_id
), true);
4538 /* return cluster map back to original */
4539 _spdk_bs_snapshot_swap_cluster_maps(newblob
, origblob
);
4540 _spdk_bs_clone_snapshot_newblob_cleanup(ctx
, bserrno
);
4544 _spdk_bs_blob_list_remove(origblob
);
4545 origblob
->parent_id
= newblob
->id
;
4547 /* Create new back_bs_dev for snapshot */
4548 origblob
->back_bs_dev
= spdk_bs_create_blob_bs_dev(newblob
);
4549 if (origblob
->back_bs_dev
== NULL
) {
4550 /* return cluster map back to original */
4551 _spdk_bs_snapshot_swap_cluster_maps(newblob
, origblob
);
4552 _spdk_bs_clone_snapshot_newblob_cleanup(ctx
, -EINVAL
);
4556 /* set clone blob as thin provisioned */
4557 _spdk_blob_set_thin_provision(origblob
);
4559 _spdk_bs_blob_list_add(newblob
);
4561 /* sync clone metadata */
4562 spdk_blob_sync_md(origblob
, _spdk_bs_snapshot_origblob_sync_cpl
, ctx
);
4566 _spdk_bs_snapshot_freeze_cpl(void *cb_arg
, int rc
)
4568 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4569 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4570 struct spdk_blob
*newblob
= ctx
->new.blob
;
4574 _spdk_bs_clone_snapshot_newblob_cleanup(ctx
, rc
);
4580 /* set new back_bs_dev for snapshot */
4581 newblob
->back_bs_dev
= origblob
->back_bs_dev
;
4582 /* Set invalid flags from origblob */
4583 newblob
->invalid_flags
= origblob
->invalid_flags
;
4585 /* inherit parent from original blob if set */
4586 newblob
->parent_id
= origblob
->parent_id
;
4587 if (origblob
->parent_id
!= SPDK_BLOBID_INVALID
) {
4588 /* Set internal xattr for snapshot id */
4589 bserrno
= _spdk_blob_set_xattr(newblob
, BLOB_SNAPSHOT
,
4590 &origblob
->parent_id
, sizeof(spdk_blob_id
), true);
4592 _spdk_bs_clone_snapshot_newblob_cleanup(ctx
, bserrno
);
4597 /* swap cluster maps */
4598 _spdk_bs_snapshot_swap_cluster_maps(newblob
, origblob
);
4600 /* sync snapshot metadata */
4601 spdk_blob_sync_md(newblob
, _spdk_bs_snapshot_newblob_sync_cpl
, ctx
);
4605 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg
, struct spdk_blob
*_blob
, int bserrno
)
4607 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4608 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4609 struct spdk_blob
*newblob
= _blob
;
4612 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, bserrno
);
4616 ctx
->new.blob
= newblob
;
4618 /* Zero out newblob cluster map */
4619 memset(newblob
->active
.clusters
, 0,
4620 newblob
->active
.num_clusters
* sizeof(newblob
->active
.clusters
));
4622 _spdk_blob_freeze_io(origblob
, _spdk_bs_snapshot_freeze_cpl
, ctx
);
4626 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg
, spdk_blob_id blobid
, int bserrno
)
4628 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4629 struct spdk_blob
*origblob
= ctx
->original
.blob
;
4632 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, bserrno
);
4636 ctx
->new.id
= blobid
;
4637 ctx
->cpl
.u
.blobid
.blobid
= blobid
;
4639 spdk_bs_open_blob(origblob
->bs
, ctx
->new.id
, _spdk_bs_snapshot_newblob_open_cpl
, ctx
);
4644 _spdk_bs_xattr_snapshot(void *arg
, const char *name
,
4645 const void **value
, size_t *value_len
)
4647 assert(strncmp(name
, SNAPSHOT_IN_PROGRESS
, sizeof(SNAPSHOT_IN_PROGRESS
)) == 0);
4649 struct spdk_blob
*blob
= (struct spdk_blob
*)arg
;
4651 *value_len
= sizeof(blob
->id
);
4655 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg
, struct spdk_blob
*_blob
, int bserrno
)
4657 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4658 struct spdk_blob_opts opts
;
4659 struct spdk_blob_xattr_opts internal_xattrs
;
4660 char *xattrs_names
[] = { SNAPSHOT_IN_PROGRESS
};
4663 _spdk_bs_clone_snapshot_cleanup_finish(ctx
, bserrno
);
4667 ctx
->original
.blob
= _blob
;
4669 if (_blob
->data_ro
|| _blob
->md_ro
) {
4670 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Cannot create snapshot from read only blob with id %lu\n",
4672 ctx
->bserrno
= -EINVAL
;
4673 spdk_blob_close(_blob
, _spdk_bs_clone_snapshot_cleanup_finish
, ctx
);
4677 if (_blob
->locked_operation_in_progress
) {
4678 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Cannot create snapshot - another operation in progress\n");
4679 ctx
->bserrno
= -EBUSY
;
4680 spdk_blob_close(_blob
, _spdk_bs_clone_snapshot_cleanup_finish
, ctx
);
4684 _blob
->locked_operation_in_progress
= true;
4686 spdk_blob_opts_init(&opts
);
4687 _spdk_blob_xattrs_init(&internal_xattrs
);
4689 /* Change the size of new blob to the same as in original blob,
4690 * but do not allocate clusters */
4691 opts
.thin_provision
= true;
4692 opts
.num_clusters
= spdk_blob_get_num_clusters(_blob
);
4694 /* If there are any xattrs specified for snapshot, set them now */
4696 memcpy(&opts
.xattrs
, ctx
->xattrs
, sizeof(*ctx
->xattrs
));
4698 /* Set internal xattr SNAPSHOT_IN_PROGRESS */
4699 internal_xattrs
.count
= 1;
4700 internal_xattrs
.ctx
= _blob
;
4701 internal_xattrs
.names
= xattrs_names
;
4702 internal_xattrs
.get_value
= _spdk_bs_xattr_snapshot
;
4704 _spdk_bs_create_blob(_blob
->bs
, &opts
, &internal_xattrs
,
4705 _spdk_bs_snapshot_newblob_create_cpl
, ctx
);
4708 void spdk_bs_create_snapshot(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
4709 const struct spdk_blob_xattr_opts
*snapshot_xattrs
,
4710 spdk_blob_op_with_id_complete cb_fn
, void *cb_arg
)
4712 struct spdk_clone_snapshot_ctx
*ctx
= calloc(1, sizeof(*ctx
));
4715 cb_fn(cb_arg
, SPDK_BLOBID_INVALID
, -ENOMEM
);
4718 ctx
->cpl
.type
= SPDK_BS_CPL_TYPE_BLOBID
;
4719 ctx
->cpl
.u
.blobid
.cb_fn
= cb_fn
;
4720 ctx
->cpl
.u
.blobid
.cb_arg
= cb_arg
;
4721 ctx
->cpl
.u
.blobid
.blobid
= SPDK_BLOBID_INVALID
;
4723 ctx
->frozen
= false;
4724 ctx
->original
.id
= blobid
;
4725 ctx
->xattrs
= snapshot_xattrs
;
4727 spdk_bs_open_blob(bs
, ctx
->original
.id
, _spdk_bs_snapshot_origblob_open_cpl
, ctx
);
4729 /* END spdk_bs_create_snapshot */
4731 /* START spdk_bs_create_clone */
4734 _spdk_bs_xattr_clone(void *arg
, const char *name
,
4735 const void **value
, size_t *value_len
)
4737 assert(strncmp(name
, BLOB_SNAPSHOT
, sizeof(BLOB_SNAPSHOT
)) == 0);
4739 struct spdk_blob
*blob
= (struct spdk_blob
*)arg
;
4741 *value_len
= sizeof(blob
->id
);
4745 _spdk_bs_clone_newblob_open_cpl(void *cb_arg
, struct spdk_blob
*_blob
, int bserrno
)
4747 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4748 struct spdk_blob
*clone
= _blob
;
4750 ctx
->new.blob
= clone
;
4751 _spdk_bs_blob_list_add(clone
);
4753 spdk_blob_close(clone
, _spdk_bs_clone_snapshot_origblob_cleanup
, ctx
);
4757 _spdk_bs_clone_newblob_create_cpl(void *cb_arg
, spdk_blob_id blobid
, int bserrno
)
4759 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4761 ctx
->cpl
.u
.blobid
.blobid
= blobid
;
4762 spdk_bs_open_blob(ctx
->original
.blob
->bs
, blobid
, _spdk_bs_clone_newblob_open_cpl
, ctx
);
4766 _spdk_bs_clone_origblob_open_cpl(void *cb_arg
, struct spdk_blob
*_blob
, int bserrno
)
4768 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4769 struct spdk_blob_opts opts
;
4770 struct spdk_blob_xattr_opts internal_xattrs
;
4771 char *xattr_names
[] = { BLOB_SNAPSHOT
};
4774 _spdk_bs_clone_snapshot_cleanup_finish(ctx
, bserrno
);
4778 ctx
->original
.blob
= _blob
;
4780 if (!_blob
->data_ro
|| !_blob
->md_ro
) {
4781 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Clone not from read-only blob\n");
4782 ctx
->bserrno
= -EINVAL
;
4783 spdk_blob_close(_blob
, _spdk_bs_clone_snapshot_cleanup_finish
, ctx
);
4787 if (_blob
->locked_operation_in_progress
) {
4788 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Cannot create clone - another operation in progress\n");
4789 ctx
->bserrno
= -EBUSY
;
4790 spdk_blob_close(_blob
, _spdk_bs_clone_snapshot_cleanup_finish
, ctx
);
4794 _blob
->locked_operation_in_progress
= true;
4796 spdk_blob_opts_init(&opts
);
4797 _spdk_blob_xattrs_init(&internal_xattrs
);
4799 opts
.thin_provision
= true;
4800 opts
.num_clusters
= spdk_blob_get_num_clusters(_blob
);
4802 memcpy(&opts
.xattrs
, ctx
->xattrs
, sizeof(*ctx
->xattrs
));
4805 /* Set internal xattr BLOB_SNAPSHOT */
4806 internal_xattrs
.count
= 1;
4807 internal_xattrs
.ctx
= _blob
;
4808 internal_xattrs
.names
= xattr_names
;
4809 internal_xattrs
.get_value
= _spdk_bs_xattr_clone
;
4811 _spdk_bs_create_blob(_blob
->bs
, &opts
, &internal_xattrs
,
4812 _spdk_bs_clone_newblob_create_cpl
, ctx
);
4815 void spdk_bs_create_clone(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
4816 const struct spdk_blob_xattr_opts
*clone_xattrs
,
4817 spdk_blob_op_with_id_complete cb_fn
, void *cb_arg
)
4819 struct spdk_clone_snapshot_ctx
*ctx
= calloc(1, sizeof(*ctx
));
4822 cb_fn(cb_arg
, SPDK_BLOBID_INVALID
, -ENOMEM
);
4826 ctx
->cpl
.type
= SPDK_BS_CPL_TYPE_BLOBID
;
4827 ctx
->cpl
.u
.blobid
.cb_fn
= cb_fn
;
4828 ctx
->cpl
.u
.blobid
.cb_arg
= cb_arg
;
4829 ctx
->cpl
.u
.blobid
.blobid
= SPDK_BLOBID_INVALID
;
4831 ctx
->xattrs
= clone_xattrs
;
4832 ctx
->original
.id
= blobid
;
4834 spdk_bs_open_blob(bs
, ctx
->original
.id
, _spdk_bs_clone_origblob_open_cpl
, ctx
);
4837 /* END spdk_bs_create_clone */
4839 /* START spdk_bs_inflate_blob */
4842 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg
, struct spdk_blob
*_parent
, int bserrno
)
4844 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4845 struct spdk_blob
*_blob
= ctx
->original
.blob
;
4848 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, bserrno
);
4852 assert(_parent
!= NULL
);
4854 _spdk_bs_blob_list_remove(_blob
);
4855 _blob
->parent_id
= _parent
->id
;
4856 _spdk_blob_set_xattr(_blob
, BLOB_SNAPSHOT
, &_blob
->parent_id
,
4857 sizeof(spdk_blob_id
), true);
4859 _blob
->back_bs_dev
->destroy(_blob
->back_bs_dev
);
4860 _blob
->back_bs_dev
= spdk_bs_create_blob_bs_dev(_parent
);
4861 _spdk_bs_blob_list_add(_blob
);
4863 spdk_blob_sync_md(_blob
, _spdk_bs_clone_snapshot_origblob_cleanup
, ctx
);
4867 _spdk_bs_inflate_blob_done(void *cb_arg
, int bserrno
)
4869 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4870 struct spdk_blob
*_blob
= ctx
->original
.blob
;
4871 struct spdk_blob
*_parent
;
4874 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, bserrno
);
4878 if (ctx
->allocate_all
) {
4879 /* remove thin provisioning */
4880 _spdk_bs_blob_list_remove(_blob
);
4881 _spdk_blob_remove_xattr(_blob
, BLOB_SNAPSHOT
, true);
4882 _blob
->invalid_flags
= _blob
->invalid_flags
& ~SPDK_BLOB_THIN_PROV
;
4883 _blob
->back_bs_dev
->destroy(_blob
->back_bs_dev
);
4884 _blob
->back_bs_dev
= NULL
;
4885 _blob
->parent_id
= SPDK_BLOBID_INVALID
;
4887 _parent
= ((struct spdk_blob_bs_dev
*)(_blob
->back_bs_dev
))->blob
;
4888 if (_parent
->parent_id
!= SPDK_BLOBID_INVALID
) {
4889 /* We must change the parent of the inflated blob */
4890 spdk_bs_open_blob(_blob
->bs
, _parent
->parent_id
,
4891 _spdk_bs_inflate_blob_set_parent_cpl
, ctx
);
4895 _spdk_bs_blob_list_remove(_blob
);
4896 _spdk_blob_remove_xattr(_blob
, BLOB_SNAPSHOT
, true);
4897 _blob
->parent_id
= SPDK_BLOBID_INVALID
;
4898 _blob
->back_bs_dev
->destroy(_blob
->back_bs_dev
);
4899 _blob
->back_bs_dev
= spdk_bs_create_zeroes_dev();
4902 _blob
->state
= SPDK_BLOB_STATE_DIRTY
;
4903 spdk_blob_sync_md(_blob
, _spdk_bs_clone_snapshot_origblob_cleanup
, ctx
);
4906 /* Check if cluster needs allocation */
4908 _spdk_bs_cluster_needs_allocation(struct spdk_blob
*blob
, uint64_t cluster
, bool allocate_all
)
4910 struct spdk_blob_bs_dev
*b
;
4912 assert(blob
!= NULL
);
4914 if (blob
->active
.clusters
[cluster
] != 0) {
4915 /* Cluster is already allocated */
4919 if (blob
->parent_id
== SPDK_BLOBID_INVALID
) {
4920 /* Blob have no parent blob */
4921 return allocate_all
;
4924 b
= (struct spdk_blob_bs_dev
*)blob
->back_bs_dev
;
4925 return (allocate_all
|| b
->blob
->active
.clusters
[cluster
] != 0);
4929 _spdk_bs_inflate_blob_touch_next(void *cb_arg
, int bserrno
)
4931 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4932 struct spdk_blob
*_blob
= ctx
->original
.blob
;
4936 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, bserrno
);
4940 for (; ctx
->cluster
< _blob
->active
.num_clusters
; ctx
->cluster
++) {
4941 if (_spdk_bs_cluster_needs_allocation(_blob
, ctx
->cluster
, ctx
->allocate_all
)) {
4946 if (ctx
->cluster
< _blob
->active
.num_clusters
) {
4947 offset
= _spdk_bs_cluster_to_lba(_blob
->bs
, ctx
->cluster
);
4949 /* We may safely increment a cluster before write */
4952 /* Use zero length write to touch a cluster */
4953 spdk_blob_io_write(_blob
, ctx
->channel
, NULL
, offset
, 0,
4954 _spdk_bs_inflate_blob_touch_next
, ctx
);
4956 _spdk_bs_inflate_blob_done(cb_arg
, bserrno
);
4961 _spdk_bs_inflate_blob_open_cpl(void *cb_arg
, struct spdk_blob
*_blob
, int bserrno
)
4963 struct spdk_clone_snapshot_ctx
*ctx
= (struct spdk_clone_snapshot_ctx
*)cb_arg
;
4964 uint64_t lfc
; /* lowest free cluster */
4968 _spdk_bs_clone_snapshot_cleanup_finish(ctx
, bserrno
);
4972 ctx
->original
.blob
= _blob
;
4974 if (_blob
->locked_operation_in_progress
) {
4975 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Cannot inflate blob - another operation in progress\n");
4976 ctx
->bserrno
= -EBUSY
;
4977 spdk_blob_close(_blob
, _spdk_bs_clone_snapshot_cleanup_finish
, ctx
);
4981 _blob
->locked_operation_in_progress
= true;
4983 if (!ctx
->allocate_all
&& _blob
->parent_id
== SPDK_BLOBID_INVALID
) {
4984 /* This blob have no parent, so we cannot decouple it. */
4985 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n");
4986 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, -EINVAL
);
4990 if (spdk_blob_is_thin_provisioned(_blob
) == false) {
4991 /* This is not thin provisioned blob. No need to inflate. */
4992 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, 0);
4996 /* Do two passes - one to verify that we can obtain enough clusters
4997 * and another to actually claim them.
5000 for (i
= 0; i
< _blob
->active
.num_clusters
; i
++) {
5001 if (_spdk_bs_cluster_needs_allocation(_blob
, i
, ctx
->allocate_all
)) {
5002 lfc
= spdk_bit_array_find_first_clear(_blob
->bs
->used_clusters
, lfc
);
5003 if (lfc
== UINT32_MAX
) {
5004 /* No more free clusters. Cannot satisfy the request */
5005 _spdk_bs_clone_snapshot_origblob_cleanup(ctx
, -ENOSPC
);
5013 _spdk_bs_inflate_blob_touch_next(ctx
, 0);
5017 _spdk_bs_inflate_blob(struct spdk_blob_store
*bs
, struct spdk_io_channel
*channel
,
5018 spdk_blob_id blobid
, bool allocate_all
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5020 struct spdk_clone_snapshot_ctx
*ctx
= calloc(1, sizeof(*ctx
));
5023 cb_fn(cb_arg
, -ENOMEM
);
5026 ctx
->cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
5027 ctx
->cpl
.u
.bs_basic
.cb_fn
= cb_fn
;
5028 ctx
->cpl
.u
.bs_basic
.cb_arg
= cb_arg
;
5030 ctx
->original
.id
= blobid
;
5031 ctx
->channel
= channel
;
5032 ctx
->allocate_all
= allocate_all
;
5034 spdk_bs_open_blob(bs
, ctx
->original
.id
, _spdk_bs_inflate_blob_open_cpl
, ctx
);
5038 spdk_bs_inflate_blob(struct spdk_blob_store
*bs
, struct spdk_io_channel
*channel
,
5039 spdk_blob_id blobid
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5041 _spdk_bs_inflate_blob(bs
, channel
, blobid
, true, cb_fn
, cb_arg
);
5045 spdk_bs_blob_decouple_parent(struct spdk_blob_store
*bs
, struct spdk_io_channel
*channel
,
5046 spdk_blob_id blobid
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5048 _spdk_bs_inflate_blob(bs
, channel
, blobid
, false, cb_fn
, cb_arg
);
5050 /* END spdk_bs_inflate_blob */
5052 /* START spdk_blob_resize */
5053 struct spdk_bs_resize_ctx
{
5054 spdk_blob_op_complete cb_fn
;
5056 struct spdk_blob
*blob
;
5062 _spdk_bs_resize_unfreeze_cpl(void *cb_arg
, int rc
)
5064 struct spdk_bs_resize_ctx
*ctx
= (struct spdk_bs_resize_ctx
*)cb_arg
;
5067 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc
);
5071 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx
->rc
);
5075 ctx
->blob
->locked_operation_in_progress
= false;
5077 ctx
->cb_fn(ctx
->cb_arg
, rc
);
5082 _spdk_bs_resize_freeze_cpl(void *cb_arg
, int rc
)
5084 struct spdk_bs_resize_ctx
*ctx
= (struct spdk_bs_resize_ctx
*)cb_arg
;
5087 ctx
->blob
->locked_operation_in_progress
= false;
5088 ctx
->cb_fn(ctx
->cb_arg
, rc
);
5093 ctx
->rc
= _spdk_blob_resize(ctx
->blob
, ctx
->sz
);
5095 _spdk_blob_unfreeze_io(ctx
->blob
, _spdk_bs_resize_unfreeze_cpl
, ctx
);
5099 spdk_blob_resize(struct spdk_blob
*blob
, uint64_t sz
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5101 struct spdk_bs_resize_ctx
*ctx
;
5103 _spdk_blob_verify_md_op(blob
);
5105 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Resizing blob %lu to %lu clusters\n", blob
->id
, sz
);
5108 cb_fn(cb_arg
, -EPERM
);
5112 if (sz
== blob
->active
.num_clusters
) {
5117 if (blob
->locked_operation_in_progress
) {
5118 cb_fn(cb_arg
, -EBUSY
);
5122 ctx
= calloc(1, sizeof(*ctx
));
5124 cb_fn(cb_arg
, -ENOMEM
);
5128 blob
->locked_operation_in_progress
= true;
5130 ctx
->cb_arg
= cb_arg
;
5133 _spdk_blob_freeze_io(blob
, _spdk_bs_resize_freeze_cpl
, ctx
);
5136 /* END spdk_blob_resize */
5139 /* START spdk_bs_delete_blob */
5142 _spdk_bs_delete_ebusy_close_cpl(void *cb_arg
, int bserrno
)
5144 spdk_bs_sequence_t
*seq
= cb_arg
;
5146 spdk_bs_sequence_finish(seq
, -EBUSY
);
5150 _spdk_bs_delete_close_cpl(void *cb_arg
, int bserrno
)
5152 spdk_bs_sequence_t
*seq
= cb_arg
;
5154 spdk_bs_sequence_finish(seq
, bserrno
);
5158 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
5160 struct spdk_blob
*blob
= cb_arg
;
5164 * We already removed this blob from the blobstore tailq, so
5165 * we need to free it here since this is the last reference
5168 _spdk_blob_free(blob
);
5169 _spdk_bs_delete_close_cpl(seq
, bserrno
);
5174 * This will immediately decrement the ref_count and call
5175 * the completion routine since the metadata state is clean.
5176 * By calling spdk_blob_close, we reduce the number of call
5177 * points into code that touches the blob->open_ref count
5178 * and the blobstore's blob list.
5180 spdk_blob_close(blob
, _spdk_bs_delete_close_cpl
, seq
);
5184 _spdk_bs_delete_blob_finish(void *cb_arg
, struct spdk_blob
*blob
, int bserrno
)
5186 spdk_bs_sequence_t
*seq
= cb_arg
;
5187 struct spdk_blob_list
*snapshot_entry
= NULL
;
5191 SPDK_ERRLOG("Failed to remove blob\n");
5192 spdk_bs_sequence_finish(seq
, bserrno
);
5196 /* Remove snapshot from the list */
5197 snapshot_entry
= _spdk_bs_get_snapshot_entry(blob
->bs
, blob
->id
);
5198 if (snapshot_entry
!= NULL
) {
5199 TAILQ_REMOVE(&blob
->bs
->snapshots
, snapshot_entry
, link
);
5200 free(snapshot_entry
);
5203 page_num
= _spdk_bs_blobid_to_page(blob
->id
);
5204 spdk_bit_array_clear(blob
->bs
->used_blobids
, page_num
);
5205 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
5206 blob
->active
.num_pages
= 0;
5207 _spdk_blob_resize(blob
, 0);
5209 _spdk_blob_persist(seq
, blob
, _spdk_bs_delete_persist_cpl
, blob
);
5213 _spdk_bs_is_blob_deletable(struct spdk_blob
*blob
)
5215 struct spdk_blob_list
*snapshot_entry
= NULL
;
5217 if (blob
->open_ref
> 1) {
5218 /* Someone has this blob open (besides this delete context). */
5222 /* Check if this is a snapshot with clones */
5223 snapshot_entry
= _spdk_bs_get_snapshot_entry(blob
->bs
, blob
->id
);
5224 if (snapshot_entry
!= NULL
) {
5225 /* If snapshot have clones, we cannot remove it */
5226 if (!TAILQ_EMPTY(&snapshot_entry
->clones
)) {
5227 SPDK_ERRLOG("Cannot remove snapshot with clones\n");
5236 _spdk_bs_delete_open_cpl(void *cb_arg
, struct spdk_blob
*blob
, int bserrno
)
5238 spdk_bs_sequence_t
*seq
= cb_arg
;
5241 spdk_bs_sequence_finish(seq
, bserrno
);
5245 _spdk_blob_verify_md_op(blob
);
5247 bserrno
= _spdk_bs_is_blob_deletable(blob
);
5249 spdk_blob_close(blob
, _spdk_bs_delete_ebusy_close_cpl
, seq
);
5253 _spdk_bs_blob_list_remove(blob
);
5255 if (blob
->locked_operation_in_progress
) {
5256 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Cannot remove blob - another operation in progress\n");
5257 spdk_blob_close(blob
, _spdk_bs_delete_ebusy_close_cpl
, seq
);
5261 blob
->locked_operation_in_progress
= true;
5264 * Remove the blob from the blob_store list now, to ensure it does not
5265 * get returned after this point by _spdk_blob_lookup().
5267 TAILQ_REMOVE(&blob
->bs
->blobs
, blob
, link
);
5269 _spdk_bs_delete_blob_finish(seq
, blob
, 0);
5273 spdk_bs_delete_blob(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
5274 spdk_blob_op_complete cb_fn
, void *cb_arg
)
5276 struct spdk_bs_cpl cpl
;
5277 spdk_bs_sequence_t
*seq
;
5279 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Deleting blob %lu\n", blobid
);
5281 assert(spdk_get_thread() == bs
->md_thread
);
5283 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
5284 cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
5285 cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
5287 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
5289 cb_fn(cb_arg
, -ENOMEM
);
5293 spdk_bs_open_blob(bs
, blobid
, _spdk_bs_delete_open_cpl
, seq
);
5296 /* END spdk_bs_delete_blob */
5298 /* START spdk_bs_open_blob */
5301 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
5303 struct spdk_blob
*blob
= cb_arg
;
5305 /* If the blob have crc error, we just return NULL. */
5307 seq
->cpl
.u
.blob_handle
.blob
= NULL
;
5308 spdk_bs_sequence_finish(seq
, bserrno
);
5314 TAILQ_INSERT_HEAD(&blob
->bs
->blobs
, blob
, link
);
5316 spdk_bs_sequence_finish(seq
, bserrno
);
5319 static void _spdk_bs_open_blob(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
5320 struct spdk_blob_open_opts
*opts
, spdk_blob_op_with_handle_complete cb_fn
, void *cb_arg
)
5322 struct spdk_blob
*blob
;
5323 struct spdk_bs_cpl cpl
;
5324 struct spdk_blob_open_opts opts_default
;
5325 spdk_bs_sequence_t
*seq
;
5328 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Opening blob %lu\n", blobid
);
5329 assert(spdk_get_thread() == bs
->md_thread
);
5331 page_num
= _spdk_bs_blobid_to_page(blobid
);
5332 if (spdk_bit_array_get(bs
->used_blobids
, page_num
) == false) {
5333 /* Invalid blobid */
5334 cb_fn(cb_arg
, NULL
, -ENOENT
);
5338 blob
= _spdk_blob_lookup(bs
, blobid
);
5341 cb_fn(cb_arg
, blob
, 0);
5345 blob
= _spdk_blob_alloc(bs
, blobid
);
5347 cb_fn(cb_arg
, NULL
, -ENOMEM
);
5352 spdk_blob_open_opts_init(&opts_default
);
5353 opts
= &opts_default
;
5356 blob
->clear_method
= opts
->clear_method
;
5358 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_HANDLE
;
5359 cpl
.u
.blob_handle
.cb_fn
= cb_fn
;
5360 cpl
.u
.blob_handle
.cb_arg
= cb_arg
;
5361 cpl
.u
.blob_handle
.blob
= blob
;
5363 seq
= spdk_bs_sequence_start(bs
->md_channel
, &cpl
);
5365 _spdk_blob_free(blob
);
5366 cb_fn(cb_arg
, NULL
, -ENOMEM
);
5370 _spdk_blob_load(seq
, blob
, _spdk_bs_open_blob_cpl
, blob
);
5373 void spdk_bs_open_blob(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
5374 spdk_blob_op_with_handle_complete cb_fn
, void *cb_arg
)
5376 _spdk_bs_open_blob(bs
, blobid
, NULL
, cb_fn
, cb_arg
);
5379 void spdk_bs_open_blob_ext(struct spdk_blob_store
*bs
, spdk_blob_id blobid
,
5380 struct spdk_blob_open_opts
*opts
, spdk_blob_op_with_handle_complete cb_fn
, void *cb_arg
)
5382 _spdk_bs_open_blob(bs
, blobid
, opts
, cb_fn
, cb_arg
);
5385 /* END spdk_bs_open_blob */
5387 /* START spdk_blob_set_read_only */
5388 int spdk_blob_set_read_only(struct spdk_blob
*blob
)
5390 _spdk_blob_verify_md_op(blob
);
5392 blob
->data_ro_flags
|= SPDK_BLOB_READ_ONLY
;
5394 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
5397 /* END spdk_blob_set_read_only */
5399 /* START spdk_blob_sync_md */
5402 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
5404 struct spdk_blob
*blob
= cb_arg
;
5406 if (bserrno
== 0 && (blob
->data_ro_flags
& SPDK_BLOB_READ_ONLY
)) {
5407 blob
->data_ro
= true;
5411 spdk_bs_sequence_finish(seq
, bserrno
);
5415 _spdk_blob_sync_md(struct spdk_blob
*blob
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5417 struct spdk_bs_cpl cpl
;
5418 spdk_bs_sequence_t
*seq
;
5420 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
5421 cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
5422 cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
5424 seq
= spdk_bs_sequence_start(blob
->bs
->md_channel
, &cpl
);
5426 cb_fn(cb_arg
, -ENOMEM
);
5430 _spdk_blob_persist(seq
, blob
, _spdk_blob_sync_md_cpl
, blob
);
5434 spdk_blob_sync_md(struct spdk_blob
*blob
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5436 _spdk_blob_verify_md_op(blob
);
5438 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Syncing blob %lu\n", blob
->id
);
5441 assert(blob
->state
== SPDK_BLOB_STATE_CLEAN
);
5446 _spdk_blob_sync_md(blob
, cb_fn
, cb_arg
);
5449 /* END spdk_blob_sync_md */
5451 struct spdk_blob_insert_cluster_ctx
{
5452 struct spdk_thread
*thread
;
5453 struct spdk_blob
*blob
;
5454 uint32_t cluster_num
; /* cluster index in blob */
5455 uint32_t cluster
; /* cluster on disk */
5457 spdk_blob_op_complete cb_fn
;
5462 _spdk_blob_insert_cluster_msg_cpl(void *arg
)
5464 struct spdk_blob_insert_cluster_ctx
*ctx
= arg
;
5466 ctx
->cb_fn(ctx
->cb_arg
, ctx
->rc
);
5471 _spdk_blob_insert_cluster_msg_cb(void *arg
, int bserrno
)
5473 struct spdk_blob_insert_cluster_ctx
*ctx
= arg
;
5476 spdk_thread_send_msg(ctx
->thread
, _spdk_blob_insert_cluster_msg_cpl
, ctx
);
5480 _spdk_blob_insert_cluster_msg(void *arg
)
5482 struct spdk_blob_insert_cluster_ctx
*ctx
= arg
;
5484 ctx
->rc
= _spdk_blob_insert_cluster(ctx
->blob
, ctx
->cluster_num
, ctx
->cluster
);
5486 spdk_thread_send_msg(ctx
->thread
, _spdk_blob_insert_cluster_msg_cpl
, ctx
);
5490 ctx
->blob
->state
= SPDK_BLOB_STATE_DIRTY
;
5491 _spdk_blob_sync_md(ctx
->blob
, _spdk_blob_insert_cluster_msg_cb
, ctx
);
5495 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob
*blob
, uint32_t cluster_num
,
5496 uint64_t cluster
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5498 struct spdk_blob_insert_cluster_ctx
*ctx
;
5500 ctx
= calloc(1, sizeof(*ctx
));
5502 cb_fn(cb_arg
, -ENOMEM
);
5506 ctx
->thread
= spdk_get_thread();
5508 ctx
->cluster_num
= cluster_num
;
5509 ctx
->cluster
= cluster
;
5511 ctx
->cb_arg
= cb_arg
;
5513 spdk_thread_send_msg(blob
->bs
->md_thread
, _spdk_blob_insert_cluster_msg
, ctx
);
5516 /* START spdk_blob_close */
5519 _spdk_blob_close_cpl(spdk_bs_sequence_t
*seq
, void *cb_arg
, int bserrno
)
5521 struct spdk_blob
*blob
= cb_arg
;
5525 if (blob
->open_ref
== 0) {
5527 * Blobs with active.num_pages == 0 are deleted blobs.
5528 * these blobs are removed from the blob_store list
5529 * when the deletion process starts - so don't try to
5530 * remove them again.
5532 if (blob
->active
.num_pages
> 0) {
5533 TAILQ_REMOVE(&blob
->bs
->blobs
, blob
, link
);
5535 _spdk_blob_free(blob
);
5539 spdk_bs_sequence_finish(seq
, bserrno
);
5542 void spdk_blob_close(struct spdk_blob
*blob
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5544 struct spdk_bs_cpl cpl
;
5545 spdk_bs_sequence_t
*seq
;
5547 _spdk_blob_verify_md_op(blob
);
5549 SPDK_DEBUGLOG(SPDK_LOG_BLOB
, "Closing blob %lu\n", blob
->id
);
5551 if (blob
->open_ref
== 0) {
5552 cb_fn(cb_arg
, -EBADF
);
5556 cpl
.type
= SPDK_BS_CPL_TYPE_BLOB_BASIC
;
5557 cpl
.u
.blob_basic
.cb_fn
= cb_fn
;
5558 cpl
.u
.blob_basic
.cb_arg
= cb_arg
;
5560 seq
= spdk_bs_sequence_start(blob
->bs
->md_channel
, &cpl
);
5562 cb_fn(cb_arg
, -ENOMEM
);
5567 _spdk_blob_persist(seq
, blob
, _spdk_blob_close_cpl
, blob
);
5570 /* END spdk_blob_close */
/* Public API: allocate an I/O channel for this blobstore on the calling thread. */
struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}
/* Public API: release an I/O channel obtained from spdk_bs_alloc_io_channel(). */
void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
5582 void spdk_blob_io_unmap(struct spdk_blob
*blob
, struct spdk_io_channel
*channel
,
5583 uint64_t offset
, uint64_t length
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5585 _spdk_blob_request_submit_op(blob
, channel
, NULL
, offset
, length
, cb_fn
, cb_arg
,
5589 void spdk_blob_io_write_zeroes(struct spdk_blob
*blob
, struct spdk_io_channel
*channel
,
5590 uint64_t offset
, uint64_t length
, spdk_blob_op_complete cb_fn
, void *cb_arg
)
5592 _spdk_blob_request_submit_op(blob
, channel
, NULL
, offset
, length
, cb_fn
, cb_arg
,
5593 SPDK_BLOB_WRITE_ZEROES
);
5596 void spdk_blob_io_write(struct spdk_blob
*blob
, struct spdk_io_channel
*channel
,
5597 void *payload
, uint64_t offset
, uint64_t length
,
5598 spdk_blob_op_complete cb_fn
, void *cb_arg
)
5600 _spdk_blob_request_submit_op(blob
, channel
, payload
, offset
, length
, cb_fn
, cb_arg
,
5604 void spdk_blob_io_read(struct spdk_blob
*blob
, struct spdk_io_channel
*channel
,
5605 void *payload
, uint64_t offset
, uint64_t length
,
5606 spdk_blob_op_complete cb_fn
, void *cb_arg
)
5608 _spdk_blob_request_submit_op(blob
, channel
, payload
, offset
, length
, cb_fn
, cb_arg
,
5612 void spdk_blob_io_writev(struct spdk_blob
*blob
, struct spdk_io_channel
*channel
,
5613 struct iovec
*iov
, int iovcnt
, uint64_t offset
, uint64_t length
,
5614 spdk_blob_op_complete cb_fn
, void *cb_arg
)
5616 _spdk_blob_request_submit_rw_iov(blob
, channel
, iov
, iovcnt
, offset
, length
, cb_fn
, cb_arg
, false);
5619 void spdk_blob_io_readv(struct spdk_blob
*blob
, struct spdk_io_channel
*channel
,
5620 struct iovec
*iov
, int iovcnt
, uint64_t offset
, uint64_t length
,
5621 spdk_blob_op_complete cb_fn
, void *cb_arg
)
5623 _spdk_blob_request_submit_rw_iov(blob
, channel
, iov
, iovcnt
, offset
, length
, cb_fn
, cb_arg
, true);
5626 struct spdk_bs_iter_ctx
{
5628 struct spdk_blob_store
*bs
;
5630 spdk_blob_op_with_handle_complete cb_fn
;
5635 _spdk_bs_iter_cpl(void *cb_arg
, struct spdk_blob
*_blob
, int bserrno
)
5637 struct spdk_bs_iter_ctx
*ctx
= cb_arg
;
5638 struct spdk_blob_store
*bs
= ctx
->bs
;
5642 ctx
->cb_fn(ctx
->cb_arg
, _blob
, bserrno
);
5648 ctx
->page_num
= spdk_bit_array_find_first_set(bs
->used_blobids
, ctx
->page_num
);
5649 if (ctx
->page_num
>= spdk_bit_array_capacity(bs
->used_blobids
)) {
5650 ctx
->cb_fn(ctx
->cb_arg
, NULL
, -ENOENT
);
5655 id
= _spdk_bs_page_to_blobid(ctx
->page_num
);
5657 spdk_bs_open_blob(bs
, id
, _spdk_bs_iter_cpl
, ctx
);
5661 spdk_bs_iter_first(struct spdk_blob_store
*bs
,
5662 spdk_blob_op_with_handle_complete cb_fn
, void *cb_arg
)
5664 struct spdk_bs_iter_ctx
*ctx
;
5666 ctx
= calloc(1, sizeof(*ctx
));
5668 cb_fn(cb_arg
, NULL
, -ENOMEM
);
5675 ctx
->cb_arg
= cb_arg
;
5677 _spdk_bs_iter_cpl(ctx
, NULL
, -1);
5681 _spdk_bs_iter_close_cpl(void *cb_arg
, int bserrno
)
5683 struct spdk_bs_iter_ctx
*ctx
= cb_arg
;
5685 _spdk_bs_iter_cpl(ctx
, NULL
, -1);
5689 spdk_bs_iter_next(struct spdk_blob_store
*bs
, struct spdk_blob
*blob
,
5690 spdk_blob_op_with_handle_complete cb_fn
, void *cb_arg
)
5692 struct spdk_bs_iter_ctx
*ctx
;
5694 assert(blob
!= NULL
);
5696 ctx
= calloc(1, sizeof(*ctx
));
5698 cb_fn(cb_arg
, NULL
, -ENOMEM
);
5702 ctx
->page_num
= _spdk_bs_blobid_to_page(blob
->id
);
5705 ctx
->cb_arg
= cb_arg
;
5707 /* Close the existing blob */
5708 spdk_blob_close(blob
, _spdk_bs_iter_close_cpl
, ctx
);
5712 _spdk_blob_set_xattr(struct spdk_blob
*blob
, const char *name
, const void *value
,
5713 uint16_t value_len
, bool internal
)
5715 struct spdk_xattr_tailq
*xattrs
;
5716 struct spdk_xattr
*xattr
;
5718 _spdk_blob_verify_md_op(blob
);
5725 xattrs
= &blob
->xattrs_internal
;
5726 blob
->invalid_flags
|= SPDK_BLOB_INTERNAL_XATTR
;
5728 xattrs
= &blob
->xattrs
;
5731 TAILQ_FOREACH(xattr
, xattrs
, link
) {
5732 if (!strcmp(name
, xattr
->name
)) {
5734 xattr
->value_len
= value_len
;
5735 xattr
->value
= malloc(value_len
);
5736 memcpy(xattr
->value
, value
, value_len
);
5738 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
5744 xattr
= calloc(1, sizeof(*xattr
));
5748 xattr
->name
= strdup(name
);
5749 xattr
->value_len
= value_len
;
5750 xattr
->value
= malloc(value_len
);
5751 memcpy(xattr
->value
, value
, value_len
);
5752 TAILQ_INSERT_TAIL(xattrs
, xattr
, link
);
5754 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
5760 spdk_blob_set_xattr(struct spdk_blob
*blob
, const char *name
, const void *value
,
5763 return _spdk_blob_set_xattr(blob
, name
, value
, value_len
, false);
5767 _spdk_blob_remove_xattr(struct spdk_blob
*blob
, const char *name
, bool internal
)
5769 struct spdk_xattr_tailq
*xattrs
;
5770 struct spdk_xattr
*xattr
;
5772 _spdk_blob_verify_md_op(blob
);
5777 xattrs
= internal
? &blob
->xattrs_internal
: &blob
->xattrs
;
5779 TAILQ_FOREACH(xattr
, xattrs
, link
) {
5780 if (!strcmp(name
, xattr
->name
)) {
5781 TAILQ_REMOVE(xattrs
, xattr
, link
);
5786 if (internal
&& TAILQ_EMPTY(&blob
->xattrs_internal
)) {
5787 blob
->invalid_flags
&= ~SPDK_BLOB_INTERNAL_XATTR
;
5789 blob
->state
= SPDK_BLOB_STATE_DIRTY
;
5799 spdk_blob_remove_xattr(struct spdk_blob
*blob
, const char *name
)
5801 return _spdk_blob_remove_xattr(blob
, name
, false);
5805 _spdk_blob_get_xattr_value(struct spdk_blob
*blob
, const char *name
,
5806 const void **value
, size_t *value_len
, bool internal
)
5808 struct spdk_xattr
*xattr
;
5809 struct spdk_xattr_tailq
*xattrs
;
5811 xattrs
= internal
? &blob
->xattrs_internal
: &blob
->xattrs
;
5813 TAILQ_FOREACH(xattr
, xattrs
, link
) {
5814 if (!strcmp(name
, xattr
->name
)) {
5815 *value
= xattr
->value
;
5816 *value_len
= xattr
->value_len
;
5824 spdk_blob_get_xattr_value(struct spdk_blob
*blob
, const char *name
,
5825 const void **value
, size_t *value_len
)
5827 _spdk_blob_verify_md_op(blob
);
5829 return _spdk_blob_get_xattr_value(blob
, name
, value
, value_len
, false);
5832 struct spdk_xattr_names
{
5834 const char *names
[0];
5838 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq
*xattrs
, struct spdk_xattr_names
**names
)
5840 struct spdk_xattr
*xattr
;
5843 TAILQ_FOREACH(xattr
, xattrs
, link
) {
5847 *names
= calloc(1, sizeof(struct spdk_xattr_names
) + count
* sizeof(char *));
5848 if (*names
== NULL
) {
5852 TAILQ_FOREACH(xattr
, xattrs
, link
) {
5853 (*names
)->names
[(*names
)->count
++] = xattr
->name
;
5860 spdk_blob_get_xattr_names(struct spdk_blob
*blob
, struct spdk_xattr_names
**names
)
5862 _spdk_blob_verify_md_op(blob
);
5864 return _spdk_blob_get_xattr_names(&blob
->xattrs
, names
);
5868 spdk_xattr_names_get_count(struct spdk_xattr_names
*names
)
5870 assert(names
!= NULL
);
5872 return names
->count
;
5876 spdk_xattr_names_get_name(struct spdk_xattr_names
*names
, uint32_t index
)
5878 if (index
>= names
->count
) {
5882 return names
->names
[index
];
/* Public API: release an spdk_xattr_names result (single allocation). */
void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}
5892 spdk_bs_get_bstype(struct spdk_blob_store
*bs
)
5898 spdk_bs_set_bstype(struct spdk_blob_store
*bs
, struct spdk_bs_type bstype
)
5900 memcpy(&bs
->bstype
, &bstype
, sizeof(bstype
));
5904 spdk_blob_is_read_only(struct spdk_blob
*blob
)
5906 assert(blob
!= NULL
);
5907 return (blob
->data_ro
|| blob
->md_ro
);
5911 spdk_blob_is_snapshot(struct spdk_blob
*blob
)
5913 struct spdk_blob_list
*snapshot_entry
;
5915 assert(blob
!= NULL
);
5917 snapshot_entry
= _spdk_bs_get_snapshot_entry(blob
->bs
, blob
->id
);
5918 if (snapshot_entry
== NULL
) {
5926 spdk_blob_is_clone(struct spdk_blob
*blob
)
5928 assert(blob
!= NULL
);
5930 if (blob
->parent_id
!= SPDK_BLOBID_INVALID
) {
5931 assert(spdk_blob_is_thin_provisioned(blob
));
5939 spdk_blob_is_thin_provisioned(struct spdk_blob
*blob
)
5941 assert(blob
!= NULL
);
5942 return !!(blob
->invalid_flags
& SPDK_BLOB_THIN_PROV
);
5946 spdk_blob_get_parent_snapshot(struct spdk_blob_store
*bs
, spdk_blob_id blob_id
)
5948 struct spdk_blob_list
*snapshot_entry
= NULL
;
5949 struct spdk_blob_list
*clone_entry
= NULL
;
5951 TAILQ_FOREACH(snapshot_entry
, &bs
->snapshots
, link
) {
5952 TAILQ_FOREACH(clone_entry
, &snapshot_entry
->clones
, link
) {
5953 if (clone_entry
->id
== blob_id
) {
5954 return snapshot_entry
->id
;
5959 return SPDK_BLOBID_INVALID
;
5963 spdk_blob_get_clones(struct spdk_blob_store
*bs
, spdk_blob_id blobid
, spdk_blob_id
*ids
,
5966 struct spdk_blob_list
*snapshot_entry
, *clone_entry
;
5969 snapshot_entry
= _spdk_bs_get_snapshot_entry(bs
, blobid
);
5970 if (snapshot_entry
== NULL
) {
5975 if (ids
== NULL
|| *count
< snapshot_entry
->clone_count
) {
5976 *count
= snapshot_entry
->clone_count
;
5979 *count
= snapshot_entry
->clone_count
;
5982 TAILQ_FOREACH(clone_entry
, &snapshot_entry
->clones
, link
) {
5983 ids
[n
++] = clone_entry
->id
;
/* Register the "blob" log component so SPDK_DEBUGLOG(SPDK_LOG_BLOB, ...) in
 * this file can be enabled at runtime. */
SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)