1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "parquet/encoding.h"
29 #include "arrow/array.h"
30 #include "arrow/array/builder_dict.h"
31 #include "arrow/stl_allocator.h"
32 #include "arrow/type_traits.h"
33 #include "arrow/util/bit_run_reader.h"
34 #include "arrow/util/bit_stream_utils.h"
35 #include "arrow/util/bit_util.h"
36 #include "arrow/util/bitmap_ops.h"
37 #include "arrow/util/bitmap_writer.h"
38 #include "arrow/util/byte_stream_split.h"
39 #include "arrow/util/checked_cast.h"
40 #include "arrow/util/hashing.h"
41 #include "arrow/util/logging.h"
42 #include "arrow/util/rle_encoding.h"
43 #include "arrow/util/ubsan.h"
44 #include "arrow/visitor_inline.h"
45 #include "parquet/exception.h"
46 #include "parquet/platform.h"
47 #include "parquet/schema.h"
48 #include "parquet/types.h"
50 namespace BitUtil
= arrow::BitUtil
;
53 using arrow::VisitNullBitmapInline
;
54 using arrow::internal::checked_cast
;
57 using ArrowPoolVector
= std::vector
<T
, ::arrow::stl::allocator
<T
>>;
62 constexpr int64_t kInMemoryDefaultCapacity
= 1024;
63 // The Parquet spec isn't very clear whether ByteArray lengths are signed or
64 // unsigned, but the Java implementation uses signed ints.
65 constexpr size_t kMaxByteArraySize
= std::numeric_limits
<int32_t>::max();
67 class EncoderImpl
: virtual public Encoder
{
69 EncoderImpl(const ColumnDescriptor
* descr
, Encoding::type encoding
, MemoryPool
* pool
)
73 type_length_(descr
? descr
->type_length() : -1) {}
75 Encoding::type
encoding() const override
{ return encoding_
; }
77 MemoryPool
* memory_pool() const override
{ return pool_
; }
80 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
81 const ColumnDescriptor
* descr_
;
82 const Encoding::type encoding_
;
85 /// Type length from descr
89 // ----------------------------------------------------------------------
90 // Plain encoder implementation
92 template <typename DType
>
93 class PlainEncoder
: public EncoderImpl
, virtual public TypedEncoder
<DType
> {
95 using T
= typename
DType::c_type
;
97 explicit PlainEncoder(const ColumnDescriptor
* descr
, MemoryPool
* pool
)
98 : EncoderImpl(descr
, Encoding::PLAIN
, pool
), sink_(pool
) {}
100 int64_t EstimatedDataEncodedSize() override
{ return sink_
.length(); }
102 std::shared_ptr
<Buffer
> FlushValues() override
{
103 std::shared_ptr
<Buffer
> buffer
;
104 PARQUET_THROW_NOT_OK(sink_
.Finish(&buffer
));
108 using TypedEncoder
<DType
>::Put
;
110 void Put(const T
* buffer
, int num_values
) override
;
112 void Put(const ::arrow::Array
& values
) override
;
114 void PutSpaced(const T
* src
, int num_values
, const uint8_t* valid_bits
,
115 int64_t valid_bits_offset
) override
{
116 if (valid_bits
!= NULLPTR
) {
117 PARQUET_ASSIGN_OR_THROW(auto buffer
, ::arrow::AllocateBuffer(num_values
* sizeof(T
),
118 this->memory_pool()));
119 T
* data
= reinterpret_cast<T
*>(buffer
->mutable_data());
120 int num_valid_values
= ::arrow::util::internal::SpacedCompress
<T
>(
121 src
, num_values
, valid_bits
, valid_bits_offset
, data
);
122 Put(data
, num_valid_values
);
124 Put(src
, num_values
);
128 void UnsafePutByteArray(const void* data
, uint32_t length
) {
129 DCHECK(length
== 0 || data
!= nullptr) << "Value ptr cannot be NULL";
130 sink_
.UnsafeAppend(&length
, sizeof(uint32_t));
131 sink_
.UnsafeAppend(data
, static_cast<int64_t>(length
));
134 void Put(const ByteArray
& val
) {
135 // Write the result to the output stream
136 const int64_t increment
= static_cast<int64_t>(val
.len
+ sizeof(uint32_t));
137 if (ARROW_PREDICT_FALSE(sink_
.length() + increment
> sink_
.capacity())) {
138 PARQUET_THROW_NOT_OK(sink_
.Reserve(increment
));
140 UnsafePutByteArray(val
.ptr
, val
.len
);
144 template <typename ArrayType
>
145 void PutBinaryArray(const ArrayType
& array
) {
146 const int64_t total_bytes
=
147 array
.value_offset(array
.length()) - array
.value_offset(0);
148 PARQUET_THROW_NOT_OK(sink_
.Reserve(total_bytes
+ array
.length() * sizeof(uint32_t)));
150 PARQUET_THROW_NOT_OK(::arrow::VisitArrayDataInline
<typename
ArrayType::TypeClass
>(
152 [&](::arrow::util::string_view view
) {
153 if (ARROW_PREDICT_FALSE(view
.size() > kMaxByteArraySize
)) {
154 return Status::Invalid("Parquet cannot store strings with size 2GB or more");
156 UnsafePutByteArray(view
.data(), static_cast<uint32_t>(view
.size()));
159 []() { return Status::OK(); }));
162 ::arrow::BufferBuilder sink_
;
165 template <typename DType
>
166 void PlainEncoder
<DType
>::Put(const T
* buffer
, int num_values
) {
167 if (num_values
> 0) {
168 PARQUET_THROW_NOT_OK(sink_
.Append(buffer
, num_values
* sizeof(T
)));
173 inline void PlainEncoder
<ByteArrayType
>::Put(const ByteArray
* src
, int num_values
) {
174 for (int i
= 0; i
< num_values
; ++i
) {
179 template <typename ArrayType
>
180 void DirectPutImpl(const ::arrow::Array
& values
, ::arrow::BufferBuilder
* sink
) {
181 if (values
.type_id() != ArrayType::TypeClass::type_id
) {
182 std::string type_name
= ArrayType::TypeClass::type_name();
183 throw ParquetException("direct put to " + type_name
+ " from " +
184 values
.type()->ToString() + " not supported");
187 using value_type
= typename
ArrayType::value_type
;
188 constexpr auto value_size
= sizeof(value_type
);
189 auto raw_values
= checked_cast
<const ArrayType
&>(values
).raw_values();
191 if (values
.null_count() == 0) {
192 // no nulls, just dump the data
193 PARQUET_THROW_NOT_OK(sink
->Append(raw_values
, values
.length() * value_size
));
195 PARQUET_THROW_NOT_OK(
196 sink
->Reserve((values
.length() - values
.null_count()) * value_size
));
198 for (int64_t i
= 0; i
< values
.length(); i
++) {
199 if (values
.IsValid(i
)) {
200 sink
->UnsafeAppend(&raw_values
[i
], value_size
);
207 void PlainEncoder
<Int32Type
>::Put(const ::arrow::Array
& values
) {
208 DirectPutImpl
<::arrow::Int32Array
>(values
, &sink_
);
212 void PlainEncoder
<Int64Type
>::Put(const ::arrow::Array
& values
) {
213 DirectPutImpl
<::arrow::Int64Array
>(values
, &sink_
);
217 void PlainEncoder
<Int96Type
>::Put(const ::arrow::Array
& values
) {
218 ParquetException::NYI("direct put to Int96");
222 void PlainEncoder
<FloatType
>::Put(const ::arrow::Array
& values
) {
223 DirectPutImpl
<::arrow::FloatArray
>(values
, &sink_
);
227 void PlainEncoder
<DoubleType
>::Put(const ::arrow::Array
& values
) {
228 DirectPutImpl
<::arrow::DoubleArray
>(values
, &sink_
);
231 template <typename DType
>
232 void PlainEncoder
<DType
>::Put(const ::arrow::Array
& values
) {
233 ParquetException::NYI("direct put of " + values
.type()->ToString());
236 void AssertBaseBinary(const ::arrow::Array
& values
) {
237 if (!::arrow::is_base_binary_like(values
.type_id())) {
238 throw ParquetException("Only BaseBinaryArray and subclasses supported");
243 inline void PlainEncoder
<ByteArrayType
>::Put(const ::arrow::Array
& values
) {
244 AssertBaseBinary(values
);
246 if (::arrow::is_binary_like(values
.type_id())) {
247 PutBinaryArray(checked_cast
<const ::arrow::BinaryArray
&>(values
));
249 DCHECK(::arrow::is_large_binary_like(values
.type_id()));
250 PutBinaryArray(checked_cast
<const ::arrow::LargeBinaryArray
&>(values
));
254 void AssertFixedSizeBinary(const ::arrow::Array
& values
, int type_length
) {
255 if (values
.type_id() != ::arrow::Type::FIXED_SIZE_BINARY
&&
256 values
.type_id() != ::arrow::Type::DECIMAL
) {
257 throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
259 if (checked_cast
<const ::arrow::FixedSizeBinaryType
&>(*values
.type()).byte_width() !=
261 throw ParquetException("Size mismatch: " + values
.type()->ToString() +
262 " should have been " + std::to_string(type_length
) + " wide");
267 inline void PlainEncoder
<FLBAType
>::Put(const ::arrow::Array
& values
) {
268 AssertFixedSizeBinary(values
, descr_
->type_length());
269 const auto& data
= checked_cast
<const ::arrow::FixedSizeBinaryArray
&>(values
);
271 if (data
.null_count() == 0) {
272 // no nulls, just dump the data
273 PARQUET_THROW_NOT_OK(
274 sink_
.Append(data
.raw_values(), data
.length() * data
.byte_width()));
276 const int64_t total_bytes
=
277 data
.length() * data
.byte_width() - data
.null_count() * data
.byte_width();
278 PARQUET_THROW_NOT_OK(sink_
.Reserve(total_bytes
));
279 for (int64_t i
= 0; i
< data
.length(); i
++) {
280 if (data
.IsValid(i
)) {
281 sink_
.UnsafeAppend(data
.Value(i
), data
.byte_width());
288 inline void PlainEncoder
<FLBAType
>::Put(const FixedLenByteArray
* src
, int num_values
) {
289 if (descr_
->type_length() == 0) {
292 for (int i
= 0; i
< num_values
; ++i
) {
293 // Write the result to the output stream
294 DCHECK(src
[i
].ptr
!= nullptr) << "Value ptr cannot be NULL";
295 PARQUET_THROW_NOT_OK(sink_
.Append(src
[i
].ptr
, descr_
->type_length()));
300 class PlainEncoder
<BooleanType
> : public EncoderImpl
, virtual public BooleanEncoder
{
302 explicit PlainEncoder(const ColumnDescriptor
* descr
, MemoryPool
* pool
)
303 : EncoderImpl(descr
, Encoding::PLAIN
, pool
),
304 bits_available_(kInMemoryDefaultCapacity
* 8),
305 bits_buffer_(AllocateBuffer(pool
, kInMemoryDefaultCapacity
)),
307 bit_writer_(bits_buffer_
->mutable_data(),
308 static_cast<int>(bits_buffer_
->size())) {}
310 int64_t EstimatedDataEncodedSize() override
;
311 std::shared_ptr
<Buffer
> FlushValues() override
;
313 void Put(const bool* src
, int num_values
) override
;
315 void Put(const std::vector
<bool>& src
, int num_values
) override
;
317 void PutSpaced(const bool* src
, int num_values
, const uint8_t* valid_bits
,
318 int64_t valid_bits_offset
) override
{
319 if (valid_bits
!= NULLPTR
) {
320 PARQUET_ASSIGN_OR_THROW(auto buffer
, ::arrow::AllocateBuffer(num_values
* sizeof(T
),
321 this->memory_pool()));
322 T
* data
= reinterpret_cast<T
*>(buffer
->mutable_data());
323 int num_valid_values
= ::arrow::util::internal::SpacedCompress
<T
>(
324 src
, num_values
, valid_bits
, valid_bits_offset
, data
);
325 Put(data
, num_valid_values
);
327 Put(src
, num_values
);
331 void Put(const ::arrow::Array
& values
) override
{
332 if (values
.type_id() != ::arrow::Type::BOOL
) {
333 throw ParquetException("direct put to boolean from " + values
.type()->ToString() +
337 const auto& data
= checked_cast
<const ::arrow::BooleanArray
&>(values
);
338 if (data
.null_count() == 0) {
339 PARQUET_THROW_NOT_OK(sink_
.Reserve(BitUtil::BytesForBits(data
.length())));
340 // no nulls, just dump the data
341 ::arrow::internal::CopyBitmap(data
.data()->GetValues
<uint8_t>(1), data
.offset(),
342 data
.length(), sink_
.mutable_data(), sink_
.length());
344 auto n_valid
= BitUtil::BytesForBits(data
.length() - data
.null_count());
345 PARQUET_THROW_NOT_OK(sink_
.Reserve(n_valid
));
346 ::arrow::internal::FirstTimeBitmapWriter
writer(sink_
.mutable_data(),
347 sink_
.length(), n_valid
);
349 for (int64_t i
= 0; i
< data
.length(); i
++) {
350 if (data
.IsValid(i
)) {
361 sink_
.UnsafeAdvance(data
.length());
366 std::shared_ptr
<ResizableBuffer
> bits_buffer_
;
367 ::arrow::BufferBuilder sink_
;
368 ::arrow::BitUtil::BitWriter bit_writer_
;
370 template <typename SequenceType
>
371 void PutImpl(const SequenceType
& src
, int num_values
);
374 template <typename SequenceType
>
375 void PlainEncoder
<BooleanType
>::PutImpl(const SequenceType
& src
, int num_values
) {
377 if (bits_available_
> 0) {
378 int bits_to_write
= std::min(bits_available_
, num_values
);
379 for (int i
= 0; i
< bits_to_write
; i
++) {
380 bit_writer_
.PutValue(src
[i
], 1);
382 bits_available_
-= bits_to_write
;
383 bit_offset
= bits_to_write
;
385 if (bits_available_
== 0) {
387 PARQUET_THROW_NOT_OK(
388 sink_
.Append(bit_writer_
.buffer(), bit_writer_
.bytes_written()));
393 int bits_remaining
= num_values
- bit_offset
;
394 while (bit_offset
< num_values
) {
395 bits_available_
= static_cast<int>(bits_buffer_
->size()) * 8;
397 int bits_to_write
= std::min(bits_available_
, bits_remaining
);
398 for (int i
= bit_offset
; i
< bit_offset
+ bits_to_write
; i
++) {
399 bit_writer_
.PutValue(src
[i
], 1);
401 bit_offset
+= bits_to_write
;
402 bits_available_
-= bits_to_write
;
403 bits_remaining
-= bits_to_write
;
405 if (bits_available_
== 0) {
407 PARQUET_THROW_NOT_OK(
408 sink_
.Append(bit_writer_
.buffer(), bit_writer_
.bytes_written()));
414 int64_t PlainEncoder
<BooleanType
>::EstimatedDataEncodedSize() {
415 int64_t position
= sink_
.length();
416 return position
+ bit_writer_
.bytes_written();
419 std::shared_ptr
<Buffer
> PlainEncoder
<BooleanType
>::FlushValues() {
420 if (bits_available_
> 0) {
422 PARQUET_THROW_NOT_OK(sink_
.Append(bit_writer_
.buffer(), bit_writer_
.bytes_written()));
424 bits_available_
= static_cast<int>(bits_buffer_
->size()) * 8;
427 std::shared_ptr
<Buffer
> buffer
;
428 PARQUET_THROW_NOT_OK(sink_
.Finish(&buffer
));
432 void PlainEncoder
<BooleanType
>::Put(const bool* src
, int num_values
) {
433 PutImpl(src
, num_values
);
436 void PlainEncoder
<BooleanType
>::Put(const std::vector
<bool>& src
, int num_values
) {
437 PutImpl(src
, num_values
);
440 // ----------------------------------------------------------------------
441 // DictEncoder<T> implementations
443 template <typename DType
>
444 struct DictEncoderTraits
{
445 using c_type
= typename
DType::c_type
;
446 using MemoTableType
= ::arrow::internal::ScalarMemoTable
<c_type
>;
450 struct DictEncoderTraits
<ByteArrayType
> {
451 using MemoTableType
= ::arrow::internal::BinaryMemoTable
<::arrow::BinaryBuilder
>;
455 struct DictEncoderTraits
<FLBAType
> {
456 using MemoTableType
= ::arrow::internal::BinaryMemoTable
<::arrow::BinaryBuilder
>;
459 // Initially 1024 elements
460 static constexpr int32_t kInitialHashTableSize
= 1 << 10;
462 /// See the dictionary encoding section of
463 /// https://github.com/Parquet/parquet-format. The encoding supports
464 /// streaming encoding. Values are encoded as they are added while the
465 /// dictionary is being constructed. At any time, the buffered values
466 /// can be written out with the current dictionary size. More values
467 /// can then be added to the encoder, including new dictionary
469 template <typename DType
>
470 class DictEncoderImpl
: public EncoderImpl
, virtual public DictEncoder
<DType
> {
471 using MemoTableType
= typename DictEncoderTraits
<DType
>::MemoTableType
;
474 typedef typename
DType::c_type T
;
476 explicit DictEncoderImpl(const ColumnDescriptor
* desc
, MemoryPool
* pool
)
477 : EncoderImpl(desc
, Encoding::PLAIN_DICTIONARY
, pool
),
478 buffered_indices_(::arrow::stl::allocator
<int32_t>(pool
)),
479 dict_encoded_size_(0),
480 memo_table_(pool
, kInitialHashTableSize
) {}
482 ~DictEncoderImpl() override
{ DCHECK(buffered_indices_
.empty()); }
484 int dict_encoded_size() override
{ return dict_encoded_size_
; }
486 int WriteIndices(uint8_t* buffer
, int buffer_len
) override
{
487 // Write bit width in first byte
488 *buffer
= static_cast<uint8_t>(bit_width());
492 ::arrow::util::RleEncoder
encoder(buffer
, buffer_len
, bit_width());
494 for (int32_t index
: buffered_indices_
) {
495 if (!encoder
.Put(index
)) return -1;
500 return 1 + encoder
.len();
503 void set_type_length(int type_length
) { this->type_length_
= type_length
; }
505 /// Returns a conservative estimate of the number of bytes needed to encode the buffered
506 /// indices. Used to size the buffer passed to WriteIndices().
507 int64_t EstimatedDataEncodedSize() override
{
508 // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
510 // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
511 // but not reserving them would cause the encoder to fail.
513 ::arrow::util::RleEncoder::MaxBufferSize(
514 bit_width(), static_cast<int>(buffered_indices_
.size())) +
515 ::arrow::util::RleEncoder::MinBufferSize(bit_width());
518 /// The minimum bit width required to encode the currently buffered indices.
519 int bit_width() const override
{
520 if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
521 if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
522 return BitUtil::Log2(num_entries());
525 /// Encode value. Note that this does not actually write any data, just
526 /// buffers the value's index to be written later.
527 inline void Put(const T
& value
);
529 // Not implemented for other data types
530 inline void PutByteArray(const void* ptr
, int32_t length
);
532 void Put(const T
* src
, int num_values
) override
{
533 for (int32_t i
= 0; i
< num_values
; i
++) {
538 void PutSpaced(const T
* src
, int num_values
, const uint8_t* valid_bits
,
539 int64_t valid_bits_offset
) override
{
540 ::arrow::internal::VisitSetBitRunsVoid(valid_bits
, valid_bits_offset
, num_values
,
541 [&](int64_t position
, int64_t length
) {
542 for (int64_t i
= 0; i
< length
; i
++) {
543 Put(src
[i
+ position
]);
548 using TypedEncoder
<DType
>::Put
;
550 void Put(const ::arrow::Array
& values
) override
;
551 void PutDictionary(const ::arrow::Array
& values
) override
;
553 template <typename ArrowType
, typename T
= typename
ArrowType::c_type
>
554 void PutIndicesTyped(const ::arrow::Array
& data
) {
555 auto values
= data
.data()->GetValues
<T
>(1);
556 size_t buffer_position
= buffered_indices_
.size();
557 buffered_indices_
.resize(buffer_position
+
558 static_cast<size_t>(data
.length() - data
.null_count()));
559 ::arrow::internal::VisitSetBitRunsVoid(
560 data
.null_bitmap_data(), data
.offset(), data
.length(),
561 [&](int64_t position
, int64_t length
) {
562 for (int64_t i
= 0; i
< length
; ++i
) {
563 buffered_indices_
[buffer_position
++] =
564 static_cast<int32_t>(values
[i
+ position
]);
569 void PutIndices(const ::arrow::Array
& data
) override
{
570 switch (data
.type()->id()) {
571 case ::arrow::Type::UINT8
:
572 case ::arrow::Type::INT8
:
573 return PutIndicesTyped
<::arrow::UInt8Type
>(data
);
574 case ::arrow::Type::UINT16
:
575 case ::arrow::Type::INT16
:
576 return PutIndicesTyped
<::arrow::UInt16Type
>(data
);
577 case ::arrow::Type::UINT32
:
578 case ::arrow::Type::INT32
:
579 return PutIndicesTyped
<::arrow::UInt32Type
>(data
);
580 case ::arrow::Type::UINT64
:
581 case ::arrow::Type::INT64
:
582 return PutIndicesTyped
<::arrow::UInt64Type
>(data
);
584 throw ParquetException("Passed non-integer array to PutIndices");
588 std::shared_ptr
<Buffer
> FlushValues() override
{
589 std::shared_ptr
<ResizableBuffer
> buffer
=
590 AllocateBuffer(this->pool_
, EstimatedDataEncodedSize());
591 int result_size
= WriteIndices(buffer
->mutable_data(),
592 static_cast<int>(EstimatedDataEncodedSize()));
593 PARQUET_THROW_NOT_OK(buffer
->Resize(result_size
, false));
594 return std::move(buffer
);
597 /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
598 /// dict_encoded_size() bytes.
599 void WriteDict(uint8_t* buffer
) override
;
601 /// The number of entries in the dictionary.
602 int num_entries() const override
{ return memo_table_
.size(); }
605 /// Clears all the indices (but leaves the dictionary).
606 void ClearIndices() { buffered_indices_
.clear(); }
608 /// Indices that have not yet be written out by WriteIndices().
609 ArrowPoolVector
<int32_t> buffered_indices_
;
611 template <typename ArrayType
>
612 void PutBinaryArray(const ArrayType
& array
) {
613 PARQUET_THROW_NOT_OK(::arrow::VisitArrayDataInline
<typename
ArrayType::TypeClass
>(
615 [&](::arrow::util::string_view view
) {
616 if (ARROW_PREDICT_FALSE(view
.size() > kMaxByteArraySize
)) {
617 return Status::Invalid("Parquet cannot store strings with size 2GB or more");
619 PutByteArray(view
.data(), static_cast<uint32_t>(view
.size()));
622 []() { return Status::OK(); }));
625 template <typename ArrayType
>
626 void PutBinaryDictionaryArray(const ArrayType
& array
) {
627 DCHECK_EQ(array
.null_count(), 0);
628 for (int64_t i
= 0; i
< array
.length(); i
++) {
629 auto v
= array
.GetView(i
);
630 if (ARROW_PREDICT_FALSE(v
.size() > kMaxByteArraySize
)) {
631 throw ParquetException("Parquet cannot store strings with size 2GB or more");
633 dict_encoded_size_
+= static_cast<int>(v
.size() + sizeof(uint32_t));
634 int32_t unused_memo_index
;
635 PARQUET_THROW_NOT_OK(memo_table_
.GetOrInsert(
636 v
.data(), static_cast<int32_t>(v
.size()), &unused_memo_index
));
640 /// The number of bytes needed to encode the dictionary.
641 int dict_encoded_size_
;
643 MemoTableType memo_table_
;
646 template <typename DType
>
647 void DictEncoderImpl
<DType
>::WriteDict(uint8_t* buffer
) {
648 // For primitive types, only a memcpy
649 DCHECK_EQ(static_cast<size_t>(dict_encoded_size_
), sizeof(T
) * memo_table_
.size());
650 memo_table_
.CopyValues(0 /* start_pos */, reinterpret_cast<T
*>(buffer
));
653 // ByteArray and FLBA already have the dictionary encoded in their data heaps
655 void DictEncoderImpl
<ByteArrayType
>::WriteDict(uint8_t* buffer
) {
656 memo_table_
.VisitValues(0, [&buffer
](const ::arrow::util::string_view
& v
) {
657 uint32_t len
= static_cast<uint32_t>(v
.length());
658 memcpy(buffer
, &len
, sizeof(len
));
659 buffer
+= sizeof(len
);
660 memcpy(buffer
, v
.data(), len
);
666 void DictEncoderImpl
<FLBAType
>::WriteDict(uint8_t* buffer
) {
667 memo_table_
.VisitValues(0, [&](const ::arrow::util::string_view
& v
) {
668 DCHECK_EQ(v
.length(), static_cast<size_t>(type_length_
));
669 memcpy(buffer
, v
.data(), type_length_
);
670 buffer
+= type_length_
;
674 template <typename DType
>
675 inline void DictEncoderImpl
<DType
>::Put(const T
& v
) {
676 // Put() implementation for primitive types
677 auto on_found
= [](int32_t memo_index
) {};
678 auto on_not_found
= [this](int32_t memo_index
) {
679 dict_encoded_size_
+= static_cast<int>(sizeof(T
));
683 PARQUET_THROW_NOT_OK(memo_table_
.GetOrInsert(v
, on_found
, on_not_found
, &memo_index
));
684 buffered_indices_
.push_back(memo_index
);
687 template <typename DType
>
688 inline void DictEncoderImpl
<DType
>::PutByteArray(const void* ptr
, int32_t length
) {
693 inline void DictEncoderImpl
<ByteArrayType
>::PutByteArray(const void* ptr
,
695 static const uint8_t empty
[] = {0};
697 auto on_found
= [](int32_t memo_index
) {};
698 auto on_not_found
= [&](int32_t memo_index
) {
699 dict_encoded_size_
+= static_cast<int>(length
+ sizeof(uint32_t));
702 DCHECK(ptr
!= nullptr || length
== 0);
703 ptr
= (ptr
!= nullptr) ? ptr
: empty
;
705 PARQUET_THROW_NOT_OK(
706 memo_table_
.GetOrInsert(ptr
, length
, on_found
, on_not_found
, &memo_index
));
707 buffered_indices_
.push_back(memo_index
);
711 inline void DictEncoderImpl
<ByteArrayType
>::Put(const ByteArray
& val
) {
712 return PutByteArray(val
.ptr
, static_cast<int32_t>(val
.len
));
716 inline void DictEncoderImpl
<FLBAType
>::Put(const FixedLenByteArray
& v
) {
717 static const uint8_t empty
[] = {0};
719 auto on_found
= [](int32_t memo_index
) {};
720 auto on_not_found
= [this](int32_t memo_index
) { dict_encoded_size_
+= type_length_
; };
722 DCHECK(v
.ptr
!= nullptr || type_length_
== 0);
723 const void* ptr
= (v
.ptr
!= nullptr) ? v
.ptr
: empty
;
725 PARQUET_THROW_NOT_OK(
726 memo_table_
.GetOrInsert(ptr
, type_length_
, on_found
, on_not_found
, &memo_index
));
727 buffered_indices_
.push_back(memo_index
);
731 void DictEncoderImpl
<Int96Type
>::Put(const ::arrow::Array
& values
) {
732 ParquetException::NYI("Direct put to Int96");
736 void DictEncoderImpl
<Int96Type
>::PutDictionary(const ::arrow::Array
& values
) {
737 ParquetException::NYI("Direct put to Int96");
740 template <typename DType
>
741 void DictEncoderImpl
<DType
>::Put(const ::arrow::Array
& values
) {
742 using ArrayType
= typename ::arrow::CTypeTraits
<typename
DType::c_type
>::ArrayType
;
743 const auto& data
= checked_cast
<const ArrayType
&>(values
);
744 if (data
.null_count() == 0) {
745 // no nulls, just dump the data
746 for (int64_t i
= 0; i
< data
.length(); i
++) {
750 for (int64_t i
= 0; i
< data
.length(); i
++) {
751 if (data
.IsValid(i
)) {
759 void DictEncoderImpl
<FLBAType
>::Put(const ::arrow::Array
& values
) {
760 AssertFixedSizeBinary(values
, type_length_
);
761 const auto& data
= checked_cast
<const ::arrow::FixedSizeBinaryArray
&>(values
);
762 if (data
.null_count() == 0) {
763 // no nulls, just dump the data
764 for (int64_t i
= 0; i
< data
.length(); i
++) {
765 Put(FixedLenByteArray(data
.Value(i
)));
768 std::vector
<uint8_t> empty(type_length_
, 0);
769 for (int64_t i
= 0; i
< data
.length(); i
++) {
770 if (data
.IsValid(i
)) {
771 Put(FixedLenByteArray(data
.Value(i
)));
778 void DictEncoderImpl
<ByteArrayType
>::Put(const ::arrow::Array
& values
) {
779 AssertBaseBinary(values
);
780 if (::arrow::is_binary_like(values
.type_id())) {
781 PutBinaryArray(checked_cast
<const ::arrow::BinaryArray
&>(values
));
783 DCHECK(::arrow::is_large_binary_like(values
.type_id()));
784 PutBinaryArray(checked_cast
<const ::arrow::LargeBinaryArray
&>(values
));
788 template <typename DType
>
789 void AssertCanPutDictionary(DictEncoderImpl
<DType
>* encoder
, const ::arrow::Array
& dict
) {
790 if (dict
.null_count() > 0) {
791 throw ParquetException("Inserted dictionary cannot cannot contain nulls");
794 if (encoder
->num_entries() > 0) {
795 throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
799 template <typename DType
>
800 void DictEncoderImpl
<DType
>::PutDictionary(const ::arrow::Array
& values
) {
801 AssertCanPutDictionary(this, values
);
803 using ArrayType
= typename ::arrow::CTypeTraits
<typename
DType::c_type
>::ArrayType
;
804 const auto& data
= checked_cast
<const ArrayType
&>(values
);
806 dict_encoded_size_
+= static_cast<int>(sizeof(typename
DType::c_type
) * data
.length());
807 for (int64_t i
= 0; i
< data
.length(); i
++) {
808 int32_t unused_memo_index
;
809 PARQUET_THROW_NOT_OK(memo_table_
.GetOrInsert(data
.Value(i
), &unused_memo_index
));
814 void DictEncoderImpl
<FLBAType
>::PutDictionary(const ::arrow::Array
& values
) {
815 AssertFixedSizeBinary(values
, type_length_
);
816 AssertCanPutDictionary(this, values
);
818 const auto& data
= checked_cast
<const ::arrow::FixedSizeBinaryArray
&>(values
);
820 dict_encoded_size_
+= static_cast<int>(type_length_
* data
.length());
821 for (int64_t i
= 0; i
< data
.length(); i
++) {
822 int32_t unused_memo_index
;
823 PARQUET_THROW_NOT_OK(
824 memo_table_
.GetOrInsert(data
.Value(i
), type_length_
, &unused_memo_index
));
829 void DictEncoderImpl
<ByteArrayType
>::PutDictionary(const ::arrow::Array
& values
) {
830 AssertBaseBinary(values
);
831 AssertCanPutDictionary(this, values
);
833 if (::arrow::is_binary_like(values
.type_id())) {
834 PutBinaryDictionaryArray(checked_cast
<const ::arrow::BinaryArray
&>(values
));
836 DCHECK(::arrow::is_large_binary_like(values
.type_id()));
837 PutBinaryDictionaryArray(checked_cast
<const ::arrow::LargeBinaryArray
&>(values
));
841 // ----------------------------------------------------------------------
842 // ByteStreamSplitEncoder<T> implementations
844 template <typename DType
>
845 class ByteStreamSplitEncoder
: public EncoderImpl
, virtual public TypedEncoder
<DType
> {
847 using T
= typename
DType::c_type
;
848 using TypedEncoder
<DType
>::Put
;
850 explicit ByteStreamSplitEncoder(
851 const ColumnDescriptor
* descr
,
852 ::arrow::MemoryPool
* pool
= ::arrow::default_memory_pool());
854 int64_t EstimatedDataEncodedSize() override
;
855 std::shared_ptr
<Buffer
> FlushValues() override
;
857 void Put(const T
* buffer
, int num_values
) override
;
858 void Put(const ::arrow::Array
& values
) override
;
859 void PutSpaced(const T
* src
, int num_values
, const uint8_t* valid_bits
,
860 int64_t valid_bits_offset
) override
;
863 template <typename ArrowType
>
864 void PutImpl(const ::arrow::Array
& values
) {
865 if (values
.type_id() != ArrowType::type_id
) {
866 throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
867 " from " + values
.type()->ToString() + " not supported");
869 const auto& data
= *values
.data();
870 PutSpaced(data
.GetValues
<typename
ArrowType::c_type
>(1),
871 static_cast<int>(data
.length
), data
.GetValues
<uint8_t>(0, 0), data
.offset
);
874 ::arrow::BufferBuilder sink_
;
875 int64_t num_values_in_buffer_
;
878 template <typename DType
>
879 ByteStreamSplitEncoder
<DType
>::ByteStreamSplitEncoder(const ColumnDescriptor
* descr
,
880 ::arrow::MemoryPool
* pool
)
881 : EncoderImpl(descr
, Encoding::BYTE_STREAM_SPLIT
, pool
),
883 num_values_in_buffer_
{0} {}
885 template <typename DType
>
886 int64_t ByteStreamSplitEncoder
<DType
>::EstimatedDataEncodedSize() {
887 return sink_
.length();
890 template <typename DType
>
891 std::shared_ptr
<Buffer
> ByteStreamSplitEncoder
<DType
>::FlushValues() {
892 std::shared_ptr
<ResizableBuffer
> output_buffer
=
893 AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
894 uint8_t* output_buffer_raw
= output_buffer
->mutable_data();
895 const uint8_t* raw_values
= sink_
.data();
896 ::arrow::util::internal::ByteStreamSplitEncode
<T
>(raw_values
, num_values_in_buffer_
,
899 num_values_in_buffer_
= 0;
900 return std::move(output_buffer
);
903 template <typename DType
>
904 void ByteStreamSplitEncoder
<DType
>::Put(const T
* buffer
, int num_values
) {
905 if (num_values
> 0) {
906 PARQUET_THROW_NOT_OK(sink_
.Append(buffer
, num_values
* sizeof(T
)));
907 num_values_in_buffer_
+= num_values
;
912 void ByteStreamSplitEncoder
<FloatType
>::Put(const ::arrow::Array
& values
) {
913 PutImpl
<::arrow::FloatType
>(values
);
917 void ByteStreamSplitEncoder
<DoubleType
>::Put(const ::arrow::Array
& values
) {
918 PutImpl
<::arrow::DoubleType
>(values
);
921 template <typename DType
>
922 void ByteStreamSplitEncoder
<DType
>::PutSpaced(const T
* src
, int num_values
,
923 const uint8_t* valid_bits
,
924 int64_t valid_bits_offset
) {
925 if (valid_bits
!= NULLPTR
) {
926 PARQUET_ASSIGN_OR_THROW(auto buffer
, ::arrow::AllocateBuffer(num_values
* sizeof(T
),
927 this->memory_pool()));
928 T
* data
= reinterpret_cast<T
*>(buffer
->mutable_data());
929 int num_valid_values
= ::arrow::util::internal::SpacedCompress
<T
>(
930 src
, num_values
, valid_bits
, valid_bits_offset
, data
);
931 Put(data
, num_valid_values
);
933 Put(src
, num_values
);
937 class DecoderImpl
: virtual public Decoder
{
939 void SetData(int num_values
, const uint8_t* data
, int len
) override
{
940 num_values_
= num_values
;
945 int values_left() const override
{ return num_values_
; }
946 Encoding::type
encoding() const override
{ return encoding_
; }
949 explicit DecoderImpl(const ColumnDescriptor
* descr
, Encoding::type encoding
)
950 : descr_(descr
), encoding_(encoding
), num_values_(0), data_(NULLPTR
), len_(0) {}
952 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
953 const ColumnDescriptor
* descr_
;
955 const Encoding::type encoding_
;
957 const uint8_t* data_
;
962 template <typename DType
>
963 class PlainDecoder
: public DecoderImpl
, virtual public TypedDecoder
<DType
> {
965 using T
= typename
DType::c_type
;
966 explicit PlainDecoder(const ColumnDescriptor
* descr
);
968 int Decode(T
* buffer
, int max_values
) override
;
970 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
971 int64_t valid_bits_offset
,
972 typename EncodingTraits
<DType
>::Accumulator
* builder
) override
;
974 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
975 int64_t valid_bits_offset
,
976 typename EncodingTraits
<DType
>::DictAccumulator
* builder
) override
;
980 inline int PlainDecoder
<Int96Type
>::DecodeArrow(
981 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
982 typename EncodingTraits
<Int96Type
>::Accumulator
* builder
) {
983 ParquetException::NYI("DecodeArrow not supported for Int96");
987 inline int PlainDecoder
<Int96Type
>::DecodeArrow(
988 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
989 typename EncodingTraits
<Int96Type
>::DictAccumulator
* builder
) {
990 ParquetException::NYI("DecodeArrow not supported for Int96");
994 inline int PlainDecoder
<BooleanType
>::DecodeArrow(
995 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
996 typename EncodingTraits
<BooleanType
>::DictAccumulator
* builder
) {
997 ParquetException::NYI("dictionaries of BooleanType");
1000 template <typename DType
>
1001 int PlainDecoder
<DType
>::DecodeArrow(
1002 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1003 typename EncodingTraits
<DType
>::Accumulator
* builder
) {
1004 using value_type
= typename
DType::c_type
;
1006 constexpr int value_size
= static_cast<int>(sizeof(value_type
));
1007 int values_decoded
= num_values
- null_count
;
1008 if (ARROW_PREDICT_FALSE(len_
< value_size
* values_decoded
)) {
1009 ParquetException::EofException();
1012 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1014 VisitNullBitmapInline(
1015 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1017 builder
->UnsafeAppend(::arrow::util::SafeLoadAs
<value_type
>(data_
));
1018 data_
+= sizeof(value_type
);
1020 [&]() { builder
->UnsafeAppendNull(); });
1022 num_values_
-= values_decoded
;
1023 len_
-= sizeof(value_type
) * values_decoded
;
1024 return values_decoded
;
1027 template <typename DType
>
1028 int PlainDecoder
<DType
>::DecodeArrow(
1029 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1030 typename EncodingTraits
<DType
>::DictAccumulator
* builder
) {
1031 using value_type
= typename
DType::c_type
;
1033 constexpr int value_size
= static_cast<int>(sizeof(value_type
));
1034 int values_decoded
= num_values
- null_count
;
1035 if (ARROW_PREDICT_FALSE(len_
< value_size
* values_decoded
)) {
1036 ParquetException::EofException();
1039 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1041 VisitNullBitmapInline(
1042 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1044 PARQUET_THROW_NOT_OK(
1045 builder
->Append(::arrow::util::SafeLoadAs
<value_type
>(data_
)));
1046 data_
+= sizeof(value_type
);
1048 [&]() { PARQUET_THROW_NOT_OK(builder
->AppendNull()); });
1050 num_values_
-= values_decoded
;
1051 len_
-= sizeof(value_type
) * values_decoded
;
1052 return values_decoded
;
1055 // Decode routine templated on C++ type rather than type enum
1056 template <typename T
>
1057 inline int DecodePlain(const uint8_t* data
, int64_t data_size
, int num_values
,
1058 int type_length
, T
* out
) {
1059 int64_t bytes_to_decode
= num_values
* static_cast<int64_t>(sizeof(T
));
1060 if (bytes_to_decode
> data_size
|| bytes_to_decode
> INT_MAX
) {
1061 ParquetException::EofException();
1063 // If bytes_to_decode == 0, data could be null
1064 if (bytes_to_decode
> 0) {
1065 memcpy(out
, data
, bytes_to_decode
);
1067 return static_cast<int>(bytes_to_decode
);
1070 template <typename DType
>
1071 PlainDecoder
<DType
>::PlainDecoder(const ColumnDescriptor
* descr
)
1072 : DecoderImpl(descr
, Encoding::PLAIN
) {
1073 if (descr_
&& descr_
->physical_type() == Type::FIXED_LEN_BYTE_ARRAY
) {
1074 type_length_
= descr_
->type_length();
1080 // Template specialization for BYTE_ARRAY. The written values do not own their
1083 static inline int64_t ReadByteArray(const uint8_t* data
, int64_t data_size
,
1085 if (ARROW_PREDICT_FALSE(data_size
< 4)) {
1086 ParquetException::EofException();
1088 const int32_t len
= ::arrow::util::SafeLoadAs
<int32_t>(data
);
1090 throw ParquetException("Invalid BYTE_ARRAY value");
1092 const int64_t consumed_length
= static_cast<int64_t>(len
) + 4;
1093 if (ARROW_PREDICT_FALSE(data_size
< consumed_length
)) {
1094 ParquetException::EofException();
1096 *out
= ByteArray
{static_cast<uint32_t>(len
), data
+ 4};
1097 return consumed_length
;
1101 inline int DecodePlain
<ByteArray
>(const uint8_t* data
, int64_t data_size
, int num_values
,
1102 int type_length
, ByteArray
* out
) {
1103 int bytes_decoded
= 0;
1104 for (int i
= 0; i
< num_values
; ++i
) {
1105 const auto increment
= ReadByteArray(data
, data_size
, out
+ i
);
1106 if (ARROW_PREDICT_FALSE(increment
> INT_MAX
- bytes_decoded
)) {
1107 throw ParquetException("BYTE_ARRAY chunk too large");
1110 data_size
-= increment
;
1111 bytes_decoded
+= static_cast<int>(increment
);
1113 return bytes_decoded
;
1116 // Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
1117 // own their own data.
1119 inline int DecodePlain
<FixedLenByteArray
>(const uint8_t* data
, int64_t data_size
,
1120 int num_values
, int type_length
,
1121 FixedLenByteArray
* out
) {
1122 int64_t bytes_to_decode
= static_cast<int64_t>(type_length
) * num_values
;
1123 if (bytes_to_decode
> data_size
|| bytes_to_decode
> INT_MAX
) {
1124 ParquetException::EofException();
1126 for (int i
= 0; i
< num_values
; ++i
) {
1128 data
+= type_length
;
1129 data_size
-= type_length
;
1131 return static_cast<int>(bytes_to_decode
);
1134 template <typename DType
>
1135 int PlainDecoder
<DType
>::Decode(T
* buffer
, int max_values
) {
1136 max_values
= std::min(max_values
, num_values_
);
1137 int bytes_consumed
= DecodePlain
<T
>(data_
, len_
, max_values
, type_length_
, buffer
);
1138 data_
+= bytes_consumed
;
1139 len_
-= bytes_consumed
;
1140 num_values_
-= max_values
;
1144 class PlainBooleanDecoder
: public DecoderImpl
,
1145 virtual public TypedDecoder
<BooleanType
>,
1146 virtual public BooleanDecoder
{
1148 explicit PlainBooleanDecoder(const ColumnDescriptor
* descr
);
1149 void SetData(int num_values
, const uint8_t* data
, int len
) override
;
1151 // Two flavors of bool decoding
1152 int Decode(uint8_t* buffer
, int max_values
) override
;
1153 int Decode(bool* buffer
, int max_values
) override
;
1154 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1155 int64_t valid_bits_offset
,
1156 typename EncodingTraits
<BooleanType
>::Accumulator
* out
) override
;
1158 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1159 int64_t valid_bits_offset
,
1160 typename EncodingTraits
<BooleanType
>::DictAccumulator
* out
) override
;
1163 std::unique_ptr
<::arrow::BitUtil::BitReader
> bit_reader_
;
1166 PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor
* descr
)
1167 : DecoderImpl(descr
, Encoding::PLAIN
) {}
1169 void PlainBooleanDecoder::SetData(int num_values
, const uint8_t* data
, int len
) {
1170 num_values_
= num_values
;
1171 bit_reader_
.reset(new BitUtil::BitReader(data
, len
));
1174 int PlainBooleanDecoder::DecodeArrow(
1175 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1176 typename EncodingTraits
<BooleanType
>::Accumulator
* builder
) {
1177 int values_decoded
= num_values
- null_count
;
1178 if (ARROW_PREDICT_FALSE(num_values_
< values_decoded
)) {
1179 ParquetException::EofException();
1182 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1184 VisitNullBitmapInline(
1185 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1188 ARROW_IGNORE_EXPR(bit_reader_
->GetValue(1, &value
));
1189 builder
->UnsafeAppend(value
);
1191 [&]() { builder
->UnsafeAppendNull(); });
1193 num_values_
-= values_decoded
;
1194 return values_decoded
;
1197 inline int PlainBooleanDecoder::DecodeArrow(
1198 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1199 typename EncodingTraits
<BooleanType
>::DictAccumulator
* builder
) {
1200 ParquetException::NYI("dictionaries of BooleanType");
1203 int PlainBooleanDecoder::Decode(uint8_t* buffer
, int max_values
) {
1204 max_values
= std::min(max_values
, num_values_
);
1206 ::arrow::internal::BitmapWriter
bit_writer(buffer
, 0, max_values
);
1207 for (int i
= 0; i
< max_values
; ++i
) {
1208 if (!bit_reader_
->GetValue(1, &val
)) {
1209 ParquetException::EofException();
1216 bit_writer
.Finish();
1217 num_values_
-= max_values
;
1221 int PlainBooleanDecoder::Decode(bool* buffer
, int max_values
) {
1222 max_values
= std::min(max_values
, num_values_
);
1223 if (bit_reader_
->GetBatch(1, buffer
, max_values
) != max_values
) {
1224 ParquetException::EofException();
1226 num_values_
-= max_values
;
1230 struct ArrowBinaryHelper
{
1231 explicit ArrowBinaryHelper(typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
) {
1233 this->builder
= out
->builder
.get();
1234 this->chunk_space_remaining
=
1235 ::arrow::kBinaryMemoryLimit
- this->builder
->value_data_length();
1238 Status
PushChunk() {
1239 std::shared_ptr
<::arrow::Array
> result
;
1240 RETURN_NOT_OK(builder
->Finish(&result
));
1241 out
->chunks
.push_back(result
);
1242 chunk_space_remaining
= ::arrow::kBinaryMemoryLimit
;
1243 return Status::OK();
1246 bool CanFit(int64_t length
) const { return length
<= chunk_space_remaining
; }
1248 void UnsafeAppend(const uint8_t* data
, int32_t length
) {
1249 chunk_space_remaining
-= length
;
1250 builder
->UnsafeAppend(data
, length
);
1253 void UnsafeAppendNull() { builder
->UnsafeAppendNull(); }
1255 Status
Append(const uint8_t* data
, int32_t length
) {
1256 chunk_space_remaining
-= length
;
1257 return builder
->Append(data
, length
);
1260 Status
AppendNull() { return builder
->AppendNull(); }
1262 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
;
1263 ::arrow::BinaryBuilder
* builder
;
1264 int64_t chunk_space_remaining
;
1268 inline int PlainDecoder
<ByteArrayType
>::DecodeArrow(
1269 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1270 typename EncodingTraits
<ByteArrayType
>::Accumulator
* builder
) {
1271 ParquetException::NYI();
1275 inline int PlainDecoder
<ByteArrayType
>::DecodeArrow(
1276 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1277 typename EncodingTraits
<ByteArrayType
>::DictAccumulator
* builder
) {
1278 ParquetException::NYI();
1282 inline int PlainDecoder
<FLBAType
>::DecodeArrow(
1283 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1284 typename EncodingTraits
<FLBAType
>::Accumulator
* builder
) {
1285 int values_decoded
= num_values
- null_count
;
1286 if (ARROW_PREDICT_FALSE(len_
< descr_
->type_length() * values_decoded
)) {
1287 ParquetException::EofException();
1290 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1292 VisitNullBitmapInline(
1293 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1295 builder
->UnsafeAppend(data_
);
1296 data_
+= descr_
->type_length();
1298 [&]() { builder
->UnsafeAppendNull(); });
1300 num_values_
-= values_decoded
;
1301 len_
-= descr_
->type_length() * values_decoded
;
1302 return values_decoded
;
1306 inline int PlainDecoder
<FLBAType
>::DecodeArrow(
1307 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1308 typename EncodingTraits
<FLBAType
>::DictAccumulator
* builder
) {
1309 int values_decoded
= num_values
- null_count
;
1310 if (ARROW_PREDICT_FALSE(len_
< descr_
->type_length() * values_decoded
)) {
1311 ParquetException::EofException();
1314 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1316 VisitNullBitmapInline(
1317 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1319 PARQUET_THROW_NOT_OK(builder
->Append(data_
));
1320 data_
+= descr_
->type_length();
1322 [&]() { PARQUET_THROW_NOT_OK(builder
->AppendNull()); });
1324 num_values_
-= values_decoded
;
1325 len_
-= descr_
->type_length() * values_decoded
;
1326 return values_decoded
;
1329 class PlainByteArrayDecoder
: public PlainDecoder
<ByteArrayType
>,
1330 virtual public ByteArrayDecoder
{
1332 using Base
= PlainDecoder
<ByteArrayType
>;
1333 using Base::DecodeSpaced
;
1334 using Base::PlainDecoder
;
1336 // ----------------------------------------------------------------------
1337 // Dictionary read paths
1339 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1340 int64_t valid_bits_offset
,
1341 ::arrow::BinaryDictionary32Builder
* builder
) override
{
1343 PARQUET_THROW_NOT_OK(DecodeArrow(num_values
, null_count
, valid_bits
,
1344 valid_bits_offset
, builder
, &result
));
1348 // ----------------------------------------------------------------------
1349 // Optimized dense binary read paths
1351 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1352 int64_t valid_bits_offset
,
1353 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
) override
{
1355 PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values
, null_count
, valid_bits
,
1356 valid_bits_offset
, out
, &result
));
1361 Status
DecodeArrowDense(int num_values
, int null_count
, const uint8_t* valid_bits
,
1362 int64_t valid_bits_offset
,
1363 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
,
1364 int* out_values_decoded
) {
1365 ArrowBinaryHelper
helper(out
);
1366 int values_decoded
= 0;
1368 RETURN_NOT_OK(helper
.builder
->Reserve(num_values
));
1369 RETURN_NOT_OK(helper
.builder
->ReserveData(
1370 std::min
<int64_t>(len_
, helper
.chunk_space_remaining
)));
1373 RETURN_NOT_OK(VisitNullBitmapInline(
1374 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1376 if (ARROW_PREDICT_FALSE(len_
< 4)) {
1377 ParquetException::EofException();
1379 auto value_len
= ::arrow::util::SafeLoadAs
<int32_t>(data_
);
1380 if (ARROW_PREDICT_FALSE(value_len
< 0 || value_len
> INT32_MAX
- 4)) {
1381 return Status::Invalid("Invalid or corrupted value_len '", value_len
, "'");
1383 auto increment
= value_len
+ 4;
1384 if (ARROW_PREDICT_FALSE(len_
< increment
)) {
1385 ParquetException::EofException();
1387 if (ARROW_PREDICT_FALSE(!helper
.CanFit(value_len
))) {
1388 // This element would exceed the capacity of a chunk
1389 RETURN_NOT_OK(helper
.PushChunk());
1390 RETURN_NOT_OK(helper
.builder
->Reserve(num_values
- i
));
1391 RETURN_NOT_OK(helper
.builder
->ReserveData(
1392 std::min
<int64_t>(len_
, helper
.chunk_space_remaining
)));
1394 helper
.UnsafeAppend(data_
+ 4, value_len
);
1399 return Status::OK();
1402 helper
.UnsafeAppendNull();
1404 return Status::OK();
1407 num_values_
-= values_decoded
;
1408 *out_values_decoded
= values_decoded
;
1409 return Status::OK();
1412 template <typename BuilderType
>
1413 Status
DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1414 int64_t valid_bits_offset
, BuilderType
* builder
,
1415 int* out_values_decoded
) {
1416 RETURN_NOT_OK(builder
->Reserve(num_values
));
1417 int values_decoded
= 0;
1419 RETURN_NOT_OK(VisitNullBitmapInline(
1420 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1422 if (ARROW_PREDICT_FALSE(len_
< 4)) {
1423 ParquetException::EofException();
1425 auto value_len
= ::arrow::util::SafeLoadAs
<int32_t>(data_
);
1426 if (ARROW_PREDICT_FALSE(value_len
< 0 || value_len
> INT32_MAX
- 4)) {
1427 return Status::Invalid("Invalid or corrupted value_len '", value_len
, "'");
1429 auto increment
= value_len
+ 4;
1430 if (ARROW_PREDICT_FALSE(len_
< increment
)) {
1431 ParquetException::EofException();
1433 RETURN_NOT_OK(builder
->Append(data_
+ 4, value_len
));
1437 return Status::OK();
1439 [&]() { return builder
->AppendNull(); }));
1441 num_values_
-= values_decoded
;
1442 *out_values_decoded
= values_decoded
;
1443 return Status::OK();
1447 class PlainFLBADecoder
: public PlainDecoder
<FLBAType
>, virtual public FLBADecoder
{
1449 using Base
= PlainDecoder
<FLBAType
>;
1450 using Base::PlainDecoder
;
1453 // ----------------------------------------------------------------------
1454 // Dictionary encoding and decoding
1456 template <typename Type
>
1457 class DictDecoderImpl
: public DecoderImpl
, virtual public DictDecoder
<Type
> {
1459 typedef typename
Type::c_type T
;
1461 // Initializes the dictionary with values from 'dictionary'. The data in
1462 // dictionary is not guaranteed to persist in memory after this call so the
1463 // dictionary decoder needs to copy the data out if necessary.
1464 explicit DictDecoderImpl(const ColumnDescriptor
* descr
,
1465 MemoryPool
* pool
= ::arrow::default_memory_pool())
1466 : DecoderImpl(descr
, Encoding::RLE_DICTIONARY
),
1467 dictionary_(AllocateBuffer(pool
, 0)),
1468 dictionary_length_(0),
1469 byte_array_data_(AllocateBuffer(pool
, 0)),
1470 byte_array_offsets_(AllocateBuffer(pool
, 0)),
1471 indices_scratch_space_(AllocateBuffer(pool
, 0)) {}
1473 // Perform type-specific initiatialization
1474 void SetDict(TypedDecoder
<Type
>* dictionary
) override
;
1476 void SetData(int num_values
, const uint8_t* data
, int len
) override
{
1477 num_values_
= num_values
;
1479 // Initialize dummy decoder to avoid crashes later on
1480 idx_decoder_
= ::arrow::util::RleDecoder(data
, len
, /*bit_width=*/1);
1483 uint8_t bit_width
= *data
;
1484 if (ARROW_PREDICT_FALSE(bit_width
>= 64)) {
1485 throw ParquetException("Invalid or corrupted bit_width");
1487 idx_decoder_
= ::arrow::util::RleDecoder(++data
, --len
, bit_width
);
1490 int Decode(T
* buffer
, int num_values
) override
{
1491 num_values
= std::min(num_values
, num_values_
);
1492 int decoded_values
=
1493 idx_decoder_
.GetBatchWithDict(reinterpret_cast<const T
*>(dictionary_
->data()),
1494 dictionary_length_
, buffer
, num_values
);
1495 if (decoded_values
!= num_values
) {
1496 ParquetException::EofException();
1498 num_values_
-= num_values
;
1502 int DecodeSpaced(T
* buffer
, int num_values
, int null_count
, const uint8_t* valid_bits
,
1503 int64_t valid_bits_offset
) override
{
1504 num_values
= std::min(num_values
, num_values_
);
1505 if (num_values
!= idx_decoder_
.GetBatchWithDictSpaced(
1506 reinterpret_cast<const T
*>(dictionary_
->data()),
1507 dictionary_length_
, buffer
, num_values
, null_count
, valid_bits
,
1508 valid_bits_offset
)) {
1509 ParquetException::EofException();
1511 num_values_
-= num_values
;
1515 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1516 int64_t valid_bits_offset
,
1517 typename EncodingTraits
<Type
>::Accumulator
* out
) override
;
1519 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1520 int64_t valid_bits_offset
,
1521 typename EncodingTraits
<Type
>::DictAccumulator
* out
) override
;
1523 void InsertDictionary(::arrow::ArrayBuilder
* builder
) override
;
1525 int DecodeIndicesSpaced(int num_values
, int null_count
, const uint8_t* valid_bits
,
1526 int64_t valid_bits_offset
,
1527 ::arrow::ArrayBuilder
* builder
) override
{
1528 if (num_values
> 0) {
1529 // TODO(wesm): Refactor to batch reads for improved memory use. It is not
1530 // trivial because the null_count is relative to the entire bitmap
1531 PARQUET_THROW_NOT_OK(indices_scratch_space_
->TypedResize
<int32_t>(
1532 num_values
, /*shrink_to_fit=*/false));
1535 auto indices_buffer
=
1536 reinterpret_cast<int32_t*>(indices_scratch_space_
->mutable_data());
1538 if (num_values
!= idx_decoder_
.GetBatchSpaced(num_values
, null_count
, valid_bits
,
1539 valid_bits_offset
, indices_buffer
)) {
1540 ParquetException::EofException();
1543 /// XXX(wesm): Cannot append "valid bits" directly to the builder
1544 std::vector
<uint8_t> valid_bytes(num_values
);
1545 ::arrow::internal::BitmapReader
bit_reader(valid_bits
, valid_bits_offset
, num_values
);
1546 for (int64_t i
= 0; i
< num_values
; ++i
) {
1547 valid_bytes
[i
] = static_cast<uint8_t>(bit_reader
.IsSet());
1551 auto binary_builder
= checked_cast
<::arrow::BinaryDictionary32Builder
*>(builder
);
1552 PARQUET_THROW_NOT_OK(
1553 binary_builder
->AppendIndices(indices_buffer
, num_values
, valid_bytes
.data()));
1554 num_values_
-= num_values
- null_count
;
1555 return num_values
- null_count
;
1558 int DecodeIndices(int num_values
, ::arrow::ArrayBuilder
* builder
) override
{
1559 num_values
= std::min(num_values
, num_values_
);
1560 if (num_values
> 0) {
1561 // TODO(wesm): Refactor to batch reads for improved memory use. This is
1562 // relatively simple here because we don't have to do any bookkeeping of
1564 PARQUET_THROW_NOT_OK(indices_scratch_space_
->TypedResize
<int32_t>(
1565 num_values
, /*shrink_to_fit=*/false));
1567 auto indices_buffer
=
1568 reinterpret_cast<int32_t*>(indices_scratch_space_
->mutable_data());
1569 if (num_values
!= idx_decoder_
.GetBatch(indices_buffer
, num_values
)) {
1570 ParquetException::EofException();
1572 auto binary_builder
= checked_cast
<::arrow::BinaryDictionary32Builder
*>(builder
);
1573 PARQUET_THROW_NOT_OK(binary_builder
->AppendIndices(indices_buffer
, num_values
));
1574 num_values_
-= num_values
;
1578 int DecodeIndices(int num_values
, int32_t* indices
) override
{
1579 if (num_values
!= idx_decoder_
.GetBatch(indices
, num_values
)) {
1580 ParquetException::EofException();
1582 num_values_
-= num_values
;
1586 void GetDictionary(const T
** dictionary
, int32_t* dictionary_length
) override
{
1587 *dictionary_length
= dictionary_length_
;
1588 *dictionary
= reinterpret_cast<T
*>(dictionary_
->mutable_data());
1592 Status
IndexInBounds(int32_t index
) {
1593 if (ARROW_PREDICT_TRUE(0 <= index
&& index
< dictionary_length_
)) {
1594 return Status::OK();
1596 return Status::Invalid("Index not in dictionary bounds");
1599 inline void DecodeDict(TypedDecoder
<Type
>* dictionary
) {
1600 dictionary_length_
= static_cast<int32_t>(dictionary
->values_left());
1601 PARQUET_THROW_NOT_OK(dictionary_
->Resize(dictionary_length_
* sizeof(T
),
1602 /*shrink_to_fit=*/false));
1603 dictionary
->Decode(reinterpret_cast<T
*>(dictionary_
->mutable_data()),
1604 dictionary_length_
);
1608 std::shared_ptr
<ResizableBuffer
> dictionary_
;
1610 int32_t dictionary_length_
;
1612 // Data that contains the byte array data (byte_array_dictionary_ just has the
1614 std::shared_ptr
<ResizableBuffer
> byte_array_data_
;
1616 // Arrow-style byte offsets for each dictionary value. We maintain two
1617 // representations of the dictionary, one as ByteArray* for non-Arrow
1618 // consumers and this one for Arrow consumers. Since dictionaries are
1619 // generally pretty small to begin with this doesn't mean too much extra
1620 // memory use in most cases
1621 std::shared_ptr
<ResizableBuffer
> byte_array_offsets_
;
1623 // Reusable buffer for decoding dictionary indices to be appended to a
1624 // BinaryDictionary32Builder
1625 std::shared_ptr
<ResizableBuffer
> indices_scratch_space_
;
1627 ::arrow::util::RleDecoder idx_decoder_
;
1630 template <typename Type
>
1631 void DictDecoderImpl
<Type
>::SetDict(TypedDecoder
<Type
>* dictionary
) {
1632 DecodeDict(dictionary
);
1636 void DictDecoderImpl
<BooleanType
>::SetDict(TypedDecoder
<BooleanType
>* dictionary
) {
1637 ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
1641 void DictDecoderImpl
<ByteArrayType
>::SetDict(TypedDecoder
<ByteArrayType
>* dictionary
) {
1642 DecodeDict(dictionary
);
1644 auto dict_values
= reinterpret_cast<ByteArray
*>(dictionary_
->mutable_data());
1647 for (int i
= 0; i
< dictionary_length_
; ++i
) {
1648 total_size
+= dict_values
[i
].len
;
1650 PARQUET_THROW_NOT_OK(byte_array_data_
->Resize(total_size
,
1651 /*shrink_to_fit=*/false));
1652 PARQUET_THROW_NOT_OK(
1653 byte_array_offsets_
->Resize((dictionary_length_
+ 1) * sizeof(int32_t),
1654 /*shrink_to_fit=*/false));
1657 uint8_t* bytes_data
= byte_array_data_
->mutable_data();
1658 int32_t* bytes_offsets
=
1659 reinterpret_cast<int32_t*>(byte_array_offsets_
->mutable_data());
1660 for (int i
= 0; i
< dictionary_length_
; ++i
) {
1661 memcpy(bytes_data
+ offset
, dict_values
[i
].ptr
, dict_values
[i
].len
);
1662 bytes_offsets
[i
] = offset
;
1663 dict_values
[i
].ptr
= bytes_data
+ offset
;
1664 offset
+= dict_values
[i
].len
;
1666 bytes_offsets
[dictionary_length_
] = offset
;
1670 inline void DictDecoderImpl
<FLBAType
>::SetDict(TypedDecoder
<FLBAType
>* dictionary
) {
1671 DecodeDict(dictionary
);
1673 auto dict_values
= reinterpret_cast<FLBA
*>(dictionary_
->mutable_data());
1675 int fixed_len
= descr_
->type_length();
1676 int total_size
= dictionary_length_
* fixed_len
;
1678 PARQUET_THROW_NOT_OK(byte_array_data_
->Resize(total_size
,
1679 /*shrink_to_fit=*/false));
1680 uint8_t* bytes_data
= byte_array_data_
->mutable_data();
1681 for (int32_t i
= 0, offset
= 0; i
< dictionary_length_
; ++i
, offset
+= fixed_len
) {
1682 memcpy(bytes_data
+ offset
, dict_values
[i
].ptr
, fixed_len
);
1683 dict_values
[i
].ptr
= bytes_data
+ offset
;
1688 inline int DictDecoderImpl
<Int96Type
>::DecodeArrow(
1689 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1690 typename EncodingTraits
<Int96Type
>::Accumulator
* builder
) {
1691 ParquetException::NYI("DecodeArrow to Int96Type");
1695 inline int DictDecoderImpl
<Int96Type
>::DecodeArrow(
1696 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1697 typename EncodingTraits
<Int96Type
>::DictAccumulator
* builder
) {
1698 ParquetException::NYI("DecodeArrow to Int96Type");
1702 inline int DictDecoderImpl
<ByteArrayType
>::DecodeArrow(
1703 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1704 typename EncodingTraits
<ByteArrayType
>::Accumulator
* builder
) {
1705 ParquetException::NYI("DecodeArrow implemented elsewhere");
1709 inline int DictDecoderImpl
<ByteArrayType
>::DecodeArrow(
1710 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1711 typename EncodingTraits
<ByteArrayType
>::DictAccumulator
* builder
) {
1712 ParquetException::NYI("DecodeArrow implemented elsewhere");
1715 template <typename DType
>
1716 int DictDecoderImpl
<DType
>::DecodeArrow(
1717 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1718 typename EncodingTraits
<DType
>::DictAccumulator
* builder
) {
1719 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1721 auto dict_values
= reinterpret_cast<const typename
DType::c_type
*>(dictionary_
->data());
1723 VisitNullBitmapInline(
1724 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1727 if (ARROW_PREDICT_FALSE(!idx_decoder_
.Get(&index
))) {
1728 throw ParquetException("");
1730 PARQUET_THROW_NOT_OK(IndexInBounds(index
));
1731 PARQUET_THROW_NOT_OK(builder
->Append(dict_values
[index
]));
1733 [&]() { PARQUET_THROW_NOT_OK(builder
->AppendNull()); });
1735 return num_values
- null_count
;
1739 int DictDecoderImpl
<BooleanType
>::DecodeArrow(
1740 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1741 typename EncodingTraits
<BooleanType
>::DictAccumulator
* builder
) {
1742 ParquetException::NYI("No dictionary encoding for BooleanType");
1746 inline int DictDecoderImpl
<FLBAType
>::DecodeArrow(
1747 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1748 typename EncodingTraits
<FLBAType
>::Accumulator
* builder
) {
1749 if (builder
->byte_width() != descr_
->type_length()) {
1750 throw ParquetException("Byte width mismatch: builder was " +
1751 std::to_string(builder
->byte_width()) + " but decoder was " +
1752 std::to_string(descr_
->type_length()));
1755 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1757 auto dict_values
= reinterpret_cast<const FLBA
*>(dictionary_
->data());
1759 VisitNullBitmapInline(
1760 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1763 if (ARROW_PREDICT_FALSE(!idx_decoder_
.Get(&index
))) {
1764 throw ParquetException("");
1766 PARQUET_THROW_NOT_OK(IndexInBounds(index
));
1767 builder
->UnsafeAppend(dict_values
[index
].ptr
);
1769 [&]() { builder
->UnsafeAppendNull(); });
1771 return num_values
- null_count
;
1775 int DictDecoderImpl
<FLBAType
>::DecodeArrow(
1776 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1777 typename EncodingTraits
<FLBAType
>::DictAccumulator
* builder
) {
1779 checked_cast
<const ::arrow::DictionaryType
&>(*builder
->type()).value_type();
1781 checked_cast
<const ::arrow::FixedSizeBinaryType
&>(*value_type
).byte_width();
1782 if (byte_width
!= descr_
->type_length()) {
1783 throw ParquetException("Byte width mismatch: builder was " +
1784 std::to_string(byte_width
) + " but decoder was " +
1785 std::to_string(descr_
->type_length()));
1788 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1790 auto dict_values
= reinterpret_cast<const FLBA
*>(dictionary_
->data());
1792 VisitNullBitmapInline(
1793 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1796 if (ARROW_PREDICT_FALSE(!idx_decoder_
.Get(&index
))) {
1797 throw ParquetException("");
1799 PARQUET_THROW_NOT_OK(IndexInBounds(index
));
1800 PARQUET_THROW_NOT_OK(builder
->Append(dict_values
[index
].ptr
));
1802 [&]() { PARQUET_THROW_NOT_OK(builder
->AppendNull()); });
1804 return num_values
- null_count
;
1807 template <typename Type
>
1808 int DictDecoderImpl
<Type
>::DecodeArrow(
1809 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
1810 typename EncodingTraits
<Type
>::Accumulator
* builder
) {
1811 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
1813 using value_type
= typename
Type::c_type
;
1814 auto dict_values
= reinterpret_cast<const value_type
*>(dictionary_
->data());
1816 VisitNullBitmapInline(
1817 valid_bits
, valid_bits_offset
, num_values
, null_count
,
1820 if (ARROW_PREDICT_FALSE(!idx_decoder_
.Get(&index
))) {
1821 throw ParquetException("");
1823 PARQUET_THROW_NOT_OK(IndexInBounds(index
));
1824 builder
->UnsafeAppend(dict_values
[index
]);
1826 [&]() { builder
->UnsafeAppendNull(); });
1828 return num_values
- null_count
;
1831 template <typename Type
>
1832 void DictDecoderImpl
<Type
>::InsertDictionary(::arrow::ArrayBuilder
* builder
) {
1833 ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
1837 void DictDecoderImpl
<ByteArrayType
>::InsertDictionary(::arrow::ArrayBuilder
* builder
) {
1838 auto binary_builder
= checked_cast
<::arrow::BinaryDictionary32Builder
*>(builder
);
1840 // Make a BinaryArray referencing the internal dictionary data
1841 auto arr
= std::make_shared
<::arrow::BinaryArray
>(
1842 dictionary_length_
, byte_array_offsets_
, byte_array_data_
);
1843 PARQUET_THROW_NOT_OK(binary_builder
->InsertMemoValues(*arr
));
1846 class DictByteArrayDecoderImpl
: public DictDecoderImpl
<ByteArrayType
>,
1847 virtual public ByteArrayDecoder
{
1849 using BASE
= DictDecoderImpl
<ByteArrayType
>;
1850 using BASE::DictDecoderImpl
;
1852 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1853 int64_t valid_bits_offset
,
1854 ::arrow::BinaryDictionary32Builder
* builder
) override
{
1856 if (null_count
== 0) {
1857 PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values
, builder
, &result
));
1859 PARQUET_THROW_NOT_OK(DecodeArrow(num_values
, null_count
, valid_bits
,
1860 valid_bits_offset
, builder
, &result
));
1865 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1866 int64_t valid_bits_offset
,
1867 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
) override
{
1869 if (null_count
== 0) {
1870 PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values
, out
, &result
));
1872 PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values
, null_count
, valid_bits
,
1873 valid_bits_offset
, out
, &result
));
1879 Status
DecodeArrowDense(int num_values
, int null_count
, const uint8_t* valid_bits
,
1880 int64_t valid_bits_offset
,
1881 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
,
1882 int* out_num_values
) {
1883 constexpr int32_t kBufferSize
= 1024;
1884 int32_t indices
[kBufferSize
];
1886 ArrowBinaryHelper
helper(out
);
1888 ::arrow::internal::BitmapReader
bit_reader(valid_bits
, valid_bits_offset
, num_values
);
1890 auto dict_values
= reinterpret_cast<const ByteArray
*>(dictionary_
->data());
1891 int values_decoded
= 0;
1892 int num_appended
= 0;
1893 while (num_appended
< num_values
) {
1894 bool is_valid
= bit_reader
.IsSet();
1898 int32_t batch_size
=
1899 std::min
<int32_t>(kBufferSize
, num_values
- num_appended
- null_count
);
1900 int num_indices
= idx_decoder_
.GetBatch(indices
, batch_size
);
1902 if (ARROW_PREDICT_FALSE(num_indices
< 1)) {
1903 return Status::Invalid("Invalid number of indices '", num_indices
, "'");
1908 // Consume all indices
1910 auto idx
= indices
[i
];
1911 RETURN_NOT_OK(IndexInBounds(idx
));
1912 const auto& val
= dict_values
[idx
];
1913 if (ARROW_PREDICT_FALSE(!helper
.CanFit(val
.len
))) {
1914 RETURN_NOT_OK(helper
.PushChunk());
1916 RETURN_NOT_OK(helper
.Append(val
.ptr
, static_cast<int32_t>(val
.len
)));
1920 RETURN_NOT_OK(helper
.AppendNull());
1924 if (i
== num_indices
) {
1925 // Do not advance the bit_reader if we have fulfilled the decode
1929 is_valid
= bit_reader
.IsSet();
1933 RETURN_NOT_OK(helper
.AppendNull());
1938 *out_num_values
= values_decoded
;
1939 return Status::OK();
1942 Status
DecodeArrowDenseNonNull(int num_values
,
1943 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
,
1944 int* out_num_values
) {
1945 constexpr int32_t kBufferSize
= 2048;
1946 int32_t indices
[kBufferSize
];
1947 int values_decoded
= 0;
1949 ArrowBinaryHelper
helper(out
);
1950 auto dict_values
= reinterpret_cast<const ByteArray
*>(dictionary_
->data());
1952 while (values_decoded
< num_values
) {
1953 int32_t batch_size
= std::min
<int32_t>(kBufferSize
, num_values
- values_decoded
);
1954 int num_indices
= idx_decoder_
.GetBatch(indices
, batch_size
);
1955 if (num_indices
== 0) ParquetException::EofException();
1956 for (int i
= 0; i
< num_indices
; ++i
) {
1957 auto idx
= indices
[i
];
1958 RETURN_NOT_OK(IndexInBounds(idx
));
1959 const auto& val
= dict_values
[idx
];
1960 if (ARROW_PREDICT_FALSE(!helper
.CanFit(val
.len
))) {
1961 RETURN_NOT_OK(helper
.PushChunk());
1963 RETURN_NOT_OK(helper
.Append(val
.ptr
, static_cast<int32_t>(val
.len
)));
1965 values_decoded
+= num_indices
;
1967 *out_num_values
= values_decoded
;
1968 return Status::OK();
1971 template <typename BuilderType
>
1972 Status
DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
1973 int64_t valid_bits_offset
, BuilderType
* builder
,
1974 int* out_num_values
) {
1975 constexpr int32_t kBufferSize
= 1024;
1976 int32_t indices
[kBufferSize
];
1978 RETURN_NOT_OK(builder
->Reserve(num_values
));
1979 ::arrow::internal::BitmapReader
bit_reader(valid_bits
, valid_bits_offset
, num_values
);
1981 auto dict_values
= reinterpret_cast<const ByteArray
*>(dictionary_
->data());
1983 int values_decoded
= 0;
1984 int num_appended
= 0;
1985 while (num_appended
< num_values
) {
1986 bool is_valid
= bit_reader
.IsSet();
1990 int32_t batch_size
=
1991 std::min
<int32_t>(kBufferSize
, num_values
- num_appended
- null_count
);
1992 int num_indices
= idx_decoder_
.GetBatch(indices
, batch_size
);
1996 // Consume all indices
1998 auto idx
= indices
[i
];
1999 RETURN_NOT_OK(IndexInBounds(idx
));
2000 const auto& val
= dict_values
[idx
];
2001 RETURN_NOT_OK(builder
->Append(val
.ptr
, val
.len
));
2005 RETURN_NOT_OK(builder
->AppendNull());
2009 if (i
== num_indices
) {
2010 // Do not advance the bit_reader if we have fulfilled the decode
2014 is_valid
= bit_reader
.IsSet();
2018 RETURN_NOT_OK(builder
->AppendNull());
2023 *out_num_values
= values_decoded
;
2024 return Status::OK();
2027 template <typename BuilderType
>
2028 Status
DecodeArrowNonNull(int num_values
, BuilderType
* builder
, int* out_num_values
) {
2029 constexpr int32_t kBufferSize
= 2048;
2030 int32_t indices
[kBufferSize
];
2032 RETURN_NOT_OK(builder
->Reserve(num_values
));
2034 auto dict_values
= reinterpret_cast<const ByteArray
*>(dictionary_
->data());
2036 int values_decoded
= 0;
2037 while (values_decoded
< num_values
) {
2038 int32_t batch_size
= std::min
<int32_t>(kBufferSize
, num_values
- values_decoded
);
2039 int num_indices
= idx_decoder_
.GetBatch(indices
, batch_size
);
2040 if (num_indices
== 0) ParquetException::EofException();
2041 for (int i
= 0; i
< num_indices
; ++i
) {
2042 auto idx
= indices
[i
];
2043 RETURN_NOT_OK(IndexInBounds(idx
));
2044 const auto& val
= dict_values
[idx
];
2045 RETURN_NOT_OK(builder
->Append(val
.ptr
, val
.len
));
2047 values_decoded
+= num_indices
;
2049 *out_num_values
= values_decoded
;
2050 return Status::OK();
2054 // ----------------------------------------------------------------------
2055 // DeltaBitPackDecoder
2057 template <typename DType
>
2058 class DeltaBitPackDecoder
: public DecoderImpl
, virtual public TypedDecoder
<DType
> {
2060 typedef typename
DType::c_type T
;
2062 explicit DeltaBitPackDecoder(const ColumnDescriptor
* descr
,
2063 MemoryPool
* pool
= ::arrow::default_memory_pool())
2064 : DecoderImpl(descr
, Encoding::DELTA_BINARY_PACKED
), pool_(pool
) {
2065 if (DType::type_num
!= Type::INT32
&& DType::type_num
!= Type::INT64
) {
2066 throw ParquetException("Delta bit pack encoding should only be for integer data.");
2070 void SetData(int num_values
, const uint8_t* data
, int len
) override
{
2071 this->num_values_
= num_values
;
2072 decoder_
= ::arrow::BitUtil::BitReader(data
, len
);
2076 int Decode(T
* buffer
, int max_values
) override
{
2077 return GetInternal(buffer
, max_values
);
2080 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
2081 int64_t valid_bits_offset
,
2082 typename EncodingTraits
<DType
>::Accumulator
* out
) override
{
2083 if (null_count
!= 0) {
2084 ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
2086 std::vector
<T
> values(num_values
);
2087 GetInternal(values
.data(), num_values
);
2088 PARQUET_THROW_NOT_OK(out
->AppendValues(values
));
2092 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
2093 int64_t valid_bits_offset
,
2094 typename EncodingTraits
<DType
>::DictAccumulator
* out
) override
{
2095 if (null_count
!= 0) {
2096 ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
2098 std::vector
<T
> values(num_values
);
2099 GetInternal(values
.data(), num_values
);
2100 PARQUET_THROW_NOT_OK(out
->Reserve(num_values
));
2101 for (T value
: values
) {
2102 PARQUET_THROW_NOT_OK(out
->Append(value
));
2108 static constexpr int kMaxDeltaBitWidth
= static_cast<int>(sizeof(T
) * 8);
2111 if (!decoder_
.GetVlqInt(&values_per_block_
) ||
2112 !decoder_
.GetVlqInt(&mini_blocks_per_block_
) ||
2113 !decoder_
.GetVlqInt(&total_value_count_
) ||
2114 !decoder_
.GetZigZagVlqInt(&last_value_
)) {
2115 ParquetException::EofException();
2118 if (values_per_block_
== 0) {
2119 throw ParquetException("cannot have zero value per block");
2121 if (mini_blocks_per_block_
== 0) {
2122 throw ParquetException("cannot have zero miniblock per block");
2124 values_per_mini_block_
= values_per_block_
/ mini_blocks_per_block_
;
2125 if (values_per_mini_block_
== 0) {
2126 throw ParquetException("cannot have zero value per miniblock");
2128 if (values_per_mini_block_
% 32 != 0) {
2129 throw ParquetException(
2130 "the number of values in a miniblock must be multiple of 32, but it's " +
2131 std::to_string(values_per_mini_block_
));
2134 delta_bit_widths_
= AllocateBuffer(pool_
, mini_blocks_per_block_
);
2135 block_initialized_
= false;
2136 values_current_mini_block_
= 0;
2140 if (!decoder_
.GetZigZagVlqInt(&min_delta_
)) ParquetException::EofException();
2142 // read the bitwidth of each miniblock
2143 uint8_t* bit_width_data
= delta_bit_widths_
->mutable_data();
2144 for (uint32_t i
= 0; i
< mini_blocks_per_block_
; ++i
) {
2145 if (!decoder_
.GetAligned
<uint8_t>(1, bit_width_data
+ i
)) {
2146 ParquetException::EofException();
2148 if (bit_width_data
[i
] > kMaxDeltaBitWidth
) {
2149 throw ParquetException("delta bit width larger than integer bit width");
2152 mini_block_idx_
= 0;
2153 delta_bit_width_
= bit_width_data
[0];
2154 values_current_mini_block_
= values_per_mini_block_
;
2155 block_initialized_
= true;
2158 int GetInternal(T
* buffer
, int max_values
) {
2159 max_values
= std::min(max_values
, this->num_values_
);
2160 DCHECK_LE(static_cast<uint32_t>(max_values
), total_value_count_
);
2162 while (i
< max_values
) {
2163 if (ARROW_PREDICT_FALSE(values_current_mini_block_
== 0)) {
2164 if (ARROW_PREDICT_FALSE(!block_initialized_
)) {
2165 buffer
[i
++] = last_value_
;
2166 --total_value_count_
;
2167 if (ARROW_PREDICT_FALSE(i
== max_values
)) break;
2171 if (mini_block_idx_
< mini_blocks_per_block_
) {
2172 delta_bit_width_
= delta_bit_widths_
->data()[mini_block_idx_
];
2173 values_current_mini_block_
= values_per_mini_block_
;
2181 std::min(values_current_mini_block_
, static_cast<uint32_t>(max_values
- i
));
2182 if (decoder_
.GetBatch(delta_bit_width_
, buffer
+ i
, values_decode
) !=
2184 ParquetException::EofException();
2186 for (int j
= 0; j
< values_decode
; ++j
) {
2187 // Addition between min_delta, packed int and last_value should be treated as
2188 // unsigned addtion. Overflow is as expected.
2190 static_cast<uint64_t>(min_delta_
) + static_cast<uint64_t>(buffer
[i
+ j
]);
2191 buffer
[i
+ j
] = static_cast<T
>(delta
+ static_cast<uint64_t>(last_value_
));
2192 last_value_
= buffer
[i
+ j
];
2194 values_current_mini_block_
-= values_decode
;
2195 total_value_count_
-= values_decode
;
2198 this->num_values_
-= max_values
;
2203 ::arrow::BitUtil::BitReader decoder_
;
2204 uint32_t values_per_block_
;
2205 uint32_t mini_blocks_per_block_
;
2206 uint32_t values_per_mini_block_
;
2207 uint32_t values_current_mini_block_
;
2208 uint32_t total_value_count_
;
2210 bool block_initialized_
;
2212 uint32_t mini_block_idx_
;
2213 std::shared_ptr
<ResizableBuffer
> delta_bit_widths_
;
2214 int delta_bit_width_
;
2219 // ----------------------------------------------------------------------
2220 // DELTA_LENGTH_BYTE_ARRAY
2222 class DeltaLengthByteArrayDecoder
: public DecoderImpl
,
2223 virtual public TypedDecoder
<ByteArrayType
> {
2225 explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor
* descr
,
2226 MemoryPool
* pool
= ::arrow::default_memory_pool())
2227 : DecoderImpl(descr
, Encoding::DELTA_LENGTH_BYTE_ARRAY
),
2228 len_decoder_(nullptr, pool
),
2231 void SetData(int num_values
, const uint8_t* data
, int len
) override
{
2232 num_values_
= num_values
;
2233 if (len
== 0) return;
2234 int total_lengths_len
= ::arrow::util::SafeLoadAs
<int32_t>(data
);
2236 this->len_decoder_
.SetData(num_values
, data
, total_lengths_len
);
2237 data_
= data
+ total_lengths_len
;
2238 this->len_
= len
- 4 - total_lengths_len
;
2241 int Decode(ByteArray
* buffer
, int max_values
) override
{
2242 using VectorT
= ArrowPoolVector
<int>;
2243 max_values
= std::min(max_values
, num_values_
);
2244 VectorT
lengths(max_values
, 0, ::arrow::stl::allocator
<int>(pool_
));
2245 len_decoder_
.Decode(lengths
.data(), max_values
);
2246 for (int i
= 0; i
< max_values
; ++i
) {
2247 buffer
[i
].len
= lengths
[i
];
2248 buffer
[i
].ptr
= data_
;
2249 this->data_
+= lengths
[i
];
2250 this->len_
-= lengths
[i
];
2252 this->num_values_
-= max_values
;
2256 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
2257 int64_t valid_bits_offset
,
2258 typename EncodingTraits
<ByteArrayType
>::Accumulator
* out
) override
{
2259 ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
2262 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
2263 int64_t valid_bits_offset
,
2264 typename EncodingTraits
<ByteArrayType
>::DictAccumulator
* out
) override
{
2265 ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
2269 DeltaBitPackDecoder
<Int32Type
> len_decoder_
;
2270 ::arrow::MemoryPool
* pool_
;
2273 // ----------------------------------------------------------------------
2276 class DeltaByteArrayDecoder
: public DecoderImpl
,
2277 virtual public TypedDecoder
<ByteArrayType
> {
2279 explicit DeltaByteArrayDecoder(const ColumnDescriptor
* descr
,
2280 MemoryPool
* pool
= ::arrow::default_memory_pool())
2281 : DecoderImpl(descr
, Encoding::DELTA_BYTE_ARRAY
),
2282 prefix_len_decoder_(nullptr, pool
),
2283 suffix_decoder_(nullptr, pool
),
2284 last_value_(0, nullptr) {}
2286 virtual void SetData(int num_values
, const uint8_t* data
, int len
) {
2287 num_values_
= num_values
;
2288 if (len
== 0) return;
2289 int prefix_len_length
= ::arrow::util::SafeLoadAs
<int32_t>(data
);
2292 prefix_len_decoder_
.SetData(num_values
, data
, prefix_len_length
);
2293 data
+= prefix_len_length
;
2294 len
-= prefix_len_length
;
2295 suffix_decoder_
.SetData(num_values
, data
, len
);
2298 // TODO: this doesn't work and requires memory management. We need to allocate
2299 // new strings to store the results.
2300 virtual int Decode(ByteArray
* buffer
, int max_values
) {
2301 max_values
= std::min(max_values
, this->num_values_
);
2302 for (int i
= 0; i
< max_values
; ++i
) {
2304 prefix_len_decoder_
.Decode(&prefix_len
, 1);
2305 ByteArray suffix
= {0, nullptr};
2306 suffix_decoder_
.Decode(&suffix
, 1);
2307 buffer
[i
].len
= prefix_len
+ suffix
.len
;
2309 uint8_t* result
= reinterpret_cast<uint8_t*>(malloc(buffer
[i
].len
));
2310 memcpy(result
, last_value_
.ptr
, prefix_len
);
2311 memcpy(result
+ prefix_len
, suffix
.ptr
, suffix
.len
);
2313 buffer
[i
].ptr
= result
;
2314 last_value_
= buffer
[i
];
2316 this->num_values_
-= max_values
;
2321 DeltaBitPackDecoder
<Int32Type
> prefix_len_decoder_
;
2322 DeltaLengthByteArrayDecoder suffix_decoder_
;
2323 ByteArray last_value_
;
2326 // ----------------------------------------------------------------------
2327 // BYTE_STREAM_SPLIT
2329 template <typename DType
>
2330 class ByteStreamSplitDecoder
: public DecoderImpl
, virtual public TypedDecoder
<DType
> {
2332 using T
= typename
DType::c_type
;
2333 explicit ByteStreamSplitDecoder(const ColumnDescriptor
* descr
);
2335 int Decode(T
* buffer
, int max_values
) override
;
2337 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
2338 int64_t valid_bits_offset
,
2339 typename EncodingTraits
<DType
>::Accumulator
* builder
) override
;
2341 int DecodeArrow(int num_values
, int null_count
, const uint8_t* valid_bits
,
2342 int64_t valid_bits_offset
,
2343 typename EncodingTraits
<DType
>::DictAccumulator
* builder
) override
;
2345 void SetData(int num_values
, const uint8_t* data
, int len
) override
;
2347 T
* EnsureDecodeBuffer(int64_t min_values
) {
2348 const int64_t size
= sizeof(T
) * min_values
;
2349 if (!decode_buffer_
|| decode_buffer_
->size() < size
) {
2350 PARQUET_ASSIGN_OR_THROW(decode_buffer_
, ::arrow::AllocateBuffer(size
));
2352 return reinterpret_cast<T
*>(decode_buffer_
->mutable_data());
2356 int num_values_in_buffer_
{0};
2357 std::shared_ptr
<Buffer
> decode_buffer_
;
2359 static constexpr size_t kNumStreams
= sizeof(T
);
2362 template <typename DType
>
2363 ByteStreamSplitDecoder
<DType
>::ByteStreamSplitDecoder(const ColumnDescriptor
* descr
)
2364 : DecoderImpl(descr
, Encoding::BYTE_STREAM_SPLIT
) {}
2366 template <typename DType
>
2367 void ByteStreamSplitDecoder
<DType
>::SetData(int num_values
, const uint8_t* data
,
2369 DecoderImpl::SetData(num_values
, data
, len
);
2370 if (num_values
* static_cast<int64_t>(sizeof(T
)) > len
) {
2371 throw ParquetException("Data size too small for number of values (corrupted file?)");
2373 num_values_in_buffer_
= num_values
;
2376 template <typename DType
>
2377 int ByteStreamSplitDecoder
<DType
>::Decode(T
* buffer
, int max_values
) {
2378 const int values_to_decode
= std::min(num_values_
, max_values
);
2379 const int num_decoded_previously
= num_values_in_buffer_
- num_values_
;
2380 const uint8_t* data
= data_
+ num_decoded_previously
;
2382 ::arrow::util::internal::ByteStreamSplitDecode
<T
>(data
, values_to_decode
,
2383 num_values_in_buffer_
, buffer
);
2384 num_values_
-= values_to_decode
;
2385 len_
-= sizeof(T
) * values_to_decode
;
2386 return values_to_decode
;
2389 template <typename DType
>
2390 int ByteStreamSplitDecoder
<DType
>::DecodeArrow(
2391 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
2392 typename EncodingTraits
<DType
>::Accumulator
* builder
) {
2393 constexpr int value_size
= static_cast<int>(kNumStreams
);
2394 int values_decoded
= num_values
- null_count
;
2395 if (ARROW_PREDICT_FALSE(len_
< value_size
* values_decoded
)) {
2396 ParquetException::EofException();
2399 PARQUET_THROW_NOT_OK(builder
->Reserve(num_values
));
2401 const int num_decoded_previously
= num_values_in_buffer_
- num_values_
;
2402 const uint8_t* data
= data_
+ num_decoded_previously
;
2405 #if defined(ARROW_HAVE_SIMD_SPLIT)
2406 // Use fast decoding into intermediate buffer. This will also decode
2407 // some null values, but it's fast enough that we don't care.
2408 T
* decode_out
= EnsureDecodeBuffer(values_decoded
);
2409 ::arrow::util::internal::ByteStreamSplitDecode
<T
>(data
, values_decoded
,
2410 num_values_in_buffer_
, decode_out
);
2412 // XXX If null_count is 0, we could even append in bulk or decode directly into
2414 VisitNullBitmapInline(
2415 valid_bits
, valid_bits_offset
, num_values
, null_count
,
2417 builder
->UnsafeAppend(decode_out
[offset
]);
2420 [&]() { builder
->UnsafeAppendNull(); });
2423 VisitNullBitmapInline(
2424 valid_bits
, valid_bits_offset
, num_values
, null_count
,
2426 uint8_t gathered_byte_data
[kNumStreams
];
2427 for (size_t b
= 0; b
< kNumStreams
; ++b
) {
2428 const size_t byte_index
= b
* num_values_in_buffer_
+ offset
;
2429 gathered_byte_data
[b
] = data
[byte_index
];
2431 builder
->UnsafeAppend(::arrow::util::SafeLoadAs
<T
>(&gathered_byte_data
[0]));
2434 [&]() { builder
->UnsafeAppendNull(); });
2437 num_values_
-= values_decoded
;
2438 len_
-= sizeof(T
) * values_decoded
;
2439 return values_decoded
;
2442 template <typename DType
>
2443 int ByteStreamSplitDecoder
<DType
>::DecodeArrow(
2444 int num_values
, int null_count
, const uint8_t* valid_bits
, int64_t valid_bits_offset
,
2445 typename EncodingTraits
<DType
>::DictAccumulator
* builder
) {
2446 ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
2451 // ----------------------------------------------------------------------
2452 // Encoder and decoder factory functions
2454 std::unique_ptr
<Encoder
> MakeEncoder(Type::type type_num
, Encoding::type encoding
,
2455 bool use_dictionary
, const ColumnDescriptor
* descr
,
2457 if (use_dictionary
) {
2460 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<Int32Type
>(descr
, pool
));
2462 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<Int64Type
>(descr
, pool
));
2464 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<Int96Type
>(descr
, pool
));
2466 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<FloatType
>(descr
, pool
));
2468 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<DoubleType
>(descr
, pool
));
2469 case Type::BYTE_ARRAY
:
2470 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<ByteArrayType
>(descr
, pool
));
2471 case Type::FIXED_LEN_BYTE_ARRAY
:
2472 return std::unique_ptr
<Encoder
>(new DictEncoderImpl
<FLBAType
>(descr
, pool
));
2474 DCHECK(false) << "Encoder not implemented";
2477 } else if (encoding
== Encoding::PLAIN
) {
2480 return std::unique_ptr
<Encoder
>(new PlainEncoder
<BooleanType
>(descr
, pool
));
2482 return std::unique_ptr
<Encoder
>(new PlainEncoder
<Int32Type
>(descr
, pool
));
2484 return std::unique_ptr
<Encoder
>(new PlainEncoder
<Int64Type
>(descr
, pool
));
2486 return std::unique_ptr
<Encoder
>(new PlainEncoder
<Int96Type
>(descr
, pool
));
2488 return std::unique_ptr
<Encoder
>(new PlainEncoder
<FloatType
>(descr
, pool
));
2490 return std::unique_ptr
<Encoder
>(new PlainEncoder
<DoubleType
>(descr
, pool
));
2491 case Type::BYTE_ARRAY
:
2492 return std::unique_ptr
<Encoder
>(new PlainEncoder
<ByteArrayType
>(descr
, pool
));
2493 case Type::FIXED_LEN_BYTE_ARRAY
:
2494 return std::unique_ptr
<Encoder
>(new PlainEncoder
<FLBAType
>(descr
, pool
));
2496 DCHECK(false) << "Encoder not implemented";
2499 } else if (encoding
== Encoding::BYTE_STREAM_SPLIT
) {
2502 return std::unique_ptr
<Encoder
>(
2503 new ByteStreamSplitEncoder
<FloatType
>(descr
, pool
));
2505 return std::unique_ptr
<Encoder
>(
2506 new ByteStreamSplitEncoder
<DoubleType
>(descr
, pool
));
2508 throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
2512 ParquetException::NYI("Selected encoding is not supported");
2514 DCHECK(false) << "Should not be able to reach this code";
2518 std::unique_ptr
<Decoder
> MakeDecoder(Type::type type_num
, Encoding::type encoding
,
2519 const ColumnDescriptor
* descr
) {
2520 if (encoding
== Encoding::PLAIN
) {
2523 return std::unique_ptr
<Decoder
>(new PlainBooleanDecoder(descr
));
2525 return std::unique_ptr
<Decoder
>(new PlainDecoder
<Int32Type
>(descr
));
2527 return std::unique_ptr
<Decoder
>(new PlainDecoder
<Int64Type
>(descr
));
2529 return std::unique_ptr
<Decoder
>(new PlainDecoder
<Int96Type
>(descr
));
2531 return std::unique_ptr
<Decoder
>(new PlainDecoder
<FloatType
>(descr
));
2533 return std::unique_ptr
<Decoder
>(new PlainDecoder
<DoubleType
>(descr
));
2534 case Type::BYTE_ARRAY
:
2535 return std::unique_ptr
<Decoder
>(new PlainByteArrayDecoder(descr
));
2536 case Type::FIXED_LEN_BYTE_ARRAY
:
2537 return std::unique_ptr
<Decoder
>(new PlainFLBADecoder(descr
));
2541 } else if (encoding
== Encoding::BYTE_STREAM_SPLIT
) {
2544 return std::unique_ptr
<Decoder
>(new ByteStreamSplitDecoder
<FloatType
>(descr
));
2546 return std::unique_ptr
<Decoder
>(new ByteStreamSplitDecoder
<DoubleType
>(descr
));
2548 throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
2551 } else if (encoding
== Encoding::DELTA_BINARY_PACKED
) {
2554 return std::unique_ptr
<Decoder
>(new DeltaBitPackDecoder
<Int32Type
>(descr
));
2556 return std::unique_ptr
<Decoder
>(new DeltaBitPackDecoder
<Int64Type
>(descr
));
2558 throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
2562 ParquetException::NYI("Selected encoding is not supported");
2564 DCHECK(false) << "Should not be able to reach this code";
2569 std::unique_ptr
<Decoder
> MakeDictDecoder(Type::type type_num
,
2570 const ColumnDescriptor
* descr
,
2574 ParquetException::NYI("Dictionary encoding not implemented for boolean type");
2576 return std::unique_ptr
<Decoder
>(new DictDecoderImpl
<Int32Type
>(descr
, pool
));
2578 return std::unique_ptr
<Decoder
>(new DictDecoderImpl
<Int64Type
>(descr
, pool
));
2580 return std::unique_ptr
<Decoder
>(new DictDecoderImpl
<Int96Type
>(descr
, pool
));
2582 return std::unique_ptr
<Decoder
>(new DictDecoderImpl
<FloatType
>(descr
, pool
));
2584 return std::unique_ptr
<Decoder
>(new DictDecoderImpl
<DoubleType
>(descr
, pool
));
2585 case Type::BYTE_ARRAY
:
2586 return std::unique_ptr
<Decoder
>(new DictByteArrayDecoderImpl(descr
, pool
));
2587 case Type::FIXED_LEN_BYTE_ARRAY
:
2588 return std::unique_ptr
<Decoder
>(new DictDecoderImpl
<FLBAType
>(descr
, pool
));
2592 DCHECK(false) << "Should not be able to reach this code";
2596 } // namespace detail
2597 } // namespace parquet