// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/reader_internal.h"

#include <algorithm>
#include <climits>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>

#include "arrow/array.h"
#include "arrow/compute/api.h"
#include "arrow/datum.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/base64.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
#include "arrow/util/int_util_internal.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/ubsan.h"
#include "arrow/visitor_inline.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/schema_internal.h"
#include "parquet/column_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
// Required after "arrow/util/int_util_internal.h" (for OPTIONAL)
#include "parquet/windows_compatibility.h"

using arrow::Array;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::Datum;
using arrow::Decimal128;
using arrow::Decimal128Array;
using arrow::Decimal128Type;
using arrow::Decimal256;
using arrow::Decimal256Array;
using arrow::Decimal256Type;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::ResizableBuffer;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;

using ::arrow::BitUtil::FromBigEndian;
using ::arrow::internal::checked_cast;
using ::arrow::internal::checked_pointer_cast;
using ::arrow::internal::SafeLeftShift;
using ::arrow::util::SafeLoadAs;

using parquet::internal::BinaryRecordReader;
using parquet::internal::DictionaryRecordReader;
using parquet::internal::RecordReader;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::PrimitiveNode;
using ParquetType = parquet::Type;

namespace BitUtil = arrow::BitUtil;

namespace parquet {
namespace arrow {
namespace {

template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;

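// Build Arrow min/max scalars from typed Parquet statistics by casting the
// statistics' C values to the requested C type.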
template <typename CType, typename StatisticsType>
Status MakeMinMaxScalar(const StatisticsType& statistics,
                        std::shared_ptr<::arrow::Scalar>* min,
                        std::shared_ptr<::arrow::Scalar>* max) {
  *min = ::arrow::MakeScalar(static_cast<CType>(statistics.min()));
  *max = ::arrow::MakeScalar(static_cast<CType>(statistics.max()));
  return Status::OK();
}

template <typename CType, typename StatisticsType>
Status MakeMinMaxTypedScalar(const StatisticsType& statistics,
                             std::shared_ptr<DataType> type,
                             std::shared_ptr<::arrow::Scalar>* min,
                             std::shared_ptr<::arrow::Scalar>* max) {
  ARROW_ASSIGN_OR_RAISE(*min, ::arrow::MakeScalar(type, statistics.min()));
  ARROW_ASSIGN_OR_RAISE(*max, ::arrow::MakeScalar(type, statistics.max()));
  return Status::OK();
}

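// Pick the C integer type matching the column's IntLogicalType bit width and
// signedness, then build the min/max scalars with that type.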
template <typename StatisticsType>
Status MakeMinMaxIntegralScalar(const StatisticsType& statistics,
                                const ::arrow::DataType& arrow_type,
                                std::shared_ptr<::arrow::Scalar>* min,
                                std::shared_ptr<::arrow::Scalar>* max) {
  const auto column_desc = statistics.descr();
  const auto& logical_type = column_desc->logical_type();
  const auto& integer = checked_pointer_cast<const IntLogicalType>(logical_type);
  const bool is_signed = integer->is_signed();

  switch (integer->bit_width()) {
    case 8:
      return is_signed ? MakeMinMaxScalar<int8_t>(statistics, min, max)
                       : MakeMinMaxScalar<uint8_t>(statistics, min, max);
    case 16:
      return is_signed ? MakeMinMaxScalar<int16_t>(statistics, min, max)
                       : MakeMinMaxScalar<uint16_t>(statistics, min, max);
    case 32:
      return is_signed ? MakeMinMaxScalar<int32_t>(statistics, min, max)
                       : MakeMinMaxScalar<uint32_t>(statistics, min, max);
    case 64:
      return is_signed ? MakeMinMaxScalar<int64_t>(statistics, min, max)
                       : MakeMinMaxScalar<uint64_t>(statistics, min, max);
  }

  return Status::OK();
}

static Status FromInt32Statistics(const Int32Statistics& statistics,
                                  const LogicalType& logical_type,
                                  std::shared_ptr<::arrow::Scalar>* min,
                                  std::shared_ptr<::arrow::Scalar>* max) {
  ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type));

  switch (logical_type.type()) {
    case LogicalType::Type::INT:
      return MakeMinMaxIntegralScalar(statistics, *type, min, max);
      break;
    case LogicalType::Type::DATE:
    case LogicalType::Type::TIME:
    case LogicalType::Type::NONE:
      return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max);
      break;
    default:
      break;
  }

  return Status::NotImplemented("Cannot extract statistics for type ",
                                logical_type.ToString());
}

static Status FromInt64Statistics(const Int64Statistics& statistics,
                                  const LogicalType& logical_type,
                                  std::shared_ptr<::arrow::Scalar>* min,
                                  std::shared_ptr<::arrow::Scalar>* max) {
  ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type));

  switch (logical_type.type()) {
    case LogicalType::Type::INT:
      return MakeMinMaxIntegralScalar(statistics, *type, min, max);
      break;
    case LogicalType::Type::TIME:
    case LogicalType::Type::TIMESTAMP:
    case LogicalType::Type::NONE:
      return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max);
      break;
    default:
      break;
  }

  return Status::NotImplemented("Cannot extract statistics for type ",
                                logical_type.ToString());
}

template <typename DecimalType>
Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
    const std::string& data, std::shared_ptr<DataType> arrow_type) {
  ARROW_ASSIGN_OR_RAISE(
      DecimalType decimal,
      DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
                                 static_cast<int32_t>(data.size())));
  return ::arrow::MakeScalar(std::move(arrow_type), decimal);
}

// Extracts the min and max scalars from byte-like types (i.e. types where the
// decimal is encoded as big-endian bytes).
Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
                                         const LogicalType& logical_type,
                                         std::shared_ptr<::arrow::Scalar>* min,
                                         std::shared_ptr<::arrow::Scalar>* max) {
  const DecimalLogicalType& decimal_type =
      checked_cast<const DecimalLogicalType&>(logical_type);

  Result<std::shared_ptr<DataType>> maybe_type =
      Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
  std::shared_ptr<DataType> arrow_type;
  if (maybe_type.ok()) {
    arrow_type = maybe_type.ValueOrDie();
    ARROW_ASSIGN_OR_RAISE(
        *min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), arrow_type));
    ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal128>(statistics.EncodeMax(),
                                                                std::move(arrow_type)));
    return Status::OK();
  }
  // Fall back to see if Decimal256 can represent the type.
  ARROW_ASSIGN_OR_RAISE(
      arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
  ARROW_ASSIGN_OR_RAISE(
      *min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), arrow_type));
  ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal256>(statistics.EncodeMax(),
                                                              std::move(arrow_type)));

  return Status::OK();
}

Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
                                    std::shared_ptr<::arrow::Scalar>* min,
                                    std::shared_ptr<::arrow::Scalar>* max) {
  auto logical_type = statistics.descr()->logical_type();
  if (logical_type->type() == LogicalType::Type::DECIMAL) {
    return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max);
  }
  std::shared_ptr<::arrow::DataType> type;
  if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
    type = ::arrow::fixed_size_binary(statistics.descr()->type_length());
  } else {
    type = logical_type->type() == LogicalType::Type::STRING ? ::arrow::utf8()
                                                              : ::arrow::binary();
  }
  ARROW_ASSIGN_OR_RAISE(
      *min, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMin())));
  ARROW_ASSIGN_OR_RAISE(
      *max, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMax())));

  return Status::OK();
}

}  // namespace

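// Convert a column chunk's Statistics into Arrow min/max scalars, dispatching
// on the column's physical type and, where needed, its logical type.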
Status StatisticsAsScalars(const Statistics& statistics,
                           std::shared_ptr<::arrow::Scalar>* min,
                           std::shared_ptr<::arrow::Scalar>* max) {
  if (!statistics.HasMinMax()) {
    return Status::Invalid("Statistics has no min max.");
  }

  auto column_desc = statistics.descr();
  if (column_desc == nullptr) {
    return Status::Invalid("Statistics carries no descriptor, can't infer arrow type.");
  }

  auto physical_type = column_desc->physical_type();
  auto logical_type = column_desc->logical_type();
  switch (physical_type) {
    case Type::BOOLEAN:
      return MakeMinMaxScalar<bool, BoolStatistics>(
          checked_cast<const BoolStatistics&>(statistics), min, max);
    case Type::FLOAT:
      return MakeMinMaxScalar<float, FloatStatistics>(
          checked_cast<const FloatStatistics&>(statistics), min, max);
    case Type::DOUBLE:
      return MakeMinMaxScalar<double, DoubleStatistics>(
          checked_cast<const DoubleStatistics&>(statistics), min, max);
    case Type::INT32:
      return FromInt32Statistics(checked_cast<const Int32Statistics&>(statistics),
                                 *logical_type, min, max);
    case Type::INT64:
      return FromInt64Statistics(checked_cast<const Int64Statistics&>(statistics),
                                 *logical_type, min, max);
    case Type::BYTE_ARRAY:
    case Type::FIXED_LEN_BYTE_ARRAY:
      return ByteArrayStatisticsAsScalars(statistics, min, max);
    default:
      return Status::NotImplemented("Extract statistics unsupported for physical_type ",
                                    physical_type);
  }

  return Status::OK();
}

// ----------------------------------------------------------------------
// Primitive types

namespace {

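// Copy Parquet INT32/INT64 values into a freshly allocated buffer of the
// target Arrow integer type, converting element by element.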
template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader, MemoryPool* pool,
                   const std::shared_ptr<DataType>& type, Datum* out) {
  using ArrowCType = typename ArrowType::c_type;
  using ParquetCType = typename ParquetType::c_type;
  int64_t length = reader->values_written();
  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));

  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
  std::copy(values, values + length, out_ptr);
  *out = std::make_shared<ArrayType<ArrowType>>(
      type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}

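// Wrap the reader's validity and value buffers directly in an ArrayData,
// avoiding a copy when the Parquet and Arrow physical layouts match.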
std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
                                        const std::shared_ptr<DataType>& type) {
  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
                                                  reader->ReleaseValues()};
  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
                                                   buffers, reader->null_count());
  return ::arrow::MakeArray(data);
}

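// Repack byte-per-value booleans from the record reader into Arrow's
// bit-packed boolean representation.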
Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
  int64_t length = reader->values_written();

  const int64_t buffer_size = BitUtil::BytesForBits(length);
  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(buffer_size, pool));

  // Transfer boolean values to packed bitmap
  auto values = reinterpret_cast<const bool*>(reader->values());
  uint8_t* data_ptr = data->mutable_data();
  memset(data_ptr, 0, buffer_size);

  for (int64_t i = 0; i < length; i++) {
    if (values[i]) {
      ::arrow::BitUtil::SetBit(data_ptr, i);
    }
  }

  *out = std::make_shared<BooleanArray>(length, std::move(data), reader->ReleaseIsValid(),
                                        reader->null_count());
  return Status::OK();
}

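// Convert Parquet INT96 timestamps to 64-bit Arrow timestamps in the
// requested time unit.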
Status TransferInt96(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<DataType>& type, Datum* out,
                     const ::arrow::TimeUnit::type int96_arrow_time_unit) {
  int64_t length = reader->values_written();
  auto values = reinterpret_cast<const Int96*>(reader->values());
  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
  auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
  for (int64_t i = 0; i < length; i++) {
    if (values[i].value[2] == 0) {
      // Happens for null entries: avoid triggering UBSAN as that Int96 timestamp
      // isn't representable as a 64-bit Unix timestamp.
      *data_ptr++ = 0;
    } else {
      switch (int96_arrow_time_unit) {
        case ::arrow::TimeUnit::NANO:
          *data_ptr++ = Int96GetNanoSeconds(values[i]);
          break;
        case ::arrow::TimeUnit::MICRO:
          *data_ptr++ = Int96GetMicroSeconds(values[i]);
          break;
        case ::arrow::TimeUnit::MILLI:
          *data_ptr++ = Int96GetMilliSeconds(values[i]);
          break;
        case ::arrow::TimeUnit::SECOND:
          *data_ptr++ = Int96GetSeconds(values[i]);
          break;
      }
    }
  }
  *out = std::make_shared<TimestampArray>(type, length, std::move(data),
                                          reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}

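// Widen 32-bit day counts (DATE32) to 64-bit milliseconds since the epoch (DATE64).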
Status TransferDate64(RecordReader* reader, MemoryPool* pool,
                      const std::shared_ptr<DataType>& type, Datum* out) {
  int64_t length = reader->values_written();
  auto values = reinterpret_cast<const int32_t*>(reader->values());

  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
  auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());

  for (int64_t i = 0; i < length; i++) {
    *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
  }

  *out = std::make_shared<::arrow::Date64Array>(
      type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}

// ----------------------------------------------------------------------
// Binary, direct to dictionary-encoded

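// Take the dictionary-encoded chunks built by the DictionaryRecordReader and,
// if necessary, view them as the logical dictionary type requested by the schema.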
Status TransferDictionary(RecordReader* reader,
                          const std::shared_ptr<DataType>& logical_value_type,
                          std::shared_ptr<ChunkedArray>* out) {
  auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader);
  DCHECK(dict_reader);
  *out = dict_reader->GetResult();
  if (!logical_value_type->Equals(*(*out)->type())) {
    ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type));
  }
  return Status::OK();
}

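// Transfer BYTE_ARRAY data: defer to the dictionary path when the column was
// read as dictionary, otherwise cast the builder chunks to the logical value
// type (e.g. utf8 or large_binary) as needed.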
Status TransferBinary(RecordReader* reader, MemoryPool* pool,
                      const std::shared_ptr<DataType>& logical_value_type,
                      std::shared_ptr<ChunkedArray>* out) {
  if (reader->read_dictionary()) {
    return TransferDictionary(
        reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
  }
  ::arrow::compute::ExecContext ctx(pool);
  ::arrow::compute::CastOptions cast_options;
  cast_options.allow_invalid_utf8 = true;  // avoid spending time validating UTF8 data

  auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
  DCHECK(binary_reader);
  auto chunks = binary_reader->GetBuilderChunks();
  for (auto& chunk : chunks) {
    if (!chunk->type()->Equals(*logical_value_type)) {
      // XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
      // will be lost because they are first created as int32 and then cast to int64.
      ARROW_ASSIGN_OR_RAISE(
          chunk, ::arrow::compute::Cast(*chunk, logical_value_type, cast_options, &ctx));
    }
  }
  *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
  return Status::OK();
}

// ----------------------------------------------------------------------
// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256

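// Decode a big-endian two's-complement value of `byte_width` bytes and write
// it into the fixed-width buffer layout expected by the Arrow decimal array.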
template <typename DecimalType>
Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
                              uint8_t* out_buf) {
  ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, byte_width));
  t.ToBytes(out_buf);
  return ::arrow::Status::OK();
}

template <typename DecimalArrayType>
struct DecimalTypeTrait;

template <>
struct DecimalTypeTrait<::arrow::Decimal128Array> {
  using value = ::arrow::Decimal128;
};

template <>
struct DecimalTypeTrait<::arrow::Decimal256Array> {
  using value = ::arrow::Decimal256;
};

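// Convert a binary-like Arrow array (built from BYTE_ARRAY or
// FIXED_LEN_BYTE_ARRAY data) into a decimal array. The primary template is a
// NotImplemented fallback; the specializations below handle each physical type.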
template <typename DecimalArrayType, typename ParquetType>
struct DecimalConverter {
  static inline Status ConvertToDecimal(const Array& array,
                                        const std::shared_ptr<DataType>&,
                                        MemoryPool* pool, std::shared_ptr<Array>*) {
    return Status::NotImplemented("not implemented");
  }
};

template <typename DecimalArrayType>
struct DecimalConverter<DecimalArrayType, FLBAType> {
  static inline Status ConvertToDecimal(const Array& array,
                                        const std::shared_ptr<DataType>& type,
                                        MemoryPool* pool, std::shared_ptr<Array>* out) {
    const auto& fixed_size_binary_array =
        checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);

    // The byte width of each decimal value
    const int32_t type_length =
        checked_cast<const ::arrow::DecimalType&>(*type).byte_width();

    // number of elements in the entire array
    const int64_t length = fixed_size_binary_array.length();

    // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
    // this will be different from the decimal array width because we write the minimum
    // number of bytes necessary to represent a given precision
    const int32_t byte_width =
        checked_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
            .byte_width();
    // allocate memory for the decimal array
    ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));

    // raw bytes that we can write to
    uint8_t* out_ptr = data->mutable_data();

    // convert each FixedSizeBinary value to valid decimal bytes
    const int64_t null_count = fixed_size_binary_array.null_count();

    using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
    if (null_count > 0) {
      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
        if (!fixed_size_binary_array.IsNull(i)) {
          RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
              fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
        } else {
          std::memset(out_ptr, 0, type_length);
        }
      }
    } else {
      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
        RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
            fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
      }
    }

    *out = std::make_shared<DecimalArrayType>(
        type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);

    return Status::OK();
  }
};

template <typename DecimalArrayType>
struct DecimalConverter<DecimalArrayType, ByteArrayType> {
  static inline Status ConvertToDecimal(const Array& array,
                                        const std::shared_ptr<DataType>& type,
                                        MemoryPool* pool, std::shared_ptr<Array>* out) {
    const auto& binary_array = checked_cast<const ::arrow::BinaryArray&>(array);
    const int64_t length = binary_array.length();

    const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type);
    const int64_t type_length = decimal_type.byte_width();

    ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));

    // raw bytes that we can write to
    uint8_t* out_ptr = data->mutable_data();

    const int64_t null_count = binary_array.null_count();

    // convert each BinaryArray value to valid decimal bytes
    for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
      int32_t record_len = 0;
      const uint8_t* record_loc = binary_array.GetValue(i, &record_len);

      if (record_len < 0 || record_len > type_length) {
        return Status::Invalid("Invalid BYTE_ARRAY length for ", type->ToString());
      }

      auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
      out_ptr_view[0] = 0;
      out_ptr_view[1] = 0;

      // convert only the non-null rows when nulls are present, otherwise convert
      // all rows
      if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) {
        using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
        RETURN_NOT_OK(
            RawBytesToDecimalBytes<DecimalType>(record_loc, record_len, out_ptr));
      }
    }
    *out = std::make_shared<DecimalArrayType>(type, length, std::move(data),
                                              binary_array.null_bitmap(), null_count);
    return Status::OK();
  }
};

/// \brief Convert an Int32 or Int64 array into a Decimal128Array
/// The Parquet spec allows systems to write decimals as INT32 or INT64 when the values
/// are small enough to fit in 4 bytes or 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
    typename ParquetIntegerType,
    typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
                                    std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                     const std::shared_ptr<DataType>& type, Datum* out) {
  // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
  // specifically distinguish between decimal byte widths.
  // Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never
  // write Decimal values as integers and if the decimal value can fit in an
  // integer it is wasteful to use Decimal256. Put another way, the only
  // way an integer column could be construed as Decimal256 is if an arrow
  // schema was stored as metadata in the file indicating the column was
  // Decimal256. The current Arrow-Parquet C++ bindings will never do this.
  DCHECK(type->id() == ::arrow::Type::DECIMAL128);

  const int64_t length = reader->values_written();

  using ElementType = typename ParquetIntegerType::c_type;
  static_assert(std::is_same<ElementType, int32_t>::value ||
                    std::is_same<ElementType, int64_t>::value,
                "ElementType must be int32_t or int64_t");

  const auto values = reinterpret_cast<const ElementType*>(reader->values());

  const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type);
  const int64_t type_length = decimal_type.byte_width();

  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
  uint8_t* out_ptr = data->mutable_data();

  using ::arrow::BitUtil::FromLittleEndian;

  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
    // sign/zero extend int32_t values, otherwise a no-op
    const auto value = static_cast<int64_t>(values[i]);

    ::arrow::Decimal128 decimal(value);
    decimal.ToBytes(out_ptr);
  }

  if (reader->nullable_values()) {
    std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
    *out = std::make_shared<Decimal128Array>(type, length, std::move(data), is_valid,
                                             reader->null_count());
  } else {
    *out = std::make_shared<Decimal128Array>(type, length, std::move(data));
  }
  return Status::OK();
}

/// \brief Convert an arrow::BinaryArray to an arrow::Decimal{128,256}Array
/// We do this by:
/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
/// 2. Allocating a buffer for the arrow::Decimal{128,256}Array
/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
///    representing the high and low bits of each decimal value.
template <typename DecimalArrayType, typename ParquetType>
Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
                       const std::shared_ptr<DataType>& type, Datum* out) {
  auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
  DCHECK(binary_reader);
  ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
  for (size_t i = 0; i < chunks.size(); ++i) {
    std::shared_ptr<Array> chunk_as_decimal;
    auto fn = &DecimalConverter<DecimalArrayType, ParquetType>::ConvertToDecimal;
    RETURN_NOT_OK(fn(*chunks[i], type, pool, &chunk_as_decimal));
    // Replace the chunk, which will hopefully also free memory as we go
    chunks[i] = chunk_as_decimal;
  }
  *out = std::make_shared<ChunkedArray>(chunks, type);
  return Status::OK();
}

}  // namespace

#define TRANSFER_INT32(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                 \
  } break;

#define TRANSFER_INT64(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                 \
  } break;

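// Top-level dispatch: turn the data accumulated by a RecordReader into a
// ChunkedArray of the requested Arrow value type, choosing a zero-copy,
// copying, or converting transfer path based on that type.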
Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_type,
                          const ColumnDescriptor* descr, MemoryPool* pool,
                          std::shared_ptr<ChunkedArray>* out) {
  Datum result;
  std::shared_ptr<ChunkedArray> chunked_result;
  switch (value_type->id()) {
    case ::arrow::Type::DICTIONARY: {
      RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::NA: {
      result = std::make_shared<::arrow::NullArray>(reader->values_written());
      break;
    }
    case ::arrow::Type::INT32:
    case ::arrow::Type::INT64:
    case ::arrow::Type::FLOAT:
    case ::arrow::Type::DOUBLE:
      result = TransferZeroCopy(reader, value_type);
      break;
    case ::arrow::Type::BOOL:
      RETURN_NOT_OK(TransferBool(reader, pool, &result));
      break;
    TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
    TRANSFER_INT32(INT8, ::arrow::Int8Type);
    TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
    TRANSFER_INT32(INT16, ::arrow::Int16Type);
    TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
    TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
    TRANSFER_INT32(DATE32, ::arrow::Date32Type);
    TRANSFER_INT32(TIME32, ::arrow::Time32Type);
    TRANSFER_INT64(TIME64, ::arrow::Time64Type);
    case ::arrow::Type::DATE64:
      RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
      break;
    case ::arrow::Type::FIXED_SIZE_BINARY:
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING:
    case ::arrow::Type::LARGE_BINARY:
    case ::arrow::Type::LARGE_STRING: {
      RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::DECIMAL128: {
      switch (descr->physical_type()) {
        case ::parquet::Type::INT32: {
          auto fn = &DecimalIntegerTransfer<Int32Type>;
          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::INT64: {
          auto fn = &DecimalIntegerTransfer<Int64Type>;
          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal128 must be int32, int64, byte array, or fixed "
              "length binary");
      }
    } break;
    case ::arrow::Type::DECIMAL256:
      switch (descr->physical_type()) {
        case ::parquet::Type::BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal256 must be fixed length binary");
      }
      break;

    case ::arrow::Type::TIMESTAMP: {
      const ::arrow::TimestampType& timestamp_type =
          checked_cast<::arrow::TimestampType&>(*value_type);
      if (descr->physical_type() == ::parquet::Type::INT96) {
        RETURN_NOT_OK(
            TransferInt96(reader, pool, value_type, &result, timestamp_type.unit()));
      } else {
        switch (timestamp_type.unit()) {
          case ::arrow::TimeUnit::MILLI:
          case ::arrow::TimeUnit::MICRO:
          case ::arrow::TimeUnit::NANO:
            result = TransferZeroCopy(reader, value_type);
            break;
          default:
            return Status::NotImplemented("TimeUnit not supported");
        }
      }
    } break;
    default:
      return Status::NotImplemented("No support for reading columns of type ",
                                    value_type->ToString());
  }

  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
  } else {
    DCHECK(false) << "Should be impossible, result was " << result.ToString();
  }

  return Status::OK();
}

}  // namespace arrow
}  // namespace parquet