]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include <algorithm> | |
19 | #include <iterator> | |
20 | #include <memory> | |
21 | #include <random> | |
22 | #include <string> | |
23 | #include <utility> | |
24 | #include <vector> | |
25 | ||
26 | #include <gtest/gtest.h> | |
27 | ||
28 | #include "arrow/buffer.h" | |
29 | #include "arrow/io/compressed.h" | |
30 | #include "arrow/io/memory.h" | |
31 | #include "arrow/io/test_common.h" | |
32 | #include "arrow/status.h" | |
33 | #include "arrow/testing/gtest_util.h" | |
34 | #include "arrow/testing/util.h" | |
35 | #include "arrow/util/compression.h" | |
36 | ||
37 | namespace arrow { | |
38 | namespace io { | |
39 | ||
40 | using ::arrow::util::Codec; | |
41 | ||
// Sizes of the generated test payloads. Normally they are large enough to
// exercise the streams' internal buffers; under Valgrind they are kept small
// so the tests are not slowed down too much.
#ifndef ARROW_VALGRIND
static constexpr int64_t RANDOM_DATA_SIZE = 3 * 1024 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 8 * 1024 * 1024;
#else
static constexpr int64_t RANDOM_DATA_SIZE = 50 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 120 * 1024;
#endif
51 | ||
52 | std::vector<uint8_t> MakeRandomData(int data_size) { | |
53 | std::vector<uint8_t> data(data_size); | |
54 | random_bytes(data_size, 1234, data.data()); | |
55 | return data; | |
56 | } | |
57 | ||
// Returns exactly `data_size` bytes made by repeating a fixed English sentence
// back to back (the last repetition is cut short). This gives highly repetitive,
// compressible input.
//
// Takes int64_t (previously int) so the int64_t size constants used by the
// tests are passed through without narrowing; int callers still compile.
std::vector<uint8_t> MakeCompressibleData(int64_t data_size) {
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data";

  std::vector<uint8_t> data(static_cast<size_t>(data_size));
  // Copy whole (or a final partial) sentence per iteration straight into the
  // exactly-sized output. Unlike memcpy-ing whole repeats into an oversized
  // buffer and resizing, this allocates only data_size bytes and needs no
  // <cstring>, which this file never included.
  size_t pos = 0;
  while (pos < data.size()) {
    const size_t n = std::min(base_data.size(), data.size() - pos);
    std::copy_n(base_data.begin(), n, data.begin() + pos);
    pos += n;
  }
  return data;
}
70 | ||
71 | std::shared_ptr<Buffer> CompressDataOneShot(Codec* codec, | |
72 | const std::vector<uint8_t>& data) { | |
73 | int64_t max_compressed_len, compressed_len; | |
74 | max_compressed_len = codec->MaxCompressedLen(data.size(), data.data()); | |
75 | auto compressed = *AllocateResizableBuffer(max_compressed_len); | |
76 | compressed_len = *codec->Compress(data.size(), data.data(), max_compressed_len, | |
77 | compressed->mutable_data()); | |
78 | ABORT_NOT_OK(compressed->Resize(compressed_len)); | |
79 | return std::move(compressed); | |
80 | } | |
81 | ||
82 | Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed, | |
83 | int64_t* stream_pos, std::vector<uint8_t>* out) { | |
84 | // Create compressed input stream | |
85 | auto buffer_reader = std::make_shared<BufferReader>(compressed); | |
86 | ARROW_ASSIGN_OR_RAISE(auto stream, CompressedInputStream::Make(codec, buffer_reader)); | |
87 | ||
88 | std::vector<uint8_t> decompressed; | |
89 | int64_t decompressed_size = 0; | |
90 | const int64_t chunk_size = 1111; | |
91 | while (true) { | |
92 | ARROW_ASSIGN_OR_RAISE(auto buf, stream->Read(chunk_size)); | |
93 | if (buf->size() == 0) { | |
94 | // EOF | |
95 | break; | |
96 | } | |
97 | decompressed.resize(decompressed_size + buf->size()); | |
98 | memcpy(decompressed.data() + decompressed_size, buf->data(), buf->size()); | |
99 | decompressed_size += buf->size(); | |
100 | } | |
101 | if (stream_pos != nullptr) { | |
102 | RETURN_NOT_OK(stream->Tell().Value(stream_pos)); | |
103 | } | |
104 | *out = std::move(decompressed); | |
105 | return Status::OK(); | |
106 | } | |
107 | ||
108 | Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed, | |
109 | std::vector<uint8_t>* out) { | |
110 | return RunCompressedInputStream(codec, compressed, nullptr, out); | |
111 | } | |
112 | ||
113 | void CheckCompressedInputStream(Codec* codec, const std::vector<uint8_t>& data) { | |
114 | // Create compressed data | |
115 | auto compressed = CompressDataOneShot(codec, data); | |
116 | ||
117 | std::vector<uint8_t> decompressed; | |
118 | int64_t stream_pos = -1; | |
119 | ASSERT_OK(RunCompressedInputStream(codec, compressed, &stream_pos, &decompressed)); | |
120 | ||
121 | ASSERT_EQ(decompressed.size(), data.size()); | |
122 | ASSERT_EQ(decompressed, data); | |
123 | ASSERT_EQ(stream_pos, static_cast<int64_t>(decompressed.size())); | |
124 | } | |
125 | ||
126 | void CheckCompressedOutputStream(Codec* codec, const std::vector<uint8_t>& data, | |
127 | bool do_flush) { | |
128 | // Create compressed output stream | |
129 | ASSERT_OK_AND_ASSIGN(auto buffer_writer, BufferOutputStream::Create()); | |
130 | ASSERT_OK_AND_ASSIGN(auto stream, CompressedOutputStream::Make(codec, buffer_writer)); | |
131 | ASSERT_OK_AND_EQ(0, stream->Tell()); | |
132 | ||
133 | const uint8_t* input = data.data(); | |
134 | int64_t input_len = data.size(); | |
135 | const int64_t chunk_size = 1111; | |
136 | while (input_len > 0) { | |
137 | int64_t nbytes = std::min(chunk_size, input_len); | |
138 | ASSERT_OK(stream->Write(input, nbytes)); | |
139 | input += nbytes; | |
140 | input_len -= nbytes; | |
141 | if (do_flush) { | |
142 | ASSERT_OK(stream->Flush()); | |
143 | } | |
144 | } | |
145 | ASSERT_OK_AND_EQ(static_cast<int64_t>(data.size()), stream->Tell()); | |
146 | ASSERT_OK(stream->Close()); | |
147 | ||
148 | // Get compressed data and decompress it | |
149 | ASSERT_OK_AND_ASSIGN(auto compressed, buffer_writer->Finish()); | |
150 | std::vector<uint8_t> decompressed(data.size()); | |
151 | ASSERT_OK(codec->Decompress(compressed->size(), compressed->data(), decompressed.size(), | |
152 | decompressed.data())); | |
153 | ASSERT_EQ(decompressed, data); | |
154 | } | |
155 | ||
156 | class CompressedInputStreamTest : public ::testing::TestWithParam<Compression::type> { | |
157 | protected: | |
158 | Compression::type GetCompression() { return GetParam(); } | |
159 | ||
160 | std::unique_ptr<Codec> MakeCodec() { return *Codec::Create(GetCompression()); } | |
161 | }; | |
162 | ||
163 | class CompressedOutputStreamTest : public ::testing::TestWithParam<Compression::type> { | |
164 | protected: | |
165 | Compression::type GetCompression() { return GetParam(); } | |
166 | ||
167 | std::unique_ptr<Codec> MakeCodec() { return *Codec::Create(GetCompression()); } | |
168 | }; | |
169 | ||
170 | TEST_P(CompressedInputStreamTest, CompressibleData) { | |
171 | auto codec = MakeCodec(); | |
172 | auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE); | |
173 | ||
174 | CheckCompressedInputStream(codec.get(), data); | |
175 | } | |
176 | ||
177 | TEST_P(CompressedInputStreamTest, RandomData) { | |
178 | auto codec = MakeCodec(); | |
179 | auto data = MakeRandomData(RANDOM_DATA_SIZE); | |
180 | ||
181 | CheckCompressedInputStream(codec.get(), data); | |
182 | } | |
183 | ||
184 | TEST_P(CompressedInputStreamTest, TruncatedData) { | |
185 | auto codec = MakeCodec(); | |
186 | auto data = MakeRandomData(10000); | |
187 | auto compressed = CompressDataOneShot(codec.get(), data); | |
188 | auto truncated = SliceBuffer(compressed, 0, compressed->size() - 3); | |
189 | ||
190 | std::vector<uint8_t> decompressed; | |
191 | ASSERT_RAISES(IOError, RunCompressedInputStream(codec.get(), truncated, &decompressed)); | |
192 | } | |
193 | ||
194 | TEST_P(CompressedInputStreamTest, InvalidData) { | |
195 | auto codec = MakeCodec(); | |
196 | auto compressed_data = MakeRandomData(100); | |
197 | ||
198 | auto buffer_reader = std::make_shared<BufferReader>(Buffer::Wrap(compressed_data)); | |
199 | ASSERT_OK_AND_ASSIGN(auto stream, | |
200 | CompressedInputStream::Make(codec.get(), buffer_reader)); | |
201 | ASSERT_RAISES(IOError, stream->Read(1024)); | |
202 | } | |
203 | ||
204 | TEST_P(CompressedInputStreamTest, ConcatenatedStreams) { | |
205 | // ARROW-5974: just like the "gunzip", "bzip2" and "xz" commands, | |
206 | // decompressing concatenated compressed streams should yield the entire | |
207 | // original data. | |
208 | auto codec = MakeCodec(); | |
209 | auto data1 = MakeCompressibleData(100); | |
210 | auto data2 = MakeCompressibleData(200); | |
211 | auto compressed1 = CompressDataOneShot(codec.get(), data1); | |
212 | auto compressed2 = CompressDataOneShot(codec.get(), data2); | |
213 | std::vector<uint8_t> expected; | |
214 | std::copy(data1.begin(), data1.end(), std::back_inserter(expected)); | |
215 | std::copy(data2.begin(), data2.end(), std::back_inserter(expected)); | |
216 | ||
217 | ASSERT_OK_AND_ASSIGN(auto concatenated, ConcatenateBuffers({compressed1, compressed2})); | |
218 | std::vector<uint8_t> decompressed; | |
219 | ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); | |
220 | ASSERT_EQ(decompressed.size(), expected.size()); | |
221 | ASSERT_EQ(decompressed, expected); | |
222 | ||
223 | // Same, but with an empty decompressed stream in the middle | |
224 | auto compressed_empty = CompressDataOneShot(codec.get(), {}); | |
225 | ASSERT_OK_AND_ASSIGN(concatenated, | |
226 | ConcatenateBuffers({compressed1, compressed_empty, compressed2})); | |
227 | ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); | |
228 | ASSERT_EQ(decompressed.size(), expected.size()); | |
229 | ASSERT_EQ(decompressed, expected); | |
230 | ||
231 | // Same, but with an empty decompressed stream at the end | |
232 | ASSERT_OK_AND_ASSIGN(concatenated, | |
233 | ConcatenateBuffers({compressed1, compressed2, compressed_empty})); | |
234 | ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); | |
235 | ASSERT_EQ(decompressed.size(), expected.size()); | |
236 | ASSERT_EQ(decompressed, expected); | |
237 | } | |
238 | ||
239 | TEST_P(CompressedOutputStreamTest, CompressibleData) { | |
240 | auto codec = MakeCodec(); | |
241 | auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE); | |
242 | ||
243 | CheckCompressedOutputStream(codec.get(), data, false /* do_flush */); | |
244 | CheckCompressedOutputStream(codec.get(), data, true /* do_flush */); | |
245 | } | |
246 | ||
247 | TEST_P(CompressedOutputStreamTest, RandomData) { | |
248 | auto codec = MakeCodec(); | |
249 | auto data = MakeRandomData(RANDOM_DATA_SIZE); | |
250 | ||
251 | CheckCompressedOutputStream(codec.get(), data, false /* do_flush */); | |
252 | CheckCompressedOutputStream(codec.get(), data, true /* do_flush */); | |
253 | } | |
254 | ||
255 | // NOTES: | |
256 | // - Snappy doesn't support streaming decompression | |
257 | // - BZ2 doesn't support one-shot compression | |
258 | // - LZ4 raw format doesn't support streaming decompression | |
259 | ||
260 | #ifdef ARROW_WITH_SNAPPY | |
261 | TEST(TestSnappyInputStream, NotImplemented) { | |
262 | std::unique_ptr<Codec> codec; | |
263 | ASSERT_OK_AND_ASSIGN(codec, Codec::Create(Compression::SNAPPY)); | |
264 | std::shared_ptr<InputStream> stream = std::make_shared<BufferReader>(""); | |
265 | ASSERT_RAISES(NotImplemented, CompressedInputStream::Make(codec.get(), stream)); | |
266 | } | |
267 | ||
268 | TEST(TestSnappyOutputStream, NotImplemented) { | |
269 | std::unique_ptr<Codec> codec; | |
270 | ASSERT_OK_AND_ASSIGN(codec, Codec::Create(Compression::SNAPPY)); | |
271 | std::shared_ptr<OutputStream> stream = std::make_shared<MockOutputStream>(); | |
272 | ASSERT_RAISES(NotImplemented, CompressedOutputStream::Make(codec.get(), stream)); | |
273 | } | |
274 | #endif | |
275 | ||
276 | #if !defined ARROW_WITH_ZLIB && !defined ARROW_WITH_BROTLI && !defined ARROW_WITH_LZ4 && \ | |
277 | !defined ARROW_WITH_ZSTD | |
278 | GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CompressedInputStreamTest); | |
279 | GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CompressedOutputStreamTest); | |
280 | #endif | |
281 | ||
282 | #ifdef ARROW_WITH_ZLIB | |
283 | INSTANTIATE_TEST_SUITE_P(TestGZipInputStream, CompressedInputStreamTest, | |
284 | ::testing::Values(Compression::GZIP)); | |
285 | INSTANTIATE_TEST_SUITE_P(TestGZipOutputStream, CompressedOutputStreamTest, | |
286 | ::testing::Values(Compression::GZIP)); | |
287 | #endif | |
288 | ||
289 | #ifdef ARROW_WITH_BROTLI | |
290 | INSTANTIATE_TEST_SUITE_P(TestBrotliInputStream, CompressedInputStreamTest, | |
291 | ::testing::Values(Compression::BROTLI)); | |
292 | INSTANTIATE_TEST_SUITE_P(TestBrotliOutputStream, CompressedOutputStreamTest, | |
293 | ::testing::Values(Compression::BROTLI)); | |
294 | #endif | |
295 | ||
296 | #ifdef ARROW_WITH_LZ4 | |
297 | INSTANTIATE_TEST_SUITE_P(TestLZ4InputStream, CompressedInputStreamTest, | |
298 | ::testing::Values(Compression::LZ4_FRAME)); | |
299 | INSTANTIATE_TEST_SUITE_P(TestLZ4OutputStream, CompressedOutputStreamTest, | |
300 | ::testing::Values(Compression::LZ4_FRAME)); | |
301 | #endif | |
302 | ||
303 | #ifdef ARROW_WITH_ZSTD | |
304 | INSTANTIATE_TEST_SUITE_P(TestZSTDInputStream, CompressedInputStreamTest, | |
305 | ::testing::Values(Compression::ZSTD)); | |
306 | INSTANTIATE_TEST_SUITE_P(TestZSTDOutputStream, CompressedOutputStreamTest, | |
307 | ::testing::Values(Compression::ZSTD)); | |
308 | #endif | |
309 | ||
310 | } // namespace io | |
311 | } // namespace arrow |