1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "parquet/arrow/reader.h"
22 #include <unordered_set>
26 #include "arrow/array.h"
27 #include "arrow/buffer.h"
28 #include "arrow/extension_type.h"
29 #include "arrow/io/memory.h"
30 #include "arrow/record_batch.h"
31 #include "arrow/table.h"
32 #include "arrow/type.h"
33 #include "arrow/util/async_generator.h"
34 #include "arrow/util/bit_util.h"
35 #include "arrow/util/future.h"
36 #include "arrow/util/iterator.h"
37 #include "arrow/util/logging.h"
38 #include "arrow/util/make_unique.h"
39 #include "arrow/util/parallel.h"
40 #include "arrow/util/range.h"
41 #include "parquet/arrow/reader_internal.h"
42 #include "parquet/column_reader.h"
43 #include "parquet/exception.h"
44 #include "parquet/file_reader.h"
45 #include "parquet/metadata.h"
46 #include "parquet/properties.h"
47 #include "parquet/schema.h"
50 using arrow::ArrayData
;
51 using arrow::BooleanArray
;
52 using arrow::ChunkedArray
;
53 using arrow::DataType
;
54 using arrow::ExtensionType
;
57 using arrow::Int32Array
;
58 using arrow::ListArray
;
59 using arrow::MemoryPool
;
60 using arrow::RecordBatchReader
;
61 using arrow::ResizableBuffer
;
63 using arrow::StructArray
;
65 using arrow::TimestampArray
;
67 using arrow::internal::checked_cast
;
68 using arrow::internal::Iota
;
70 // Help reduce verbosity
71 using ParquetReader
= parquet::ParquetFileReader
;
73 using parquet::internal::RecordReader
;
75 namespace BitUtil
= arrow::BitUtil
;
81 ::arrow::Result
<std::shared_ptr
<ArrayData
>> ChunksToSingle(const ChunkedArray
& chunked
) {
82 switch (chunked
.num_chunks()) {
84 ARROW_ASSIGN_OR_RAISE(std::shared_ptr
<Array
> array
,
85 ::arrow::MakeArrayOfNull(chunked
.type(), 0));
89 return chunked
.chunk(0)->data();
91 // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
92 // this is not yet implemented
93 return Status::NotImplemented(
94 "Nested data conversions not implemented for chunked array outputs");
100 class ColumnReaderImpl
: public ColumnReader
{
102 virtual Status
GetDefLevels(const int16_t** data
, int64_t* length
) = 0;
103 virtual Status
GetRepLevels(const int16_t** data
, int64_t* length
) = 0;
104 virtual const std::shared_ptr
<Field
> field() = 0;
106 ::arrow::Status
NextBatch(int64_t batch_size
,
107 std::shared_ptr
<::arrow::ChunkedArray
>* out
) final
{
108 RETURN_NOT_OK(LoadBatch(batch_size
));
109 RETURN_NOT_OK(BuildArray(batch_size
, out
));
110 for (int x
= 0; x
< (*out
)->num_chunks(); x
++) {
111 RETURN_NOT_OK((*out
)->chunk(x
)->Validate());
116 virtual ::arrow::Status
LoadBatch(int64_t num_records
) = 0;
118 virtual ::arrow::Status
BuildArray(int64_t length_upper_bound
,
119 std::shared_ptr
<::arrow::ChunkedArray
>* out
) = 0;
120 virtual bool IsOrHasRepeatedChild() const = 0;
// Copy a vector of column indices into a shared hash set for O(1) membership
// tests (used to filter leaves during reader construction). Duplicates are
// collapsed; an empty vector yields an empty set.
std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
    const std::vector<int>& values) {
  std::shared_ptr<std::unordered_set<int>> result(new std::unordered_set<int>());
  result->insert(values.begin(), values.end());
  return result;
}
132 // Forward declaration
133 Status
GetReader(const SchemaField
& field
, const std::shared_ptr
<ReaderContext
>& context
,
134 std::unique_ptr
<ColumnReaderImpl
>* out
);
136 // ----------------------------------------------------------------------
137 // FileReaderImpl forward declaration
139 class FileReaderImpl
: public FileReader
{
141 FileReaderImpl(MemoryPool
* pool
, std::unique_ptr
<ParquetFileReader
> reader
,
142 ArrowReaderProperties properties
)
144 reader_(std::move(reader
)),
145 reader_properties_(std::move(properties
)) {}
148 return SchemaManifest::Make(reader_
->metadata()->schema(),
149 reader_
->metadata()->key_value_metadata(),
150 reader_properties_
, &manifest_
);
153 FileColumnIteratorFactory
SomeRowGroupsFactory(std::vector
<int> row_groups
) {
154 return [row_groups
](int i
, ParquetFileReader
* reader
) {
155 return new FileColumnIterator(i
, reader
, row_groups
);
159 FileColumnIteratorFactory
AllRowGroupsFactory() {
160 return SomeRowGroupsFactory(Iota(reader_
->metadata()->num_row_groups()));
163 Status
BoundsCheckColumn(int column
) {
164 if (column
< 0 || column
>= this->num_columns()) {
165 return Status::Invalid("Column index out of bounds (got ", column
,
168 this->num_columns() - 1, ")");
173 Status
BoundsCheckRowGroup(int row_group
) {
174 // row group indices check
175 if (row_group
< 0 || row_group
>= num_row_groups()) {
176 return Status::Invalid("Some index in row_group_indices is ", row_group
,
177 ", which is either < 0 or >= num_row_groups(",
178 num_row_groups(), ")");
183 Status
BoundsCheck(const std::vector
<int>& row_groups
,
184 const std::vector
<int>& column_indices
) {
185 for (int i
: row_groups
) {
186 RETURN_NOT_OK(BoundsCheckRowGroup(i
));
188 for (int i
: column_indices
) {
189 RETURN_NOT_OK(BoundsCheckColumn(i
));
194 std::shared_ptr
<RowGroupReader
> RowGroup(int row_group_index
) override
;
196 Status
ReadTable(const std::vector
<int>& indices
,
197 std::shared_ptr
<Table
>* out
) override
{
198 return ReadRowGroups(Iota(reader_
->metadata()->num_row_groups()), indices
, out
);
201 Status
GetFieldReader(int i
,
202 const std::shared_ptr
<std::unordered_set
<int>>& included_leaves
,
203 const std::vector
<int>& row_groups
,
204 std::unique_ptr
<ColumnReaderImpl
>* out
) {
205 auto ctx
= std::make_shared
<ReaderContext
>();
206 ctx
->reader
= reader_
.get();
208 ctx
->iterator_factory
= SomeRowGroupsFactory(row_groups
);
209 ctx
->filter_leaves
= true;
210 ctx
->included_leaves
= included_leaves
;
211 return GetReader(manifest_
.schema_fields
[i
], ctx
, out
);
214 Status
GetFieldReaders(const std::vector
<int>& column_indices
,
215 const std::vector
<int>& row_groups
,
216 std::vector
<std::shared_ptr
<ColumnReaderImpl
>>* out
,
217 std::shared_ptr
<::arrow::Schema
>* out_schema
) {
218 // We only need to read schema fields which have columns indicated
219 // in the indices vector
220 ARROW_ASSIGN_OR_RAISE(std::vector
<int> field_indices
,
221 manifest_
.GetFieldIndices(column_indices
));
223 auto included_leaves
= VectorToSharedSet(column_indices
);
225 out
->resize(field_indices
.size());
226 ::arrow::FieldVector
out_fields(field_indices
.size());
227 for (size_t i
= 0; i
< out
->size(); ++i
) {
228 std::unique_ptr
<ColumnReaderImpl
> reader
;
230 GetFieldReader(field_indices
[i
], included_leaves
, row_groups
, &reader
));
232 out_fields
[i
] = reader
->field();
233 out
->at(i
) = std::move(reader
);
236 *out_schema
= ::arrow::schema(std::move(out_fields
), manifest_
.schema_metadata
);
240 Status
GetColumn(int i
, FileColumnIteratorFactory iterator_factory
,
241 std::unique_ptr
<ColumnReader
>* out
);
243 Status
GetColumn(int i
, std::unique_ptr
<ColumnReader
>* out
) override
{
244 return GetColumn(i
, AllRowGroupsFactory(), out
);
247 Status
GetSchema(std::shared_ptr
<::arrow::Schema
>* out
) override
{
248 return FromParquetSchema(reader_
->metadata()->schema(), reader_properties_
,
249 reader_
->metadata()->key_value_metadata(), out
);
252 Status
ReadSchemaField(int i
, std::shared_ptr
<ChunkedArray
>* out
) override
{
253 auto included_leaves
= VectorToSharedSet(Iota(reader_
->metadata()->num_columns()));
254 std::vector
<int> row_groups
= Iota(reader_
->metadata()->num_row_groups());
256 std::unique_ptr
<ColumnReaderImpl
> reader
;
257 RETURN_NOT_OK(GetFieldReader(i
, included_leaves
, row_groups
, &reader
));
259 return ReadColumn(i
, row_groups
, reader
.get(), out
);
262 Status
ReadColumn(int i
, const std::vector
<int>& row_groups
, ColumnReader
* reader
,
263 std::shared_ptr
<ChunkedArray
>* out
) {
264 BEGIN_PARQUET_CATCH_EXCEPTIONS
265 // TODO(wesm): This calculation doesn't make much sense when we have repeated
267 int64_t records_to_read
= 0;
268 for (auto row_group
: row_groups
) {
269 // Can throw exception
271 reader_
->metadata()->RowGroup(row_group
)->ColumnChunk(i
)->num_values();
273 return reader
->NextBatch(records_to_read
, out
);
274 END_PARQUET_CATCH_EXCEPTIONS
277 Status
ReadColumn(int i
, const std::vector
<int>& row_groups
,
278 std::shared_ptr
<ChunkedArray
>* out
) {
279 std::unique_ptr
<ColumnReader
> flat_column_reader
;
280 RETURN_NOT_OK(GetColumn(i
, SomeRowGroupsFactory(row_groups
), &flat_column_reader
));
281 return ReadColumn(i
, row_groups
, flat_column_reader
.get(), out
);
284 Status
ReadColumn(int i
, std::shared_ptr
<ChunkedArray
>* out
) override
{
285 return ReadColumn(i
, Iota(reader_
->metadata()->num_row_groups()), out
);
288 Status
ReadTable(std::shared_ptr
<Table
>* table
) override
{
289 return ReadTable(Iota(reader_
->metadata()->num_columns()), table
);
292 Status
ReadRowGroups(const std::vector
<int>& row_groups
,
293 const std::vector
<int>& indices
,
294 std::shared_ptr
<Table
>* table
) override
;
296 // Helper method used by ReadRowGroups - read the given row groups/columns, skipping
297 // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
298 // alive in async contexts.
299 Future
<std::shared_ptr
<Table
>> DecodeRowGroups(
300 std::shared_ptr
<FileReaderImpl
> self
, const std::vector
<int>& row_groups
,
301 const std::vector
<int>& column_indices
, ::arrow::internal::Executor
* cpu_executor
);
303 Status
ReadRowGroups(const std::vector
<int>& row_groups
,
304 std::shared_ptr
<Table
>* table
) override
{
305 return ReadRowGroups(row_groups
, Iota(reader_
->metadata()->num_columns()), table
);
308 Status
ReadRowGroup(int row_group_index
, const std::vector
<int>& column_indices
,
309 std::shared_ptr
<Table
>* out
) override
{
310 return ReadRowGroups({row_group_index
}, column_indices
, out
);
313 Status
ReadRowGroup(int i
, std::shared_ptr
<Table
>* table
) override
{
314 return ReadRowGroup(i
, Iota(reader_
->metadata()->num_columns()), table
);
317 Status
GetRecordBatchReader(const std::vector
<int>& row_group_indices
,
318 const std::vector
<int>& column_indices
,
319 std::unique_ptr
<RecordBatchReader
>* out
) override
;
321 Status
GetRecordBatchReader(const std::vector
<int>& row_group_indices
,
322 std::unique_ptr
<RecordBatchReader
>* out
) override
{
323 return GetRecordBatchReader(row_group_indices
,
324 Iota(reader_
->metadata()->num_columns()), out
);
327 ::arrow::Result
<::arrow::AsyncGenerator
<std::shared_ptr
<::arrow::RecordBatch
>>>
328 GetRecordBatchGenerator(std::shared_ptr
<FileReader
> reader
,
329 const std::vector
<int> row_group_indices
,
330 const std::vector
<int> column_indices
,
331 ::arrow::internal::Executor
* cpu_executor
,
332 int row_group_readahead
) override
;
334 int num_columns() const { return reader_
->metadata()->num_columns(); }
336 ParquetFileReader
* parquet_reader() const override
{ return reader_
.get(); }
338 int num_row_groups() const override
{ return reader_
->metadata()->num_row_groups(); }
340 void set_use_threads(bool use_threads
) override
{
341 reader_properties_
.set_use_threads(use_threads
);
344 void set_batch_size(int64_t batch_size
) override
{
345 reader_properties_
.set_batch_size(batch_size
);
348 const ArrowReaderProperties
& properties() const override
{ return reader_properties_
; }
350 const SchemaManifest
& manifest() const override
{ return manifest_
; }
352 Status
ScanContents(std::vector
<int> columns
, const int32_t column_batch_size
,
353 int64_t* num_rows
) override
{
354 BEGIN_PARQUET_CATCH_EXCEPTIONS
355 *num_rows
= ScanFileContents(columns
, column_batch_size
, reader_
.get());
357 END_PARQUET_CATCH_EXCEPTIONS
361 std::unique_ptr
<ParquetFileReader
> reader_
;
362 ArrowReaderProperties reader_properties_
;
364 SchemaManifest manifest_
;
367 class RowGroupRecordBatchReader
: public ::arrow::RecordBatchReader
{
369 RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches
,
370 std::shared_ptr
<::arrow::Schema
> schema
)
371 : batches_(std::move(batches
)), schema_(std::move(schema
)) {}
373 ~RowGroupRecordBatchReader() override
{}
375 Status
ReadNext(std::shared_ptr
<::arrow::RecordBatch
>* out
) override
{
376 return batches_
.Next().Value(out
);
379 std::shared_ptr
<::arrow::Schema
> schema() const override
{ return schema_
; }
382 ::arrow::Iterator
<std::shared_ptr
<::arrow::RecordBatch
>> batches_
;
383 std::shared_ptr
<::arrow::Schema
> schema_
;
386 class ColumnChunkReaderImpl
: public ColumnChunkReader
{
388 ColumnChunkReaderImpl(FileReaderImpl
* impl
, int row_group_index
, int column_index
)
389 : impl_(impl
), column_index_(column_index
), row_group_index_(row_group_index
) {}
391 Status
Read(std::shared_ptr
<::arrow::ChunkedArray
>* out
) override
{
392 return impl_
->ReadColumn(column_index_
, {row_group_index_
}, out
);
396 FileReaderImpl
* impl_
;
398 int row_group_index_
;
401 class RowGroupReaderImpl
: public RowGroupReader
{
403 RowGroupReaderImpl(FileReaderImpl
* impl
, int row_group_index
)
404 : impl_(impl
), row_group_index_(row_group_index
) {}
406 std::shared_ptr
<ColumnChunkReader
> Column(int column_index
) override
{
407 return std::shared_ptr
<ColumnChunkReader
>(
408 new ColumnChunkReaderImpl(impl_
, row_group_index_
, column_index
));
411 Status
ReadTable(const std::vector
<int>& column_indices
,
412 std::shared_ptr
<::arrow::Table
>* out
) override
{
413 return impl_
->ReadRowGroup(row_group_index_
, column_indices
, out
);
416 Status
ReadTable(std::shared_ptr
<::arrow::Table
>* out
) override
{
417 return impl_
->ReadRowGroup(row_group_index_
, out
);
421 FileReaderImpl
* impl_
;
422 int row_group_index_
;
425 // ----------------------------------------------------------------------
426 // Column reader implementations
428 // Leaf reader is for primitive arrays and primitive children of nested arrays
429 class LeafReader
: public ColumnReaderImpl
{
431 LeafReader(std::shared_ptr
<ReaderContext
> ctx
, std::shared_ptr
<Field
> field
,
432 std::unique_ptr
<FileColumnIterator
> input
,
433 ::parquet::internal::LevelInfo leaf_info
)
434 : ctx_(std::move(ctx
)),
435 field_(std::move(field
)),
436 input_(std::move(input
)),
437 descr_(input_
->descr()) {
438 record_reader_
= RecordReader::Make(
439 descr_
, leaf_info
, ctx_
->pool
, field_
->type()->id() == ::arrow::Type::DICTIONARY
);
443 Status
GetDefLevels(const int16_t** data
, int64_t* length
) final
{
444 *data
= record_reader_
->def_levels();
445 *length
= record_reader_
->levels_position();
449 Status
GetRepLevels(const int16_t** data
, int64_t* length
) final
{
450 *data
= record_reader_
->rep_levels();
451 *length
= record_reader_
->levels_position();
455 bool IsOrHasRepeatedChild() const final
{ return false; }
457 Status
LoadBatch(int64_t records_to_read
) final
{
458 BEGIN_PARQUET_CATCH_EXCEPTIONS
460 record_reader_
->Reset();
461 // Pre-allocation gives much better performance for flat columns
462 record_reader_
->Reserve(records_to_read
);
463 while (records_to_read
> 0) {
464 if (!record_reader_
->HasMoreData()) {
467 int64_t records_read
= record_reader_
->ReadRecords(records_to_read
);
468 records_to_read
-= records_read
;
469 if (records_read
== 0) {
473 RETURN_NOT_OK(TransferColumnData(record_reader_
.get(), field_
->type(), descr_
,
476 END_PARQUET_CATCH_EXCEPTIONS
479 ::arrow::Status
BuildArray(int64_t length_upper_bound
,
480 std::shared_ptr
<::arrow::ChunkedArray
>* out
) final
{
485 const std::shared_ptr
<Field
> field() override
{ return field_
; }
488 std::shared_ptr
<ChunkedArray
> out_
;
489 void NextRowGroup() {
490 std::unique_ptr
<PageReader
> page_reader
= input_
->NextChunk();
491 record_reader_
->SetPageReader(std::move(page_reader
));
494 std::shared_ptr
<ReaderContext
> ctx_
;
495 std::shared_ptr
<Field
> field_
;
496 std::unique_ptr
<FileColumnIterator
> input_
;
497 const ColumnDescriptor
* descr_
;
498 std::shared_ptr
<RecordReader
> record_reader_
;
501 // Column reader for extension arrays
502 class ExtensionReader
: public ColumnReaderImpl
{
504 ExtensionReader(std::shared_ptr
<Field
> field
,
505 std::unique_ptr
<ColumnReaderImpl
> storage_reader
)
506 : field_(std::move(field
)), storage_reader_(std::move(storage_reader
)) {}
508 Status
GetDefLevels(const int16_t** data
, int64_t* length
) override
{
509 return storage_reader_
->GetDefLevels(data
, length
);
512 Status
GetRepLevels(const int16_t** data
, int64_t* length
) override
{
513 return storage_reader_
->GetRepLevels(data
, length
);
516 Status
LoadBatch(int64_t number_of_records
) final
{
517 return storage_reader_
->LoadBatch(number_of_records
);
520 Status
BuildArray(int64_t length_upper_bound
,
521 std::shared_ptr
<ChunkedArray
>* out
) override
{
522 std::shared_ptr
<ChunkedArray
> storage
;
523 RETURN_NOT_OK(storage_reader_
->BuildArray(length_upper_bound
, &storage
));
524 *out
= ExtensionType::WrapArray(field_
->type(), storage
);
528 bool IsOrHasRepeatedChild() const final
{
529 return storage_reader_
->IsOrHasRepeatedChild();
532 const std::shared_ptr
<Field
> field() override
{ return field_
; }
535 std::shared_ptr
<Field
> field_
;
536 std::unique_ptr
<ColumnReaderImpl
> storage_reader_
;
539 template <typename IndexType
>
540 class ListReader
: public ColumnReaderImpl
{
542 ListReader(std::shared_ptr
<ReaderContext
> ctx
, std::shared_ptr
<Field
> field
,
543 ::parquet::internal::LevelInfo level_info
,
544 std::unique_ptr
<ColumnReaderImpl
> child_reader
)
545 : ctx_(std::move(ctx
)),
546 field_(std::move(field
)),
547 level_info_(level_info
),
548 item_reader_(std::move(child_reader
)) {}
550 Status
GetDefLevels(const int16_t** data
, int64_t* length
) override
{
551 return item_reader_
->GetDefLevels(data
, length
);
554 Status
GetRepLevels(const int16_t** data
, int64_t* length
) override
{
555 return item_reader_
->GetRepLevels(data
, length
);
558 bool IsOrHasRepeatedChild() const final
{ return true; }
560 Status
LoadBatch(int64_t number_of_records
) final
{
561 return item_reader_
->LoadBatch(number_of_records
);
564 virtual ::arrow::Result
<std::shared_ptr
<ChunkedArray
>> AssembleArray(
565 std::shared_ptr
<ArrayData
> data
) {
566 if (field_
->type()->id() == ::arrow::Type::MAP
) {
567 // Error out if data is not map-compliant instead of aborting in MakeArray below
568 RETURN_NOT_OK(::arrow::MapArray::ValidateChildData(data
->child_data
));
570 std::shared_ptr
<Array
> result
= ::arrow::MakeArray(data
);
571 return std::make_shared
<ChunkedArray
>(result
);
574 Status
BuildArray(int64_t length_upper_bound
,
575 std::shared_ptr
<ChunkedArray
>* out
) override
{
576 const int16_t* def_levels
;
577 const int16_t* rep_levels
;
579 RETURN_NOT_OK(item_reader_
->GetDefLevels(&def_levels
, &num_levels
));
580 RETURN_NOT_OK(item_reader_
->GetRepLevels(&rep_levels
, &num_levels
));
582 std::shared_ptr
<ResizableBuffer
> validity_buffer
;
583 ::parquet::internal::ValidityBitmapInputOutput validity_io
;
584 validity_io
.values_read_upper_bound
= length_upper_bound
;
585 if (field_
->nullable()) {
586 ARROW_ASSIGN_OR_RAISE(
588 AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound
), ctx_
->pool
));
589 validity_io
.valid_bits
= validity_buffer
->mutable_data();
591 ARROW_ASSIGN_OR_RAISE(
592 std::shared_ptr
<ResizableBuffer
> offsets_buffer
,
593 AllocateResizableBuffer(
594 sizeof(IndexType
) * std::max(int64_t{1}, length_upper_bound
+ 1),
596 // Ensure zero initialization in case we have reached a zero length list (and
597 // because first entry is always zero).
598 IndexType
* offset_data
= reinterpret_cast<IndexType
*>(offsets_buffer
->mutable_data());
600 BEGIN_PARQUET_CATCH_EXCEPTIONS
601 ::parquet::internal::DefRepLevelsToList(def_levels
, rep_levels
, num_levels
,
602 level_info_
, &validity_io
, offset_data
);
603 END_PARQUET_CATCH_EXCEPTIONS
605 RETURN_NOT_OK(item_reader_
->BuildArray(offset_data
[validity_io
.values_read
], out
));
607 // Resize to actual number of elements returned.
609 offsets_buffer
->Resize((validity_io
.values_read
+ 1) * sizeof(IndexType
)));
610 if (validity_buffer
!= nullptr) {
612 validity_buffer
->Resize(BitUtil::BytesForBits(validity_io
.values_read
)));
613 validity_buffer
->ZeroPadding();
615 ARROW_ASSIGN_OR_RAISE(std::shared_ptr
<ArrayData
> item_chunk
, ChunksToSingle(**out
));
617 std::vector
<std::shared_ptr
<Buffer
>> buffers
{
618 validity_io
.null_count
> 0 ? validity_buffer
: nullptr, offsets_buffer
};
619 auto data
= std::make_shared
<ArrayData
>(
621 /*length=*/validity_io
.values_read
, std::move(buffers
),
622 std::vector
<std::shared_ptr
<ArrayData
>>{item_chunk
}, validity_io
.null_count
);
624 ARROW_ASSIGN_OR_RAISE(*out
, AssembleArray(std::move(data
)));
628 const std::shared_ptr
<Field
> field() override
{ return field_
; }
631 std::shared_ptr
<ReaderContext
> ctx_
;
632 std::shared_ptr
<Field
> field_
;
633 ::parquet::internal::LevelInfo level_info_
;
634 std::unique_ptr
<ColumnReaderImpl
> item_reader_
;
637 class PARQUET_NO_EXPORT FixedSizeListReader
: public ListReader
<int32_t> {
639 FixedSizeListReader(std::shared_ptr
<ReaderContext
> ctx
, std::shared_ptr
<Field
> field
,
640 ::parquet::internal::LevelInfo level_info
,
641 std::unique_ptr
<ColumnReaderImpl
> child_reader
)
642 : ListReader(std::move(ctx
), std::move(field
), level_info
,
643 std::move(child_reader
)) {}
644 ::arrow::Result
<std::shared_ptr
<ChunkedArray
>> AssembleArray(
645 std::shared_ptr
<ArrayData
> data
) final
{
646 DCHECK_EQ(data
->buffers
.size(), 2);
647 DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST
);
648 const auto& type
= checked_cast
<::arrow::FixedSizeListType
&>(*field()->type());
649 const int32_t* offsets
= reinterpret_cast<const int32_t*>(data
->buffers
[1]->data());
650 for (int x
= 1; x
<= data
->length
; x
++) {
651 int32_t size
= offsets
[x
] - offsets
[x
- 1];
652 if (size
!= type
.list_size()) {
653 return Status::Invalid("Expected all lists to be of size=", type
.list_size(),
654 " but index ", x
, " had size=", size
);
657 data
->buffers
.resize(1);
658 std::shared_ptr
<Array
> result
= ::arrow::MakeArray(data
);
659 return std::make_shared
<ChunkedArray
>(result
);
663 class PARQUET_NO_EXPORT StructReader
: public ColumnReaderImpl
{
665 explicit StructReader(std::shared_ptr
<ReaderContext
> ctx
,
666 std::shared_ptr
<Field
> filtered_field
,
667 ::parquet::internal::LevelInfo level_info
,
668 std::vector
<std::unique_ptr
<ColumnReaderImpl
>> children
)
669 : ctx_(std::move(ctx
)),
670 filtered_field_(std::move(filtered_field
)),
671 level_info_(level_info
),
672 children_(std::move(children
)) {
673 // There could be a mix of children some might be repeated some might not be.
674 // If possible use one that isn't since that will be guaranteed to have the least
675 // number of levels to reconstruct a nullable bitmap.
676 auto result
= std::find_if(children_
.begin(), children_
.end(),
677 [](const std::unique_ptr
<ColumnReaderImpl
>& child
) {
678 return !child
->IsOrHasRepeatedChild();
680 if (result
!= children_
.end()) {
681 def_rep_level_child_
= result
->get();
682 has_repeated_child_
= false;
683 } else if (!children_
.empty()) {
684 def_rep_level_child_
= children_
.front().get();
685 has_repeated_child_
= true;
689 bool IsOrHasRepeatedChild() const final
{ return has_repeated_child_
; }
691 Status
LoadBatch(int64_t records_to_read
) override
{
692 for (const std::unique_ptr
<ColumnReaderImpl
>& reader
: children_
) {
693 RETURN_NOT_OK(reader
->LoadBatch(records_to_read
));
697 Status
BuildArray(int64_t length_upper_bound
,
698 std::shared_ptr
<ChunkedArray
>* out
) override
;
699 Status
GetDefLevels(const int16_t** data
, int64_t* length
) override
;
700 Status
GetRepLevels(const int16_t** data
, int64_t* length
) override
;
701 const std::shared_ptr
<Field
> field() override
{ return filtered_field_
; }
704 const std::shared_ptr
<ReaderContext
> ctx_
;
705 const std::shared_ptr
<Field
> filtered_field_
;
706 const ::parquet::internal::LevelInfo level_info_
;
707 const std::vector
<std::unique_ptr
<ColumnReaderImpl
>> children_
;
708 ColumnReaderImpl
* def_rep_level_child_
= nullptr;
709 bool has_repeated_child_
;
712 Status
StructReader::GetDefLevels(const int16_t** data
, int64_t* length
) {
714 if (children_
.size() == 0) {
716 return Status::Invalid("StructReader had no children");
719 // This method should only be called when this struct or one of its parents
720 // are optional/repeated or it has a repeated child.
721 // Meaning all children must have rep/def levels associated
723 RETURN_NOT_OK(def_rep_level_child_
->GetDefLevels(data
, length
));
727 Status
StructReader::GetRepLevels(const int16_t** data
, int64_t* length
) {
729 if (children_
.size() == 0) {
731 return Status::Invalid("StructReader had no childre");
734 // This method should only be called when this struct or one of its parents
735 // are optional/repeated or it has repeated child.
736 // Meaning all children must have rep/def levels associated
738 RETURN_NOT_OK(def_rep_level_child_
->GetRepLevels(data
, length
));
742 Status
StructReader::BuildArray(int64_t length_upper_bound
,
743 std::shared_ptr
<ChunkedArray
>* out
) {
744 std::vector
<std::shared_ptr
<ArrayData
>> children_array_data
;
745 std::shared_ptr
<ResizableBuffer
> null_bitmap
;
747 ::parquet::internal::ValidityBitmapInputOutput validity_io
;
748 validity_io
.values_read_upper_bound
= length_upper_bound
;
749 // This simplifies accounting below.
750 validity_io
.values_read
= length_upper_bound
;
752 BEGIN_PARQUET_CATCH_EXCEPTIONS
753 const int16_t* def_levels
;
754 const int16_t* rep_levels
;
757 if (has_repeated_child_
) {
758 ARROW_ASSIGN_OR_RAISE(
760 AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound
), ctx_
->pool
));
761 validity_io
.valid_bits
= null_bitmap
->mutable_data();
762 RETURN_NOT_OK(GetDefLevels(&def_levels
, &num_levels
));
763 RETURN_NOT_OK(GetRepLevels(&rep_levels
, &num_levels
));
764 DefRepLevelsToBitmap(def_levels
, rep_levels
, num_levels
, level_info_
, &validity_io
);
765 } else if (filtered_field_
->nullable()) {
766 ARROW_ASSIGN_OR_RAISE(
768 AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound
), ctx_
->pool
));
769 validity_io
.valid_bits
= null_bitmap
->mutable_data();
770 RETURN_NOT_OK(GetDefLevels(&def_levels
, &num_levels
));
771 DefLevelsToBitmap(def_levels
, num_levels
, level_info_
, &validity_io
);
774 // Ensure all values are initialized.
776 RETURN_NOT_OK(null_bitmap
->Resize(BitUtil::BytesForBits(validity_io
.values_read
)));
777 null_bitmap
->ZeroPadding();
780 END_PARQUET_CATCH_EXCEPTIONS
781 // Gather children arrays and def levels
782 for (auto& child
: children_
) {
783 std::shared_ptr
<ChunkedArray
> field
;
784 RETURN_NOT_OK(child
->BuildArray(validity_io
.values_read
, &field
));
785 ARROW_ASSIGN_OR_RAISE(std::shared_ptr
<ArrayData
> array_data
, ChunksToSingle(*field
));
786 children_array_data
.push_back(std::move(array_data
));
789 if (!filtered_field_
->nullable() && !has_repeated_child_
) {
790 validity_io
.values_read
= children_array_data
.front()->length
;
793 std::vector
<std::shared_ptr
<Buffer
>> buffers
{validity_io
.null_count
> 0 ? null_bitmap
796 std::make_shared
<ArrayData
>(filtered_field_
->type(),
797 /*length=*/validity_io
.values_read
, std::move(buffers
),
798 std::move(children_array_data
));
799 std::shared_ptr
<Array
> result
= ::arrow::MakeArray(data
);
801 *out
= std::make_shared
<ChunkedArray
>(result
);
805 // ----------------------------------------------------------------------
806 // File reader implementation
808 Status
GetReader(const SchemaField
& field
, const std::shared_ptr
<Field
>& arrow_field
,
809 const std::shared_ptr
<ReaderContext
>& ctx
,
810 std::unique_ptr
<ColumnReaderImpl
>* out
) {
811 BEGIN_PARQUET_CATCH_EXCEPTIONS
813 auto type_id
= arrow_field
->type()->id();
815 if (type_id
== ::arrow::Type::EXTENSION
) {
816 auto storage_field
= arrow_field
->WithType(
817 checked_cast
<const ExtensionType
&>(*arrow_field
->type()).storage_type());
818 RETURN_NOT_OK(GetReader(field
, storage_field
, ctx
, out
));
819 out
->reset(new ExtensionReader(arrow_field
, std::move(*out
)));
823 if (field
.children
.size() == 0) {
824 if (!field
.is_leaf()) {
825 return Status::Invalid("Parquet non-leaf node has no children");
827 if (!ctx
->IncludesLeaf(field
.column_index
)) {
831 std::unique_ptr
<FileColumnIterator
> input(
832 ctx
->iterator_factory(field
.column_index
, ctx
->reader
));
833 out
->reset(new LeafReader(ctx
, arrow_field
, std::move(input
), field
.level_info
));
834 } else if (type_id
== ::arrow::Type::LIST
|| type_id
== ::arrow::Type::MAP
||
835 type_id
== ::arrow::Type::FIXED_SIZE_LIST
||
836 type_id
== ::arrow::Type::LARGE_LIST
) {
837 auto list_field
= arrow_field
;
838 auto child
= &field
.children
[0];
839 std::unique_ptr
<ColumnReaderImpl
> child_reader
;
840 RETURN_NOT_OK(GetReader(*child
, ctx
, &child_reader
));
841 if (child_reader
== nullptr) {
846 // These two types might not be equal if there column pruning occurred.
847 // further down the stack.
848 const std::shared_ptr
<DataType
> reader_child_type
= child_reader
->field()->type();
849 // This should really never happen but was raised as a question on the code
850 // review, this should be pretty cheap check so leave it in.
851 if (ARROW_PREDICT_FALSE(list_field
->type()->num_fields() != 1)) {
852 return Status::Invalid("expected exactly one child field for: ",
853 list_field
->ToString());
855 const DataType
& schema_child_type
= *(list_field
->type()->field(0)->type());
856 if (type_id
== ::arrow::Type::MAP
) {
857 if (reader_child_type
->num_fields() != 2 ||
858 !reader_child_type
->field(0)->type()->Equals(
859 *schema_child_type
.field(0)->type())) {
860 // This case applies if either key or value are completed filtered
861 // out so we can take the type as is or the key was partially
862 // so keeping it as a map no longer makes sence.
863 list_field
= list_field
->WithType(::arrow::list(child_reader
->field()));
864 } else if (!reader_child_type
->field(1)->type()->Equals(
865 *schema_child_type
.field(1)->type())) {
866 list_field
= list_field
->WithType(std::make_shared
<::arrow::MapType
>(
867 reader_child_type
->field(
868 0), // field 0 is unchanged baed on previous if statement
869 reader_child_type
->field(1)));
871 // Map types are list<struct<key, value>> so use ListReader
872 // for reconstruction.
873 out
->reset(new ListReader
<int32_t>(ctx
, list_field
, field
.level_info
,
874 std::move(child_reader
)));
875 } else if (type_id
== ::arrow::Type::LIST
) {
876 if (!reader_child_type
->Equals(schema_child_type
)) {
877 list_field
= list_field
->WithType(::arrow::list(reader_child_type
));
880 out
->reset(new ListReader
<int32_t>(ctx
, list_field
, field
.level_info
,
881 std::move(child_reader
)));
882 } else if (type_id
== ::arrow::Type::LARGE_LIST
) {
883 if (!reader_child_type
->Equals(schema_child_type
)) {
884 list_field
= list_field
->WithType(::arrow::large_list(reader_child_type
));
887 out
->reset(new ListReader
<int64_t>(ctx
, list_field
, field
.level_info
,
888 std::move(child_reader
)));
889 } else if (type_id
== ::arrow::Type::FIXED_SIZE_LIST
) {
890 if (!reader_child_type
->Equals(schema_child_type
)) {
891 auto& fixed_list_type
=
892 checked_cast
<const ::arrow::FixedSizeListType
&>(*list_field
->type());
893 int32_t list_size
= fixed_list_type
.list_size();
895 list_field
->WithType(::arrow::fixed_size_list(reader_child_type
, list_size
));
898 out
->reset(new FixedSizeListReader(ctx
, list_field
, field
.level_info
,
899 std::move(child_reader
)));
901 return Status::UnknownError("Unknown list type: ", field
.field
->ToString());
903 } else if (type_id
== ::arrow::Type::STRUCT
) {
904 std::vector
<std::shared_ptr
<Field
>> child_fields
;
905 int arrow_field_idx
= 0;
906 std::vector
<std::unique_ptr
<ColumnReaderImpl
>> child_readers
;
907 for (const auto& child
: field
.children
) {
908 std::unique_ptr
<ColumnReaderImpl
> child_reader
;
909 RETURN_NOT_OK(GetReader(child
, ctx
, &child_reader
));
912 // If all children were pruned, then we do not try to read this field
915 std::shared_ptr
<::arrow::Field
> child_field
= child
.field
;
916 const DataType
& reader_child_type
= *child_reader
->field()->type();
917 const DataType
& schema_child_type
=
918 *arrow_field
->type()->field(arrow_field_idx
++)->type();
919 // These might not be equal if column pruning occurred.
920 if (!schema_child_type
.Equals(reader_child_type
)) {
921 child_field
= child_field
->WithType(child_reader
->field()->type());
923 child_fields
.push_back(child_field
);
924 child_readers
.emplace_back(std::move(child_reader
));
926 if (child_fields
.size() == 0) {
930 auto filtered_field
=
931 ::arrow::field(arrow_field
->name(), ::arrow::struct_(child_fields
),
932 arrow_field
->nullable(), arrow_field
->metadata());
933 out
->reset(new StructReader(ctx
, filtered_field
, field
.level_info
,
934 std::move(child_readers
)));
936 return Status::Invalid("Unsupported nested type: ", arrow_field
->ToString());
940 END_PARQUET_CATCH_EXCEPTIONS
943 Status
GetReader(const SchemaField
& field
, const std::shared_ptr
<ReaderContext
>& ctx
,
944 std::unique_ptr
<ColumnReaderImpl
>* out
) {
945 return GetReader(field
, field
.field
, ctx
, out
);
950 Status
FileReaderImpl::GetRecordBatchReader(const std::vector
<int>& row_groups
,
951 const std::vector
<int>& column_indices
,
952 std::unique_ptr
<RecordBatchReader
>* out
) {
953 RETURN_NOT_OK(BoundsCheck(row_groups
, column_indices
));
955 if (reader_properties_
.pre_buffer()) {
956 // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
957 BEGIN_PARQUET_CATCH_EXCEPTIONS
958 reader_
->PreBuffer(row_groups
, column_indices
, reader_properties_
.io_context(),
959 reader_properties_
.cache_options());
960 END_PARQUET_CATCH_EXCEPTIONS
963 std::vector
<std::shared_ptr
<ColumnReaderImpl
>> readers
;
964 std::shared_ptr
<::arrow::Schema
> batch_schema
;
965 RETURN_NOT_OK(GetFieldReaders(column_indices
, row_groups
, &readers
, &batch_schema
));
967 if (readers
.empty()) {
968 // Just generate all batches right now; they're cheap since they have no columns.
969 int64_t batch_size
= properties().batch_size();
970 auto max_sized_batch
=
971 ::arrow::RecordBatch::Make(batch_schema
, batch_size
, ::arrow::ArrayVector
{});
973 ::arrow::RecordBatchVector batches
;
975 for (int row_group
: row_groups
) {
976 int64_t num_rows
= parquet_reader()->metadata()->RowGroup(row_group
)->num_rows();
978 batches
.insert(batches
.end(), num_rows
/ batch_size
, max_sized_batch
);
980 if (int64_t trailing_rows
= num_rows
% batch_size
) {
981 batches
.push_back(max_sized_batch
->Slice(0, trailing_rows
));
985 *out
= ::arrow::internal::make_unique
<RowGroupRecordBatchReader
>(
986 ::arrow::MakeVectorIterator(std::move(batches
)), std::move(batch_schema
));
991 int64_t num_rows
= 0;
992 for (int row_group
: row_groups
) {
993 num_rows
+= parquet_reader()->metadata()->RowGroup(row_group
)->num_rows();
996 using ::arrow::RecordBatchIterator
;
998 // NB: This lambda will be invoked outside the scope of this call to
999 // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
1000 // `this` is a non-owning pointer so we are relying on the parent FileReader outliving
1001 // this RecordBatchReader.
1002 ::arrow::Iterator
<RecordBatchIterator
> batches
= ::arrow::MakeFunctionIterator(
1003 [readers
, batch_schema
, num_rows
,
1004 this]() mutable -> ::arrow::Result
<RecordBatchIterator
> {
1005 ::arrow::ChunkedArrayVector
columns(readers
.size());
1007 // don't reserve more rows than necessary
1008 int64_t batch_size
= std::min(properties().batch_size(), num_rows
);
1009 num_rows
-= batch_size
;
1011 RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
1012 reader_properties_
.use_threads(), static_cast<int>(readers
.size()),
1013 [&](int i
) { return readers
[i
]->NextBatch(batch_size
, &columns
[i
]); }));
1015 for (const auto& column
: columns
) {
1016 if (column
== nullptr || column
->length() == 0) {
1017 return ::arrow::IterationTraits
<RecordBatchIterator
>::End();
1021 auto table
= ::arrow::Table::Make(batch_schema
, std::move(columns
));
1022 auto table_reader
= std::make_shared
<::arrow::TableBatchReader
>(*table
);
1024 // NB: explicitly preserve table so that table_reader doesn't outlive it
1025 return ::arrow::MakeFunctionIterator(
1026 [table
, table_reader
] { return table_reader
->Next(); });
1029 *out
= ::arrow::internal::make_unique
<RowGroupRecordBatchReader
>(
1030 ::arrow::MakeFlattenIterator(std::move(batches
)), std::move(batch_schema
));
1032 return Status::OK();
1035 /// Given a file reader and a list of row groups, this is a generator of record
1036 /// batch generators (where each sub-generator is the contents of a single row group).
1037 class RowGroupGenerator
{
1039 using RecordBatchGenerator
=
1040 ::arrow::AsyncGenerator
<std::shared_ptr
<::arrow::RecordBatch
>>;
1042 explicit RowGroupGenerator(std::shared_ptr
<FileReaderImpl
> arrow_reader
,
1043 ::arrow::internal::Executor
* cpu_executor
,
1044 std::vector
<int> row_groups
, std::vector
<int> column_indices
)
1045 : arrow_reader_(std::move(arrow_reader
)),
1046 cpu_executor_(cpu_executor
),
1047 row_groups_(std::move(row_groups
)),
1048 column_indices_(std::move(column_indices
)),
1051 ::arrow::Future
<RecordBatchGenerator
> operator()() {
1052 if (index_
>= row_groups_
.size()) {
1053 return ::arrow::AsyncGeneratorEnd
<RecordBatchGenerator
>();
1055 int row_group
= row_groups_
[index_
++];
1056 std::vector
<int> column_indices
= column_indices_
;
1057 auto reader
= arrow_reader_
;
1058 if (!reader
->properties().pre_buffer()) {
1059 return SubmitRead(cpu_executor_
, reader
, row_group
, column_indices
);
1061 auto ready
= reader
->parquet_reader()->WhenBuffered({row_group
}, column_indices
);
1062 if (cpu_executor_
) ready
= cpu_executor_
->TransferAlways(ready
);
1063 return ready
.Then([=]() -> ::arrow::Future
<RecordBatchGenerator
> {
1064 return ReadOneRowGroup(cpu_executor_
, reader
, row_group
, column_indices
);
1069 // Synchronous fallback for when pre-buffer isn't enabled.
1071 // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
1072 // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
1073 // async I/O without forcing readahead.
1074 static ::arrow::Future
<RecordBatchGenerator
> SubmitRead(
1075 ::arrow::internal::Executor
* cpu_executor
, std::shared_ptr
<FileReaderImpl
> self
,
1076 const int row_group
, const std::vector
<int>& column_indices
) {
1077 if (!cpu_executor
) {
1078 return ReadOneRowGroup(cpu_executor
, self
, row_group
, column_indices
);
1080 // If we have an executor, then force transfer (even if I/O was complete)
1081 return ::arrow::DeferNotOk(cpu_executor
->Submit(ReadOneRowGroup
, cpu_executor
, self
,
1082 row_group
, column_indices
));
1085 static ::arrow::Future
<RecordBatchGenerator
> ReadOneRowGroup(
1086 ::arrow::internal::Executor
* cpu_executor
, std::shared_ptr
<FileReaderImpl
> self
,
1087 const int row_group
, const std::vector
<int>& column_indices
) {
1088 // Skips bound checks/pre-buffering, since we've done that already
1089 const int64_t batch_size
= self
->properties().batch_size();
1090 return self
->DecodeRowGroups(self
, {row_group
}, column_indices
, cpu_executor
)
1091 .Then([batch_size
](const std::shared_ptr
<Table
>& table
)
1092 -> ::arrow::Result
<RecordBatchGenerator
> {
1093 ::arrow::TableBatchReader
table_reader(*table
);
1094 table_reader
.set_chunksize(batch_size
);
1095 ::arrow::RecordBatchVector batches
;
1096 RETURN_NOT_OK(table_reader
.ReadAll(&batches
));
1097 return ::arrow::MakeVectorGenerator(std::move(batches
));
1101 std::shared_ptr
<FileReaderImpl
> arrow_reader_
;
1102 ::arrow::internal::Executor
* cpu_executor_
;
1103 std::vector
<int> row_groups_
;
1104 std::vector
<int> column_indices_
;
1108 ::arrow::Result
<::arrow::AsyncGenerator
<std::shared_ptr
<::arrow::RecordBatch
>>>
1109 FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr
<FileReader
> reader
,
1110 const std::vector
<int> row_group_indices
,
1111 const std::vector
<int> column_indices
,
1112 ::arrow::internal::Executor
* cpu_executor
,
1113 int row_group_readahead
) {
1114 RETURN_NOT_OK(BoundsCheck(row_group_indices
, column_indices
));
1115 if (reader_properties_
.pre_buffer()) {
1116 BEGIN_PARQUET_CATCH_EXCEPTIONS
1117 reader_
->PreBuffer(row_group_indices
, column_indices
, reader_properties_
.io_context(),
1118 reader_properties_
.cache_options());
1119 END_PARQUET_CATCH_EXCEPTIONS
1121 ::arrow::AsyncGenerator
<RowGroupGenerator::RecordBatchGenerator
> row_group_generator
=
1122 RowGroupGenerator(::arrow::internal::checked_pointer_cast
<FileReaderImpl
>(reader
),
1123 cpu_executor
, row_group_indices
, column_indices
);
1124 if (row_group_readahead
> 0) {
1125 row_group_generator
= ::arrow::MakeReadaheadGenerator(std::move(row_group_generator
),
1126 row_group_readahead
);
1128 return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator
));
1131 Status
FileReaderImpl::GetColumn(int i
, FileColumnIteratorFactory iterator_factory
,
1132 std::unique_ptr
<ColumnReader
>* out
) {
1133 RETURN_NOT_OK(BoundsCheckColumn(i
));
1134 auto ctx
= std::make_shared
<ReaderContext
>();
1135 ctx
->reader
= reader_
.get();
1137 ctx
->iterator_factory
= iterator_factory
;
1138 ctx
->filter_leaves
= false;
1139 std::unique_ptr
<ColumnReaderImpl
> result
;
1140 RETURN_NOT_OK(GetReader(manifest_
.schema_fields
[i
], ctx
, &result
));
1141 out
->reset(result
.release());
1142 return Status::OK();
1145 Status
FileReaderImpl::ReadRowGroups(const std::vector
<int>& row_groups
,
1146 const std::vector
<int>& column_indices
,
1147 std::shared_ptr
<Table
>* out
) {
1148 RETURN_NOT_OK(BoundsCheck(row_groups
, column_indices
));
1150 // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
1151 if (reader_properties_
.pre_buffer()) {
1152 BEGIN_PARQUET_CATCH_EXCEPTIONS
1153 parquet_reader()->PreBuffer(row_groups
, column_indices
,
1154 reader_properties_
.io_context(),
1155 reader_properties_
.cache_options());
1156 END_PARQUET_CATCH_EXCEPTIONS
1159 auto fut
= DecodeRowGroups(/*self=*/nullptr, row_groups
, column_indices
,
1160 /*cpu_executor=*/nullptr);
1161 ARROW_ASSIGN_OR_RAISE(*out
, fut
.MoveResult());
1162 return Status::OK();
1165 Future
<std::shared_ptr
<Table
>> FileReaderImpl::DecodeRowGroups(
1166 std::shared_ptr
<FileReaderImpl
> self
, const std::vector
<int>& row_groups
,
1167 const std::vector
<int>& column_indices
, ::arrow::internal::Executor
* cpu_executor
) {
1168 // `self` is used solely to keep `this` alive in an async context - but we use this
1169 // in a sync context too so use `this` over `self`
1170 std::vector
<std::shared_ptr
<ColumnReaderImpl
>> readers
;
1171 std::shared_ptr
<::arrow::Schema
> result_schema
;
1172 RETURN_NOT_OK(GetFieldReaders(column_indices
, row_groups
, &readers
, &result_schema
));
1173 // OptionalParallelForAsync requires an executor
1174 if (!cpu_executor
) cpu_executor
= ::arrow::internal::GetCpuThreadPool();
1176 auto read_column
= [row_groups
, self
, this](size_t i
,
1177 std::shared_ptr
<ColumnReaderImpl
> reader
)
1178 -> ::arrow::Result
<std::shared_ptr
<::arrow::ChunkedArray
>> {
1179 std::shared_ptr
<::arrow::ChunkedArray
> column
;
1180 RETURN_NOT_OK(ReadColumn(static_cast<int>(i
), row_groups
, reader
.get(), &column
));
1183 auto make_table
= [result_schema
, row_groups
, self
,
1184 this](const ::arrow::ChunkedArrayVector
& columns
)
1185 -> ::arrow::Result
<std::shared_ptr
<Table
>> {
1186 int64_t num_rows
= 0;
1187 if (!columns
.empty()) {
1188 num_rows
= columns
[0]->length();
1190 for (int i
: row_groups
) {
1191 num_rows
+= parquet_reader()->metadata()->RowGroup(i
)->num_rows();
1194 auto table
= Table::Make(std::move(result_schema
), columns
, num_rows
);
1195 RETURN_NOT_OK(table
->Validate());
1198 return ::arrow::internal::OptionalParallelForAsync(reader_properties_
.use_threads(),
1199 std::move(readers
), read_column
,
1201 .Then(std::move(make_table
));
1204 std::shared_ptr
<RowGroupReader
> FileReaderImpl::RowGroup(int row_group_index
) {
1205 return std::make_shared
<RowGroupReaderImpl
>(this, row_group_index
);
1208 // ----------------------------------------------------------------------
1209 // Public factory functions
1211 Status
FileReader::GetRecordBatchReader(const std::vector
<int>& row_group_indices
,
1212 std::shared_ptr
<RecordBatchReader
>* out
) {
1213 std::unique_ptr
<RecordBatchReader
> tmp
;
1214 ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices
, &tmp
));
1215 out
->reset(tmp
.release());
1216 return Status::OK();
1219 Status
FileReader::GetRecordBatchReader(const std::vector
<int>& row_group_indices
,
1220 const std::vector
<int>& column_indices
,
1221 std::shared_ptr
<RecordBatchReader
>* out
) {
1222 std::unique_ptr
<RecordBatchReader
> tmp
;
1223 ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices
, column_indices
, &tmp
));
1224 out
->reset(tmp
.release());
1225 return Status::OK();
1228 Status
FileReader::Make(::arrow::MemoryPool
* pool
,
1229 std::unique_ptr
<ParquetFileReader
> reader
,
1230 const ArrowReaderProperties
& properties
,
1231 std::unique_ptr
<FileReader
>* out
) {
1232 out
->reset(new FileReaderImpl(pool
, std::move(reader
), properties
));
1233 return static_cast<FileReaderImpl
*>(out
->get())->Init();
1236 Status
FileReader::Make(::arrow::MemoryPool
* pool
,
1237 std::unique_ptr
<ParquetFileReader
> reader
,
1238 std::unique_ptr
<FileReader
>* out
) {
1239 return Make(pool
, std::move(reader
), default_arrow_reader_properties(), out
);
1242 FileReaderBuilder::FileReaderBuilder()
1243 : pool_(::arrow::default_memory_pool()),
1244 properties_(default_arrow_reader_properties()) {}
1246 Status
FileReaderBuilder::Open(std::shared_ptr
<::arrow::io::RandomAccessFile
> file
,
1247 const ReaderProperties
& properties
,
1248 std::shared_ptr
<FileMetaData
> metadata
) {
1249 PARQUET_CATCH_NOT_OK(raw_reader_
= ParquetReader::Open(std::move(file
), properties
,
1250 std::move(metadata
)));
1251 return Status::OK();
1254 FileReaderBuilder
* FileReaderBuilder::memory_pool(::arrow::MemoryPool
* pool
) {
1259 FileReaderBuilder
* FileReaderBuilder::properties(
1260 const ArrowReaderProperties
& arg_properties
) {
1261 properties_
= arg_properties
;
1265 Status
FileReaderBuilder::Build(std::unique_ptr
<FileReader
>* out
) {
1266 return FileReader::Make(pool_
, std::move(raw_reader_
), properties_
, out
);
1269 Status
OpenFile(std::shared_ptr
<::arrow::io::RandomAccessFile
> file
, MemoryPool
* pool
,
1270 std::unique_ptr
<FileReader
>* reader
) {
1271 FileReaderBuilder builder
;
1272 RETURN_NOT_OK(builder
.Open(std::move(file
)));
1273 return builder
.memory_pool(pool
)->Build(reader
);
1276 namespace internal
{
1278 Status
FuzzReader(std::unique_ptr
<FileReader
> reader
) {
1279 auto st
= Status::OK();
1280 for (int i
= 0; i
< reader
->num_row_groups(); ++i
) {
1281 std::shared_ptr
<Table
> table
;
1282 auto row_group_status
= reader
->ReadRowGroup(i
, &table
);
1283 if (row_group_status
.ok()) {
1284 row_group_status
&= table
->ValidateFull();
1286 st
&= row_group_status
;
1291 Status
FuzzReader(const uint8_t* data
, int64_t size
) {
1292 auto buffer
= std::make_shared
<::arrow::Buffer
>(data
, size
);
1293 auto file
= std::make_shared
<::arrow::io::BufferReader
>(buffer
);
1294 FileReaderBuilder builder
;
1295 RETURN_NOT_OK(builder
.Open(std::move(file
)));
1297 std::unique_ptr
<FileReader
> reader
;
1298 RETURN_NOT_OK(builder
.Build(&reader
));
1299 return FuzzReader(std::move(reader
));
1302 } // namespace internal
1304 } // namespace arrow
1305 } // namespace parquet