]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | // This API is EXPERIMENTAL. | |
19 | ||
20 | #pragma once | |
21 | ||
22 | #include <functional> | |
23 | #include <memory> | |
24 | #include <string> | |
25 | #include <utility> | |
26 | #include <vector> | |
27 | ||
28 | #include "arrow/compute/exec/expression.h" | |
29 | #include "arrow/dataset/type_fwd.h" | |
30 | #include "arrow/dataset/visibility.h" | |
31 | #include "arrow/util/macros.h" | |
32 | #include "arrow/util/mutex.h" | |
33 | #include "arrow/util/optional.h" | |
34 | ||
35 | namespace arrow { | |
36 | namespace dataset { | |
37 | ||
38 | using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>; | |
39 | ||
40 | /// \brief A granular piece of a Dataset, such as an individual file. | |
41 | /// | |
42 | /// A Fragment can be read/scanned separately from other fragments. It yields a | |
43 | /// collection of RecordBatches when scanned, encapsulated in one or more | |
44 | /// ScanTasks. | |
45 | /// | |
46 | /// Note that Fragments have well defined physical schemas which are reconciled by | |
47 | /// the Datasets which contain them; these physical schemas may differ from a parent | |
48 | /// Dataset's schema and the physical schemas of sibling Fragments. Subclasses implement Scan(), ScanBatchesAsync(), type_name() and ReadPhysicalSchemaImpl(); ToString() returns type_name() unless overridden. | |
49 | class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> { | |
50 | public: | |
51 | /// \brief Return the physical schema of the Fragment. | |
52 | /// | |
53 | /// The physical schema is also called the writer schema. | |
54 | /// This method is blocking and may be slow on a high-latency filesystem. | |
55 | /// The schema is cached after being read once, or may be specified at construction. | |
56 | Result<std::shared_ptr<Schema>> ReadPhysicalSchema(); | |
57 | ||
58 | /// \brief Scan returns an iterator of ScanTasks, each of which yields | |
59 | /// RecordBatches from this Fragment. | |
60 | /// | |
61 | /// Note that batches yielded using this method will not be filtered and may not align | |
62 | /// with the Fragment's schema. In particular, note that columns referenced by the | |
63 | /// filter may be present in yielded batches even if they are not projected (so that | |
64 | /// they are available when a filter is applied). Additionally, explicitly projected | |
65 | /// columns may be absent if they were not present in this fragment. | |
66 | /// | |
67 | /// To receive a record batch stream which is fully filtered and projected, use Scanner. | |
68 | virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0; | |
69 | ||
70 | /// \brief An asynchronous version of Scan. On success the result holds a RecordBatchGenerator, i.e. a callable returning Future<std::shared_ptr<RecordBatch>>. | |
71 | virtual Result<RecordBatchGenerator> ScanBatchesAsync( | |
72 | const std::shared_ptr<ScanOptions>& options) = 0; | |
73 | ||
74 | /// \brief Count the number of rows in this fragment matching the filter using metadata | |
75 | /// only. That is, this method may perform I/O, but will not load data. | |
76 | /// | |
77 | /// If this is not possible, resolve with an empty optional. The fragment can perform | |
78 | /// I/O (e.g. to read metadata) before deciding whether it can satisfy the request. | |
79 | virtual Future<util::optional<int64_t>> CountRows( | |
80 | compute::Expression predicate, const std::shared_ptr<ScanOptions>& options); | |
81 | ||
82 | virtual std::string type_name() const = 0; | |
83 | virtual std::string ToString() const { return type_name(); } | |
84 | ||
85 | /// \brief An expression which evaluates to true for all data viewed by this | |
86 | /// Fragment. The member is initialized to literal(true) by default. | |
87 | const compute::Expression& partition_expression() const { | |
88 | return partition_expression_; | |
89 | } | |
90 | ||
91 | virtual ~Fragment() = default; | |
92 | ||
93 | protected: | |
94 | Fragment() = default; | |
95 | explicit Fragment(compute::Expression partition_expression, | |
96 | std::shared_ptr<Schema> physical_schema); | |
97 | ||
98 | virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0; | |
99 | ||
100 | util::Mutex physical_schema_mutex_; | |
101 | compute::Expression partition_expression_ = compute::literal(true); | |
102 | std::shared_ptr<Schema> physical_schema_; | |
103 | }; | |
104 | ||
105 | /// \brief Per-scan options for fragment(s) in a dataset. | |
106 | /// | |
107 | /// These options are not intrinsic to the format or fragment itself, but do affect | |
108 | /// the results of a scan. These are options which make sense to change between | |
109 | /// repeated reads of the same dataset, such as format-specific conversion options | |
110 | /// (that do not affect the schema). Subclasses must implement type_name(); ToString() returns type_name() unless overridden. | |
111 | /// | |
112 | /// \ingroup dataset-scanning | |
113 | class ARROW_DS_EXPORT FragmentScanOptions { | |
114 | public: | |
115 | virtual std::string type_name() const = 0; | |
116 | virtual std::string ToString() const { return type_name(); } | |
117 | virtual ~FragmentScanOptions() = default; | |
118 | }; | |
119 | ||
120 | /// \defgroup dataset-implementations Concrete implementations | |
121 | /// | |
122 | /// @{ | |
123 | ||
124 | /// \brief A trivial Fragment that yields ScanTasks from a fixed set of | |
125 | /// RecordBatches. Its type_name() is "in-memory"; the partition expression argument of both constructors defaults to literal(true). | |
126 | class ARROW_DS_EXPORT InMemoryFragment : public Fragment { | |
127 | public: | |
128 | InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches, | |
129 | compute::Expression = compute::literal(true)); | |
130 | explicit InMemoryFragment(RecordBatchVector record_batches, | |
131 | compute::Expression = compute::literal(true)); | |
132 | ||
133 | Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override; | |
134 | Result<RecordBatchGenerator> ScanBatchesAsync( | |
135 | const std::shared_ptr<ScanOptions>& options) override; | |
136 | Future<util::optional<int64_t>> CountRows( | |
137 | compute::Expression predicate, | |
138 | const std::shared_ptr<ScanOptions>& options) override; | |
139 | ||
140 | std::string type_name() const override { return "in-memory"; } | |
141 | ||
142 | protected: | |
143 | Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override; | |
144 | ||
145 | RecordBatchVector record_batches_; | |
146 | }; | |
147 | ||
148 | /// @} | |
149 | ||
150 | /// \brief A container of zero or more Fragments. | |
151 | /// | |
152 | /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a | |
153 | /// directory. A Dataset has a schema to which Fragments must align during a | |
154 | /// scan operation. This is analogous to Avro's reader and writer schema. Subclasses implement type_name(), ReplaceSchema() and GetFragmentsImpl(). | |
155 | class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> { | |
156 | public: | |
157 | /// \brief Begin to build a new Scan operation against this Dataset | |
158 | Result<std::shared_ptr<ScannerBuilder>> NewScan(); | |
159 | ||
160 | /// \brief GetFragments returns an iterator of Fragments given a predicate. The no-argument overload takes no predicate (presumably equivalent to literal(true) -- confirm in the implementation). | |
161 | Result<FragmentIterator> GetFragments(compute::Expression predicate); | |
162 | Result<FragmentIterator> GetFragments(); | |
163 | ||
164 | const std::shared_ptr<Schema>& schema() const { return schema_; } | |
165 | ||
166 | /// \brief An expression which evaluates to true for all data viewed by this Dataset. | |
167 | /// Defaults to literal(true) when only a schema is supplied. NOTE(review): previously documented as "may be null" -- confirm whether that can still occur. | |
168 | const compute::Expression& partition_expression() const { | |
169 | return partition_expression_; | |
170 | } | |
171 | ||
172 | /// \brief The name identifying the kind of Dataset | |
173 | virtual std::string type_name() const = 0; | |
174 | ||
175 | /// \brief Return a copy of this Dataset with a different schema. | |
176 | /// | |
177 | /// The copy will view the same Fragments. If the new schema is not compatible with the | |
178 | /// original dataset's schema then an error will be raised. | |
179 | virtual Result<std::shared_ptr<Dataset>> ReplaceSchema( | |
180 | std::shared_ptr<Schema> schema) const = 0; | |
181 | ||
182 | virtual ~Dataset() = default; | |
183 | ||
184 | protected: | |
185 | explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {} | |
186 | ||
187 | Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression); | |
188 | ||
189 | virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0; | |
190 | ||
191 | std::shared_ptr<Schema> schema_; | |
192 | compute::Expression partition_expression_ = compute::literal(true); | |
193 | }; | |
194 | ||
195 | /// \addtogroup dataset-implementations | |
196 | /// | |
197 | /// @{ | |
198 | ||
199 | /// \brief A Dataset which yields fragments wrapping a stream of record batches. | |
200 | /// | |
201 | /// The record batches must match the schema provided to the dataset at construction. Note that the nested RecordBatchGenerator class declared here is distinct from the namespace-level dataset::RecordBatchGenerator alias. | |
202 | class ARROW_DS_EXPORT InMemoryDataset : public Dataset { | |
203 | public: | |
204 | class RecordBatchGenerator { | |
205 | public: | |
206 | virtual ~RecordBatchGenerator() = default; | |
207 | virtual RecordBatchIterator Get() const = 0; | |
208 | }; | |
209 | ||
210 | /// Construct a dataset from a schema and a factory of record batch iterators (a RecordBatchGenerator whose Get() returns a RecordBatchIterator). | |
211 | InMemoryDataset(std::shared_ptr<Schema> schema, | |
212 | std::shared_ptr<RecordBatchGenerator> get_batches) | |
213 | : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {} | |
214 | ||
215 | /// Convenience constructor taking a schema and a fixed list of batches | |
216 | InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches); | |
217 | ||
218 | /// Convenience constructor taking a Table | |
219 | explicit InMemoryDataset(std::shared_ptr<Table> table); | |
220 | ||
221 | std::string type_name() const override { return "in-memory"; } | |
222 | ||
223 | Result<std::shared_ptr<Dataset>> ReplaceSchema( | |
224 | std::shared_ptr<Schema> schema) const override; | |
225 | ||
226 | protected: | |
227 | Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override; | |
228 | ||
229 | std::shared_ptr<RecordBatchGenerator> get_batches_; | |
230 | }; | |
231 | ||
232 | /// \brief A Dataset wrapping child Datasets. Its type_name() is "union"; the constructor is protected, so instances are obtained through Make(). | |
233 | class ARROW_DS_EXPORT UnionDataset : public Dataset { | |
234 | public: | |
235 | /// \brief Construct a UnionDataset wrapping child Datasets. | |
236 | /// | |
237 | /// \param[in] schema the schema of the resulting dataset. | |
238 | /// \param[in] children one or more child Datasets. Their schemas must be identical to | |
239 | /// schema. | |
240 | static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema, | |
241 | DatasetVector children); | |
242 | ||
243 | const DatasetVector& children() const { return children_; } | |
244 | ||
245 | std::string type_name() const override { return "union"; } | |
246 | ||
247 | Result<std::shared_ptr<Dataset>> ReplaceSchema( | |
248 | std::shared_ptr<Schema> schema) const override; | |
249 | ||
250 | protected: | |
251 | Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override; | |
252 | ||
253 | explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children) | |
254 | : Dataset(std::move(schema)), children_(std::move(children)) {} | |
255 | ||
256 | DatasetVector children_; | |
257 | ||
258 | friend class UnionDatasetFactory; | |
259 | }; | |
260 | ||
261 | /// @} | |
262 | ||
263 | } // namespace dataset | |
264 | } // namespace arrow |