]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/io/compressed_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / compressed_test.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <algorithm>
19#include <iterator>
20#include <memory>
21#include <random>
22#include <string>
23#include <utility>
24#include <vector>
25
26#include <gtest/gtest.h>
27
28#include "arrow/buffer.h"
29#include "arrow/io/compressed.h"
30#include "arrow/io/memory.h"
31#include "arrow/io/test_common.h"
32#include "arrow/status.h"
33#include "arrow/testing/gtest_util.h"
34#include "arrow/testing/util.h"
35#include "arrow/util/compression.h"
36
37namespace arrow {
38namespace io {
39
40using ::arrow::util::Codec;
41
// Payload sizes (in bytes) used by the data-driven tests below.
#if defined(ARROW_VALGRIND)
// Keep the payloads small so the tests stay reasonably fast under Valgrind.
static constexpr int64_t RANDOM_DATA_SIZE = 50 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 120 * 1024;
#else
// Use payloads big enough to exercise the streams' internal buffers.
static constexpr int64_t RANDOM_DATA_SIZE = 3 * 1024 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 8 * 1024 * 1024;
#endif
51
52std::vector<uint8_t> MakeRandomData(int data_size) {
53 std::vector<uint8_t> data(data_size);
54 random_bytes(data_size, 1234, data.data());
55 return data;
56}
57
// Returns exactly `data_size` bytes made of a fixed English sentence repeated
// back to back; the last repetition is cut short if needed.  The repetition
// makes the payload highly compressible.
std::vector<uint8_t> MakeCompressibleData(int data_size) {
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data";

  const size_t total_size = static_cast<size_t>(data_size);
  std::vector<uint8_t> data(total_size);
  // Tile the sentence directly into the final-size buffer.  std::copy_n comes
  // from <algorithm>, which this file already includes, so no <cstring>
  // dependency is needed, and all offset arithmetic stays in size_t.
  for (size_t offset = 0; offset < total_size; offset += base_data.size()) {
    const size_t chunk = std::min(base_data.size(), total_size - offset);
    std::copy_n(base_data.begin(), chunk, data.begin() + offset);
  }
  return data;
}
70
71std::shared_ptr<Buffer> CompressDataOneShot(Codec* codec,
72 const std::vector<uint8_t>& data) {
73 int64_t max_compressed_len, compressed_len;
74 max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
75 auto compressed = *AllocateResizableBuffer(max_compressed_len);
76 compressed_len = *codec->Compress(data.size(), data.data(), max_compressed_len,
77 compressed->mutable_data());
78 ABORT_NOT_OK(compressed->Resize(compressed_len));
79 return std::move(compressed);
80}
81
82Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed,
83 int64_t* stream_pos, std::vector<uint8_t>* out) {
84 // Create compressed input stream
85 auto buffer_reader = std::make_shared<BufferReader>(compressed);
86 ARROW_ASSIGN_OR_RAISE(auto stream, CompressedInputStream::Make(codec, buffer_reader));
87
88 std::vector<uint8_t> decompressed;
89 int64_t decompressed_size = 0;
90 const int64_t chunk_size = 1111;
91 while (true) {
92 ARROW_ASSIGN_OR_RAISE(auto buf, stream->Read(chunk_size));
93 if (buf->size() == 0) {
94 // EOF
95 break;
96 }
97 decompressed.resize(decompressed_size + buf->size());
98 memcpy(decompressed.data() + decompressed_size, buf->data(), buf->size());
99 decompressed_size += buf->size();
100 }
101 if (stream_pos != nullptr) {
102 RETURN_NOT_OK(stream->Tell().Value(stream_pos));
103 }
104 *out = std::move(decompressed);
105 return Status::OK();
106}
107
108Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed,
109 std::vector<uint8_t>* out) {
110 return RunCompressedInputStream(codec, compressed, nullptr, out);
111}
112
113void CheckCompressedInputStream(Codec* codec, const std::vector<uint8_t>& data) {
114 // Create compressed data
115 auto compressed = CompressDataOneShot(codec, data);
116
117 std::vector<uint8_t> decompressed;
118 int64_t stream_pos = -1;
119 ASSERT_OK(RunCompressedInputStream(codec, compressed, &stream_pos, &decompressed));
120
121 ASSERT_EQ(decompressed.size(), data.size());
122 ASSERT_EQ(decompressed, data);
123 ASSERT_EQ(stream_pos, static_cast<int64_t>(decompressed.size()));
124}
125
126void CheckCompressedOutputStream(Codec* codec, const std::vector<uint8_t>& data,
127 bool do_flush) {
128 // Create compressed output stream
129 ASSERT_OK_AND_ASSIGN(auto buffer_writer, BufferOutputStream::Create());
130 ASSERT_OK_AND_ASSIGN(auto stream, CompressedOutputStream::Make(codec, buffer_writer));
131 ASSERT_OK_AND_EQ(0, stream->Tell());
132
133 const uint8_t* input = data.data();
134 int64_t input_len = data.size();
135 const int64_t chunk_size = 1111;
136 while (input_len > 0) {
137 int64_t nbytes = std::min(chunk_size, input_len);
138 ASSERT_OK(stream->Write(input, nbytes));
139 input += nbytes;
140 input_len -= nbytes;
141 if (do_flush) {
142 ASSERT_OK(stream->Flush());
143 }
144 }
145 ASSERT_OK_AND_EQ(static_cast<int64_t>(data.size()), stream->Tell());
146 ASSERT_OK(stream->Close());
147
148 // Get compressed data and decompress it
149 ASSERT_OK_AND_ASSIGN(auto compressed, buffer_writer->Finish());
150 std::vector<uint8_t> decompressed(data.size());
151 ASSERT_OK(codec->Decompress(compressed->size(), compressed->data(), decompressed.size(),
152 decompressed.data()));
153 ASSERT_EQ(decompressed, data);
154}
155
156class CompressedInputStreamTest : public ::testing::TestWithParam<Compression::type> {
157 protected:
158 Compression::type GetCompression() { return GetParam(); }
159
160 std::unique_ptr<Codec> MakeCodec() { return *Codec::Create(GetCompression()); }
161};
162
163class CompressedOutputStreamTest : public ::testing::TestWithParam<Compression::type> {
164 protected:
165 Compression::type GetCompression() { return GetParam(); }
166
167 std::unique_ptr<Codec> MakeCodec() { return *Codec::Create(GetCompression()); }
168};
169
170TEST_P(CompressedInputStreamTest, CompressibleData) {
171 auto codec = MakeCodec();
172 auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
173
174 CheckCompressedInputStream(codec.get(), data);
175}
176
177TEST_P(CompressedInputStreamTest, RandomData) {
178 auto codec = MakeCodec();
179 auto data = MakeRandomData(RANDOM_DATA_SIZE);
180
181 CheckCompressedInputStream(codec.get(), data);
182}
183
184TEST_P(CompressedInputStreamTest, TruncatedData) {
185 auto codec = MakeCodec();
186 auto data = MakeRandomData(10000);
187 auto compressed = CompressDataOneShot(codec.get(), data);
188 auto truncated = SliceBuffer(compressed, 0, compressed->size() - 3);
189
190 std::vector<uint8_t> decompressed;
191 ASSERT_RAISES(IOError, RunCompressedInputStream(codec.get(), truncated, &decompressed));
192}
193
194TEST_P(CompressedInputStreamTest, InvalidData) {
195 auto codec = MakeCodec();
196 auto compressed_data = MakeRandomData(100);
197
198 auto buffer_reader = std::make_shared<BufferReader>(Buffer::Wrap(compressed_data));
199 ASSERT_OK_AND_ASSIGN(auto stream,
200 CompressedInputStream::Make(codec.get(), buffer_reader));
201 ASSERT_RAISES(IOError, stream->Read(1024));
202}
203
204TEST_P(CompressedInputStreamTest, ConcatenatedStreams) {
205 // ARROW-5974: just like the "gunzip", "bzip2" and "xz" commands,
206 // decompressing concatenated compressed streams should yield the entire
207 // original data.
208 auto codec = MakeCodec();
209 auto data1 = MakeCompressibleData(100);
210 auto data2 = MakeCompressibleData(200);
211 auto compressed1 = CompressDataOneShot(codec.get(), data1);
212 auto compressed2 = CompressDataOneShot(codec.get(), data2);
213 std::vector<uint8_t> expected;
214 std::copy(data1.begin(), data1.end(), std::back_inserter(expected));
215 std::copy(data2.begin(), data2.end(), std::back_inserter(expected));
216
217 ASSERT_OK_AND_ASSIGN(auto concatenated, ConcatenateBuffers({compressed1, compressed2}));
218 std::vector<uint8_t> decompressed;
219 ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed));
220 ASSERT_EQ(decompressed.size(), expected.size());
221 ASSERT_EQ(decompressed, expected);
222
223 // Same, but with an empty decompressed stream in the middle
224 auto compressed_empty = CompressDataOneShot(codec.get(), {});
225 ASSERT_OK_AND_ASSIGN(concatenated,
226 ConcatenateBuffers({compressed1, compressed_empty, compressed2}));
227 ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed));
228 ASSERT_EQ(decompressed.size(), expected.size());
229 ASSERT_EQ(decompressed, expected);
230
231 // Same, but with an empty decompressed stream at the end
232 ASSERT_OK_AND_ASSIGN(concatenated,
233 ConcatenateBuffers({compressed1, compressed2, compressed_empty}));
234 ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed));
235 ASSERT_EQ(decompressed.size(), expected.size());
236 ASSERT_EQ(decompressed, expected);
237}
238
239TEST_P(CompressedOutputStreamTest, CompressibleData) {
240 auto codec = MakeCodec();
241 auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
242
243 CheckCompressedOutputStream(codec.get(), data, false /* do_flush */);
244 CheckCompressedOutputStream(codec.get(), data, true /* do_flush */);
245}
246
247TEST_P(CompressedOutputStreamTest, RandomData) {
248 auto codec = MakeCodec();
249 auto data = MakeRandomData(RANDOM_DATA_SIZE);
250
251 CheckCompressedOutputStream(codec.get(), data, false /* do_flush */);
252 CheckCompressedOutputStream(codec.get(), data, true /* do_flush */);
253}
254
255// NOTES:
256// - Snappy doesn't support streaming decompression
257// - BZ2 doesn't support one-shot compression
258// - LZ4 raw format doesn't support streaming decompression
259
260#ifdef ARROW_WITH_SNAPPY
261TEST(TestSnappyInputStream, NotImplemented) {
262 std::unique_ptr<Codec> codec;
263 ASSERT_OK_AND_ASSIGN(codec, Codec::Create(Compression::SNAPPY));
264 std::shared_ptr<InputStream> stream = std::make_shared<BufferReader>("");
265 ASSERT_RAISES(NotImplemented, CompressedInputStream::Make(codec.get(), stream));
266}
267
268TEST(TestSnappyOutputStream, NotImplemented) {
269 std::unique_ptr<Codec> codec;
270 ASSERT_OK_AND_ASSIGN(codec, Codec::Create(Compression::SNAPPY));
271 std::shared_ptr<OutputStream> stream = std::make_shared<MockOutputStream>();
272 ASSERT_RAISES(NotImplemented, CompressedOutputStream::Make(codec.get(), stream));
273}
274#endif
275
276#if !defined ARROW_WITH_ZLIB && !defined ARROW_WITH_BROTLI && !defined ARROW_WITH_LZ4 && \
277 !defined ARROW_WITH_ZSTD
278GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CompressedInputStreamTest);
279GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CompressedOutputStreamTest);
280#endif
281
282#ifdef ARROW_WITH_ZLIB
283INSTANTIATE_TEST_SUITE_P(TestGZipInputStream, CompressedInputStreamTest,
284 ::testing::Values(Compression::GZIP));
285INSTANTIATE_TEST_SUITE_P(TestGZipOutputStream, CompressedOutputStreamTest,
286 ::testing::Values(Compression::GZIP));
287#endif
288
289#ifdef ARROW_WITH_BROTLI
290INSTANTIATE_TEST_SUITE_P(TestBrotliInputStream, CompressedInputStreamTest,
291 ::testing::Values(Compression::BROTLI));
292INSTANTIATE_TEST_SUITE_P(TestBrotliOutputStream, CompressedOutputStreamTest,
293 ::testing::Values(Compression::BROTLI));
294#endif
295
296#ifdef ARROW_WITH_LZ4
297INSTANTIATE_TEST_SUITE_P(TestLZ4InputStream, CompressedInputStreamTest,
298 ::testing::Values(Compression::LZ4_FRAME));
299INSTANTIATE_TEST_SUITE_P(TestLZ4OutputStream, CompressedOutputStreamTest,
300 ::testing::Values(Compression::LZ4_FRAME));
301#endif
302
303#ifdef ARROW_WITH_ZSTD
304INSTANTIATE_TEST_SUITE_P(TestZSTDInputStream, CompressedInputStreamTest,
305 ::testing::Values(Compression::ZSTD));
306INSTANTIATE_TEST_SUITE_P(TestZSTDOutputStream, CompressedOutputStreamTest,
307 ::testing::Values(Compression::ZSTD));
308#endif
309
310} // namespace io
311} // namespace arrow