/*-
 * BSD LICENSE
 *
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in
 *     the documentation and/or other materials provided with the
 *     distribution.
 *   * Neither the name of Intel Corporation nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"
#include "spdk/util.h"

#include "spdk_internal/assert.h"
#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);

static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
				uint16_t value_len, bool internal);
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}

static struct spdk_blob_list *
_spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_list *snapshot_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		if (snapshot_entry->id == blobid) {
			break;
		}
	}

	return snapshot_entry;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static int
_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
{
	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];

	_spdk_blob_verify_md_op(blob);

	if (*cluster_lba != 0) {
		return -EEXIST;
	}

	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
	return 0;
}

static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster == UINT32_MAX) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}
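
/*
 * Allocation happens in two steps: the cluster is claimed in the
 * used_clusters bitmap under used_clusters_mutex, while the per-blob
 * cluster map is only updated when update_map is true (callers on the
 * metadata thread, e.g. _spdk_blob_resize()).  The copy-on-write path
 * below instead passes update_map == false and inserts the cluster via
 * _spdk_blob_insert_cluster_on_md_thread() once the copy completes.
 */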

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}

static void
_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
{
	xattrs->count = 0;
	xattrs->names = NULL;
	xattrs->ctx = NULL;
	xattrs->get_value = NULL;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->thin_provision = false;
	_spdk_blob_xattrs_init(&opts->xattrs);
}
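
/*
 * Usage sketch (illustrative, not called from this file): callers fill in
 * defaults first and then override individual fields, e.g.
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.num_clusters = 16;
 *	opts.thin_provision = true;
 *	spdk_bs_create_blob_ext(bs, &opts, blob_create_done, cb_arg);
 *
 * where blob_create_done is a caller-provided
 * spdk_blob_op_with_id_complete callback.
 */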

void
spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts)
{
	opts->clear_method = BLOB_CLEAR_WITH_UNMAP;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->parent_id = SPDK_BLOBID_INVALID;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);
	TAILQ_INIT(&blob->xattrs_internal);

	return blob;
}

static void
_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	_spdk_xattrs_free(&blob->xattrs);
	_spdk_xattrs_free(&blob->xattrs_internal);

	if (blob->back_bs_dev) {
		blob->back_bs_dev->destroy(blob->back_bs_dev);
	}

	free(blob);
}

struct freeze_io_ctx {
	struct spdk_bs_cpl cpl;
	struct spdk_blob *blob;
};

static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_bs_request_set *set;
	struct spdk_bs_user_op_args *args;
	spdk_bs_user_op_t *op, *tmp;

	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
		set = (struct spdk_bs_request_set *)op;
		args = &set->u.user_op;

		if (args->blob == ctx->blob) {
			TAILQ_REMOVE(&ch->queued_io, op, link);
			spdk_bs_user_op_execute(op);
		}
	}

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
{
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);

	free(ctx);
}

static void
_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	/* Freeze I/O on blob */
	blob->frozen_refcnt++;

	if (blob->frozen_refcnt == 1) {
		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}

static void
_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	assert(blob->frozen_refcnt > 0);

	blob->frozen_refcnt--;

	if (blob->frozen_refcnt == 0) {
		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}
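
/*
 * Freeze/unfreeze nest via frozen_refcnt.  Only the 0 -> 1 transition
 * fans out across all channels (via _spdk_blob_io_sync) so that
 * in-flight I/O drains, and only the 1 -> 0 transition replays the I/O
 * parked on each channel's queued_io list.  Intermediate transitions
 * complete immediately.
 */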

static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -ENOMEM;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -ENOMEM;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	/* If the metadata was dirtied again while the metadata was being written to disk,
	 * we do not want to revert the DIRTY state back to CLEAN here.
	 */
	if (blob->state == SPDK_BLOB_STATE_LOADING) {
		blob->state = SPDK_BLOB_STATE_CLEAN;
	}

	return 0;
}
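
/*
 * Example: for a blob whose clusters are {A, B}, _spdk_blob_mark_clean()
 * moves the existing arrays into 'clean' and leaves 'active' pointing at
 * fresh copies of the same {A, B}, so 'clean' keeps describing the
 * persisted state even while 'active' is mutated again.
 */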

static int
_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
{
	struct spdk_xattr *xattr;

	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
	    sizeof(desc_xattr->value_length) +
	    desc_xattr->name_length + desc_xattr->value_length) {
		return -EINVAL;
	}

	xattr = calloc(1, sizeof(*xattr));
	if (xattr == NULL) {
		return -ENOMEM;
	}

	xattr->name = malloc(desc_xattr->name_length + 1);
	if (xattr->name == NULL) {
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
	xattr->name[desc_xattr->name_length] = '\0';

	xattr->value = malloc(desc_xattr->value_length);
	if (xattr->value == NULL) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = desc_xattr->value_length;
	memcpy(xattr->value,
	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
	       desc_xattr->value_length);

	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);

	return 0;
}

static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (desc_extent->extents[i].cluster_idx != 0) {
						if (!spdk_bit_array_get(blob->bs->used_clusters,
									desc_extent->extents[i].cluster_idx + j)) {
							return -EINVAL;
						}
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (desc_extent->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type. Do not fail - just continue to the
			 * next descriptor. If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}
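
/*
 * Page layout sketch: the descriptor area of a metadata page is a packed
 * sequence of (type, length, payload) records, e.g.
 *
 *	| FLAGS | XATTR "name" | EXTENT ... | PADDING (length 0) |
 *
 * The parser above advances by sizeof(*desc) + desc->length per record
 * and stops at a zero-length PADDING descriptor or the end of the page.
 */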

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD; this can
	 * happen, for example, if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
				     NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_realloc(*pages,
				      SPDK_BS_PAGE_SIZE * (*page_count),
				      SPDK_BS_PAGE_SIZE);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz, bool internal)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}
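
/*
 * Layout example (assuming the 16-bit name_length/value_length fields
 * declared in blobstore.h): for an xattr named "name" with a 4-byte
 * value, the payload after the descriptor header is name_length (2),
 * value_length (2), the 4 name bytes and the 4 value bytes, so
 * desc->length = 2 + 2 + 4 + 4 = 12.
 */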

static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		} else if (lba == 0 && blob->active.clusters[i] == 0) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}
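
/*
 * Run-length example: with lba_per_cluster == 256 and
 * active.clusters == { 256, 512, 1280 }, the first two clusters are
 * contiguous and collapse into the extent { cluster_idx = 1, length = 2 },
 * and the third becomes { cluster_idx = 5, length = 1 }.  Runs of
 * unallocated (zero) clusters coalesce the same way, with cluster_idx 0.
 */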

static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
			    const struct spdk_xattr_tailq *xattrs, bool internal,
			    struct spdk_blob_md_page **pages,
			    struct spdk_blob_md_page *cur_page,
			    uint32_t *page_count, uint8_t **buf,
			    size_t *remaining_sz)
{
	const struct spdk_xattr *xattr;
	int rc;

	TAILQ_FOREACH(xattr, xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						*buf, *remaining_sz,
						&required_sz, internal);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			*buf = (uint8_t *)cur_page->descriptors;
			*remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							*buf, *remaining_sz,
							&required_sz, internal);

			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}
		}

		*remaining_sz -= required_sz;
		*buf += required_sz;
	}

	return 0;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;
	spdk_bs_sequence_t *seq;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}
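
/*
 * The checksum covers the whole page except its last 4 bytes, which hold
 * the stored crc itself (the 'crc' field sits at the tail of struct
 * spdk_blob_md_page).  A page is validated by recomputing the crc over
 * bytes [0, SPDK_BS_PAGE_SIZE - 4) and comparing with page->crc, as
 * _spdk_blob_load_cpl() does below.
 */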

static void
_spdk_blob_load_final(void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno != 0) {
		goto error;
	}

	blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);

	if (blob->back_bs_dev == NULL) {
		bserrno = -ENOMEM;
		goto error;
	}

	_spdk_blob_load_final(ctx, bserrno);
	return;

error:
937 SPDK_ERRLOG("Snapshot fail\n");
	_spdk_blob_free(blob);
	ctx->cb_fn(ctx->seq, NULL, bserrno);
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	const void *value;
	size_t len;
	int rc;
	uint32_t crc;

	if (bserrno) {
		SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, bserrno);
		spdk_free(ctx->pages);
		free(ctx);
		return;
	}

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					  sizeof(*page));
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
					  next_lba,
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
					  _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_free(ctx->pages);
		free(ctx);
		return;
	}
	ctx->seq = seq;

	if (spdk_blob_is_thin_provisioned(blob)) {
		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
		if (rc == 0) {
			if (len != sizeof(spdk_blob_id)) {
				_spdk_blob_free(blob);
				ctx->cb_fn(seq, NULL, -EINVAL);
				spdk_free(ctx->pages);
				free(ctx);
				return;
			}
			/* open snapshot blob and continue in the callback function */
			blob->parent_id = *(spdk_blob_id *)value;
			spdk_bs_open_blob(blob->bs, blob->parent_id,
					  _spdk_blob_load_snapshot_cpl, ctx);
			return;
		} else {
			/* add zeroes_dev for thin provisioned blob */
			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
		}
	} else {
		/* standard blob */
		blob->back_bs_dev = NULL;
	}
	_spdk_blob_load_final(ctx, bserrno);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	_spdk_blob_verify_md_op(blob);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_bs_super_block *super;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_t *seq;
	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba,
			uint32_t lba_count)
{
	if (ctx->blob->clear_method == BLOB_CLEAR_WITH_DEFAULT ||
	    ctx->blob->clear_method == BLOB_CLEAR_WITH_UNMAP) {
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
	} else if (ctx->blob->clear_method == BLOB_CLEAR_WITH_WRITE_ZEROES) {
		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}
}
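
/*
 * Note the deliberate no-op for BLOB_CLEAR_WITH_NONE: no device command
 * is issued, so stale data may remain readable until the cluster is
 * rewritten.  BLOB_CLEAR_WITH_WRITE_ZEROES trades extra write bandwidth
 * for eliminating that window.
 */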

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		/* Nothing to release if it was not allocated */
		if (blob->active.clusters[i] != 0) {
			_spdk_bs_release_cluster(bs, cluster_num);
		}
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else if (blob->active.num_clusters != blob->active.cluster_array_size) {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx);

	/* Clear all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, clear them now */
		if (lba_count > 0) {
			spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, clear them now */
	if (lba_count > 0) {
		spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to clearing clusters */
	_spdk_blob_persist_clear_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
				   _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	uint64_t num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(uint64_t) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}
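
/*
 * Worked example: growing a thick blob from 2 to 5 clusters first scans
 * the used_clusters bitmap for 3 free bits without claiming anything
 * (so -ENOSPC is returned before any state changes), then re-scans and
 * claims them via _spdk_bs_allocate_cluster().  A thin-provisioned blob
 * skips both passes and only grows its zero-filled cluster array.
 */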

static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	void *tmp;
	int rc;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}
	blob->active.pages = tmp;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num == UINT32_MAX) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->blob->bs->clean = 0;

	spdk_free(ctx->super);

	_spdk_blob_persist_start(ctx);
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);

static void
_spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->super->clean = 0;
	if (ctx->super->size == 0) {
		ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
	}

	_spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->seq = seq;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	if (blob->bs->clean) {
		ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->super) {
			cb_fn(seq, cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
					  _spdk_blob_persist_dirty, ctx);
	} else {
		_spdk_blob_persist_start(ctx);
	}
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	uint8_t *buf;
	uint64_t page;
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};

static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_free(ctx->buf);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno) {
		if (bserrno == -EEXIST) {
			/* The metadata insert failed because another thread
			 * allocated the cluster first. Free our cluster
			 * but continue without error. */
			bserrno = 0;
		}
		_spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
	}

	spdk_bs_sequence_finish(ctx->seq, bserrno);
}

static void
_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	uint32_t cluster_number;

	if (bserrno) {
		/* The write failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);

	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
					       _spdk_blob_insert_cluster_cpl, ctx);
}

static void
_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		/* The read failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Write whole cluster */
	spdk_bs_sequence_write_dev(seq, ctx->buf,
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
				   _spdk_blob_write_copy_cpl, ctx);
}

static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t io_unit, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the io_unit offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at. */
	cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen,
				       NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->buf) {
			SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
				    blob->bs->cluster_sz);
			free(ctx);
			spdk_bs_user_op_abort(op);
			return;
		}
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		/* Read cluster from backing device */
		spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
					     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
					     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
					     _spdk_blob_write_copy, ctx);
	} else {
		_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
						       _spdk_blob_insert_cluster_cpl, ctx);
	}
}
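
/*
 * Copy-on-write flow, end to end: claim a free cluster; if the blob has
 * a parent (snapshot or clone), read the entire backing cluster into a
 * DMA buffer and write it to the new location; finally insert the new
 * cluster into the map on the metadata thread.  User ops that arrive on
 * this channel in the meantime wait on need_cluster_alloc and are
 * re-executed (or aborted) in _spdk_blob_allocate_and_copy_cluster_cpl().
 */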

static void
_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
				       uint64_t *lba, uint32_t *lba_count)
{
	*lba_count = length;

	if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
		assert(blob->back_bs_dev != NULL);
		*lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
		*lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
	} else {
		*lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
	}
}
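
/*
 * Example: for a thin-provisioned clone, an io_unit in an unallocated
 * cluster is translated to an LBA on back_bs_dev (the snapshot or zeroes
 * device), with lba_count rescaled to the backing device's block size;
 * an allocated io_unit maps directly to an LBA on the blobstore device.
 */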

struct op_split_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	void *curr_payload;
	enum spdk_blob_op_type op_type;
	spdk_bs_sequence_t *seq;
};

static void
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	struct op_split_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_io_channel *ch = ctx->channel;
	enum spdk_blob_op_type op_type = ctx->op_type;
	uint8_t *buf = ctx->curr_payload;
	uint64_t offset = ctx->io_unit_offset;
	uint64_t length = ctx->io_units_remaining;
	uint64_t op_length;

	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
			     offset));

	/* Update length and payload for next operation */
	ctx->io_units_remaining -= op_length;
	ctx->io_unit_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		ctx->curr_payload += op_length * blob->bs->io_unit_size;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
1800 SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}

static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct op_split_ctx *ctx;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(struct op_split_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(ch, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->channel = ch;
	ctx->curr_payload = payload;
	ctx->io_unit_offset = offset;
	ctx->io_units_remaining = length;
	ctx->op_type = op_type;
	ctx->seq = seq;

	_spdk_blob_request_submit_op_split_next(ctx, 0);
}

static void
_spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
				    void *payload, uint64_t offset, uint64_t length,
				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

	if (blob->frozen_refcnt) {
		/* This blob I/O is frozen */
		spdk_bs_user_op_t *op;
		struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);

		op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
		if (!op) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);

		return;
	}

	switch (op_type) {
	case SPDK_BLOB_READ: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			/* Read from the blob */
			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
		} else {
			/* Read from the backing block device */
			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_WRITE:
	case SPDK_BLOB_WRITE_ZEROES: {
		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			/* Write to the blob */
			spdk_bs_batch_t *batch;

			if (lba_count == 0) {
				cb_fn(cb_arg, 0);
				return;
			}

			batch = spdk_bs_batch_open(_ch, &cpl);
			if (!batch) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (op_type == SPDK_BLOB_WRITE) {
				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
			} else {
				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
			}

			spdk_bs_batch_close(batch);
		} else {
			/* Queue this operation and allocate the cluster */
			spdk_bs_user_op_t *op;

			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
			if (!op) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
		}
		break;
	}
	case SPDK_BLOB_UNMAP: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
1957 cb_fn(cb_arg, -EINVAL);
1958 break;
1959 }
1960 }
1961
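/*
 * Common entry point for the non-vectored I/O paths. Writes to a read-only
 * blob fail with -EPERM and out-of-range requests with -EINVAL; otherwise the
 * request is routed to the single-cluster fast path when it does not cross a
 * cluster boundary, and to the split path when it does. For example, with a
 * 1 MiB cluster and 4 KiB io units (256 io units per cluster), a 16-io-unit
 * request starting at io unit 250 crosses a boundary and would be split.
 */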
1962 static void
1963 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
1964 void *payload, uint64_t offset, uint64_t length,
1965 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1966 {
1967 assert(blob != NULL);
1968
1969 if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1970 cb_fn(cb_arg, -EPERM);
1971 return;
1972 }
1973
1974 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
1975 cb_fn(cb_arg, -EINVAL);
1976 return;
1977 }
1978 if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) {
1979 _spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
1980 cb_fn, cb_arg, op_type);
1981 } else {
1982 _spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
1983 cb_fn, cb_arg, op_type);
1984 }
1985 }
1986
1987 struct rw_iov_ctx {
1988 struct spdk_blob *blob;
1989 struct spdk_io_channel *channel;
1990 spdk_blob_op_complete cb_fn;
1991 void *cb_arg;
1992 bool read;
1993 int iovcnt;
1994 struct iovec *orig_iov;
1995 uint64_t io_unit_offset;
1996 uint64_t io_units_remaining;
1997 uint64_t io_units_done;
1998 struct iovec iov[0];
1999 };
2000
2001 static void
2002 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2003 {
2004 assert(cb_arg == NULL);
2005 spdk_bs_sequence_finish(seq, bserrno);
2006 }
2007
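/*
 * Issue the next single-cluster piece of a vectored I/O. The original iov
 * array describes the whole transfer, so each step first walks it to find the
 * element and intra-element offset matching io_units_done, then builds a
 * sub-iov array (ctx->iov) covering exactly the bytes of this piece.
 * Illustrative numbers (hypothetical): with a 512-byte io unit and
 * orig_iov = { 4096, 4096 }, after 10 io units are done the walk skips the
 * first element (4096 bytes) and sets orig_iovoff = 1024 into the second.
 */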
2008 static void
2009 _spdk_rw_iov_split_next(void *cb_arg, int bserrno)
2010 {
2011 struct rw_iov_ctx *ctx = cb_arg;
2012 struct spdk_blob *blob = ctx->blob;
2013 struct iovec *iov, *orig_iov;
2014 int iovcnt;
2015 size_t orig_iovoff;
2016 uint64_t io_units_count, io_units_to_boundary, io_unit_offset;
2017 uint64_t byte_count;
2018
2019 if (bserrno != 0 || ctx->io_units_remaining == 0) {
2020 ctx->cb_fn(ctx->cb_arg, bserrno);
2021 free(ctx);
2022 return;
2023 }
2024
2025 io_unit_offset = ctx->io_unit_offset;
2026 io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset);
2027 io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary);
	/*
	 * Get the index and intra-element offset into the original iov array for
	 * our current position in the I/O sequence. byte_count counts down the
	 * bytes that still have to be skipped before orig_iov and orig_iovoff
	 * point at the current position.
	 */
2033 byte_count = ctx->io_units_done * blob->bs->io_unit_size;
2034 orig_iov = &ctx->orig_iov[0];
2035 orig_iovoff = 0;
2036 while (byte_count > 0) {
2037 if (byte_count >= orig_iov->iov_len) {
2038 byte_count -= orig_iov->iov_len;
2039 orig_iov++;
2040 } else {
2041 orig_iovoff = byte_count;
2042 byte_count = 0;
2043 }
2044 }
2045
2046 /*
2047 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
2048 * bytes of this next I/O remain to be accounted for in the new iov array.
2049 */
2050 byte_count = io_units_count * blob->bs->io_unit_size;
2051 iov = &ctx->iov[0];
2052 iovcnt = 0;
2053 while (byte_count > 0) {
2054 assert(iovcnt < ctx->iovcnt);
2055 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
2056 iov->iov_base = orig_iov->iov_base + orig_iovoff;
2057 byte_count -= iov->iov_len;
2058 orig_iovoff = 0;
2059 orig_iov++;
2060 iov++;
2061 iovcnt++;
2062 }
2063
2064 ctx->io_unit_offset += io_units_count;
2065 ctx->io_units_remaining -= io_units_count;
2066 ctx->io_units_done += io_units_count;
2067 iov = &ctx->iov[0];
2068
2069 if (ctx->read) {
2070 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
2071 io_units_count, _spdk_rw_iov_split_next, ctx);
2072 } else {
2073 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
2074 io_units_count, _spdk_rw_iov_split_next, ctx);
2075 }
2076 }
2077
2078 static void
2079 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
2080 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2081 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
2082 {
2083 struct spdk_bs_cpl cpl;
2084
2085 assert(blob != NULL);
2086
2087 if (!read && blob->data_ro) {
2088 cb_fn(cb_arg, -EPERM);
2089 return;
2090 }
2091
2092 if (length == 0) {
2093 cb_fn(cb_arg, 0);
2094 return;
2095 }
2096
2097 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
2098 cb_fn(cb_arg, -EINVAL);
2099 return;
2100 }
2101
	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 * to split a request that spans a cluster boundary. For I/O that does not span a cluster boundary,
	 * there will be no noticeable difference compared to using a batch. For I/O that does span a cluster
	 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 * to allocate a separate iov array and split the I/O such that none of the resulting
	 * smaller I/Os crosses a cluster boundary. These smaller I/Os will be issued in sequence (not in parallel)
	 * but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
2116 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) {
2117 uint32_t lba_count;
2118 uint64_t lba;
2119
2120 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2121 cpl.u.blob_basic.cb_fn = cb_fn;
2122 cpl.u.blob_basic.cb_arg = cb_arg;
2123
2124 if (blob->frozen_refcnt) {
2125 /* This blob I/O is frozen */
2126 enum spdk_blob_op_type op_type;
2127 spdk_bs_user_op_t *op;
2128 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel);
2129
2130 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV;
2131 op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length);
2132 if (!op) {
2133 cb_fn(cb_arg, -ENOMEM);
2134 return;
2135 }
2136
2137 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
2138
2139 return;
2140 }
2141
2142 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
2143
2144 if (read) {
2145 spdk_bs_sequence_t *seq;
2146
2147 seq = spdk_bs_sequence_start(_channel, &cpl);
2148 if (!seq) {
2149 cb_fn(cb_arg, -ENOMEM);
2150 return;
2151 }
2152
2153 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2154 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
2155 } else {
2156 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
2157 _spdk_rw_iov_done, NULL);
2158 }
2159 } else {
2160 if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2161 spdk_bs_sequence_t *seq;
2162
2163 seq = spdk_bs_sequence_start(_channel, &cpl);
2164 if (!seq) {
2165 cb_fn(cb_arg, -ENOMEM);
2166 return;
2167 }
2168
2169 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
2170 } else {
2171 /* Queue this operation and allocate the cluster */
2172 spdk_bs_user_op_t *op;
2173
2174 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset,
2175 length);
2176 if (!op) {
2177 cb_fn(cb_arg, -ENOMEM);
2178 return;
2179 }
2180
2181 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
2182 }
2183 }
2184 } else {
2185 struct rw_iov_ctx *ctx;
2186
2187 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
2188 if (ctx == NULL) {
2189 cb_fn(cb_arg, -ENOMEM);
2190 return;
2191 }
2192
2193 ctx->blob = blob;
2194 ctx->channel = _channel;
2195 ctx->cb_fn = cb_fn;
2196 ctx->cb_arg = cb_arg;
2197 ctx->read = read;
2198 ctx->orig_iov = iov;
2199 ctx->iovcnt = iovcnt;
2200 ctx->io_unit_offset = offset;
2201 ctx->io_units_remaining = length;
2202 ctx->io_units_done = 0;
2203
2204 _spdk_rw_iov_split_next(ctx, 0);
2205 }
2206 }
2207
2208 static struct spdk_blob *
2209 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
2210 {
2211 struct spdk_blob *blob;
2212
2213 TAILQ_FOREACH(blob, &bs->blobs, link) {
2214 if (blob->id == blobid) {
2215 return blob;
2216 }
2217 }
2218
2219 return NULL;
2220 }
2221
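/*
 * Resolve both directions of a clone relationship: the snapshot entry that
 * blob->parent_id refers to, and within it the clone entry for this blob.
 * Both outputs are NULL for a blob that has no parent.
 */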
2222 static void
2223 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob,
2224 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry)
2225 {
2226 assert(blob != NULL);
2227 *snapshot_entry = NULL;
2228 *clone_entry = NULL;
2229
2230 if (blob->parent_id == SPDK_BLOBID_INVALID) {
2231 return;
2232 }
2233
2234 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) {
2235 if ((*snapshot_entry)->id == blob->parent_id) {
2236 break;
2237 }
2238 }
2239
2240 if (*snapshot_entry != NULL) {
2241 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) {
2242 if ((*clone_entry)->id == blob->id) {
2243 break;
2244 }
2245 }
2246
		assert(*clone_entry != NULL);
2248 }
2249 }
2250
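/*
 * Per-thread channel constructor, invoked by the io_device framework. Each
 * channel preallocates max_channel_ops request sets so the I/O path does not
 * allocate, opens a channel on the underlying bs_dev, and initializes the
 * lists used to park operations waiting on a cluster allocation or on a
 * frozen blob.
 */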
2251 static int
2252 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
2253 {
2254 struct spdk_blob_store *bs = io_device;
2255 struct spdk_bs_channel *channel = ctx_buf;
2256 struct spdk_bs_dev *dev;
2257 uint32_t max_ops = bs->max_channel_ops;
2258 uint32_t i;
2259
2260 dev = bs->dev;
2261
2262 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
2263 if (!channel->req_mem) {
2264 return -1;
2265 }
2266
2267 TAILQ_INIT(&channel->reqs);
2268
2269 for (i = 0; i < max_ops; i++) {
2270 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
2271 }
2272
2273 channel->bs = bs;
2274 channel->dev = dev;
2275 channel->dev_channel = dev->create_channel(dev);
2276
2277 if (!channel->dev_channel) {
2278 SPDK_ERRLOG("Failed to create device channel.\n");
2279 free(channel->req_mem);
2280 return -1;
2281 }
2282
2283 TAILQ_INIT(&channel->need_cluster_alloc);
2284 TAILQ_INIT(&channel->queued_io);
2285
2286 return 0;
2287 }
2288
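/*
 * Channel destructor: any operations still parked on the channel are aborted
 * before the request memory and the underlying device channel are released.
 */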
2289 static void
2290 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
2291 {
2292 struct spdk_bs_channel *channel = ctx_buf;
2293 spdk_bs_user_op_t *op;
2294
2295 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
2296 op = TAILQ_FIRST(&channel->need_cluster_alloc);
2297 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
2298 spdk_bs_user_op_abort(op);
2299 }
2300
2301 while (!TAILQ_EMPTY(&channel->queued_io)) {
2302 op = TAILQ_FIRST(&channel->queued_io);
2303 TAILQ_REMOVE(&channel->queued_io, op, link);
2304 spdk_bs_user_op_abort(op);
2305 }
2306
2307 free(channel->req_mem);
2308 channel->dev->destroy_channel(channel->dev, channel->dev_channel);
2309 }
2310
2311 static void
2312 _spdk_bs_dev_destroy(void *io_device)
2313 {
2314 struct spdk_blob_store *bs = io_device;
2315 struct spdk_blob *blob, *blob_tmp;
2316
2317 bs->dev->destroy(bs->dev);
2318
2319 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2320 TAILQ_REMOVE(&bs->blobs, blob, link);
2321 _spdk_blob_free(blob);
2322 }
2323
2324 pthread_mutex_destroy(&bs->used_clusters_mutex);
2325
2326 spdk_bit_array_free(&bs->used_blobids);
2327 spdk_bit_array_free(&bs->used_md_pages);
2328 spdk_bit_array_free(&bs->used_clusters);
2329 /*
2330 * If this function is called for any reason except a successful unload,
2331 * the unload_cpl type will be NONE and this will be a nop.
2332 */
2333 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2334
2335 free(bs);
2336 }
2337
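/*
 * Register a blob in the in-memory snapshot/clone registry. If the blob has a
 * parent, a snapshot entry is created on first use and a clone entry is added
 * under it, unless one already exists - so this is safe to call repeatedly.
 */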
2338 static int
2339 _spdk_bs_blob_list_add(struct spdk_blob *blob)
2340 {
2341 spdk_blob_id snapshot_id;
2342 struct spdk_blob_list *snapshot_entry = NULL;
2343 struct spdk_blob_list *clone_entry = NULL;
2344
2345 assert(blob != NULL);
2346
2347 snapshot_id = blob->parent_id;
2348 if (snapshot_id == SPDK_BLOBID_INVALID) {
2349 return 0;
2350 }
2351
2352 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id);
2353 if (snapshot_entry == NULL) {
2354 /* Snapshot not found */
2355 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list));
2356 if (snapshot_entry == NULL) {
2357 return -ENOMEM;
2358 }
2359 snapshot_entry->id = snapshot_id;
2360 TAILQ_INIT(&snapshot_entry->clones);
2361 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link);
2362 } else {
2363 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
2364 if (clone_entry->id == blob->id) {
2365 break;
2366 }
2367 }
2368 }
2369
2370 if (clone_entry == NULL) {
2371 /* Clone not found */
2372 clone_entry = calloc(1, sizeof(struct spdk_blob_list));
2373 if (clone_entry == NULL) {
2374 return -ENOMEM;
2375 }
2376 clone_entry->id = blob->id;
2377 TAILQ_INIT(&clone_entry->clones);
2378 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link);
2379 snapshot_entry->clone_count++;
2380 }
2381
2382 return 0;
2383 }
2384
2385 static void
2386 _spdk_bs_blob_list_remove(struct spdk_blob *blob)
2387 {
2388 struct spdk_blob_list *snapshot_entry = NULL;
2389 struct spdk_blob_list *clone_entry = NULL;
2390
2391 _spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry);
2392
2393 if (snapshot_entry == NULL) {
2394 return;
2395 }
2396
2397 blob->parent_id = SPDK_BLOBID_INVALID;
2398 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2399 free(clone_entry);
2400
2401 snapshot_entry->clone_count--;
2402 }
2403
2404 static int
2405 _spdk_bs_blob_list_free(struct spdk_blob_store *bs)
2406 {
2407 struct spdk_blob_list *snapshot_entry;
2408 struct spdk_blob_list *snapshot_entry_tmp;
2409 struct spdk_blob_list *clone_entry;
2410 struct spdk_blob_list *clone_entry_tmp;
2411
2412 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) {
2413 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) {
2414 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2415 free(clone_entry);
2416 }
2417 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link);
2418 free(snapshot_entry);
2419 }
2420
2421 return 0;
2422 }
2423
2424 static void
2425 _spdk_bs_free(struct spdk_blob_store *bs)
2426 {
2427 _spdk_bs_blob_list_free(bs);
2428
2429 spdk_bs_unregister_md_thread(bs);
2430 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
2431 }
2432
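/*
 * Callers are expected to initialize an opts structure with this function and
 * then override individual fields before passing it to spdk_bs_init() or
 * spdk_bs_load(). A minimal sketch (the 4 MiB cluster size and the callback
 * names below are illustrative, not recommendations):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *	spdk_bs_init(dev, &opts, init_complete_cb, init_ctx);
 */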
2433 void
2434 spdk_bs_opts_init(struct spdk_bs_opts *opts)
2435 {
2436 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2437 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2438 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2439 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2440 opts->clear_method = BS_CLEAR_WITH_UNMAP;
2441 memset(&opts->bstype, 0, sizeof(opts->bstype));
2442 opts->iter_cb_fn = NULL;
2443 opts->iter_cb_arg = NULL;
2444 }
2445
2446 static int
2447 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2448 {
2449 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2450 opts->max_channel_ops == 0) {
2451 SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2452 return -1;
2453 }
2454
2455 return 0;
2456 }
2457
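/*
 * Common constructor shared by the init, load, and dump paths. Note that
 * total_clusters is computed directly from blockcnt (rather than via
 * _spdk_bs_lba_to_cluster()) and rounds down: for example, a device with
 * 512-byte blocks and a 1 MiB cluster size has 2048 blocks per cluster, so a
 * 10 GiB device yields 10240 clusters. The used_md_pages and used_blobids
 * arrays are created tiny here and resized later, once the real metadata
 * sizes are known.
 */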
2458 static int
2459 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs)
2460 {
2461 struct spdk_blob_store *bs;
2462 uint64_t dev_size;
2463 int rc;
2464
2465 dev_size = dev->blocklen * dev->blockcnt;
2466 if (dev_size < opts->cluster_sz) {
2467 /* Device size cannot be smaller than cluster size of blobstore */
2468 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
2469 dev_size, opts->cluster_sz);
2470 return -ENOSPC;
2471 }
2472 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
2473 /* Cluster size cannot be smaller than page size */
2474 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
2475 opts->cluster_sz, SPDK_BS_PAGE_SIZE);
2476 return -EINVAL;
2477 }
2478 bs = calloc(1, sizeof(struct spdk_blob_store));
2479 if (!bs) {
2480 return -ENOMEM;
2481 }
2482
2483 TAILQ_INIT(&bs->blobs);
2484 TAILQ_INIT(&bs->snapshots);
2485 bs->dev = dev;
2486 bs->md_thread = spdk_get_thread();
2487 assert(bs->md_thread != NULL);
2488
2489 /*
2490 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
2491 * even multiple of the cluster size.
2492 */
2493 bs->cluster_sz = opts->cluster_sz;
2494 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
2495 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2496 bs->num_free_clusters = bs->total_clusters;
2497 bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
2498 bs->io_unit_size = dev->blocklen;
2499 if (bs->used_clusters == NULL) {
2500 free(bs);
2501 return -ENOMEM;
2502 }
2503
2504 bs->max_channel_ops = opts->max_channel_ops;
2505 bs->super_blob = SPDK_BLOBID_INVALID;
2506 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
2507
2508 /* The metadata is assumed to be at least 1 page */
2509 bs->used_md_pages = spdk_bit_array_create(1);
2510 bs->used_blobids = spdk_bit_array_create(0);
2511
2512 pthread_mutex_init(&bs->used_clusters_mutex, NULL);
2513
2514 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
2515 sizeof(struct spdk_bs_channel), "blobstore");
2516 rc = spdk_bs_register_md_thread(bs);
2517 if (rc == -1) {
2518 spdk_io_device_unregister(bs, NULL);
2519 pthread_mutex_destroy(&bs->used_clusters_mutex);
2520 spdk_bit_array_free(&bs->used_blobids);
2521 spdk_bit_array_free(&bs->used_md_pages);
2522 spdk_bit_array_free(&bs->used_clusters);
2523 free(bs);
2524 /* FIXME: this is a lie but don't know how to get a proper error code here */
2525 return -ENOMEM;
2526 }
2527
2528 *_bs = bs;
2529 return 0;
2530 }
2531
/* START spdk_bs_load. spdk_bs_load_ctx is used for both load and unload. */
2533
2534 struct spdk_bs_load_ctx {
2535 struct spdk_blob_store *bs;
2536 struct spdk_bs_super_block *super;
2537
2538 struct spdk_bs_md_mask *mask;
2539 bool in_page_chain;
2540 uint32_t page_index;
2541 uint32_t cur_page;
2542 struct spdk_blob_md_page *page;
2543
2544 spdk_bs_sequence_t *seq;
2545 spdk_blob_op_with_handle_complete iter_cb_fn;
2546 void *iter_cb_arg;
2547 struct spdk_blob *blob;
2548 spdk_blob_id blobid;
2549 };
2550
2551 static void
2552 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2553 {
2554 assert(bserrno != 0);
2555
2556 spdk_free(ctx->super);
2557 spdk_bs_sequence_finish(seq, bserrno);
2558 _spdk_bs_free(ctx->bs);
2559 free(ctx);
2560 }
2561
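/*
 * Serialize a bit array into the on-disk mask format: bit i of the array is
 * stored at mask->mask[i / 8], bit position i % 8. For example, a set bit 11
 * becomes mask[1] |= 0x08. _spdk_bs_load_mask() below performs the inverse
 * transformation when the mask is read back from disk.
 */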
2562 static void
2563 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2564 {
2565 uint32_t i = 0;
2566
2567 while (true) {
2568 i = spdk_bit_array_find_first_set(array, i);
2569 if (i >= mask->length) {
2570 break;
2571 }
2572 mask->mask[i / 8] |= 1U << (i % 8);
2573 i++;
2574 }
2575 }
2576
2577 static int
2578 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask)
2579 {
2580 struct spdk_bit_array *array;
2581 uint32_t i;
2582
2583 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) {
2584 return -ENOMEM;
2585 }
2586
2587 array = *array_ptr;
2588 for (i = 0; i < mask->length; i++) {
2589 if (mask->mask[i / 8] & (1U << (i % 8))) {
2590 spdk_bit_array_set(array, i);
2591 }
2592 }
2593
2594 return 0;
2595 }
2596
2597 static void
2598 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
2599 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
2600 {
2601 /* Update the values in the super block */
2602 super->super_blob = bs->super_blob;
2603 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
2604 super->crc = _spdk_blob_md_page_calc_crc(super);
2605 spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
2606 _spdk_bs_byte_to_lba(bs, sizeof(*super)),
2607 cb_fn, cb_arg);
2608 }
2609
2610 static void
2611 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2612 {
2613 struct spdk_bs_load_ctx *ctx = arg;
2614 uint64_t mask_size, lba, lba_count;
2615
2616 /* Write out the used clusters mask */
2617 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2618 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
2619 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2620 if (!ctx->mask) {
2621 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2622 return;
2623 }
2624
2625 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
2626 ctx->mask->length = ctx->bs->total_clusters;
2627 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
2628
2629 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
2630 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2631 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2632 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2633 }
2634
2635 static void
2636 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2637 {
2638 struct spdk_bs_load_ctx *ctx = arg;
2639 uint64_t mask_size, lba, lba_count;
2640
2641 if (seq->bserrno) {
2642 _spdk_bs_load_ctx_fail(seq, ctx, seq->bserrno);
2643 return;
2644 }
2645
2646 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2647 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
2648 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2649 if (!ctx->mask) {
2650 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2651 return;
2652 }
2653
2654 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
2655 ctx->mask->length = ctx->super->md_len;
2656 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
2657
2658 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
2659 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2660 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2661 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2662 }
2663
2664 static void
2665 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2666 {
2667 struct spdk_bs_load_ctx *ctx = arg;
2668 uint64_t mask_size, lba, lba_count;
2669
2670 if (ctx->super->used_blobid_mask_len == 0) {
2671 /*
2672 * This is a pre-v3 on-disk format where the blobid mask does not get
2673 * written to disk.
2674 */
2675 cb_fn(seq, arg, 0);
2676 return;
2677 }
2678
2679 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2680 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
2681 SPDK_MALLOC_DMA);
2682 if (!ctx->mask) {
2683 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2684 return;
2685 }
2686
2687 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
2688 ctx->mask->length = ctx->super->md_len;
2689 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
2690
2691 _spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
2692 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2693 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2694 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2695 }
2696
2697 static void
2698 _spdk_blob_set_thin_provision(struct spdk_blob *blob)
2699 {
2700 _spdk_blob_verify_md_op(blob);
2701 blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
2702 blob->state = SPDK_BLOB_STATE_DIRTY;
2703 }
2704
2705 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno);
2706
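/*
 * The following callbacks implement recovery for a snapshot deletion that was
 * interrupted by power failure. Such a snapshot is detected during load by its
 * SNAPSHOT_PENDING_REMOVAL xattr (see _spdk_bs_load_iter below). Its clone is
 * opened to decide which side to keep: if the clone still lists the snapshot
 * as its parent, the failure happened before the clone was updated and the
 * snapshot is kept (only the pending-removal xattr is dropped); otherwise the
 * failure happened after the clone was updated, and the half-deleted snapshot
 * is removed.
 */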
2707 static void
2708 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno)
2709 {
2710 struct spdk_bs_load_ctx *ctx = cb_arg;
2711 spdk_blob_id id;
2712 int64_t page_num;
2713
	/* Iterate to the next blob (we can't use spdk_bs_iter_next() here, as the
	 * last blob has been removed) */
2716 page_num = _spdk_bs_blobid_to_page(ctx->blobid);
2717 page_num++;
2718 page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num);
2719 if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) {
2720 _spdk_bs_load_iter(ctx, NULL, -ENOENT);
2721 return;
2722 }
2723
2724 id = _spdk_bs_page_to_blobid(page_num);
2725
2726 spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx);
2727 }
2728
2729 static void
2730 _spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno)
2731 {
2732 struct spdk_bs_load_ctx *ctx = cb_arg;
2733
2734 if (bserrno != 0) {
2735 SPDK_ERRLOG("Failed to close corrupted blob\n");
2736 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
2737 return;
2738 }
2739
2740 spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx);
2741 }
2742
2743 static void
2744 _spdk_bs_delete_corrupted_blob(void *cb_arg, int bserrno)
2745 {
2746 struct spdk_bs_load_ctx *ctx = cb_arg;
2747 uint64_t i;
2748
2749 if (bserrno != 0) {
2750 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
2751 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
2752 return;
2753 }
2754
	/* Snapshot and clone hold identical copies of the cluster map at this
	 * point. Clear the snapshot's cluster map now so that the clusters are
	 * not cleared out from under the clone later when the snapshot is
	 * removed. Also mark the snapshot thin-provisioned so that it passes the
	 * data corruption check. */
2759 for (i = 0; i < ctx->blob->active.num_clusters; i++) {
2760 ctx->blob->active.clusters[i] = 0;
2761 }
2762
2763 ctx->blob->md_ro = false;
2764
2765 _spdk_blob_set_thin_provision(ctx->blob);
2766
2767 ctx->blobid = ctx->blob->id;
2768
2769 spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx);
2770 }
2771
2772 static void
2773 _spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno)
2774 {
2775 struct spdk_bs_load_ctx *ctx = cb_arg;
2776
2777 if (bserrno != 0) {
2778 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
2779 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
2780 return;
2781 }
2782
2783 ctx->blob->md_ro = false;
2784 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true);
2785 spdk_blob_set_read_only(ctx->blob);
2786
2787 if (ctx->iter_cb_fn) {
2788 ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0);
2789 }
2790 _spdk_bs_blob_list_add(ctx->blob);
2791
2792 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
2793 }
2794
2795 static void
2796 _spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno)
2797 {
2798 struct spdk_bs_load_ctx *ctx = cb_arg;
2799
2800 if (bserrno != 0) {
2801 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n");
2802 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
2803 return;
2804 }
2805
2806 if (blob->parent_id == ctx->blob->id) {
		/* Power failure occurred before the clone was updated - keep the snapshot */
2808 spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx);
2809 } else {
		/* Power failure occurred after the clone was updated - remove the snapshot */
2811 spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx);
2812 }
2813 }
2814
2815 static void
2816 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
2817 {
2818 struct spdk_bs_load_ctx *ctx = arg;
2819 const void *value;
2820 size_t len;
2821 int rc = 0;
2822
2823 if (bserrno == 0) {
		/* Examine the blob to see whether it was corrupted by a power
		 * failure. Fix the ones that can be fixed, remove the ones that
		 * can't, and just process blobs that are not corrupted at all. */
2827 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true);
2828 if (rc != 0) {
2829 /* Not corrupted - process it and continue with iterating through blobs */
2830 if (ctx->iter_cb_fn) {
2831 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
2832 }
2833 _spdk_bs_blob_list_add(blob);
2834 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
2835 return;
2836 }
2837
2838 assert(len == sizeof(spdk_blob_id));
2839
2840 ctx->blob = blob;
2841
2842 /* Open clone to check if we are able to fix this blob or should we remove it */
2843 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx);
2844 return;
2845 } else if (bserrno == -ENOENT) {
2846 bserrno = 0;
2847 } else {
2848 /*
2849 * This case needs to be looked at further. Same problem
2850 * exists with applications that rely on explicit blob
2851 * iteration. We should just skip the blob that failed
2852 * to load and continue on to the next one.
2853 */
2854 SPDK_ERRLOG("Error in iterating blobs\n");
2855 }
2856
2857 ctx->iter_cb_fn = NULL;
2858
2859 spdk_free(ctx->super);
2860 spdk_free(ctx->mask);
2861 spdk_bs_sequence_finish(ctx->seq, bserrno);
2862 free(ctx);
2863 }
2864
2865 static void
2866 _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2867 {
2868 ctx->seq = seq;
2869 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
2870 }
2871
2872 static void
2873 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2874 {
2875 struct spdk_bs_load_ctx *ctx = cb_arg;
2876 int rc;
2877
2878 /* The type must be correct */
2879 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
2880
2881 /* The length of the mask (in bits) must not be greater than
2882 * the length of the buffer (converted to bits) */
2883 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
2884
2885 /* The length of the mask must be exactly equal to the size
2886 * (in pages) of the metadata region */
2887 assert(ctx->mask->length == ctx->super->md_len);
2888
2889 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask);
2890 if (rc < 0) {
2891 spdk_free(ctx->mask);
2892 _spdk_bs_load_ctx_fail(seq, ctx, rc);
2893 return;
2894 }
2895
2896 _spdk_bs_load_complete(seq, ctx, bserrno);
2897 }
2898
2899 static void
2900 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2901 {
2902 struct spdk_bs_load_ctx *ctx = cb_arg;
2903 uint64_t lba, lba_count, mask_size;
2904 int rc;
2905
2906 /* The type must be correct */
2907 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
2908 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE * 8));
2911 /* The length of the mask must be exactly equal to the total number of clusters */
2912 assert(ctx->mask->length == ctx->bs->total_clusters);
2913
2914 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask);
2915 if (rc < 0) {
2916 spdk_free(ctx->mask);
2917 _spdk_bs_load_ctx_fail(seq, ctx, rc);
2918 return;
2919 }
2920
2921 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters);
2922 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters);
2923
2924 spdk_free(ctx->mask);
2925
2926 /* Read the used blobids mask */
2927 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2928 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
2929 SPDK_MALLOC_DMA);
2930 if (!ctx->mask) {
2931 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2932 return;
2933 }
2934 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2935 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2936 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2937 _spdk_bs_load_used_blobids_cpl, ctx);
2938 }
2939
2940 static void
2941 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2942 {
2943 struct spdk_bs_load_ctx *ctx = cb_arg;
2944 uint64_t lba, lba_count, mask_size;
2945 int rc;
2946
2947 /* The type must be correct */
2948 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
2949 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2950 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
2951 8));
2952 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
2953 assert(ctx->mask->length == ctx->super->md_len);
2954
2955 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask);
2956 if (rc < 0) {
2957 spdk_free(ctx->mask);
2958 _spdk_bs_load_ctx_fail(seq, ctx, rc);
2959 return;
2960 }
2961
2962 spdk_free(ctx->mask);
2963
2964 /* Read the used clusters mask */
2965 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2966 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
2967 SPDK_MALLOC_DMA);
2968 if (!ctx->mask) {
2969 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2970 return;
2971 }
2972 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2973 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2974 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2975 _spdk_bs_load_used_clusters_cpl, ctx);
2976 }
2977
2978 static void
2979 _spdk_bs_load_read_used_pages(spdk_bs_sequence_t *seq, void *cb_arg)
2980 {
2981 struct spdk_bs_load_ctx *ctx = cb_arg;
2982 uint64_t lba, lba_count, mask_size;
2983
2984 /* Read the used pages mask */
2985 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2986 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
2987 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2988 if (!ctx->mask) {
2989 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2990 return;
2991 }
2992
2993 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2994 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2995 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2996 _spdk_bs_load_used_pages_cpl, ctx);
2997 }
2998
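/*
 * Replay one metadata page during recovery: walk its descriptor chain and,
 * for each extent descriptor, mark the referenced clusters as used (a
 * cluster_idx of 0 denotes an unallocated cluster in a thin-provisioned blob
 * and is skipped). Unknown descriptor types fail the replay with -EINVAL.
 */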
2999 static int
3000 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
3001 {
3002 struct spdk_blob_md_descriptor *desc;
3003 size_t cur_desc = 0;
3004
3005 desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3006 while (cur_desc < sizeof(page->descriptors)) {
3007 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3008 if (desc->length == 0) {
3009 /* If padding and length are 0, this terminates the page */
3010 break;
3011 }
3012 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
3013 struct spdk_blob_md_descriptor_extent *desc_extent;
3014 unsigned int i, j;
3015 unsigned int cluster_count = 0;
3016 uint32_t cluster_idx;
3017
3018 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
3019
3020 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
3021 for (j = 0; j < desc_extent->extents[i].length; j++) {
3022 cluster_idx = desc_extent->extents[i].cluster_idx;
3023 /*
3024 * cluster_idx = 0 means an unallocated cluster - don't mark that
3025 * in the used cluster map.
3026 */
3027 if (cluster_idx != 0) {
3028 spdk_bit_array_set(bs->used_clusters, cluster_idx + j);
3029 if (bs->num_free_clusters == 0) {
3030 return -ENOSPC;
3031 }
3032 bs->num_free_clusters--;
3033 }
3034 cluster_count++;
3035 }
3036 }
3037 if (cluster_count == 0) {
3038 return -EINVAL;
3039 }
3040 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3041 /* Skip this item */
3042 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3043 /* Skip this item */
3044 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3045 /* Skip this item */
3046 } else {
3047 /* Error */
3048 return -EINVAL;
3049 }
3050 /* Advance to the next descriptor */
3051 cur_desc += sizeof(*desc) + desc->length;
3052 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3053 break;
3054 }
3055 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3056 }
3057 return 0;
3058 }
3059
3060 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
3061 {
3062 uint32_t crc;
3063
3064 crc = _spdk_blob_md_page_calc_crc(ctx->page);
3065 if (crc != ctx->page->crc) {
3066 return false;
3067 }
3068
3069 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
3070 return false;
3071 }
3072 return true;
3073 }
3074
3075 static void
3076 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
3077
3078 static void
3079 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3080 {
3081 struct spdk_bs_load_ctx *ctx = cb_arg;
3082
3083 _spdk_bs_load_complete(seq, ctx, bserrno);
3084 }
3085
3086 static void
3087 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3088 {
3089 struct spdk_bs_load_ctx *ctx = cb_arg;
3090
3091 spdk_free(ctx->mask);
3092 ctx->mask = NULL;
3093
3094 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
3095 }
3096
3097 static void
3098 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3099 {
3100 struct spdk_bs_load_ctx *ctx = cb_arg;
3101
3102 spdk_free(ctx->mask);
3103 ctx->mask = NULL;
3104
3105 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
3106 }
3107
3108 static void
3109 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3110 {
3111 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
3112 }
3113
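/*
 * Replay loop driver. Pages are visited in index order; a page whose
 * sequence_num is 0 is the first page of a blob, so its index also marks a
 * used blobid. Pages chained via page->next are followed immediately (with
 * in_page_chain set) before the scan resumes at the next unvisited index.
 */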
3114 static void
3115 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3116 {
3117 struct spdk_bs_load_ctx *ctx = cb_arg;
3118 uint64_t num_md_clusters;
3119 uint64_t i;
3120 uint32_t page_num;
3121
3122 if (bserrno != 0) {
3123 _spdk_bs_load_ctx_fail(seq, ctx, bserrno);
3124 return;
3125 }
3126
3127 page_num = ctx->cur_page;
3128 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
3129 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
3130 spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
3131 if (ctx->page->sequence_num == 0) {
3132 spdk_bit_array_set(ctx->bs->used_blobids, page_num);
3133 }
3134 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
3135 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3136 return;
3137 }
3138 if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
3139 ctx->in_page_chain = true;
3140 ctx->cur_page = ctx->page->next;
3141 _spdk_bs_load_replay_cur_md_page(seq, cb_arg);
3142 return;
3143 }
3144 }
3145 }
3146
3147 ctx->in_page_chain = false;
3148
3149 do {
3150 ctx->page_index++;
3151 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
3152
3153 if (ctx->page_index < ctx->super->md_len) {
3154 ctx->cur_page = ctx->page_index;
3155 _spdk_bs_load_replay_cur_md_page(seq, cb_arg);
3156 } else {
3157 /* Claim all of the clusters used by the metadata */
3158 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
3159 for (i = 0; i < num_md_clusters; i++) {
3160 _spdk_bs_claim_cluster(ctx->bs, i);
3161 }
3162 spdk_free(ctx->page);
3163 _spdk_bs_load_write_used_md(seq, ctx, bserrno);
3164 }
3165 }
3166
3167 static void
3168 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
3169 {
3170 struct spdk_bs_load_ctx *ctx = cb_arg;
3171 uint64_t lba;
3172
3173 assert(ctx->cur_page < ctx->super->md_len);
3174 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
3175 spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3176 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3177 _spdk_bs_load_replay_md_cpl, ctx);
3178 }
3179
3180 static void
3181 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
3182 {
3183 struct spdk_bs_load_ctx *ctx = cb_arg;
3184
3185 ctx->page_index = 0;
3186 ctx->cur_page = 0;
3187 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
3188 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3189 if (!ctx->page) {
3190 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3191 return;
3192 }
3193 _spdk_bs_load_replay_cur_md_page(seq, cb_arg);
3194 }
3195
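/*
 * Entry point for recovery after an unclean shutdown: size the three bitmaps
 * from the super block, assume every cluster is free, and rebuild the actual
 * state by replaying all metadata pages.
 */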
3196 static void
3197 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
3198 {
3199 struct spdk_bs_load_ctx *ctx = cb_arg;
3200 int rc;
3201
3202 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
3203 if (rc < 0) {
3204 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3205 return;
3206 }
3207
3208 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
3209 if (rc < 0) {
3210 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3211 return;
3212 }
3213
3214 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3215 if (rc < 0) {
3216 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3217 return;
3218 }
3219
3220 ctx->bs->num_free_clusters = ctx->bs->total_clusters;
3221 _spdk_bs_load_replay_md(seq, cb_arg);
3222 }
3223
3224 static void
3225 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3226 {
3227 struct spdk_bs_load_ctx *ctx = cb_arg;
3228 uint32_t crc;
3229 int rc;
3230 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
3231
3232 if (ctx->super->version > SPDK_BS_VERSION ||
3233 ctx->super->version < SPDK_BS_INITIAL_VERSION) {
3234 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3235 return;
3236 }
3237
3238 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3239 sizeof(ctx->super->signature)) != 0) {
3240 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3241 return;
3242 }
3243
3244 crc = _spdk_blob_md_page_calc_crc(ctx->super);
3245 if (crc != ctx->super->crc) {
3246 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3247 return;
3248 }
3249
3250 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3251 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
3252 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
3254 } else {
3255 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
3256 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3257 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3258 _spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
3259 return;
3260 }
3261
3262 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) {
		SPDK_NOTICELOG("Size mismatch, dev size: %" PRIu64 ", blobstore size: %" PRIu64 "\n",
			       ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size);
3265 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3266 return;
3267 }
3268
3269 if (ctx->super->size == 0) {
3270 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen;
3271 }
3272
3273 if (ctx->super->io_unit_size == 0) {
3274 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE;
3275 }
3276
3277 /* Parse the super block */
3278 ctx->bs->clean = 1;
3279 ctx->bs->cluster_sz = ctx->super->cluster_size;
3280 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size;
3281 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
3282 ctx->bs->io_unit_size = ctx->super->io_unit_size;
3283 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3284 if (rc < 0) {
3285 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3286 return;
3287 }
3288 ctx->bs->md_start = ctx->super->md_start;
3289 ctx->bs->md_len = ctx->super->md_len;
3290 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up(
3291 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
3292 ctx->bs->super_blob = ctx->super->super_blob;
3293 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
3294
3295 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) {
3296 _spdk_bs_recover(seq, ctx);
3297 } else {
3298 _spdk_bs_load_read_used_pages(seq, ctx);
3299 }
3300 }
3301
3302 void
3303 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
3304 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
3305 {
3306 struct spdk_blob_store *bs;
3307 struct spdk_bs_cpl cpl;
3308 spdk_bs_sequence_t *seq;
3309 struct spdk_bs_load_ctx *ctx;
3310 struct spdk_bs_opts opts = {};
3311 int err;
3312
3313 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
3314
3315 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
3316 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen);
3317 dev->destroy(dev);
3318 cb_fn(cb_arg, NULL, -EINVAL);
3319 return;
3320 }
3321
3322 if (o) {
3323 opts = *o;
3324 } else {
3325 spdk_bs_opts_init(&opts);
3326 }
3327
3328 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
3329 dev->destroy(dev);
3330 cb_fn(cb_arg, NULL, -EINVAL);
3331 return;
3332 }
3333
3334 err = _spdk_bs_alloc(dev, &opts, &bs);
3335 if (err) {
3336 dev->destroy(dev);
3337 cb_fn(cb_arg, NULL, err);
3338 return;
3339 }
3340
3341 ctx = calloc(1, sizeof(*ctx));
3342 if (!ctx) {
3343 _spdk_bs_free(bs);
3344 cb_fn(cb_arg, NULL, -ENOMEM);
3345 return;
3346 }
3347
3348 ctx->bs = bs;
3349 ctx->iter_cb_fn = opts.iter_cb_fn;
3350 ctx->iter_cb_arg = opts.iter_cb_arg;
3351
3352 /* Allocate memory for the super block */
3353 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
3354 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3355 if (!ctx->super) {
3356 free(ctx);
3357 _spdk_bs_free(bs);
3358 cb_fn(cb_arg, NULL, -ENOMEM);
3359 return;
3360 }
3361
3362 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3363 cpl.u.bs_handle.cb_fn = cb_fn;
3364 cpl.u.bs_handle.cb_arg = cb_arg;
3365 cpl.u.bs_handle.bs = bs;
3366
3367 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3368 if (!seq) {
3369 spdk_free(ctx->super);
3370 free(ctx);
3371 _spdk_bs_free(bs);
3372 cb_fn(cb_arg, NULL, -ENOMEM);
3373 return;
3374 }
3375
3376 /* Read the super block */
3377 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3378 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3379 _spdk_bs_load_super_cpl, ctx);
3380 }
3381
3382 /* END spdk_bs_load */
3383
3384 /* START spdk_bs_dump */
3385
3386 struct spdk_bs_dump_ctx {
3387 struct spdk_blob_store *bs;
3388 struct spdk_bs_super_block *super;
3389 uint32_t cur_page;
3390 struct spdk_blob_md_page *page;
3391 spdk_bs_sequence_t *seq;
3392 FILE *fp;
3393 spdk_bs_dump_print_xattr print_xattr_fn;
3394 char xattr_name[4096];
3395 };
3396
3397 static void
3398 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno)
3399 {
3400 spdk_free(ctx->super);
3401
3402 /*
3403 * We need to defer calling spdk_bs_call_cpl() until after
3404 * dev destruction, so tuck these away for later use.
3405 */
3406 ctx->bs->unload_err = bserrno;
3407 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3408 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3409
3410 spdk_bs_sequence_finish(seq, 0);
3411 _spdk_bs_free(ctx->bs);
3412 free(ctx);
3413 }
3414
3415 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
3416
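/*
 * Pretty-print one metadata page for spdk_bs_dump(): the page header and CRC
 * status, followed by a human-readable rendering of each descriptor
 * (extents, and xattrs with a hex dump of the value).
 */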
3417 static void
3418 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx)
3419 {
3420 uint32_t page_idx = ctx->cur_page;
3421 struct spdk_blob_md_page *page = ctx->page;
3422 struct spdk_blob_md_descriptor *desc;
3423 size_t cur_desc = 0;
3424 uint32_t crc;
3425
3426 fprintf(ctx->fp, "=========\n");
3427 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx);
3428 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id);
3429
3430 crc = _spdk_blob_md_page_calc_crc(page);
3431 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? "OK" : "Mismatch");
3432
3433 desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3434 while (cur_desc < sizeof(page->descriptors)) {
3435 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3436 if (desc->length == 0) {
3437 /* If padding and length are 0, this terminates the page */
3438 break;
3439 }
3440 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
3441 struct spdk_blob_md_descriptor_extent *desc_extent;
3442 unsigned int i;
3443
3444 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
3445
3446 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
3447 if (desc_extent->extents[i].cluster_idx != 0) {
3448 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32,
3449 desc_extent->extents[i].cluster_idx);
3450 } else {
3451 fprintf(ctx->fp, "Unallocated Extent - ");
3452 }
3453 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent->extents[i].length);
3454 fprintf(ctx->fp, "\n");
3455 }
3456 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3457 struct spdk_blob_md_descriptor_xattr *desc_xattr;
3458 uint32_t i;
3459
3460 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
3461
			if (desc_xattr->length !=
			    sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) +
			    desc_xattr->name_length + desc_xattr->value_length) {
				/* Inconsistent xattr descriptor - note it in the dump */
				fprintf(ctx->fp, "XATTR: invalid descriptor length\n");
			}
3466
3467 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length);
3468 ctx->xattr_name[desc_xattr->name_length] = '\0';
3469 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name);
3470 fprintf(ctx->fp, " value = \"");
3471 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name,
3472 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
3473 desc_xattr->value_length);
3474 fprintf(ctx->fp, "\"\n");
3475 for (i = 0; i < desc_xattr->value_length; i++) {
3476 if (i % 16 == 0) {
3477 fprintf(ctx->fp, " ");
3478 }
3479 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i));
3480 if ((i + 1) % 16 == 0) {
3481 fprintf(ctx->fp, "\n");
3482 }
3483 }
3484 if (i % 16 != 0) {
3485 fprintf(ctx->fp, "\n");
3486 }
3487 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3488 /* TODO */
3489 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3490 /* TODO */
3491 } else {
3492 /* Error */
3493 }
3494 /* Advance to the next descriptor */
3495 cur_desc += sizeof(*desc) + desc->length;
3496 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3497 break;
3498 }
3499 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3500 }
3501 }
3502
3503 static void
3504 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3505 {
3506 struct spdk_bs_dump_ctx *ctx = cb_arg;
3507
3508 if (bserrno != 0) {
3509 _spdk_bs_dump_finish(seq, ctx, bserrno);
3510 return;
3511 }
3512
3513 if (ctx->page->id != 0) {
3514 _spdk_bs_dump_print_md_page(ctx);
3515 }
3516
3517 ctx->cur_page++;
3518
3519 if (ctx->cur_page < ctx->super->md_len) {
3520 _spdk_bs_dump_read_md_page(seq, cb_arg);
3521 } else {
3522 spdk_free(ctx->page);
3523 _spdk_bs_dump_finish(seq, ctx, 0);
3524 }
3525 }
3526
3527 static void
3528 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
3529 {
3530 struct spdk_bs_dump_ctx *ctx = cb_arg;
3531 uint64_t lba;
3532
3533 assert(ctx->cur_page < ctx->super->md_len);
3534 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
3535 spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3536 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3537 _spdk_bs_dump_read_md_page_cpl, ctx);
3538 }
3539
3540 static void
3541 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3542 {
3543 struct spdk_bs_dump_ctx *ctx = cb_arg;
3544
3545 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature);
3546 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3547 sizeof(ctx->super->signature)) != 0) {
3548 fprintf(ctx->fp, "(Mismatch)\n");
3549 _spdk_bs_dump_finish(seq, ctx, bserrno);
3550 return;
3551 } else {
3552 fprintf(ctx->fp, "(OK)\n");
3553 }
3554 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version);
3555 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc,
3556 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch");
3557 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype);
3558 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size);
3559 fprintf(ctx->fp, "Super Blob ID: ");
3560 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) {
3561 fprintf(ctx->fp, "(None)\n");
3562 } else {
3563 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob);
3564 }
3565 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean);
3566 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start);
3567 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len);
3568 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start);
3569 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len);
3570 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start);
3571 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len);
3572 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start);
3573 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len);
3574
3575 ctx->cur_page = 0;
3576 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
3577 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3578 if (!ctx->page) {
3579 _spdk_bs_dump_finish(seq, ctx, -ENOMEM);
3580 return;
3581 }
3582 _spdk_bs_dump_read_md_page(seq, cb_arg);
3583 }
3584
3585 void
3586 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn,
3587 spdk_bs_op_complete cb_fn, void *cb_arg)
3588 {
3589 struct spdk_blob_store *bs;
3590 struct spdk_bs_cpl cpl;
3591 spdk_bs_sequence_t *seq;
3592 struct spdk_bs_dump_ctx *ctx;
3593 struct spdk_bs_opts opts = {};
3594 int err;
3595
3596 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev);
3597
3598 spdk_bs_opts_init(&opts);
3599
3600 err = _spdk_bs_alloc(dev, &opts, &bs);
3601 if (err) {
3602 dev->destroy(dev);
3603 cb_fn(cb_arg, err);
3604 return;
3605 }
3606
3607 ctx = calloc(1, sizeof(*ctx));
3608 if (!ctx) {
3609 _spdk_bs_free(bs);
3610 cb_fn(cb_arg, -ENOMEM);
3611 return;
3612 }
3613
3614 ctx->bs = bs;
3615 ctx->fp = fp;
3616 ctx->print_xattr_fn = print_xattr_fn;
3617
3618 /* Allocate memory for the super block */
3619 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
3620 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3621 if (!ctx->super) {
3622 free(ctx);
3623 _spdk_bs_free(bs);
3624 cb_fn(cb_arg, -ENOMEM);
3625 return;
3626 }
3627
3628 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3629 cpl.u.bs_basic.cb_fn = cb_fn;
3630 cpl.u.bs_basic.cb_arg = cb_arg;
3631
3632 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3633 if (!seq) {
3634 spdk_free(ctx->super);
3635 free(ctx);
3636 _spdk_bs_free(bs);
3637 cb_fn(cb_arg, -ENOMEM);
3638 return;
3639 }
3640
3641 /* Read the super block */
3642 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3643 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3644 _spdk_bs_dump_super_cpl, ctx);
3645 }
3646
3647 /* END spdk_bs_dump */
3648
3649 /* START spdk_bs_init */
3650
3651 struct spdk_bs_init_ctx {
3652 struct spdk_blob_store *bs;
3653 struct spdk_bs_super_block *super;
3654 };
3655
3656 static void
3657 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3658 {
3659 struct spdk_bs_init_ctx *ctx = cb_arg;
3660
3661 spdk_free(ctx->super);
3662 free(ctx);
3663
3664 spdk_bs_sequence_finish(seq, bserrno);
3665 }
3666
3667 static void
3668 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3669 {
3670 struct spdk_bs_init_ctx *ctx = cb_arg;
3671
3672 /* Write super block */
3673 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
3674 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
3675 _spdk_bs_init_persist_super_cpl, ctx);
3676 }
3677
3678 void
3679 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
3680 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
3681 {
3682 struct spdk_bs_init_ctx *ctx;
3683 struct spdk_blob_store *bs;
3684 struct spdk_bs_cpl cpl;
3685 spdk_bs_sequence_t *seq;
3686 spdk_bs_batch_t *batch;
3687 uint64_t num_md_lba;
3688 uint64_t num_md_pages;
3689 uint64_t num_md_clusters;
3690 uint32_t i;
3691 struct spdk_bs_opts opts = {};
3692 int rc;
3693
3694 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
3695
3696 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
3697 SPDK_ERRLOG("unsupported dev block length of %d\n",
3698 dev->blocklen);
3699 dev->destroy(dev);
3700 cb_fn(cb_arg, NULL, -EINVAL);
3701 return;
3702 }
3703
3704 if (o) {
3705 opts = *o;
3706 } else {
3707 spdk_bs_opts_init(&opts);
3708 }
3709
3710 if (_spdk_bs_opts_verify(&opts) != 0) {
3711 dev->destroy(dev);
3712 cb_fn(cb_arg, NULL, -EINVAL);
3713 return;
3714 }
3715
3716 rc = _spdk_bs_alloc(dev, &opts, &bs);
3717 if (rc) {
3718 dev->destroy(dev);
3719 cb_fn(cb_arg, NULL, rc);
3720 return;
3721 }
3722
3723 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
3724 /* By default, allocate 1 page per cluster.
3725 * Technically this over-allocates metadata,
3726 * because reserving metadata pages reduces the
3727 * number of usable data clusters. This could be
3728 * addressed with more complex math in the future.
3729 */
3730 bs->md_len = bs->total_clusters;
3731 } else {
3732 bs->md_len = opts.num_md_pages;
3733 }
3734 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
3735 if (rc < 0) {
3736 _spdk_bs_free(bs);
3737 cb_fn(cb_arg, NULL, -ENOMEM);
3738 return;
3739 }
3740
3741 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
3742 if (rc < 0) {
3743 _spdk_bs_free(bs);
3744 cb_fn(cb_arg, NULL, -ENOMEM);
3745 return;
3746 }
3747
3748 ctx = calloc(1, sizeof(*ctx));
3749 if (!ctx) {
3750 _spdk_bs_free(bs);
3751 cb_fn(cb_arg, NULL, -ENOMEM);
3752 return;
3753 }
3754
3755 ctx->bs = bs;
3756
3757 /* Allocate memory for the super block */
3758 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
3759 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3760 if (!ctx->super) {
3761 free(ctx);
3762 _spdk_bs_free(bs);
3763 cb_fn(cb_arg, NULL, -ENOMEM);
3764 return;
3765 }
3766 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3767 sizeof(ctx->super->signature));
3768 ctx->super->version = SPDK_BS_VERSION;
3769 ctx->super->length = sizeof(*ctx->super);
3770 ctx->super->super_blob = bs->super_blob;
3771 ctx->super->clean = 0;
3772 ctx->super->cluster_size = bs->cluster_sz;
3773 ctx->super->io_unit_size = bs->io_unit_size;
3774 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
3775
3776 /* Calculate how many pages the metadata consumes at the front
3777 * of the disk.
3778 */
3779
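/* The resulting layout at the front of the disk, in pages, is:
 *
 *   page 0                   - super block
 *   used_page_mask_start     - bitmask of used metadata pages
 *   used_cluster_mask_start  - bitmask of used clusters
 *   used_blobid_mask_start   - bitmask of used blob ids
 *   md_start                 - blob metadata pages (md_len of them)
 */
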
3780 /* The super block uses 1 page */
3781 num_md_pages = 1;
3782
3783 /* The used_md_pages mask requires 1 bit per metadata page, rounded
3784 * up to the nearest page, plus a header.
3785 */
3786 ctx->super->used_page_mask_start = num_md_pages;
3787 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
3788 spdk_divide_round_up(bs->md_len, 8),
3789 SPDK_BS_PAGE_SIZE);
3790 num_md_pages += ctx->super->used_page_mask_len;
3791
3792 /* The used_clusters mask requires 1 bit per cluster, rounded
3793 * up to the nearest page, plus a header.
3794 */
3795 ctx->super->used_cluster_mask_start = num_md_pages;
3796 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
3797 spdk_divide_round_up(bs->total_clusters, 8),
3798 SPDK_BS_PAGE_SIZE);
3799 num_md_pages += ctx->super->used_cluster_mask_len;
3800
3801 /* The used_blobids mask requires 1 bit per metadata page, rounded
3802 * up to the nearest page, plus a header.
3803 */
3804 ctx->super->used_blobid_mask_start = num_md_pages;
3805 ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
3806 spdk_divide_round_up(bs->md_len, 8),
3807 SPDK_BS_PAGE_SIZE);
3808 num_md_pages += ctx->super->used_blobid_mask_len;
3809
3810 /* The metadata region size was chosen above */
3811 ctx->super->md_start = bs->md_start = num_md_pages;
3812 ctx->super->md_len = bs->md_len;
3813 num_md_pages += bs->md_len;
3814
3815 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
3816
3817 ctx->super->size = dev->blockcnt * dev->blocklen;
3818
3819 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
3820
3821 num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster);
3822 if (num_md_clusters > bs->total_clusters) {
3823 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available; "
3824 "please decrease the number of pages reserved for metadata "
3825 "or increase the cluster size.\n");
3826 spdk_free(ctx->super);
3827 free(ctx);
3828 _spdk_bs_free(bs);
3829 cb_fn(cb_arg, NULL, -ENOMEM);
3830 return;
3831 }
3832 /* Claim all of the clusters used by the metadata */
3833 for (i = 0; i < num_md_clusters; i++) {
3834 _spdk_bs_claim_cluster(bs, i);
3835 }
3836
3837 bs->total_data_clusters = bs->num_free_clusters;
3838
3839 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3840 cpl.u.bs_handle.cb_fn = cb_fn;
3841 cpl.u.bs_handle.cb_arg = cb_arg;
3842 cpl.u.bs_handle.bs = bs;
3843
3844 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3845 if (!seq) {
3846 spdk_free(ctx->super);
3847 free(ctx);
3848 _spdk_bs_free(bs);
3849 cb_fn(cb_arg, NULL, -ENOMEM);
3850 return;
3851 }
3852
3853 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
3854
3855 /* Clear metadata space */
3856 spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
3857
3858 if (opts.clear_method == BS_CLEAR_WITH_UNMAP) {
3859 /* Trim data clusters */
3860 spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
3861 } else if (opts.clear_method == BS_CLEAR_WITH_WRITE_ZEROES) {
3862 /* Write_zeroes to data clusters */
3863 spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
3864 }
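
/* If clear_method is neither of the above (e.g. BS_CLEAR_WITH_NONE),
 * the data clusters are left untouched; only the metadata region is
 * zeroed.
 */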
3865
3866 spdk_bs_batch_close(batch);
3867 }
3868
3869 /* END spdk_bs_init */
3870
3871 /* START spdk_bs_destroy */
3872
3873 static void
3874 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3875 {
3876 struct spdk_bs_init_ctx *ctx = cb_arg;
3877 struct spdk_blob_store *bs = ctx->bs;
3878
3879 /*
3880 * We need to defer calling spdk_bs_call_cpl() until after
3881 * dev destruction, so tuck these away for later use.
3882 */
3883 bs->unload_err = bserrno;
3884 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3885 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3886
3887 spdk_bs_sequence_finish(seq, bserrno);
3888
3889 _spdk_bs_free(bs);
3890 free(ctx);
3891 }
3892
3893 void
3894 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
3895 void *cb_arg)
3896 {
3897 struct spdk_bs_cpl cpl;
3898 spdk_bs_sequence_t *seq;
3899 struct spdk_bs_init_ctx *ctx;
3900
3901 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
3902
3903 if (!TAILQ_EMPTY(&bs->blobs)) {
3904 SPDK_ERRLOG("Blobstore still has open blobs\n");
3905 cb_fn(cb_arg, -EBUSY);
3906 return;
3907 }
3908
3909 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3910 cpl.u.bs_basic.cb_fn = cb_fn;
3911 cpl.u.bs_basic.cb_arg = cb_arg;
3912
3913 ctx = calloc(1, sizeof(*ctx));
3914 if (!ctx) {
3915 cb_fn(cb_arg, -ENOMEM);
3916 return;
3917 }
3918
3919 ctx->bs = bs;
3920
3921 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3922 if (!seq) {
3923 free(ctx);
3924 cb_fn(cb_arg, -ENOMEM);
3925 return;
3926 }
3927
3928 /* Write zeroes to the super block */
3929 spdk_bs_sequence_write_zeroes_dev(seq,
3930 _spdk_bs_page_to_lba(bs, 0),
3931 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
3932 _spdk_bs_destroy_trim_cpl, ctx);
3933 }
3934
3935 /* END spdk_bs_destroy */
3936
3937 /* START spdk_bs_unload */
3938
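/* Unload flushes the in-memory state back to disk in a fixed order:
 * read the super block, write the used_md_pages mask, the used_blobids
 * mask, and the used_clusters mask, then rewrite the super block with
 * clean set to 1. The completion callbacks below chain in that order,
 * from the bottom of the file upward.
 */
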
3939 static void
3940 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3941 {
3942 struct spdk_bs_load_ctx *ctx = cb_arg;
3943
3944 spdk_free(ctx->super);
3945
3946 /*
3947 * We need to defer calling spdk_bs_call_cpl() until after
3948 * dev destruction, so tuck these away for later use.
3949 */
3950 ctx->bs->unload_err = bserrno;
3951 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3952 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3953
3954 spdk_bs_sequence_finish(seq, bserrno);
3955
3956 _spdk_bs_free(ctx->bs);
3957 free(ctx);
3958 }
3959
3960 static void
3961 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3962 {
3963 struct spdk_bs_load_ctx *ctx = cb_arg;
3964
3965 spdk_free(ctx->mask);
3966 ctx->super->clean = 1;
3967
3968 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
3969 }
3970
3971 static void
3972 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3973 {
3974 struct spdk_bs_load_ctx *ctx = cb_arg;
3975
3976 spdk_free(ctx->mask);
3977 ctx->mask = NULL;
3978
3979 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
3980 }
3981
3982 static void
3983 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3984 {
3985 struct spdk_bs_load_ctx *ctx = cb_arg;
3986
3987 spdk_free(ctx->mask);
3988 ctx->mask = NULL;
3989
3990 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
3991 }
3992
3993 static void
3994 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3995 {
3996 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
3997 }
3998
3999 void
4000 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
4001 {
4002 struct spdk_bs_cpl cpl;
4003 spdk_bs_sequence_t *seq;
4004 struct spdk_bs_load_ctx *ctx;
4005
4006 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
4007
4008 if (!TAILQ_EMPTY(&bs->blobs)) {
4009 SPDK_ERRLOG("Blobstore still has open blobs\n");
4010 cb_fn(cb_arg, -EBUSY);
4011 return;
4012 }
4013
4014 ctx = calloc(1, sizeof(*ctx));
4015 if (!ctx) {
4016 cb_fn(cb_arg, -ENOMEM);
4017 return;
4018 }
4019
4020 ctx->bs = bs;
4021
4022 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4023 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4024 if (!ctx->super) {
4025 free(ctx);
4026 cb_fn(cb_arg, -ENOMEM);
4027 return;
4028 }
4029
4030 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4031 cpl.u.bs_basic.cb_fn = cb_fn;
4032 cpl.u.bs_basic.cb_arg = cb_arg;
4033
4034 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4035 if (!seq) {
4036 spdk_free(ctx->super);
4037 free(ctx);
4038 cb_fn(cb_arg, -ENOMEM);
4039 return;
4040 }
4041
4042 /* Read super block */
4043 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4044 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4045 _spdk_bs_unload_read_super_cpl, ctx);
4046 }
4047
4048 /* END spdk_bs_unload */
4049
4050 /* START spdk_bs_set_super */
4051
4052 struct spdk_bs_set_super_ctx {
4053 struct spdk_blob_store *bs;
4054 struct spdk_bs_super_block *super;
4055 };
4056
4057 static void
4058 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4059 {
4060 struct spdk_bs_set_super_ctx *ctx = cb_arg;
4061
4062 if (bserrno != 0) {
4063 SPDK_ERRLOG("Unable to write to super block of blobstore\n");
4064 }
4065
4066 spdk_free(ctx->super);
4067
4068 spdk_bs_sequence_finish(seq, bserrno);
4069
4070 free(ctx);
4071 }
4072
4073 static void
4074 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4075 {
4076 struct spdk_bs_set_super_ctx *ctx = cb_arg;
4077
4078 if (bserrno != 0) {
4079 SPDK_ERRLOG("Unable to read super block of blobstore\n");
4080 spdk_free(ctx->super);
4081 spdk_bs_sequence_finish(seq, bserrno);
4082 free(ctx);
4083 return;
4084 }
4085
4086 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
4087 }
4088
4089 void
4090 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
4091 spdk_bs_op_complete cb_fn, void *cb_arg)
4092 {
4093 struct spdk_bs_cpl cpl;
4094 spdk_bs_sequence_t *seq;
4095 struct spdk_bs_set_super_ctx *ctx;
4096
4097 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
4098
4099 ctx = calloc(1, sizeof(*ctx));
4100 if (!ctx) {
4101 cb_fn(cb_arg, -ENOMEM);
4102 return;
4103 }
4104
4105 ctx->bs = bs;
4106
4107 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4108 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4109 if (!ctx->super) {
4110 free(ctx);
4111 cb_fn(cb_arg, -ENOMEM);
4112 return;
4113 }
4114
4115 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4116 cpl.u.bs_basic.cb_fn = cb_fn;
4117 cpl.u.bs_basic.cb_arg = cb_arg;
4118
4119 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4120 if (!seq) {
4121 spdk_free(ctx->super);
4122 free(ctx);
4123 cb_fn(cb_arg, -ENOMEM);
4124 return;
4125 }
4126
4127 bs->super_blob = blobid;
4128
4129 /* Read super block */
4130 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4131 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4132 _spdk_bs_set_super_read_cpl, ctx);
4133 }
4134
4135 /* END spdk_bs_set_super */
4136
4137 void
4138 spdk_bs_get_super(struct spdk_blob_store *bs,
4139 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4140 {
4141 if (bs->super_blob == SPDK_BLOBID_INVALID) {
4142 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
4143 } else {
4144 cb_fn(cb_arg, bs->super_blob, 0);
4145 }
4146 }
4147
4148 uint64_t
4149 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
4150 {
4151 return bs->cluster_sz;
4152 }
4153
4154 uint64_t
4155 spdk_bs_get_page_size(struct spdk_blob_store *bs)
4156 {
4157 return SPDK_BS_PAGE_SIZE;
4158 }
4159
4160 uint64_t
4161 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs)
4162 {
4163 return bs->io_unit_size;
4164 }
4165
4166 uint64_t
4167 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
4168 {
4169 return bs->num_free_clusters;
4170 }
4171
4172 uint64_t
4173 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
4174 {
4175 return bs->total_data_clusters;
4176 }
4177
4178 static int
4179 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
4180 {
4181 bs->md_channel = spdk_get_io_channel(bs);
4182 if (!bs->md_channel) {
4183 SPDK_ERRLOG("Failed to get IO channel.\n");
4184 return -1;
4185 }
4186
4187 return 0;
4188 }
4189
4190 static int
4191 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
4192 {
4193 spdk_put_io_channel(bs->md_channel);
4194
4195 return 0;
4196 }
4197
4198 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
4199 {
4200 assert(blob != NULL);
4201
4202 return blob->id;
4203 }
4204
4205 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
4206 {
4207 assert(blob != NULL);
4208
4209 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
4210 }
4211
4212 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob)
4213 {
4214 assert(blob != NULL);
4215
4216 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs);
4217 }
4218
4219 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
4220 {
4221 assert(blob != NULL);
4222
4223 return blob->active.num_clusters;
4224 }
4225
4226 /* START spdk_bs_create_blob */
4227
4228 static void
4229 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4230 {
4231 struct spdk_blob *blob = cb_arg;
4232
4233 _spdk_blob_free(blob);
4234
4235 spdk_bs_sequence_finish(seq, bserrno);
4236 }
4237
4238 static int
4239 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
4240 bool internal)
4241 {
4242 uint64_t i;
4243 size_t value_len = 0;
4244 int rc;
4245 const void *value = NULL;
4246 if (xattrs->count > 0 && xattrs->get_value == NULL) {
4247 return -EINVAL;
4248 }
4249 for (i = 0; i < xattrs->count; i++) {
4250 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
4251 if (value == NULL || value_len == 0) {
4252 return -EINVAL;
4253 }
4254 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
4255 if (rc < 0) {
4256 return rc;
4257 }
4258 }
4259 return 0;
4260 }
4261
4262 static void
4263 _spdk_bs_create_blob(struct spdk_blob_store *bs,
4264 const struct spdk_blob_opts *opts,
4265 const struct spdk_blob_xattr_opts *internal_xattrs,
4266 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4267 {
4268 struct spdk_blob *blob;
4269 uint32_t page_idx;
4270 struct spdk_bs_cpl cpl;
4271 struct spdk_blob_opts opts_default;
4272 struct spdk_blob_xattr_opts internal_xattrs_default;
4273 spdk_bs_sequence_t *seq;
4274 spdk_blob_id id;
4275 int rc;
4276
4277 assert(spdk_get_thread() == bs->md_thread);
4278
4279 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
4280 if (page_idx == UINT32_MAX) {
4281 cb_fn(cb_arg, 0, -ENOMEM);
4282 return;
4283 }
4284 spdk_bit_array_set(bs->used_blobids, page_idx);
4285 spdk_bit_array_set(bs->used_md_pages, page_idx);
4286
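/* The blob id encodes the index of the blob's first metadata page
 * (see _spdk_bs_page_to_blobid), so claiming the page above also
 * reserves the id.
 */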
4287 id = _spdk_bs_page_to_blobid(page_idx);
4288
4289 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
4290
4291 blob = _spdk_blob_alloc(bs, id);
4292 if (!blob) {
4293 cb_fn(cb_arg, 0, -ENOMEM);
4294 return;
4295 }
4296
4297 if (!opts) {
4298 spdk_blob_opts_init(&opts_default);
4299 opts = &opts_default;
4300 }
4301 if (!internal_xattrs) {
4302 _spdk_blob_xattrs_init(&internal_xattrs_default);
4303 internal_xattrs = &internal_xattrs_default;
4304 }
4305
4306 rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
4307 if (rc < 0) {
4308 _spdk_blob_free(blob);
4309 cb_fn(cb_arg, 0, rc);
4310 return;
4311 }
4312
4313 rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
4314 if (rc < 0) {
4315 _spdk_blob_free(blob);
4316 cb_fn(cb_arg, 0, rc);
4317 return;
4318 }
4319
4320 if (opts->thin_provision) {
4321 _spdk_blob_set_thin_provision(blob);
4322 }
4323
4324 rc = _spdk_blob_resize(blob, opts->num_clusters);
4325 if (rc < 0) {
4326 _spdk_blob_free(blob);
4327 cb_fn(cb_arg, 0, rc);
4328 return;
4329 }
4330 cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4331 cpl.u.blobid.cb_fn = cb_fn;
4332 cpl.u.blobid.cb_arg = cb_arg;
4333 cpl.u.blobid.blobid = blob->id;
4334
4335 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4336 if (!seq) {
4337 _spdk_blob_free(blob);
4338 cb_fn(cb_arg, 0, -ENOMEM);
4339 return;
4340 }
4341
4342 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
4343 }
4344
4345 void spdk_bs_create_blob(struct spdk_blob_store *bs,
4346 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4347 {
4348 _spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
4349 }
4350
4351 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
4352 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4353 {
4354 _spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
4355 }
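
/* Illustrative usage from the metadata thread (create_done, open_done,
 * bs and my_ctx are placeholders, not part of this file):
 *
 *   static void
 *   create_done(void *cb_arg, spdk_blob_id blobid, int bserrno)
 *   {
 *           if (bserrno == 0) {
 *                   spdk_bs_open_blob(bs, blobid, open_done, cb_arg);
 *           }
 *   }
 *
 *   struct spdk_blob_opts opts;
 *
 *   spdk_blob_opts_init(&opts);
 *   opts.num_clusters = 10;
 *   spdk_bs_create_blob_ext(bs, &opts, create_done, my_ctx);
 */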
4356
4357 /* END spdk_bs_create_blob */
4358
4359 /* START blob_cleanup */
4360
4361 struct spdk_clone_snapshot_ctx {
4362 struct spdk_bs_cpl cpl;
4363 int bserrno;
4364 bool frozen;
4365
4366 struct spdk_io_channel *channel;
4367
4368 /* Current cluster for inflate operation */
4369 uint64_t cluster;
4370
4371 /* For inflation, force allocation of all unallocated clusters and remove
4372 * thin-provisioning. Otherwise, only decouple the parent and keep the clone thin. */
4373 bool allocate_all;
4374
4375 struct {
4376 spdk_blob_id id;
4377 struct spdk_blob *blob;
4378 } original;
4379 struct {
4380 spdk_blob_id id;
4381 struct spdk_blob *blob;
4382 } new;
4383
4384 /* xattrs specified for snapshot/clones only. They have no impact on
4385 * the original blob's xattrs. */
4386 const struct spdk_blob_xattr_opts *xattrs;
4387 };
4388
4389 static void
4390 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
4391 {
4392 struct spdk_clone_snapshot_ctx *ctx = cb_arg;
4393 struct spdk_bs_cpl *cpl = &ctx->cpl;
4394
4395 if (bserrno != 0) {
4396 if (ctx->bserrno != 0) {
4397 SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4398 } else {
4399 ctx->bserrno = bserrno;
4400 }
4401 }
4402
4403 switch (cpl->type) {
4404 case SPDK_BS_CPL_TYPE_BLOBID:
4405 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno);
4406 break;
4407 case SPDK_BS_CPL_TYPE_BLOB_BASIC:
4408 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno);
4409 break;
4410 default:
4411 SPDK_UNREACHABLE();
4412 break;
4413 }
4414
4415 free(ctx);
4416 }
4417
4418 static void
4419 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
4420 {
4421 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4422 struct spdk_blob *origblob = ctx->original.blob;
4423
4424 if (bserrno != 0) {
4425 if (ctx->bserrno != 0) {
4426 SPDK_ERRLOG("Unfreeze error %d\n", bserrno);
4427 } else {
4428 ctx->bserrno = bserrno;
4429 }
4430 }
4431
4432 ctx->original.id = origblob->id;
4433 origblob->locked_operation_in_progress = false;
4434
4435 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4436 }
4437
4438 static void
4439 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
4440 {
4441 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4442 struct spdk_blob *origblob = ctx->original.blob;
4443
4444 if (bserrno != 0) {
4445 if (ctx->bserrno != 0) {
4446 SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4447 } else {
4448 ctx->bserrno = bserrno;
4449 }
4450 }
4451
4452 if (ctx->frozen) {
4453 /* Unfreeze any outstanding I/O */
4454 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx);
4455 } else {
4456 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0);
4457 }
4458
4459 }
4460
4461 static void
4462 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno)
4463 {
4464 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4465 struct spdk_blob *newblob = ctx->new.blob;
4466
4467 if (bserrno != 0) {
4468 if (ctx->bserrno != 0) {
4469 SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4470 } else {
4471 ctx->bserrno = bserrno;
4472 }
4473 }
4474
4475 ctx->new.id = newblob->id;
4476 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4477 }
4478
4479 /* END blob_cleanup */
4480
4481 /* START spdk_bs_create_snapshot */
4482
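/* Snapshot creation, in outline:
 *
 *  1. Open the original blob; create a new thin-provisioned blob of the
 *     same size carrying a SNAPSHOT_IN_PROGRESS xattr, and open it.
 *  2. Freeze I/O on the original blob.
 *  3. Swap the cluster maps, so the snapshot owns the data and the
 *     original becomes a thin blob backed by the snapshot.
 *  4. Sync the snapshot metadata; then point the original's parent and
 *     back_bs_dev at the snapshot and sync the original.
 *  5. Remove SNAPSHOT_IN_PROGRESS, mark the snapshot read-only, sync it
 *     once more, then unfreeze I/O and close both blobs.
 *
 * On failure after the swap, the cluster maps are swapped back before
 * cleanup.
 */
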
4483 static void
4484 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2)
4485 {
4486 uint64_t *cluster_temp;
4487
4488 cluster_temp = blob1->active.clusters;
4489 blob1->active.clusters = blob2->active.clusters;
4490 blob2->active.clusters = cluster_temp;
4491 }
4492
4493 static void
4494 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno)
4495 {
4496 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4497 struct spdk_blob *origblob = ctx->original.blob;
4498 struct spdk_blob *newblob = ctx->new.blob;
4499
4500 if (bserrno != 0) {
4501 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
4502 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4503 return;
4504 }
4505
4506 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
4507 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
4508 if (bserrno != 0) {
4509 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4510 return;
4511 }
4512
4513 _spdk_bs_blob_list_add(ctx->original.blob);
4514
4515 spdk_blob_set_read_only(newblob);
4516
4517 /* sync snapshot metadata */
4518 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg);
4519 }
4520
4521 static void
4522 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
4523 {
4524 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4525 struct spdk_blob *origblob = ctx->original.blob;
4526 struct spdk_blob *newblob = ctx->new.blob;
4527
4528 if (bserrno != 0) {
4529 /* return cluster map back to original */
4530 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
4531 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4532 return;
4533 }
4534
4535 /* Set internal xattr for snapshot id */
4536 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
4537 if (bserrno != 0) {
4538 /* return cluster map back to original */
4539 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
4540 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4541 return;
4542 }
4543
4544 _spdk_bs_blob_list_remove(origblob);
4545 origblob->parent_id = newblob->id;
4546
4547 /* Create new back_bs_dev for snapshot */
4548 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob);
4549 if (origblob->back_bs_dev == NULL) {
4550 /* return cluster map back to original */
4551 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
4552 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
4553 return;
4554 }
4555
4556 /* set clone blob as thin provisioned */
4557 _spdk_blob_set_thin_provision(origblob);
4558
4559 _spdk_bs_blob_list_add(newblob);
4560
4561 /* sync clone metadata */
4562 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
4563 }
4564
4565 static void
4566 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc)
4567 {
4568 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4569 struct spdk_blob *origblob = ctx->original.blob;
4570 struct spdk_blob *newblob = ctx->new.blob;
4571 int bserrno;
4572
4573 if (rc != 0) {
4574 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc);
4575 return;
4576 }
4577
4578 ctx->frozen = true;
4579
4580 /* set new back_bs_dev for snapshot */
4581 newblob->back_bs_dev = origblob->back_bs_dev;
4582 /* Set invalid flags from origblob */
4583 newblob->invalid_flags = origblob->invalid_flags;
4584
4585 /* inherit parent from original blob if set */
4586 newblob->parent_id = origblob->parent_id;
4587 if (origblob->parent_id != SPDK_BLOBID_INVALID) {
4588 /* Set internal xattr for snapshot id */
4589 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT,
4590 &origblob->parent_id, sizeof(spdk_blob_id), true);
4591 if (bserrno != 0) {
4592 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
4593 return;
4594 }
4595 }
4596
4597 /* swap cluster maps */
4598 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
4599
4600 /* sync snapshot metadata */
4601 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
4602 }
4603
4604 static void
4605 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4606 {
4607 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4608 struct spdk_blob *origblob = ctx->original.blob;
4609 struct spdk_blob *newblob = _blob;
4610
4611 if (bserrno != 0) {
4612 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4613 return;
4614 }
4615
4616 ctx->new.blob = newblob;
4617
4618 /* Zero out newblob cluster map */
4619 memset(newblob->active.clusters, 0,
4620 newblob->active.num_clusters * sizeof(*newblob->active.clusters));
4621
4622 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx);
4623 }
4624
4625 static void
4626 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
4627 {
4628 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4629 struct spdk_blob *origblob = ctx->original.blob;
4630
4631 if (bserrno != 0) {
4632 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4633 return;
4634 }
4635
4636 ctx->new.id = blobid;
4637 ctx->cpl.u.blobid.blobid = blobid;
4638
4639 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx);
4640 }
4641
4642
4643 static void
4644 _spdk_bs_xattr_snapshot(void *arg, const char *name,
4645 const void **value, size_t *value_len)
4646 {
4647 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0);
4648
4649 struct spdk_blob *blob = (struct spdk_blob *)arg;
4650 *value = &blob->id;
4651 *value_len = sizeof(blob->id);
4652 }
4653
4654 static void
4655 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4656 {
4657 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4658 struct spdk_blob_opts opts;
4659 struct spdk_blob_xattr_opts internal_xattrs;
4660 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS };
4661
4662 if (bserrno != 0) {
4663 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
4664 return;
4665 }
4666
4667 ctx->original.blob = _blob;
4668
4669 if (_blob->data_ro || _blob->md_ro) {
4670 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from a read-only blob with id %lu\n",
4671 _blob->id);
4672 ctx->bserrno = -EINVAL;
4673 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4674 return;
4675 }
4676
4677 if (_blob->locked_operation_in_progress) {
4678 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n");
4679 ctx->bserrno = -EBUSY;
4680 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4681 return;
4682 }
4683
4684 _blob->locked_operation_in_progress = true;
4685
4686 spdk_blob_opts_init(&opts);
4687 _spdk_blob_xattrs_init(&internal_xattrs);
4688
4689 /* Change the size of new blob to the same as in original blob,
4690 * but do not allocate clusters */
4691 opts.thin_provision = true;
4692 opts.num_clusters = spdk_blob_get_num_clusters(_blob);
4693
4694 /* If there are any xattrs specified for snapshot, set them now */
4695 if (ctx->xattrs) {
4696 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
4697 }
4698 /* Set internal xattr SNAPSHOT_IN_PROGRESS */
4699 internal_xattrs.count = 1;
4700 internal_xattrs.ctx = _blob;
4701 internal_xattrs.names = xattrs_names;
4702 internal_xattrs.get_value = _spdk_bs_xattr_snapshot;
4703
4704 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
4705 _spdk_bs_snapshot_newblob_create_cpl, ctx);
4706 }
4707
4708 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
4709 const struct spdk_blob_xattr_opts *snapshot_xattrs,
4710 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4711 {
4712 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
4713
4714 if (!ctx) {
4715 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
4716 return;
4717 }
4718 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4719 ctx->cpl.u.blobid.cb_fn = cb_fn;
4720 ctx->cpl.u.blobid.cb_arg = cb_arg;
4721 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
4722 ctx->bserrno = 0;
4723 ctx->frozen = false;
4724 ctx->original.id = blobid;
4725 ctx->xattrs = snapshot_xattrs;
4726
4727 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
4728 }
4729 /* END spdk_bs_create_snapshot */
4730
4731 /* START spdk_bs_create_clone */
4732
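/* A clone may only be taken from a read-only blob (typically a
 * snapshot). The clone is created thin-provisioned with the same size
 * and an internal BLOB_SNAPSHOT xattr naming its parent; no clusters
 * are copied.
 */
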
4733 static void
4734 _spdk_bs_xattr_clone(void *arg, const char *name,
4735 const void **value, size_t *value_len)
4736 {
4737 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0);
4738
4739 struct spdk_blob *blob = (struct spdk_blob *)arg;
4740 *value = &blob->id;
4741 *value_len = sizeof(blob->id);
4742 }
4743
4744 static void
4745 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4746 {
4747 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4748 struct spdk_blob *clone = _blob;
4749
4750 ctx->new.blob = clone;
4751 _spdk_bs_blob_list_add(clone);
4752
4753 spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4754 }
4755
4756 static void
4757 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
4758 {
4759 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4760
4761 ctx->cpl.u.blobid.blobid = blobid;
4762 spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx);
4763 }
4764
4765 static void
4766 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4767 {
4768 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4769 struct spdk_blob_opts opts;
4770 struct spdk_blob_xattr_opts internal_xattrs;
4771 char *xattr_names[] = { BLOB_SNAPSHOT };
4772
4773 if (bserrno != 0) {
4774 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
4775 return;
4776 }
4777
4778 ctx->original.blob = _blob;
4779
4780 if (!_blob->data_ro || !_blob->md_ro) {
4781 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone from a blob that is not read-only\n");
4782 ctx->bserrno = -EINVAL;
4783 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4784 return;
4785 }
4786
4787 if (_blob->locked_operation_in_progress) {
4788 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n");
4789 ctx->bserrno = -EBUSY;
4790 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4791 return;
4792 }
4793
4794 _blob->locked_operation_in_progress = true;
4795
4796 spdk_blob_opts_init(&opts);
4797 _spdk_blob_xattrs_init(&internal_xattrs);
4798
4799 opts.thin_provision = true;
4800 opts.num_clusters = spdk_blob_get_num_clusters(_blob);
4801 if (ctx->xattrs) {
4802 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
4803 }
4804
4805 /* Set internal xattr BLOB_SNAPSHOT */
4806 internal_xattrs.count = 1;
4807 internal_xattrs.ctx = _blob;
4808 internal_xattrs.names = xattr_names;
4809 internal_xattrs.get_value = _spdk_bs_xattr_clone;
4810
4811 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
4812 _spdk_bs_clone_newblob_create_cpl, ctx);
4813 }
4814
4815 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid,
4816 const struct spdk_blob_xattr_opts *clone_xattrs,
4817 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4818 {
4819 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
4820
4821 if (!ctx) {
4822 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
4823 return;
4824 }
4825
4826 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4827 ctx->cpl.u.blobid.cb_fn = cb_fn;
4828 ctx->cpl.u.blobid.cb_arg = cb_arg;
4829 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
4830 ctx->bserrno = 0;
4831 ctx->xattrs = clone_xattrs;
4832 ctx->original.id = blobid;
4833
4834 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx);
4835 }
4836
4837 /* END spdk_bs_create_clone */
4838
4839 /* START spdk_bs_inflate_blob */
4840
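/* Inflate and decouple_parent share this machinery and differ only in
 * allocate_all. Inflate (allocate_all == true) allocates every
 * unallocated cluster and removes thin provisioning entirely;
 * decouple_parent only allocates clusters backed by the immediate
 * parent, then re-parents the blob one level up (or onto the zeroes
 * device). Allocation is forced with zero-length writes, one cluster
 * at a time.
 */
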
4841 static void
4842 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno)
4843 {
4844 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4845 struct spdk_blob *_blob = ctx->original.blob;
4846
4847 if (bserrno != 0) {
4848 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4849 return;
4850 }
4851
4852 assert(_parent != NULL);
4853
4854 _spdk_bs_blob_list_remove(_blob);
4855 _blob->parent_id = _parent->id;
4856 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id,
4857 sizeof(spdk_blob_id), true);
4858
4859 _blob->back_bs_dev->destroy(_blob->back_bs_dev);
4860 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent);
4861 _spdk_bs_blob_list_add(_blob);
4862
4863 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4864 }
4865
4866 static void
4867 _spdk_bs_inflate_blob_done(void *cb_arg, int bserrno)
4868 {
4869 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4870 struct spdk_blob *_blob = ctx->original.blob;
4871 struct spdk_blob *_parent;
4872
4873 if (bserrno != 0) {
4874 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4875 return;
4876 }
4877
4878 if (ctx->allocate_all) {
4879 /* remove thin provisioning */
4880 _spdk_bs_blob_list_remove(_blob);
4881 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
4882 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV;
4883 _blob->back_bs_dev->destroy(_blob->back_bs_dev);
4884 _blob->back_bs_dev = NULL;
4885 _blob->parent_id = SPDK_BLOBID_INVALID;
4886 } else {
4887 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob;
4888 if (_parent->parent_id != SPDK_BLOBID_INVALID) {
4889 /* We must change the parent of the inflated blob */
4890 spdk_bs_open_blob(_blob->bs, _parent->parent_id,
4891 _spdk_bs_inflate_blob_set_parent_cpl, ctx);
4892 return;
4893 }
4894
4895 _spdk_bs_blob_list_remove(_blob);
4896 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
4897 _blob->parent_id = SPDK_BLOBID_INVALID;
4898 _blob->back_bs_dev->destroy(_blob->back_bs_dev);
4899 _blob->back_bs_dev = spdk_bs_create_zeroes_dev();
4900 }
4901
4902 _blob->state = SPDK_BLOB_STATE_DIRTY;
4903 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4904 }
4905
4906 /* Check if cluster needs allocation */
4907 static inline bool
4908 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all)
4909 {
4910 struct spdk_blob_bs_dev *b;
4911
4912 assert(blob != NULL);
4913
4914 if (blob->active.clusters[cluster] != 0) {
4915 /* Cluster is already allocated */
4916 return false;
4917 }
4918
4919 if (blob->parent_id == SPDK_BLOBID_INVALID) {
4920 /* Blob has no parent */
4921 return allocate_all;
4922 }
4923
4924 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev;
4925 return (allocate_all || b->blob->active.clusters[cluster] != 0);
4926 }
4927
4928 static void
4929 _spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
4930 {
4931 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4932 struct spdk_blob *_blob = ctx->original.blob;
4933 uint64_t offset;
4934
4935 if (bserrno != 0) {
4936 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4937 return;
4938 }
4939
4940 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) {
4941 if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) {
4942 break;
4943 }
4944 }
4945
4946 if (ctx->cluster < _blob->active.num_clusters) {
4947 offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster);
4948
4949 /* It is safe to advance the cluster index before issuing the write */
4950 ctx->cluster++;
4951
4952 /* Use zero length write to touch a cluster */
4953 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0,
4954 _spdk_bs_inflate_blob_touch_next, ctx);
4955 } else {
4956 _spdk_bs_inflate_blob_done(cb_arg, bserrno);
4957 }
4958 }
4959
4960 static void
4961 _spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4962 {
4963 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4964 uint64_t lfc; /* lowest free cluster */
4965 uint64_t i;
4966
4967 if (bserrno != 0) {
4968 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
4969 return;
4970 }
4971
4972 ctx->original.blob = _blob;
4973
4974 if (_blob->locked_operation_in_progress) {
4975 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n");
4976 ctx->bserrno = -EBUSY;
4977 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4978 return;
4979 }
4980
4981 _blob->locked_operation_in_progress = true;
4982
4983 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) {
4984 /* This blob has no parent, so we cannot decouple it. */
4985 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n");
4986 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
4987 return;
4988 }
4989
4990 if (spdk_blob_is_thin_provisioned(_blob) == false) {
4991 /* This is not a thin-provisioned blob. There is no need to inflate. */
4992 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0);
4993 return;
4994 }
4995
4996 /* Do two passes - one here to verify that enough free clusters exist,
4997 * and a second (the zero-length-write loop below) to actually claim them.
4998 */
4999 lfc = 0;
5000 for (i = 0; i < _blob->active.num_clusters; i++) {
5001 if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) {
5002 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc);
5003 if (lfc == UINT32_MAX) {
5004 /* No more free clusters. Cannot satisfy the request */
5005 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC);
5006 return;
5007 }
5008 lfc++;
5009 }
5010 }
5011
5012 ctx->cluster = 0;
5013 _spdk_bs_inflate_blob_touch_next(ctx, 0);
5014 }
5015
5016 static void
5017 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
5018 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg)
5019 {
5020 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
5021
5022 if (!ctx) {
5023 cb_fn(cb_arg, -ENOMEM);
5024 return;
5025 }
5026 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5027 ctx->cpl.u.bs_basic.cb_fn = cb_fn;
5028 ctx->cpl.u.bs_basic.cb_arg = cb_arg;
5029 ctx->bserrno = 0;
5030 ctx->original.id = blobid;
5031 ctx->channel = channel;
5032 ctx->allocate_all = allocate_all;
5033
5034 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx);
5035 }
5036
5037 void
5038 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
5039 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
5040 {
5041 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg);
5042 }
5043
5044 void
5045 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
5046 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
5047 {
5048 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg);
5049 }
5050 /* END spdk_bs_inflate_blob */
5051
5052 /* START spdk_blob_resize */
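/* Resizing first freezes I/O on the blob, performs the cluster-map
 * resize on the metadata thread, and then unfreezes I/O; the result of
 * the resize is reported from the unfreeze completion.
 */
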
5053 struct spdk_bs_resize_ctx {
5054 spdk_blob_op_complete cb_fn;
5055 void *cb_arg;
5056 struct spdk_blob *blob;
5057 uint64_t sz;
5058 int rc;
5059 };
5060
5061 static void
5062 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc)
5063 {
5064 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;
5065
5066 if (rc != 0) {
5067 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc);
5068 }
5069
5070 if (ctx->rc != 0) {
5071 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc);
5072 rc = ctx->rc;
5073 }
5074
5075 ctx->blob->locked_operation_in_progress = false;
5076
5077 ctx->cb_fn(ctx->cb_arg, rc);
5078 free(ctx);
5079 }
5080
5081 static void
5082 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc)
5083 {
5084 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;
5085
5086 if (rc != 0) {
5087 ctx->blob->locked_operation_in_progress = false;
5088 ctx->cb_fn(ctx->cb_arg, rc);
5089 free(ctx);
5090 return;
5091 }
5092
5093 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz);
5094
5095 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx);
5096 }
5097
5098 void
5099 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
5100 {
5101 struct spdk_bs_resize_ctx *ctx;
5102
5103 _spdk_blob_verify_md_op(blob);
5104
5105 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
5106
5107 if (blob->md_ro) {
5108 cb_fn(cb_arg, -EPERM);
5109 return;
5110 }
5111
5112 if (sz == blob->active.num_clusters) {
5113 cb_fn(cb_arg, 0);
5114 return;
5115 }
5116
5117 if (blob->locked_operation_in_progress) {
5118 cb_fn(cb_arg, -EBUSY);
5119 return;
5120 }
5121
5122 ctx = calloc(1, sizeof(*ctx));
5123 if (!ctx) {
5124 cb_fn(cb_arg, -ENOMEM);
5125 return;
5126 }
5127
5128 blob->locked_operation_in_progress = true;
5129 ctx->cb_fn = cb_fn;
5130 ctx->cb_arg = cb_arg;
5131 ctx->blob = blob;
5132 ctx->sz = sz;
5133 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx);
5134 }
5135
5136 /* END spdk_blob_resize */
5137
5138
5139 /* START spdk_bs_delete_blob */
5140
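/* Deleting a blob proceeds as: open the blob, verify it is deletable
 * (no other open references and, for snapshots, no clones), remove it
 * from the blobstore's lists, shrink it to zero clusters, persist the
 * now-empty metadata, and finally close the last reference.
 */
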
5141 static void
5142 _spdk_bs_delete_ebusy_close_cpl(void *cb_arg, int bserrno)
5143 {
5144 spdk_bs_sequence_t *seq = cb_arg;
5145
5146 spdk_bs_sequence_finish(seq, -EBUSY);
5147 }
5148
5149 static void
5150 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
5151 {
5152 spdk_bs_sequence_t *seq = cb_arg;
5153
5154 spdk_bs_sequence_finish(seq, bserrno);
5155 }
5156
5157 static void
5158 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5159 {
5160 struct spdk_blob *blob = cb_arg;
5161
5162 if (bserrno != 0) {
5163 /*
5164 * We already removed this blob from the blobstore tailq, so
5165 * we need to free it here since this is the last reference
5166 * to it.
5167 */
5168 _spdk_blob_free(blob);
5169 _spdk_bs_delete_close_cpl(seq, bserrno);
5170 return;
5171 }
5172
5173 /*
5174 * This will immediately decrement the ref_count and call
5175 * the completion routine since the metadata state is clean.
5176 * By calling spdk_blob_close, we reduce the number of call
5177 * points into code that touches the blob->open_ref count
5178 * and the blobstore's blob list.
5179 */
5180 spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
5181 }
5182
5183 static void
5184 _spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno)
5185 {
5186 spdk_bs_sequence_t *seq = cb_arg;
5187 struct spdk_blob_list *snapshot_entry = NULL;
5188 uint32_t page_num;
5189
5190 if (bserrno) {
5191 SPDK_ERRLOG("Failed to remove blob\n");
5192 spdk_bs_sequence_finish(seq, bserrno);
5193 return;
5194 }
5195
5196 /* Remove snapshot from the list */
5197 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
5198 if (snapshot_entry != NULL) {
5199 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link);
5200 free(snapshot_entry);
5201 }
5202
5203 page_num = _spdk_bs_blobid_to_page(blob->id);
5204 spdk_bit_array_clear(blob->bs->used_blobids, page_num);
5205 blob->state = SPDK_BLOB_STATE_DIRTY;
5206 blob->active.num_pages = 0;
5207 _spdk_blob_resize(blob, 0);
5208
5209 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
5210 }
5211
5212 static int
5213 _spdk_bs_is_blob_deletable(struct spdk_blob *blob)
5214 {
5215 struct spdk_blob_list *snapshot_entry = NULL;
5216
5217 if (blob->open_ref > 1) {
5218 /* Someone has this blob open (besides this delete context). */
5219 return -EBUSY;
5220 }
5221
5222 /* Check if this is a snapshot with clones */
5223 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
5224 if (snapshot_entry != NULL) {
5225 /* If the snapshot has clones, it cannot be removed */
5226 if (!TAILQ_EMPTY(&snapshot_entry->clones)) {
5227 SPDK_ERRLOG("Cannot remove snapshot with clones\n");
5228 return -EBUSY;
5229 }
5230 }
5231
5232 return 0;
5233 }
5234
5235 static void
5236 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
5237 {
5238 spdk_bs_sequence_t *seq = cb_arg;
5239
5240 if (bserrno != 0) {
5241 spdk_bs_sequence_finish(seq, bserrno);
5242 return;
5243 }
5244
5245 _spdk_blob_verify_md_op(blob);
5246
5247 bserrno = _spdk_bs_is_blob_deletable(blob);
5248 if (bserrno) {
5249 spdk_blob_close(blob, _spdk_bs_delete_ebusy_close_cpl, seq);
5250 return;
5251 }
5252
5253 _spdk_bs_blob_list_remove(blob);
5254
5255 if (blob->locked_operation_in_progress) {
5256 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n");
5257 spdk_blob_close(blob, _spdk_bs_delete_ebusy_close_cpl, seq);
5258 return;
5259 }
5260
5261 blob->locked_operation_in_progress = true;
5262
5263 /*
5264 * Remove the blob from the blob_store list now, to ensure it does not
5265 * get returned after this point by _spdk_blob_lookup().
5266 */
5267 TAILQ_REMOVE(&blob->bs->blobs, blob, link);
5268
5269 _spdk_bs_delete_blob_finish(seq, blob, 0);
5270 }
5271
5272 void
5273 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5274 spdk_blob_op_complete cb_fn, void *cb_arg)
5275 {
5276 struct spdk_bs_cpl cpl;
5277 spdk_bs_sequence_t *seq;
5278
5279 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
5280
5281 assert(spdk_get_thread() == bs->md_thread);
5282
5283 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5284 cpl.u.blob_basic.cb_fn = cb_fn;
5285 cpl.u.blob_basic.cb_arg = cb_arg;
5286
5287 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
5288 if (!seq) {
5289 cb_fn(cb_arg, -ENOMEM);
5290 return;
5291 }
5292
5293 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
5294 }
5295
5296 /* END spdk_bs_delete_blob */
5297
5298 /* START spdk_bs_open_blob */
5299
5300 static void
5301 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5302 {
5303 struct spdk_blob *blob = cb_arg;
5304
5305 /* If the blob had a CRC error, we just return NULL. */
5306 if (blob == NULL) {
5307 seq->cpl.u.blob_handle.blob = NULL;
5308 spdk_bs_sequence_finish(seq, bserrno);
5309 return;
5310 }
5311
5312 blob->open_ref++;
5313
5314 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
5315
5316 spdk_bs_sequence_finish(seq, bserrno);
5317 }
5318
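/* Opening a blob first validates the id against used_blobids; if the
 * blob is already open, the existing handle is returned with open_ref
 * incremented, otherwise the metadata is loaded from disk.
 */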
5319 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5320 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
5321 {
5322 struct spdk_blob *blob;
5323 struct spdk_bs_cpl cpl;
5324 struct spdk_blob_open_opts opts_default;
5325 spdk_bs_sequence_t *seq;
5326 uint32_t page_num;
5327
5328 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
5329 assert(spdk_get_thread() == bs->md_thread);
5330
5331 page_num = _spdk_bs_blobid_to_page(blobid);
5332 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
5333 /* Invalid blobid */
5334 cb_fn(cb_arg, NULL, -ENOENT);
5335 return;
5336 }
5337
5338 blob = _spdk_blob_lookup(bs, blobid);
5339 if (blob) {
5340 blob->open_ref++;
5341 cb_fn(cb_arg, blob, 0);
5342 return;
5343 }
5344
5345 blob = _spdk_blob_alloc(bs, blobid);
5346 if (!blob) {
5347 cb_fn(cb_arg, NULL, -ENOMEM);
5348 return;
5349 }
5350
5351 if (!opts) {
5352 spdk_blob_open_opts_init(&opts_default);
5353 opts = &opts_default;
5354 }
5355
5356 blob->clear_method = opts->clear_method;
5357
5358 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
5359 cpl.u.blob_handle.cb_fn = cb_fn;
5360 cpl.u.blob_handle.cb_arg = cb_arg;
5361 cpl.u.blob_handle.blob = blob;
5362
5363 seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
5364 if (!seq) {
5365 _spdk_blob_free(blob);
5366 cb_fn(cb_arg, NULL, -ENOMEM);
5367 return;
5368 }
5369
5370 _spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
5371 }
5372
5373 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5374 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
5375 {
5376 _spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg);
5377 }
5378
5379 void spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid,
5380 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
5381 {
5382 _spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg);
5383 }
5384
5385 /* END spdk_bs_open_blob */
5386
5387 /* START spdk_blob_set_read_only */
5388 int spdk_blob_set_read_only(struct spdk_blob *blob)
5389 {
5390 _spdk_blob_verify_md_op(blob);
5391
5392 blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
5393
5394 blob->state = SPDK_BLOB_STATE_DIRTY;
5395 return 0;
5396 }
5397 /* END spdk_blob_set_read_only */
5398
5399 /* START spdk_blob_sync_md */
5400
5401 static void
5402 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5403 {
5404 struct spdk_blob *blob = cb_arg;
5405
5406 if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
5407 blob->data_ro = true;
5408 blob->md_ro = true;
5409 }
5410
5411 spdk_bs_sequence_finish(seq, bserrno);
5412 }
5413
5414 static void
5415 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5416 {
5417 struct spdk_bs_cpl cpl;
5418 spdk_bs_sequence_t *seq;
5419
5420 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5421 cpl.u.blob_basic.cb_fn = cb_fn;
5422 cpl.u.blob_basic.cb_arg = cb_arg;
5423
5424 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
5425 if (!seq) {
5426 cb_fn(cb_arg, -ENOMEM);
5427 return;
5428 }
5429
5430 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
5431 }
5432
5433 void
5434 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5435 {
5436 _spdk_blob_verify_md_op(blob);
5437
5438 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
5439
5440 if (blob->md_ro) {
5441 assert(blob->state == SPDK_BLOB_STATE_CLEAN);
5442 cb_fn(cb_arg, 0);
5443 return;
5444 }
5445
5446 _spdk_blob_sync_md(blob, cb_fn, cb_arg);
5447 }
5448
5449 /* END spdk_blob_sync_md */
5450
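/* Inserting a newly allocated cluster into a blob's cluster map must
 * happen on the metadata thread, but the request may originate from any
 * thread. The context records the originating thread so the completion
 * can be messaged back to it.
 */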
5451 struct spdk_blob_insert_cluster_ctx {
5452 struct spdk_thread *thread;
5453 struct spdk_blob *blob;
5454 uint32_t cluster_num; /* cluster index in blob */
5455 uint64_t cluster; /* cluster on disk */
5456 int rc;
5457 spdk_blob_op_complete cb_fn;
5458 void *cb_arg;
5459 };
5460
5461 static void
5462 _spdk_blob_insert_cluster_msg_cpl(void *arg)
5463 {
5464 struct spdk_blob_insert_cluster_ctx *ctx = arg;
5465
5466 ctx->cb_fn(ctx->cb_arg, ctx->rc);
5467 free(ctx);
5468 }
5469
5470 static void
5471 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
5472 {
5473 struct spdk_blob_insert_cluster_ctx *ctx = arg;
5474
5475 ctx->rc = bserrno;
5476 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
5477 }
5478
5479 static void
5480 _spdk_blob_insert_cluster_msg(void *arg)
5481 {
5482 struct spdk_blob_insert_cluster_ctx *ctx = arg;
5483
5484 ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
5485 if (ctx->rc != 0) {
5486 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
5487 return;
5488 }
5489
5490 ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
5491 _spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
5492 }
5493
5494 static void
5495 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
5496 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
5497 {
5498 struct spdk_blob_insert_cluster_ctx *ctx;
5499
5500 ctx = calloc(1, sizeof(*ctx));
5501 if (ctx == NULL) {
5502 cb_fn(cb_arg, -ENOMEM);
5503 return;
5504 }
5505
5506 ctx->thread = spdk_get_thread();
5507 ctx->blob = blob;
5508 ctx->cluster_num = cluster_num;
5509 ctx->cluster = cluster;
5510 ctx->cb_fn = cb_fn;
5511 ctx->cb_arg = cb_arg;
5512
5513 spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
5514 }
5515
5516 /* START spdk_blob_close */
5517
5518 static void
5519 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5520 {
5521 struct spdk_blob *blob = cb_arg;
5522
5523 if (bserrno == 0) {
5524 blob->open_ref--;
5525 if (blob->open_ref == 0) {
5526 /*
5527 * Blobs with active.num_pages == 0 are deleted blobs.
5528 * These blobs are removed from the blob_store list
5529 * when the deletion process starts - so don't try to
5530 * remove them again.
5531 */
5532 if (blob->active.num_pages > 0) {
5533 TAILQ_REMOVE(&blob->bs->blobs, blob, link);
5534 }
5535 _spdk_blob_free(blob);
5536 }
5537 }
5538
5539 spdk_bs_sequence_finish(seq, bserrno);
5540 }
5541
5542 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5543 {
5544 struct spdk_bs_cpl cpl;
5545 spdk_bs_sequence_t *seq;
5546
5547 _spdk_blob_verify_md_op(blob);
5548
5549 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
5550
5551 if (blob->open_ref == 0) {
5552 cb_fn(cb_arg, -EBADF);
5553 return;
5554 }
5555
5556 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5557 cpl.u.blob_basic.cb_fn = cb_fn;
5558 cpl.u.blob_basic.cb_arg = cb_arg;
5559
5560 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
5561 if (!seq) {
5562 cb_fn(cb_arg, -ENOMEM);
5563 return;
5564 }
5565
5566 /* Sync metadata */
5567 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
5568 }
5569
5570 /* END spdk_blob_close */
5571
5572 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
5573 {
5574 return spdk_get_io_channel(bs);
5575 }
5576
5577 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
5578 {
5579 spdk_put_io_channel(channel);
5580 }
5581
5582 void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
5583 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
5584 {
5585 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
5586 SPDK_BLOB_UNMAP);
5587 }
5588
5589 void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
5590 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
5591 {
5592 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
5593 SPDK_BLOB_WRITE_ZEROES);
5594 }
5595
5596 void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
5597 void *payload, uint64_t offset, uint64_t length,
5598 spdk_blob_op_complete cb_fn, void *cb_arg)
5599 {
5600 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
5601 SPDK_BLOB_WRITE);
5602 }
5603
5604 void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
5605 void *payload, uint64_t offset, uint64_t length,
5606 spdk_blob_op_complete cb_fn, void *cb_arg)
5607 {
5608 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
5609 SPDK_BLOB_READ);
5610 }
5611
5612 void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
5613 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
5614 spdk_blob_op_complete cb_fn, void *cb_arg)
5615 {
5616 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
5617 }
5618
5619 void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
5620 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
5621 spdk_blob_op_complete cb_fn, void *cb_arg)
5622 {
5623 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
5624 }
5625
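/* The iterator walks used_blobids in increasing page order, opening one
 * blob at a time; iter_first starts from page_num == -1 so the first
 * find_first_set begins at page 0.
 */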
5626 struct spdk_bs_iter_ctx {
5627 int64_t page_num;
5628 struct spdk_blob_store *bs;
5629
5630 spdk_blob_op_with_handle_complete cb_fn;
5631 void *cb_arg;
5632 };
5633
static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	/* A successfully opened blob is handed straight to the caller. */
	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	/* Otherwise advance to the next allocated blob ID, if any. */
	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
		/* No more set bits in used_blobids: iteration is complete. */
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}
5659
void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* page_num starts at -1; the forced non-zero bserrno makes the
	 * completion advance to the first allocated blob ID. */
	_spdk_bs_iter_cpl(ctx, NULL, -1);
}
5679
static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	/* The close result is ignored; continue to the next blob either way. */
	_spdk_bs_iter_cpl(ctx, NULL, -1);
}
5687
5688 void
5689 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
5690 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
5691 {
5692 struct spdk_bs_iter_ctx *ctx;
5693
5694 assert(blob != NULL);
5695
5696 ctx = calloc(1, sizeof(*ctx));
5697 if (!ctx) {
5698 cb_fn(cb_arg, NULL, -ENOMEM);
5699 return;
5700 }
5701
5702 ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
5703 ctx->bs = bs;
5704 ctx->cb_fn = cb_fn;
5705 ctx->cb_arg = cb_arg;
5706
5707 /* Close the existing blob */
5708 spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
5709 }
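
/*
 * Illustrative sketch of walking every blob with the iterator above. Each
 * visited blob arrives open; passing it back to spdk_bs_iter_next() closes
 * it before the next one is opened. If iteration stops early, the caller
 * owns the last handle and must close it with spdk_blob_close().
 * Hypothetical example_* names.
 */
static void
example_iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_blob_store *bs = cb_arg;

	if (bserrno == -ENOENT) {
		/* No more blobs: iteration complete. */
		return;
	} else if (bserrno != 0) {
		SPDK_ERRLOG("Blob iteration failed: %d\n", bserrno);
		return;
	}

	SPDK_NOTICELOG("Found blob %" PRIu64 "\n", spdk_blob_get_id(blob));
	spdk_bs_iter_next(bs, blob, example_iter_cb, bs);
}

static void
example_iterate_blobs(struct spdk_blob_store *bs)
{
	spdk_bs_iter_first(bs, example_iter_cb, bs);
}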
5710
static int
_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		     uint16_t value_len, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr *xattr;
	void *tmp;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}

	if (internal) {
		xattrs = &blob->xattrs_internal;
		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
	} else {
		xattrs = &blob->xattrs;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/* Allocate the new value before freeing the old one so a
			 * failed allocation leaves the existing xattr intact. */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}
5758
5759 int
5760 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
5761 uint16_t value_len)
5762 {
5763 return _spdk_blob_set_xattr(blob, name, value, value_len, false);
5764 }
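
/*
 * Illustrative sketch: xattrs are modified on the metadata thread and reach
 * the disk only after the blob's metadata is synced (or the blob is
 * closed). The "owner" key is arbitrary; the NUL terminator is stored so
 * the value can be read back as a C string. Hypothetical helper name.
 */
static int
example_tag_blob(struct spdk_blob *blob)
{
	const char *owner = "example-user";

	/* Returns -EPERM if the blob's metadata is read-only. */
	return spdk_blob_set_xattr(blob, "owner", owner, strlen(owner) + 1);
}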
5765
5766 static int
5767 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
5768 {
5769 struct spdk_xattr_tailq *xattrs;
5770 struct spdk_xattr *xattr;
5771
5772 _spdk_blob_verify_md_op(blob);
5773
5774 if (blob->md_ro) {
5775 return -EPERM;
5776 }
5777 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
5778
5779 TAILQ_FOREACH(xattr, xattrs, link) {
5780 if (!strcmp(name, xattr->name)) {
5781 TAILQ_REMOVE(xattrs, xattr, link);
5782 free(xattr->value);
5783 free(xattr->name);
5784 free(xattr);
5785
5786 if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
5787 blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
5788 }
5789 blob->state = SPDK_BLOB_STATE_DIRTY;
5790
5791 return 0;
5792 }
5793 }
5794
5795 return -ENOENT;
5796 }
5797
5798 int
5799 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
5800 {
5801 return _spdk_blob_remove_xattr(blob, name, false);
5802 }
5803
5804 static int
5805 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
5806 const void **value, size_t *value_len, bool internal)
5807 {
5808 struct spdk_xattr *xattr;
5809 struct spdk_xattr_tailq *xattrs;
5810
5811 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
5812
5813 TAILQ_FOREACH(xattr, xattrs, link) {
5814 if (!strcmp(name, xattr->name)) {
5815 *value = xattr->value;
5816 *value_len = xattr->value_len;
5817 return 0;
5818 }
5819 }
5820 return -ENOENT;
5821 }
5822
5823 int
5824 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
5825 const void **value, size_t *value_len)
5826 {
5827 _spdk_blob_verify_md_op(blob);
5828
5829 return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
5830 }
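
/*
 * Illustrative sketch: the pointer returned through *value aliases the
 * blob's in-memory xattr entry. It must not be freed by the caller and is
 * only valid until the xattr is changed or removed. Hypothetical names;
 * assumes the value was stored NUL-terminated as in the example above.
 */
static void
example_read_tag(struct spdk_blob *blob)
{
	const void *value;
	size_t value_len;

	if (spdk_blob_get_xattr_value(blob, "owner", &value, &value_len) == 0) {
		SPDK_NOTICELOG("owner xattr: %s (%zu bytes)\n",
			       (const char *)value, value_len);
		/* Drop the tag once consumed; `value` is dangling afterwards. */
		spdk_blob_remove_xattr(blob, "owner");
	}
}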
5831
5832 struct spdk_xattr_names {
5833 uint32_t count;
5834 const char *names[0];
5835 };
5836
5837 static int
5838 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
5839 {
5840 struct spdk_xattr *xattr;
5841 int count = 0;
5842
5843 TAILQ_FOREACH(xattr, xattrs, link) {
5844 count++;
5845 }
5846
5847 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
5848 if (*names == NULL) {
5849 return -ENOMEM;
5850 }
5851
5852 TAILQ_FOREACH(xattr, xattrs, link) {
5853 (*names)->names[(*names)->count++] = xattr->name;
5854 }
5855
5856 return 0;
5857 }
5858
5859 int
5860 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
5861 {
5862 _spdk_blob_verify_md_op(blob);
5863
5864 return _spdk_blob_get_xattr_names(&blob->xattrs, names);
5865 }
5866
5867 uint32_t
5868 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
5869 {
5870 assert(names != NULL);
5871
5872 return names->count;
5873 }
5874
const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	assert(names != NULL);

	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}
5884
5885 void
5886 spdk_xattr_names_free(struct spdk_xattr_names *names)
5887 {
5888 free(names);
5889 }
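
/*
 * Illustrative sketch of enumerating xattr names. The strings returned by
 * spdk_xattr_names_get_name() point into the blob's xattr list, so the
 * names handle should be used and freed before the blob is closed.
 * Hypothetical helper name.
 */
static void
example_list_xattrs(struct spdk_blob *blob)
{
	struct spdk_xattr_names *names = NULL;
	uint32_t i;

	if (spdk_blob_get_xattr_names(blob, &names) != 0) {
		return;
	}

	for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
		SPDK_NOTICELOG("xattr: %s\n", spdk_xattr_names_get_name(names, i));
	}

	spdk_xattr_names_free(names);
}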
5890
5891 struct spdk_bs_type
5892 spdk_bs_get_bstype(struct spdk_blob_store *bs)
5893 {
5894 return bs->bstype;
5895 }
5896
5897 void
5898 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
5899 {
5900 memcpy(&bs->bstype, &bstype, sizeof(bstype));
5901 }
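
/*
 * Illustrative sketch, assuming the fixed-size character array member that
 * struct spdk_bs_type exposes in spdk/blob.h; "MYAPP" is an arbitrary
 * label. Applications typically set a bstype at initialization and verify
 * it after spdk_bs_load() to confirm a device belongs to them.
 */
static void
example_set_bstype(struct spdk_blob_store *bs)
{
	struct spdk_bs_type bstype;

	memset(&bstype, 0, sizeof(bstype));
	snprintf(bstype.bstype, sizeof(bstype.bstype), "MYAPP");
	spdk_bs_set_bstype(bs, bstype);
}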
5902
5903 bool
5904 spdk_blob_is_read_only(struct spdk_blob *blob)
5905 {
5906 assert(blob != NULL);
5907 return (blob->data_ro || blob->md_ro);
5908 }
5909
bool
spdk_blob_is_snapshot(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_get_snapshot_entry(blob->bs, blob->id) != NULL;
}
5924
5925 bool
5926 spdk_blob_is_clone(struct spdk_blob *blob)
5927 {
5928 assert(blob != NULL);
5929
5930 if (blob->parent_id != SPDK_BLOBID_INVALID) {
5931 assert(spdk_blob_is_thin_provisioned(blob));
5932 return true;
5933 }
5934
5935 return false;
5936 }
5937
5938 bool
5939 spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
5940 {
5941 assert(blob != NULL);
5942 return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
5943 }
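
/*
 * Illustrative sketch combining the predicates above; hypothetical helper.
 * Note the invariant asserted in spdk_blob_is_clone(): every clone is also
 * thin provisioned.
 */
static void
example_describe_blob(struct spdk_blob *blob)
{
	SPDK_NOTICELOG("blob %" PRIu64 ": ro=%d snapshot=%d clone=%d thin=%d\n",
		       spdk_blob_get_id(blob),
		       spdk_blob_is_read_only(blob),
		       spdk_blob_is_snapshot(blob),
		       spdk_blob_is_clone(blob),
		       spdk_blob_is_thin_provisioned(blob));
}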
5944
5945 spdk_blob_id
5946 spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
5947 {
5948 struct spdk_blob_list *snapshot_entry = NULL;
5949 struct spdk_blob_list *clone_entry = NULL;
5950
5951 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
5952 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
5953 if (clone_entry->id == blob_id) {
5954 return snapshot_entry->id;
5955 }
5956 }
5957 }
5958
5959 return SPDK_BLOBID_INVALID;
5960 }
5961
5962 int
5963 spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
5964 size_t *count)
5965 {
5966 struct spdk_blob_list *snapshot_entry, *clone_entry;
5967 size_t n;
5968
5969 snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid);
5970 if (snapshot_entry == NULL) {
5971 *count = 0;
5972 return 0;
5973 }
5974
5975 if (ids == NULL || *count < snapshot_entry->clone_count) {
5976 *count = snapshot_entry->clone_count;
5977 return -ENOMEM;
5978 }
5979 *count = snapshot_entry->clone_count;
5980
5981 n = 0;
5982 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
5983 ids[n++] = clone_entry->id;
5984 }
5985
5986 return 0;
5987 }
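
/*
 * Illustrative sketch of the two-call pattern spdk_blob_get_clones()
 * supports: probe with ids == NULL to learn the clone count (the -ENOMEM
 * return still fills *count), then allocate and fetch. Hypothetical
 * helper; the caller frees the returned array.
 */
static spdk_blob_id *
example_clone_ids(struct spdk_blob_store *bs, spdk_blob_id snapshotid, size_t *count)
{
	spdk_blob_id *ids;

	*count = 0;
	if (spdk_blob_get_clones(bs, snapshotid, NULL, count) == 0 || *count == 0) {
		/* Not a snapshot, or a snapshot with no clones. */
		return NULL;
	}

	ids = calloc(*count, sizeof(*ids));
	if (ids == NULL || spdk_blob_get_clones(bs, snapshotid, ids, count) != 0) {
		free(ids);
		*count = 0;
		return NULL;
	}

	return ids;
}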
5988
5989 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)