]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/util/compression.h" | |
19 | ||
20 | #include <cstdint> | |
21 | #include <cstring> | |
22 | #include <memory> | |
23 | ||
24 | #include <lz4.h> | |
25 | #include <lz4frame.h> | |
26 | ||
27 | #include "arrow/result.h" | |
28 | #include "arrow/status.h" | |
29 | #include "arrow/util/bit_util.h" | |
30 | #include "arrow/util/endian.h" | |
31 | #include "arrow/util/logging.h" | |
32 | #include "arrow/util/macros.h" | |
33 | #include "arrow/util/ubsan.h" | |
34 | ||
35 | #ifndef LZ4F_HEADER_SIZE_MAX | |
36 | #define LZ4F_HEADER_SIZE_MAX 19 | |
37 | #endif | |
38 | ||
39 | namespace arrow { | |
40 | namespace util { | |
41 | ||
42 | namespace { | |
43 | ||
44 | static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) { | |
45 | return Status::IOError(prefix_msg, LZ4F_getErrorName(ret)); | |
46 | } | |
47 | ||
48 | static LZ4F_preferences_t DefaultPreferences() { | |
49 | LZ4F_preferences_t prefs; | |
50 | memset(&prefs, 0, sizeof(prefs)); | |
51 | return prefs; | |
52 | } | |
53 | ||
54 | // ---------------------------------------------------------------------- | |
55 | // Lz4 frame decompressor implementation | |
56 | ||
57 | class LZ4Decompressor : public Decompressor { | |
58 | public: | |
59 | LZ4Decompressor() {} | |
60 | ||
61 | ~LZ4Decompressor() override { | |
62 | if (ctx_ != nullptr) { | |
63 | ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_)); | |
64 | } | |
65 | } | |
66 | ||
67 | Status Init() { | |
68 | LZ4F_errorCode_t ret; | |
69 | finished_ = false; | |
70 | ||
71 | ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); | |
72 | if (LZ4F_isError(ret)) { | |
73 | return LZ4Error(ret, "LZ4 init failed: "); | |
74 | } else { | |
75 | return Status::OK(); | |
76 | } | |
77 | } | |
78 | ||
79 | Status Reset() override { | |
80 | #if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 | |
81 | // LZ4F_resetDecompressionContext appeared in 1.8.0 | |
82 | DCHECK_NE(ctx_, nullptr); | |
83 | LZ4F_resetDecompressionContext(ctx_); | |
84 | finished_ = false; | |
85 | return Status::OK(); | |
86 | #else | |
87 | if (ctx_ != nullptr) { | |
88 | ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_)); | |
89 | } | |
90 | return Init(); | |
91 | #endif | |
92 | } | |
93 | ||
94 | Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input, | |
95 | int64_t output_len, uint8_t* output) override { | |
96 | auto src = input; | |
97 | auto dst = output; | |
98 | auto src_size = static_cast<size_t>(input_len); | |
99 | auto dst_capacity = static_cast<size_t>(output_len); | |
100 | size_t ret; | |
101 | ||
102 | ret = | |
103 | LZ4F_decompress(ctx_, dst, &dst_capacity, src, &src_size, nullptr /* options */); | |
104 | if (LZ4F_isError(ret)) { | |
105 | return LZ4Error(ret, "LZ4 decompress failed: "); | |
106 | } | |
107 | finished_ = (ret == 0); | |
108 | return DecompressResult{static_cast<int64_t>(src_size), | |
109 | static_cast<int64_t>(dst_capacity), | |
110 | (src_size == 0 && dst_capacity == 0)}; | |
111 | } | |
112 | ||
113 | bool IsFinished() override { return finished_; } | |
114 | ||
115 | protected: | |
116 | LZ4F_decompressionContext_t ctx_ = nullptr; | |
117 | bool finished_; | |
118 | }; | |
119 | ||
120 | // ---------------------------------------------------------------------- | |
121 | // Lz4 frame compressor implementation | |
122 | ||
123 | class LZ4Compressor : public Compressor { | |
124 | public: | |
125 | LZ4Compressor() {} | |
126 | ||
127 | ~LZ4Compressor() override { | |
128 | if (ctx_ != nullptr) { | |
129 | ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_)); | |
130 | } | |
131 | } | |
132 | ||
133 | Status Init() { | |
134 | LZ4F_errorCode_t ret; | |
135 | prefs_ = DefaultPreferences(); | |
136 | first_time_ = true; | |
137 | ||
138 | ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); | |
139 | if (LZ4F_isError(ret)) { | |
140 | return LZ4Error(ret, "LZ4 init failed: "); | |
141 | } else { | |
142 | return Status::OK(); | |
143 | } | |
144 | } | |
145 | ||
146 | #define BEGIN_COMPRESS(dst, dst_capacity, output_too_small) \ | |
147 | if (first_time_) { \ | |
148 | if (dst_capacity < LZ4F_HEADER_SIZE_MAX) { \ | |
149 | /* Output too small to write LZ4F header */ \ | |
150 | return (output_too_small); \ | |
151 | } \ | |
152 | ret = LZ4F_compressBegin(ctx_, dst, dst_capacity, &prefs_); \ | |
153 | if (LZ4F_isError(ret)) { \ | |
154 | return LZ4Error(ret, "LZ4 compress begin failed: "); \ | |
155 | } \ | |
156 | first_time_ = false; \ | |
157 | dst += ret; \ | |
158 | dst_capacity -= ret; \ | |
159 | bytes_written += static_cast<int64_t>(ret); \ | |
160 | } | |
161 | ||
162 | Result<CompressResult> Compress(int64_t input_len, const uint8_t* input, | |
163 | int64_t output_len, uint8_t* output) override { | |
164 | auto src = input; | |
165 | auto dst = output; | |
166 | auto src_size = static_cast<size_t>(input_len); | |
167 | auto dst_capacity = static_cast<size_t>(output_len); | |
168 | size_t ret; | |
169 | int64_t bytes_written = 0; | |
170 | ||
171 | BEGIN_COMPRESS(dst, dst_capacity, (CompressResult{0, 0})); | |
172 | ||
173 | if (dst_capacity < LZ4F_compressBound(src_size, &prefs_)) { | |
174 | // Output too small to compress into | |
175 | return CompressResult{0, bytes_written}; | |
176 | } | |
177 | ret = LZ4F_compressUpdate(ctx_, dst, dst_capacity, src, src_size, | |
178 | nullptr /* options */); | |
179 | if (LZ4F_isError(ret)) { | |
180 | return LZ4Error(ret, "LZ4 compress update failed: "); | |
181 | } | |
182 | bytes_written += static_cast<int64_t>(ret); | |
183 | DCHECK_LE(bytes_written, output_len); | |
184 | return CompressResult{input_len, bytes_written}; | |
185 | } | |
186 | ||
187 | Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override { | |
188 | auto dst = output; | |
189 | auto dst_capacity = static_cast<size_t>(output_len); | |
190 | size_t ret; | |
191 | int64_t bytes_written = 0; | |
192 | ||
193 | BEGIN_COMPRESS(dst, dst_capacity, (FlushResult{0, true})); | |
194 | ||
195 | if (dst_capacity < LZ4F_compressBound(0, &prefs_)) { | |
196 | // Output too small to flush into | |
197 | return FlushResult{bytes_written, true}; | |
198 | } | |
199 | ||
200 | ret = LZ4F_flush(ctx_, dst, dst_capacity, nullptr /* options */); | |
201 | if (LZ4F_isError(ret)) { | |
202 | return LZ4Error(ret, "LZ4 flush failed: "); | |
203 | } | |
204 | bytes_written += static_cast<int64_t>(ret); | |
205 | DCHECK_LE(bytes_written, output_len); | |
206 | return FlushResult{bytes_written, false}; | |
207 | } | |
208 | ||
209 | Result<EndResult> End(int64_t output_len, uint8_t* output) override { | |
210 | auto dst = output; | |
211 | auto dst_capacity = static_cast<size_t>(output_len); | |
212 | size_t ret; | |
213 | int64_t bytes_written = 0; | |
214 | ||
215 | BEGIN_COMPRESS(dst, dst_capacity, (EndResult{0, true})); | |
216 | ||
217 | if (dst_capacity < LZ4F_compressBound(0, &prefs_)) { | |
218 | // Output too small to end frame into | |
219 | return EndResult{bytes_written, true}; | |
220 | } | |
221 | ||
222 | ret = LZ4F_compressEnd(ctx_, dst, dst_capacity, nullptr /* options */); | |
223 | if (LZ4F_isError(ret)) { | |
224 | return LZ4Error(ret, "LZ4 end failed: "); | |
225 | } | |
226 | bytes_written += static_cast<int64_t>(ret); | |
227 | DCHECK_LE(bytes_written, output_len); | |
228 | return EndResult{bytes_written, false}; | |
229 | } | |
230 | ||
231 | #undef BEGIN_COMPRESS | |
232 | ||
233 | protected: | |
234 | LZ4F_compressionContext_t ctx_ = nullptr; | |
235 | LZ4F_preferences_t prefs_; | |
236 | bool first_time_; | |
237 | }; | |
238 | ||
239 | // ---------------------------------------------------------------------- | |
240 | // Lz4 frame codec implementation | |
241 | ||
242 | class Lz4FrameCodec : public Codec { | |
243 | public: | |
244 | Lz4FrameCodec() : prefs_(DefaultPreferences()) {} | |
245 | ||
246 | int64_t MaxCompressedLen(int64_t input_len, | |
247 | const uint8_t* ARROW_ARG_UNUSED(input)) override { | |
248 | return static_cast<int64_t>( | |
249 | LZ4F_compressFrameBound(static_cast<size_t>(input_len), &prefs_)); | |
250 | } | |
251 | ||
252 | Result<int64_t> Compress(int64_t input_len, const uint8_t* input, | |
253 | int64_t output_buffer_len, uint8_t* output_buffer) override { | |
254 | auto output_len = | |
255 | LZ4F_compressFrame(output_buffer, static_cast<size_t>(output_buffer_len), input, | |
256 | static_cast<size_t>(input_len), &prefs_); | |
257 | if (LZ4F_isError(output_len)) { | |
258 | return LZ4Error(output_len, "Lz4 compression failure: "); | |
259 | } | |
260 | return static_cast<int64_t>(output_len); | |
261 | } | |
262 | ||
263 | Result<int64_t> Decompress(int64_t input_len, const uint8_t* input, | |
264 | int64_t output_buffer_len, uint8_t* output_buffer) override { | |
265 | ARROW_ASSIGN_OR_RAISE(auto decomp, MakeDecompressor()); | |
266 | ||
267 | int64_t total_bytes_written = 0; | |
268 | while (!decomp->IsFinished() && input_len != 0) { | |
269 | ARROW_ASSIGN_OR_RAISE( | |
270 | auto res, | |
271 | decomp->Decompress(input_len, input, output_buffer_len, output_buffer)); | |
272 | input += res.bytes_read; | |
273 | input_len -= res.bytes_read; | |
274 | output_buffer += res.bytes_written; | |
275 | output_buffer_len -= res.bytes_written; | |
276 | total_bytes_written += res.bytes_written; | |
277 | if (res.need_more_output) { | |
278 | return Status::IOError("Lz4 decompression buffer too small"); | |
279 | } | |
280 | } | |
281 | if (!decomp->IsFinished()) { | |
282 | return Status::IOError("Lz4 compressed input contains less than one frame"); | |
283 | } | |
284 | if (input_len != 0) { | |
285 | return Status::IOError("Lz4 compressed input contains more than one frame"); | |
286 | } | |
287 | return total_bytes_written; | |
288 | } | |
289 | ||
290 | Result<std::shared_ptr<Compressor>> MakeCompressor() override { | |
291 | auto ptr = std::make_shared<LZ4Compressor>(); | |
292 | RETURN_NOT_OK(ptr->Init()); | |
293 | return ptr; | |
294 | } | |
295 | ||
296 | Result<std::shared_ptr<Decompressor>> MakeDecompressor() override { | |
297 | auto ptr = std::make_shared<LZ4Decompressor>(); | |
298 | RETURN_NOT_OK(ptr->Init()); | |
299 | return ptr; | |
300 | } | |
301 | ||
302 | Compression::type compression_type() const override { return Compression::LZ4_FRAME; } | |
303 | int minimum_compression_level() const override { return kUseDefaultCompressionLevel; } | |
304 | int maximum_compression_level() const override { return kUseDefaultCompressionLevel; } | |
305 | int default_compression_level() const override { return kUseDefaultCompressionLevel; } | |
306 | ||
307 | protected: | |
308 | const LZ4F_preferences_t prefs_; | |
309 | }; | |
310 | ||
311 | // ---------------------------------------------------------------------- | |
312 | // Lz4 "raw" codec implementation | |
313 | ||
314 | class Lz4Codec : public Codec { | |
315 | public: | |
316 | Result<int64_t> Decompress(int64_t input_len, const uint8_t* input, | |
317 | int64_t output_buffer_len, uint8_t* output_buffer) override { | |
318 | int64_t decompressed_size = LZ4_decompress_safe( | |
319 | reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer), | |
320 | static_cast<int>(input_len), static_cast<int>(output_buffer_len)); | |
321 | if (decompressed_size < 0) { | |
322 | return Status::IOError("Corrupt Lz4 compressed data."); | |
323 | } | |
324 | return decompressed_size; | |
325 | } | |
326 | ||
327 | int64_t MaxCompressedLen(int64_t input_len, | |
328 | const uint8_t* ARROW_ARG_UNUSED(input)) override { | |
329 | return LZ4_compressBound(static_cast<int>(input_len)); | |
330 | } | |
331 | ||
332 | Result<int64_t> Compress(int64_t input_len, const uint8_t* input, | |
333 | int64_t output_buffer_len, uint8_t* output_buffer) override { | |
334 | int64_t output_len = LZ4_compress_default( | |
335 | reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer), | |
336 | static_cast<int>(input_len), static_cast<int>(output_buffer_len)); | |
337 | if (output_len == 0) { | |
338 | return Status::IOError("Lz4 compression failure."); | |
339 | } | |
340 | return output_len; | |
341 | } | |
342 | ||
343 | Result<std::shared_ptr<Compressor>> MakeCompressor() override { | |
344 | return Status::NotImplemented( | |
345 | "Streaming compression unsupported with LZ4 raw format. " | |
346 | "Try using LZ4 frame format instead."); | |
347 | } | |
348 | ||
349 | Result<std::shared_ptr<Decompressor>> MakeDecompressor() override { | |
350 | return Status::NotImplemented( | |
351 | "Streaming decompression unsupported with LZ4 raw format. " | |
352 | "Try using LZ4 frame format instead."); | |
353 | } | |
354 | ||
355 | Compression::type compression_type() const override { return Compression::LZ4; } | |
356 | int minimum_compression_level() const override { return kUseDefaultCompressionLevel; } | |
357 | int maximum_compression_level() const override { return kUseDefaultCompressionLevel; } | |
358 | int default_compression_level() const override { return kUseDefaultCompressionLevel; } | |
359 | }; | |
360 | ||
361 | // ---------------------------------------------------------------------- | |
362 | // Lz4 Hadoop "raw" codec implementation | |
363 | ||
// Codec compatible with the framing emitted by the Hadoop Lz4Codec:
// each "frame" carries an 8-byte big-endian size prefix (see
// TryDecompressHadoop below).  Decompression falls back to the raw LZ4
// block format when the input does not look Hadoop-framed.
class Lz4HadoopCodec : public Lz4Codec {
 public:
  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                             int64_t output_buffer_len, uint8_t* output_buffer) override {
    // First try interpreting the input as Hadoop-framed data.
    const int64_t decompressed_size =
        TryDecompressHadoop(input_len, input, output_buffer_len, output_buffer);
    if (decompressed_size != kNotHadoop) {
      return decompressed_size;
    }
    // Fall back on raw LZ4 codec (for files produced by earlier versions of Parquet C++)
    return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
  }

  int64_t MaxCompressedLen(int64_t input_len,
                           const uint8_t* ARROW_ARG_UNUSED(input)) override {
    // Account for the 8-byte size prefix written by Compress() below.
    return kPrefixLength + Lz4Codec::MaxCompressedLen(input_len, nullptr);
  }

  Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                           int64_t output_buffer_len, uint8_t* output_buffer) override {
    if (output_buffer_len < kPrefixLength) {
      return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
    }

    // Compress the payload past the prefix area, reserving room for it.
    ARROW_ASSIGN_OR_RAISE(
        int64_t output_len,
        Lz4Codec::Compress(input_len, input, output_buffer_len - kPrefixLength,
                           output_buffer + kPrefixLength));

    // Prepend decompressed size in bytes and compressed size in bytes
    // to be compatible with Hadoop Lz4Codec
    const uint32_t decompressed_size =
        BitUtil::ToBigEndian(static_cast<uint32_t>(input_len));
    const uint32_t compressed_size =
        BitUtil::ToBigEndian(static_cast<uint32_t>(output_len));
    SafeStore(output_buffer, decompressed_size);
    SafeStore(output_buffer + sizeof(uint32_t), compressed_size);

    return kPrefixLength + output_len;
  }

  Result<std::shared_ptr<Compressor>> MakeCompressor() override {
    return Status::NotImplemented(
        "Streaming compression unsupported with LZ4 Hadoop raw format. "
        "Try using LZ4 frame format instead.");
  }

  Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
    return Status::NotImplemented(
        "Streaming decompression unsupported with LZ4 Hadoop raw format. "
        "Try using LZ4 frame format instead.");
  }

  Compression::type compression_type() const override { return Compression::LZ4_HADOOP; }

 protected:
  // Offset starting at which page data can be read/written
  static const int64_t kPrefixLength = sizeof(uint32_t) * 2;

  // Sentinel meaning "input is not Hadoop-framed; fall back to raw LZ4".
  static const int64_t kNotHadoop = -1;

  // Attempt to decode `input` as a sequence of Hadoop LZ4 frames.
  // Returns the total decompressed size on success, or kNotHadoop if the
  // data does not match the Hadoop framing (callers then fall back).
  int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
                              int64_t output_buffer_len, uint8_t* output_buffer) {
    // Parquet files written with the Hadoop Lz4Codec use their own framing.
    // The input buffer can contain an arbitrary number of "frames", each
    // with the following structure:
    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
    // - bytes 8...: frame compressed data
    //
    // The Hadoop Lz4Codec source code can be found here:
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
    int64_t total_decompressed_size = 0;

    while (input_len >= kPrefixLength) {
      // Read the two big-endian size fields of the next frame header.
      const uint32_t expected_decompressed_size =
          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
      const uint32_t expected_compressed_size =
          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
      input += kPrefixLength;
      input_len -= kPrefixLength;

      if (input_len < expected_compressed_size) {
        // Not enough bytes for Hadoop "frame"
        return kNotHadoop;
      }
      if (output_buffer_len < expected_decompressed_size) {
        // Not enough bytes to hold advertised output => probably not Hadoop
        return kNotHadoop;
      }
      // Try decompressing and compare with expected decompressed length
      auto maybe_decompressed_size = Lz4Codec::Decompress(
          expected_compressed_size, input, output_buffer_len, output_buffer);
      if (!maybe_decompressed_size.ok() ||
          *maybe_decompressed_size != expected_decompressed_size) {
        return kNotHadoop;
      }
      // Frame decoded as advertised: advance past it and continue.
      input += expected_compressed_size;
      input_len -= expected_compressed_size;
      output_buffer += expected_decompressed_size;
      output_buffer_len -= expected_decompressed_size;
      total_decompressed_size += expected_decompressed_size;
    }

    // All input must be consumed by whole frames for the Hadoop
    // interpretation to be accepted.
    if (input_len == 0) {
      return total_decompressed_size;
    } else {
      return kNotHadoop;
    }
  }
};
475 | ||
476 | } // namespace | |
477 | ||
478 | namespace internal { | |
479 | ||
480 | std::unique_ptr<Codec> MakeLz4FrameCodec() { | |
481 | return std::unique_ptr<Codec>(new Lz4FrameCodec()); | |
482 | } | |
483 | ||
484 | std::unique_ptr<Codec> MakeLz4HadoopRawCodec() { | |
485 | return std::unique_ptr<Codec>(new Lz4HadoopCodec()); | |
486 | } | |
487 | ||
488 | std::unique_ptr<Codec> MakeLz4RawCodec() { | |
489 | return std::unique_ptr<Codec>(new Lz4Codec()); | |
490 | } | |
491 | ||
492 | } // namespace internal | |
493 | ||
494 | } // namespace util | |
495 | } // namespace arrow |