2 * Copyright (c) 2016-present, Facebook, Inc.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
9 #include "platform.h" /* Large Files support, SET_BINARY_MODE */
11 #include "SkippableFrame.h"
12 #include "utils/FileSystem.h"
13 #include "utils/Range.h"
14 #include "utils/ScopeGuard.h"
15 #include "utils/ThreadPool.h"
16 #include "utils/WorkQueue.h"
30 const std::string nullOutput
= "nul";
32 const std::string nullOutput
= "/dev/null";
38 static std::uintmax_t fileSizeOrZero(const std::string
&file
) {
43 auto size
= file_size(file
, ec
);
50 static std::uint64_t handleOneInput(const Options
&options
,
51 const std::string
&inputFile
,
53 const std::string
&outputFile
,
56 auto inputSize
= fileSizeOrZero(inputFile
);
57 // WorkQueue outlives ThreadPool so in the case of error we are certain
58 // we don't accidentally try to call push() on it after it is destroyed
59 WorkQueue
<std::shared_ptr
<BufferWorkQueue
>> outs
{options
.numThreads
+ 1};
60 std::uint64_t bytesRead
;
61 std::uint64_t bytesWritten
;
63 // Initialize the (de)compression thread pool with numThreads
64 ThreadPool
executor(options
.numThreads
);
65 // Run the reader thread on an extra thread
66 ThreadPool
readExecutor(1);
67 if (!options
.decompress
) {
68 // Add a job that reads the input and starts all the compression jobs
70 [&state
, &outs
, &executor
, inputFd
, inputSize
, &options
, &bytesRead
] {
71 bytesRead
= asyncCompressChunks(
78 options
.determineParameters());
81 bytesWritten
= writeFile(state
, outs
, outputFd
, options
.decompress
);
83 // Add a job that reads the input and starts all the decompression jobs
84 readExecutor
.add([&state
, &outs
, &executor
, inputFd
, &bytesRead
] {
85 bytesRead
= asyncDecompressFrames(state
, outs
, executor
, inputFd
);
88 bytesWritten
= writeFile(state
, outs
, outputFd
, options
.decompress
);
91 if (!state
.errorHolder
.hasError()) {
92 std::string inputFileName
= inputFile
== "-" ? "stdin" : inputFile
;
93 std::string outputFileName
= outputFile
== "-" ? "stdout" : outputFile
;
94 if (!options
.decompress
) {
95 double ratio
= static_cast<double>(bytesWritten
) /
96 static_cast<double>(bytesRead
+ !bytesRead
);
97 state
.log(INFO
, "%-20s :%6.2f%% (%6" PRIu64
" => %6" PRIu64
99 inputFileName
.c_str(), ratio
* 100, bytesRead
, bytesWritten
,
100 outputFileName
.c_str());
102 state
.log(INFO
, "%-20s: %" PRIu64
" bytes \n",
103 inputFileName
.c_str(),bytesWritten
);
109 static FILE *openInputFile(const std::string
&inputFile
,
110 ErrorHolder
&errorHolder
) {
111 if (inputFile
== "-") {
112 SET_BINARY_MODE(stdin
);
115 // Check if input file is a directory
118 if (is_directory(inputFile
, ec
)) {
119 errorHolder
.setError("Output file is a directory -- ignored");
123 auto inputFd
= std::fopen(inputFile
.c_str(), "rb");
124 if (!errorHolder
.check(inputFd
!= nullptr, "Failed to open input file")) {
130 static FILE *openOutputFile(const Options
&options
,
131 const std::string
&outputFile
,
132 SharedState
& state
) {
133 if (outputFile
== "-") {
134 SET_BINARY_MODE(stdout
);
137 // Check if the output file exists and then open it
138 if (!options
.overwrite
&& outputFile
!= nullOutput
) {
139 auto outputFd
= std::fopen(outputFile
.c_str(), "rb");
140 if (outputFd
!= nullptr) {
141 std::fclose(outputFd
);
142 if (!state
.log
.logsAt(INFO
)) {
143 state
.errorHolder
.setError("Output file exists");
148 "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
151 if (c
!= 'y' && c
!= 'Y') {
152 state
.errorHolder
.setError("Not overwritten");
157 auto outputFd
= std::fopen(outputFile
.c_str(), "wb");
158 if (!state
.errorHolder
.check(
159 outputFd
!= nullptr, "Failed to open output file")) {
165 int pzstdMain(const Options
&options
) {
167 SharedState
state(options
);
168 for (const auto& input
: options
.inputFiles
) {
169 // Set up a guard that logs this input's error, if any, on scope exit
170 auto printErrorGuard
= makeScopeGuard([&] {
171 if (state
.errorHolder
.hasError()) {
173 state
.log(ERROR
, "pzstd: %s: %s.\n", input
.c_str(),
174 state
.errorHolder
.getError().c_str());
177 // Open the input file
178 auto inputFd
= openInputFile(input
, state
.errorHolder
);
179 if (inputFd
== nullptr) {
182 auto closeInputGuard
= makeScopeGuard([&] { std::fclose(inputFd
); });
183 // Open the output file
184 auto outputFile
= options
.getOutputFile(input
);
185 if (!state
.errorHolder
.check(outputFile
!= "",
186 "Input file does not have extension .zst")) {
189 auto outputFd
= openOutputFile(options
, outputFile
, state
);
190 if (outputFd
== nullptr) {
193 auto closeOutputGuard
= makeScopeGuard([&] { std::fclose(outputFd
); });
194 // (de)compress the file
195 handleOneInput(options
, input
, inputFd
, outputFile
, outputFd
, state
);
196 if (state
.errorHolder
.hasError()) {
199 // Delete the input file if necessary
200 if (!options
.keepSource
) {
201 // Be sure that we are done and have written everything before we delete
202 if (!state
.errorHolder
.check(std::fclose(inputFd
) == 0,
203 "Failed to close input file")) {
206 closeInputGuard
.dismiss();
207 if (!state
.errorHolder
.check(std::fclose(outputFd
) == 0,
208 "Failed to close output file")) {
211 closeOutputGuard
.dismiss();
212 if (std::remove(input
.c_str()) != 0) {
213 state
.errorHolder
.setError("Failed to remove input file");
218 // Returns 1 if any of the files failed to (de)compress.
222 /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
223 static ZSTD_inBuffer
makeZstdInBuffer(const Buffer
& buffer
) {
224 return ZSTD_inBuffer
{buffer
.data(), buffer
.size(), 0};
228 * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
231 void advance(Buffer
& buffer
, ZSTD_inBuffer
& inBuffer
) {
232 auto pos
= inBuffer
.pos
;
233 inBuffer
.src
= static_cast<const unsigned char*>(inBuffer
.src
) + pos
;
234 inBuffer
.size
-= pos
;
236 return buffer
.advance(pos
);
239 /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
240 static ZSTD_outBuffer
makeZstdOutBuffer(Buffer
& buffer
) {
241 return ZSTD_outBuffer
{buffer
.data(), buffer
.size(), 0};
245 * Split `buffer` and advance `outBuffer` by the amount of data written, as
246 * indicated by `outBuffer.pos`.
248 Buffer
split(Buffer
& buffer
, ZSTD_outBuffer
& outBuffer
) {
249 auto pos
= outBuffer
.pos
;
250 outBuffer
.dst
= static_cast<unsigned char*>(outBuffer
.dst
) + pos
;
251 outBuffer
.size
-= pos
;
253 return buffer
.splitAt(pos
);
257 * Stream chunks of input from `in`, compress it, and stream it out to `out`.
259 * @param state The shared state
260 * @param in Queue that we `pop()` input buffers from
261 * @param out Queue that we `push()` compressed output buffers to
262 * @param maxInputSize An upper bound on the size of the input
264 static void compress(
266 std::shared_ptr
<BufferWorkQueue
> in
,
267 std::shared_ptr
<BufferWorkQueue
> out
,
268 size_t maxInputSize
) {
269 auto& errorHolder
= state
.errorHolder
;
270 auto guard
= makeScopeGuard([&] { out
->finish(); });
271 // Initialize the CCtx
272 auto ctx
= state
.cStreamPool
->get();
273 if (!errorHolder
.check(ctx
!= nullptr, "Failed to allocate ZSTD_CStream")) {
277 auto err
= ZSTD_resetCStream(ctx
.get(), 0);
278 if (!errorHolder
.check(!ZSTD_isError(err
), ZSTD_getErrorName(err
))) {
283 // Allocate space for the result
284 auto outBuffer
= Buffer(ZSTD_compressBound(maxInputSize
));
285 auto zstdOutBuffer
= makeZstdOutBuffer(outBuffer
);
288 // Read a buffer in from the input queue
289 while (in
->pop(inBuffer
) && !errorHolder
.hasError()) {
290 auto zstdInBuffer
= makeZstdInBuffer(inBuffer
);
291 // Compress the whole buffer and send it to the output queue
292 while (!inBuffer
.empty() && !errorHolder
.hasError()) {
293 if (!errorHolder
.check(
294 !outBuffer
.empty(), "ZSTD_compressBound() was too small")) {
299 ZSTD_compressStream(ctx
.get(), &zstdOutBuffer
, &zstdInBuffer
);
300 if (!errorHolder
.check(!ZSTD_isError(err
), ZSTD_getErrorName(err
))) {
303 // Split the compressed data off outBuffer and pass to the output queue
304 out
->push(split(outBuffer
, zstdOutBuffer
));
305 // Forget about the data we already compressed
306 advance(inBuffer
, zstdInBuffer
);
313 if (!errorHolder
.check(
314 !outBuffer
.empty(), "ZSTD_compressBound() was too small")) {
317 bytesLeft
= ZSTD_endStream(ctx
.get(), &zstdOutBuffer
);
318 if (!errorHolder
.check(
319 !ZSTD_isError(bytesLeft
), ZSTD_getErrorName(bytesLeft
))) {
322 out
->push(split(outBuffer
, zstdOutBuffer
));
323 } while (bytesLeft
!= 0 && !errorHolder
.hasError());
327 * Calculates how large each independently compressed frame should be.
329 * @param size The size of the source if known, 0 otherwise
330 * @param numThreads The number of threads available to run compression jobs on
331 * @param params The zstd parameters to be used for compression
333 static size_t calculateStep(
336 const ZSTD_parameters
¶ms
) {
339 return size_t{1} << (params
.cParams
.windowLog
+ 2);
// Result of inspecting a FILE* after a read, as returned by fileStatus():
// Continue means more input may follow, Error means std::ferror() reported a
// stream error. Readers (e.g. readData) stop reading on any non-Continue value.
// NOTE(review): the condition that yields Done is not visible here
// (presumably end-of-file) — confirm.
343 enum class FileStatus
{ Continue
, Done
, Error
};
344 /// Determines the status of the file descriptor `fd`.
345 FileStatus
fileStatus(FILE* fd
) {
347 return FileStatus::Done
;
348 } else if (std::ferror(fd
)) {
349 return FileStatus::Error
;
351 return FileStatus::Continue
;
353 } // anonymous namespace
356 * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
357 * Will read less if an error or EOF occurs.
358 * Returns the status of the file after all of the reads have occurred.
361 readData(BufferWorkQueue
& queue
, size_t chunkSize
, size_t size
, FILE* fd
,
362 std::uint64_t *totalBytesRead
) {
364 while (!buffer
.empty()) {
366 std::fread(buffer
.data(), 1, std::min(chunkSize
, buffer
.size()), fd
);
367 *totalBytesRead
+= bytesRead
;
368 queue
.push(buffer
.splitAt(bytesRead
));
369 auto status
= fileStatus(fd
);
370 if (status
!= FileStatus::Continue
) {
374 return FileStatus::Continue
;
377 std::uint64_t asyncCompressChunks(
379 WorkQueue
<std::shared_ptr
<BufferWorkQueue
>>& chunks
,
380 ThreadPool
& executor
,
384 ZSTD_parameters params
) {
385 auto chunksGuard
= makeScopeGuard([&] { chunks
.finish(); });
386 std::uint64_t bytesRead
= 0;
388 // Break the input up into chunks of size `step` and compress each chunk
390 size_t step
= calculateStep(size
, numThreads
, params
);
391 state
.log(DEBUG
, "Chosen frame size: %zu\n", step
);
392 auto status
= FileStatus::Continue
;
393 while (status
== FileStatus::Continue
&& !state
.errorHolder
.hasError()) {
394 // Make a new input queue that we will put the chunk's input data into.
395 auto in
= std::make_shared
<BufferWorkQueue
>();
396 auto inGuard
= makeScopeGuard([&] { in
->finish(); });
397 // Make a new output queue that compress will put the compressed data into.
398 auto out
= std::make_shared
<BufferWorkQueue
>();
399 // Start compression in the thread pool
400 executor
.add([&state
, in
, out
, step
] {
402 state
, std::move(in
), std::move(out
), step
);
404 // Pass the output queue to the writer thread.
405 chunks
.push(std::move(out
));
406 state
.log(VERBOSE
, "%s\n", "Starting a new frame");
407 // Fill the input queue for the compression job we just started
408 status
= readData(*in
, ZSTD_CStreamInSize(), step
, fd
, &bytesRead
);
410 state
.errorHolder
.check(status
!= FileStatus::Error
, "Error reading input");
415 * Decompress a frame, whose data is streamed into `in`, and stream the output
418 * @param state The shared state
419 * @param in Queue that we `pop()` input buffers from. It contains
420 * exactly one compressed frame.
421 * @param out Queue that we `push()` decompressed output buffers to
423 static void decompress(
425 std::shared_ptr
<BufferWorkQueue
> in
,
426 std::shared_ptr
<BufferWorkQueue
> out
) {
427 auto& errorHolder
= state
.errorHolder
;
428 auto guard
= makeScopeGuard([&] { out
->finish(); });
429 // Initialize the DCtx
430 auto ctx
= state
.dStreamPool
->get();
431 if (!errorHolder
.check(ctx
!= nullptr, "Failed to allocate ZSTD_DStream")) {
435 auto err
= ZSTD_resetDStream(ctx
.get());
436 if (!errorHolder
.check(!ZSTD_isError(err
), ZSTD_getErrorName(err
))) {
441 const size_t outSize
= ZSTD_DStreamOutSize();
443 size_t returnCode
= 0;
444 // Read a buffer in from the input queue
445 while (in
->pop(inBuffer
) && !errorHolder
.hasError()) {
446 auto zstdInBuffer
= makeZstdInBuffer(inBuffer
);
447 // Decompress the whole buffer and send it to the output queue
448 while (!inBuffer
.empty() && !errorHolder
.hasError()) {
449 // Allocate a buffer with at least outSize bytes.
450 Buffer
outBuffer(outSize
);
451 auto zstdOutBuffer
= makeZstdOutBuffer(outBuffer
);
454 ZSTD_decompressStream(ctx
.get(), &zstdOutBuffer
, &zstdInBuffer
);
455 if (!errorHolder
.check(
456 !ZSTD_isError(returnCode
), ZSTD_getErrorName(returnCode
))) {
459 // Pass the buffer with the decompressed data to the output queue
460 out
->push(split(outBuffer
, zstdOutBuffer
));
461 // Advance past the input we already read
462 advance(inBuffer
, zstdInBuffer
);
463 if (returnCode
== 0) {
464 // The frame is over, prepare to (maybe) start a new frame
465 ZSTD_initDStream(ctx
.get());
469 if (!errorHolder
.check(returnCode
<= 1, "Incomplete block")) {
472 // We've given ZSTD_decompressStream all of our data, but there may still
474 while (returnCode
== 1) {
475 // Allocate a buffer with at least outSize bytes.
476 Buffer
outBuffer(outSize
);
477 auto zstdOutBuffer
= makeZstdOutBuffer(outBuffer
);
479 ZSTD_inBuffer zstdInBuffer
{nullptr, 0, 0};
482 ZSTD_decompressStream(ctx
.get(), &zstdOutBuffer
, &zstdInBuffer
);
483 if (!errorHolder
.check(
484 !ZSTD_isError(returnCode
), ZSTD_getErrorName(returnCode
))) {
487 // Pass the buffer with the decompressed data to the output queue
488 out
->push(split(outBuffer
, zstdOutBuffer
));
492 std::uint64_t asyncDecompressFrames(
494 WorkQueue
<std::shared_ptr
<BufferWorkQueue
>>& frames
,
495 ThreadPool
& executor
,
497 auto framesGuard
= makeScopeGuard([&] { frames
.finish(); });
498 std::uint64_t totalBytesRead
= 0;
500 // Split the source up into its component frames.
501 // If we find our recognized skippable frame we know the next frames size
502 // which means that we can decompress each standard frame in independently.
503 // Otherwise, we will decompress using only one decompression task.
504 const size_t chunkSize
= ZSTD_DStreamInSize();
505 auto status
= FileStatus::Continue
;
506 while (status
== FileStatus::Continue
&& !state
.errorHolder
.hasError()) {
507 // Make a new input queue that we will put the frames's bytes into.
508 auto in
= std::make_shared
<BufferWorkQueue
>();
509 auto inGuard
= makeScopeGuard([&] { in
->finish(); });
510 // Make a output queue that decompress will put the decompressed data into
511 auto out
= std::make_shared
<BufferWorkQueue
>();
515 // Calculate the size of the next frame.
516 // frameSize is 0 if the frame info can't be decoded.
517 Buffer
buffer(SkippableFrame::kSize
);
518 auto bytesRead
= std::fread(buffer
.data(), 1, buffer
.size(), fd
);
519 totalBytesRead
+= bytesRead
;
520 status
= fileStatus(fd
);
521 if (bytesRead
== 0 && status
!= FileStatus::Continue
) {
524 buffer
.subtract(buffer
.size() - bytesRead
);
525 frameSize
= SkippableFrame::tryRead(buffer
.range());
526 in
->push(std::move(buffer
));
528 if (frameSize
== 0) {
529 // We hit a non SkippableFrame, so this will be the last job.
530 // Make sure that we don't use too much memory
534 // Start decompression in the thread pool
535 executor
.add([&state
, in
, out
] {
536 return decompress(state
, std::move(in
), std::move(out
));
538 // Pass the output queue to the writer thread
539 frames
.push(std::move(out
));
540 if (frameSize
== 0) {
541 // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
542 // Pass the rest of the source to this decompression task
543 state
.log(VERBOSE
, "%s\n",
544 "Input not in pzstd format, falling back to serial decompression");
545 while (status
== FileStatus::Continue
&& !state
.errorHolder
.hasError()) {
546 status
= readData(*in
, chunkSize
, chunkSize
, fd
, &totalBytesRead
);
550 state
.log(VERBOSE
, "Decompressing a frame of size %zu", frameSize
);
551 // Fill the input queue for the decompression job we just started
552 status
= readData(*in
, chunkSize
, frameSize
, fd
, &totalBytesRead
);
554 state
.errorHolder
.check(status
!= FileStatus::Error
, "Error reading input");
555 return totalBytesRead
;
558 /// Write `data` to `fd`, returns true iff success.
559 static bool writeData(ByteRange data
, FILE* fd
) {
560 while (!data
.empty()) {
561 data
.advance(std::fwrite(data
.begin(), 1, data
.size(), fd
));
562 if (std::ferror(fd
)) {
569 std::uint64_t writeFile(
571 WorkQueue
<std::shared_ptr
<BufferWorkQueue
>>& outs
,
574 auto& errorHolder
= state
.errorHolder
;
575 auto lineClearGuard
= makeScopeGuard([&state
] {
576 state
.log
.clear(INFO
);
578 std::uint64_t bytesWritten
= 0;
579 std::shared_ptr
<BufferWorkQueue
> out
;
580 // Grab the output queue for each decompression job (in order).
581 while (outs
.pop(out
)) {
582 if (errorHolder
.hasError()) {
586 // If we are compressing and want to write skippable frames we can't
587 // start writing before compression is done because we need to know the
589 // Wait for the compressed size to be available and write skippable frame
590 SkippableFrame
frame(out
->size());
591 if (!writeData(frame
.data(), outputFd
)) {
592 errorHolder
.setError("Failed to write output");
595 bytesWritten
+= frame
.kSize
;
597 // For each chunk of the frame: Pop it from the queue and write it
599 while (out
->pop(buffer
) && !errorHolder
.hasError()) {
600 if (!writeData(buffer
.range(), outputFd
)) {
601 errorHolder
.setError("Failed to write output");
604 bytesWritten
+= buffer
.size();
605 state
.log
.update(INFO
, "Written: %u MB ",
606 static_cast<std::uint32_t>(bytesWritten
>> 20));