/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT 64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET 4096

#define REDUCE_EMPTY_MAP_ENTRY -1ULL

#define REDUCE_NUM_VOL_REQUESTS 256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t signature[8];
	struct spdk_reduce_vol_params params;
	uint8_t reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* The null terminator accounts for one byte of the sizeof(), hence the -1 below. */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

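/*
 * On-media layout sketch implied by the constants above (sizes for the
 * default 4 KiB superblock):
 *
 *   backing device: [ superblock (4 KiB) | pm file path (REDUCE_PATH_MAX) | data io units... ]
 *                     ^ offset 0           ^ offset REDUCE_BACKING_DEV_PATH_OFFSET
 *
 *   pm file:        [ superblock (4 KiB) | logical map | chunk maps ]
 *
 * The backing io units covering the superblock and path are marked as
 * allocated in _allocate_bit_arrays() so they are never used for chunk data.
 */
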
#define REDUCE_ZERO_BUF_SIZE 0x100000

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char			path[REDUCE_PATH_MAX];
	void			*pm_buf;
	int			pm_is_pmem;
	uint64_t		size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t		compressed_size;
	uint32_t		reserved;
	uint64_t		io_unit_index[0];
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for uncompressed chunk.  This is used for:
	 *  1) source buffer for compression operations
	 *  2) destination buffer for decompression operations
	 *  3) data buffer when writing uncompressed chunk to disk
	 *  4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t					*decomp_buf;
	struct iovec				*decomp_buf_iov;

	/**
	 *  These are used to construct the iovecs that are sent to
	 *  the decomp engine; they point to a mix of the scratch buffer
	 *  and user buffer.
	 */
	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
	int					decomp_iovcnt;

	/**
	 *  Scratch buffer used for compressed chunk.  This is used for:
	 *  1) destination buffer for compression operations
	 *  2) source buffer for decompression operations
	 *  3) data buffer when writing compressed chunk to disk
	 *  4) data buffer when reading compressed chunk from disk
	 */
	uint8_t					*comp_buf;
	struct iovec				*comp_buf_iov;
	struct iovec				*iov;
	bool					rmw;
	struct spdk_reduce_vol			*vol;
	int					type;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint32_t				num_io_units;
	bool					chunk_is_compressed;
	uint64_t				offset;
	uint64_t				logical_map_index;
	uint64_t				length;
	uint64_t				chunk_map_index;
	struct spdk_reduce_chunk_map		*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

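/*
 * Note on the decomp_iov sizing above: a decompress destination can span the
 * user's iovecs (up to REDUCE_MAX_IOVECS) plus up to two slices of the
 * scratch buffer - one ahead of the user data covering the offset into the
 * chunk, and one after it covering the rest of the chunk - hence the + 2.
 */
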
struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*buf_mem;
	struct iovec				*buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);
static uint8_t *g_zero_buf;
static int g_vol_count = 0;

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 * outstanding IO in the worst-case scenario, where the logical map is completely
 * allocated and no data can be compressed.  We need extra chunks in this case to
 * handle in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

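/*
 * Worked example (illustrative numbers only): with vol_size = 1 GiB and
 * chunk_size = 16 KiB, the logical map holds 65536 entries of 8 bytes each
 * (512 KiB, already a multiple of the 64-byte alignment), and the volume
 * carries 65536 + REDUCE_NUM_EXTRA_CHUNKS = 65664 chunk maps in total.
 */
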
static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * The caller must not pass in a vol size - it gets calculated by libreduce
		 *  from the other values in this structure plus the size of the backing
		 *  device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

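/*
 * E.g. a 1 GiB backing device with 16 KiB chunks holds 65536 raw chunks;
 * after reserving REDUCE_NUM_EXTRA_CHUNKS (128) for in-flight overwrites,
 * the advertised volume size is 65408 * 16 KiB (just under 1022 MiB).
 */
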
static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

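/*
 * Continuing the example above with backing_io_unit_size = 4 KiB: each chunk
 * map is 8 + 4 * 8 = 40 bytes (header plus four io unit indices), so the pm
 * file is 4 KiB (superblock) + 512 KiB (logical map) + 65664 * 40 bytes of
 * chunk maps - roughly 3 MiB in total.
 */
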
const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 * intermediate buffers.
	 */
	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 * buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}

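/*
 * Resulting carve-up of buf_mem (sketch): request i owns two chunk-sized
 * slices - its uncompressed scratch buffer at buf_mem + (2 * i) * chunk_size
 * and its compressed scratch buffer at buf_mem + (2 * i + 1) * chunk_size.
 * buf_iov_mem is split the same way, two iovec arrays of
 * backing_io_units_per_chunk entries per request.
 */
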
static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}

		spdk_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		free(vol);
	}
}

static int
_alloc_zero_buff(void)
{
	int rc = 0;

	/* The zero buffer is shared between all volumes and only used
	 * for reads, so allocate one global instance here if it wasn't
	 * already allocated when another vol was initialized or loaded.
	 */
	if (g_vol_count++ == 0) {
		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
					  SPDK_MALLOC_DMA);
		if (g_zero_buf == NULL) {
			rc = -ENOMEM;
		}
	}
	return rc;
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	rc = _alloc_zero_buff();
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write the path to offset 4K on the backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

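/*
 * Caller-side sketch (illustrative only - my_backing_dev and init_done are
 * hypothetical caller-provided objects, not part of this library):
 *
 *	struct spdk_reduce_vol_params params = {};
 *
 *	params.chunk_size = 16 * 1024;
 *	params.backing_io_unit_size = 4 * 1024;
 *	params.logical_block_size = 512;
 *	params.vol_size = 0;	// must stay 0 - libreduce derives it
 *	spdk_reduce_vol_init(&params, &my_backing_dev, "/mnt/pmem", init_done, NULL);
 */
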
static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	rc = _alloc_zero_buff();
	if (rc) {
		goto error;
	}

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	/* If the cb_fn is destroy_load_cb, it means the caller wants to destroy this
	 * compress bdev.  So don't bother getting the volume ready to use - invoke the
	 * callback immediately so destroy_load_cb can delete the metadata off of the
	 * block device and delete the persistent memory file if it exists.
	 */
	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	if (load_ctx->cb_fn == (*destroy_load_cb)) {
		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
		_init_load_cleanup(NULL, load_ctx);
		return;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

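/*
 * Illustration of the reload loop above (hypothetical numbers): if logical
 * chunk 7 maps to chunk map 42 whose io_unit_index[] holds {130, 131}, then
 * bit 42 is set in allocated_chunk_maps and bits 130 and 131 in
 * allocated_backing_io_units, bringing the in-memory allocators back in
 * sync with the persisted metadata.
 */
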
void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (--g_vol_count == 0) {
		spdk_free(g_zero_buf);
	}
	assert(g_vol_count >= 0);
	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

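/*
 * E.g. with logical_blocks_per_chunk = 32: an I/O at offset 30 with length 4
 * touches blocks 30..33 (chunks 0 and 1) and spans the boundary, while an
 * I/O at offset 30 with length 2 stays within chunk 0.
 */
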
typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the logical map will no
	 * longer reference it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk,
			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

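/*
 * Fan-out sketch: a chunk occupying two io units issues two single-iov
 * readv/writev calls, one per io unit.  num_backing_ops is primed to 2, and
 * next_fn only runs once the second completion arrives (see the
 * --req->num_backing_ops checks in the completion callbacks).
 */
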
static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, total_len = 0;
	uint8_t *buf;
	int j;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	/* If the chunk is uncompressed, we need to copy the data from the host buffers. */
	if (req->chunk_is_compressed == false) {
		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		buf = req->decomp_buf;
		total_len = chunk_offset * vol->params.logical_block_size;

		/* zero any offset into chunk */
		if (req->rmw == false && chunk_offset) {
			memset(buf, 0, total_len);
		}
		buf += total_len;

		/* copy the data */
		for (j = 0; j < req->iovcnt; j++) {
			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
			buf += req->iov[j].iov_len;
			total_len += req->iov[j].iov_len;
		}

		/* zero any remainder */
		remainder = vol->params.chunk_size - total_len;
		total_len += remainder;
		if (req->rmw == false && remainder) {
			memset(buf, 0, remainder);
		}
		assert(total_len == vol->params.chunk_size);
	}

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

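/*
 * Example of the compression decision above, assuming a 16 KiB chunk and
 * 4 KiB io units: a 5000-byte compressed result rounds up to 2 io units
 * (fewer than the 4 an uncompressed chunk needs), so the chunk is stored
 * compressed; a 13000-byte result rounds up to all 4 io units, so the full
 * uncompressed chunk is written instead.
 */
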
static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead.  Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		reduce_errno = req->vol->params.chunk_size;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder = 0;
	uint64_t ttl_len = 0;
	int i;

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		/* first iov points to our scratch buffer for any offset into the chunk */
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iovs, pointing directly at the user buffers */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	/* send the rest of the chunk to our scratch buffer */
	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
				     &req->backing_cb_args);
}

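/*
 * Destination layout built above (the three regions always sum to
 * chunk_size):
 *
 *   [ scratch head: chunk_offset * lbsize | user iovs | scratch tail: remainder ]
 *
 * Only the middle region lands in the caller's buffers; the head and tail
 * are routed to the decomp_buf scratch buffer.
 */
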
static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder, ttl_len = 0;
	int i;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {
		/* If the chunk had been compressed, the decompress operation would
		 * have scattered the data into the host buffers; since it wasn't,
		 * we need to memcpy here.
		 */
		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
			buf += req->iov[i].iov_len;
		}

		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	if (iovcnt > REDUCE_MAX_IOVECS) {
		return false;
	}

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}

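/*
 * Overlap handling sketch: while a request for logical chunk N sits on
 * executing_requests, any later request for the same chunk is parked on
 * queued_requests by the readv/writev entry points;
 * _reduce_vol_complete_req() restarts the first parked request for that
 * chunk when the in-flight one completes.
 */
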
static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	uint32_t lbsize;
	int i;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read old chunk, then overwrite with data from this write
			 *  operation.
			 */
			req->rmw = true;
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	lbsize = vol->params.logical_block_size;
	req->decomp_iovcnt = 0;
	req->rmw = false;

	/* Note: point to our zero buf for offset into the chunk. */
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	if (chunk_offset != 0) {
		ttl_len += chunk_offset * lbsize;
		req->decomp_iov[0].iov_base = g_zero_buf;
		req->decomp_iov[0].iov_len = ttl_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct from the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == req->vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

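/*
 * Read-modify-write example: with 512-byte logical blocks and 16 KiB chunks
 * (32 blocks per chunk), an 8-block write into an already-allocated chunk
 * covers only part of it, so the old chunk is read and decompressed first
 * (req->rmw = true) and the new data is merged in _write_decompress_done;
 * a full 32-block write skips the read entirely.
 */
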
void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
			   vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)