1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 // This API is EXPERIMENTAL.
24 #include <unordered_set>
28 #include "arrow/dataset/discovery.h"
29 #include "arrow/dataset/file_base.h"
30 #include "arrow/dataset/type_fwd.h"
31 #include "arrow/dataset/visibility.h"
32 #include "arrow/io/caching.h"
33 #include "arrow/util/optional.h"
// Forward declarations of parquet types used below by pointer/reference only,
// so this header does not need to include the parquet headers themselves.
namespace parquet {
class ParquetFileReader;
class ColumnChunkMetaData;
class RowGroupMetaData;
// FileMetaData is referenced below via std::shared_ptr<parquet::FileMetaData>.
class FileMetaData;
class FileDecryptionProperties;
class FileEncryptionProperties;

class ReaderProperties;
class ArrowReaderProperties;

class WriterProperties;
class ArrowWriterProperties;

namespace arrow {
// Referenced below as parquet::arrow::FileReader / FileWriter / SchemaManifest.
class FileReader;
class FileWriter;
struct SchemaManifest;
}  // namespace arrow
}  // namespace parquet
60 /// \addtogroup dataset-file-formats
// Canonical format name returned by type_name() for Parquet formats/options below.
constexpr char kParquetTypeName[] = "parquet";
66 /// \brief A FileFormat implementation that reads from Parquet files
67 class ARROW_DS_EXPORT ParquetFileFormat
: public FileFormat
{
69 ParquetFileFormat() = default;
71 /// Convenience constructor which copies properties from a parquet::ReaderProperties.
72 /// memory_pool will be ignored.
73 explicit ParquetFileFormat(const parquet::ReaderProperties
& reader_properties
);
75 std::string
type_name() const override
{ return kParquetTypeName
; }
77 bool Equals(const FileFormat
& other
) const override
;
79 struct ReaderOptions
{
80 /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
81 /// to members of parquet::ArrowReaderProperties.
83 /// We don't embed parquet::ReaderProperties directly because column names (rather
84 /// than indices) are used to indicate dictionary columns, and other options are
85 /// deferred to scan time.
88 std::unordered_set
<std::string
> dict_columns
;
89 arrow::TimeUnit::type coerce_int96_timestamp_unit
= arrow::TimeUnit::NANO
;
93 Result
<bool> IsSupported(const FileSource
& source
) const override
;
95 /// \brief Return the schema of the file if possible.
96 Result
<std::shared_ptr
<Schema
>> Inspect(const FileSource
& source
) const override
;
98 /// \brief Open a file for scanning
99 Result
<ScanTaskIterator
> ScanFile(
100 const std::shared_ptr
<ScanOptions
>& options
,
101 const std::shared_ptr
<FileFragment
>& file
) const override
;
103 Result
<RecordBatchGenerator
> ScanBatchesAsync(
104 const std::shared_ptr
<ScanOptions
>& options
,
105 const std::shared_ptr
<FileFragment
>& file
) const override
;
107 Future
<util::optional
<int64_t>> CountRows(
108 const std::shared_ptr
<FileFragment
>& file
, compute::Expression predicate
,
109 const std::shared_ptr
<ScanOptions
>& options
) override
;
111 using FileFormat::MakeFragment
;
113 /// \brief Create a Fragment targeting all RowGroups.
114 Result
<std::shared_ptr
<FileFragment
>> MakeFragment(
115 FileSource source
, compute::Expression partition_expression
,
116 std::shared_ptr
<Schema
> physical_schema
) override
;
118 /// \brief Create a Fragment, restricted to the specified row groups.
119 Result
<std::shared_ptr
<ParquetFileFragment
>> MakeFragment(
120 FileSource source
, compute::Expression partition_expression
,
121 std::shared_ptr
<Schema
> physical_schema
, std::vector
<int> row_groups
);
123 /// \brief Return a FileReader on the given source.
124 Result
<std::unique_ptr
<parquet::arrow::FileReader
>> GetReader(
125 const FileSource
& source
, ScanOptions
* = NULLPTR
) const;
127 Future
<std::shared_ptr
<parquet::arrow::FileReader
>> GetReaderAsync(
128 const FileSource
& source
, const std::shared_ptr
<ScanOptions
>& options
) const;
130 Result
<std::shared_ptr
<FileWriter
>> MakeWriter(
131 std::shared_ptr
<io::OutputStream
> destination
, std::shared_ptr
<Schema
> schema
,
132 std::shared_ptr
<FileWriteOptions
> options
,
133 fs::FileLocator destination_locator
) const override
;
135 std::shared_ptr
<FileWriteOptions
> DefaultWriteOptions() override
;
138 /// \brief A FileFragment with parquet logic.
140 /// ParquetFileFragment provides a lazy (with respect to IO) interface to
141 /// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
143 /// The caller can provide an optional list of selected RowGroups to limit the
144 /// number of scanned RowGroups, or to partition the scans across multiple
147 /// Metadata can be explicitly provided, enabling pushdown predicate benefits without
148 /// the potentially heavy IO of loading Metadata from the file system. This can induce
149 /// significant performance boost when scanning high latency file systems.
150 class ARROW_DS_EXPORT ParquetFileFragment
: public FileFragment
{
152 Result
<FragmentVector
> SplitByRowGroup(compute::Expression predicate
);
154 /// \brief Return the RowGroups selected by this fragment.
155 const std::vector
<int>& row_groups() const {
156 if (row_groups_
) return *row_groups_
;
157 static std::vector
<int> empty
;
161 /// \brief Return the FileMetaData associated with this fragment.
162 const std::shared_ptr
<parquet::FileMetaData
>& metadata() const { return metadata_
; }
164 /// \brief Ensure this fragment's FileMetaData is in memory.
165 Status
EnsureCompleteMetadata(parquet::arrow::FileReader
* reader
= NULLPTR
);
167 /// \brief Return fragment which selects a filtered subset of this fragment's RowGroups.
168 Result
<std::shared_ptr
<Fragment
>> Subset(compute::Expression predicate
);
169 Result
<std::shared_ptr
<Fragment
>> Subset(std::vector
<int> row_group_ids
);
172 ParquetFileFragment(FileSource source
, std::shared_ptr
<FileFormat
> format
,
173 compute::Expression partition_expression
,
174 std::shared_ptr
<Schema
> physical_schema
,
175 util::optional
<std::vector
<int>> row_groups
);
177 Status
SetMetadata(std::shared_ptr
<parquet::FileMetaData
> metadata
,
178 std::shared_ptr
<parquet::arrow::SchemaManifest
> manifest
);
180 // Overridden to opportunistically set metadata since a reader must be opened anyway.
181 Result
<std::shared_ptr
<Schema
>> ReadPhysicalSchemaImpl() override
{
182 ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
183 return physical_schema_
;
186 /// Return a filtered subset of row group indices.
187 Result
<std::vector
<int>> FilterRowGroups(compute::Expression predicate
);
188 /// Simplify the predicate against the statistics of each row group.
189 Result
<std::vector
<compute::Expression
>> TestRowGroups(compute::Expression predicate
);
190 /// Try to count rows matching the predicate using metadata. Expects
191 /// metadata to be present, and expects the predicate to have been
192 /// simplified against the partition expression already.
193 Result
<util::optional
<int64_t>> TryCountRows(compute::Expression predicate
);
195 ParquetFileFormat
& parquet_format_
;
197 /// Indices of row groups selected by this fragment,
198 /// or util::nullopt if all row groups are selected.
199 util::optional
<std::vector
<int>> row_groups_
;
201 std::vector
<compute::Expression
> statistics_expressions_
;
202 std::vector
<bool> statistics_expressions_complete_
;
203 std::shared_ptr
<parquet::FileMetaData
> metadata_
;
204 std::shared_ptr
<parquet::arrow::SchemaManifest
> manifest_
;
206 friend class ParquetFileFormat
;
207 friend class ParquetDatasetFactory
;
210 /// \brief Per-scan options for Parquet fragments
211 class ARROW_DS_EXPORT ParquetFragmentScanOptions
: public FragmentScanOptions
{
213 ParquetFragmentScanOptions();
214 std::string
type_name() const override
{ return kParquetTypeName
; }
216 /// Reader properties. Not all properties are respected: memory_pool comes from
218 std::shared_ptr
<parquet::ReaderProperties
> reader_properties
;
219 /// Arrow reader properties. Not all properties are respected: batch_size comes from
220 /// ScanOptions, and use_threads will be overridden based on
221 /// enable_parallel_column_conversion. Additionally, dictionary columns come from
222 /// ParquetFileFormat::ReaderOptions::dict_columns.
223 std::shared_ptr
<parquet::ArrowReaderProperties
> arrow_reader_properties
;
224 /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
225 /// scan is already parallelized across input files to avoid thread contention. This
226 /// option will be removed after support is added for simultaneous parallelization
227 /// across files and columns. Only affects the threaded reader; the async reader
228 /// will parallelize across columns if use_threads is enabled.
229 bool enable_parallel_column_conversion
= false;
232 class ARROW_DS_EXPORT ParquetFileWriteOptions
: public FileWriteOptions
{
234 /// \brief Parquet writer properties.
235 std::shared_ptr
<parquet::WriterProperties
> writer_properties
;
237 /// \brief Parquet Arrow writer properties.
238 std::shared_ptr
<parquet::ArrowWriterProperties
> arrow_writer_properties
;
241 using FileWriteOptions::FileWriteOptions
;
243 friend class ParquetFileFormat
;
246 class ARROW_DS_EXPORT ParquetFileWriter
: public FileWriter
{
248 const std::shared_ptr
<parquet::arrow::FileWriter
>& parquet_writer() const {
249 return parquet_writer_
;
252 Status
Write(const std::shared_ptr
<RecordBatch
>& batch
) override
;
255 ParquetFileWriter(std::shared_ptr
<io::OutputStream
> destination
,
256 std::shared_ptr
<parquet::arrow::FileWriter
> writer
,
257 std::shared_ptr
<ParquetFileWriteOptions
> options
,
258 fs::FileLocator destination_locator
);
260 Status
FinishInternal() override
;
262 std::shared_ptr
<parquet::arrow::FileWriter
> parquet_writer_
;
264 friend class ParquetFileFormat
;
267 /// \brief Options for making a FileSystemDataset from a Parquet _metadata file.
268 struct ParquetFactoryOptions
{
269 /// Either an explicit Partitioning or a PartitioningFactory to discover one.
271 /// If a factory is provided, it will be used to infer a schema for partition fields
272 /// based on file and directory paths then construct a Partitioning. The default
273 /// is a Partitioning which will yield no partition information.
275 /// The (explicit or discovered) partitioning will be applied to discovered files
276 /// and the resulting partition information embedded in the Dataset.
277 PartitioningOrFactory partitioning
{Partitioning::Default()};
279 /// For the purposes of applying the partitioning, paths will be stripped
280 /// of the partition_base_dir. Files not matching the partition_base_dir
281 /// prefix will be skipped for partition discovery. The ignored files will still
282 /// be part of the Dataset, but will not have partition information.
285 /// partition_base_dir = "/dataset";
287 /// - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the partitioning
289 /// - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
291 /// This is useful for partitioning which parses directory when ordering
292 /// is important, e.g. DirectoryPartitioning.
293 std::string partition_base_dir
;
295 /// Assert that all ColumnChunk paths are consistent. The parquet spec allows for
296 /// ColumnChunk data to be stored in multiple files, but ParquetDatasetFactory
297 /// supports only a single file with all ColumnChunk data. If this flag is set
298 /// construction of a ParquetDatasetFactory will raise an error if ColumnChunk
299 /// data is not resident in a single file.
300 bool validate_column_chunk_paths
= false;
303 /// \brief Create FileSystemDataset from custom `_metadata` cache file.
305 /// Dask and other systems will generate a cache metadata file by concatenating
306 /// the RowGroupMetaData of multiple parquet files into a single parquet file
307 /// that only contains metadata and no ColumnChunk data.
309 /// ParquetDatasetFactory creates a FileSystemDataset composed of
310 /// ParquetFileFragment where each fragment is pre-populated with the exact
311 /// number of row groups and statistics for each columns.
312 class ARROW_DS_EXPORT ParquetDatasetFactory
: public DatasetFactory
{
314 /// \brief Create a ParquetDatasetFactory from a metadata path.
316 /// The `metadata_path` will be read from `filesystem`. Each RowGroup
317 /// contained in the metadata file will be relative to `dirname(metadata_path)`.
319 /// \param[in] metadata_path path of the metadata parquet file
320 /// \param[in] filesystem from which to open/read the path
321 /// \param[in] format to read the file with.
322 /// \param[in] options see ParquetFactoryOptions
323 static Result
<std::shared_ptr
<DatasetFactory
>> Make(
324 const std::string
& metadata_path
, std::shared_ptr
<fs::FileSystem
> filesystem
,
325 std::shared_ptr
<ParquetFileFormat
> format
, ParquetFactoryOptions options
);
327 /// \brief Create a ParquetDatasetFactory from a metadata source.
329 /// Similar to the previous Make definition, but the metadata can be a Buffer
330 /// and the base_path is explicited instead of inferred from the metadata
333 /// \param[in] metadata source to open the metadata parquet file from
334 /// \param[in] base_path used as the prefix of every parquet files referenced
335 /// \param[in] filesystem from which to read the files referenced.
336 /// \param[in] format to read the file with.
337 /// \param[in] options see ParquetFactoryOptions
338 static Result
<std::shared_ptr
<DatasetFactory
>> Make(
339 const FileSource
& metadata
, const std::string
& base_path
,
340 std::shared_ptr
<fs::FileSystem
> filesystem
,
341 std::shared_ptr
<ParquetFileFormat
> format
, ParquetFactoryOptions options
);
343 Result
<std::vector
<std::shared_ptr
<Schema
>>> InspectSchemas(
344 InspectOptions options
) override
;
346 Result
<std::shared_ptr
<Dataset
>> Finish(FinishOptions options
) override
;
349 ParquetDatasetFactory(
350 std::shared_ptr
<fs::FileSystem
> filesystem
,
351 std::shared_ptr
<ParquetFileFormat
> format
,
352 std::shared_ptr
<parquet::FileMetaData
> metadata
,
353 std::shared_ptr
<parquet::arrow::SchemaManifest
> manifest
,
354 std::shared_ptr
<Schema
> physical_schema
, std::string base_path
,
355 ParquetFactoryOptions options
,
356 std::vector
<std::pair
<std::string
, std::vector
<int>>> paths_with_row_group_ids
)
357 : filesystem_(std::move(filesystem
)),
358 format_(std::move(format
)),
359 metadata_(std::move(metadata
)),
360 manifest_(std::move(manifest
)),
361 physical_schema_(std::move(physical_schema
)),
362 base_path_(std::move(base_path
)),
363 options_(std::move(options
)),
364 paths_with_row_group_ids_(std::move(paths_with_row_group_ids
)) {}
366 std::shared_ptr
<fs::FileSystem
> filesystem_
;
367 std::shared_ptr
<ParquetFileFormat
> format_
;
368 std::shared_ptr
<parquet::FileMetaData
> metadata_
;
369 std::shared_ptr
<parquet::arrow::SchemaManifest
> manifest_
;
370 std::shared_ptr
<Schema
> physical_schema_
;
371 std::string base_path_
;
372 ParquetFactoryOptions options_
;
373 std::vector
<std::pair
<std::string
, std::vector
<int>>> paths_with_row_group_ids_
;
376 Result
<std::vector
<std::shared_ptr
<FileFragment
>>> CollectParquetFragments(
377 const Partitioning
& partitioning
);
379 Result
<std::shared_ptr
<Schema
>> PartitionSchema();
384 } // namespace dataset