]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/parquet/encoding.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / encoding.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/encoding.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstdlib>
23#include <limits>
24#include <memory>
25#include <string>
26#include <utility>
27#include <vector>
28
29#include "arrow/array.h"
30#include "arrow/array/builder_dict.h"
31#include "arrow/stl_allocator.h"
32#include "arrow/type_traits.h"
33#include "arrow/util/bit_run_reader.h"
34#include "arrow/util/bit_stream_utils.h"
35#include "arrow/util/bit_util.h"
36#include "arrow/util/bitmap_ops.h"
37#include "arrow/util/bitmap_writer.h"
38#include "arrow/util/byte_stream_split.h"
39#include "arrow/util/checked_cast.h"
40#include "arrow/util/hashing.h"
41#include "arrow/util/logging.h"
42#include "arrow/util/rle_encoding.h"
43#include "arrow/util/ubsan.h"
44#include "arrow/visitor_inline.h"
45#include "parquet/exception.h"
46#include "parquet/platform.h"
47#include "parquet/schema.h"
48#include "parquet/types.h"
49
50namespace BitUtil = arrow::BitUtil;
51
52using arrow::Status;
53using arrow::VisitNullBitmapInline;
54using arrow::internal::checked_cast;
55
56template <typename T>
57using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;
58
59namespace parquet {
60namespace {
61
// Size, in bytes, of the scratch buffer allocated by the boolean plain encoder.
constexpr int64_t kInMemoryDefaultCapacity = 1 << 10;
// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints, so the largest
// representable length is INT32_MAX.
constexpr size_t kMaxByteArraySize =
    static_cast<size_t>(std::numeric_limits<int32_t>::max());
66
67class EncoderImpl : virtual public Encoder {
68 public:
69 EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
70 : descr_(descr),
71 encoding_(encoding),
72 pool_(pool),
73 type_length_(descr ? descr->type_length() : -1) {}
74
75 Encoding::type encoding() const override { return encoding_; }
76
77 MemoryPool* memory_pool() const override { return pool_; }
78
79 protected:
80 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
81 const ColumnDescriptor* descr_;
82 const Encoding::type encoding_;
83 MemoryPool* pool_;
84
85 /// Type length from descr
86 int type_length_;
87};
88
89// ----------------------------------------------------------------------
90// Plain encoder implementation
91
92template <typename DType>
93class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
94 public:
95 using T = typename DType::c_type;
96
97 explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
98 : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}
99
100 int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
101
102 std::shared_ptr<Buffer> FlushValues() override {
103 std::shared_ptr<Buffer> buffer;
104 PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
105 return buffer;
106 }
107
108 using TypedEncoder<DType>::Put;
109
110 void Put(const T* buffer, int num_values) override;
111
112 void Put(const ::arrow::Array& values) override;
113
114 void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
115 int64_t valid_bits_offset) override {
116 if (valid_bits != NULLPTR) {
117 PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
118 this->memory_pool()));
119 T* data = reinterpret_cast<T*>(buffer->mutable_data());
120 int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
121 src, num_values, valid_bits, valid_bits_offset, data);
122 Put(data, num_valid_values);
123 } else {
124 Put(src, num_values);
125 }
126 }
127
128 void UnsafePutByteArray(const void* data, uint32_t length) {
129 DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
130 sink_.UnsafeAppend(&length, sizeof(uint32_t));
131 sink_.UnsafeAppend(data, static_cast<int64_t>(length));
132 }
133
134 void Put(const ByteArray& val) {
135 // Write the result to the output stream
136 const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
137 if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
138 PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
139 }
140 UnsafePutByteArray(val.ptr, val.len);
141 }
142
143 protected:
144 template <typename ArrayType>
145 void PutBinaryArray(const ArrayType& array) {
146 const int64_t total_bytes =
147 array.value_offset(array.length()) - array.value_offset(0);
148 PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t)));
149
150 PARQUET_THROW_NOT_OK(::arrow::VisitArrayDataInline<typename ArrayType::TypeClass>(
151 *array.data(),
152 [&](::arrow::util::string_view view) {
153 if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
154 return Status::Invalid("Parquet cannot store strings with size 2GB or more");
155 }
156 UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
157 return Status::OK();
158 },
159 []() { return Status::OK(); }));
160 }
161
162 ::arrow::BufferBuilder sink_;
163};
164
165template <typename DType>
166void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
167 if (num_values > 0) {
168 PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
169 }
170}
171
172template <>
173inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
174 for (int i = 0; i < num_values; ++i) {
175 Put(src[i]);
176 }
177}
178
179template <typename ArrayType>
180void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
181 if (values.type_id() != ArrayType::TypeClass::type_id) {
182 std::string type_name = ArrayType::TypeClass::type_name();
183 throw ParquetException("direct put to " + type_name + " from " +
184 values.type()->ToString() + " not supported");
185 }
186
187 using value_type = typename ArrayType::value_type;
188 constexpr auto value_size = sizeof(value_type);
189 auto raw_values = checked_cast<const ArrayType&>(values).raw_values();
190
191 if (values.null_count() == 0) {
192 // no nulls, just dump the data
193 PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
194 } else {
195 PARQUET_THROW_NOT_OK(
196 sink->Reserve((values.length() - values.null_count()) * value_size));
197
198 for (int64_t i = 0; i < values.length(); i++) {
199 if (values.IsValid(i)) {
200 sink->UnsafeAppend(&raw_values[i], value_size);
201 }
202 }
203 }
204}
205
206template <>
207void PlainEncoder<Int32Type>::Put(const ::arrow::Array& values) {
208 DirectPutImpl<::arrow::Int32Array>(values, &sink_);
209}
210
211template <>
212void PlainEncoder<Int64Type>::Put(const ::arrow::Array& values) {
213 DirectPutImpl<::arrow::Int64Array>(values, &sink_);
214}
215
216template <>
217void PlainEncoder<Int96Type>::Put(const ::arrow::Array& values) {
218 ParquetException::NYI("direct put to Int96");
219}
220
221template <>
222void PlainEncoder<FloatType>::Put(const ::arrow::Array& values) {
223 DirectPutImpl<::arrow::FloatArray>(values, &sink_);
224}
225
226template <>
227void PlainEncoder<DoubleType>::Put(const ::arrow::Array& values) {
228 DirectPutImpl<::arrow::DoubleArray>(values, &sink_);
229}
230
231template <typename DType>
232void PlainEncoder<DType>::Put(const ::arrow::Array& values) {
233 ParquetException::NYI("direct put of " + values.type()->ToString());
234}
235
236void AssertBaseBinary(const ::arrow::Array& values) {
237 if (!::arrow::is_base_binary_like(values.type_id())) {
238 throw ParquetException("Only BaseBinaryArray and subclasses supported");
239 }
240}
241
242template <>
243inline void PlainEncoder<ByteArrayType>::Put(const ::arrow::Array& values) {
244 AssertBaseBinary(values);
245
246 if (::arrow::is_binary_like(values.type_id())) {
247 PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
248 } else {
249 DCHECK(::arrow::is_large_binary_like(values.type_id()));
250 PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
251 }
252}
253
254void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) {
255 if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY &&
256 values.type_id() != ::arrow::Type::DECIMAL) {
257 throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
258 }
259 if (checked_cast<const ::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
260 type_length) {
261 throw ParquetException("Size mismatch: " + values.type()->ToString() +
262 " should have been " + std::to_string(type_length) + " wide");
263 }
264}
265
266template <>
267inline void PlainEncoder<FLBAType>::Put(const ::arrow::Array& values) {
268 AssertFixedSizeBinary(values, descr_->type_length());
269 const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
270
271 if (data.null_count() == 0) {
272 // no nulls, just dump the data
273 PARQUET_THROW_NOT_OK(
274 sink_.Append(data.raw_values(), data.length() * data.byte_width()));
275 } else {
276 const int64_t total_bytes =
277 data.length() * data.byte_width() - data.null_count() * data.byte_width();
278 PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
279 for (int64_t i = 0; i < data.length(); i++) {
280 if (data.IsValid(i)) {
281 sink_.UnsafeAppend(data.Value(i), data.byte_width());
282 }
283 }
284 }
285}
286
287template <>
288inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
289 if (descr_->type_length() == 0) {
290 return;
291 }
292 for (int i = 0; i < num_values; ++i) {
293 // Write the result to the output stream
294 DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
295 PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
296 }
297}
298
299template <>
300class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
301 public:
302 explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
303 : EncoderImpl(descr, Encoding::PLAIN, pool),
304 bits_available_(kInMemoryDefaultCapacity * 8),
305 bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
306 sink_(pool),
307 bit_writer_(bits_buffer_->mutable_data(),
308 static_cast<int>(bits_buffer_->size())) {}
309
310 int64_t EstimatedDataEncodedSize() override;
311 std::shared_ptr<Buffer> FlushValues() override;
312
313 void Put(const bool* src, int num_values) override;
314
315 void Put(const std::vector<bool>& src, int num_values) override;
316
317 void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
318 int64_t valid_bits_offset) override {
319 if (valid_bits != NULLPTR) {
320 PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
321 this->memory_pool()));
322 T* data = reinterpret_cast<T*>(buffer->mutable_data());
323 int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
324 src, num_values, valid_bits, valid_bits_offset, data);
325 Put(data, num_valid_values);
326 } else {
327 Put(src, num_values);
328 }
329 }
330
331 void Put(const ::arrow::Array& values) override {
332 if (values.type_id() != ::arrow::Type::BOOL) {
333 throw ParquetException("direct put to boolean from " + values.type()->ToString() +
334 " not supported");
335 }
336
337 const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);
338 if (data.null_count() == 0) {
339 PARQUET_THROW_NOT_OK(sink_.Reserve(BitUtil::BytesForBits(data.length())));
340 // no nulls, just dump the data
341 ::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
342 data.length(), sink_.mutable_data(), sink_.length());
343 } else {
344 auto n_valid = BitUtil::BytesForBits(data.length() - data.null_count());
345 PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
346 ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(),
347 sink_.length(), n_valid);
348
349 for (int64_t i = 0; i < data.length(); i++) {
350 if (data.IsValid(i)) {
351 if (data.Value(i)) {
352 writer.Set();
353 } else {
354 writer.Clear();
355 }
356 writer.Next();
357 }
358 }
359 writer.Finish();
360 }
361 sink_.UnsafeAdvance(data.length());
362 }
363
364 private:
365 int bits_available_;
366 std::shared_ptr<ResizableBuffer> bits_buffer_;
367 ::arrow::BufferBuilder sink_;
368 ::arrow::BitUtil::BitWriter bit_writer_;
369
370 template <typename SequenceType>
371 void PutImpl(const SequenceType& src, int num_values);
372};
373
374template <typename SequenceType>
375void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
376 int bit_offset = 0;
377 if (bits_available_ > 0) {
378 int bits_to_write = std::min(bits_available_, num_values);
379 for (int i = 0; i < bits_to_write; i++) {
380 bit_writer_.PutValue(src[i], 1);
381 }
382 bits_available_ -= bits_to_write;
383 bit_offset = bits_to_write;
384
385 if (bits_available_ == 0) {
386 bit_writer_.Flush();
387 PARQUET_THROW_NOT_OK(
388 sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
389 bit_writer_.Clear();
390 }
391 }
392
393 int bits_remaining = num_values - bit_offset;
394 while (bit_offset < num_values) {
395 bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
396
397 int bits_to_write = std::min(bits_available_, bits_remaining);
398 for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
399 bit_writer_.PutValue(src[i], 1);
400 }
401 bit_offset += bits_to_write;
402 bits_available_ -= bits_to_write;
403 bits_remaining -= bits_to_write;
404
405 if (bits_available_ == 0) {
406 bit_writer_.Flush();
407 PARQUET_THROW_NOT_OK(
408 sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
409 bit_writer_.Clear();
410 }
411 }
412}
413
414int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
415 int64_t position = sink_.length();
416 return position + bit_writer_.bytes_written();
417}
418
419std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
420 if (bits_available_ > 0) {
421 bit_writer_.Flush();
422 PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
423 bit_writer_.Clear();
424 bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
425 }
426
427 std::shared_ptr<Buffer> buffer;
428 PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
429 return buffer;
430}
431
432void PlainEncoder<BooleanType>::Put(const bool* src, int num_values) {
433 PutImpl(src, num_values);
434}
435
436void PlainEncoder<BooleanType>::Put(const std::vector<bool>& src, int num_values) {
437 PutImpl(src, num_values);
438}
439
440// ----------------------------------------------------------------------
441// DictEncoder<T> implementations
442
443template <typename DType>
444struct DictEncoderTraits {
445 using c_type = typename DType::c_type;
446 using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>;
447};
448
449template <>
450struct DictEncoderTraits<ByteArrayType> {
451 using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
452};
453
454template <>
455struct DictEncoderTraits<FLBAType> {
456 using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
457};
458
// Size passed to the memo table constructor: initially 1024 elements.
static constexpr int32_t kInitialHashTableSize = 1024;
461
462/// See the dictionary encoding section of
463/// https://github.com/Parquet/parquet-format. The encoding supports
464/// streaming encoding. Values are encoded as they are added while the
465/// dictionary is being constructed. At any time, the buffered values
466/// can be written out with the current dictionary size. More values
467/// can then be added to the encoder, including new dictionary
468/// entries.
469template <typename DType>
470class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
471 using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;
472
473 public:
474 typedef typename DType::c_type T;
475
476 explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
477 : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
478 buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
479 dict_encoded_size_(0),
480 memo_table_(pool, kInitialHashTableSize) {}
481
482 ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }
483
484 int dict_encoded_size() override { return dict_encoded_size_; }
485
486 int WriteIndices(uint8_t* buffer, int buffer_len) override {
487 // Write bit width in first byte
488 *buffer = static_cast<uint8_t>(bit_width());
489 ++buffer;
490 --buffer_len;
491
492 ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
493
494 for (int32_t index : buffered_indices_) {
495 if (!encoder.Put(index)) return -1;
496 }
497 encoder.Flush();
498
499 ClearIndices();
500 return 1 + encoder.len();
501 }
502
503 void set_type_length(int type_length) { this->type_length_ = type_length; }
504
505 /// Returns a conservative estimate of the number of bytes needed to encode the buffered
506 /// indices. Used to size the buffer passed to WriteIndices().
507 int64_t EstimatedDataEncodedSize() override {
508 // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
509 // reserve
510 // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
511 // but not reserving them would cause the encoder to fail.
512 return 1 +
513 ::arrow::util::RleEncoder::MaxBufferSize(
514 bit_width(), static_cast<int>(buffered_indices_.size())) +
515 ::arrow::util::RleEncoder::MinBufferSize(bit_width());
516 }
517
518 /// The minimum bit width required to encode the currently buffered indices.
519 int bit_width() const override {
520 if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
521 if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
522 return BitUtil::Log2(num_entries());
523 }
524
525 /// Encode value. Note that this does not actually write any data, just
526 /// buffers the value's index to be written later.
527 inline void Put(const T& value);
528
529 // Not implemented for other data types
530 inline void PutByteArray(const void* ptr, int32_t length);
531
532 void Put(const T* src, int num_values) override {
533 for (int32_t i = 0; i < num_values; i++) {
534 Put(src[i]);
535 }
536 }
537
538 void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
539 int64_t valid_bits_offset) override {
540 ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
541 [&](int64_t position, int64_t length) {
542 for (int64_t i = 0; i < length; i++) {
543 Put(src[i + position]);
544 }
545 });
546 }
547
548 using TypedEncoder<DType>::Put;
549
550 void Put(const ::arrow::Array& values) override;
551 void PutDictionary(const ::arrow::Array& values) override;
552
553 template <typename ArrowType, typename T = typename ArrowType::c_type>
554 void PutIndicesTyped(const ::arrow::Array& data) {
555 auto values = data.data()->GetValues<T>(1);
556 size_t buffer_position = buffered_indices_.size();
557 buffered_indices_.resize(buffer_position +
558 static_cast<size_t>(data.length() - data.null_count()));
559 ::arrow::internal::VisitSetBitRunsVoid(
560 data.null_bitmap_data(), data.offset(), data.length(),
561 [&](int64_t position, int64_t length) {
562 for (int64_t i = 0; i < length; ++i) {
563 buffered_indices_[buffer_position++] =
564 static_cast<int32_t>(values[i + position]);
565 }
566 });
567 }
568
569 void PutIndices(const ::arrow::Array& data) override {
570 switch (data.type()->id()) {
571 case ::arrow::Type::UINT8:
572 case ::arrow::Type::INT8:
573 return PutIndicesTyped<::arrow::UInt8Type>(data);
574 case ::arrow::Type::UINT16:
575 case ::arrow::Type::INT16:
576 return PutIndicesTyped<::arrow::UInt16Type>(data);
577 case ::arrow::Type::UINT32:
578 case ::arrow::Type::INT32:
579 return PutIndicesTyped<::arrow::UInt32Type>(data);
580 case ::arrow::Type::UINT64:
581 case ::arrow::Type::INT64:
582 return PutIndicesTyped<::arrow::UInt64Type>(data);
583 default:
584 throw ParquetException("Passed non-integer array to PutIndices");
585 }
586 }
587
588 std::shared_ptr<Buffer> FlushValues() override {
589 std::shared_ptr<ResizableBuffer> buffer =
590 AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
591 int result_size = WriteIndices(buffer->mutable_data(),
592 static_cast<int>(EstimatedDataEncodedSize()));
593 PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
594 return std::move(buffer);
595 }
596
597 /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
598 /// dict_encoded_size() bytes.
599 void WriteDict(uint8_t* buffer) override;
600
601 /// The number of entries in the dictionary.
602 int num_entries() const override { return memo_table_.size(); }
603
604 private:
605 /// Clears all the indices (but leaves the dictionary).
606 void ClearIndices() { buffered_indices_.clear(); }
607
608 /// Indices that have not yet be written out by WriteIndices().
609 ArrowPoolVector<int32_t> buffered_indices_;
610
611 template <typename ArrayType>
612 void PutBinaryArray(const ArrayType& array) {
613 PARQUET_THROW_NOT_OK(::arrow::VisitArrayDataInline<typename ArrayType::TypeClass>(
614 *array.data(),
615 [&](::arrow::util::string_view view) {
616 if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
617 return Status::Invalid("Parquet cannot store strings with size 2GB or more");
618 }
619 PutByteArray(view.data(), static_cast<uint32_t>(view.size()));
620 return Status::OK();
621 },
622 []() { return Status::OK(); }));
623 }
624
625 template <typename ArrayType>
626 void PutBinaryDictionaryArray(const ArrayType& array) {
627 DCHECK_EQ(array.null_count(), 0);
628 for (int64_t i = 0; i < array.length(); i++) {
629 auto v = array.GetView(i);
630 if (ARROW_PREDICT_FALSE(v.size() > kMaxByteArraySize)) {
631 throw ParquetException("Parquet cannot store strings with size 2GB or more");
632 }
633 dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
634 int32_t unused_memo_index;
635 PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(
636 v.data(), static_cast<int32_t>(v.size()), &unused_memo_index));
637 }
638 }
639
640 /// The number of bytes needed to encode the dictionary.
641 int dict_encoded_size_;
642
643 MemoTableType memo_table_;
644};
645
646template <typename DType>
647void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
648 // For primitive types, only a memcpy
649 DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
650 memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
651}
652
653// ByteArray and FLBA already have the dictionary encoded in their data heaps
654template <>
655void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
656 memo_table_.VisitValues(0, [&buffer](const ::arrow::util::string_view& v) {
657 uint32_t len = static_cast<uint32_t>(v.length());
658 memcpy(buffer, &len, sizeof(len));
659 buffer += sizeof(len);
660 memcpy(buffer, v.data(), len);
661 buffer += len;
662 });
663}
664
665template <>
666void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
667 memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) {
668 DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
669 memcpy(buffer, v.data(), type_length_);
670 buffer += type_length_;
671 });
672}
673
674template <typename DType>
675inline void DictEncoderImpl<DType>::Put(const T& v) {
676 // Put() implementation for primitive types
677 auto on_found = [](int32_t memo_index) {};
678 auto on_not_found = [this](int32_t memo_index) {
679 dict_encoded_size_ += static_cast<int>(sizeof(T));
680 };
681
682 int32_t memo_index;
683 PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
684 buffered_indices_.push_back(memo_index);
685}
686
687template <typename DType>
688inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
689 DCHECK(false);
690}
691
692template <>
693inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
694 int32_t length) {
695 static const uint8_t empty[] = {0};
696
697 auto on_found = [](int32_t memo_index) {};
698 auto on_not_found = [&](int32_t memo_index) {
699 dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
700 };
701
702 DCHECK(ptr != nullptr || length == 0);
703 ptr = (ptr != nullptr) ? ptr : empty;
704 int32_t memo_index;
705 PARQUET_THROW_NOT_OK(
706 memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index));
707 buffered_indices_.push_back(memo_index);
708}
709
710template <>
711inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
712 return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
713}
714
715template <>
716inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
717 static const uint8_t empty[] = {0};
718
719 auto on_found = [](int32_t memo_index) {};
720 auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };
721
722 DCHECK(v.ptr != nullptr || type_length_ == 0);
723 const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
724 int32_t memo_index;
725 PARQUET_THROW_NOT_OK(
726 memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index));
727 buffered_indices_.push_back(memo_index);
728}
729
730template <>
731void DictEncoderImpl<Int96Type>::Put(const ::arrow::Array& values) {
732 ParquetException::NYI("Direct put to Int96");
733}
734
735template <>
736void DictEncoderImpl<Int96Type>::PutDictionary(const ::arrow::Array& values) {
737 ParquetException::NYI("Direct put to Int96");
738}
739
740template <typename DType>
741void DictEncoderImpl<DType>::Put(const ::arrow::Array& values) {
742 using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
743 const auto& data = checked_cast<const ArrayType&>(values);
744 if (data.null_count() == 0) {
745 // no nulls, just dump the data
746 for (int64_t i = 0; i < data.length(); i++) {
747 Put(data.Value(i));
748 }
749 } else {
750 for (int64_t i = 0; i < data.length(); i++) {
751 if (data.IsValid(i)) {
752 Put(data.Value(i));
753 }
754 }
755 }
756}
757
758template <>
759void DictEncoderImpl<FLBAType>::Put(const ::arrow::Array& values) {
760 AssertFixedSizeBinary(values, type_length_);
761 const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
762 if (data.null_count() == 0) {
763 // no nulls, just dump the data
764 for (int64_t i = 0; i < data.length(); i++) {
765 Put(FixedLenByteArray(data.Value(i)));
766 }
767 } else {
768 std::vector<uint8_t> empty(type_length_, 0);
769 for (int64_t i = 0; i < data.length(); i++) {
770 if (data.IsValid(i)) {
771 Put(FixedLenByteArray(data.Value(i)));
772 }
773 }
774 }
775}
776
777template <>
778void DictEncoderImpl<ByteArrayType>::Put(const ::arrow::Array& values) {
779 AssertBaseBinary(values);
780 if (::arrow::is_binary_like(values.type_id())) {
781 PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
782 } else {
783 DCHECK(::arrow::is_large_binary_like(values.type_id()));
784 PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
785 }
786}
787
788template <typename DType>
789void AssertCanPutDictionary(DictEncoderImpl<DType>* encoder, const ::arrow::Array& dict) {
790 if (dict.null_count() > 0) {
791 throw ParquetException("Inserted dictionary cannot cannot contain nulls");
792 }
793
794 if (encoder->num_entries() > 0) {
795 throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
796 }
797}
798
799template <typename DType>
800void DictEncoderImpl<DType>::PutDictionary(const ::arrow::Array& values) {
801 AssertCanPutDictionary(this, values);
802
803 using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
804 const auto& data = checked_cast<const ArrayType&>(values);
805
806 dict_encoded_size_ += static_cast<int>(sizeof(typename DType::c_type) * data.length());
807 for (int64_t i = 0; i < data.length(); i++) {
808 int32_t unused_memo_index;
809 PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index));
810 }
811}
812
813template <>
814void DictEncoderImpl<FLBAType>::PutDictionary(const ::arrow::Array& values) {
815 AssertFixedSizeBinary(values, type_length_);
816 AssertCanPutDictionary(this, values);
817
818 const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
819
820 dict_encoded_size_ += static_cast<int>(type_length_ * data.length());
821 for (int64_t i = 0; i < data.length(); i++) {
822 int32_t unused_memo_index;
823 PARQUET_THROW_NOT_OK(
824 memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index));
825 }
826}
827
828template <>
829void DictEncoderImpl<ByteArrayType>::PutDictionary(const ::arrow::Array& values) {
830 AssertBaseBinary(values);
831 AssertCanPutDictionary(this, values);
832
833 if (::arrow::is_binary_like(values.type_id())) {
834 PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
835 } else {
836 DCHECK(::arrow::is_large_binary_like(values.type_id()));
837 PutBinaryDictionaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
838 }
839}
840
841// ----------------------------------------------------------------------
842// ByteStreamSplitEncoder<T> implementations
843
844template <typename DType>
845class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
846 public:
847 using T = typename DType::c_type;
848 using TypedEncoder<DType>::Put;
849
850 explicit ByteStreamSplitEncoder(
851 const ColumnDescriptor* descr,
852 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
853
854 int64_t EstimatedDataEncodedSize() override;
855 std::shared_ptr<Buffer> FlushValues() override;
856
857 void Put(const T* buffer, int num_values) override;
858 void Put(const ::arrow::Array& values) override;
859 void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
860 int64_t valid_bits_offset) override;
861
862 protected:
863 template <typename ArrowType>
864 void PutImpl(const ::arrow::Array& values) {
865 if (values.type_id() != ArrowType::type_id) {
866 throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
867 " from " + values.type()->ToString() + " not supported");
868 }
869 const auto& data = *values.data();
870 PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
871 static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset);
872 }
873
874 ::arrow::BufferBuilder sink_;
875 int64_t num_values_in_buffer_;
876};
877
878template <typename DType>
879ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
880 ::arrow::MemoryPool* pool)
881 : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
882 sink_{pool},
883 num_values_in_buffer_{0} {}
884
885template <typename DType>
886int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
887 return sink_.length();
888}
889
890template <typename DType>
891std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
892 std::shared_ptr<ResizableBuffer> output_buffer =
893 AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
894 uint8_t* output_buffer_raw = output_buffer->mutable_data();
895 const uint8_t* raw_values = sink_.data();
896 ::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
897 output_buffer_raw);
898 sink_.Reset();
899 num_values_in_buffer_ = 0;
900 return std::move(output_buffer);
901}
902
903template <typename DType>
904void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
905 if (num_values > 0) {
906 PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
907 num_values_in_buffer_ += num_values;
908 }
909}
910
911template <>
912void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) {
913 PutImpl<::arrow::FloatType>(values);
914}
915
916template <>
917void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) {
918 PutImpl<::arrow::DoubleType>(values);
919}
920
921template <typename DType>
922void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
923 const uint8_t* valid_bits,
924 int64_t valid_bits_offset) {
925 if (valid_bits != NULLPTR) {
926 PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
927 this->memory_pool()));
928 T* data = reinterpret_cast<T*>(buffer->mutable_data());
929 int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
930 src, num_values, valid_bits, valid_bits_offset, data);
931 Put(data, num_valid_values);
932 } else {
933 Put(src, num_values);
934 }
935}
936
937class DecoderImpl : virtual public Decoder {
938 public:
939 void SetData(int num_values, const uint8_t* data, int len) override {
940 num_values_ = num_values;
941 data_ = data;
942 len_ = len;
943 }
944
945 int values_left() const override { return num_values_; }
946 Encoding::type encoding() const override { return encoding_; }
947
948 protected:
949 explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding)
950 : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {}
951
952 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
953 const ColumnDescriptor* descr_;
954
955 const Encoding::type encoding_;
956 int num_values_;
957 const uint8_t* data_;
958 int len_;
959 int type_length_;
960};
961
962template <typename DType>
963class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
964 public:
965 using T = typename DType::c_type;
966 explicit PlainDecoder(const ColumnDescriptor* descr);
967
968 int Decode(T* buffer, int max_values) override;
969
970 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
971 int64_t valid_bits_offset,
972 typename EncodingTraits<DType>::Accumulator* builder) override;
973
974 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
975 int64_t valid_bits_offset,
976 typename EncodingTraits<DType>::DictAccumulator* builder) override;
977};
978
979template <>
980inline int PlainDecoder<Int96Type>::DecodeArrow(
981 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
982 typename EncodingTraits<Int96Type>::Accumulator* builder) {
983 ParquetException::NYI("DecodeArrow not supported for Int96");
984}
985
986template <>
987inline int PlainDecoder<Int96Type>::DecodeArrow(
988 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
989 typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
990 ParquetException::NYI("DecodeArrow not supported for Int96");
991}
992
993template <>
994inline int PlainDecoder<BooleanType>::DecodeArrow(
995 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
996 typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
997 ParquetException::NYI("dictionaries of BooleanType");
998}
999
1000template <typename DType>
1001int PlainDecoder<DType>::DecodeArrow(
1002 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1003 typename EncodingTraits<DType>::Accumulator* builder) {
1004 using value_type = typename DType::c_type;
1005
1006 constexpr int value_size = static_cast<int>(sizeof(value_type));
1007 int values_decoded = num_values - null_count;
1008 if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
1009 ParquetException::EofException();
1010 }
1011
1012 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1013
1014 VisitNullBitmapInline(
1015 valid_bits, valid_bits_offset, num_values, null_count,
1016 [&]() {
1017 builder->UnsafeAppend(::arrow::util::SafeLoadAs<value_type>(data_));
1018 data_ += sizeof(value_type);
1019 },
1020 [&]() { builder->UnsafeAppendNull(); });
1021
1022 num_values_ -= values_decoded;
1023 len_ -= sizeof(value_type) * values_decoded;
1024 return values_decoded;
1025}
1026
1027template <typename DType>
1028int PlainDecoder<DType>::DecodeArrow(
1029 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1030 typename EncodingTraits<DType>::DictAccumulator* builder) {
1031 using value_type = typename DType::c_type;
1032
1033 constexpr int value_size = static_cast<int>(sizeof(value_type));
1034 int values_decoded = num_values - null_count;
1035 if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
1036 ParquetException::EofException();
1037 }
1038
1039 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1040
1041 VisitNullBitmapInline(
1042 valid_bits, valid_bits_offset, num_values, null_count,
1043 [&]() {
1044 PARQUET_THROW_NOT_OK(
1045 builder->Append(::arrow::util::SafeLoadAs<value_type>(data_)));
1046 data_ += sizeof(value_type);
1047 },
1048 [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
1049
1050 num_values_ -= values_decoded;
1051 len_ -= sizeof(value_type) * values_decoded;
1052 return values_decoded;
1053}
1054
1055// Decode routine templated on C++ type rather than type enum
1056template <typename T>
1057inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
1058 int type_length, T* out) {
1059 int64_t bytes_to_decode = num_values * static_cast<int64_t>(sizeof(T));
1060 if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
1061 ParquetException::EofException();
1062 }
1063 // If bytes_to_decode == 0, data could be null
1064 if (bytes_to_decode > 0) {
1065 memcpy(out, data, bytes_to_decode);
1066 }
1067 return static_cast<int>(bytes_to_decode);
1068}
1069
1070template <typename DType>
1071PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr)
1072 : DecoderImpl(descr, Encoding::PLAIN) {
1073 if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
1074 type_length_ = descr_->type_length();
1075 } else {
1076 type_length_ = -1;
1077 }
1078}
1079
1080// Template specialization for BYTE_ARRAY. The written values do not own their
1081// own data.
1082
1083static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
1084 ByteArray* out) {
1085 if (ARROW_PREDICT_FALSE(data_size < 4)) {
1086 ParquetException::EofException();
1087 }
1088 const int32_t len = ::arrow::util::SafeLoadAs<int32_t>(data);
1089 if (len < 0) {
1090 throw ParquetException("Invalid BYTE_ARRAY value");
1091 }
1092 const int64_t consumed_length = static_cast<int64_t>(len) + 4;
1093 if (ARROW_PREDICT_FALSE(data_size < consumed_length)) {
1094 ParquetException::EofException();
1095 }
1096 *out = ByteArray{static_cast<uint32_t>(len), data + 4};
1097 return consumed_length;
1098}
1099
1100template <>
1101inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
1102 int type_length, ByteArray* out) {
1103 int bytes_decoded = 0;
1104 for (int i = 0; i < num_values; ++i) {
1105 const auto increment = ReadByteArray(data, data_size, out + i);
1106 if (ARROW_PREDICT_FALSE(increment > INT_MAX - bytes_decoded)) {
1107 throw ParquetException("BYTE_ARRAY chunk too large");
1108 }
1109 data += increment;
1110 data_size -= increment;
1111 bytes_decoded += static_cast<int>(increment);
1112 }
1113 return bytes_decoded;
1114}
1115
1116// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
1117// own their own data.
1118template <>
1119inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
1120 int num_values, int type_length,
1121 FixedLenByteArray* out) {
1122 int64_t bytes_to_decode = static_cast<int64_t>(type_length) * num_values;
1123 if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
1124 ParquetException::EofException();
1125 }
1126 for (int i = 0; i < num_values; ++i) {
1127 out[i].ptr = data;
1128 data += type_length;
1129 data_size -= type_length;
1130 }
1131 return static_cast<int>(bytes_to_decode);
1132}
1133
1134template <typename DType>
1135int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
1136 max_values = std::min(max_values, num_values_);
1137 int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer);
1138 data_ += bytes_consumed;
1139 len_ -= bytes_consumed;
1140 num_values_ -= max_values;
1141 return max_values;
1142}
1143
1144class PlainBooleanDecoder : public DecoderImpl,
1145 virtual public TypedDecoder<BooleanType>,
1146 virtual public BooleanDecoder {
1147 public:
1148 explicit PlainBooleanDecoder(const ColumnDescriptor* descr);
1149 void SetData(int num_values, const uint8_t* data, int len) override;
1150
1151 // Two flavors of bool decoding
1152 int Decode(uint8_t* buffer, int max_values) override;
1153 int Decode(bool* buffer, int max_values) override;
1154 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1155 int64_t valid_bits_offset,
1156 typename EncodingTraits<BooleanType>::Accumulator* out) override;
1157
1158 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1159 int64_t valid_bits_offset,
1160 typename EncodingTraits<BooleanType>::DictAccumulator* out) override;
1161
1162 private:
1163 std::unique_ptr<::arrow::BitUtil::BitReader> bit_reader_;
1164};
1165
1166PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
1167 : DecoderImpl(descr, Encoding::PLAIN) {}
1168
1169void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) {
1170 num_values_ = num_values;
1171 bit_reader_.reset(new BitUtil::BitReader(data, len));
1172}
1173
1174int PlainBooleanDecoder::DecodeArrow(
1175 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1176 typename EncodingTraits<BooleanType>::Accumulator* builder) {
1177 int values_decoded = num_values - null_count;
1178 if (ARROW_PREDICT_FALSE(num_values_ < values_decoded)) {
1179 ParquetException::EofException();
1180 }
1181
1182 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1183
1184 VisitNullBitmapInline(
1185 valid_bits, valid_bits_offset, num_values, null_count,
1186 [&]() {
1187 bool value;
1188 ARROW_IGNORE_EXPR(bit_reader_->GetValue(1, &value));
1189 builder->UnsafeAppend(value);
1190 },
1191 [&]() { builder->UnsafeAppendNull(); });
1192
1193 num_values_ -= values_decoded;
1194 return values_decoded;
1195}
1196
1197inline int PlainBooleanDecoder::DecodeArrow(
1198 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1199 typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
1200 ParquetException::NYI("dictionaries of BooleanType");
1201}
1202
1203int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
1204 max_values = std::min(max_values, num_values_);
1205 bool val;
1206 ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
1207 for (int i = 0; i < max_values; ++i) {
1208 if (!bit_reader_->GetValue(1, &val)) {
1209 ParquetException::EofException();
1210 }
1211 if (val) {
1212 bit_writer.Set();
1213 }
1214 bit_writer.Next();
1215 }
1216 bit_writer.Finish();
1217 num_values_ -= max_values;
1218 return max_values;
1219}
1220
1221int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
1222 max_values = std::min(max_values, num_values_);
1223 if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) {
1224 ParquetException::EofException();
1225 }
1226 num_values_ -= max_values;
1227 return max_values;
1228}
1229
1230struct ArrowBinaryHelper {
1231 explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
1232 this->out = out;
1233 this->builder = out->builder.get();
1234 this->chunk_space_remaining =
1235 ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
1236 }
1237
1238 Status PushChunk() {
1239 std::shared_ptr<::arrow::Array> result;
1240 RETURN_NOT_OK(builder->Finish(&result));
1241 out->chunks.push_back(result);
1242 chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
1243 return Status::OK();
1244 }
1245
1246 bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
1247
1248 void UnsafeAppend(const uint8_t* data, int32_t length) {
1249 chunk_space_remaining -= length;
1250 builder->UnsafeAppend(data, length);
1251 }
1252
1253 void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
1254
1255 Status Append(const uint8_t* data, int32_t length) {
1256 chunk_space_remaining -= length;
1257 return builder->Append(data, length);
1258 }
1259
1260 Status AppendNull() { return builder->AppendNull(); }
1261
1262 typename EncodingTraits<ByteArrayType>::Accumulator* out;
1263 ::arrow::BinaryBuilder* builder;
1264 int64_t chunk_space_remaining;
1265};
1266
1267template <>
1268inline int PlainDecoder<ByteArrayType>::DecodeArrow(
1269 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1270 typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
1271 ParquetException::NYI();
1272}
1273
1274template <>
1275inline int PlainDecoder<ByteArrayType>::DecodeArrow(
1276 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1277 typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
1278 ParquetException::NYI();
1279}
1280
1281template <>
1282inline int PlainDecoder<FLBAType>::DecodeArrow(
1283 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1284 typename EncodingTraits<FLBAType>::Accumulator* builder) {
1285 int values_decoded = num_values - null_count;
1286 if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
1287 ParquetException::EofException();
1288 }
1289
1290 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1291
1292 VisitNullBitmapInline(
1293 valid_bits, valid_bits_offset, num_values, null_count,
1294 [&]() {
1295 builder->UnsafeAppend(data_);
1296 data_ += descr_->type_length();
1297 },
1298 [&]() { builder->UnsafeAppendNull(); });
1299
1300 num_values_ -= values_decoded;
1301 len_ -= descr_->type_length() * values_decoded;
1302 return values_decoded;
1303}
1304
1305template <>
1306inline int PlainDecoder<FLBAType>::DecodeArrow(
1307 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1308 typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
1309 int values_decoded = num_values - null_count;
1310 if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
1311 ParquetException::EofException();
1312 }
1313
1314 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1315
1316 VisitNullBitmapInline(
1317 valid_bits, valid_bits_offset, num_values, null_count,
1318 [&]() {
1319 PARQUET_THROW_NOT_OK(builder->Append(data_));
1320 data_ += descr_->type_length();
1321 },
1322 [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
1323
1324 num_values_ -= values_decoded;
1325 len_ -= descr_->type_length() * values_decoded;
1326 return values_decoded;
1327}
1328
1329class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
1330 virtual public ByteArrayDecoder {
1331 public:
1332 using Base = PlainDecoder<ByteArrayType>;
1333 using Base::DecodeSpaced;
1334 using Base::PlainDecoder;
1335
1336 // ----------------------------------------------------------------------
1337 // Dictionary read paths
1338
1339 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1340 int64_t valid_bits_offset,
1341 ::arrow::BinaryDictionary32Builder* builder) override {
1342 int result = 0;
1343 PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
1344 valid_bits_offset, builder, &result));
1345 return result;
1346 }
1347
1348 // ----------------------------------------------------------------------
1349 // Optimized dense binary read paths
1350
1351 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1352 int64_t valid_bits_offset,
1353 typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
1354 int result = 0;
1355 PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
1356 valid_bits_offset, out, &result));
1357 return result;
1358 }
1359
1360 private:
1361 Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
1362 int64_t valid_bits_offset,
1363 typename EncodingTraits<ByteArrayType>::Accumulator* out,
1364 int* out_values_decoded) {
1365 ArrowBinaryHelper helper(out);
1366 int values_decoded = 0;
1367
1368 RETURN_NOT_OK(helper.builder->Reserve(num_values));
1369 RETURN_NOT_OK(helper.builder->ReserveData(
1370 std::min<int64_t>(len_, helper.chunk_space_remaining)));
1371
1372 int i = 0;
1373 RETURN_NOT_OK(VisitNullBitmapInline(
1374 valid_bits, valid_bits_offset, num_values, null_count,
1375 [&]() {
1376 if (ARROW_PREDICT_FALSE(len_ < 4)) {
1377 ParquetException::EofException();
1378 }
1379 auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
1380 if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
1381 return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
1382 }
1383 auto increment = value_len + 4;
1384 if (ARROW_PREDICT_FALSE(len_ < increment)) {
1385 ParquetException::EofException();
1386 }
1387 if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
1388 // This element would exceed the capacity of a chunk
1389 RETURN_NOT_OK(helper.PushChunk());
1390 RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
1391 RETURN_NOT_OK(helper.builder->ReserveData(
1392 std::min<int64_t>(len_, helper.chunk_space_remaining)));
1393 }
1394 helper.UnsafeAppend(data_ + 4, value_len);
1395 data_ += increment;
1396 len_ -= increment;
1397 ++values_decoded;
1398 ++i;
1399 return Status::OK();
1400 },
1401 [&]() {
1402 helper.UnsafeAppendNull();
1403 ++i;
1404 return Status::OK();
1405 }));
1406
1407 num_values_ -= values_decoded;
1408 *out_values_decoded = values_decoded;
1409 return Status::OK();
1410 }
1411
1412 template <typename BuilderType>
1413 Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1414 int64_t valid_bits_offset, BuilderType* builder,
1415 int* out_values_decoded) {
1416 RETURN_NOT_OK(builder->Reserve(num_values));
1417 int values_decoded = 0;
1418
1419 RETURN_NOT_OK(VisitNullBitmapInline(
1420 valid_bits, valid_bits_offset, num_values, null_count,
1421 [&]() {
1422 if (ARROW_PREDICT_FALSE(len_ < 4)) {
1423 ParquetException::EofException();
1424 }
1425 auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
1426 if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
1427 return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
1428 }
1429 auto increment = value_len + 4;
1430 if (ARROW_PREDICT_FALSE(len_ < increment)) {
1431 ParquetException::EofException();
1432 }
1433 RETURN_NOT_OK(builder->Append(data_ + 4, value_len));
1434 data_ += increment;
1435 len_ -= increment;
1436 ++values_decoded;
1437 return Status::OK();
1438 },
1439 [&]() { return builder->AppendNull(); }));
1440
1441 num_values_ -= values_decoded;
1442 *out_values_decoded = values_decoded;
1443 return Status::OK();
1444 }
1445};
1446
1447class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
1448 public:
1449 using Base = PlainDecoder<FLBAType>;
1450 using Base::PlainDecoder;
1451};
1452
1453// ----------------------------------------------------------------------
1454// Dictionary encoding and decoding
1455
1456template <typename Type>
1457class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
1458 public:
1459 typedef typename Type::c_type T;
1460
1461 // Initializes the dictionary with values from 'dictionary'. The data in
1462 // dictionary is not guaranteed to persist in memory after this call so the
1463 // dictionary decoder needs to copy the data out if necessary.
1464 explicit DictDecoderImpl(const ColumnDescriptor* descr,
1465 MemoryPool* pool = ::arrow::default_memory_pool())
1466 : DecoderImpl(descr, Encoding::RLE_DICTIONARY),
1467 dictionary_(AllocateBuffer(pool, 0)),
1468 dictionary_length_(0),
1469 byte_array_data_(AllocateBuffer(pool, 0)),
1470 byte_array_offsets_(AllocateBuffer(pool, 0)),
1471 indices_scratch_space_(AllocateBuffer(pool, 0)) {}
1472
1473 // Perform type-specific initiatialization
1474 void SetDict(TypedDecoder<Type>* dictionary) override;
1475
1476 void SetData(int num_values, const uint8_t* data, int len) override {
1477 num_values_ = num_values;
1478 if (len == 0) {
1479 // Initialize dummy decoder to avoid crashes later on
1480 idx_decoder_ = ::arrow::util::RleDecoder(data, len, /*bit_width=*/1);
1481 return;
1482 }
1483 uint8_t bit_width = *data;
1484 if (ARROW_PREDICT_FALSE(bit_width >= 64)) {
1485 throw ParquetException("Invalid or corrupted bit_width");
1486 }
1487 idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width);
1488 }
1489
1490 int Decode(T* buffer, int num_values) override {
1491 num_values = std::min(num_values, num_values_);
1492 int decoded_values =
1493 idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()),
1494 dictionary_length_, buffer, num_values);
1495 if (decoded_values != num_values) {
1496 ParquetException::EofException();
1497 }
1498 num_values_ -= num_values;
1499 return num_values;
1500 }
1501
1502 int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
1503 int64_t valid_bits_offset) override {
1504 num_values = std::min(num_values, num_values_);
1505 if (num_values != idx_decoder_.GetBatchWithDictSpaced(
1506 reinterpret_cast<const T*>(dictionary_->data()),
1507 dictionary_length_, buffer, num_values, null_count, valid_bits,
1508 valid_bits_offset)) {
1509 ParquetException::EofException();
1510 }
1511 num_values_ -= num_values;
1512 return num_values;
1513 }
1514
1515 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1516 int64_t valid_bits_offset,
1517 typename EncodingTraits<Type>::Accumulator* out) override;
1518
1519 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1520 int64_t valid_bits_offset,
1521 typename EncodingTraits<Type>::DictAccumulator* out) override;
1522
1523 void InsertDictionary(::arrow::ArrayBuilder* builder) override;
1524
1525 int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
1526 int64_t valid_bits_offset,
1527 ::arrow::ArrayBuilder* builder) override {
1528 if (num_values > 0) {
1529 // TODO(wesm): Refactor to batch reads for improved memory use. It is not
1530 // trivial because the null_count is relative to the entire bitmap
1531 PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
1532 num_values, /*shrink_to_fit=*/false));
1533 }
1534
1535 auto indices_buffer =
1536 reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
1537
1538 if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
1539 valid_bits_offset, indices_buffer)) {
1540 ParquetException::EofException();
1541 }
1542
1543 /// XXX(wesm): Cannot append "valid bits" directly to the builder
1544 std::vector<uint8_t> valid_bytes(num_values);
1545 ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
1546 for (int64_t i = 0; i < num_values; ++i) {
1547 valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
1548 bit_reader.Next();
1549 }
1550
1551 auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
1552 PARQUET_THROW_NOT_OK(
1553 binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
1554 num_values_ -= num_values - null_count;
1555 return num_values - null_count;
1556 }
1557
1558 int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override {
1559 num_values = std::min(num_values, num_values_);
1560 if (num_values > 0) {
1561 // TODO(wesm): Refactor to batch reads for improved memory use. This is
1562 // relatively simple here because we don't have to do any bookkeeping of
1563 // nulls
1564 PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
1565 num_values, /*shrink_to_fit=*/false));
1566 }
1567 auto indices_buffer =
1568 reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
1569 if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
1570 ParquetException::EofException();
1571 }
1572 auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
1573 PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
1574 num_values_ -= num_values;
1575 return num_values;
1576 }
1577
1578 int DecodeIndices(int num_values, int32_t* indices) override {
1579 if (num_values != idx_decoder_.GetBatch(indices, num_values)) {
1580 ParquetException::EofException();
1581 }
1582 num_values_ -= num_values;
1583 return num_values;
1584 }
1585
1586 void GetDictionary(const T** dictionary, int32_t* dictionary_length) override {
1587 *dictionary_length = dictionary_length_;
1588 *dictionary = reinterpret_cast<T*>(dictionary_->mutable_data());
1589 }
1590
1591 protected:
1592 Status IndexInBounds(int32_t index) {
1593 if (ARROW_PREDICT_TRUE(0 <= index && index < dictionary_length_)) {
1594 return Status::OK();
1595 }
1596 return Status::Invalid("Index not in dictionary bounds");
1597 }
1598
1599 inline void DecodeDict(TypedDecoder<Type>* dictionary) {
1600 dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
1601 PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
1602 /*shrink_to_fit=*/false));
1603 dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
1604 dictionary_length_);
1605 }
1606
1607 // Only one is set.
1608 std::shared_ptr<ResizableBuffer> dictionary_;
1609
1610 int32_t dictionary_length_;
1611
1612 // Data that contains the byte array data (byte_array_dictionary_ just has the
1613 // pointers).
1614 std::shared_ptr<ResizableBuffer> byte_array_data_;
1615
1616 // Arrow-style byte offsets for each dictionary value. We maintain two
1617 // representations of the dictionary, one as ByteArray* for non-Arrow
1618 // consumers and this one for Arrow consumers. Since dictionaries are
1619 // generally pretty small to begin with this doesn't mean too much extra
1620 // memory use in most cases
1621 std::shared_ptr<ResizableBuffer> byte_array_offsets_;
1622
1623 // Reusable buffer for decoding dictionary indices to be appended to a
1624 // BinaryDictionary32Builder
1625 std::shared_ptr<ResizableBuffer> indices_scratch_space_;
1626
1627 ::arrow::util::RleDecoder idx_decoder_;
1628};
1629
1630template <typename Type>
1631void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
1632 DecodeDict(dictionary);
1633}
1634
1635template <>
1636void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
1637 ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
1638}
1639
1640template <>
1641void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
1642 DecodeDict(dictionary);
1643
1644 auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
1645
1646 int total_size = 0;
1647 for (int i = 0; i < dictionary_length_; ++i) {
1648 total_size += dict_values[i].len;
1649 }
1650 PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
1651 /*shrink_to_fit=*/false));
1652 PARQUET_THROW_NOT_OK(
1653 byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
1654 /*shrink_to_fit=*/false));
1655
1656 int32_t offset = 0;
1657 uint8_t* bytes_data = byte_array_data_->mutable_data();
1658 int32_t* bytes_offsets =
1659 reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
1660 for (int i = 0; i < dictionary_length_; ++i) {
1661 memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
1662 bytes_offsets[i] = offset;
1663 dict_values[i].ptr = bytes_data + offset;
1664 offset += dict_values[i].len;
1665 }
1666 bytes_offsets[dictionary_length_] = offset;
1667}
1668
1669template <>
1670inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
1671 DecodeDict(dictionary);
1672
1673 auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
1674
1675 int fixed_len = descr_->type_length();
1676 int total_size = dictionary_length_ * fixed_len;
1677
1678 PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
1679 /*shrink_to_fit=*/false));
1680 uint8_t* bytes_data = byte_array_data_->mutable_data();
1681 for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
1682 memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
1683 dict_values[i].ptr = bytes_data + offset;
1684 }
1685}
1686
1687template <>
1688inline int DictDecoderImpl<Int96Type>::DecodeArrow(
1689 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1690 typename EncodingTraits<Int96Type>::Accumulator* builder) {
1691 ParquetException::NYI("DecodeArrow to Int96Type");
1692}
1693
1694template <>
1695inline int DictDecoderImpl<Int96Type>::DecodeArrow(
1696 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1697 typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
1698 ParquetException::NYI("DecodeArrow to Int96Type");
1699}
1700
1701template <>
1702inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
1703 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1704 typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
1705 ParquetException::NYI("DecodeArrow implemented elsewhere");
1706}
1707
1708template <>
1709inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
1710 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1711 typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
1712 ParquetException::NYI("DecodeArrow implemented elsewhere");
1713}
1714
1715template <typename DType>
1716int DictDecoderImpl<DType>::DecodeArrow(
1717 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1718 typename EncodingTraits<DType>::DictAccumulator* builder) {
1719 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1720
1721 auto dict_values = reinterpret_cast<const typename DType::c_type*>(dictionary_->data());
1722
1723 VisitNullBitmapInline(
1724 valid_bits, valid_bits_offset, num_values, null_count,
1725 [&]() {
1726 int32_t index;
1727 if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
1728 throw ParquetException("");
1729 }
1730 PARQUET_THROW_NOT_OK(IndexInBounds(index));
1731 PARQUET_THROW_NOT_OK(builder->Append(dict_values[index]));
1732 },
1733 [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
1734
1735 return num_values - null_count;
1736}
1737
1738template <>
1739int DictDecoderImpl<BooleanType>::DecodeArrow(
1740 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1741 typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
1742 ParquetException::NYI("No dictionary encoding for BooleanType");
1743}
1744
1745template <>
1746inline int DictDecoderImpl<FLBAType>::DecodeArrow(
1747 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1748 typename EncodingTraits<FLBAType>::Accumulator* builder) {
1749 if (builder->byte_width() != descr_->type_length()) {
1750 throw ParquetException("Byte width mismatch: builder was " +
1751 std::to_string(builder->byte_width()) + " but decoder was " +
1752 std::to_string(descr_->type_length()));
1753 }
1754
1755 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1756
1757 auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
1758
1759 VisitNullBitmapInline(
1760 valid_bits, valid_bits_offset, num_values, null_count,
1761 [&]() {
1762 int32_t index;
1763 if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
1764 throw ParquetException("");
1765 }
1766 PARQUET_THROW_NOT_OK(IndexInBounds(index));
1767 builder->UnsafeAppend(dict_values[index].ptr);
1768 },
1769 [&]() { builder->UnsafeAppendNull(); });
1770
1771 return num_values - null_count;
1772}
1773
1774template <>
1775int DictDecoderImpl<FLBAType>::DecodeArrow(
1776 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1777 typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
1778 auto value_type =
1779 checked_cast<const ::arrow::DictionaryType&>(*builder->type()).value_type();
1780 auto byte_width =
1781 checked_cast<const ::arrow::FixedSizeBinaryType&>(*value_type).byte_width();
1782 if (byte_width != descr_->type_length()) {
1783 throw ParquetException("Byte width mismatch: builder was " +
1784 std::to_string(byte_width) + " but decoder was " +
1785 std::to_string(descr_->type_length()));
1786 }
1787
1788 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1789
1790 auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
1791
1792 VisitNullBitmapInline(
1793 valid_bits, valid_bits_offset, num_values, null_count,
1794 [&]() {
1795 int32_t index;
1796 if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
1797 throw ParquetException("");
1798 }
1799 PARQUET_THROW_NOT_OK(IndexInBounds(index));
1800 PARQUET_THROW_NOT_OK(builder->Append(dict_values[index].ptr));
1801 },
1802 [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
1803
1804 return num_values - null_count;
1805}
1806
1807template <typename Type>
1808int DictDecoderImpl<Type>::DecodeArrow(
1809 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
1810 typename EncodingTraits<Type>::Accumulator* builder) {
1811 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
1812
1813 using value_type = typename Type::c_type;
1814 auto dict_values = reinterpret_cast<const value_type*>(dictionary_->data());
1815
1816 VisitNullBitmapInline(
1817 valid_bits, valid_bits_offset, num_values, null_count,
1818 [&]() {
1819 int32_t index;
1820 if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
1821 throw ParquetException("");
1822 }
1823 PARQUET_THROW_NOT_OK(IndexInBounds(index));
1824 builder->UnsafeAppend(dict_values[index]);
1825 },
1826 [&]() { builder->UnsafeAppendNull(); });
1827
1828 return num_values - null_count;
1829}
1830
1831template <typename Type>
1832void DictDecoderImpl<Type>::InsertDictionary(::arrow::ArrayBuilder* builder) {
1833 ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
1834}
1835
1836template <>
1837void DictDecoderImpl<ByteArrayType>::InsertDictionary(::arrow::ArrayBuilder* builder) {
1838 auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
1839
1840 // Make a BinaryArray referencing the internal dictionary data
1841 auto arr = std::make_shared<::arrow::BinaryArray>(
1842 dictionary_length_, byte_array_offsets_, byte_array_data_);
1843 PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
1844}
1845
1846class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
1847 virtual public ByteArrayDecoder {
1848 public:
1849 using BASE = DictDecoderImpl<ByteArrayType>;
1850 using BASE::DictDecoderImpl;
1851
1852 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1853 int64_t valid_bits_offset,
1854 ::arrow::BinaryDictionary32Builder* builder) override {
1855 int result = 0;
1856 if (null_count == 0) {
1857 PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
1858 } else {
1859 PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
1860 valid_bits_offset, builder, &result));
1861 }
1862 return result;
1863 }
1864
1865 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1866 int64_t valid_bits_offset,
1867 typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
1868 int result = 0;
1869 if (null_count == 0) {
1870 PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
1871 } else {
1872 PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
1873 valid_bits_offset, out, &result));
1874 }
1875 return result;
1876 }
1877
1878 private:
1879 Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
1880 int64_t valid_bits_offset,
1881 typename EncodingTraits<ByteArrayType>::Accumulator* out,
1882 int* out_num_values) {
1883 constexpr int32_t kBufferSize = 1024;
1884 int32_t indices[kBufferSize];
1885
1886 ArrowBinaryHelper helper(out);
1887
1888 ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
1889
1890 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1891 int values_decoded = 0;
1892 int num_appended = 0;
1893 while (num_appended < num_values) {
1894 bool is_valid = bit_reader.IsSet();
1895 bit_reader.Next();
1896
1897 if (is_valid) {
1898 int32_t batch_size =
1899 std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
1900 int num_indices = idx_decoder_.GetBatch(indices, batch_size);
1901
1902 if (ARROW_PREDICT_FALSE(num_indices < 1)) {
1903 return Status::Invalid("Invalid number of indices '", num_indices, "'");
1904 }
1905
1906 int i = 0;
1907 while (true) {
1908 // Consume all indices
1909 if (is_valid) {
1910 auto idx = indices[i];
1911 RETURN_NOT_OK(IndexInBounds(idx));
1912 const auto& val = dict_values[idx];
1913 if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
1914 RETURN_NOT_OK(helper.PushChunk());
1915 }
1916 RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
1917 ++i;
1918 ++values_decoded;
1919 } else {
1920 RETURN_NOT_OK(helper.AppendNull());
1921 --null_count;
1922 }
1923 ++num_appended;
1924 if (i == num_indices) {
1925 // Do not advance the bit_reader if we have fulfilled the decode
1926 // request
1927 break;
1928 }
1929 is_valid = bit_reader.IsSet();
1930 bit_reader.Next();
1931 }
1932 } else {
1933 RETURN_NOT_OK(helper.AppendNull());
1934 --null_count;
1935 ++num_appended;
1936 }
1937 }
1938 *out_num_values = values_decoded;
1939 return Status::OK();
1940 }
1941
1942 Status DecodeArrowDenseNonNull(int num_values,
1943 typename EncodingTraits<ByteArrayType>::Accumulator* out,
1944 int* out_num_values) {
1945 constexpr int32_t kBufferSize = 2048;
1946 int32_t indices[kBufferSize];
1947 int values_decoded = 0;
1948
1949 ArrowBinaryHelper helper(out);
1950 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1951
1952 while (values_decoded < num_values) {
1953 int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
1954 int num_indices = idx_decoder_.GetBatch(indices, batch_size);
1955 if (num_indices == 0) ParquetException::EofException();
1956 for (int i = 0; i < num_indices; ++i) {
1957 auto idx = indices[i];
1958 RETURN_NOT_OK(IndexInBounds(idx));
1959 const auto& val = dict_values[idx];
1960 if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
1961 RETURN_NOT_OK(helper.PushChunk());
1962 }
1963 RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
1964 }
1965 values_decoded += num_indices;
1966 }
1967 *out_num_values = values_decoded;
1968 return Status::OK();
1969 }
1970
1971 template <typename BuilderType>
1972 Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1973 int64_t valid_bits_offset, BuilderType* builder,
1974 int* out_num_values) {
1975 constexpr int32_t kBufferSize = 1024;
1976 int32_t indices[kBufferSize];
1977
1978 RETURN_NOT_OK(builder->Reserve(num_values));
1979 ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
1980
1981 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1982
1983 int values_decoded = 0;
1984 int num_appended = 0;
1985 while (num_appended < num_values) {
1986 bool is_valid = bit_reader.IsSet();
1987 bit_reader.Next();
1988
1989 if (is_valid) {
1990 int32_t batch_size =
1991 std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
1992 int num_indices = idx_decoder_.GetBatch(indices, batch_size);
1993
1994 int i = 0;
1995 while (true) {
1996 // Consume all indices
1997 if (is_valid) {
1998 auto idx = indices[i];
1999 RETURN_NOT_OK(IndexInBounds(idx));
2000 const auto& val = dict_values[idx];
2001 RETURN_NOT_OK(builder->Append(val.ptr, val.len));
2002 ++i;
2003 ++values_decoded;
2004 } else {
2005 RETURN_NOT_OK(builder->AppendNull());
2006 --null_count;
2007 }
2008 ++num_appended;
2009 if (i == num_indices) {
2010 // Do not advance the bit_reader if we have fulfilled the decode
2011 // request
2012 break;
2013 }
2014 is_valid = bit_reader.IsSet();
2015 bit_reader.Next();
2016 }
2017 } else {
2018 RETURN_NOT_OK(builder->AppendNull());
2019 --null_count;
2020 ++num_appended;
2021 }
2022 }
2023 *out_num_values = values_decoded;
2024 return Status::OK();
2025 }
2026
2027 template <typename BuilderType>
2028 Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) {
2029 constexpr int32_t kBufferSize = 2048;
2030 int32_t indices[kBufferSize];
2031
2032 RETURN_NOT_OK(builder->Reserve(num_values));
2033
2034 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
2035
2036 int values_decoded = 0;
2037 while (values_decoded < num_values) {
2038 int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
2039 int num_indices = idx_decoder_.GetBatch(indices, batch_size);
2040 if (num_indices == 0) ParquetException::EofException();
2041 for (int i = 0; i < num_indices; ++i) {
2042 auto idx = indices[i];
2043 RETURN_NOT_OK(IndexInBounds(idx));
2044 const auto& val = dict_values[idx];
2045 RETURN_NOT_OK(builder->Append(val.ptr, val.len));
2046 }
2047 values_decoded += num_indices;
2048 }
2049 *out_num_values = values_decoded;
2050 return Status::OK();
2051 }
2052};
2053
2054// ----------------------------------------------------------------------
2055// DeltaBitPackDecoder
2056
2057template <typename DType>
2058class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
2059 public:
2060 typedef typename DType::c_type T;
2061
2062 explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
2063 MemoryPool* pool = ::arrow::default_memory_pool())
2064 : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
2065 if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
2066 throw ParquetException("Delta bit pack encoding should only be for integer data.");
2067 }
2068 }
2069
2070 void SetData(int num_values, const uint8_t* data, int len) override {
2071 this->num_values_ = num_values;
2072 decoder_ = ::arrow::BitUtil::BitReader(data, len);
2073 InitHeader();
2074 }
2075
2076 int Decode(T* buffer, int max_values) override {
2077 return GetInternal(buffer, max_values);
2078 }
2079
2080 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2081 int64_t valid_bits_offset,
2082 typename EncodingTraits<DType>::Accumulator* out) override {
2083 if (null_count != 0) {
2084 ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
2085 }
2086 std::vector<T> values(num_values);
2087 GetInternal(values.data(), num_values);
2088 PARQUET_THROW_NOT_OK(out->AppendValues(values));
2089 return num_values;
2090 }
2091
2092 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2093 int64_t valid_bits_offset,
2094 typename EncodingTraits<DType>::DictAccumulator* out) override {
2095 if (null_count != 0) {
2096 ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
2097 }
2098 std::vector<T> values(num_values);
2099 GetInternal(values.data(), num_values);
2100 PARQUET_THROW_NOT_OK(out->Reserve(num_values));
2101 for (T value : values) {
2102 PARQUET_THROW_NOT_OK(out->Append(value));
2103 }
2104 return num_values;
2105 }
2106
2107 private:
2108 static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8);
2109
2110 void InitHeader() {
2111 if (!decoder_.GetVlqInt(&values_per_block_) ||
2112 !decoder_.GetVlqInt(&mini_blocks_per_block_) ||
2113 !decoder_.GetVlqInt(&total_value_count_) ||
2114 !decoder_.GetZigZagVlqInt(&last_value_)) {
2115 ParquetException::EofException();
2116 }
2117
2118 if (values_per_block_ == 0) {
2119 throw ParquetException("cannot have zero value per block");
2120 }
2121 if (mini_blocks_per_block_ == 0) {
2122 throw ParquetException("cannot have zero miniblock per block");
2123 }
2124 values_per_mini_block_ = values_per_block_ / mini_blocks_per_block_;
2125 if (values_per_mini_block_ == 0) {
2126 throw ParquetException("cannot have zero value per miniblock");
2127 }
2128 if (values_per_mini_block_ % 32 != 0) {
2129 throw ParquetException(
2130 "the number of values in a miniblock must be multiple of 32, but it's " +
2131 std::to_string(values_per_mini_block_));
2132 }
2133
2134 delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_);
2135 block_initialized_ = false;
2136 values_current_mini_block_ = 0;
2137 }
2138
2139 void InitBlock() {
2140 if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
2141
2142 // read the bitwidth of each miniblock
2143 uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
2144 for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
2145 if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
2146 ParquetException::EofException();
2147 }
2148 if (bit_width_data[i] > kMaxDeltaBitWidth) {
2149 throw ParquetException("delta bit width larger than integer bit width");
2150 }
2151 }
2152 mini_block_idx_ = 0;
2153 delta_bit_width_ = bit_width_data[0];
2154 values_current_mini_block_ = values_per_mini_block_;
2155 block_initialized_ = true;
2156 }
2157
2158 int GetInternal(T* buffer, int max_values) {
2159 max_values = std::min(max_values, this->num_values_);
2160 DCHECK_LE(static_cast<uint32_t>(max_values), total_value_count_);
2161 int i = 0;
2162 while (i < max_values) {
2163 if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
2164 if (ARROW_PREDICT_FALSE(!block_initialized_)) {
2165 buffer[i++] = last_value_;
2166 --total_value_count_;
2167 if (ARROW_PREDICT_FALSE(i == max_values)) break;
2168 InitBlock();
2169 } else {
2170 ++mini_block_idx_;
2171 if (mini_block_idx_ < mini_blocks_per_block_) {
2172 delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
2173 values_current_mini_block_ = values_per_mini_block_;
2174 } else {
2175 InitBlock();
2176 }
2177 }
2178 }
2179
2180 int values_decode =
2181 std::min(values_current_mini_block_, static_cast<uint32_t>(max_values - i));
2182 if (decoder_.GetBatch(delta_bit_width_, buffer + i, values_decode) !=
2183 values_decode) {
2184 ParquetException::EofException();
2185 }
2186 for (int j = 0; j < values_decode; ++j) {
2187 // Addition between min_delta, packed int and last_value should be treated as
2188 // unsigned addtion. Overflow is as expected.
2189 uint64_t delta =
2190 static_cast<uint64_t>(min_delta_) + static_cast<uint64_t>(buffer[i + j]);
2191 buffer[i + j] = static_cast<T>(delta + static_cast<uint64_t>(last_value_));
2192 last_value_ = buffer[i + j];
2193 }
2194 values_current_mini_block_ -= values_decode;
2195 total_value_count_ -= values_decode;
2196 i += values_decode;
2197 }
2198 this->num_values_ -= max_values;
2199 return max_values;
2200 }
2201
2202 MemoryPool* pool_;
2203 ::arrow::BitUtil::BitReader decoder_;
2204 uint32_t values_per_block_;
2205 uint32_t mini_blocks_per_block_;
2206 uint32_t values_per_mini_block_;
2207 uint32_t values_current_mini_block_;
2208 uint32_t total_value_count_;
2209
2210 bool block_initialized_;
2211 T min_delta_;
2212 uint32_t mini_block_idx_;
2213 std::shared_ptr<ResizableBuffer> delta_bit_widths_;
2214 int delta_bit_width_;
2215
2216 T last_value_;
2217};
2218
2219// ----------------------------------------------------------------------
2220// DELTA_LENGTH_BYTE_ARRAY
2221
2222class DeltaLengthByteArrayDecoder : public DecoderImpl,
2223 virtual public TypedDecoder<ByteArrayType> {
2224 public:
2225 explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
2226 MemoryPool* pool = ::arrow::default_memory_pool())
2227 : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
2228 len_decoder_(nullptr, pool),
2229 pool_(pool) {}
2230
2231 void SetData(int num_values, const uint8_t* data, int len) override {
2232 num_values_ = num_values;
2233 if (len == 0) return;
2234 int total_lengths_len = ::arrow::util::SafeLoadAs<int32_t>(data);
2235 data += 4;
2236 this->len_decoder_.SetData(num_values, data, total_lengths_len);
2237 data_ = data + total_lengths_len;
2238 this->len_ = len - 4 - total_lengths_len;
2239 }
2240
2241 int Decode(ByteArray* buffer, int max_values) override {
2242 using VectorT = ArrowPoolVector<int>;
2243 max_values = std::min(max_values, num_values_);
2244 VectorT lengths(max_values, 0, ::arrow::stl::allocator<int>(pool_));
2245 len_decoder_.Decode(lengths.data(), max_values);
2246 for (int i = 0; i < max_values; ++i) {
2247 buffer[i].len = lengths[i];
2248 buffer[i].ptr = data_;
2249 this->data_ += lengths[i];
2250 this->len_ -= lengths[i];
2251 }
2252 this->num_values_ -= max_values;
2253 return max_values;
2254 }
2255
2256 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2257 int64_t valid_bits_offset,
2258 typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
2259 ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
2260 }
2261
2262 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2263 int64_t valid_bits_offset,
2264 typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
2265 ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
2266 }
2267
2268 private:
2269 DeltaBitPackDecoder<Int32Type> len_decoder_;
2270 ::arrow::MemoryPool* pool_;
2271};
2272
2273// ----------------------------------------------------------------------
2274// DELTA_BYTE_ARRAY
2275
2276class DeltaByteArrayDecoder : public DecoderImpl,
2277 virtual public TypedDecoder<ByteArrayType> {
2278 public:
2279 explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
2280 MemoryPool* pool = ::arrow::default_memory_pool())
2281 : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
2282 prefix_len_decoder_(nullptr, pool),
2283 suffix_decoder_(nullptr, pool),
2284 last_value_(0, nullptr) {}
2285
2286 virtual void SetData(int num_values, const uint8_t* data, int len) {
2287 num_values_ = num_values;
2288 if (len == 0) return;
2289 int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
2290 data += 4;
2291 len -= 4;
2292 prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
2293 data += prefix_len_length;
2294 len -= prefix_len_length;
2295 suffix_decoder_.SetData(num_values, data, len);
2296 }
2297
2298 // TODO: this doesn't work and requires memory management. We need to allocate
2299 // new strings to store the results.
2300 virtual int Decode(ByteArray* buffer, int max_values) {
2301 max_values = std::min(max_values, this->num_values_);
2302 for (int i = 0; i < max_values; ++i) {
2303 int prefix_len = 0;
2304 prefix_len_decoder_.Decode(&prefix_len, 1);
2305 ByteArray suffix = {0, nullptr};
2306 suffix_decoder_.Decode(&suffix, 1);
2307 buffer[i].len = prefix_len + suffix.len;
2308
2309 uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
2310 memcpy(result, last_value_.ptr, prefix_len);
2311 memcpy(result + prefix_len, suffix.ptr, suffix.len);
2312
2313 buffer[i].ptr = result;
2314 last_value_ = buffer[i];
2315 }
2316 this->num_values_ -= max_values;
2317 return max_values;
2318 }
2319
2320 private:
2321 DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
2322 DeltaLengthByteArrayDecoder suffix_decoder_;
2323 ByteArray last_value_;
2324};
2325
2326// ----------------------------------------------------------------------
2327// BYTE_STREAM_SPLIT
2328
2329template <typename DType>
2330class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
2331 public:
2332 using T = typename DType::c_type;
2333 explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);
2334
2335 int Decode(T* buffer, int max_values) override;
2336
2337 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2338 int64_t valid_bits_offset,
2339 typename EncodingTraits<DType>::Accumulator* builder) override;
2340
2341 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2342 int64_t valid_bits_offset,
2343 typename EncodingTraits<DType>::DictAccumulator* builder) override;
2344
2345 void SetData(int num_values, const uint8_t* data, int len) override;
2346
2347 T* EnsureDecodeBuffer(int64_t min_values) {
2348 const int64_t size = sizeof(T) * min_values;
2349 if (!decode_buffer_ || decode_buffer_->size() < size) {
2350 PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
2351 }
2352 return reinterpret_cast<T*>(decode_buffer_->mutable_data());
2353 }
2354
2355 private:
2356 int num_values_in_buffer_{0};
2357 std::shared_ptr<Buffer> decode_buffer_;
2358
2359 static constexpr size_t kNumStreams = sizeof(T);
2360};
2361
2362template <typename DType>
2363ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
2364 : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}
2365
2366template <typename DType>
2367void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
2368 int len) {
2369 DecoderImpl::SetData(num_values, data, len);
2370 if (num_values * static_cast<int64_t>(sizeof(T)) > len) {
2371 throw ParquetException("Data size too small for number of values (corrupted file?)");
2372 }
2373 num_values_in_buffer_ = num_values;
2374}
2375
2376template <typename DType>
2377int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
2378 const int values_to_decode = std::min(num_values_, max_values);
2379 const int num_decoded_previously = num_values_in_buffer_ - num_values_;
2380 const uint8_t* data = data_ + num_decoded_previously;
2381
2382 ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_to_decode,
2383 num_values_in_buffer_, buffer);
2384 num_values_ -= values_to_decode;
2385 len_ -= sizeof(T) * values_to_decode;
2386 return values_to_decode;
2387}
2388
2389template <typename DType>
2390int ByteStreamSplitDecoder<DType>::DecodeArrow(
2391 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
2392 typename EncodingTraits<DType>::Accumulator* builder) {
2393 constexpr int value_size = static_cast<int>(kNumStreams);
2394 int values_decoded = num_values - null_count;
2395 if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
2396 ParquetException::EofException();
2397 }
2398
2399 PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
2400
2401 const int num_decoded_previously = num_values_in_buffer_ - num_values_;
2402 const uint8_t* data = data_ + num_decoded_previously;
2403 int offset = 0;
2404
2405#if defined(ARROW_HAVE_SIMD_SPLIT)
2406 // Use fast decoding into intermediate buffer. This will also decode
2407 // some null values, but it's fast enough that we don't care.
2408 T* decode_out = EnsureDecodeBuffer(values_decoded);
2409 ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_decoded,
2410 num_values_in_buffer_, decode_out);
2411
2412 // XXX If null_count is 0, we could even append in bulk or decode directly into
2413 // builder
2414 VisitNullBitmapInline(
2415 valid_bits, valid_bits_offset, num_values, null_count,
2416 [&]() {
2417 builder->UnsafeAppend(decode_out[offset]);
2418 ++offset;
2419 },
2420 [&]() { builder->UnsafeAppendNull(); });
2421
2422#else
2423 VisitNullBitmapInline(
2424 valid_bits, valid_bits_offset, num_values, null_count,
2425 [&]() {
2426 uint8_t gathered_byte_data[kNumStreams];
2427 for (size_t b = 0; b < kNumStreams; ++b) {
2428 const size_t byte_index = b * num_values_in_buffer_ + offset;
2429 gathered_byte_data[b] = data[byte_index];
2430 }
2431 builder->UnsafeAppend(::arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]));
2432 ++offset;
2433 },
2434 [&]() { builder->UnsafeAppendNull(); });
2435#endif
2436
2437 num_values_ -= values_decoded;
2438 len_ -= sizeof(T) * values_decoded;
2439 return values_decoded;
2440}
2441
2442template <typename DType>
2443int ByteStreamSplitDecoder<DType>::DecodeArrow(
2444 int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
2445 typename EncodingTraits<DType>::DictAccumulator* builder) {
2446 ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
2447}
2448
2449} // namespace
2450
2451// ----------------------------------------------------------------------
2452// Encoder and decoder factory functions
2453
2454std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
2455 bool use_dictionary, const ColumnDescriptor* descr,
2456 MemoryPool* pool) {
2457 if (use_dictionary) {
2458 switch (type_num) {
2459 case Type::INT32:
2460 return std::unique_ptr<Encoder>(new DictEncoderImpl<Int32Type>(descr, pool));
2461 case Type::INT64:
2462 return std::unique_ptr<Encoder>(new DictEncoderImpl<Int64Type>(descr, pool));
2463 case Type::INT96:
2464 return std::unique_ptr<Encoder>(new DictEncoderImpl<Int96Type>(descr, pool));
2465 case Type::FLOAT:
2466 return std::unique_ptr<Encoder>(new DictEncoderImpl<FloatType>(descr, pool));
2467 case Type::DOUBLE:
2468 return std::unique_ptr<Encoder>(new DictEncoderImpl<DoubleType>(descr, pool));
2469 case Type::BYTE_ARRAY:
2470 return std::unique_ptr<Encoder>(new DictEncoderImpl<ByteArrayType>(descr, pool));
2471 case Type::FIXED_LEN_BYTE_ARRAY:
2472 return std::unique_ptr<Encoder>(new DictEncoderImpl<FLBAType>(descr, pool));
2473 default:
2474 DCHECK(false) << "Encoder not implemented";
2475 break;
2476 }
2477 } else if (encoding == Encoding::PLAIN) {
2478 switch (type_num) {
2479 case Type::BOOLEAN:
2480 return std::unique_ptr<Encoder>(new PlainEncoder<BooleanType>(descr, pool));
2481 case Type::INT32:
2482 return std::unique_ptr<Encoder>(new PlainEncoder<Int32Type>(descr, pool));
2483 case Type::INT64:
2484 return std::unique_ptr<Encoder>(new PlainEncoder<Int64Type>(descr, pool));
2485 case Type::INT96:
2486 return std::unique_ptr<Encoder>(new PlainEncoder<Int96Type>(descr, pool));
2487 case Type::FLOAT:
2488 return std::unique_ptr<Encoder>(new PlainEncoder<FloatType>(descr, pool));
2489 case Type::DOUBLE:
2490 return std::unique_ptr<Encoder>(new PlainEncoder<DoubleType>(descr, pool));
2491 case Type::BYTE_ARRAY:
2492 return std::unique_ptr<Encoder>(new PlainEncoder<ByteArrayType>(descr, pool));
2493 case Type::FIXED_LEN_BYTE_ARRAY:
2494 return std::unique_ptr<Encoder>(new PlainEncoder<FLBAType>(descr, pool));
2495 default:
2496 DCHECK(false) << "Encoder not implemented";
2497 break;
2498 }
2499 } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
2500 switch (type_num) {
2501 case Type::FLOAT:
2502 return std::unique_ptr<Encoder>(
2503 new ByteStreamSplitEncoder<FloatType>(descr, pool));
2504 case Type::DOUBLE:
2505 return std::unique_ptr<Encoder>(
2506 new ByteStreamSplitEncoder<DoubleType>(descr, pool));
2507 default:
2508 throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
2509 break;
2510 }
2511 } else {
2512 ParquetException::NYI("Selected encoding is not supported");
2513 }
2514 DCHECK(false) << "Should not be able to reach this code";
2515 return nullptr;
2516}
2517
2518std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
2519 const ColumnDescriptor* descr) {
2520 if (encoding == Encoding::PLAIN) {
2521 switch (type_num) {
2522 case Type::BOOLEAN:
2523 return std::unique_ptr<Decoder>(new PlainBooleanDecoder(descr));
2524 case Type::INT32:
2525 return std::unique_ptr<Decoder>(new PlainDecoder<Int32Type>(descr));
2526 case Type::INT64:
2527 return std::unique_ptr<Decoder>(new PlainDecoder<Int64Type>(descr));
2528 case Type::INT96:
2529 return std::unique_ptr<Decoder>(new PlainDecoder<Int96Type>(descr));
2530 case Type::FLOAT:
2531 return std::unique_ptr<Decoder>(new PlainDecoder<FloatType>(descr));
2532 case Type::DOUBLE:
2533 return std::unique_ptr<Decoder>(new PlainDecoder<DoubleType>(descr));
2534 case Type::BYTE_ARRAY:
2535 return std::unique_ptr<Decoder>(new PlainByteArrayDecoder(descr));
2536 case Type::FIXED_LEN_BYTE_ARRAY:
2537 return std::unique_ptr<Decoder>(new PlainFLBADecoder(descr));
2538 default:
2539 break;
2540 }
2541 } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
2542 switch (type_num) {
2543 case Type::FLOAT:
2544 return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<FloatType>(descr));
2545 case Type::DOUBLE:
2546 return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<DoubleType>(descr));
2547 default:
2548 throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
2549 break;
2550 }
2551 } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
2552 switch (type_num) {
2553 case Type::INT32:
2554 return std::unique_ptr<Decoder>(new DeltaBitPackDecoder<Int32Type>(descr));
2555 case Type::INT64:
2556 return std::unique_ptr<Decoder>(new DeltaBitPackDecoder<Int64Type>(descr));
2557 default:
2558 throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
2559 break;
2560 }
2561 } else {
2562 ParquetException::NYI("Selected encoding is not supported");
2563 }
2564 DCHECK(false) << "Should not be able to reach this code";
2565 return nullptr;
2566}
2567
2568namespace detail {
2569std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
2570 const ColumnDescriptor* descr,
2571 MemoryPool* pool) {
2572 switch (type_num) {
2573 case Type::BOOLEAN:
2574 ParquetException::NYI("Dictionary encoding not implemented for boolean type");
2575 case Type::INT32:
2576 return std::unique_ptr<Decoder>(new DictDecoderImpl<Int32Type>(descr, pool));
2577 case Type::INT64:
2578 return std::unique_ptr<Decoder>(new DictDecoderImpl<Int64Type>(descr, pool));
2579 case Type::INT96:
2580 return std::unique_ptr<Decoder>(new DictDecoderImpl<Int96Type>(descr, pool));
2581 case Type::FLOAT:
2582 return std::unique_ptr<Decoder>(new DictDecoderImpl<FloatType>(descr, pool));
2583 case Type::DOUBLE:
2584 return std::unique_ptr<Decoder>(new DictDecoderImpl<DoubleType>(descr, pool));
2585 case Type::BYTE_ARRAY:
2586 return std::unique_ptr<Decoder>(new DictByteArrayDecoderImpl(descr, pool));
2587 case Type::FIXED_LEN_BYTE_ARRAY:
2588 return std::unique_ptr<Decoder>(new DictDecoderImpl<FLBAType>(descr, pool));
2589 default:
2590 break;
2591 }
2592 DCHECK(false) << "Should not be able to reach this code";
2593 return nullptr;
2594}
2595
2596} // namespace detail
2597} // namespace parquet