1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
21 // N.B. we don't include async_generator.h as it's relatively heavy
26 #include "parquet/file_reader.h"
27 #include "parquet/platform.h"
28 #include "parquet/properties.h"
// Forward declarations so this header does not need the full definitions.
//
// NOTE(review): the enclosing namespace declarations appear to have been lost
// in extraction; upstream these forward declarations span the ::arrow and
// parquet::arrow namespaces -- confirm against the original file.
class KeyValueMetadata;
class RecordBatchReader;
class SchemaDescriptor;
class ColumnChunkReader;
struct SchemaManifest;
54 /// \brief Arrow read adapter class for deserializing Parquet files as Arrow row batches.
/// This interface caters for different use cases and thus provides different
/// interfaces. In its most simplistic form, we cater for a user that wants to
/// read the whole Parquet file at once with the `FileReader::ReadTable` method.
60 /// More advanced users that also want to implement parallelism on top of each
61 /// single Parquet files should do this on the RowGroup level. For this, they can
62 /// call `FileReader::RowGroup(i)->ReadTable` to receive only the specified
63 /// RowGroup as a table.
65 /// In the most advanced situation, where a consumer wants to independently read
66 /// RowGroups in parallel and consume each column individually, they can call
67 /// `FileReader::RowGroup(i)->Column(j)->Read` and receive an `arrow::Column`
70 /// The parquet format supports an optional integer field_id which can be assigned
71 /// to a field. Arrow will convert these field IDs to a metadata key named
72 /// PARQUET:field_id on the appropriate field.
73 // TODO(wesm): nested data does not always make sense with this user
74 // interface unless you are only reading a single leaf node from a branch of
75 // a table. For example:
77 // repeated group data {
78 // optional group record {
79 // optional int32 val1;
80 // optional byte_array val2;
81 // optional bool val3;
83 // optional int32 val4;
86 // In the Parquet file, there are 3 leaf nodes:
93 // When materializing this data in an Arrow array, we would have:
98 // val2: string (= list<uint8>),
104 // However, in the Parquet format, each leaf node has its own repetition and
105 // definition levels describing the structure of the intermediate nodes in
106 // this array structure. Thus, we will need to scan the leaf data for a group
107 // of leaf nodes part of the same type tree to create a single result Arrow
108 // nested array structure.
// This is additionally complicated by "chunky" repeated fields or very large
// byte arrays.
112 class PARQUET_EXPORT FileReader
{
114 /// Factory function to create a FileReader from a ParquetFileReader and properties
115 static ::arrow::Status
Make(::arrow::MemoryPool
* pool
,
116 std::unique_ptr
<ParquetFileReader
> reader
,
117 const ArrowReaderProperties
& properties
,
118 std::unique_ptr
<FileReader
>* out
);
120 /// Factory function to create a FileReader from a ParquetFileReader
121 static ::arrow::Status
Make(::arrow::MemoryPool
* pool
,
122 std::unique_ptr
<ParquetFileReader
> reader
,
123 std::unique_ptr
<FileReader
>* out
);
125 // Since the distribution of columns amongst a Parquet file's row groups may
126 // be uneven (the number of values in each column chunk can be different), we
127 // provide a column-oriented read interface. The ColumnReader hides the
128 // details of paging through the file's row groups and yielding
129 // fully-materialized arrow::Array instances
131 // Returns error status if the column of interest is not flat.
132 virtual ::arrow::Status
GetColumn(int i
, std::unique_ptr
<ColumnReader
>* out
) = 0;
134 /// \brief Return arrow schema for all the columns.
135 virtual ::arrow::Status
GetSchema(std::shared_ptr
<::arrow::Schema
>* out
) = 0;
137 /// \brief Read column as a whole into a chunked array.
139 /// The indicated column index is relative to the schema
140 virtual ::arrow::Status
ReadColumn(int i
,
141 std::shared_ptr
<::arrow::ChunkedArray
>* out
) = 0;
143 // NOTE: Experimental API
144 // Reads a specific top level schema field into an Array
145 // The index i refers the index of the top level schema field, which may
146 // be nested or flat - e.g.
154 // i=0 will read the entire foo struct, i=1 the foo2 primitive column etc
155 virtual ::arrow::Status
ReadSchemaField(
156 int i
, std::shared_ptr
<::arrow::ChunkedArray
>* out
) = 0;
158 /// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
160 /// Note that the ordering in row_group_indices matters. FileReaders must outlive
161 /// their RecordBatchReaders.
163 /// \returns error Status if row_group_indices contains an invalid index
164 virtual ::arrow::Status
GetRecordBatchReader(
165 const std::vector
<int>& row_group_indices
,
166 std::unique_ptr
<::arrow::RecordBatchReader
>* out
) = 0;
168 ::arrow::Status
GetRecordBatchReader(const std::vector
<int>& row_group_indices
,
169 std::shared_ptr
<::arrow::RecordBatchReader
>* out
);
171 /// \brief Return a RecordBatchReader of row groups selected from
172 /// row_group_indices, whose columns are selected by column_indices.
174 /// Note that the ordering in row_group_indices and column_indices
175 /// matter. FileReaders must outlive their RecordBatchReaders.
177 /// \returns error Status if either row_group_indices or column_indices
178 /// contains an invalid index
179 virtual ::arrow::Status
GetRecordBatchReader(
180 const std::vector
<int>& row_group_indices
, const std::vector
<int>& column_indices
,
181 std::unique_ptr
<::arrow::RecordBatchReader
>* out
) = 0;
183 /// \brief Return a generator of record batches.
185 /// The FileReader must outlive the generator, so this requires that you pass in a
188 /// \returns error Result if either row_group_indices or column_indices contains an
190 virtual ::arrow::Result
<
191 std::function
<::arrow::Future
<std::shared_ptr
<::arrow::RecordBatch
>>()>>
192 GetRecordBatchGenerator(std::shared_ptr
<FileReader
> reader
,
193 const std::vector
<int> row_group_indices
,
194 const std::vector
<int> column_indices
,
195 ::arrow::internal::Executor
* cpu_executor
= NULLPTR
,
196 int row_group_readahead
= 0) = 0;
198 ::arrow::Status
GetRecordBatchReader(const std::vector
<int>& row_group_indices
,
199 const std::vector
<int>& column_indices
,
200 std::shared_ptr
<::arrow::RecordBatchReader
>* out
);
202 /// Read all columns into a Table
203 virtual ::arrow::Status
ReadTable(std::shared_ptr
<::arrow::Table
>* out
) = 0;
205 /// \brief Read the given columns into a Table
207 /// The indicated column indices are relative to the schema
208 virtual ::arrow::Status
ReadTable(const std::vector
<int>& column_indices
,
209 std::shared_ptr
<::arrow::Table
>* out
) = 0;
211 virtual ::arrow::Status
ReadRowGroup(int i
, const std::vector
<int>& column_indices
,
212 std::shared_ptr
<::arrow::Table
>* out
) = 0;
214 virtual ::arrow::Status
ReadRowGroup(int i
, std::shared_ptr
<::arrow::Table
>* out
) = 0;
216 virtual ::arrow::Status
ReadRowGroups(const std::vector
<int>& row_groups
,
217 const std::vector
<int>& column_indices
,
218 std::shared_ptr
<::arrow::Table
>* out
) = 0;
220 virtual ::arrow::Status
ReadRowGroups(const std::vector
<int>& row_groups
,
221 std::shared_ptr
<::arrow::Table
>* out
) = 0;
223 /// \brief Scan file contents with one thread, return number of rows
224 virtual ::arrow::Status
ScanContents(std::vector
<int> columns
,
225 const int32_t column_batch_size
,
226 int64_t* num_rows
) = 0;
228 /// \brief Return a reader for the RowGroup, this object must not outlive the
230 virtual std::shared_ptr
<RowGroupReader
> RowGroup(int row_group_index
) = 0;
232 /// \brief The number of row groups in the file
233 virtual int num_row_groups() const = 0;
235 virtual ParquetFileReader
* parquet_reader() const = 0;
237 /// Set whether to use multiple threads during reads of multiple columns.
238 /// By default only one thread is used.
239 virtual void set_use_threads(bool use_threads
) = 0;
241 /// Set number of records to read per batch for the RecordBatchReader.
242 virtual void set_batch_size(int64_t batch_size
) = 0;
244 virtual const ArrowReaderProperties
& properties() const = 0;
246 virtual const SchemaManifest
& manifest() const = 0;
248 virtual ~FileReader() = default;
251 class RowGroupReader
{
253 virtual ~RowGroupReader() = default;
254 virtual std::shared_ptr
<ColumnChunkReader
> Column(int column_index
) = 0;
255 virtual ::arrow::Status
ReadTable(const std::vector
<int>& column_indices
,
256 std::shared_ptr
<::arrow::Table
>* out
) = 0;
257 virtual ::arrow::Status
ReadTable(std::shared_ptr
<::arrow::Table
>* out
) = 0;
263 class ColumnChunkReader
{
265 virtual ~ColumnChunkReader() = default;
266 virtual ::arrow::Status
Read(std::shared_ptr
<::arrow::ChunkedArray
>* out
) = 0;
// At this point, the column reader is a stream iterator. It only knows how to
// read the next batch of values for a particular column from the file until it
// is exhausted.
273 // We also do not expose any internal Parquet details, such as row groups. This
274 // might change in the future.
275 class PARQUET_EXPORT ColumnReader
{
277 virtual ~ColumnReader() = default;
279 // Scan the next array of the indicated size. The actual size of the
280 // returned array may be less than the passed size depending how much data is
281 // available in the file.
283 // When all the data in the file has been exhausted, the result is set to
286 // Returns Status::OK on a successful read, including if you have exhausted
287 // the data available in the file.
288 virtual ::arrow::Status
NextBatch(int64_t batch_size
,
289 std::shared_ptr
<::arrow::ChunkedArray
>* out
) = 0;
292 /// \brief Experimental helper class for bindings (like Python) that struggle
293 /// either with std::move or C++ exceptions
294 class PARQUET_EXPORT FileReaderBuilder
{
298 /// Create FileReaderBuilder from Arrow file and optional properties / metadata
299 ::arrow::Status
Open(std::shared_ptr
<::arrow::io::RandomAccessFile
> file
,
300 const ReaderProperties
& properties
= default_reader_properties(),
301 std::shared_ptr
<FileMetaData
> metadata
= NULLPTR
);
303 ParquetFileReader
* raw_reader() { return raw_reader_
.get(); }
305 /// Set Arrow MemoryPool for memory allocation
306 FileReaderBuilder
* memory_pool(::arrow::MemoryPool
* pool
);
307 /// Set Arrow reader properties
308 FileReaderBuilder
* properties(const ArrowReaderProperties
& arg_properties
);
309 /// Build FileReader instance
310 ::arrow::Status
Build(std::unique_ptr
<FileReader
>* out
);
313 ::arrow::MemoryPool
* pool_
;
314 ArrowReaderProperties properties_
;
315 std::unique_ptr
<ParquetFileReader
> raw_reader_
;
318 /// \defgroup parquet-arrow-reader-factories Factory functions for Parquet Arrow readers
322 /// \brief Build FileReader from Arrow file and MemoryPool
324 /// Advanced settings are supported through the FileReaderBuilder class.
326 ::arrow::Status
OpenFile(std::shared_ptr
<::arrow::io::RandomAccessFile
>,
327 ::arrow::MemoryPool
* allocator
,
328 std::unique_ptr
<FileReader
>* reader
);
333 ::arrow::Status
StatisticsAsScalars(const Statistics
& Statistics
,
334 std::shared_ptr
<::arrow::Scalar
>* min
,
335 std::shared_ptr
<::arrow::Scalar
>* max
);
340 ::arrow::Status
FuzzReader(const uint8_t* data
, int64_t size
);
342 } // namespace internal
344 } // namespace parquet