]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/util/compression_zlib.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / compression_zlib.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/util/compression_internal.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstring>
23#include <limits>
24#include <memory>
25
26#include <zconf.h>
27#include <zlib.h>
28
29#include "arrow/result.h"
30#include "arrow/status.h"
31#include "arrow/util/logging.h"
32#include "arrow/util/macros.h"
33
34namespace arrow {
35namespace util {
36namespace internal {
37
38namespace {
39
40// ----------------------------------------------------------------------
41// gzip implementation
42
// The values below are the magic numbers zlib expects in the windowBits
// argument; zlib.h describes them but does not give them names.

// Largest supported window size
constexpr int WINDOW_BITS{15};

// Added to the window bits to request gzip output
constexpr int GZIP_CODEC{16};

// Combined with the window bits so that the decoder detects zlib vs. gzip
// from the stream header
constexpr int DETECT_CODEC{32};

// Bounds reported by the codec for the gzip compression level
constexpr int kGZipMinCompressionLevel{1};
constexpr int kGZipMaxCompressionLevel{9};
57
58int CompressionWindowBitsForFormat(GZipFormat::type format) {
59 int window_bits = WINDOW_BITS;
60 switch (format) {
61 case GZipFormat::DEFLATE:
62 window_bits = -window_bits;
63 break;
64 case GZipFormat::GZIP:
65 window_bits += GZIP_CODEC;
66 break;
67 case GZipFormat::ZLIB:
68 break;
69 }
70 return window_bits;
71}
72
73int DecompressionWindowBitsForFormat(GZipFormat::type format) {
74 if (format == GZipFormat::DEFLATE) {
75 return -WINDOW_BITS;
76 } else {
77 /* If not deflate, autodetect format from header */
78 return WINDOW_BITS | DETECT_CODEC;
79 }
80}
81
82Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) {
83 return Status::IOError(prefix_msg, (msg) ? msg : "(unknown error)");
84}
85
86// ----------------------------------------------------------------------
87// gzip decompressor implementation
88
89class GZipDecompressor : public Decompressor {
90 public:
91 explicit GZipDecompressor(GZipFormat::type format)
92 : format_(format), initialized_(false), finished_(false) {}
93
94 ~GZipDecompressor() override {
95 if (initialized_) {
96 inflateEnd(&stream_);
97 }
98 }
99
100 Status Init() {
101 DCHECK(!initialized_);
102 memset(&stream_, 0, sizeof(stream_));
103 finished_ = false;
104
105 int ret;
106 int window_bits = DecompressionWindowBitsForFormat(format_);
107 if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
108 return ZlibError("zlib inflateInit failed: ");
109 } else {
110 initialized_ = true;
111 return Status::OK();
112 }
113 }
114
115 Status Reset() override {
116 DCHECK(initialized_);
117 finished_ = false;
118 int ret;
119 if ((ret = inflateReset(&stream_)) != Z_OK) {
120 return ZlibError("zlib inflateReset failed: ");
121 } else {
122 return Status::OK();
123 }
124 }
125
126 Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input,
127 int64_t output_len, uint8_t* output) override {
128 static constexpr auto input_limit =
129 static_cast<int64_t>(std::numeric_limits<uInt>::max());
130 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
131 stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
132 stream_.next_out = reinterpret_cast<Bytef*>(output);
133 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
134 int ret;
135
136 ret = inflate(&stream_, Z_SYNC_FLUSH);
137 if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) {
138 return ZlibError("zlib inflate failed: ");
139 }
140 if (ret == Z_NEED_DICT) {
141 return ZlibError("zlib inflate failed (need preset dictionary): ");
142 }
143 finished_ = (ret == Z_STREAM_END);
144 if (ret == Z_BUF_ERROR) {
145 // No progress was possible
146 return DecompressResult{0, 0, true};
147 } else {
148 ARROW_CHECK(ret == Z_OK || ret == Z_STREAM_END);
149 // Some progress has been made
150 return DecompressResult{input_len - stream_.avail_in,
151 output_len - stream_.avail_out, false};
152 }
153 return Status::OK();
154 }
155
156 bool IsFinished() override { return finished_; }
157
158 protected:
159 Status ZlibError(const char* prefix_msg) {
160 return ZlibErrorPrefix(prefix_msg, stream_.msg);
161 }
162
163 z_stream stream_;
164 GZipFormat::type format_;
165 bool initialized_;
166 bool finished_;
167};
168
169// ----------------------------------------------------------------------
170// gzip compressor implementation
171
172class GZipCompressor : public Compressor {
173 public:
174 explicit GZipCompressor(int compression_level)
175 : initialized_(false), compression_level_(compression_level) {}
176
177 ~GZipCompressor() override {
178 if (initialized_) {
179 deflateEnd(&stream_);
180 }
181 }
182
183 Status Init(GZipFormat::type format) {
184 DCHECK(!initialized_);
185 memset(&stream_, 0, sizeof(stream_));
186
187 int ret;
188 // Initialize to run specified format
189 int window_bits = CompressionWindowBitsForFormat(format);
190 if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
191 compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) {
192 return ZlibError("zlib deflateInit failed: ");
193 } else {
194 initialized_ = true;
195 return Status::OK();
196 }
197 }
198
199 Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
200 int64_t output_len, uint8_t* output) override {
201 DCHECK(initialized_) << "Called on non-initialized stream";
202
203 static constexpr auto input_limit =
204 static_cast<int64_t>(std::numeric_limits<uInt>::max());
205
206 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
207 stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
208 stream_.next_out = reinterpret_cast<Bytef*>(output);
209 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
210
211 int64_t ret = 0;
212 ret = deflate(&stream_, Z_NO_FLUSH);
213 if (ret == Z_STREAM_ERROR) {
214 return ZlibError("zlib compress failed: ");
215 }
216 if (ret == Z_OK) {
217 // Some progress has been made
218 return CompressResult{input_len - stream_.avail_in, output_len - stream_.avail_out};
219 } else {
220 // No progress was possible
221 ARROW_CHECK_EQ(ret, Z_BUF_ERROR);
222 return CompressResult{0, 0};
223 }
224 }
225
226 Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override {
227 DCHECK(initialized_) << "Called on non-initialized stream";
228
229 static constexpr auto input_limit =
230 static_cast<int64_t>(std::numeric_limits<uInt>::max());
231
232 stream_.avail_in = 0;
233 stream_.next_out = reinterpret_cast<Bytef*>(output);
234 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
235
236 int64_t ret = 0;
237 ret = deflate(&stream_, Z_SYNC_FLUSH);
238 if (ret == Z_STREAM_ERROR) {
239 return ZlibError("zlib flush failed: ");
240 }
241 int64_t bytes_written;
242 if (ret == Z_OK) {
243 bytes_written = output_len - stream_.avail_out;
244 } else {
245 ARROW_CHECK_EQ(ret, Z_BUF_ERROR);
246 bytes_written = 0;
247 }
248 // "If deflate returns with avail_out == 0, this function must be called
249 // again with the same value of the flush parameter and more output space
250 // (updated avail_out), until the flush is complete (deflate returns
251 // with non-zero avail_out)."
252 // "Note that Z_BUF_ERROR is not fatal, and deflate() can be called again
253 // with more input and more output space to continue compressing."
254 return FlushResult{bytes_written, stream_.avail_out == 0};
255 }
256
257 Result<EndResult> End(int64_t output_len, uint8_t* output) override {
258 DCHECK(initialized_) << "Called on non-initialized stream";
259
260 static constexpr auto input_limit =
261 static_cast<int64_t>(std::numeric_limits<uInt>::max());
262
263 stream_.avail_in = 0;
264 stream_.next_out = reinterpret_cast<Bytef*>(output);
265 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
266
267 int64_t ret = 0;
268 ret = deflate(&stream_, Z_FINISH);
269 if (ret == Z_STREAM_ERROR) {
270 return ZlibError("zlib flush failed: ");
271 }
272 int64_t bytes_written = output_len - stream_.avail_out;
273 if (ret == Z_STREAM_END) {
274 // Flush complete, we can now end the stream
275 initialized_ = false;
276 ret = deflateEnd(&stream_);
277 if (ret == Z_OK) {
278 return EndResult{bytes_written, false};
279 } else {
280 return ZlibError("zlib end failed: ");
281 }
282 } else {
283 // Not everything could be flushed,
284 return EndResult{bytes_written, true};
285 }
286 }
287
288 protected:
289 Status ZlibError(const char* prefix_msg) {
290 return ZlibErrorPrefix(prefix_msg, stream_.msg);
291 }
292
293 z_stream stream_;
294 bool initialized_;
295 int compression_level_;
296};
297
298// ----------------------------------------------------------------------
299// gzip codec implementation
300
301class GZipCodec : public Codec {
302 public:
303 explicit GZipCodec(int compression_level, GZipFormat::type format)
304 : format_(format),
305 compressor_initialized_(false),
306 decompressor_initialized_(false) {
307 compression_level_ = compression_level == kUseDefaultCompressionLevel
308 ? kGZipDefaultCompressionLevel
309 : compression_level;
310 }
311
312 ~GZipCodec() override {
313 EndCompressor();
314 EndDecompressor();
315 }
316
317 Result<std::shared_ptr<Compressor>> MakeCompressor() override {
318 auto ptr = std::make_shared<GZipCompressor>(compression_level_);
319 RETURN_NOT_OK(ptr->Init(format_));
320 return ptr;
321 }
322
323 Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
324 auto ptr = std::make_shared<GZipDecompressor>(format_);
325 RETURN_NOT_OK(ptr->Init());
326 return ptr;
327 }
328
329 Status InitCompressor() {
330 EndDecompressor();
331 memset(&stream_, 0, sizeof(stream_));
332
333 int ret;
334 // Initialize to run specified format
335 int window_bits = CompressionWindowBitsForFormat(format_);
336 if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
337 compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) {
338 return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg);
339 }
340 compressor_initialized_ = true;
341 return Status::OK();
342 }
343
344 void EndCompressor() {
345 if (compressor_initialized_) {
346 (void)deflateEnd(&stream_);
347 }
348 compressor_initialized_ = false;
349 }
350
351 Status InitDecompressor() {
352 EndCompressor();
353 memset(&stream_, 0, sizeof(stream_));
354 int ret;
355
356 // Initialize to run either deflate or zlib/gzip format
357 int window_bits = DecompressionWindowBitsForFormat(format_);
358 if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
359 return ZlibErrorPrefix("zlib inflateInit failed: ", stream_.msg);
360 }
361 decompressor_initialized_ = true;
362 return Status::OK();
363 }
364
365 void EndDecompressor() {
366 if (decompressor_initialized_) {
367 (void)inflateEnd(&stream_);
368 }
369 decompressor_initialized_ = false;
370 }
371
372 Result<int64_t> Decompress(int64_t input_length, const uint8_t* input,
373 int64_t output_buffer_length, uint8_t* output) override {
374 if (!decompressor_initialized_) {
375 RETURN_NOT_OK(InitDecompressor());
376 }
377 if (output_buffer_length == 0) {
378 // The zlib library does not allow *output to be NULL, even when
379 // output_buffer_length is 0 (inflate() will return Z_STREAM_ERROR). We don't
380 // consider this an error, so bail early if no output is expected. Note that we
381 // don't signal an error if the input actually contains compressed data.
382 return 0;
383 }
384
385 // Reset the stream for this block
386 if (inflateReset(&stream_) != Z_OK) {
387 return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
388 }
389
390 int ret = 0;
391 // gzip can run in streaming mode or non-streaming mode. We only
392 // support the non-streaming use case where we present it the entire
393 // compressed input and a buffer big enough to contain the entire
394 // compressed output. In the case where we don't know the output,
395 // we just make a bigger buffer and try the non-streaming mode
396 // from the beginning again.
397 while (ret != Z_STREAM_END) {
398 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
399 stream_.avail_in = static_cast<uInt>(input_length);
400 stream_.next_out = reinterpret_cast<Bytef*>(output);
401 stream_.avail_out = static_cast<uInt>(output_buffer_length);
402
403 // We know the output size. In this case, we can use Z_FINISH
404 // which is more efficient.
405 ret = inflate(&stream_, Z_FINISH);
406 if (ret == Z_STREAM_END || ret != Z_OK) break;
407
408 // Failure, buffer was too small
409 return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
410 input_length, " OutputLength=", output_buffer_length);
411 }
412
413 // Failure for some other reason
414 if (ret != Z_STREAM_END) {
415 return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
416 }
417
418 return stream_.total_out;
419 }
420
421 int64_t MaxCompressedLen(int64_t input_length,
422 const uint8_t* ARROW_ARG_UNUSED(input)) override {
423 // Must be in compression mode
424 if (!compressor_initialized_) {
425 Status s = InitCompressor();
426 ARROW_CHECK_OK(s);
427 }
428 int64_t max_len = deflateBound(&stream_, static_cast<uLong>(input_length));
429 // ARROW-3514: return a more pessimistic estimate to account for bugs
430 // in old zlib versions.
431 return max_len + 12;
432 }
433
434 Result<int64_t> Compress(int64_t input_length, const uint8_t* input,
435 int64_t output_buffer_len, uint8_t* output) override {
436 if (!compressor_initialized_) {
437 RETURN_NOT_OK(InitCompressor());
438 }
439 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
440 stream_.avail_in = static_cast<uInt>(input_length);
441 stream_.next_out = reinterpret_cast<Bytef*>(output);
442 stream_.avail_out = static_cast<uInt>(output_buffer_len);
443
444 int64_t ret = 0;
445 if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
446 if (ret == Z_OK) {
447 // Will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
448 // small
449 return Status::IOError("zlib deflate failed, output buffer too small");
450 }
451
452 return ZlibErrorPrefix("zlib deflate failed: ", stream_.msg);
453 }
454
455 if (deflateReset(&stream_) != Z_OK) {
456 return ZlibErrorPrefix("zlib deflateReset failed: ", stream_.msg);
457 }
458
459 // Actual output length
460 return output_buffer_len - stream_.avail_out;
461 }
462
463 Status Init() override {
464 const Status init_compressor_status = InitCompressor();
465 if (!init_compressor_status.ok()) {
466 return init_compressor_status;
467 }
468 return InitDecompressor();
469 }
470
471 Compression::type compression_type() const override { return Compression::GZIP; }
472
473 int compression_level() const override { return compression_level_; }
474 int minimum_compression_level() const override { return kGZipMinCompressionLevel; }
475 int maximum_compression_level() const override { return kGZipMaxCompressionLevel; }
476 int default_compression_level() const override { return kGZipDefaultCompressionLevel; }
477
478 private:
479 // zlib is stateful and the z_stream state variable must be initialized
480 // before
481 z_stream stream_;
482
483 // Realistically, this will always be GZIP, but we leave the option open to
484 // configure
485 GZipFormat::type format_;
486
487 // These variables are mutually exclusive. When the codec is in "compressor"
488 // state, compressor_initialized_ is true while decompressor_initialized_ is
489 // false. When it's decompressing, the opposite is true.
490 //
491 // Indeed, this is slightly hacky, but the alternative is having separate
492 // Compressor and Decompressor classes. If this ever becomes an issue, we can
493 // perform the refactoring then
494 bool compressor_initialized_;
495 bool decompressor_initialized_;
496 int compression_level_;
497};
498
499} // namespace
500
501std::unique_ptr<Codec> MakeGZipCodec(int compression_level, GZipFormat::type format) {
502 return std::unique_ptr<Codec>(new GZipCodec(compression_level, format));
503}
504
505} // namespace internal
506} // namespace util
507} // namespace arrow