]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/util/compression_lz4.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / compression_lz4.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/util/compression.h"
19
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
23
24#include <lz4.h>
25#include <lz4frame.h>
26
27#include "arrow/result.h"
28#include "arrow/status.h"
29#include "arrow/util/bit_util.h"
30#include "arrow/util/endian.h"
31#include "arrow/util/logging.h"
32#include "arrow/util/macros.h"
33#include "arrow/util/ubsan.h"
34
35#ifndef LZ4F_HEADER_SIZE_MAX
36#define LZ4F_HEADER_SIZE_MAX 19
37#endif
38
39namespace arrow {
40namespace util {
41
42namespace {
43
44static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
45 return Status::IOError(prefix_msg, LZ4F_getErrorName(ret));
46}
47
48static LZ4F_preferences_t DefaultPreferences() {
49 LZ4F_preferences_t prefs;
50 memset(&prefs, 0, sizeof(prefs));
51 return prefs;
52}
53
54// ----------------------------------------------------------------------
55// Lz4 frame decompressor implementation
56
57class LZ4Decompressor : public Decompressor {
58 public:
59 LZ4Decompressor() {}
60
61 ~LZ4Decompressor() override {
62 if (ctx_ != nullptr) {
63 ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
64 }
65 }
66
67 Status Init() {
68 LZ4F_errorCode_t ret;
69 finished_ = false;
70
71 ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
72 if (LZ4F_isError(ret)) {
73 return LZ4Error(ret, "LZ4 init failed: ");
74 } else {
75 return Status::OK();
76 }
77 }
78
79 Status Reset() override {
80#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800
81 // LZ4F_resetDecompressionContext appeared in 1.8.0
82 DCHECK_NE(ctx_, nullptr);
83 LZ4F_resetDecompressionContext(ctx_);
84 finished_ = false;
85 return Status::OK();
86#else
87 if (ctx_ != nullptr) {
88 ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
89 }
90 return Init();
91#endif
92 }
93
94 Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input,
95 int64_t output_len, uint8_t* output) override {
96 auto src = input;
97 auto dst = output;
98 auto src_size = static_cast<size_t>(input_len);
99 auto dst_capacity = static_cast<size_t>(output_len);
100 size_t ret;
101
102 ret =
103 LZ4F_decompress(ctx_, dst, &dst_capacity, src, &src_size, nullptr /* options */);
104 if (LZ4F_isError(ret)) {
105 return LZ4Error(ret, "LZ4 decompress failed: ");
106 }
107 finished_ = (ret == 0);
108 return DecompressResult{static_cast<int64_t>(src_size),
109 static_cast<int64_t>(dst_capacity),
110 (src_size == 0 && dst_capacity == 0)};
111 }
112
113 bool IsFinished() override { return finished_; }
114
115 protected:
116 LZ4F_decompressionContext_t ctx_ = nullptr;
117 bool finished_;
118};
119
120// ----------------------------------------------------------------------
121// Lz4 frame compressor implementation
122
123class LZ4Compressor : public Compressor {
124 public:
125 LZ4Compressor() {}
126
127 ~LZ4Compressor() override {
128 if (ctx_ != nullptr) {
129 ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_));
130 }
131 }
132
133 Status Init() {
134 LZ4F_errorCode_t ret;
135 prefs_ = DefaultPreferences();
136 first_time_ = true;
137
138 ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
139 if (LZ4F_isError(ret)) {
140 return LZ4Error(ret, "LZ4 init failed: ");
141 } else {
142 return Status::OK();
143 }
144 }
145
146#define BEGIN_COMPRESS(dst, dst_capacity, output_too_small) \
147 if (first_time_) { \
148 if (dst_capacity < LZ4F_HEADER_SIZE_MAX) { \
149 /* Output too small to write LZ4F header */ \
150 return (output_too_small); \
151 } \
152 ret = LZ4F_compressBegin(ctx_, dst, dst_capacity, &prefs_); \
153 if (LZ4F_isError(ret)) { \
154 return LZ4Error(ret, "LZ4 compress begin failed: "); \
155 } \
156 first_time_ = false; \
157 dst += ret; \
158 dst_capacity -= ret; \
159 bytes_written += static_cast<int64_t>(ret); \
160 }
161
162 Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
163 int64_t output_len, uint8_t* output) override {
164 auto src = input;
165 auto dst = output;
166 auto src_size = static_cast<size_t>(input_len);
167 auto dst_capacity = static_cast<size_t>(output_len);
168 size_t ret;
169 int64_t bytes_written = 0;
170
171 BEGIN_COMPRESS(dst, dst_capacity, (CompressResult{0, 0}));
172
173 if (dst_capacity < LZ4F_compressBound(src_size, &prefs_)) {
174 // Output too small to compress into
175 return CompressResult{0, bytes_written};
176 }
177 ret = LZ4F_compressUpdate(ctx_, dst, dst_capacity, src, src_size,
178 nullptr /* options */);
179 if (LZ4F_isError(ret)) {
180 return LZ4Error(ret, "LZ4 compress update failed: ");
181 }
182 bytes_written += static_cast<int64_t>(ret);
183 DCHECK_LE(bytes_written, output_len);
184 return CompressResult{input_len, bytes_written};
185 }
186
187 Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override {
188 auto dst = output;
189 auto dst_capacity = static_cast<size_t>(output_len);
190 size_t ret;
191 int64_t bytes_written = 0;
192
193 BEGIN_COMPRESS(dst, dst_capacity, (FlushResult{0, true}));
194
195 if (dst_capacity < LZ4F_compressBound(0, &prefs_)) {
196 // Output too small to flush into
197 return FlushResult{bytes_written, true};
198 }
199
200 ret = LZ4F_flush(ctx_, dst, dst_capacity, nullptr /* options */);
201 if (LZ4F_isError(ret)) {
202 return LZ4Error(ret, "LZ4 flush failed: ");
203 }
204 bytes_written += static_cast<int64_t>(ret);
205 DCHECK_LE(bytes_written, output_len);
206 return FlushResult{bytes_written, false};
207 }
208
209 Result<EndResult> End(int64_t output_len, uint8_t* output) override {
210 auto dst = output;
211 auto dst_capacity = static_cast<size_t>(output_len);
212 size_t ret;
213 int64_t bytes_written = 0;
214
215 BEGIN_COMPRESS(dst, dst_capacity, (EndResult{0, true}));
216
217 if (dst_capacity < LZ4F_compressBound(0, &prefs_)) {
218 // Output too small to end frame into
219 return EndResult{bytes_written, true};
220 }
221
222 ret = LZ4F_compressEnd(ctx_, dst, dst_capacity, nullptr /* options */);
223 if (LZ4F_isError(ret)) {
224 return LZ4Error(ret, "LZ4 end failed: ");
225 }
226 bytes_written += static_cast<int64_t>(ret);
227 DCHECK_LE(bytes_written, output_len);
228 return EndResult{bytes_written, false};
229 }
230
231#undef BEGIN_COMPRESS
232
233 protected:
234 LZ4F_compressionContext_t ctx_ = nullptr;
235 LZ4F_preferences_t prefs_;
236 bool first_time_;
237};
238
239// ----------------------------------------------------------------------
240// Lz4 frame codec implementation
241
242class Lz4FrameCodec : public Codec {
243 public:
244 Lz4FrameCodec() : prefs_(DefaultPreferences()) {}
245
246 int64_t MaxCompressedLen(int64_t input_len,
247 const uint8_t* ARROW_ARG_UNUSED(input)) override {
248 return static_cast<int64_t>(
249 LZ4F_compressFrameBound(static_cast<size_t>(input_len), &prefs_));
250 }
251
252 Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
253 int64_t output_buffer_len, uint8_t* output_buffer) override {
254 auto output_len =
255 LZ4F_compressFrame(output_buffer, static_cast<size_t>(output_buffer_len), input,
256 static_cast<size_t>(input_len), &prefs_);
257 if (LZ4F_isError(output_len)) {
258 return LZ4Error(output_len, "Lz4 compression failure: ");
259 }
260 return static_cast<int64_t>(output_len);
261 }
262
263 Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
264 int64_t output_buffer_len, uint8_t* output_buffer) override {
265 ARROW_ASSIGN_OR_RAISE(auto decomp, MakeDecompressor());
266
267 int64_t total_bytes_written = 0;
268 while (!decomp->IsFinished() && input_len != 0) {
269 ARROW_ASSIGN_OR_RAISE(
270 auto res,
271 decomp->Decompress(input_len, input, output_buffer_len, output_buffer));
272 input += res.bytes_read;
273 input_len -= res.bytes_read;
274 output_buffer += res.bytes_written;
275 output_buffer_len -= res.bytes_written;
276 total_bytes_written += res.bytes_written;
277 if (res.need_more_output) {
278 return Status::IOError("Lz4 decompression buffer too small");
279 }
280 }
281 if (!decomp->IsFinished()) {
282 return Status::IOError("Lz4 compressed input contains less than one frame");
283 }
284 if (input_len != 0) {
285 return Status::IOError("Lz4 compressed input contains more than one frame");
286 }
287 return total_bytes_written;
288 }
289
290 Result<std::shared_ptr<Compressor>> MakeCompressor() override {
291 auto ptr = std::make_shared<LZ4Compressor>();
292 RETURN_NOT_OK(ptr->Init());
293 return ptr;
294 }
295
296 Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
297 auto ptr = std::make_shared<LZ4Decompressor>();
298 RETURN_NOT_OK(ptr->Init());
299 return ptr;
300 }
301
302 Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
303 int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
304 int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
305 int default_compression_level() const override { return kUseDefaultCompressionLevel; }
306
307 protected:
308 const LZ4F_preferences_t prefs_;
309};
310
311// ----------------------------------------------------------------------
312// Lz4 "raw" codec implementation
313
314class Lz4Codec : public Codec {
315 public:
316 Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
317 int64_t output_buffer_len, uint8_t* output_buffer) override {
318 int64_t decompressed_size = LZ4_decompress_safe(
319 reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
320 static_cast<int>(input_len), static_cast<int>(output_buffer_len));
321 if (decompressed_size < 0) {
322 return Status::IOError("Corrupt Lz4 compressed data.");
323 }
324 return decompressed_size;
325 }
326
327 int64_t MaxCompressedLen(int64_t input_len,
328 const uint8_t* ARROW_ARG_UNUSED(input)) override {
329 return LZ4_compressBound(static_cast<int>(input_len));
330 }
331
332 Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
333 int64_t output_buffer_len, uint8_t* output_buffer) override {
334 int64_t output_len = LZ4_compress_default(
335 reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
336 static_cast<int>(input_len), static_cast<int>(output_buffer_len));
337 if (output_len == 0) {
338 return Status::IOError("Lz4 compression failure.");
339 }
340 return output_len;
341 }
342
343 Result<std::shared_ptr<Compressor>> MakeCompressor() override {
344 return Status::NotImplemented(
345 "Streaming compression unsupported with LZ4 raw format. "
346 "Try using LZ4 frame format instead.");
347 }
348
349 Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
350 return Status::NotImplemented(
351 "Streaming decompression unsupported with LZ4 raw format. "
352 "Try using LZ4 frame format instead.");
353 }
354
355 Compression::type compression_type() const override { return Compression::LZ4; }
356 int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
357 int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
358 int default_compression_level() const override { return kUseDefaultCompressionLevel; }
359};
360
361// ----------------------------------------------------------------------
362// Lz4 Hadoop "raw" codec implementation
363
// Codec for the Hadoop-framed LZ4 raw format: each "frame" is an 8-byte
// big-endian (decompressed size, compressed size) prefix followed by raw
// LZ4 block data.  Falls back to plain raw LZ4 when the input does not
// parse as Hadoop framing.
class Lz4HadoopCodec : public Lz4Codec {
 public:
  // Try Hadoop framing first; if the input does not match it, delegate to
  // the raw LZ4 base-class implementation.
  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                             int64_t output_buffer_len, uint8_t* output_buffer) override {
    const int64_t decompressed_size =
        TryDecompressHadoop(input_len, input, output_buffer_len, output_buffer);
    if (decompressed_size != kNotHadoop) {
      return decompressed_size;
    }
    // Fall back on raw LZ4 codec (for files produced by earlier versions of Parquet C++)
    return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
  }

  // Worst-case output size: raw LZ4 bound plus the 8-byte Hadoop prefix.
  int64_t MaxCompressedLen(int64_t input_len,
                           const uint8_t* ARROW_ARG_UNUSED(input)) override {
    return kPrefixLength + Lz4Codec::MaxCompressedLen(input_len, nullptr);
  }

  // Compress as a single Hadoop frame: raw LZ4 data preceded by the
  // two big-endian size fields.
  Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                           int64_t output_buffer_len, uint8_t* output_buffer) override {
    if (output_buffer_len < kPrefixLength) {
      return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
    }

    // Compress the payload past the prefix area, then fill in the prefix.
    ARROW_ASSIGN_OR_RAISE(
        int64_t output_len,
        Lz4Codec::Compress(input_len, input, output_buffer_len - kPrefixLength,
                           output_buffer + kPrefixLength));

    // Prepend decompressed size in bytes and compressed size in bytes
    // to be compatible with Hadoop Lz4Codec
    const uint32_t decompressed_size =
        BitUtil::ToBigEndian(static_cast<uint32_t>(input_len));
    const uint32_t compressed_size =
        BitUtil::ToBigEndian(static_cast<uint32_t>(output_len));
    // SafeStore performs an alignment-safe write (see arrow/util/ubsan.h).
    SafeStore(output_buffer, decompressed_size);
    SafeStore(output_buffer + sizeof(uint32_t), compressed_size);

    return kPrefixLength + output_len;
  }

  Result<std::shared_ptr<Compressor>> MakeCompressor() override {
    return Status::NotImplemented(
        "Streaming compression unsupported with LZ4 Hadoop raw format. "
        "Try using LZ4 frame format instead.");
  }

  Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
    return Status::NotImplemented(
        "Streaming decompression unsupported with LZ4 Hadoop raw format. "
        "Try using LZ4 frame format instead.");
  }

  Compression::type compression_type() const override { return Compression::LZ4_HADOOP; }

 protected:
  // Offset starting at which page data can be read/written
  static const int64_t kPrefixLength = sizeof(uint32_t) * 2;

  // Sentinel returned by TryDecompressHadoop when the input is not framed
  // in the Hadoop style.
  static const int64_t kNotHadoop = -1;

  // Attempt to decode the input as a sequence of Hadoop frames; returns the
  // total decompressed size, or kNotHadoop if the input does not match.
  int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
                              int64_t output_buffer_len, uint8_t* output_buffer) {
    // Parquet files written with the Hadoop Lz4Codec use their own framing.
    // The input buffer can contain an arbitrary number of "frames", each
    // with the following structure:
    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
    // - bytes 8...: frame compressed data
    //
    // The Hadoop Lz4Codec source code can be found here:
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
    int64_t total_decompressed_size = 0;

    while (input_len >= kPrefixLength) {
      const uint32_t expected_decompressed_size =
          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
      const uint32_t expected_compressed_size =
          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
      input += kPrefixLength;
      input_len -= kPrefixLength;

      if (input_len < expected_compressed_size) {
        // Not enough bytes for Hadoop "frame"
        return kNotHadoop;
      }
      if (output_buffer_len < expected_decompressed_size) {
        // Not enough bytes to hold advertised output => probably not Hadoop
        return kNotHadoop;
      }
      // Try decompressing and compare with expected decompressed length
      auto maybe_decompressed_size = Lz4Codec::Decompress(
          expected_compressed_size, input, output_buffer_len, output_buffer);
      if (!maybe_decompressed_size.ok() ||
          *maybe_decompressed_size != expected_decompressed_size) {
        return kNotHadoop;
      }
      // Frame decoded successfully: advance past it and keep scanning.
      input += expected_compressed_size;
      input_len -= expected_compressed_size;
      output_buffer += expected_decompressed_size;
      output_buffer_len -= expected_decompressed_size;
      total_decompressed_size += expected_decompressed_size;
    }

    // Valid Hadoop input must be consumed exactly; trailing bytes smaller
    // than a prefix mean the framing hypothesis was wrong.
    if (input_len == 0) {
      return total_decompressed_size;
    } else {
      return kNotHadoop;
    }
  }
};
475
476} // namespace
477
478namespace internal {
479
480std::unique_ptr<Codec> MakeLz4FrameCodec() {
481 return std::unique_ptr<Codec>(new Lz4FrameCodec());
482}
483
484std::unique_ptr<Codec> MakeLz4HadoopRawCodec() {
485 return std::unique_ptr<Codec>(new Lz4HadoopCodec());
486}
487
488std::unique_ptr<Codec> MakeLz4RawCodec() {
489 return std::unique_ptr<Codec>(new Lz4Codec());
490}
491
492} // namespace internal
493
494} // namespace util
495} // namespace arrow