]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/util/compression_internal.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <cstdint> | |
22 | #include <cstring> | |
23 | #include <limits> | |
24 | #include <memory> | |
25 | ||
26 | #include <zconf.h> | |
27 | #include <zlib.h> | |
28 | ||
29 | #include "arrow/result.h" | |
30 | #include "arrow/status.h" | |
31 | #include "arrow/util/logging.h" | |
32 | #include "arrow/util/macros.h" | |
33 | ||
34 | namespace arrow { | |
35 | namespace util { | |
36 | namespace internal { | |
37 | ||
38 | namespace { | |
39 | ||
40 | // ---------------------------------------------------------------------- | |
41 | // gzip implementation | |
42 | ||
// windowBits building blocks for deflateInit2() / inflateInit2(). zlib.h
// documents these values but does not expose them as named constants.

// Largest window size (base-2 logarithm) accepted by zlib.
constexpr int WINDOW_BITS{15};

// Added to the window bits to make the compressor emit gzip framing.
constexpr int GZIP_CODEC{16};

// Combined with the window bits to let the decompressor detect zlib vs. gzip
// framing from the stream header.
constexpr int DETECT_CODEC{32};

// Inclusive range of compression levels reported by the gzip codec.
constexpr int kGZipMinCompressionLevel{1};
constexpr int kGZipMaxCompressionLevel{9};
57 | ||
58 | int CompressionWindowBitsForFormat(GZipFormat::type format) { | |
59 | int window_bits = WINDOW_BITS; | |
60 | switch (format) { | |
61 | case GZipFormat::DEFLATE: | |
62 | window_bits = -window_bits; | |
63 | break; | |
64 | case GZipFormat::GZIP: | |
65 | window_bits += GZIP_CODEC; | |
66 | break; | |
67 | case GZipFormat::ZLIB: | |
68 | break; | |
69 | } | |
70 | return window_bits; | |
71 | } | |
72 | ||
73 | int DecompressionWindowBitsForFormat(GZipFormat::type format) { | |
74 | if (format == GZipFormat::DEFLATE) { | |
75 | return -WINDOW_BITS; | |
76 | } else { | |
77 | /* If not deflate, autodetect format from header */ | |
78 | return WINDOW_BITS | DETECT_CODEC; | |
79 | } | |
80 | } | |
81 | ||
82 | Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { | |
83 | return Status::IOError(prefix_msg, (msg) ? msg : "(unknown error)"); | |
84 | } | |
85 | ||
86 | // ---------------------------------------------------------------------- | |
87 | // gzip decompressor implementation | |
88 | ||
89 | class GZipDecompressor : public Decompressor { | |
90 | public: | |
91 | explicit GZipDecompressor(GZipFormat::type format) | |
92 | : format_(format), initialized_(false), finished_(false) {} | |
93 | ||
94 | ~GZipDecompressor() override { | |
95 | if (initialized_) { | |
96 | inflateEnd(&stream_); | |
97 | } | |
98 | } | |
99 | ||
100 | Status Init() { | |
101 | DCHECK(!initialized_); | |
102 | memset(&stream_, 0, sizeof(stream_)); | |
103 | finished_ = false; | |
104 | ||
105 | int ret; | |
106 | int window_bits = DecompressionWindowBitsForFormat(format_); | |
107 | if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { | |
108 | return ZlibError("zlib inflateInit failed: "); | |
109 | } else { | |
110 | initialized_ = true; | |
111 | return Status::OK(); | |
112 | } | |
113 | } | |
114 | ||
115 | Status Reset() override { | |
116 | DCHECK(initialized_); | |
117 | finished_ = false; | |
118 | int ret; | |
119 | if ((ret = inflateReset(&stream_)) != Z_OK) { | |
120 | return ZlibError("zlib inflateReset failed: "); | |
121 | } else { | |
122 | return Status::OK(); | |
123 | } | |
124 | } | |
125 | ||
126 | Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input, | |
127 | int64_t output_len, uint8_t* output) override { | |
128 | static constexpr auto input_limit = | |
129 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); | |
130 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); | |
131 | stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit)); | |
132 | stream_.next_out = reinterpret_cast<Bytef*>(output); | |
133 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); | |
134 | int ret; | |
135 | ||
136 | ret = inflate(&stream_, Z_SYNC_FLUSH); | |
137 | if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) { | |
138 | return ZlibError("zlib inflate failed: "); | |
139 | } | |
140 | if (ret == Z_NEED_DICT) { | |
141 | return ZlibError("zlib inflate failed (need preset dictionary): "); | |
142 | } | |
143 | finished_ = (ret == Z_STREAM_END); | |
144 | if (ret == Z_BUF_ERROR) { | |
145 | // No progress was possible | |
146 | return DecompressResult{0, 0, true}; | |
147 | } else { | |
148 | ARROW_CHECK(ret == Z_OK || ret == Z_STREAM_END); | |
149 | // Some progress has been made | |
150 | return DecompressResult{input_len - stream_.avail_in, | |
151 | output_len - stream_.avail_out, false}; | |
152 | } | |
153 | return Status::OK(); | |
154 | } | |
155 | ||
156 | bool IsFinished() override { return finished_; } | |
157 | ||
158 | protected: | |
159 | Status ZlibError(const char* prefix_msg) { | |
160 | return ZlibErrorPrefix(prefix_msg, stream_.msg); | |
161 | } | |
162 | ||
163 | z_stream stream_; | |
164 | GZipFormat::type format_; | |
165 | bool initialized_; | |
166 | bool finished_; | |
167 | }; | |
168 | ||
169 | // ---------------------------------------------------------------------- | |
170 | // gzip compressor implementation | |
171 | ||
172 | class GZipCompressor : public Compressor { | |
173 | public: | |
174 | explicit GZipCompressor(int compression_level) | |
175 | : initialized_(false), compression_level_(compression_level) {} | |
176 | ||
177 | ~GZipCompressor() override { | |
178 | if (initialized_) { | |
179 | deflateEnd(&stream_); | |
180 | } | |
181 | } | |
182 | ||
183 | Status Init(GZipFormat::type format) { | |
184 | DCHECK(!initialized_); | |
185 | memset(&stream_, 0, sizeof(stream_)); | |
186 | ||
187 | int ret; | |
188 | // Initialize to run specified format | |
189 | int window_bits = CompressionWindowBitsForFormat(format); | |
190 | if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, | |
191 | compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { | |
192 | return ZlibError("zlib deflateInit failed: "); | |
193 | } else { | |
194 | initialized_ = true; | |
195 | return Status::OK(); | |
196 | } | |
197 | } | |
198 | ||
199 | Result<CompressResult> Compress(int64_t input_len, const uint8_t* input, | |
200 | int64_t output_len, uint8_t* output) override { | |
201 | DCHECK(initialized_) << "Called on non-initialized stream"; | |
202 | ||
203 | static constexpr auto input_limit = | |
204 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); | |
205 | ||
206 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); | |
207 | stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit)); | |
208 | stream_.next_out = reinterpret_cast<Bytef*>(output); | |
209 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); | |
210 | ||
211 | int64_t ret = 0; | |
212 | ret = deflate(&stream_, Z_NO_FLUSH); | |
213 | if (ret == Z_STREAM_ERROR) { | |
214 | return ZlibError("zlib compress failed: "); | |
215 | } | |
216 | if (ret == Z_OK) { | |
217 | // Some progress has been made | |
218 | return CompressResult{input_len - stream_.avail_in, output_len - stream_.avail_out}; | |
219 | } else { | |
220 | // No progress was possible | |
221 | ARROW_CHECK_EQ(ret, Z_BUF_ERROR); | |
222 | return CompressResult{0, 0}; | |
223 | } | |
224 | } | |
225 | ||
226 | Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override { | |
227 | DCHECK(initialized_) << "Called on non-initialized stream"; | |
228 | ||
229 | static constexpr auto input_limit = | |
230 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); | |
231 | ||
232 | stream_.avail_in = 0; | |
233 | stream_.next_out = reinterpret_cast<Bytef*>(output); | |
234 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); | |
235 | ||
236 | int64_t ret = 0; | |
237 | ret = deflate(&stream_, Z_SYNC_FLUSH); | |
238 | if (ret == Z_STREAM_ERROR) { | |
239 | return ZlibError("zlib flush failed: "); | |
240 | } | |
241 | int64_t bytes_written; | |
242 | if (ret == Z_OK) { | |
243 | bytes_written = output_len - stream_.avail_out; | |
244 | } else { | |
245 | ARROW_CHECK_EQ(ret, Z_BUF_ERROR); | |
246 | bytes_written = 0; | |
247 | } | |
248 | // "If deflate returns with avail_out == 0, this function must be called | |
249 | // again with the same value of the flush parameter and more output space | |
250 | // (updated avail_out), until the flush is complete (deflate returns | |
251 | // with non-zero avail_out)." | |
252 | // "Note that Z_BUF_ERROR is not fatal, and deflate() can be called again | |
253 | // with more input and more output space to continue compressing." | |
254 | return FlushResult{bytes_written, stream_.avail_out == 0}; | |
255 | } | |
256 | ||
257 | Result<EndResult> End(int64_t output_len, uint8_t* output) override { | |
258 | DCHECK(initialized_) << "Called on non-initialized stream"; | |
259 | ||
260 | static constexpr auto input_limit = | |
261 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); | |
262 | ||
263 | stream_.avail_in = 0; | |
264 | stream_.next_out = reinterpret_cast<Bytef*>(output); | |
265 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); | |
266 | ||
267 | int64_t ret = 0; | |
268 | ret = deflate(&stream_, Z_FINISH); | |
269 | if (ret == Z_STREAM_ERROR) { | |
270 | return ZlibError("zlib flush failed: "); | |
271 | } | |
272 | int64_t bytes_written = output_len - stream_.avail_out; | |
273 | if (ret == Z_STREAM_END) { | |
274 | // Flush complete, we can now end the stream | |
275 | initialized_ = false; | |
276 | ret = deflateEnd(&stream_); | |
277 | if (ret == Z_OK) { | |
278 | return EndResult{bytes_written, false}; | |
279 | } else { | |
280 | return ZlibError("zlib end failed: "); | |
281 | } | |
282 | } else { | |
283 | // Not everything could be flushed, | |
284 | return EndResult{bytes_written, true}; | |
285 | } | |
286 | } | |
287 | ||
288 | protected: | |
289 | Status ZlibError(const char* prefix_msg) { | |
290 | return ZlibErrorPrefix(prefix_msg, stream_.msg); | |
291 | } | |
292 | ||
293 | z_stream stream_; | |
294 | bool initialized_; | |
295 | int compression_level_; | |
296 | }; | |
297 | ||
298 | // ---------------------------------------------------------------------- | |
299 | // gzip codec implementation | |
300 | ||
301 | class GZipCodec : public Codec { | |
302 | public: | |
303 | explicit GZipCodec(int compression_level, GZipFormat::type format) | |
304 | : format_(format), | |
305 | compressor_initialized_(false), | |
306 | decompressor_initialized_(false) { | |
307 | compression_level_ = compression_level == kUseDefaultCompressionLevel | |
308 | ? kGZipDefaultCompressionLevel | |
309 | : compression_level; | |
310 | } | |
311 | ||
312 | ~GZipCodec() override { | |
313 | EndCompressor(); | |
314 | EndDecompressor(); | |
315 | } | |
316 | ||
317 | Result<std::shared_ptr<Compressor>> MakeCompressor() override { | |
318 | auto ptr = std::make_shared<GZipCompressor>(compression_level_); | |
319 | RETURN_NOT_OK(ptr->Init(format_)); | |
320 | return ptr; | |
321 | } | |
322 | ||
323 | Result<std::shared_ptr<Decompressor>> MakeDecompressor() override { | |
324 | auto ptr = std::make_shared<GZipDecompressor>(format_); | |
325 | RETURN_NOT_OK(ptr->Init()); | |
326 | return ptr; | |
327 | } | |
328 | ||
329 | Status InitCompressor() { | |
330 | EndDecompressor(); | |
331 | memset(&stream_, 0, sizeof(stream_)); | |
332 | ||
333 | int ret; | |
334 | // Initialize to run specified format | |
335 | int window_bits = CompressionWindowBitsForFormat(format_); | |
336 | if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, | |
337 | compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { | |
338 | return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg); | |
339 | } | |
340 | compressor_initialized_ = true; | |
341 | return Status::OK(); | |
342 | } | |
343 | ||
344 | void EndCompressor() { | |
345 | if (compressor_initialized_) { | |
346 | (void)deflateEnd(&stream_); | |
347 | } | |
348 | compressor_initialized_ = false; | |
349 | } | |
350 | ||
351 | Status InitDecompressor() { | |
352 | EndCompressor(); | |
353 | memset(&stream_, 0, sizeof(stream_)); | |
354 | int ret; | |
355 | ||
356 | // Initialize to run either deflate or zlib/gzip format | |
357 | int window_bits = DecompressionWindowBitsForFormat(format_); | |
358 | if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { | |
359 | return ZlibErrorPrefix("zlib inflateInit failed: ", stream_.msg); | |
360 | } | |
361 | decompressor_initialized_ = true; | |
362 | return Status::OK(); | |
363 | } | |
364 | ||
365 | void EndDecompressor() { | |
366 | if (decompressor_initialized_) { | |
367 | (void)inflateEnd(&stream_); | |
368 | } | |
369 | decompressor_initialized_ = false; | |
370 | } | |
371 | ||
372 | Result<int64_t> Decompress(int64_t input_length, const uint8_t* input, | |
373 | int64_t output_buffer_length, uint8_t* output) override { | |
374 | if (!decompressor_initialized_) { | |
375 | RETURN_NOT_OK(InitDecompressor()); | |
376 | } | |
377 | if (output_buffer_length == 0) { | |
378 | // The zlib library does not allow *output to be NULL, even when | |
379 | // output_buffer_length is 0 (inflate() will return Z_STREAM_ERROR). We don't | |
380 | // consider this an error, so bail early if no output is expected. Note that we | |
381 | // don't signal an error if the input actually contains compressed data. | |
382 | return 0; | |
383 | } | |
384 | ||
385 | // Reset the stream for this block | |
386 | if (inflateReset(&stream_) != Z_OK) { | |
387 | return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg); | |
388 | } | |
389 | ||
390 | int ret = 0; | |
391 | // gzip can run in streaming mode or non-streaming mode. We only | |
392 | // support the non-streaming use case where we present it the entire | |
393 | // compressed input and a buffer big enough to contain the entire | |
394 | // compressed output. In the case where we don't know the output, | |
395 | // we just make a bigger buffer and try the non-streaming mode | |
396 | // from the beginning again. | |
397 | while (ret != Z_STREAM_END) { | |
398 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); | |
399 | stream_.avail_in = static_cast<uInt>(input_length); | |
400 | stream_.next_out = reinterpret_cast<Bytef*>(output); | |
401 | stream_.avail_out = static_cast<uInt>(output_buffer_length); | |
402 | ||
403 | // We know the output size. In this case, we can use Z_FINISH | |
404 | // which is more efficient. | |
405 | ret = inflate(&stream_, Z_FINISH); | |
406 | if (ret == Z_STREAM_END || ret != Z_OK) break; | |
407 | ||
408 | // Failure, buffer was too small | |
409 | return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=", | |
410 | input_length, " OutputLength=", output_buffer_length); | |
411 | } | |
412 | ||
413 | // Failure for some other reason | |
414 | if (ret != Z_STREAM_END) { | |
415 | return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg); | |
416 | } | |
417 | ||
418 | return stream_.total_out; | |
419 | } | |
420 | ||
421 | int64_t MaxCompressedLen(int64_t input_length, | |
422 | const uint8_t* ARROW_ARG_UNUSED(input)) override { | |
423 | // Must be in compression mode | |
424 | if (!compressor_initialized_) { | |
425 | Status s = InitCompressor(); | |
426 | ARROW_CHECK_OK(s); | |
427 | } | |
428 | int64_t max_len = deflateBound(&stream_, static_cast<uLong>(input_length)); | |
429 | // ARROW-3514: return a more pessimistic estimate to account for bugs | |
430 | // in old zlib versions. | |
431 | return max_len + 12; | |
432 | } | |
433 | ||
434 | Result<int64_t> Compress(int64_t input_length, const uint8_t* input, | |
435 | int64_t output_buffer_len, uint8_t* output) override { | |
436 | if (!compressor_initialized_) { | |
437 | RETURN_NOT_OK(InitCompressor()); | |
438 | } | |
439 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); | |
440 | stream_.avail_in = static_cast<uInt>(input_length); | |
441 | stream_.next_out = reinterpret_cast<Bytef*>(output); | |
442 | stream_.avail_out = static_cast<uInt>(output_buffer_len); | |
443 | ||
444 | int64_t ret = 0; | |
445 | if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { | |
446 | if (ret == Z_OK) { | |
447 | // Will return Z_OK (and stream.msg NOT set) if stream.avail_out is too | |
448 | // small | |
449 | return Status::IOError("zlib deflate failed, output buffer too small"); | |
450 | } | |
451 | ||
452 | return ZlibErrorPrefix("zlib deflate failed: ", stream_.msg); | |
453 | } | |
454 | ||
455 | if (deflateReset(&stream_) != Z_OK) { | |
456 | return ZlibErrorPrefix("zlib deflateReset failed: ", stream_.msg); | |
457 | } | |
458 | ||
459 | // Actual output length | |
460 | return output_buffer_len - stream_.avail_out; | |
461 | } | |
462 | ||
463 | Status Init() override { | |
464 | const Status init_compressor_status = InitCompressor(); | |
465 | if (!init_compressor_status.ok()) { | |
466 | return init_compressor_status; | |
467 | } | |
468 | return InitDecompressor(); | |
469 | } | |
470 | ||
471 | Compression::type compression_type() const override { return Compression::GZIP; } | |
472 | ||
473 | int compression_level() const override { return compression_level_; } | |
474 | int minimum_compression_level() const override { return kGZipMinCompressionLevel; } | |
475 | int maximum_compression_level() const override { return kGZipMaxCompressionLevel; } | |
476 | int default_compression_level() const override { return kGZipDefaultCompressionLevel; } | |
477 | ||
478 | private: | |
479 | // zlib is stateful and the z_stream state variable must be initialized | |
480 | // before | |
481 | z_stream stream_; | |
482 | ||
483 | // Realistically, this will always be GZIP, but we leave the option open to | |
484 | // configure | |
485 | GZipFormat::type format_; | |
486 | ||
487 | // These variables are mutually exclusive. When the codec is in "compressor" | |
488 | // state, compressor_initialized_ is true while decompressor_initialized_ is | |
489 | // false. When it's decompressing, the opposite is true. | |
490 | // | |
491 | // Indeed, this is slightly hacky, but the alternative is having separate | |
492 | // Compressor and Decompressor classes. If this ever becomes an issue, we can | |
493 | // perform the refactoring then | |
494 | bool compressor_initialized_; | |
495 | bool decompressor_initialized_; | |
496 | int compression_level_; | |
497 | }; | |
498 | ||
499 | } // namespace | |
500 | ||
501 | std::unique_ptr<Codec> MakeGZipCodec(int compression_level, GZipFormat::type format) { | |
502 | return std::unique_ptr<Codec>(new GZipCodec(compression_level, format)); | |
503 | } | |
504 | ||
505 | } // namespace internal | |
506 | } // namespace util | |
507 | } // namespace arrow |