// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/caching.h"
#include "arrow/util/optional.h"

namespace parquet {
class ParquetFileReader;
class Statistics;
class ColumnChunkMetaData;
class RowGroupMetaData;
class FileMetaData;
class FileDecryptionProperties;
class FileEncryptionProperties;

class ReaderProperties;
class ArrowReaderProperties;

class WriterProperties;
class ArrowWriterProperties;

namespace arrow {
class FileReader;
class FileWriter;
struct SchemaManifest;
}  // namespace arrow
}  // namespace parquet

namespace arrow {
namespace dataset {

/// \addtogroup dataset-file-formats
///
/// @{

constexpr char kParquetTypeName[] = "parquet";

/// \brief A FileFormat implementation that reads from Parquet files
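///
/// A minimal usage sketch, assuming a `FileSource source` is in scope
/// ("ticker" is a hypothetical column name; error handling elided):
///
/// \code{.cpp}
/// auto format = std::make_shared<ParquetFileFormat>();
/// // Read this column as dictionary-encoded instead of dense.
/// format->reader_options.dict_columns.insert("ticker");
/// // Inspect the file's physical schema without scanning any data.
/// ARROW_ASSIGN_OR_RAISE(auto schema, format->Inspect(source));
/// \endcode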
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
 public:
  ParquetFileFormat() = default;

  /// Convenience constructor which copies properties from a parquet::ReaderProperties.
  /// memory_pool will be ignored.
  explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);

  std::string type_name() const override { return kParquetTypeName; }

  bool Equals(const FileFormat& other) const override;

  struct ReaderOptions {
    /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
    /// to members of parquet::ArrowReaderProperties.
    ///
    /// We don't embed parquet::ReaderProperties directly because column names (rather
    /// than indices) are used to indicate dictionary columns, and other options are
    /// deferred to scan time.
    ///
    /// @{
    std::unordered_set<std::string> dict_columns;
    arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
    /// @}
  } reader_options;

  Result<bool> IsSupported(const FileSource& source) const override;

  /// \brief Return the schema of the file if possible.
  Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;

  /// \brief Open a file for scanning
  Result<ScanTaskIterator> ScanFile(
      const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<FileFragment>& file) const override;

  Result<RecordBatchGenerator> ScanBatchesAsync(
      const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<FileFragment>& file) const override;

  Future<util::optional<int64_t>> CountRows(
      const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
      const std::shared_ptr<ScanOptions>& options) override;

  using FileFormat::MakeFragment;

  /// \brief Create a Fragment targeting all RowGroups.
  Result<std::shared_ptr<FileFragment>> MakeFragment(
      FileSource source, compute::Expression partition_expression,
      std::shared_ptr<Schema> physical_schema) override;

  /// \brief Create a Fragment, restricted to the specified row groups.
  Result<std::shared_ptr<ParquetFileFragment>> MakeFragment(
      FileSource source, compute::Expression partition_expression,
      std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups);

  /// \brief Return a FileReader on the given source.
  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
      const FileSource& source, ScanOptions* = NULLPTR) const;

  Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
      const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

  Result<std::shared_ptr<FileWriter>> MakeWriter(
      std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
      std::shared_ptr<FileWriteOptions> options,
      fs::FileLocator destination_locator) const override;

  std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};

/// \brief A FileFragment with parquet logic.
///
/// ParquetFileFragment provides a lazy (with respect to IO) interface to
/// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
///
/// The caller can provide an optional list of selected RowGroups to limit the
/// number of scanned RowGroups, or to partition the scans across multiple
/// threads.
///
/// Metadata can be provided explicitly, enabling predicate-pushdown benefits without
/// the potentially heavy IO of loading metadata from the file system. This can yield a
/// significant performance boost when scanning high-latency file systems.
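///
/// A minimal sketch, assuming `format` is a shared_ptr<ParquetFileFormat> and
/// `source` a FileSource ("x" is a hypothetical column; error handling elided):
///
/// \code{.cpp}
/// // Build a fragment restricted to row groups 0 and 1.
/// ARROW_ASSIGN_OR_RAISE(
///     auto fragment, format->MakeFragment(source, compute::literal(true),
///                                         /*physical_schema=*/nullptr, {0, 1}));
/// // Load the FileMetaData if it is not already in memory.
/// ARROW_RETURN_NOT_OK(fragment->EnsureCompleteMetadata());
/// // Keep only row groups whose statistics may satisfy the predicate.
/// ARROW_ASSIGN_OR_RAISE(
///     auto subset,
///     fragment->Subset(compute::equal(compute::field_ref("x"), compute::literal(0))));
/// \endcode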
class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
 public:
  Result<FragmentVector> SplitByRowGroup(compute::Expression predicate);

  /// \brief Return the RowGroups selected by this fragment.
  const std::vector<int>& row_groups() const {
    if (row_groups_) return *row_groups_;
    static std::vector<int> empty;
    return empty;
  }

  /// \brief Return the FileMetaData associated with this fragment.
  const std::shared_ptr<parquet::FileMetaData>& metadata() const { return metadata_; }

  /// \brief Ensure this fragment's FileMetaData is in memory.
  Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);

  /// \brief Return a fragment that selects a filtered subset of this fragment's
  /// RowGroups.
  Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);
  Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);

 private:
  ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                      compute::Expression partition_expression,
                      std::shared_ptr<Schema> physical_schema,
                      util::optional<std::vector<int>> row_groups);

  Status SetMetadata(std::shared_ptr<parquet::FileMetaData> metadata,
                     std::shared_ptr<parquet::arrow::SchemaManifest> manifest);

  // Overridden to opportunistically set metadata since a reader must be opened anyway.
  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
    ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
    return physical_schema_;
  }

  /// Return a filtered subset of row group indices.
  Result<std::vector<int>> FilterRowGroups(compute::Expression predicate);
  /// Simplify the predicate against the statistics of each row group.
  Result<std::vector<compute::Expression>> TestRowGroups(compute::Expression predicate);
  /// Try to count rows matching the predicate using metadata. Expects
  /// metadata to be present, and expects the predicate to have been
  /// simplified against the partition expression already.
  Result<util::optional<int64_t>> TryCountRows(compute::Expression predicate);

  ParquetFileFormat& parquet_format_;

  /// Indices of row groups selected by this fragment,
  /// or util::nullopt if all row groups are selected.
  util::optional<std::vector<int>> row_groups_;

  std::vector<compute::Expression> statistics_expressions_;
  std::vector<bool> statistics_expressions_complete_;
  std::shared_ptr<parquet::FileMetaData> metadata_;
  std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;

  friend class ParquetFileFormat;
  friend class ParquetDatasetFactory;
};

/// \brief Per-scan options for Parquet fragments
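///
/// A minimal configuration sketch; pre-buffering is a
/// parquet::ArrowReaderProperties toggle that coalesces small reads, which
/// tends to help on high-latency filesystems:
///
/// \code{.cpp}
/// auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
/// parquet_scan_options->arrow_reader_properties->set_pre_buffer(true);
/// // The options are then attached to a scan, e.g. via
/// // ScannerBuilder::FragmentScanOptions(parquet_scan_options).
/// \endcode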
class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
 public:
  ParquetFragmentScanOptions();
  std::string type_name() const override { return kParquetTypeName; }

  /// Reader properties. Not all properties are respected: memory_pool comes from
  /// ScanOptions.
  std::shared_ptr<parquet::ReaderProperties> reader_properties;
  /// Arrow reader properties. Not all properties are respected: batch_size comes from
  /// ScanOptions, and use_threads will be overridden based on
  /// enable_parallel_column_conversion. Additionally, dictionary columns come from
  /// ParquetFileFormat::ReaderOptions::dict_columns.
  std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
  /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
  /// scan is already parallelized across input files to avoid thread contention. This
  /// option will be removed after support is added for simultaneous parallelization
  /// across files and columns. Only affects the threaded reader; the async reader
  /// will parallelize across columns if use_threads is enabled.
  bool enable_parallel_column_conversion = false;
};

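/// \brief Per-write options for Parquet files.
///
/// A minimal configuration sketch, assuming `format` is a
/// shared_ptr<ParquetFileFormat>; Snappy compression is an illustrative
/// choice, not a stated default:
///
/// \code{.cpp}
/// auto parquet_write_options = std::static_pointer_cast<ParquetFileWriteOptions>(
///     format->DefaultWriteOptions());
/// parquet_write_options->writer_properties =
///     parquet::WriterProperties::Builder()
///         .compression(parquet::Compression::SNAPPY)
///         ->build();
/// \endcode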
class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
 public:
  /// \brief Parquet writer properties.
  std::shared_ptr<parquet::WriterProperties> writer_properties;

  /// \brief Parquet Arrow writer properties.
  std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;

 protected:
  using FileWriteOptions::FileWriteOptions;

  friend class ParquetFileFormat;
};

class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {
 public:
  const std::shared_ptr<parquet::arrow::FileWriter>& parquet_writer() const {
    return parquet_writer_;
  }

  Status Write(const std::shared_ptr<RecordBatch>& batch) override;

 private:
  ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
                    std::shared_ptr<parquet::arrow::FileWriter> writer,
                    std::shared_ptr<ParquetFileWriteOptions> options,
                    fs::FileLocator destination_locator);

  Status FinishInternal() override;

  std::shared_ptr<parquet::arrow::FileWriter> parquet_writer_;

  friend class ParquetFileFormat;
};

/// \brief Options for making a FileSystemDataset from a Parquet _metadata file.
struct ParquetFactoryOptions {
  /// Either an explicit Partitioning or a PartitioningFactory to discover one.
  ///
  /// If a factory is provided, it will be used to infer a schema for partition fields
  /// based on file and directory paths, then construct a Partitioning. The default
  /// is a Partitioning which will yield no partition information.
  ///
  /// The (explicit or discovered) partitioning will be applied to discovered files
  /// and the resulting partition information embedded in the Dataset.
  PartitioningOrFactory partitioning{Partitioning::Default()};

  /// For the purposes of applying the partitioning, paths will be stripped
  /// of the partition_base_dir. Files not matching the partition_base_dir
  /// prefix will be skipped for partition discovery. The ignored files will still
  /// be part of the Dataset, but will not have partition information.
  ///
  /// Example:
  /// partition_base_dir = "/dataset";
  ///
  /// - "/dataset/US/sales.parquet" -> "US/sales.parquet" will be given to the
  ///   partitioning
  ///
  /// - "/home/john/late_sales.parquet" -> will be ignored for partition discovery.
  ///
  /// This is useful for partitionings that parse directory paths and for which
  /// ordering is important, e.g. DirectoryPartitioning.
  std::string partition_base_dir;

  /// Assert that all ColumnChunk paths are consistent. The parquet spec allows for
  /// ColumnChunk data to be stored in multiple files, but ParquetDatasetFactory
  /// supports only a single file with all ColumnChunk data. If this flag is set,
  /// construction of a ParquetDatasetFactory will raise an error if ColumnChunk
  /// data is not resident in a single file.
  bool validate_column_chunk_paths = false;
};

/// \brief Create a FileSystemDataset from a custom `_metadata` cache file.
///
/// Dask and other systems will generate a cache metadata file by concatenating
/// the RowGroupMetaData of multiple parquet files into a single parquet file
/// that only contains metadata and no ColumnChunk data.
///
/// ParquetDatasetFactory creates a FileSystemDataset composed of
/// ParquetFileFragments, where each fragment is pre-populated with the exact
/// number of row groups and statistics for each column.
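///
/// A minimal sketch, assuming `fs` is a shared_ptr<fs::FileSystem> in which
/// "/dataset/_metadata" exists (error handling elided):
///
/// \code{.cpp}
/// auto format = std::make_shared<ParquetFileFormat>();
/// ParquetFactoryOptions factory_options;
/// factory_options.partition_base_dir = "/dataset";
/// ARROW_ASSIGN_OR_RAISE(
///     auto factory, ParquetDatasetFactory::Make("/dataset/_metadata", fs, format,
///                                               std::move(factory_options)));
/// ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
/// \endcode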
class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
 public:
  /// \brief Create a ParquetDatasetFactory from a metadata path.
  ///
  /// The `metadata_path` will be read from `filesystem`. Each RowGroup
  /// contained in the metadata file will be relative to `dirname(metadata_path)`.
  ///
  /// \param[in] metadata_path path of the metadata parquet file
  /// \param[in] filesystem from which to open/read the path
  /// \param[in] format to read the file with.
  /// \param[in] options see ParquetFactoryOptions
  static Result<std::shared_ptr<DatasetFactory>> Make(
      const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
      std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);

  /// \brief Create a ParquetDatasetFactory from a metadata source.
  ///
  /// Similar to the previous Make definition, but the metadata can be a Buffer
  /// and the base_path is given explicitly instead of being inferred from the
  /// metadata path.
  ///
  /// \param[in] metadata source to open the metadata parquet file from
  /// \param[in] base_path used as the prefix of every parquet file referenced
  /// \param[in] filesystem from which to read the files referenced.
  /// \param[in] format to read the file with.
  /// \param[in] options see ParquetFactoryOptions
  static Result<std::shared_ptr<DatasetFactory>> Make(
      const FileSource& metadata, const std::string& base_path,
      std::shared_ptr<fs::FileSystem> filesystem,
      std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);

  Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
      InspectOptions options) override;

  Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

 protected:
  ParquetDatasetFactory(
      std::shared_ptr<fs::FileSystem> filesystem,
      std::shared_ptr<ParquetFileFormat> format,
      std::shared_ptr<parquet::FileMetaData> metadata,
      std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
      std::shared_ptr<Schema> physical_schema, std::string base_path,
      ParquetFactoryOptions options,
      std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids)
      : filesystem_(std::move(filesystem)),
        format_(std::move(format)),
        metadata_(std::move(metadata)),
        manifest_(std::move(manifest)),
        physical_schema_(std::move(physical_schema)),
        base_path_(std::move(base_path)),
        options_(std::move(options)),
        paths_with_row_group_ids_(std::move(paths_with_row_group_ids)) {}

  std::shared_ptr<fs::FileSystem> filesystem_;
  std::shared_ptr<ParquetFileFormat> format_;
  std::shared_ptr<parquet::FileMetaData> metadata_;
  std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
  std::shared_ptr<Schema> physical_schema_;
  std::string base_path_;
  ParquetFactoryOptions options_;
  std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids_;

 private:
  Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
      const Partitioning& partitioning);

  Result<std::shared_ptr<Schema>> PartitionSchema();
};

/// @}

}  // namespace dataset
}  // namespace arrow