]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/parquet/arrow/reader.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / arrow / reader.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "parquet/arrow/reader.h"
19
20 #include <algorithm>
21 #include <cstring>
22 #include <unordered_set>
23 #include <utility>
24 #include <vector>
25
26 #include "arrow/array.h"
27 #include "arrow/buffer.h"
28 #include "arrow/extension_type.h"
29 #include "arrow/io/memory.h"
30 #include "arrow/record_batch.h"
31 #include "arrow/table.h"
32 #include "arrow/type.h"
33 #include "arrow/util/async_generator.h"
34 #include "arrow/util/bit_util.h"
35 #include "arrow/util/future.h"
36 #include "arrow/util/iterator.h"
37 #include "arrow/util/logging.h"
38 #include "arrow/util/make_unique.h"
39 #include "arrow/util/parallel.h"
40 #include "arrow/util/range.h"
41 #include "parquet/arrow/reader_internal.h"
42 #include "parquet/column_reader.h"
43 #include "parquet/exception.h"
44 #include "parquet/file_reader.h"
45 #include "parquet/metadata.h"
46 #include "parquet/properties.h"
47 #include "parquet/schema.h"
48
49 using arrow::Array;
50 using arrow::ArrayData;
51 using arrow::BooleanArray;
52 using arrow::ChunkedArray;
53 using arrow::DataType;
54 using arrow::ExtensionType;
55 using arrow::Field;
56 using arrow::Future;
57 using arrow::Int32Array;
58 using arrow::ListArray;
59 using arrow::MemoryPool;
60 using arrow::RecordBatchReader;
61 using arrow::ResizableBuffer;
62 using arrow::Status;
63 using arrow::StructArray;
64 using arrow::Table;
65 using arrow::TimestampArray;
66
67 using arrow::internal::checked_cast;
68 using arrow::internal::Iota;
69
70 // Help reduce verbosity
71 using ParquetReader = parquet::ParquetFileReader;
72
73 using parquet::internal::RecordReader;
74
75 namespace BitUtil = arrow::BitUtil;
76
77 namespace parquet {
78 namespace arrow {
79 namespace {
80
81 ::arrow::Result<std::shared_ptr<ArrayData>> ChunksToSingle(const ChunkedArray& chunked) {
82 switch (chunked.num_chunks()) {
83 case 0: {
84 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> array,
85 ::arrow::MakeArrayOfNull(chunked.type(), 0));
86 return array->data();
87 }
88 case 1:
89 return chunked.chunk(0)->data();
90 default:
91 // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
92 // this is not yet implemented
93 return Status::NotImplemented(
94 "Nested data conversions not implemented for chunked array outputs");
95 }
96 }
97
98 } // namespace
99
100 class ColumnReaderImpl : public ColumnReader {
101 public:
102 virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
103 virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
104 virtual const std::shared_ptr<Field> field() = 0;
105
106 ::arrow::Status NextBatch(int64_t batch_size,
107 std::shared_ptr<::arrow::ChunkedArray>* out) final {
108 RETURN_NOT_OK(LoadBatch(batch_size));
109 RETURN_NOT_OK(BuildArray(batch_size, out));
110 for (int x = 0; x < (*out)->num_chunks(); x++) {
111 RETURN_NOT_OK((*out)->chunk(x)->Validate());
112 }
113 return Status::OK();
114 }
115
116 virtual ::arrow::Status LoadBatch(int64_t num_records) = 0;
117
118 virtual ::arrow::Status BuildArray(int64_t length_upper_bound,
119 std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
120 virtual bool IsOrHasRepeatedChild() const = 0;
121 };
122
123 namespace {
124
// Copy a vector of leaf/column indices into a shared hash set for O(1)
// membership tests (used to filter leaves in ReaderContext).
// Duplicates in `values` are collapsed by the set.
std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
    const std::vector<int>& values) {
  // make_shared: exception-safe and a single allocation, vs. raw `new`.
  auto result = std::make_shared<std::unordered_set<int>>();
  result->insert(values.begin(), values.end());
  return result;
}
131
132 // Forward declaration
133 Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& context,
134 std::unique_ptr<ColumnReaderImpl>* out);
135
136 // ----------------------------------------------------------------------
137 // FileReaderImpl forward declaration
138
// Concrete FileReader: adapts a ParquetFileReader into the Arrow read API
// (tables, columns, row groups, record batches). The SchemaManifest built
// in Init() maps Parquet leaf columns onto Arrow schema fields.
class FileReaderImpl : public FileReader {
 public:
  FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
                 ArrowReaderProperties properties)
      : pool_(pool),
        reader_(std::move(reader)),
        reader_properties_(std::move(properties)) {}

  // Build manifest_ from the file metadata. Must be called (and succeed)
  // before any read method is used.
  Status Init() {
    return SchemaManifest::Make(reader_->metadata()->schema(),
                                reader_->metadata()->key_value_metadata(),
                                reader_properties_, &manifest_);
  }

  // Factory producing column iterators restricted to the given row groups.
  // row_groups is captured by value so the factory may outlive the caller.
  FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
    return [row_groups](int i, ParquetFileReader* reader) {
      return new FileColumnIterator(i, reader, row_groups);
    };
  }

  // Factory iterating over every row group in the file.
  FileColumnIteratorFactory AllRowGroupsFactory() {
    return SomeRowGroupsFactory(Iota(reader_->metadata()->num_row_groups()));
  }

  // Validate a leaf column index against the file schema.
  Status BoundsCheckColumn(int column) {
    if (column < 0 || column >= this->num_columns()) {
      return Status::Invalid("Column index out of bounds (got ", column,
                             ", should be "
                             "between 0 and ",
                             this->num_columns() - 1, ")");
    }
    return Status::OK();
  }

  // Validate a row group index against the file metadata.
  Status BoundsCheckRowGroup(int row_group) {
    // row group indices check
    if (row_group < 0 || row_group >= num_row_groups()) {
      return Status::Invalid("Some index in row_group_indices is ", row_group,
                             ", which is either < 0 or >= num_row_groups(",
                             num_row_groups(), ")");
    }
    return Status::OK();
  }

  // Validate every requested row group and column index in one pass.
  Status BoundsCheck(const std::vector<int>& row_groups,
                     const std::vector<int>& column_indices) {
    for (int i : row_groups) {
      RETURN_NOT_OK(BoundsCheckRowGroup(i));
    }
    for (int i : column_indices) {
      RETURN_NOT_OK(BoundsCheckColumn(i));
    }
    return Status::OK();
  }

  std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;

  // Read the selected leaf columns across all row groups into a Table.
  Status ReadTable(const std::vector<int>& indices,
                   std::shared_ptr<Table>* out) override {
    return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
  }

  // Create a reader for schema field i, restricted to the given row groups
  // and to the leaf columns listed in included_leaves.
  Status GetFieldReader(int i,
                        const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                        const std::vector<int>& row_groups,
                        std::unique_ptr<ColumnReaderImpl>* out) {
    auto ctx = std::make_shared<ReaderContext>();
    ctx->reader = reader_.get();
    ctx->pool = pool_;
    ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
    ctx->filter_leaves = true;
    ctx->included_leaves = included_leaves;
    return GetReader(manifest_.schema_fields[i], ctx, out);
  }

  // Create readers for every schema field touched by column_indices, along
  // with the corresponding (possibly pruned) output schema.
  Status GetFieldReaders(const std::vector<int>& column_indices,
                         const std::vector<int>& row_groups,
                         std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
                         std::shared_ptr<::arrow::Schema>* out_schema) {
    // We only need to read schema fields which have columns indicated
    // in the indices vector
    ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
                          manifest_.GetFieldIndices(column_indices));

    auto included_leaves = VectorToSharedSet(column_indices);

    out->resize(field_indices.size());
    ::arrow::FieldVector out_fields(field_indices.size());
    for (size_t i = 0; i < out->size(); ++i) {
      std::unique_ptr<ColumnReaderImpl> reader;
      RETURN_NOT_OK(
          GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));

      // Use the reader's field, which reflects any pruning of children.
      out_fields[i] = reader->field();
      out->at(i) = std::move(reader);
    }

    *out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata);
    return Status::OK();
  }

  Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                   std::unique_ptr<ColumnReader>* out);

  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
    return GetColumn(i, AllRowGroupsFactory(), out);
  }

  // Convert the file's Parquet schema to an Arrow schema.
  Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override {
    return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
                             reader_->metadata()->key_value_metadata(), out);
  }

  // Read one top-level schema field (which may span multiple leaf columns)
  // from all row groups.
  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
    auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
    std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());

    std::unique_ptr<ColumnReaderImpl> reader;
    RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));

    return ReadColumn(i, row_groups, reader.get(), out);
  }

  // Drive `reader` over the requested row groups, with the batch size taken
  // from the column-chunk metadata.
  Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
                    std::shared_ptr<ChunkedArray>* out) {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    // TODO(wesm): This calculation doesn't make much sense when we have repeated
    // schema nodes
    int64_t records_to_read = 0;
    for (auto row_group : row_groups) {
      // Can throw exception
      records_to_read +=
          reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
    }
    return reader->NextBatch(records_to_read, out);
    END_PARQUET_CATCH_EXCEPTIONS
  }

  // Convenience overload: build the column reader, then read.
  Status ReadColumn(int i, const std::vector<int>& row_groups,
                    std::shared_ptr<ChunkedArray>* out) {
    std::unique_ptr<ColumnReader> flat_column_reader;
    RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
    return ReadColumn(i, row_groups, flat_column_reader.get(), out);
  }

  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
    return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out);
  }

  Status ReadTable(std::shared_ptr<Table>* table) override {
    return ReadTable(Iota(reader_->metadata()->num_columns()), table);
  }

  Status ReadRowGroups(const std::vector<int>& row_groups,
                       const std::vector<int>& indices,
                       std::shared_ptr<Table>* table) override;

  // Helper method used by ReadRowGroups - read the given row groups/columns, skipping
  // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
  // alive in async contexts.
  Future<std::shared_ptr<Table>> DecodeRowGroups(
      std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
      const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);

  Status ReadRowGroups(const std::vector<int>& row_groups,
                       std::shared_ptr<Table>* table) override {
    return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
  }

  Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
                      std::shared_ptr<Table>* out) override {
    return ReadRowGroups({row_group_index}, column_indices, out);
  }

  Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
    return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
  }

  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                              const std::vector<int>& column_indices,
                              std::unique_ptr<RecordBatchReader>* out) override;

  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                              std::unique_ptr<RecordBatchReader>* out) override {
    return GetRecordBatchReader(row_group_indices,
                                Iota(reader_->metadata()->num_columns()), out);
  }

  ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
  GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                          const std::vector<int> row_group_indices,
                          const std::vector<int> column_indices,
                          ::arrow::internal::Executor* cpu_executor,
                          int row_group_readahead) override;

  int num_columns() const { return reader_->metadata()->num_columns(); }

  ParquetFileReader* parquet_reader() const override { return reader_.get(); }

  int num_row_groups() const override { return reader_->metadata()->num_row_groups(); }

  void set_use_threads(bool use_threads) override {
    reader_properties_.set_use_threads(use_threads);
  }

  void set_batch_size(int64_t batch_size) override {
    reader_properties_.set_batch_size(batch_size);
  }

  const ArrowReaderProperties& properties() const override { return reader_properties_; }

  const SchemaManifest& manifest() const override { return manifest_; }

  // Count rows by scanning the given columns batch-by-batch; *num_rows is
  // only set on success.
  Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                      int64_t* num_rows) override {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    *num_rows = ScanFileContents(columns, column_batch_size, reader_.get());
    return Status::OK();
    END_PARQUET_CATCH_EXCEPTIONS
  }

  MemoryPool* pool_;
  std::unique_ptr<ParquetFileReader> reader_;
  ArrowReaderProperties reader_properties_;

  SchemaManifest manifest_;
};
366
367 class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
368 public:
369 RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches,
370 std::shared_ptr<::arrow::Schema> schema)
371 : batches_(std::move(batches)), schema_(std::move(schema)) {}
372
373 ~RowGroupRecordBatchReader() override {}
374
375 Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
376 return batches_.Next().Value(out);
377 }
378
379 std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
380
381 private:
382 ::arrow::Iterator<std::shared_ptr<::arrow::RecordBatch>> batches_;
383 std::shared_ptr<::arrow::Schema> schema_;
384 };
385
// Reads one column chunk (a single column within a single row group).
// Holds a non-owning pointer back to the file reader, which must outlive it.
class ColumnChunkReaderImpl : public ColumnChunkReader {
 public:
  ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index)
      : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}

  // Read the whole chunk by delegating to the file reader with a
  // single-element row-group list.
  Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override {
    return impl_->ReadColumn(column_index_, {row_group_index_}, out);
  }

 private:
  FileReaderImpl* impl_;  // not owned
  int column_index_;
  int row_group_index_;
};
400
401 class RowGroupReaderImpl : public RowGroupReader {
402 public:
403 RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index)
404 : impl_(impl), row_group_index_(row_group_index) {}
405
406 std::shared_ptr<ColumnChunkReader> Column(int column_index) override {
407 return std::shared_ptr<ColumnChunkReader>(
408 new ColumnChunkReaderImpl(impl_, row_group_index_, column_index));
409 }
410
411 Status ReadTable(const std::vector<int>& column_indices,
412 std::shared_ptr<::arrow::Table>* out) override {
413 return impl_->ReadRowGroup(row_group_index_, column_indices, out);
414 }
415
416 Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
417 return impl_->ReadRowGroup(row_group_index_, out);
418 }
419
420 private:
421 FileReaderImpl* impl_;
422 int row_group_index_;
423 };
424
425 // ----------------------------------------------------------------------
426 // Column reader implementations
427
428 // Leaf reader is for primitive arrays and primitive children of nested arrays
// Leaf reader is for primitive arrays and primitive children of nested arrays
class LeafReader : public ColumnReaderImpl {
 public:
  LeafReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
             std::unique_ptr<FileColumnIterator> input,
             ::parquet::internal::LevelInfo leaf_info)
      : ctx_(std::move(ctx)),
        field_(std::move(field)),
        input_(std::move(input)),
        descr_(input_->descr()) {
    // The last argument asks RecordReader to preserve dictionary encoding
    // when the output field is dictionary-typed.
    record_reader_ = RecordReader::Make(
        descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY);
    NextRowGroup();
  }

  // Definition levels accumulated by the most recent LoadBatch().
  Status GetDefLevels(const int16_t** data, int64_t* length) final {
    *data = record_reader_->def_levels();
    *length = record_reader_->levels_position();
    return Status::OK();
  }

  // Repetition levels accumulated by the most recent LoadBatch().
  Status GetRepLevels(const int16_t** data, int64_t* length) final {
    *data = record_reader_->rep_levels();
    *length = record_reader_->levels_position();
    return Status::OK();
  }

  // A leaf is never repeated itself and has no children.
  bool IsOrHasRepeatedChild() const final { return false; }

  // Read up to records_to_read records, crossing row-group boundaries as
  // needed, then materialize the decoded values into out_.
  Status LoadBatch(int64_t records_to_read) final {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    out_ = nullptr;
    record_reader_->Reset();
    // Pre-allocation gives much better performance for flat columns
    record_reader_->Reserve(records_to_read);
    while (records_to_read > 0) {
      if (!record_reader_->HasMoreData()) {
        break;
      }
      int64_t records_read = record_reader_->ReadRecords(records_to_read);
      records_to_read -= records_read;
      if (records_read == 0) {
        // Current row group exhausted: attach the next one and retry.
        NextRowGroup();
      }
    }
    RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_,
                                     ctx_->pool, &out_));
    return Status::OK();
    END_PARQUET_CATCH_EXCEPTIONS
  }

  // The array was already materialized in LoadBatch; just hand it out.
  ::arrow::Status BuildArray(int64_t length_upper_bound,
                             std::shared_ptr<::arrow::ChunkedArray>* out) final {
    *out = out_;
    return Status::OK();
  }

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::shared_ptr<ChunkedArray> out_;
  // Attach the next row group's page stream to the record reader.
  void NextRowGroup() {
    std::unique_ptr<PageReader> page_reader = input_->NextChunk();
    record_reader_->SetPageReader(std::move(page_reader));
  }

  std::shared_ptr<ReaderContext> ctx_;
  std::shared_ptr<Field> field_;
  std::unique_ptr<FileColumnIterator> input_;
  const ColumnDescriptor* descr_;
  std::shared_ptr<RecordReader> record_reader_;
};
500
501 // Column reader for extension arrays
// Column reader for extension arrays
// Delegates everything to the reader of the underlying storage type, then
// wraps the built storage array back into the extension type.
class ExtensionReader : public ColumnReaderImpl {
 public:
  ExtensionReader(std::shared_ptr<Field> field,
                  std::unique_ptr<ColumnReaderImpl> storage_reader)
      : field_(std::move(field)), storage_reader_(std::move(storage_reader)) {}

  Status GetDefLevels(const int16_t** data, int64_t* length) override {
    return storage_reader_->GetDefLevels(data, length);
  }

  Status GetRepLevels(const int16_t** data, int64_t* length) override {
    return storage_reader_->GetRepLevels(data, length);
  }

  Status LoadBatch(int64_t number_of_records) final {
    return storage_reader_->LoadBatch(number_of_records);
  }

  // Build the storage array, then re-wrap it with the extension type.
  Status BuildArray(int64_t length_upper_bound,
                    std::shared_ptr<ChunkedArray>* out) override {
    std::shared_ptr<ChunkedArray> storage;
    RETURN_NOT_OK(storage_reader_->BuildArray(length_upper_bound, &storage));
    *out = ExtensionType::WrapArray(field_->type(), storage);
    return Status::OK();
  }

  bool IsOrHasRepeatedChild() const final {
    return storage_reader_->IsOrHasRepeatedChild();
  }

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::shared_ptr<Field> field_;
  std::unique_ptr<ColumnReaderImpl> storage_reader_;
};
538
// Reader for list-like nested types (LIST, LARGE_LIST, MAP). IndexType is
// the offset width (int32_t or int64_t). Reconstructs list offsets and the
// validity bitmap from the child's definition/repetition levels.
template <typename IndexType>
class ListReader : public ColumnReaderImpl {
 public:
  ListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
             ::parquet::internal::LevelInfo level_info,
             std::unique_ptr<ColumnReaderImpl> child_reader)
      : ctx_(std::move(ctx)),
        field_(std::move(field)),
        level_info_(level_info),
        item_reader_(std::move(child_reader)) {}

  // Levels come from the child reader: lists have no levels of their own.
  Status GetDefLevels(const int16_t** data, int64_t* length) override {
    return item_reader_->GetDefLevels(data, length);
  }

  Status GetRepLevels(const int16_t** data, int64_t* length) override {
    return item_reader_->GetRepLevels(data, length);
  }

  bool IsOrHasRepeatedChild() const final { return true; }

  Status LoadBatch(int64_t number_of_records) final {
    return item_reader_->LoadBatch(number_of_records);
  }

  // Turn the assembled ArrayData into the final chunked array. Virtual so
  // FixedSizeListReader can override with extra validation.
  virtual ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
      std::shared_ptr<ArrayData> data) {
    if (field_->type()->id() == ::arrow::Type::MAP) {
      // Error out if data is not map-compliant instead of aborting in MakeArray below
      RETURN_NOT_OK(::arrow::MapArray::ValidateChildData(data->child_data));
    }
    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
    return std::make_shared<ChunkedArray>(result);
  }

  // Reconstruct offsets + validity from def/rep levels, build the child
  // array, then assemble the list array around it.
  Status BuildArray(int64_t length_upper_bound,
                    std::shared_ptr<ChunkedArray>* out) override {
    const int16_t* def_levels;
    const int16_t* rep_levels;
    int64_t num_levels;
    RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
    RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));

    std::shared_ptr<ResizableBuffer> validity_buffer;
    ::parquet::internal::ValidityBitmapInputOutput validity_io;
    validity_io.values_read_upper_bound = length_upper_bound;
    // Only nullable lists need a validity bitmap at all.
    if (field_->nullable()) {
      ARROW_ASSIGN_OR_RAISE(
          validity_buffer,
          AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool));
      validity_io.valid_bits = validity_buffer->mutable_data();
    }
    // length_upper_bound + 1 offsets; the max() guards the zero-length case.
    ARROW_ASSIGN_OR_RAISE(
        std::shared_ptr<ResizableBuffer> offsets_buffer,
        AllocateResizableBuffer(
            sizeof(IndexType) * std::max(int64_t{1}, length_upper_bound + 1),
            ctx_->pool));
    // Ensure zero initialization in case we have reached a zero length list (and
    // because first entry is always zero).
    IndexType* offset_data = reinterpret_cast<IndexType*>(offsets_buffer->mutable_data());
    offset_data[0] = 0;
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
                                            level_info_, &validity_io, offset_data);
    END_PARQUET_CATCH_EXCEPTIONS

    // The last offset is the total number of child items to build.
    RETURN_NOT_OK(item_reader_->BuildArray(offset_data[validity_io.values_read], out));

    // Resize to actual number of elements returned.
    RETURN_NOT_OK(
        offsets_buffer->Resize((validity_io.values_read + 1) * sizeof(IndexType)));
    if (validity_buffer != nullptr) {
      RETURN_NOT_OK(
          validity_buffer->Resize(BitUtil::BytesForBits(validity_io.values_read)));
      validity_buffer->ZeroPadding();
    }
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));

    // Drop the validity buffer entirely when there are no nulls.
    std::vector<std::shared_ptr<Buffer>> buffers{
        validity_io.null_count > 0 ? validity_buffer : nullptr, offsets_buffer};
    auto data = std::make_shared<ArrayData>(
        field_->type(),
        /*length=*/validity_io.values_read, std::move(buffers),
        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, validity_io.null_count);

    ARROW_ASSIGN_OR_RAISE(*out, AssembleArray(std::move(data)));
    return Status::OK();
  }

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::shared_ptr<ReaderContext> ctx_;
  std::shared_ptr<Field> field_;
  ::parquet::internal::LevelInfo level_info_;
  std::unique_ptr<ColumnReaderImpl> item_reader_;
};
636
637 class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader<int32_t> {
638 public:
639 FixedSizeListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
640 ::parquet::internal::LevelInfo level_info,
641 std::unique_ptr<ColumnReaderImpl> child_reader)
642 : ListReader(std::move(ctx), std::move(field), level_info,
643 std::move(child_reader)) {}
644 ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
645 std::shared_ptr<ArrayData> data) final {
646 DCHECK_EQ(data->buffers.size(), 2);
647 DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST);
648 const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type());
649 const int32_t* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data());
650 for (int x = 1; x <= data->length; x++) {
651 int32_t size = offsets[x] - offsets[x - 1];
652 if (size != type.list_size()) {
653 return Status::Invalid("Expected all lists to be of size=", type.list_size(),
654 " but index ", x, " had size=", size);
655 }
656 }
657 data->buffers.resize(1);
658 std::shared_ptr<Array> result = ::arrow::MakeArray(data);
659 return std::make_shared<ChunkedArray>(result);
660 }
661 };
662
// Reader for STRUCT columns: loads all children, then reconstructs the
// struct's validity bitmap from one child's def/rep levels.
class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
 public:
  explicit StructReader(std::shared_ptr<ReaderContext> ctx,
                        std::shared_ptr<Field> filtered_field,
                        ::parquet::internal::LevelInfo level_info,
                        std::vector<std::unique_ptr<ColumnReaderImpl>> children)
      : ctx_(std::move(ctx)),
        filtered_field_(std::move(filtered_field)),
        level_info_(level_info),
        children_(std::move(children)) {
    // There could be a mix of children some might be repeated some might not be.
    // If possible use one that isn't since that will be guaranteed to have the least
    // number of levels to reconstruct a nullable bitmap.
    auto result = std::find_if(children_.begin(), children_.end(),
                               [](const std::unique_ptr<ColumnReaderImpl>& child) {
                                 return !child->IsOrHasRepeatedChild();
                               });
    if (result != children_.end()) {
      def_rep_level_child_ = result->get();
      has_repeated_child_ = false;
    } else if (!children_.empty()) {
      def_rep_level_child_ = children_.front().get();
      has_repeated_child_ = true;
    }
    // NOTE(review): with zero children, has_repeated_child_ is left
    // uninitialized; the level accessors guard against empty children_.
  }

  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }

  // Load a batch into every child reader.
  Status LoadBatch(int64_t records_to_read) override {
    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
    }
    return Status::OK();
  }
  Status BuildArray(int64_t length_upper_bound,
                    std::shared_ptr<ChunkedArray>* out) override;
  Status GetDefLevels(const int16_t** data, int64_t* length) override;
  Status GetRepLevels(const int16_t** data, int64_t* length) override;
  const std::shared_ptr<Field> field() override { return filtered_field_; }

 private:
  const std::shared_ptr<ReaderContext> ctx_;
  const std::shared_ptr<Field> filtered_field_;
  const ::parquet::internal::LevelInfo level_info_;
  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
  // Child whose def/rep levels are used to rebuild this struct's bitmap.
  ColumnReaderImpl* def_rep_level_child_ = nullptr;
  bool has_repeated_child_;
};
711
712 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
713 *data = nullptr;
714 if (children_.size() == 0) {
715 *length = 0;
716 return Status::Invalid("StructReader had no children");
717 }
718
719 // This method should only be called when this struct or one of its parents
720 // are optional/repeated or it has a repeated child.
721 // Meaning all children must have rep/def levels associated
722 // with them.
723 RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
724 return Status::OK();
725 }
726
727 Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
728 *data = nullptr;
729 if (children_.size() == 0) {
730 *length = 0;
731 return Status::Invalid("StructReader had no childre");
732 }
733
734 // This method should only be called when this struct or one of its parents
735 // are optional/repeated or it has repeated child.
736 // Meaning all children must have rep/def levels associated
737 // with them.
738 RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
739 return Status::OK();
740 }
741
// Assemble the struct array: derive the validity bitmap from def/rep levels
// (when the struct is nullable or has a repeated child), build each child,
// then combine everything into a single-chunk ChunkedArray.
Status StructReader::BuildArray(int64_t length_upper_bound,
                                std::shared_ptr<ChunkedArray>* out) {
  std::vector<std::shared_ptr<ArrayData>> children_array_data;
  std::shared_ptr<ResizableBuffer> null_bitmap;

  ::parquet::internal::ValidityBitmapInputOutput validity_io;
  validity_io.values_read_upper_bound = length_upper_bound;
  // This simplifies accounting below.
  validity_io.values_read = length_upper_bound;

  BEGIN_PARQUET_CATCH_EXCEPTIONS
  const int16_t* def_levels;
  const int16_t* rep_levels;
  int64_t num_levels;

  if (has_repeated_child_) {
    // Repeated child: need both def and rep levels to find struct boundaries.
    ARROW_ASSIGN_OR_RAISE(
        null_bitmap,
        AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool));
    validity_io.valid_bits = null_bitmap->mutable_data();
    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
    RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels));
    DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, &validity_io);
  } else if (filtered_field_->nullable()) {
    // No repeated child: definition levels alone determine nullability.
    ARROW_ASSIGN_OR_RAISE(
        null_bitmap,
        AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool));
    validity_io.valid_bits = null_bitmap->mutable_data();
    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
    DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io);
  }

  // Ensure all values are initialized.
  if (null_bitmap) {
    RETURN_NOT_OK(null_bitmap->Resize(BitUtil::BytesForBits(validity_io.values_read)));
    null_bitmap->ZeroPadding();
  }

  END_PARQUET_CATCH_EXCEPTIONS
  // Gather children arrays and def levels
  for (auto& child : children_) {
    std::shared_ptr<ChunkedArray> field;
    RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field));
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field));
    children_array_data.push_back(std::move(array_data));
  }

  if (!filtered_field_->nullable() && !has_repeated_child_) {
    // No bitmap was computed above: take the true length from a child.
    validity_io.values_read = children_array_data.front()->length;
  }

  // Omit the bitmap buffer entirely when no nulls were found.
  std::vector<std::shared_ptr<Buffer>> buffers{validity_io.null_count > 0 ? null_bitmap
                                                                          : nullptr};
  auto data =
      std::make_shared<ArrayData>(filtered_field_->type(),
                                  /*length=*/validity_io.values_read, std::move(buffers),
                                  std::move(children_array_data));
  std::shared_ptr<Array> result = ::arrow::MakeArray(data);

  *out = std::make_shared<ChunkedArray>(result);
  return Status::OK();
}
804
805 // ----------------------------------------------------------------------
806 // File reader implementation
807
// Build a ColumnReaderImpl for the given schema field, recursing into nested
// types (extension, list/map/fixed-size-list/large-list, struct).
//
// `field` describes the Parquet schema node; `arrow_field` is the Arrow field
// being materialized (it may differ from `field.field` while an extension
// type's storage is being unwrapped). On success `*out` holds the reader, or
// nullptr when the field (or all of its children) was pruned by the column
// selection in `ctx`.
Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
                 const std::shared_ptr<ReaderContext>& ctx,
                 std::unique_ptr<ColumnReaderImpl>* out) {
  BEGIN_PARQUET_CATCH_EXCEPTIONS

  auto type_id = arrow_field->type()->id();

  if (type_id == ::arrow::Type::EXTENSION) {
    // Recurse on the storage type, then wrap the resulting reader so that it
    // yields the extension type again.
    auto storage_field = arrow_field->WithType(
        checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
    RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
    out->reset(new ExtensionReader(arrow_field, std::move(*out)));
    return Status::OK();
  }

  if (field.children.size() == 0) {
    if (!field.is_leaf()) {
      return Status::Invalid("Parquet non-leaf node has no children");
    }
    // Leaf column: produce no reader at all if it was not selected.
    if (!ctx->IncludesLeaf(field.column_index)) {
      *out = nullptr;
      return Status::OK();
    }
    std::unique_ptr<FileColumnIterator> input(
        ctx->iterator_factory(field.column_index, ctx->reader));
    out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info));
  } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP ||
             type_id == ::arrow::Type::FIXED_SIZE_LIST ||
             type_id == ::arrow::Type::LARGE_LIST) {
    auto list_field = arrow_field;
    auto child = &field.children[0];
    std::unique_ptr<ColumnReaderImpl> child_reader;
    RETURN_NOT_OK(GetReader(*child, ctx, &child_reader));
    if (child_reader == nullptr) {
      // The single child was fully pruned, so this list cannot be read.
      *out = nullptr;
      return Status::OK();
    }

    // These two types might not be equal if column pruning occurred
    // further down the stack.
    const std::shared_ptr<DataType> reader_child_type = child_reader->field()->type();
    // This should really never happen but was raised as a question on the code
    // review, this should be pretty cheap check so leave it in.
    if (ARROW_PREDICT_FALSE(list_field->type()->num_fields() != 1)) {
      return Status::Invalid("expected exactly one child field for: ",
                             list_field->ToString());
    }
    const DataType& schema_child_type = *(list_field->type()->field(0)->type());
    if (type_id == ::arrow::Type::MAP) {
      if (reader_child_type->num_fields() != 2 ||
          !reader_child_type->field(0)->type()->Equals(
              *schema_child_type.field(0)->type())) {
        // This case applies if either key or value are completely filtered
        // out, so we can take the type as is, or the key was partially
        // filtered, so keeping it as a map no longer makes sense.
        list_field = list_field->WithType(::arrow::list(child_reader->field()));
      } else if (!reader_child_type->field(1)->type()->Equals(
                     *schema_child_type.field(1)->type())) {
        list_field = list_field->WithType(std::make_shared<::arrow::MapType>(
            reader_child_type->field(
                0),  // field 0 is unchanged based on previous if statement
            reader_child_type->field(1)));
      }
      // Map types are list<struct<key, value>> so use ListReader
      // for reconstruction.
      out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
                                         std::move(child_reader)));
    } else if (type_id == ::arrow::Type::LIST) {
      // Propagate any pruned child type into the list type itself.
      if (!reader_child_type->Equals(schema_child_type)) {
        list_field = list_field->WithType(::arrow::list(reader_child_type));
      }

      out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
                                         std::move(child_reader)));
    } else if (type_id == ::arrow::Type::LARGE_LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        list_field = list_field->WithType(::arrow::large_list(reader_child_type));
      }

      out->reset(new ListReader<int64_t>(ctx, list_field, field.level_info,
                                         std::move(child_reader)));
    } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        auto& fixed_list_type =
            checked_cast<const ::arrow::FixedSizeListType&>(*list_field->type());
        int32_t list_size = fixed_list_type.list_size();
        list_field =
            list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size));
      }

      out->reset(new FixedSizeListReader(ctx, list_field, field.level_info,
                                         std::move(child_reader)));
    } else {
      return Status::UnknownError("Unknown list type: ", field.field->ToString());
    }
  } else if (type_id == ::arrow::Type::STRUCT) {
    std::vector<std::shared_ptr<Field>> child_fields;
    int arrow_field_idx = 0;
    std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
    for (const auto& child : field.children) {
      std::unique_ptr<ColumnReaderImpl> child_reader;
      RETURN_NOT_OK(GetReader(child, ctx, &child_reader));
      if (!child_reader) {
        // Pruned child: keep the schema index in sync and skip it.
        arrow_field_idx++;
        // If all children were pruned, then we do not try to read this field
        continue;
      }
      std::shared_ptr<::arrow::Field> child_field = child.field;
      const DataType& reader_child_type = *child_reader->field()->type();
      const DataType& schema_child_type =
          *arrow_field->type()->field(arrow_field_idx++)->type();
      // These might not be equal if column pruning occurred.
      if (!schema_child_type.Equals(reader_child_type)) {
        child_field = child_field->WithType(child_reader->field()->type());
      }
      child_fields.push_back(child_field);
      child_readers.emplace_back(std::move(child_reader));
    }
    if (child_fields.size() == 0) {
      // Every child was pruned; prune the struct as well.
      *out = nullptr;
      return Status::OK();
    }
    // Rebuild the struct type from the surviving (possibly retyped) children,
    // preserving name, nullability and metadata of the original field.
    auto filtered_field =
        ::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields),
                       arrow_field->nullable(), arrow_field->metadata());
    out->reset(new StructReader(ctx, filtered_field, field.level_info,
                                std::move(child_readers)));
  } else {
    return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
  }
  return Status::OK();

  END_PARQUET_CATCH_EXCEPTIONS
}
942
// Entry point: build a reader for `field` using its own Arrow field. The
// 4-argument overload above handles extension-type unwrapping and recursion.
Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& ctx,
                 std::unique_ptr<ColumnReaderImpl>* out) {
  return GetReader(field, field.field, ctx, out);
}
947
948 } // namespace
949
// Create a RecordBatchReader yielding batches of at most
// properties().batch_size() rows from the selected row groups and columns.
//
// If no columns survive selection, empty batches with the correct row counts
// are pre-generated from row group metadata (no column I/O is needed).
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
                                            const std::vector<int>& column_indices,
                                            std::unique_ptr<RecordBatchReader>* out) {
  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

  if (reader_properties_.pre_buffer()) {
    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
                       reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }

  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
  std::shared_ptr<::arrow::Schema> batch_schema;
  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));

  if (readers.empty()) {
    // Just generate all batches right now; they're cheap since they have no columns.
    int64_t batch_size = properties().batch_size();
    auto max_sized_batch =
        ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});

    ::arrow::RecordBatchVector batches;

    for (int row_group : row_groups) {
      int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();

      // Whole batches of `batch_size` rows, then one partial batch if needed.
      batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);

      if (int64_t trailing_rows = num_rows % batch_size) {
        batches.push_back(max_sized_batch->Slice(0, trailing_rows));
      }
    }

    *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
        ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));

    return Status::OK();
  }

  // Total rows across the selected row groups; used to cap the final batch.
  int64_t num_rows = 0;
  for (int row_group : row_groups) {
    num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
  }

  using ::arrow::RecordBatchIterator;

  // NB: This lambda will be invoked outside the scope of this call to
  // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
  // `this` is a non-owning pointer so we are relying on the parent FileReader outliving
  // this RecordBatchReader.
  ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
      [readers, batch_schema, num_rows,
       this]() mutable -> ::arrow::Result<RecordBatchIterator> {
        ::arrow::ChunkedArrayVector columns(readers.size());

        // don't reserve more rows than necessary
        int64_t batch_size = std::min(properties().batch_size(), num_rows);
        num_rows -= batch_size;

        // Decode one batch's worth of each column, optionally in parallel.
        RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
            reader_properties_.use_threads(), static_cast<int>(readers.size()),
            [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));

        for (const auto& column : columns) {
          // Any exhausted column terminates the entire stream.
          if (column == nullptr || column->length() == 0) {
            return ::arrow::IterationTraits<RecordBatchIterator>::End();
          }
        }

        auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
        auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);

        // NB: explicitly preserve table so that table_reader doesn't outlive it
        return ::arrow::MakeFunctionIterator(
            [table, table_reader] { return table_reader->Next(); });
      });

  *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
      ::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));

  return Status::OK();
}
1034
/// Given a file reader and a list of row groups, this is a generator of record
/// batch generators (where each sub-generator is the contents of a single row group).
class RowGroupGenerator {
 public:
  using RecordBatchGenerator =
      ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;

  /// \param arrow_reader shared ownership keeps the file reader alive while
  ///        reads are outstanding
  /// \param cpu_executor optional executor for decode work; may be null, in
  ///        which case decoding runs on the calling (or I/O completion) thread
  /// \param row_groups row group indices to read, in order
  /// \param column_indices leaf column indices to read
  explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
                             ::arrow::internal::Executor* cpu_executor,
                             std::vector<int> row_groups, std::vector<int> column_indices)
      : arrow_reader_(std::move(arrow_reader)),
        cpu_executor_(cpu_executor),
        row_groups_(std::move(row_groups)),
        column_indices_(std::move(column_indices)),
        index_(0) {}

  // Produce a future to the next row group's batch generator, or the end
  // sentinel once all row groups have been consumed.
  ::arrow::Future<RecordBatchGenerator> operator()() {
    if (index_ >= row_groups_.size()) {
      return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
    }
    int row_group = row_groups_[index_++];
    std::vector<int> column_indices = column_indices_;
    auto reader = arrow_reader_;
    if (!reader->properties().pre_buffer()) {
      return SubmitRead(cpu_executor_, reader, row_group, column_indices);
    }
    // Pre-buffering enabled: wait until this row group's byte ranges are
    // buffered, then decode (transferring to the CPU executor if given).
    auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
    if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
    return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
      return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
    });
  }

 private:
  // Synchronous fallback for when pre-buffer isn't enabled.
  //
  // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
  // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
  // async I/O without forcing readahead.
  static ::arrow::Future<RecordBatchGenerator> SubmitRead(
      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
      const int row_group, const std::vector<int>& column_indices) {
    if (!cpu_executor) {
      return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
    }
    // If we have an executor, then force transfer (even if I/O was complete)
    return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
                                                    row_group, column_indices));
  }

  // Decode a single row group into a generator over its record batches,
  // chunked at the reader's configured batch size.
  static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
      const int row_group, const std::vector<int>& column_indices) {
    // Skips bound checks/pre-buffering, since we've done that already
    const int64_t batch_size = self->properties().batch_size();
    return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
        .Then([batch_size](const std::shared_ptr<Table>& table)
                  -> ::arrow::Result<RecordBatchGenerator> {
          ::arrow::TableBatchReader table_reader(*table);
          table_reader.set_chunksize(batch_size);
          ::arrow::RecordBatchVector batches;
          RETURN_NOT_OK(table_reader.ReadAll(&batches));
          return ::arrow::MakeVectorGenerator(std::move(batches));
        });
  }

  std::shared_ptr<FileReaderImpl> arrow_reader_;
  ::arrow::internal::Executor* cpu_executor_;  // non-owning; may be null
  std::vector<int> row_groups_;
  std::vector<int> column_indices_;
  size_t index_;  // next entry of row_groups_ to hand out
};
1107
// Create an async generator of record batches over the given row groups and
// columns. Row groups are read in order; if `row_group_readahead > 0`, up to
// that many row groups are fetched/decoded ahead of the consumer. `reader`
// is captured (via RowGroupGenerator) to keep the file reader alive while
// the generator is in use.
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                                        const std::vector<int> row_group_indices,
                                        const std::vector<int> column_indices,
                                        ::arrow::internal::Executor* cpu_executor,
                                        int row_group_readahead) {
  RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
  if (reader_properties_.pre_buffer()) {
    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
                       reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }
  ::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
      RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
                        cpu_executor, row_group_indices, column_indices);
  if (row_group_readahead > 0) {
    row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator),
                                                          row_group_readahead);
  }
  // Flatten the generator-of-generators into a single stream of batches.
  return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
}
1130
1131 Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
1132 std::unique_ptr<ColumnReader>* out) {
1133 RETURN_NOT_OK(BoundsCheckColumn(i));
1134 auto ctx = std::make_shared<ReaderContext>();
1135 ctx->reader = reader_.get();
1136 ctx->pool = pool_;
1137 ctx->iterator_factory = iterator_factory;
1138 ctx->filter_leaves = false;
1139 std::unique_ptr<ColumnReaderImpl> result;
1140 RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
1141 out->reset(result.release());
1142 return Status::OK();
1143 }
1144
1145 Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
1146 const std::vector<int>& column_indices,
1147 std::shared_ptr<Table>* out) {
1148 RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
1149
1150 // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
1151 if (reader_properties_.pre_buffer()) {
1152 BEGIN_PARQUET_CATCH_EXCEPTIONS
1153 parquet_reader()->PreBuffer(row_groups, column_indices,
1154 reader_properties_.io_context(),
1155 reader_properties_.cache_options());
1156 END_PARQUET_CATCH_EXCEPTIONS
1157 }
1158
1159 auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
1160 /*cpu_executor=*/nullptr);
1161 ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
1162 return Status::OK();
1163 }
1164
// Asynchronously decode the given row groups/columns into a validated Table.
// Returns a Future so the caller can either block (sync path) or chain
// continuations (async path).
Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
    std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
    const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
  // `self` is used solely to keep `this` alive in an async context - but we use this
  // in a sync context too so use `this` over `self`
  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
  std::shared_ptr<::arrow::Schema> result_schema;
  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
  // OptionalParallelForAsync requires an executor
  if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();

  // Reads one top-level column across all requested row groups. Captures
  // `this` (non-owning); `self` is also captured so that, when non-null, the
  // reader stays alive for the duration of the async work.
  auto read_column = [row_groups, self, this](size_t i,
                                              std::shared_ptr<ColumnReaderImpl> reader)
      -> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
    std::shared_ptr<::arrow::ChunkedArray> column;
    RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
    return column;
  };
  // Assembles the decoded columns into a Table. When no columns were selected
  // the row count is taken from the row group metadata instead.
  auto make_table = [result_schema, row_groups, self,
                     this](const ::arrow::ChunkedArrayVector& columns)
      -> ::arrow::Result<std::shared_ptr<Table>> {
    int64_t num_rows = 0;
    if (!columns.empty()) {
      num_rows = columns[0]->length();
    } else {
      for (int i : row_groups) {
        num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
      }
    }
    auto table = Table::Make(std::move(result_schema), columns, num_rows);
    RETURN_NOT_OK(table->Validate());
    return table;
  };
  return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
                                                     std::move(readers), read_column,
                                                     cpu_executor)
      .Then(std::move(make_table));
}
1203
// Create a reader scoped to a single row group. The returned object holds a
// raw (non-owning) pointer back to this FileReaderImpl, which must therefore
// outlive it.
std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
  return std::make_shared<RowGroupReaderImpl>(this, row_group_index);
}
1207
1208 // ----------------------------------------------------------------------
1209 // Public factory functions
1210
1211 Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
1212 std::shared_ptr<RecordBatchReader>* out) {
1213 std::unique_ptr<RecordBatchReader> tmp;
1214 ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp));
1215 out->reset(tmp.release());
1216 return Status::OK();
1217 }
1218
1219 Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
1220 const std::vector<int>& column_indices,
1221 std::shared_ptr<RecordBatchReader>* out) {
1222 std::unique_ptr<RecordBatchReader> tmp;
1223 ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp));
1224 out->reset(tmp.release());
1225 return Status::OK();
1226 }
1227
1228 Status FileReader::Make(::arrow::MemoryPool* pool,
1229 std::unique_ptr<ParquetFileReader> reader,
1230 const ArrowReaderProperties& properties,
1231 std::unique_ptr<FileReader>* out) {
1232 out->reset(new FileReaderImpl(pool, std::move(reader), properties));
1233 return static_cast<FileReaderImpl*>(out->get())->Init();
1234 }
1235
// Convenience overload that uses the default Arrow reader properties.
Status FileReader::Make(::arrow::MemoryPool* pool,
                        std::unique_ptr<ParquetFileReader> reader,
                        std::unique_ptr<FileReader>* out) {
  return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
}
1241
// The builder starts with the process-wide default memory pool and default
// Arrow reader properties; both can be overridden before Build().
FileReaderBuilder::FileReaderBuilder()
    : pool_(::arrow::default_memory_pool()),
      properties_(default_arrow_reader_properties()) {}
1245
// Open the low-level Parquet reader over `file`, translating any thrown
// ParquetException into an error Status. `metadata` is forwarded to
// ParquetReader::Open (presumably a pre-parsed footer that skips re-reading
// it — confirm against ParquetFileReader::Open).
Status FileReaderBuilder::Open(std::shared_ptr<::arrow::io::RandomAccessFile> file,
                               const ReaderProperties& properties,
                               std::shared_ptr<FileMetaData> metadata) {
  PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(std::move(file), properties,
                                                         std::move(metadata)));
  return Status::OK();
}
1253
// Set the memory pool used for Arrow allocations. Returns `this` to allow
// call chaining (builder pattern).
FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) {
  pool_ = pool;
  return this;
}
1258
// Set the Arrow reader properties (copied). Returns `this` to allow call
// chaining (builder pattern).
FileReaderBuilder* FileReaderBuilder::properties(
    const ArrowReaderProperties& arg_properties) {
  properties_ = arg_properties;
  return this;
}
1264
// Build the FileReader, consuming the raw reader created by Open(). The
// builder must not be Build()-ed twice without a new Open().
Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) {
  return FileReader::Make(pool_, std::move(raw_reader_), properties_, out);
}
1268
1269 Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
1270 std::unique_ptr<FileReader>* reader) {
1271 FileReaderBuilder builder;
1272 RETURN_NOT_OK(builder.Open(std::move(file)));
1273 return builder.memory_pool(pool)->Build(reader);
1274 }
1275
1276 namespace internal {
1277
1278 Status FuzzReader(std::unique_ptr<FileReader> reader) {
1279 auto st = Status::OK();
1280 for (int i = 0; i < reader->num_row_groups(); ++i) {
1281 std::shared_ptr<Table> table;
1282 auto row_group_status = reader->ReadRowGroup(i, &table);
1283 if (row_group_status.ok()) {
1284 row_group_status &= table->ValidateFull();
1285 }
1286 st &= row_group_status;
1287 }
1288 return st;
1289 }
1290
1291 Status FuzzReader(const uint8_t* data, int64_t size) {
1292 auto buffer = std::make_shared<::arrow::Buffer>(data, size);
1293 auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
1294 FileReaderBuilder builder;
1295 RETURN_NOT_OK(builder.Open(std::move(file)));
1296
1297 std::unique_ptr<FileReader> reader;
1298 RETURN_NOT_OK(builder.Build(&reader));
1299 return FuzzReader(std::move(reader));
1300 }
1301
1302 } // namespace internal
1303
1304 } // namespace arrow
1305 } // namespace parquet