]> git.proxmox.com Git - ceph.git/blob - ceph/src/zstd/contrib/pzstd/Pzstd.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / zstd / contrib / pzstd / Pzstd.h
1 /**
2 * Copyright (c) 2016-present, Facebook, Inc.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
8 */
9 #pragma once
10
11 #include "ErrorHolder.h"
12 #include "Logging.h"
13 #include "Options.h"
14 #include "utils/Buffer.h"
15 #include "utils/Range.h"
16 #include "utils/ResourcePool.h"
17 #include "utils/ThreadPool.h"
18 #include "utils/WorkQueue.h"
19 #define ZSTD_STATIC_LINKING_ONLY
20 #include "zstd.h"
21 #undef ZSTD_STATIC_LINKING_ONLY
22
23 #include <cstddef>
24 #include <cstdint>
25 #include <memory>
26
27 namespace pzstd {
28 /**
29 * Runs pzstd with `options` and returns the number of bytes written.
30 * An error occurred if `errorHandler.hasError()`.
31 *
32 * @param options The pzstd options to use for (de)compression
33 * @returns 0 upon success and non-zero on failure.
34 */
35 int pzstdMain(const Options& options);
36
37 class SharedState {
38 public:
39 SharedState(const Options& options) : log(options.verbosity) {
40 if (!options.decompress) {
41 auto parameters = options.determineParameters();
42 cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
43 [this, parameters]() -> ZSTD_CStream* {
44 this->log(VERBOSE, "Creating new ZSTD_CStream\n");
45 auto zcs = ZSTD_createCStream();
46 if (zcs) {
47 auto err = ZSTD_initCStream_advanced(
48 zcs, nullptr, 0, parameters, 0);
49 if (ZSTD_isError(err)) {
50 ZSTD_freeCStream(zcs);
51 return nullptr;
52 }
53 }
54 return zcs;
55 },
56 [](ZSTD_CStream *zcs) {
57 ZSTD_freeCStream(zcs);
58 }});
59 } else {
60 dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
61 [this]() -> ZSTD_DStream* {
62 this->log(VERBOSE, "Creating new ZSTD_DStream\n");
63 auto zds = ZSTD_createDStream();
64 if (zds) {
65 auto err = ZSTD_initDStream(zds);
66 if (ZSTD_isError(err)) {
67 ZSTD_freeDStream(zds);
68 return nullptr;
69 }
70 }
71 return zds;
72 },
73 [](ZSTD_DStream *zds) {
74 ZSTD_freeDStream(zds);
75 }});
76 }
77 }
78
79 ~SharedState() {
80 // The resource pools have references to this, so destroy them first.
81 cStreamPool.reset();
82 dStreamPool.reset();
83 }
84
85 Logger log;
86 ErrorHolder errorHolder;
87 std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
88 std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
89 };
90
91 /**
92 * Streams input from `fd`, breaks input up into chunks, and compresses each
93 * chunk independently. Output of each chunk gets streamed to a queue, and
94 * the output queues get put into `chunks` in order.
95 *
96 * @param state The shared state
97 * @param chunks Each compression jobs output queue gets `pushed()` here
98 * as soon as it is available
99 * @param executor The thread pool to run compression jobs in
100 * @param fd The input file descriptor
101 * @param size The size of the input file if known, 0 otherwise
102 * @param numThreads The number of threads in the thread pool
103 * @param parameters The zstd parameters to use for compression
104 * @returns The number of bytes read from the file
105 */
106 std::uint64_t asyncCompressChunks(
107 SharedState& state,
108 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
109 ThreadPool& executor,
110 FILE* fd,
111 std::uintmax_t size,
112 std::size_t numThreads,
113 ZSTD_parameters parameters);
114
115 /**
116 * Streams input from `fd`. If pzstd headers are available it breaks the input
117 * up into independent frames. It sends each frame to an independent
118 * decompression job. Output of each frame gets streamed to a queue, and
119 * the output queues get put into `frames` in order.
120 *
121 * @param state The shared state
122 * @param frames Each decompression jobs output queue gets `pushed()` here
123 * as soon as it is available
124 * @param executor The thread pool to run compression jobs in
125 * @param fd The input file descriptor
126 * @returns The number of bytes read from the file
127 */
128 std::uint64_t asyncDecompressFrames(
129 SharedState& state,
130 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
131 ThreadPool& executor,
132 FILE* fd);
133
134 /**
135 * Streams input in from each queue in `outs` in order, and writes the data to
136 * `outputFd`.
137 *
138 * @param state The shared state
139 * @param outs A queue of output queues, one for each
140 * (de)compression job.
141 * @param outputFd The file descriptor to write to
142 * @param decompress Are we decompressing?
143 * @returns The number of bytes written
144 */
145 std::uint64_t writeFile(
146 SharedState& state,
147 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
148 FILE* outputFd,
149 bool decompress);
150 }