]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/adapters/orc/adapter.h" | |
19 | ||
20 | #include <gtest/gtest.h> | |
21 | ||
22 | #include <orc/OrcFile.hh> | |
23 | #include <string> | |
24 | ||
25 | #include "arrow/adapters/orc/adapter_util.h" | |
26 | #include "arrow/array.h" | |
27 | #include "arrow/buffer.h" | |
28 | #include "arrow/buffer_builder.h" | |
29 | #include "arrow/chunked_array.h" | |
30 | #include "arrow/compute/cast.h" | |
31 | #include "arrow/io/interfaces.h" | |
32 | #include "arrow/io/memory.h" | |
33 | #include "arrow/status.h" | |
34 | #include "arrow/table.h" | |
35 | #include "arrow/testing/gtest_util.h" | |
36 | #include "arrow/testing/random.h" | |
37 | #include "arrow/type.h" | |
38 | #include "arrow/util/decimal.h" | |
39 | #include "arrow/util/key_value_metadata.h" | |
40 | ||
41 | namespace liborc = orc; | |
42 | ||
43 | namespace arrow { | |
44 | ||
45 | using internal::checked_pointer_cast; | |
46 | ||
47 | constexpr int kDefaultSmallMemStreamSize = 16384 * 5; // 80KB | |
48 | constexpr int kDefaultMemStreamSize = 10 * 1024 * 1024; | |
49 | constexpr int64_t kNanoMax = std::numeric_limits<int64_t>::max(); | |
50 | constexpr int64_t kNanoMin = std::numeric_limits<int64_t>::lowest(); | |
51 | const int64_t kMicroMax = std::floor(kNanoMax / 1000); | |
52 | const int64_t kMicroMin = std::ceil(kNanoMin / 1000); | |
53 | const int64_t kMilliMax = std::floor(kMicroMax / 1000); | |
54 | const int64_t kMilliMin = std::ceil(kMicroMin / 1000); | |
55 | const int64_t kSecondMax = std::floor(kMilliMax / 1000); | |
56 | const int64_t kSecondMin = std::ceil(kMilliMin / 1000); | |
57 | ||
58 | static constexpr random::SeedType kRandomSeed = 0x0ff1ce; | |
59 | ||
60 | class MemoryOutputStream : public liborc::OutputStream { | |
61 | public: | |
62 | explicit MemoryOutputStream(ssize_t capacity) | |
63 | : data_(capacity), name_("MemoryOutputStream"), length_(0) {} | |
64 | ||
65 | uint64_t getLength() const override { return length_; } | |
66 | ||
67 | uint64_t getNaturalWriteSize() const override { return natural_write_size_; } | |
68 | ||
69 | void write(const void* buf, size_t size) override { | |
70 | memcpy(data_.data() + length_, buf, size); | |
71 | length_ += size; | |
72 | } | |
73 | ||
74 | const std::string& getName() const override { return name_; } | |
75 | ||
76 | const char* getData() const { return data_.data(); } | |
77 | ||
78 | void close() override {} | |
79 | ||
80 | void reset() { length_ = 0; } | |
81 | ||
82 | private: | |
83 | std::vector<char> data_; | |
84 | std::string name_; | |
85 | uint64_t length_, natural_write_size_; | |
86 | }; | |
87 | ||
88 | std::shared_ptr<Buffer> GenerateFixedDifferenceBuffer(int32_t fixed_length, | |
89 | int64_t length) { | |
90 | BufferBuilder builder; | |
91 | int32_t offsets[length]; | |
92 | ARROW_EXPECT_OK(builder.Resize(4 * length)); | |
93 | for (int32_t i = 0; i < length; i++) { | |
94 | offsets[i] = fixed_length * i; | |
95 | } | |
96 | ARROW_EXPECT_OK(builder.Append(offsets, 4 * length)); | |
97 | std::shared_ptr<Buffer> buffer; | |
98 | ARROW_EXPECT_OK(builder.Finish(&buffer)); | |
99 | return buffer; | |
100 | } | |
101 | ||
102 | std::shared_ptr<Array> CastFixedSizeBinaryArrayToBinaryArray( | |
103 | std::shared_ptr<Array> array) { | |
104 | auto fixed_size_binary_array = checked_pointer_cast<FixedSizeBinaryArray>(array); | |
105 | std::shared_ptr<Buffer> value_offsets = GenerateFixedDifferenceBuffer( | |
106 | fixed_size_binary_array->byte_width(), array->length() + 1); | |
107 | return std::make_shared<BinaryArray>(array->length(), value_offsets, | |
108 | array->data()->buffers[1], | |
109 | array->data()->buffers[0]); | |
110 | } | |
111 | ||
112 | template <typename TargetArrayType> | |
113 | std::shared_ptr<Array> CastInt64ArrayToTemporalArray( | |
114 | const std::shared_ptr<DataType>& type, std::shared_ptr<Array> array) { | |
115 | std::shared_ptr<ArrayData> new_array_data = | |
116 | ArrayData::Make(type, array->length(), array->data()->buffers); | |
117 | return std::make_shared<TargetArrayType>(new_array_data); | |
118 | } | |
119 | ||
120 | Result<std::shared_ptr<Array>> GenerateRandomDate64Array(int64_t size, | |
121 | double null_probability) { | |
122 | arrow::random::RandomArrayGenerator rand(kRandomSeed); | |
123 | return CastInt64ArrayToTemporalArray<Date64Array>( | |
124 | date64(), rand.Int64(size, kMilliMin, kMilliMax, null_probability)); | |
125 | } | |
126 | ||
127 | Result<std::shared_ptr<Array>> GenerateRandomTimestampArray(int64_t size, | |
128 | arrow::TimeUnit::type type, | |
129 | double null_probability) { | |
130 | arrow::random::RandomArrayGenerator rand(kRandomSeed); | |
131 | switch (type) { | |
132 | case arrow::TimeUnit::type::SECOND: { | |
133 | return CastInt64ArrayToTemporalArray<TimestampArray>( | |
134 | timestamp(TimeUnit::SECOND), | |
135 | rand.Int64(size, kSecondMin, kSecondMax, null_probability)); | |
136 | } | |
137 | case arrow::TimeUnit::type::MILLI: { | |
138 | return CastInt64ArrayToTemporalArray<TimestampArray>( | |
139 | timestamp(TimeUnit::MILLI), | |
140 | rand.Int64(size, kMilliMin, kMilliMax, null_probability)); | |
141 | } | |
142 | case arrow::TimeUnit::type::MICRO: { | |
143 | return CastInt64ArrayToTemporalArray<TimestampArray>( | |
144 | timestamp(TimeUnit::MICRO), | |
145 | rand.Int64(size, kMicroMin, kMicroMax, null_probability)); | |
146 | } | |
147 | case arrow::TimeUnit::type::NANO: { | |
148 | return CastInt64ArrayToTemporalArray<TimestampArray>( | |
149 | timestamp(TimeUnit::NANO), | |
150 | rand.Int64(size, kNanoMin, kNanoMax, null_probability)); | |
151 | } | |
152 | default: { | |
153 | return arrow::Status::TypeError("Unknown or unsupported Arrow TimeUnit: ", type); | |
154 | } | |
155 | } | |
156 | } | |
157 | ||
/// \brief Construct a random weak composition of a nonnegative integer,
/// i.e. a way of writing it as the sum of a sequence of n non-negative
/// integers.
///
/// Parts are drawn one at a time, each uniformly from [0, remaining sum]. The
/// last part takes whatever remains, and the parts are then shuffled. The
/// generator has a fixed seed, so the output is deterministic for a given
/// standard library. This is not a uniform distribution over all weak
/// compositions.
///
/// \param[in] n the number of integers in the weak composition; if n <= 0,
///            `out` is cleared
/// \param[in] sum the integer of which a random weak composition is generated
/// \param[out] out The generated weak composition; previous contents are
///             discarded
template <typename T, typename U>
void RandWeakComposition(int64_t n, T sum, std::vector<U>* out) {
  const int random_seed = 0;
  std::default_random_engine gen(random_seed);
  if (n <= 0) {
    // end() - 1 and (*out)[n - 1] below would be undefined for an empty result.
    out->clear();
    return;
  }
  // assign() rather than resize(). resize() kept stale elements of a reused
  // vector, and the last of them was then incremented instead of overwritten.
  out->assign(static_cast<size_t>(n), static_cast<U>(0));
  T remaining_sum = sum;
  std::generate(out->begin(), out->end() - 1, [&gen, &remaining_sum] {
    std::uniform_int_distribution<T> d(static_cast<T>(0), remaining_sum);
    auto res = d(gen);
    remaining_sum -= res;
    return static_cast<U>(res);
  });
  (*out)[n - 1] += static_cast<U>(remaining_sum);
  // std::random_shuffle was removed in C++17. It also drew from an unspecified
  // (typically std::rand-based) source rather than the seeded engine.
  std::shuffle(out->begin(), out->end(), gen);
}
180 | ||
181 | std::shared_ptr<ChunkedArray> GenerateRandomChunkedArray( | |
182 | const std::shared_ptr<DataType>& data_type, int64_t size, int64_t min_num_chunks, | |
183 | int64_t max_num_chunks, double null_probability) { | |
184 | arrow::random::RandomArrayGenerator rand(kRandomSeed); | |
185 | std::vector<int64_t> num_chunks(1, 0); | |
186 | std::vector<int64_t> current_size_chunks; | |
187 | arrow::randint<int64_t, int64_t>(1, min_num_chunks, max_num_chunks, &num_chunks); | |
188 | int64_t current_num_chunks = num_chunks[0]; | |
189 | ArrayVector arrays(current_num_chunks, nullptr); | |
190 | arrow::RandWeakComposition(current_num_chunks, size, ¤t_size_chunks); | |
191 | for (int j = 0; j < current_num_chunks; j++) { | |
192 | switch (data_type->id()) { | |
193 | case arrow::Type::type::DATE64: { | |
194 | EXPECT_OK_AND_ASSIGN(arrays[j], GenerateRandomDate64Array(current_size_chunks[j], | |
195 | null_probability)); | |
196 | break; | |
197 | } | |
198 | case arrow::Type::type::TIMESTAMP: { | |
199 | EXPECT_OK_AND_ASSIGN( | |
200 | arrays[j], | |
201 | GenerateRandomTimestampArray( | |
202 | current_size_chunks[j], | |
203 | arrow::internal::checked_pointer_cast<arrow::TimestampType>(data_type) | |
204 | ->unit(), | |
205 | null_probability)); | |
206 | break; | |
207 | } | |
208 | default: | |
209 | arrays[j] = rand.ArrayOf(data_type, current_size_chunks[j], null_probability); | |
210 | } | |
211 | } | |
212 | return std::make_shared<ChunkedArray>(arrays); | |
213 | } | |
214 | ||
215 | std::shared_ptr<Table> GenerateRandomTable(const std::shared_ptr<Schema>& schema, | |
216 | int64_t size, int64_t min_num_chunks, | |
217 | int64_t max_num_chunks, | |
218 | double null_probability) { | |
219 | int num_cols = schema->num_fields(); | |
220 | ChunkedArrayVector cv; | |
221 | for (int col = 0; col < num_cols; col++) { | |
222 | cv.push_back(GenerateRandomChunkedArray(schema->field(col)->type(), size, | |
223 | min_num_chunks, max_num_chunks, | |
224 | null_probability)); | |
225 | } | |
226 | return Table::Make(schema, cv); | |
227 | } | |
228 | ||
229 | void AssertTableWriteReadEqual(const std::shared_ptr<Table>& input_table, | |
230 | const std::shared_ptr<Table>& expected_output_table, | |
231 | const int64_t max_size = kDefaultSmallMemStreamSize) { | |
232 | EXPECT_OK_AND_ASSIGN(auto buffer_output_stream, | |
233 | io::BufferOutputStream::Create(max_size)); | |
234 | EXPECT_OK_AND_ASSIGN(auto writer, | |
235 | adapters::orc::ORCFileWriter::Open(buffer_output_stream.get())); | |
236 | ARROW_EXPECT_OK(writer->Write(*input_table)); | |
237 | ARROW_EXPECT_OK(writer->Close()); | |
238 | EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish()); | |
239 | std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(buffer)); | |
240 | EXPECT_OK_AND_ASSIGN( | |
241 | auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); | |
242 | EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read()); | |
243 | AssertTablesEqual(*expected_output_table, *actual_output_table, false, false); | |
244 | } | |
245 | ||
246 | void AssertArrayWriteReadEqual(const std::shared_ptr<Array>& input_array, | |
247 | const std::shared_ptr<Array>& expected_output_array, | |
248 | const int64_t max_size = kDefaultSmallMemStreamSize) { | |
249 | std::shared_ptr<Schema> input_schema = schema({field("col0", input_array->type())}), | |
250 | output_schema = | |
251 | schema({field("col0", expected_output_array->type())}); | |
252 | auto input_chunked_array = std::make_shared<ChunkedArray>(input_array), | |
253 | expected_output_chunked_array = | |
254 | std::make_shared<ChunkedArray>(expected_output_array); | |
255 | std::shared_ptr<Table> input_table = Table::Make(input_schema, {input_chunked_array}), | |
256 | expected_output_table = | |
257 | Table::Make(output_schema, {expected_output_chunked_array}); | |
258 | AssertTableWriteReadEqual(input_table, expected_output_table, max_size); | |
259 | } | |
260 | ||
261 | void SchemaORCWriteReadTest(const std::shared_ptr<Schema>& schema, int64_t size, | |
262 | int64_t min_num_chunks, int64_t max_num_chunks, | |
263 | double null_probability, | |
264 | int64_t max_size = kDefaultSmallMemStreamSize) { | |
265 | const std::shared_ptr<Table> table = | |
266 | GenerateRandomTable(schema, size, min_num_chunks, max_num_chunks, null_probability); | |
267 | AssertTableWriteReadEqual(table, table, max_size); | |
268 | } | |
269 | ||
270 | std::unique_ptr<liborc::Writer> CreateWriter(uint64_t stripe_size, | |
271 | const liborc::Type& type, | |
272 | liborc::OutputStream* stream) { | |
273 | liborc::WriterOptions options; | |
274 | options.setStripeSize(stripe_size); | |
275 | options.setCompressionBlockSize(1024); | |
276 | options.setMemoryPool(liborc::getDefaultPool()); | |
277 | options.setRowIndexStride(0); | |
278 | return liborc::createWriter(type, stream, options); | |
279 | } | |
280 | ||
281 | TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) { | |
282 | MemoryOutputStream mem_stream(kDefaultMemStreamSize); | |
283 | ORC_UNIQUE_PTR<liborc::Type> type( | |
284 | liborc::Type::buildTypeFromString("struct<col1:int,col2:string>")); | |
285 | ||
286 | constexpr uint64_t stripe_size = 1024; // 1K | |
287 | constexpr uint64_t stripe_count = 10; | |
288 | constexpr uint64_t stripe_row_count = 16384; | |
289 | constexpr uint64_t reader_batch_size = 1024; | |
290 | ||
291 | auto writer = CreateWriter(stripe_size, *type, &mem_stream); | |
292 | auto batch = writer->createRowBatch(stripe_row_count); | |
293 | auto struct_batch = internal::checked_cast<liborc::StructVectorBatch*>(batch.get()); | |
294 | auto long_batch = | |
295 | internal::checked_cast<liborc::LongVectorBatch*>(struct_batch->fields[0]); | |
296 | auto str_batch = | |
297 | internal::checked_cast<liborc::StringVectorBatch*>(struct_batch->fields[1]); | |
298 | int64_t accumulated = 0; | |
299 | ||
300 | for (uint64_t j = 0; j < stripe_count; ++j) { | |
301 | std::string data_buffer(stripe_row_count * 5, '\0'); | |
302 | uint64_t offset = 0; | |
303 | for (uint64_t i = 0; i < stripe_row_count; ++i) { | |
304 | std::string str_data = std::to_string(accumulated % stripe_row_count); | |
305 | long_batch->data[i] = static_cast<int64_t>(accumulated % stripe_row_count); | |
306 | str_batch->data[i] = &data_buffer[offset]; | |
307 | str_batch->length[i] = static_cast<int64_t>(str_data.size()); | |
308 | memcpy(&data_buffer[offset], str_data.c_str(), str_data.size()); | |
309 | accumulated++; | |
310 | offset += str_data.size(); | |
311 | } | |
312 | struct_batch->numElements = stripe_row_count; | |
313 | long_batch->numElements = stripe_row_count; | |
314 | str_batch->numElements = stripe_row_count; | |
315 | ||
316 | writer->add(*batch); | |
317 | } | |
318 | ||
319 | writer->close(); | |
320 | ||
321 | std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader( | |
322 | std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()), | |
323 | static_cast<int64_t>(mem_stream.getLength())))); | |
324 | ||
325 | ASSERT_OK_AND_ASSIGN( | |
326 | auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); | |
327 | ||
328 | EXPECT_OK_AND_ASSIGN(auto metadata, reader->ReadMetadata()); | |
329 | auto expected_metadata = std::const_pointer_cast<const KeyValueMetadata>( | |
330 | key_value_metadata(std::vector<std::string>(), std::vector<std::string>())); | |
331 | ASSERT_TRUE(metadata->Equals(*expected_metadata)); | |
332 | ASSERT_EQ(stripe_row_count * stripe_count, reader->NumberOfRows()); | |
333 | ASSERT_EQ(stripe_count, reader->NumberOfStripes()); | |
334 | accumulated = 0; | |
335 | EXPECT_OK_AND_ASSIGN(auto stripe_reader, reader->NextStripeReader(reader_batch_size)); | |
336 | while (stripe_reader) { | |
337 | std::shared_ptr<RecordBatch> record_batch; | |
338 | EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); | |
339 | while (record_batch) { | |
340 | auto int32_array = checked_pointer_cast<Int32Array>(record_batch->column(0)); | |
341 | auto str_array = checked_pointer_cast<StringArray>(record_batch->column(1)); | |
342 | for (int j = 0; j < record_batch->num_rows(); ++j) { | |
343 | EXPECT_EQ(accumulated % stripe_row_count, int32_array->Value(j)); | |
344 | EXPECT_EQ(std::to_string(accumulated % stripe_row_count), | |
345 | str_array->GetString(j)); | |
346 | accumulated++; | |
347 | } | |
348 | EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); | |
349 | } | |
350 | EXPECT_OK_AND_ASSIGN(stripe_reader, reader->NextStripeReader(reader_batch_size)); | |
351 | } | |
352 | ||
353 | // test seek operation | |
354 | int64_t start_offset = 830; | |
355 | EXPECT_TRUE(reader->Seek(stripe_row_count + start_offset).ok()); | |
356 | ||
357 | EXPECT_OK_AND_ASSIGN(stripe_reader, reader->NextStripeReader(reader_batch_size)); | |
358 | std::shared_ptr<RecordBatch> record_batch; | |
359 | EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); | |
360 | while (record_batch) { | |
361 | auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0)); | |
362 | auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1)); | |
363 | for (int j = 0; j < record_batch->num_rows(); ++j) { | |
364 | std::ostringstream os; | |
365 | os << start_offset % stripe_row_count; | |
366 | EXPECT_EQ(start_offset % stripe_row_count, int32_array->Value(j)); | |
367 | EXPECT_EQ(os.str(), str_array->GetString(j)); | |
368 | start_offset++; | |
369 | } | |
370 | EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); | |
371 | } | |
372 | } | |
373 | ||
374 | // WriteORC tests | |
375 | // Trivial | |
376 | ||
377 | class TestORCWriterTrivialNoConversion : public ::testing::Test { | |
378 | public: | |
379 | TestORCWriterTrivialNoConversion() { | |
380 | table_schema = schema( | |
381 | {field("bool", boolean()), field("int8", int8()), field("int16", int16()), | |
382 | field("int32", int32()), field("int64", int64()), field("float", float32()), | |
383 | field("double", float64()), field("decimal128nz", decimal128(25, 6)), | |
384 | field("decimal128z", decimal128(32, 0)), field("date32", date32()), | |
385 | field("ts3", timestamp(TimeUnit::NANO)), field("string", utf8()), | |
386 | field("binary", binary()), | |
387 | field("struct", struct_({field("a", utf8()), field("b", int64())})), | |
388 | field("list", list(int32())), | |
389 | field("lsl", list(struct_({field("lsl0", list(int32()))}))), | |
390 | field("map", map(utf8(), utf8()))}); | |
391 | } | |
392 | ||
393 | protected: | |
394 | std::shared_ptr<Schema> table_schema; | |
395 | }; | |
396 | TEST_F(TestORCWriterTrivialNoConversion, writeTrivialChunk) { | |
397 | std::shared_ptr<Table> table = TableFromJSON(table_schema, {R"([])"}); | |
398 | AssertTableWriteReadEqual(table, table, kDefaultSmallMemStreamSize / 16); | |
399 | } | |
400 | TEST_F(TestORCWriterTrivialNoConversion, writeChunkless) { | |
401 | std::shared_ptr<Table> table = TableFromJSON(table_schema, {}); | |
402 | AssertTableWriteReadEqual(table, table, kDefaultSmallMemStreamSize / 16); | |
403 | } | |
404 | class TestORCWriterTrivialWithConversion : public ::testing::Test { | |
405 | public: | |
406 | TestORCWriterTrivialWithConversion() { | |
407 | input_schema = schema( | |
408 | {field("date64", date64()), field("ts0", timestamp(TimeUnit::SECOND)), | |
409 | field("ts1", timestamp(TimeUnit::MILLI)), | |
410 | field("ts2", timestamp(TimeUnit::MICRO)), field("large_string", large_utf8()), | |
411 | field("large_binary", large_binary()), | |
412 | field("fixed_size_binary0", fixed_size_binary(0)), | |
413 | field("fixed_size_binary", fixed_size_binary(5)), | |
414 | field("large_list", large_list(int32())), | |
415 | field("fixed_size_list", fixed_size_list(int32(), 3))}), | |
416 | output_schema = schema( | |
417 | {field("date64", timestamp(TimeUnit::NANO)), | |
418 | field("ts0", timestamp(TimeUnit::NANO)), field("ts1", timestamp(TimeUnit::NANO)), | |
419 | field("ts2", timestamp(TimeUnit::NANO)), field("large_string", utf8()), | |
420 | field("large_binary", binary()), field("fixed_size_binary0", binary()), | |
421 | field("fixed_size_binary", binary()), field("large_list", list(int32())), | |
422 | field("fixed_size_list", list(int32()))}); | |
423 | } | |
424 | ||
425 | protected: | |
426 | std::shared_ptr<Schema> input_schema, output_schema; | |
427 | }; | |
428 | TEST_F(TestORCWriterTrivialWithConversion, writeTrivialChunk) { | |
429 | std::shared_ptr<Table> input_table = TableFromJSON(input_schema, {R"([])"}), | |
430 | expected_output_table = TableFromJSON(output_schema, {R"([])"}); | |
431 | AssertTableWriteReadEqual(input_table, expected_output_table, | |
432 | kDefaultSmallMemStreamSize / 16); | |
433 | } | |
434 | TEST_F(TestORCWriterTrivialWithConversion, writeChunkless) { | |
435 | std::shared_ptr<Table> input_table = TableFromJSON(input_schema, {}), | |
436 | expected_output_table = TableFromJSON(output_schema, {}); | |
437 | AssertTableWriteReadEqual(input_table, expected_output_table, | |
438 | kDefaultSmallMemStreamSize / 16); | |
439 | } | |
440 | ||
441 | // General | |
442 | ||
443 | class TestORCWriterNoConversion : public ::testing::Test { | |
444 | public: | |
445 | TestORCWriterNoConversion() { | |
446 | table_schema = schema( | |
447 | {field("bool", boolean()), field("int8", int8()), field("int16", int16()), | |
448 | field("int32", int32()), field("int64", int64()), field("float", float32()), | |
449 | field("double", float64()), field("date32", date32()), | |
450 | field("decimal64", decimal128(18, 4)), field("decimal64z", decimal128(18, 0)), | |
451 | field("ts3", timestamp(TimeUnit::NANO)), field("string", utf8()), | |
452 | field("binary", binary())}); | |
453 | } | |
454 | ||
455 | protected: | |
456 | std::shared_ptr<Schema> table_schema; | |
457 | }; | |
458 | TEST_F(TestORCWriterNoConversion, writeNoNulls) { | |
459 | SchemaORCWriteReadTest(table_schema, 11203, 5, 10, 0, kDefaultSmallMemStreamSize * 5); | |
460 | } | |
461 | TEST_F(TestORCWriterNoConversion, writeMixed) { | |
462 | SchemaORCWriteReadTest(table_schema, 9405, 1, 20, 0.6, kDefaultSmallMemStreamSize * 5); | |
463 | } | |
464 | TEST_F(TestORCWriterNoConversion, writeAllNulls) { | |
465 | SchemaORCWriteReadTest(table_schema, 4006, 1, 5, 1); | |
466 | } | |
467 | ||
468 | // Converts | |
469 | // Since Arrow has way more types than ORC type conversions are unavoidable | |
470 | class TestORCWriterWithConversion : public ::testing::Test { | |
471 | public: | |
472 | TestORCWriterWithConversion() { | |
473 | input_schema = schema( | |
474 | {field("date64", date64()), field("ts0", timestamp(TimeUnit::SECOND)), | |
475 | field("ts1", timestamp(TimeUnit::MILLI)), | |
476 | field("ts2", timestamp(TimeUnit::MICRO)), field("large_string", large_utf8()), | |
477 | field("large_binary", large_binary()), | |
478 | field("fixed_size_binary0", fixed_size_binary(0)), | |
479 | field("fixed_size_binary", fixed_size_binary(5))}), | |
480 | output_schema = schema( | |
481 | {field("date64", timestamp(TimeUnit::NANO)), | |
482 | field("ts0", timestamp(TimeUnit::NANO)), field("ts1", timestamp(TimeUnit::NANO)), | |
483 | field("ts2", timestamp(TimeUnit::NANO)), field("large_string", utf8()), | |
484 | field("large_binary", binary()), field("fixed_size_binary0", binary()), | |
485 | field("fixed_size_binary", binary())}); | |
486 | } | |
487 | void RunTest(int64_t num_rows, double null_possibility, | |
488 | int64_t max_size = kDefaultSmallMemStreamSize) { | |
489 | int64_t num_cols = (input_schema->fields()).size(); | |
490 | std::shared_ptr<Table> input_table = | |
491 | GenerateRandomTable(input_schema, num_rows, 1, 1, null_possibility); | |
492 | ArrayVector av(num_cols); | |
493 | for (int i = 0; i < num_cols - 2; i++) { | |
494 | EXPECT_OK_AND_ASSIGN(av[i], | |
495 | arrow::compute::Cast(*(input_table->column(i)->chunk(0)), | |
496 | output_schema->field(i)->type())); | |
497 | } | |
498 | for (int i = num_cols - 2; i < num_cols; i++) { | |
499 | av[i] = CastFixedSizeBinaryArrayToBinaryArray(input_table->column(i)->chunk(0)); | |
500 | } | |
501 | std::shared_ptr<Table> expected_output_table = Table::Make(output_schema, av); | |
502 | AssertTableWriteReadEqual(input_table, expected_output_table, max_size); | |
503 | } | |
504 | ||
505 | protected: | |
506 | std::shared_ptr<Schema> input_schema, output_schema; | |
507 | }; | |
508 | TEST_F(TestORCWriterWithConversion, writeAllNulls) { RunTest(12000, 1); } | |
509 | TEST_F(TestORCWriterWithConversion, writeNoNulls) { RunTest(10009, 0); } | |
510 | TEST_F(TestORCWriterWithConversion, writeMixed) { RunTest(8021, 0.5); } | |
511 | ||
512 | class TestORCWriterSingleArray : public ::testing::Test { | |
513 | public: | |
514 | TestORCWriterSingleArray() : rand(kRandomSeed) {} | |
515 | ||
516 | protected: | |
517 | arrow::random::RandomArrayGenerator rand; | |
518 | }; | |
519 | ||
520 | // Nested types | |
521 | TEST_F(TestORCWriterSingleArray, WriteStruct) { | |
522 | std::vector<std::shared_ptr<Field>> subfields{field("int32", boolean())}; | |
523 | const int64_t num_rows = 1234; | |
524 | int num_subcols = subfields.size(); | |
525 | ArrayVector av0(num_subcols); | |
526 | for (int i = 0; i < num_subcols; i++) { | |
527 | av0[i] = rand.ArrayOf(subfields[i]->type(), num_rows, 0.4); | |
528 | } | |
529 | std::shared_ptr<Buffer> bitmap = rand.NullBitmap(num_rows, 0.5); | |
530 | std::shared_ptr<Array> array = | |
531 | std::make_shared<StructArray>(struct_(subfields), num_rows, av0, bitmap); | |
532 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 10); | |
533 | } | |
534 | ||
535 | TEST_F(TestORCWriterSingleArray, WriteStructOfStruct) { | |
536 | std::vector<std::shared_ptr<Field>> subsubfields{ | |
537 | field("bool", boolean()), | |
538 | field("int8", int8()), | |
539 | field("int16", int16()), | |
540 | field("int32", int32()), | |
541 | field("int64", int64()), | |
542 | field("date32", date32()), | |
543 | field("ts3", timestamp(TimeUnit::NANO)), | |
544 | field("string", utf8()), | |
545 | field("binary", binary())}; | |
546 | const int64_t num_rows = 1234; | |
547 | int num_subsubcols = subsubfields.size(); | |
548 | ArrayVector av00(num_subsubcols), av0(1); | |
549 | for (int i = 0; i < num_subsubcols; i++) { | |
550 | av00[i] = rand.ArrayOf(subsubfields[i]->type(), num_rows, 0); | |
551 | } | |
552 | std::shared_ptr<Buffer> bitmap0 = rand.NullBitmap(num_rows, 0); | |
553 | av0[0] = std::make_shared<StructArray>(struct_(subsubfields), num_rows, av00, bitmap0); | |
554 | std::shared_ptr<Buffer> bitmap = rand.NullBitmap(num_rows, 0.2); | |
555 | std::shared_ptr<Array> array = std::make_shared<StructArray>( | |
556 | struct_({field("struct2", struct_(subsubfields))}), num_rows, av0, bitmap); | |
557 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 10); | |
558 | } | |
559 | ||
560 | TEST_F(TestORCWriterSingleArray, WriteList) { | |
561 | const int64_t num_rows = 1234; | |
562 | auto value_array = rand.ArrayOf(int32(), 125 * num_rows, 0); | |
563 | std::shared_ptr<Array> array = rand.List(*value_array, num_rows, 1); | |
564 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 100); | |
565 | } | |
566 | ||
567 | TEST_F(TestORCWriterSingleArray, WriteLargeList) { | |
568 | const int64_t num_rows = 1234; | |
569 | auto value_array = rand.ArrayOf(int32(), 5 * num_rows, 0.5); | |
570 | auto output_offsets = rand.Offsets(num_rows + 1, 0, 5 * num_rows, 0.6, false); | |
571 | EXPECT_OK_AND_ASSIGN(auto input_offsets, | |
572 | arrow::compute::Cast(*output_offsets, int64())); | |
573 | EXPECT_OK_AND_ASSIGN(auto input_array, | |
574 | arrow::LargeListArray::FromArrays(*input_offsets, *value_array)); | |
575 | EXPECT_OK_AND_ASSIGN(auto output_array, | |
576 | arrow::ListArray::FromArrays(*output_offsets, *value_array)); | |
577 | AssertArrayWriteReadEqual(input_array, output_array, kDefaultSmallMemStreamSize * 10); | |
578 | } | |
579 | ||
580 | TEST_F(TestORCWriterSingleArray, WriteFixedSizeList) { | |
581 | const int64_t num_rows = 1234; | |
582 | std::shared_ptr<Array> value_array = rand.ArrayOf(int32(), 3 * num_rows, 0.8); | |
583 | std::shared_ptr<Buffer> bitmap = rand.NullBitmap(num_rows, 1); | |
584 | std::shared_ptr<Buffer> buffer = GenerateFixedDifferenceBuffer(3, num_rows + 1); | |
585 | std::shared_ptr<Array> input_array = std::make_shared<FixedSizeListArray>( | |
586 | fixed_size_list(int32(), 3), num_rows, value_array, bitmap), | |
587 | output_array = std::make_shared<ListArray>( | |
588 | list(int32()), num_rows, buffer, value_array, bitmap); | |
589 | AssertArrayWriteReadEqual(input_array, output_array, kDefaultSmallMemStreamSize * 10); | |
590 | } | |
591 | ||
592 | TEST_F(TestORCWriterSingleArray, WriteListOfList) { | |
593 | const int64_t num_rows = 1234; | |
594 | auto value_value_array = rand.ArrayOf(utf8(), 4 * num_rows, 0.5); | |
595 | std::shared_ptr<Array> value_array = rand.List(*value_value_array, 2 * num_rows, 0.7); | |
596 | std::shared_ptr<Array> array = rand.List(*value_array, num_rows, 0.4); | |
597 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 10); | |
598 | } | |
599 | ||
600 | TEST_F(TestORCWriterSingleArray, WriteListOfListOfList) { | |
601 | const int64_t num_rows = 1234; | |
602 | auto value3_array = rand.ArrayOf(int64(), 12 * num_rows, 0.1); | |
603 | std::shared_ptr<Array> value2_array = rand.List(*value3_array, 5 * num_rows, 0); | |
604 | std::shared_ptr<Array> value_array = rand.List(*value2_array, 2 * num_rows, 0.1); | |
605 | std::shared_ptr<Array> array = rand.List(*value_array, num_rows, 0.1); | |
606 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 35); | |
607 | } | |
608 | ||
609 | TEST_F(TestORCWriterSingleArray, WriteListOfStruct) { | |
610 | const int64_t num_rows = 1234, num_values = 3 * num_rows; | |
611 | ArrayVector av00(1); | |
612 | av00[0] = rand.ArrayOf(int32(), num_values, 0); | |
613 | std::shared_ptr<Buffer> bitmap = rand.NullBitmap(num_values, 0.2); | |
614 | std::shared_ptr<Array> value_array = std::make_shared<StructArray>( | |
615 | struct_({field("a", int32())}), num_values, av00, bitmap); | |
616 | std::shared_ptr<Array> array = rand.List(*value_array, num_rows, 0); | |
617 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 30); | |
618 | } | |
619 | ||
620 | TEST_F(TestORCWriterSingleArray, WriteStructOfList) { | |
621 | const int64_t num_rows = 1234; | |
622 | ArrayVector av0(1); | |
623 | auto value_array = rand.ArrayOf(int32(), 5 * num_rows, 0.2); | |
624 | av0[0] = rand.List(*value_array, num_rows, 0); | |
625 | std::shared_ptr<Buffer> bitmap = rand.NullBitmap(num_rows, 0.2); | |
626 | std::shared_ptr<Array> array = std::make_shared<StructArray>( | |
627 | struct_({field("a", list(int32()))}), num_rows, av0, bitmap); | |
628 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 20); | |
629 | } | |
630 | ||
631 | TEST_F(TestORCWriterSingleArray, WriteMap) { | |
632 | const int64_t num_rows = 1234; | |
633 | auto key_array = rand.ArrayOf(int32(), 20 * num_rows, 0); | |
634 | auto item_array = rand.ArrayOf(int32(), 20 * num_rows, 1); | |
635 | std::shared_ptr<Array> array = rand.Map(key_array, item_array, num_rows, 0.1); | |
636 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 50); | |
637 | } | |
638 | ||
639 | TEST_F(TestORCWriterSingleArray, WriteStructOfMap) { | |
640 | const int64_t num_rows = 1234, num_values = 5 * num_rows; | |
641 | ArrayVector av0(1); | |
642 | auto key_array = rand.ArrayOf(binary(), num_values, 0); | |
643 | auto item_array = rand.ArrayOf(int32(), num_values, 0.5); | |
644 | av0[0] = rand.Map(key_array, item_array, num_rows, 0.2); | |
645 | std::shared_ptr<Array> array = std::make_shared<StructArray>( | |
646 | struct_({field("a", map(binary(), int32()))}), num_rows, av0); | |
647 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 20); | |
648 | } | |
649 | ||
650 | TEST_F(TestORCWriterSingleArray, WriteMapOfStruct) { | |
651 | const int64_t num_rows = 1234, num_values = 10 * num_rows; | |
652 | std::shared_ptr<Array> key_array = rand.ArrayOf(utf8(), num_values, 0); | |
653 | ArrayVector av00(1); | |
654 | av00[0] = rand.ArrayOf(int32(), num_values, 0.1); | |
655 | std::shared_ptr<Buffer> bitmap = rand.NullBitmap(num_values, 0.2); | |
656 | std::shared_ptr<Array> item_array = std::make_shared<StructArray>( | |
657 | struct_({field("a", int32())}), num_values, av00, bitmap); | |
658 | std::shared_ptr<Array> array = rand.Map(key_array, item_array, num_rows, 0.1); | |
659 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 10); | |
660 | } | |
661 | ||
662 | TEST_F(TestORCWriterSingleArray, WriteMapOfMap) { | |
663 | const int64_t num_rows = 1234; | |
664 | auto key_key_array = rand.ArrayOf(utf8(), 4 * num_rows, 0); | |
665 | auto key_item_array = rand.ArrayOf(int32(), 4 * num_rows, 0.5); | |
666 | std::shared_ptr<Array> key_array = | |
667 | rand.Map(key_key_array, key_item_array, 2 * num_rows, 0); | |
668 | auto item_key_array = rand.ArrayOf(utf8(), 4 * num_rows, 0); | |
669 | auto item_item_array = rand.ArrayOf(int32(), 4 * num_rows, 0.2); | |
670 | std::shared_ptr<Array> item_array = | |
671 | rand.Map(item_key_array, item_item_array, 2 * num_rows, 0.3); | |
672 | std::shared_ptr<Array> array = rand.Map(key_array, item_array, num_rows, 0.4); | |
673 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 10); | |
674 | } | |
675 | ||
676 | TEST_F(TestORCWriterSingleArray, WriteListOfMap) { | |
677 | const int64_t num_rows = 1234; | |
678 | auto value_key_array = rand.ArrayOf(utf8(), 4 * num_rows, 0); | |
679 | auto value_item_array = rand.ArrayOf(int32(), 4 * num_rows, 0.5); | |
680 | std::shared_ptr<Array> value_array = | |
681 | rand.Map(value_key_array, value_item_array, 2 * num_rows, 0.2); | |
682 | std::shared_ptr<Array> array = rand.List(*value_array, num_rows, 0.4); | |
683 | AssertArrayWriteReadEqual(array, array, kDefaultSmallMemStreamSize * 10); | |
684 | } | |
685 | ||
686 | } // namespace arrow |