git.proxmox.com Git - ceph.git/blob - ceph/src/zstd/contrib/pzstd/Pzstd.cpp
import 15.2.0 Octopus source
[ceph.git] / ceph / src / zstd / contrib / pzstd / Pzstd.cpp
1 /*
2 * Copyright (c) 2016-present, Facebook, Inc.
3 * All rights reserved.
4 *
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 */
9 #include "platform.h" /* Large Files support, SET_BINARY_MODE */
10 #include "Pzstd.h"
11 #include "SkippableFrame.h"
12 #include "utils/FileSystem.h"
13 #include "utils/Range.h"
14 #include "utils/ScopeGuard.h"
15 #include "utils/ThreadPool.h"
16 #include "utils/WorkQueue.h"
17
18 #include <chrono>
19 #include <cinttypes>
20 #include <cstddef>
21 #include <cstdio>
22 #include <memory>
23 #include <string>
24
25
26 namespace pzstd {
27
namespace {
/// Path of the platform's null device. Output sent there is exempt from the
/// "output file already exists" check performed when opening the output.
const std::string nullOutput =
#ifdef _WIN32
    "nul";
#else
    "/dev/null";
#endif
}
35
36 using std::size_t;
37
38 static std::uintmax_t fileSizeOrZero(const std::string &file) {
39 if (file == "-") {
40 return 0;
41 }
42 std::error_code ec;
43 auto size = file_size(file, ec);
44 if (ec) {
45 size = 0;
46 }
47 return size;
48 }
49
50 static std::uint64_t handleOneInput(const Options &options,
51 const std::string &inputFile,
52 FILE* inputFd,
53 const std::string &outputFile,
54 FILE* outputFd,
55 SharedState& state) {
56 auto inputSize = fileSizeOrZero(inputFile);
57 // WorkQueue outlives ThreadPool so in the case of error we are certain
58 // we don't accidentally try to call push() on it after it is destroyed
59 WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
60 std::uint64_t bytesRead;
61 std::uint64_t bytesWritten;
62 {
63 // Initialize the (de)compression thread pool with numThreads
64 ThreadPool executor(options.numThreads);
65 // Run the reader thread on an extra thread
66 ThreadPool readExecutor(1);
67 if (!options.decompress) {
68 // Add a job that reads the input and starts all the compression jobs
69 readExecutor.add(
70 [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
71 bytesRead = asyncCompressChunks(
72 state,
73 outs,
74 executor,
75 inputFd,
76 inputSize,
77 options.numThreads,
78 options.determineParameters());
79 });
80 // Start writing
81 bytesWritten = writeFile(state, outs, outputFd, options.decompress);
82 } else {
83 // Add a job that reads the input and starts all the decompression jobs
84 readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
85 bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
86 });
87 // Start writing
88 bytesWritten = writeFile(state, outs, outputFd, options.decompress);
89 }
90 }
91 if (!state.errorHolder.hasError()) {
92 std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
93 std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
94 if (!options.decompress) {
95 double ratio = static_cast<double>(bytesWritten) /
96 static_cast<double>(bytesRead + !bytesRead);
97 state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
98 " bytes, %s)\n",
99 inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
100 outputFileName.c_str());
101 } else {
102 state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
103 inputFileName.c_str(),bytesWritten);
104 }
105 }
106 return bytesWritten;
107 }
108
109 static FILE *openInputFile(const std::string &inputFile,
110 ErrorHolder &errorHolder) {
111 if (inputFile == "-") {
112 SET_BINARY_MODE(stdin);
113 return stdin;
114 }
115 // Check if input file is a directory
116 {
117 std::error_code ec;
118 if (is_directory(inputFile, ec)) {
119 errorHolder.setError("Output file is a directory -- ignored");
120 return nullptr;
121 }
122 }
123 auto inputFd = std::fopen(inputFile.c_str(), "rb");
124 if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
125 return nullptr;
126 }
127 return inputFd;
128 }
129
130 static FILE *openOutputFile(const Options &options,
131 const std::string &outputFile,
132 SharedState& state) {
133 if (outputFile == "-") {
134 SET_BINARY_MODE(stdout);
135 return stdout;
136 }
137 // Check if the output file exists and then open it
138 if (!options.overwrite && outputFile != nullOutput) {
139 auto outputFd = std::fopen(outputFile.c_str(), "rb");
140 if (outputFd != nullptr) {
141 std::fclose(outputFd);
142 if (!state.log.logsAt(INFO)) {
143 state.errorHolder.setError("Output file exists");
144 return nullptr;
145 }
146 state.log(
147 INFO,
148 "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
149 outputFile.c_str());
150 int c = getchar();
151 if (c != 'y' && c != 'Y') {
152 state.errorHolder.setError("Not overwritten");
153 return nullptr;
154 }
155 }
156 }
157 auto outputFd = std::fopen(outputFile.c_str(), "wb");
158 if (!state.errorHolder.check(
159 outputFd != nullptr, "Failed to open output file")) {
160 return nullptr;
161 }
162 return outputFd;
163 }
164
165 int pzstdMain(const Options &options) {
166 int returnCode = 0;
167 SharedState state(options);
168 for (const auto& input : options.inputFiles) {
169 // Setup the shared state
170 auto printErrorGuard = makeScopeGuard([&] {
171 if (state.errorHolder.hasError()) {
172 returnCode = 1;
173 state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
174 state.errorHolder.getError().c_str());
175 }
176 });
177 // Open the input file
178 auto inputFd = openInputFile(input, state.errorHolder);
179 if (inputFd == nullptr) {
180 continue;
181 }
182 auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
183 // Open the output file
184 auto outputFile = options.getOutputFile(input);
185 if (!state.errorHolder.check(outputFile != "",
186 "Input file does not have extension .zst")) {
187 continue;
188 }
189 auto outputFd = openOutputFile(options, outputFile, state);
190 if (outputFd == nullptr) {
191 continue;
192 }
193 auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
194 // (de)compress the file
195 handleOneInput(options, input, inputFd, outputFile, outputFd, state);
196 if (state.errorHolder.hasError()) {
197 continue;
198 }
199 // Delete the input file if necessary
200 if (!options.keepSource) {
201 // Be sure that we are done and have written everything before we delete
202 if (!state.errorHolder.check(std::fclose(inputFd) == 0,
203 "Failed to close input file")) {
204 continue;
205 }
206 closeInputGuard.dismiss();
207 if (!state.errorHolder.check(std::fclose(outputFd) == 0,
208 "Failed to close output file")) {
209 continue;
210 }
211 closeOutputGuard.dismiss();
212 if (std::remove(input.c_str()) != 0) {
213 state.errorHolder.setError("Failed to remove input file");
214 continue;
215 }
216 }
217 }
218 // Returns 1 if any of the files failed to (de)compress.
219 return returnCode;
220 }
221
222 /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
223 static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
224 return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
225 }
226
227 /**
228 * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
229 * `inBuffer.pos`.
230 */
231 void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
232 auto pos = inBuffer.pos;
233 inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
234 inBuffer.size -= pos;
235 inBuffer.pos = 0;
236 return buffer.advance(pos);
237 }
238
239 /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
240 static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
241 return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
242 }
243
244 /**
245 * Split `buffer` and advance `outBuffer` by the amount of data written, as
246 * indicated by `outBuffer.pos`.
247 */
248 Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
249 auto pos = outBuffer.pos;
250 outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
251 outBuffer.size -= pos;
252 outBuffer.pos = 0;
253 return buffer.splitAt(pos);
254 }
255
256 /**
257 * Stream chunks of input from `in`, compress it, and stream it out to `out`.
258 *
259 * @param state The shared state
260 * @param in Queue that we `pop()` input buffers from
261 * @param out Queue that we `push()` compressed output buffers to
262 * @param maxInputSize An upper bound on the size of the input
263 */
264 static void compress(
265 SharedState& state,
266 std::shared_ptr<BufferWorkQueue> in,
267 std::shared_ptr<BufferWorkQueue> out,
268 size_t maxInputSize) {
269 auto& errorHolder = state.errorHolder;
270 auto guard = makeScopeGuard([&] { out->finish(); });
271 // Initialize the CCtx
272 auto ctx = state.cStreamPool->get();
273 if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
274 return;
275 }
276 {
277 auto err = ZSTD_resetCStream(ctx.get(), 0);
278 if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
279 return;
280 }
281 }
282
283 // Allocate space for the result
284 auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
285 auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
286 {
287 Buffer inBuffer;
288 // Read a buffer in from the input queue
289 while (in->pop(inBuffer) && !errorHolder.hasError()) {
290 auto zstdInBuffer = makeZstdInBuffer(inBuffer);
291 // Compress the whole buffer and send it to the output queue
292 while (!inBuffer.empty() && !errorHolder.hasError()) {
293 if (!errorHolder.check(
294 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
295 return;
296 }
297 // Compress
298 auto err =
299 ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
300 if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
301 return;
302 }
303 // Split the compressed data off outBuffer and pass to the output queue
304 out->push(split(outBuffer, zstdOutBuffer));
305 // Forget about the data we already compressed
306 advance(inBuffer, zstdInBuffer);
307 }
308 }
309 }
310 // Write the epilog
311 size_t bytesLeft;
312 do {
313 if (!errorHolder.check(
314 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
315 return;
316 }
317 bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
318 if (!errorHolder.check(
319 !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
320 return;
321 }
322 out->push(split(outBuffer, zstdOutBuffer));
323 } while (bytesLeft != 0 && !errorHolder.hasError());
324 }
325
326 /**
327 * Calculates how large each independently compressed frame should be.
328 *
329 * @param size The size of the source if known, 0 otherwise
330 * @param numThreads The number of threads available to run compression jobs on
331 * @param params The zstd parameters to be used for compression
332 */
333 static size_t calculateStep(
334 std::uintmax_t size,
335 size_t numThreads,
336 const ZSTD_parameters &params) {
337 (void)size;
338 (void)numThreads;
339 return size_t{1} << (params.cParams.windowLog + 2);
340 }
341
namespace {
/// Result of probing a stream: keep reading, end of file reached, or error.
enum class FileStatus { Continue, Done, Error };

/// Classifies the stream `fd` from its end-of-file and error indicators.
/// The end-of-file indicator is checked first and therefore wins.
FileStatus fileStatus(FILE* fd) {
  if (std::feof(fd) != 0) {
    return FileStatus::Done;
  }
  if (std::ferror(fd) != 0) {
    return FileStatus::Error;
  }
  return FileStatus::Continue;
}
} // anonymous namespace
354
355 /**
356 * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
357 * Will read less if an error or EOF occurs.
358 * Returns the status of the file after all of the reads have occurred.
359 */
360 static FileStatus
361 readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
362 std::uint64_t *totalBytesRead) {
363 Buffer buffer(size);
364 while (!buffer.empty()) {
365 auto bytesRead =
366 std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
367 *totalBytesRead += bytesRead;
368 queue.push(buffer.splitAt(bytesRead));
369 auto status = fileStatus(fd);
370 if (status != FileStatus::Continue) {
371 return status;
372 }
373 }
374 return FileStatus::Continue;
375 }
376
377 std::uint64_t asyncCompressChunks(
378 SharedState& state,
379 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
380 ThreadPool& executor,
381 FILE* fd,
382 std::uintmax_t size,
383 size_t numThreads,
384 ZSTD_parameters params) {
385 auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
386 std::uint64_t bytesRead = 0;
387
388 // Break the input up into chunks of size `step` and compress each chunk
389 // independently.
390 size_t step = calculateStep(size, numThreads, params);
391 state.log(DEBUG, "Chosen frame size: %zu\n", step);
392 auto status = FileStatus::Continue;
393 while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
394 // Make a new input queue that we will put the chunk's input data into.
395 auto in = std::make_shared<BufferWorkQueue>();
396 auto inGuard = makeScopeGuard([&] { in->finish(); });
397 // Make a new output queue that compress will put the compressed data into.
398 auto out = std::make_shared<BufferWorkQueue>();
399 // Start compression in the thread pool
400 executor.add([&state, in, out, step] {
401 return compress(
402 state, std::move(in), std::move(out), step);
403 });
404 // Pass the output queue to the writer thread.
405 chunks.push(std::move(out));
406 state.log(VERBOSE, "%s\n", "Starting a new frame");
407 // Fill the input queue for the compression job we just started
408 status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
409 }
410 state.errorHolder.check(status != FileStatus::Error, "Error reading input");
411 return bytesRead;
412 }
413
414 /**
415 * Decompress a frame, whose data is streamed into `in`, and stream the output
416 * to `out`.
417 *
418 * @param state The shared state
419 * @param in Queue that we `pop()` input buffers from. It contains
420 * exactly one compressed frame.
421 * @param out Queue that we `push()` decompressed output buffers to
422 */
423 static void decompress(
424 SharedState& state,
425 std::shared_ptr<BufferWorkQueue> in,
426 std::shared_ptr<BufferWorkQueue> out) {
427 auto& errorHolder = state.errorHolder;
428 auto guard = makeScopeGuard([&] { out->finish(); });
429 // Initialize the DCtx
430 auto ctx = state.dStreamPool->get();
431 if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
432 return;
433 }
434 {
435 auto err = ZSTD_resetDStream(ctx.get());
436 if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
437 return;
438 }
439 }
440
441 const size_t outSize = ZSTD_DStreamOutSize();
442 Buffer inBuffer;
443 size_t returnCode = 0;
444 // Read a buffer in from the input queue
445 while (in->pop(inBuffer) && !errorHolder.hasError()) {
446 auto zstdInBuffer = makeZstdInBuffer(inBuffer);
447 // Decompress the whole buffer and send it to the output queue
448 while (!inBuffer.empty() && !errorHolder.hasError()) {
449 // Allocate a buffer with at least outSize bytes.
450 Buffer outBuffer(outSize);
451 auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
452 // Decompress
453 returnCode =
454 ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
455 if (!errorHolder.check(
456 !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
457 return;
458 }
459 // Pass the buffer with the decompressed data to the output queue
460 out->push(split(outBuffer, zstdOutBuffer));
461 // Advance past the input we already read
462 advance(inBuffer, zstdInBuffer);
463 if (returnCode == 0) {
464 // The frame is over, prepare to (maybe) start a new frame
465 ZSTD_initDStream(ctx.get());
466 }
467 }
468 }
469 if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
470 return;
471 }
472 // We've given ZSTD_decompressStream all of our data, but there may still
473 // be data to read.
474 while (returnCode == 1) {
475 // Allocate a buffer with at least outSize bytes.
476 Buffer outBuffer(outSize);
477 auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
478 // Pass in no input.
479 ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
480 // Decompress
481 returnCode =
482 ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
483 if (!errorHolder.check(
484 !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
485 return;
486 }
487 // Pass the buffer with the decompressed data to the output queue
488 out->push(split(outBuffer, zstdOutBuffer));
489 }
490 }
491
492 std::uint64_t asyncDecompressFrames(
493 SharedState& state,
494 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
495 ThreadPool& executor,
496 FILE* fd) {
497 auto framesGuard = makeScopeGuard([&] { frames.finish(); });
498 std::uint64_t totalBytesRead = 0;
499
500 // Split the source up into its component frames.
501 // If we find our recognized skippable frame we know the next frames size
502 // which means that we can decompress each standard frame in independently.
503 // Otherwise, we will decompress using only one decompression task.
504 const size_t chunkSize = ZSTD_DStreamInSize();
505 auto status = FileStatus::Continue;
506 while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
507 // Make a new input queue that we will put the frames's bytes into.
508 auto in = std::make_shared<BufferWorkQueue>();
509 auto inGuard = makeScopeGuard([&] { in->finish(); });
510 // Make a output queue that decompress will put the decompressed data into
511 auto out = std::make_shared<BufferWorkQueue>();
512
513 size_t frameSize;
514 {
515 // Calculate the size of the next frame.
516 // frameSize is 0 if the frame info can't be decoded.
517 Buffer buffer(SkippableFrame::kSize);
518 auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
519 totalBytesRead += bytesRead;
520 status = fileStatus(fd);
521 if (bytesRead == 0 && status != FileStatus::Continue) {
522 break;
523 }
524 buffer.subtract(buffer.size() - bytesRead);
525 frameSize = SkippableFrame::tryRead(buffer.range());
526 in->push(std::move(buffer));
527 }
528 if (frameSize == 0) {
529 // We hit a non SkippableFrame, so this will be the last job.
530 // Make sure that we don't use too much memory
531 in->setMaxSize(64);
532 out->setMaxSize(64);
533 }
534 // Start decompression in the thread pool
535 executor.add([&state, in, out] {
536 return decompress(state, std::move(in), std::move(out));
537 });
538 // Pass the output queue to the writer thread
539 frames.push(std::move(out));
540 if (frameSize == 0) {
541 // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
542 // Pass the rest of the source to this decompression task
543 state.log(VERBOSE, "%s\n",
544 "Input not in pzstd format, falling back to serial decompression");
545 while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
546 status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
547 }
548 break;
549 }
550 state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize);
551 // Fill the input queue for the decompression job we just started
552 status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
553 }
554 state.errorHolder.check(status != FileStatus::Error, "Error reading input");
555 return totalBytesRead;
556 }
557
558 /// Write `data` to `fd`, returns true iff success.
559 static bool writeData(ByteRange data, FILE* fd) {
560 while (!data.empty()) {
561 data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
562 if (std::ferror(fd)) {
563 return false;
564 }
565 }
566 return true;
567 }
568
569 std::uint64_t writeFile(
570 SharedState& state,
571 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
572 FILE* outputFd,
573 bool decompress) {
574 auto& errorHolder = state.errorHolder;
575 auto lineClearGuard = makeScopeGuard([&state] {
576 state.log.clear(INFO);
577 });
578 std::uint64_t bytesWritten = 0;
579 std::shared_ptr<BufferWorkQueue> out;
580 // Grab the output queue for each decompression job (in order).
581 while (outs.pop(out)) {
582 if (errorHolder.hasError()) {
583 continue;
584 }
585 if (!decompress) {
586 // If we are compressing and want to write skippable frames we can't
587 // start writing before compression is done because we need to know the
588 // compressed size.
589 // Wait for the compressed size to be available and write skippable frame
590 SkippableFrame frame(out->size());
591 if (!writeData(frame.data(), outputFd)) {
592 errorHolder.setError("Failed to write output");
593 return bytesWritten;
594 }
595 bytesWritten += frame.kSize;
596 }
597 // For each chunk of the frame: Pop it from the queue and write it
598 Buffer buffer;
599 while (out->pop(buffer) && !errorHolder.hasError()) {
600 if (!writeData(buffer.range(), outputFd)) {
601 errorHolder.setError("Failed to write output");
602 return bytesWritten;
603 }
604 bytesWritten += buffer.size();
605 state.log.update(INFO, "Written: %u MB ",
606 static_cast<std::uint32_t>(bytesWritten >> 20));
607 }
608 }
609 return bytesWritten;
610 }
611 }