]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "parquet/arrow/reader_internal.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <climits> | |
22 | #include <cstdint> | |
23 | #include <cstring> | |
24 | #include <memory> | |
25 | #include <string> | |
26 | #include <type_traits> | |
27 | #include <vector> | |
28 | ||
29 | #include "arrow/array.h" | |
30 | #include "arrow/compute/api.h" | |
31 | #include "arrow/datum.h" | |
32 | #include "arrow/io/memory.h" | |
33 | #include "arrow/ipc/reader.h" | |
34 | #include "arrow/ipc/writer.h" | |
35 | #include "arrow/scalar.h" | |
36 | #include "arrow/status.h" | |
37 | #include "arrow/table.h" | |
38 | #include "arrow/type.h" | |
39 | #include "arrow/type_traits.h" | |
40 | #include "arrow/util/base64.h" | |
41 | #include "arrow/util/bit_util.h" | |
42 | #include "arrow/util/checked_cast.h" | |
43 | #include "arrow/util/endian.h" | |
44 | #include "arrow/util/int_util_internal.h" | |
45 | #include "arrow/util/logging.h" | |
46 | #include "arrow/util/string_view.h" | |
47 | #include "arrow/util/ubsan.h" | |
48 | #include "arrow/visitor_inline.h" | |
49 | #include "parquet/arrow/reader.h" | |
50 | #include "parquet/arrow/schema.h" | |
51 | #include "parquet/arrow/schema_internal.h" | |
52 | #include "parquet/column_reader.h" | |
53 | #include "parquet/platform.h" | |
54 | #include "parquet/properties.h" | |
55 | #include "parquet/schema.h" | |
56 | #include "parquet/statistics.h" | |
57 | #include "parquet/types.h" | |
58 | // Required after "arrow/util/int_util_internal.h" (for OPTIONAL) | |
59 | #include "parquet/windows_compatibility.h" | |
60 | ||
61 | using arrow::Array; | |
62 | using arrow::BooleanArray; | |
63 | using arrow::ChunkedArray; | |
64 | using arrow::DataType; | |
65 | using arrow::Datum; | |
66 | using arrow::Decimal128; | |
67 | using arrow::Decimal128Array; | |
68 | using arrow::Decimal128Type; | |
69 | using arrow::Decimal256; | |
70 | using arrow::Decimal256Array; | |
71 | using arrow::Decimal256Type; | |
72 | using arrow::Field; | |
73 | using arrow::Int32Array; | |
74 | using arrow::ListArray; | |
75 | using arrow::MemoryPool; | |
76 | using arrow::ResizableBuffer; | |
77 | using arrow::Status; | |
78 | using arrow::StructArray; | |
79 | using arrow::Table; | |
80 | using arrow::TimestampArray; | |
81 | ||
82 | using ::arrow::BitUtil::FromBigEndian; | |
83 | using ::arrow::internal::checked_cast; | |
84 | using ::arrow::internal::checked_pointer_cast; | |
85 | using ::arrow::internal::SafeLeftShift; | |
86 | using ::arrow::util::SafeLoadAs; | |
87 | ||
88 | using parquet::internal::BinaryRecordReader; | |
89 | using parquet::internal::DictionaryRecordReader; | |
90 | using parquet::internal::RecordReader; | |
91 | using parquet::schema::GroupNode; | |
92 | using parquet::schema::Node; | |
93 | using parquet::schema::PrimitiveNode; | |
94 | using ParquetType = parquet::Type; | |
95 | ||
96 | namespace BitUtil = arrow::BitUtil; | |
97 | ||
98 | namespace parquet { | |
99 | namespace arrow { | |
100 | namespace { | |
101 | ||
102 | template <typename ArrowType> | |
103 | using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType; | |
104 | ||
105 | template <typename CType, typename StatisticsType> | |
106 | Status MakeMinMaxScalar(const StatisticsType& statistics, | |
107 | std::shared_ptr<::arrow::Scalar>* min, | |
108 | std::shared_ptr<::arrow::Scalar>* max) { | |
109 | *min = ::arrow::MakeScalar(static_cast<CType>(statistics.min())); | |
110 | *max = ::arrow::MakeScalar(static_cast<CType>(statistics.max())); | |
111 | return Status::OK(); | |
112 | } | |
113 | ||
114 | template <typename CType, typename StatisticsType> | |
115 | Status MakeMinMaxTypedScalar(const StatisticsType& statistics, | |
116 | std::shared_ptr<DataType> type, | |
117 | std::shared_ptr<::arrow::Scalar>* min, | |
118 | std::shared_ptr<::arrow::Scalar>* max) { | |
119 | ARROW_ASSIGN_OR_RAISE(*min, ::arrow::MakeScalar(type, statistics.min())); | |
120 | ARROW_ASSIGN_OR_RAISE(*max, ::arrow::MakeScalar(type, statistics.max())); | |
121 | return Status::OK(); | |
122 | } | |
123 | ||
124 | template <typename StatisticsType> | |
125 | Status MakeMinMaxIntegralScalar(const StatisticsType& statistics, | |
126 | const ::arrow::DataType& arrow_type, | |
127 | std::shared_ptr<::arrow::Scalar>* min, | |
128 | std::shared_ptr<::arrow::Scalar>* max) { | |
129 | const auto column_desc = statistics.descr(); | |
130 | const auto& logical_type = column_desc->logical_type(); | |
131 | const auto& integer = checked_pointer_cast<const IntLogicalType>(logical_type); | |
132 | const bool is_signed = integer->is_signed(); | |
133 | ||
134 | switch (integer->bit_width()) { | |
135 | case 8: | |
136 | return is_signed ? MakeMinMaxScalar<int8_t>(statistics, min, max) | |
137 | : MakeMinMaxScalar<uint8_t>(statistics, min, max); | |
138 | case 16: | |
139 | return is_signed ? MakeMinMaxScalar<int16_t>(statistics, min, max) | |
140 | : MakeMinMaxScalar<uint16_t>(statistics, min, max); | |
141 | case 32: | |
142 | return is_signed ? MakeMinMaxScalar<int32_t>(statistics, min, max) | |
143 | : MakeMinMaxScalar<uint32_t>(statistics, min, max); | |
144 | case 64: | |
145 | return is_signed ? MakeMinMaxScalar<int64_t>(statistics, min, max) | |
146 | : MakeMinMaxScalar<uint64_t>(statistics, min, max); | |
147 | } | |
148 | ||
149 | return Status::OK(); | |
150 | } | |
151 | ||
152 | static Status FromInt32Statistics(const Int32Statistics& statistics, | |
153 | const LogicalType& logical_type, | |
154 | std::shared_ptr<::arrow::Scalar>* min, | |
155 | std::shared_ptr<::arrow::Scalar>* max) { | |
156 | ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type)); | |
157 | ||
158 | switch (logical_type.type()) { | |
159 | case LogicalType::Type::INT: | |
160 | return MakeMinMaxIntegralScalar(statistics, *type, min, max); | |
161 | break; | |
162 | case LogicalType::Type::DATE: | |
163 | case LogicalType::Type::TIME: | |
164 | case LogicalType::Type::NONE: | |
165 | return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max); | |
166 | break; | |
167 | default: | |
168 | break; | |
169 | } | |
170 | ||
171 | return Status::NotImplemented("Cannot extract statistics for type "); | |
172 | } | |
173 | ||
174 | static Status FromInt64Statistics(const Int64Statistics& statistics, | |
175 | const LogicalType& logical_type, | |
176 | std::shared_ptr<::arrow::Scalar>* min, | |
177 | std::shared_ptr<::arrow::Scalar>* max) { | |
178 | ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type)); | |
179 | ||
180 | switch (logical_type.type()) { | |
181 | case LogicalType::Type::INT: | |
182 | return MakeMinMaxIntegralScalar(statistics, *type, min, max); | |
183 | break; | |
184 | case LogicalType::Type::TIME: | |
185 | case LogicalType::Type::TIMESTAMP: | |
186 | case LogicalType::Type::NONE: | |
187 | return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max); | |
188 | break; | |
189 | default: | |
190 | break; | |
191 | } | |
192 | ||
193 | return Status::NotImplemented("Cannot extract statistics for type "); | |
194 | } | |
195 | ||
196 | template <typename DecimalType> | |
197 | Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString( | |
198 | const std::string& data, std::shared_ptr<DataType> arrow_type) { | |
199 | ARROW_ASSIGN_OR_RAISE( | |
200 | DecimalType decimal, | |
201 | DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()), | |
202 | static_cast<int32_t>(data.size()))); | |
203 | return ::arrow::MakeScalar(std::move(arrow_type), decimal); | |
204 | } | |
205 | ||
// Extracts Min and Max scalars from byte-like types (i.e. types where the
// decimal statistics are encoded as big-endian byte strings).
Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
                                         const LogicalType& logical_type,
                                         std::shared_ptr<::arrow::Scalar>* min,
                                         std::shared_ptr<::arrow::Scalar>* max) {
  const DecimalLogicalType& decimal_type =
      checked_cast<const DecimalLogicalType&>(logical_type);

  // Try Decimal128 first; Make() fails if the precision does not fit.
  Result<std::shared_ptr<DataType>> maybe_type =
      Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
  std::shared_ptr<DataType> arrow_type;
  if (maybe_type.ok()) {
    arrow_type = maybe_type.ValueOrDie();
    // `arrow_type` is copied for *min and moved for *max (its last use).
    ARROW_ASSIGN_OR_RAISE(
        *min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), arrow_type));
    ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal128>(statistics.EncodeMax(),
                                                                std::move(arrow_type)));
    return Status::OK();
  }
  // Fallback to see if Decimal256 can represent the type.
  ARROW_ASSIGN_OR_RAISE(
      arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
  ARROW_ASSIGN_OR_RAISE(
      *min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), arrow_type));
  ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal256>(statistics.EncodeMax(),
                                                              std::move(arrow_type)));

  return Status::OK();
}
236 | ||
237 | Status ByteArrayStatisticsAsScalars(const Statistics& statistics, | |
238 | std::shared_ptr<::arrow::Scalar>* min, | |
239 | std::shared_ptr<::arrow::Scalar>* max) { | |
240 | auto logical_type = statistics.descr()->logical_type(); | |
241 | if (logical_type->type() == LogicalType::Type::DECIMAL) { | |
242 | return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max); | |
243 | } | |
244 | std::shared_ptr<::arrow::DataType> type; | |
245 | if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { | |
246 | type = ::arrow::fixed_size_binary(statistics.descr()->type_length()); | |
247 | } else { | |
248 | type = logical_type->type() == LogicalType::Type::STRING ? ::arrow::utf8() | |
249 | : ::arrow::binary(); | |
250 | } | |
251 | ARROW_ASSIGN_OR_RAISE( | |
252 | *min, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMin()))); | |
253 | ARROW_ASSIGN_OR_RAISE( | |
254 | *max, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMax()))); | |
255 | ||
256 | return Status::OK(); | |
257 | } | |
258 | ||
259 | } // namespace | |
260 | ||
261 | Status StatisticsAsScalars(const Statistics& statistics, | |
262 | std::shared_ptr<::arrow::Scalar>* min, | |
263 | std::shared_ptr<::arrow::Scalar>* max) { | |
264 | if (!statistics.HasMinMax()) { | |
265 | return Status::Invalid("Statistics has no min max."); | |
266 | } | |
267 | ||
268 | auto column_desc = statistics.descr(); | |
269 | if (column_desc == nullptr) { | |
270 | return Status::Invalid("Statistics carries no descriptor, can't infer arrow type."); | |
271 | } | |
272 | ||
273 | auto physical_type = column_desc->physical_type(); | |
274 | auto logical_type = column_desc->logical_type(); | |
275 | switch (physical_type) { | |
276 | case Type::BOOLEAN: | |
277 | return MakeMinMaxScalar<bool, BoolStatistics>( | |
278 | checked_cast<const BoolStatistics&>(statistics), min, max); | |
279 | case Type::FLOAT: | |
280 | return MakeMinMaxScalar<float, FloatStatistics>( | |
281 | checked_cast<const FloatStatistics&>(statistics), min, max); | |
282 | case Type::DOUBLE: | |
283 | return MakeMinMaxScalar<double, DoubleStatistics>( | |
284 | checked_cast<const DoubleStatistics&>(statistics), min, max); | |
285 | case Type::INT32: | |
286 | return FromInt32Statistics(checked_cast<const Int32Statistics&>(statistics), | |
287 | *logical_type, min, max); | |
288 | case Type::INT64: | |
289 | return FromInt64Statistics(checked_cast<const Int64Statistics&>(statistics), | |
290 | *logical_type, min, max); | |
291 | case Type::BYTE_ARRAY: | |
292 | case Type::FIXED_LEN_BYTE_ARRAY: | |
293 | return ByteArrayStatisticsAsScalars(statistics, min, max); | |
294 | default: | |
295 | return Status::NotImplemented("Extract statistics unsupported for physical_type ", | |
296 | physical_type, " unsupported."); | |
297 | } | |
298 | ||
299 | return Status::OK(); | |
300 | } | |
301 | ||
302 | // ---------------------------------------------------------------------- | |
303 | // Primitive types | |
304 | ||
305 | namespace { | |
306 | ||
307 | template <typename ArrowType, typename ParquetType> | |
308 | Status TransferInt(RecordReader* reader, MemoryPool* pool, | |
309 | const std::shared_ptr<DataType>& type, Datum* out) { | |
310 | using ArrowCType = typename ArrowType::c_type; | |
311 | using ParquetCType = typename ParquetType::c_type; | |
312 | int64_t length = reader->values_written(); | |
313 | ARROW_ASSIGN_OR_RAISE(auto data, | |
314 | ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool)); | |
315 | ||
316 | auto values = reinterpret_cast<const ParquetCType*>(reader->values()); | |
317 | auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data()); | |
318 | std::copy(values, values + length, out_ptr); | |
319 | *out = std::make_shared<ArrayType<ArrowType>>( | |
320 | type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count()); | |
321 | return Status::OK(); | |
322 | } | |
323 | ||
324 | std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader, | |
325 | const std::shared_ptr<DataType>& type) { | |
326 | std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(), | |
327 | reader->ReleaseValues()}; | |
328 | auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(), | |
329 | buffers, reader->null_count()); | |
330 | return ::arrow::MakeArray(data); | |
331 | } | |
332 | ||
333 | Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) { | |
334 | int64_t length = reader->values_written(); | |
335 | ||
336 | const int64_t buffer_size = BitUtil::BytesForBits(length); | |
337 | ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(buffer_size, pool)); | |
338 | ||
339 | // Transfer boolean values to packed bitmap | |
340 | auto values = reinterpret_cast<const bool*>(reader->values()); | |
341 | uint8_t* data_ptr = data->mutable_data(); | |
342 | memset(data_ptr, 0, buffer_size); | |
343 | ||
344 | for (int64_t i = 0; i < length; i++) { | |
345 | if (values[i]) { | |
346 | ::arrow::BitUtil::SetBit(data_ptr, i); | |
347 | } | |
348 | } | |
349 | ||
350 | *out = std::make_shared<BooleanArray>(length, std::move(data), reader->ReleaseIsValid(), | |
351 | reader->null_count()); | |
352 | return Status::OK(); | |
353 | } | |
354 | ||
// Converts legacy Parquet INT96 timestamps into an Arrow TimestampArray
// with the requested time unit.
//
// \param reader source of decoded INT96 values
// \param pool pool for the output int64 buffer
// \param type the Arrow timestamp type of the result
// \param[out] out the resulting TimestampArray
// \param int96_arrow_time_unit target unit for the Int96Get* conversion
Status TransferInt96(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<DataType>& type, Datum* out,
                     const ::arrow::TimeUnit::type int96_arrow_time_unit) {
  int64_t length = reader->values_written();
  auto values = reinterpret_cast<const Int96*>(reader->values());
  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
  auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
  for (int64_t i = 0; i < length; i++) {
    // value[2] == 0 is used as the null sentinel here — see comment below.
    if (values[i].value[2] == 0) {
      // Happens for null entries: avoid triggering UBSAN as that Int96 timestamp
      // isn't representable as a 64-bit Unix timestamp.
      *data_ptr++ = 0;
    } else {
      switch (int96_arrow_time_unit) {
        case ::arrow::TimeUnit::NANO:
          *data_ptr++ = Int96GetNanoSeconds(values[i]);
          break;
        case ::arrow::TimeUnit::MICRO:
          *data_ptr++ = Int96GetMicroSeconds(values[i]);
          break;
        case ::arrow::TimeUnit::MILLI:
          *data_ptr++ = Int96GetMilliSeconds(values[i]);
          break;
        case ::arrow::TimeUnit::SECOND:
          *data_ptr++ = Int96GetSeconds(values[i]);
          break;
      }
    }
  }
  *out = std::make_shared<TimestampArray>(type, length, std::move(data),
                                          reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}
389 | ||
390 | Status TransferDate64(RecordReader* reader, MemoryPool* pool, | |
391 | const std::shared_ptr<DataType>& type, Datum* out) { | |
392 | int64_t length = reader->values_written(); | |
393 | auto values = reinterpret_cast<const int32_t*>(reader->values()); | |
394 | ||
395 | ARROW_ASSIGN_OR_RAISE(auto data, | |
396 | ::arrow::AllocateBuffer(length * sizeof(int64_t), pool)); | |
397 | auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data()); | |
398 | ||
399 | for (int64_t i = 0; i < length; i++) { | |
400 | *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay; | |
401 | } | |
402 | ||
403 | *out = std::make_shared<::arrow::Date64Array>( | |
404 | type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count()); | |
405 | return Status::OK(); | |
406 | } | |
407 | ||
408 | // ---------------------------------------------------------------------- | |
409 | // Binary, direct to dictionary-encoded | |
410 | ||
411 | Status TransferDictionary(RecordReader* reader, | |
412 | const std::shared_ptr<DataType>& logical_value_type, | |
413 | std::shared_ptr<ChunkedArray>* out) { | |
414 | auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader); | |
415 | DCHECK(dict_reader); | |
416 | *out = dict_reader->GetResult(); | |
417 | if (!logical_value_type->Equals(*(*out)->type())) { | |
418 | ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type)); | |
419 | } | |
420 | return Status::OK(); | |
421 | } | |
422 | ||
// Transfers BYTE_ARRAY data as (Large)Binary/(Large)String chunks.
// If the reader decoded directly to a dictionary, delegates to
// TransferDictionary; otherwise casts each builder chunk to the requested
// logical type when needed.
Status TransferBinary(RecordReader* reader, MemoryPool* pool,
                      const std::shared_ptr<DataType>& logical_value_type,
                      std::shared_ptr<ChunkedArray>* out) {
  if (reader->read_dictionary()) {
    return TransferDictionary(
        reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
  }
  ::arrow::compute::ExecContext ctx(pool);
  ::arrow::compute::CastOptions cast_options;
  cast_options.allow_invalid_utf8 = true;  // avoid spending time validating UTF8 data

  auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
  DCHECK(binary_reader);
  auto chunks = binary_reader->GetBuilderChunks();
  for (auto& chunk : chunks) {
    if (!chunk->type()->Equals(*logical_value_type)) {
      // XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
      // will be lost because they are first created as int32 and then cast to int64.
      ARROW_ASSIGN_OR_RAISE(
          chunk, ::arrow::compute::Cast(*chunk, logical_value_type, cast_options, &ctx));
    }
  }
  *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
  return Status::OK();
}
448 | ||
449 | // ---------------------------------------------------------------------- | |
450 | // INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256 | |
451 | ||
452 | template <typename DecimalType> | |
453 | Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, | |
454 | uint8_t* out_buf) { | |
455 | ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, byte_width)); | |
456 | t.ToBytes(out_buf); | |
457 | return ::arrow::Status::OK(); | |
458 | } | |
459 | ||
// Maps a decimal array type to its value type:
// Decimal128Array -> Decimal128, Decimal256Array -> Decimal256.
template <typename DecimalArrayType>
struct DecimalTypeTrait;

template <>
struct DecimalTypeTrait<::arrow::Decimal128Array> {
  using value = ::arrow::Decimal128;
};

template <>
struct DecimalTypeTrait<::arrow::Decimal256Array> {
  using value = ::arrow::Decimal256;
};
472 | ||
// Fallback converter for (decimal array type, Parquet physical type)
// combinations without a specialization below; always NotImplemented.
template <typename DecimalArrayType, typename ParquetType>
struct DecimalConverter {
  static inline Status ConvertToDecimal(const Array& array,
                                        const std::shared_ptr<DataType>&,
                                        MemoryPool* pool, std::shared_ptr<Array>*) {
    return Status::NotImplemented("not implemented");
  }
};
481 | ||
// Converts a FixedSizeBinaryArray of big-endian decimal bytes into a
// Decimal128Array or Decimal256Array. The input byte width may be narrower
// than the decimal width because writers use the minimal byte count for the
// declared precision.
template <typename DecimalArrayType>
struct DecimalConverter<DecimalArrayType, FLBAType> {
  static inline Status ConvertToDecimal(const Array& array,
                                        const std::shared_ptr<DataType>& type,
                                        MemoryPool* pool, std::shared_ptr<Array>* out) {
    const auto& fixed_size_binary_array =
        checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);

    // The byte width of each decimal value
    const int32_t type_length =
        checked_cast<const ::arrow::DecimalType&>(*type).byte_width();

    // number of elements in the entire array
    const int64_t length = fixed_size_binary_array.length();

    // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
    // this will be different from the decimal array width because we write the minimum
    // number of bytes necessary to represent a given precision
    const int32_t byte_width =
        checked_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
            .byte_width();
    // allocate memory for the decimal array
    ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));

    // raw bytes that we can write to
    uint8_t* out_ptr = data->mutable_data();

    // convert each FixedSizeBinary value to valid decimal bytes
    const int64_t null_count = fixed_size_binary_array.null_count();

    using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
    // Two loops so the null check is hoisted out of the hot path when the
    // array has no nulls.
    if (null_count > 0) {
      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
        if (!fixed_size_binary_array.IsNull(i)) {
          RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
              fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
        } else {
          // Null slot: zero-fill so the buffer has deterministic contents.
          std::memset(out_ptr, 0, type_length);
        }
      }
    } else {
      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
        RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
            fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
      }
    }

    // Reuse the input's null bitmap; only the value bytes were rewritten.
    *out = std::make_shared<DecimalArrayType>(
        type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);

    return Status::OK();
  }
};
535 | ||
536 | template <typename DecimalArrayType> | |
537 | struct DecimalConverter<DecimalArrayType, ByteArrayType> { | |
538 | static inline Status ConvertToDecimal(const Array& array, | |
539 | const std::shared_ptr<DataType>& type, | |
540 | MemoryPool* pool, std::shared_ptr<Array>* out) { | |
541 | const auto& binary_array = checked_cast<const ::arrow::BinaryArray&>(array); | |
542 | const int64_t length = binary_array.length(); | |
543 | ||
544 | const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type); | |
545 | const int64_t type_length = decimal_type.byte_width(); | |
546 | ||
547 | ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool)); | |
548 | ||
549 | // raw bytes that we can write to | |
550 | uint8_t* out_ptr = data->mutable_data(); | |
551 | ||
552 | const int64_t null_count = binary_array.null_count(); | |
553 | ||
554 | // convert each BinaryArray value to valid decimal bytes | |
555 | for (int64_t i = 0; i < length; i++, out_ptr += type_length) { | |
556 | int32_t record_len = 0; | |
557 | const uint8_t* record_loc = binary_array.GetValue(i, &record_len); | |
558 | ||
559 | if (record_len < 0 || record_len > type_length) { | |
560 | return Status::Invalid("Invalid BYTE_ARRAY length for ", type->ToString()); | |
561 | } | |
562 | ||
563 | auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr); | |
564 | out_ptr_view[0] = 0; | |
565 | out_ptr_view[1] = 0; | |
566 | ||
567 | // only convert rows that are not null if there are nulls, or | |
568 | // all rows, if there are not | |
569 | if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) { | |
570 | using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value; | |
571 | RETURN_NOT_OK( | |
572 | RawBytesToDecimalBytes<DecimalType>(record_loc, record_len, out_ptr)); | |
573 | } | |
574 | } | |
575 | *out = std::make_shared<DecimalArrayType>(type, length, std::move(data), | |
576 | binary_array.null_bitmap(), null_count); | |
577 | return Status::OK(); | |
578 | } | |
579 | }; | |
580 | ||
581 | /// \brief Convert an Int32 or Int64 array into a Decimal128Array | |
582 | /// The parquet spec allows systems to write decimals in int32, int64 if the values are | |
583 | /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. | |
584 | /// This function implements the conversion from int32 and int64 arrays to decimal arrays. | |
585 | template < | |
586 | typename ParquetIntegerType, | |
587 | typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value || | |
588 | std::is_same<ParquetIntegerType, Int64Type>::value>> | |
589 | static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, | |
590 | const std::shared_ptr<DataType>& type, Datum* out) { | |
591 | // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not | |
592 | // specifically distinguish between decimal byte widths. | |
593 | // Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never | |
594 | // write Decimal values as integers and if the decimal value can fit in an | |
595 | // integer it is wasteful to use Decimal256. Put another way, the only | |
596 | // way an integer column could be construed as Decimal256 is if an arrow | |
597 | // schema was stored as metadata in the file indicating the column was | |
598 | // Decimal256. The current Arrow-Parquet C++ bindings will never do this. | |
599 | DCHECK(type->id() == ::arrow::Type::DECIMAL128); | |
600 | ||
601 | const int64_t length = reader->values_written(); | |
602 | ||
603 | using ElementType = typename ParquetIntegerType::c_type; | |
604 | static_assert(std::is_same<ElementType, int32_t>::value || | |
605 | std::is_same<ElementType, int64_t>::value, | |
606 | "ElementType must be int32_t or int64_t"); | |
607 | ||
608 | const auto values = reinterpret_cast<const ElementType*>(reader->values()); | |
609 | ||
610 | const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type); | |
611 | const int64_t type_length = decimal_type.byte_width(); | |
612 | ||
613 | ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool)); | |
614 | uint8_t* out_ptr = data->mutable_data(); | |
615 | ||
616 | using ::arrow::BitUtil::FromLittleEndian; | |
617 | ||
618 | for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { | |
619 | // sign/zero extend int32_t values, otherwise a no-op | |
620 | const auto value = static_cast<int64_t>(values[i]); | |
621 | ||
622 | ::arrow::Decimal128 decimal(value); | |
623 | decimal.ToBytes(out_ptr); | |
624 | } | |
625 | ||
626 | if (reader->nullable_values()) { | |
627 | std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid(); | |
628 | *out = std::make_shared<Decimal128Array>(type, length, std::move(data), is_valid, | |
629 | reader->null_count()); | |
630 | } else { | |
631 | *out = std::make_shared<Decimal128Array>(type, length, std::move(data)); | |
632 | } | |
633 | return Status::OK(); | |
634 | } | |
635 | ||
636 | /// \brief Convert an arrow::BinaryArray to an arrow::Decimal{128,256}Array | |
637 | /// We do this by: | |
638 | /// 1. Creating an arrow::BinaryArray from the RecordReader's builder | |
639 | /// 2. Allocating a buffer for the arrow::Decimal{128,256}Array | |
640 | /// 3. Converting the big-endian bytes in each BinaryArray entry to two integers | |
641 | /// representing the high and low bits of each decimal value. | |
642 | template <typename DecimalArrayType, typename ParquetType> | |
643 | Status TransferDecimal(RecordReader* reader, MemoryPool* pool, | |
644 | const std::shared_ptr<DataType>& type, Datum* out) { | |
645 | auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader); | |
646 | DCHECK(binary_reader); | |
647 | ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks(); | |
648 | for (size_t i = 0; i < chunks.size(); ++i) { | |
649 | std::shared_ptr<Array> chunk_as_decimal; | |
650 | auto fn = &DecimalConverter<DecimalArrayType, ParquetType>::ConvertToDecimal; | |
651 | RETURN_NOT_OK(fn(*chunks[i], type, pool, &chunk_as_decimal)); | |
652 | // Replace the chunk, which will hopefully also free memory as we go | |
653 | chunks[i] = chunk_as_decimal; | |
654 | } | |
655 | *out = std::make_shared<ChunkedArray>(chunks, type); | |
656 | return Status::OK(); | |
657 | } | |
658 | ||
659 | } // namespace | |
660 | ||
// Expands to a switch case dispatching an INT32-backed Arrow type ENUM to
// TransferInt<ArrowType, Int32Type>; relies on `reader`, `pool`,
// `value_type` and `result` being in scope at the expansion site.
#define TRANSFER_INT32(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                \
  } break;
666 | ||
// Expands to a switch case dispatching an INT64-backed Arrow type ENUM to
// TransferInt<ArrowType, Int64Type>; relies on `reader`, `pool`,
// `value_type` and `result` being in scope at the expansion site.
#define TRANSFER_INT64(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                \
  } break;
672 | ||
673 | Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_type, | |
674 | const ColumnDescriptor* descr, MemoryPool* pool, | |
675 | std::shared_ptr<ChunkedArray>* out) { | |
676 | Datum result; | |
677 | std::shared_ptr<ChunkedArray> chunked_result; | |
678 | switch (value_type->id()) { | |
679 | case ::arrow::Type::DICTIONARY: { | |
680 | RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result)); | |
681 | result = chunked_result; | |
682 | } break; | |
683 | case ::arrow::Type::NA: { | |
684 | result = std::make_shared<::arrow::NullArray>(reader->values_written()); | |
685 | break; | |
686 | } | |
687 | case ::arrow::Type::INT32: | |
688 | case ::arrow::Type::INT64: | |
689 | case ::arrow::Type::FLOAT: | |
690 | case ::arrow::Type::DOUBLE: | |
691 | result = TransferZeroCopy(reader, value_type); | |
692 | break; | |
693 | case ::arrow::Type::BOOL: | |
694 | RETURN_NOT_OK(TransferBool(reader, pool, &result)); | |
695 | break; | |
696 | TRANSFER_INT32(UINT8, ::arrow::UInt8Type); | |
697 | TRANSFER_INT32(INT8, ::arrow::Int8Type); | |
698 | TRANSFER_INT32(UINT16, ::arrow::UInt16Type); | |
699 | TRANSFER_INT32(INT16, ::arrow::Int16Type); | |
700 | TRANSFER_INT32(UINT32, ::arrow::UInt32Type); | |
701 | TRANSFER_INT64(UINT64, ::arrow::UInt64Type); | |
702 | TRANSFER_INT32(DATE32, ::arrow::Date32Type); | |
703 | TRANSFER_INT32(TIME32, ::arrow::Time32Type); | |
704 | TRANSFER_INT64(TIME64, ::arrow::Time64Type); | |
705 | case ::arrow::Type::DATE64: | |
706 | RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result)); | |
707 | break; | |
708 | case ::arrow::Type::FIXED_SIZE_BINARY: | |
709 | case ::arrow::Type::BINARY: | |
710 | case ::arrow::Type::STRING: | |
711 | case ::arrow::Type::LARGE_BINARY: | |
712 | case ::arrow::Type::LARGE_STRING: { | |
713 | RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result)); | |
714 | result = chunked_result; | |
715 | } break; | |
716 | case ::arrow::Type::DECIMAL128: { | |
717 | switch (descr->physical_type()) { | |
718 | case ::parquet::Type::INT32: { | |
719 | auto fn = DecimalIntegerTransfer<Int32Type>; | |
720 | RETURN_NOT_OK(fn(reader, pool, value_type, &result)); | |
721 | } break; | |
722 | case ::parquet::Type::INT64: { | |
723 | auto fn = &DecimalIntegerTransfer<Int64Type>; | |
724 | RETURN_NOT_OK(fn(reader, pool, value_type, &result)); | |
725 | } break; | |
726 | case ::parquet::Type::BYTE_ARRAY: { | |
727 | auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>; | |
728 | RETURN_NOT_OK(fn(reader, pool, value_type, &result)); | |
729 | } break; | |
730 | case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { | |
731 | auto fn = &TransferDecimal<Decimal128Array, FLBAType>; | |
732 | RETURN_NOT_OK(fn(reader, pool, value_type, &result)); | |
733 | } break; | |
734 | default: | |
735 | return Status::Invalid( | |
736 | "Physical type for decimal128 must be int32, int64, byte array, or fixed " | |
737 | "length binary"); | |
738 | } | |
739 | } break; | |
740 | case ::arrow::Type::DECIMAL256: | |
741 | switch (descr->physical_type()) { | |
742 | case ::parquet::Type::BYTE_ARRAY: { | |
743 | auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>; | |
744 | RETURN_NOT_OK(fn(reader, pool, value_type, &result)); | |
745 | } break; | |
746 | case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { | |
747 | auto fn = &TransferDecimal<Decimal256Array, FLBAType>; | |
748 | RETURN_NOT_OK(fn(reader, pool, value_type, &result)); | |
749 | } break; | |
750 | default: | |
751 | return Status::Invalid( | |
752 | "Physical type for decimal256 must be fixed length binary"); | |
753 | } | |
754 | break; | |
755 | ||
756 | case ::arrow::Type::TIMESTAMP: { | |
757 | const ::arrow::TimestampType& timestamp_type = | |
758 | checked_cast<::arrow::TimestampType&>(*value_type); | |
759 | if (descr->physical_type() == ::parquet::Type::INT96) { | |
760 | RETURN_NOT_OK( | |
761 | TransferInt96(reader, pool, value_type, &result, timestamp_type.unit())); | |
762 | } else { | |
763 | switch (timestamp_type.unit()) { | |
764 | case ::arrow::TimeUnit::MILLI: | |
765 | case ::arrow::TimeUnit::MICRO: | |
766 | case ::arrow::TimeUnit::NANO: | |
767 | result = TransferZeroCopy(reader, value_type); | |
768 | break; | |
769 | default: | |
770 | return Status::NotImplemented("TimeUnit not supported"); | |
771 | } | |
772 | } | |
773 | } break; | |
774 | default: | |
775 | return Status::NotImplemented("No support for reading columns of type ", | |
776 | value_type->ToString()); | |
777 | } | |
778 | ||
779 | if (result.kind() == Datum::ARRAY) { | |
780 | *out = std::make_shared<ChunkedArray>(result.make_array()); | |
781 | } else if (result.kind() == Datum::CHUNKED_ARRAY) { | |
782 | *out = result.chunked_array(); | |
783 | } else { | |
784 | DCHECK(false) << "Should be impossible, result was " << result.ToString(); | |
785 | } | |
786 | ||
787 | return Status::OK(); | |
788 | } | |
789 | ||
790 | } // namespace arrow | |
791 | } // namespace parquet |