]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/dataset/dataset.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / dataset / dataset.h
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 // This API is EXPERIMENTAL.
19
20 #pragma once
21
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "arrow/compute/exec/expression.h"
29 #include "arrow/dataset/type_fwd.h"
30 #include "arrow/dataset/visibility.h"
31 #include "arrow/util/macros.h"
32 #include "arrow/util/mutex.h"
33 #include "arrow/util/optional.h"
34
35 namespace arrow {
36 namespace dataset {
37
38 using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
39
40 /// \brief A granular piece of a Dataset, such as an individual file.
41 ///
42 /// A Fragment can be read/scanned separately from other fragments. It yields a
43 /// collection of RecordBatches when scanned, encapsulated in one or more
44 /// ScanTasks.
45 ///
46 /// Note that Fragments have well defined physical schemas which are reconciled by
47 /// the Datasets which contain them; these physical schemas may differ from a parent
48 /// Dataset's schema and the physical schemas of sibling Fragments.
49 class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
50 public:
51 /// \brief Return the physical schema of the Fragment.
52 ///
53 /// The physical schema is also called the writer schema.
54 /// This method is blocking and may suffer from high latency filesystem.
55 /// The schema is cached after being read once, or may be specified at construction.
56 Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
57
58 /// \brief Scan returns an iterator of ScanTasks, each of which yields
59 /// RecordBatches from this Fragment.
60 ///
61 /// Note that batches yielded using this method will not be filtered and may not align
62 /// with the Fragment's schema. In particular, note that columns referenced by the
63 /// filter may be present in yielded batches even if they are not projected (so that
64 /// they are available when a filter is applied). Additionally, explicitly projected
65 /// columns may be absent if they were not present in this fragment.
66 ///
67 /// To receive a record batch stream which is fully filtered and projected, use Scanner.
68 virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0;
69
70 /// An asynchronous version of Scan
71 virtual Result<RecordBatchGenerator> ScanBatchesAsync(
72 const std::shared_ptr<ScanOptions>& options) = 0;
73
74 /// \brief Count the number of rows in this fragment matching the filter using metadata
75 /// only. That is, this method may perform I/O, but will not load data.
76 ///
77 /// If this is not possible, resolve with an empty optional. The fragment can perform
78 /// I/O (e.g. to read metadata) before it deciding whether it can satisfy the request.
79 virtual Future<util::optional<int64_t>> CountRows(
80 compute::Expression predicate, const std::shared_ptr<ScanOptions>& options);
81
82 virtual std::string type_name() const = 0;
83 virtual std::string ToString() const { return type_name(); }
84
85 /// \brief An expression which evaluates to true for all data viewed by this
86 /// Fragment.
87 const compute::Expression& partition_expression() const {
88 return partition_expression_;
89 }
90
91 virtual ~Fragment() = default;
92
93 protected:
94 Fragment() = default;
95 explicit Fragment(compute::Expression partition_expression,
96 std::shared_ptr<Schema> physical_schema);
97
98 virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
99
100 util::Mutex physical_schema_mutex_;
101 compute::Expression partition_expression_ = compute::literal(true);
102 std::shared_ptr<Schema> physical_schema_;
103 };
104
105 /// \brief Per-scan options for fragment(s) in a dataset.
106 ///
107 /// These options are not intrinsic to the format or fragment itself, but do affect
108 /// the results of a scan. These are options which make sense to change between
109 /// repeated reads of the same dataset, such as format-specific conversion options
110 /// (that do not affect the schema).
111 ///
112 /// \ingroup dataset-scanning
113 class ARROW_DS_EXPORT FragmentScanOptions {
114 public:
115 virtual std::string type_name() const = 0;
116 virtual std::string ToString() const { return type_name(); }
117 virtual ~FragmentScanOptions() = default;
118 };
119
120 /// \defgroup dataset-implementations Concrete implementations
121 ///
122 /// @{
123
124 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of
125 /// RecordBatch.
126 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
127 public:
128 InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
129 compute::Expression = compute::literal(true));
130 explicit InMemoryFragment(RecordBatchVector record_batches,
131 compute::Expression = compute::literal(true));
132
133 Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
134 Result<RecordBatchGenerator> ScanBatchesAsync(
135 const std::shared_ptr<ScanOptions>& options) override;
136 Future<util::optional<int64_t>> CountRows(
137 compute::Expression predicate,
138 const std::shared_ptr<ScanOptions>& options) override;
139
140 std::string type_name() const override { return "in-memory"; }
141
142 protected:
143 Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
144
145 RecordBatchVector record_batches_;
146 };
147
148 /// @}
149
150 /// \brief A container of zero or more Fragments.
151 ///
152 /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
153 /// directory. A Dataset has a schema to which Fragments must align during a
154 /// scan operation. This is analogous to Avro's reader and writer schema.
155 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
156 public:
157 /// \brief Begin to build a new Scan operation against this Dataset
158 Result<std::shared_ptr<ScannerBuilder>> NewScan();
159
160 /// \brief GetFragments returns an iterator of Fragments given a predicate.
161 Result<FragmentIterator> GetFragments(compute::Expression predicate);
162 Result<FragmentIterator> GetFragments();
163
164 const std::shared_ptr<Schema>& schema() const { return schema_; }
165
166 /// \brief An expression which evaluates to true for all data viewed by this Dataset.
167 /// May be null, which indicates no information is available.
168 const compute::Expression& partition_expression() const {
169 return partition_expression_;
170 }
171
172 /// \brief The name identifying the kind of Dataset
173 virtual std::string type_name() const = 0;
174
175 /// \brief Return a copy of this Dataset with a different schema.
176 ///
177 /// The copy will view the same Fragments. If the new schema is not compatible with the
178 /// original dataset's schema then an error will be raised.
179 virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
180 std::shared_ptr<Schema> schema) const = 0;
181
182 virtual ~Dataset() = default;
183
184 protected:
185 explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
186
187 Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression);
188
189 virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0;
190
191 std::shared_ptr<Schema> schema_;
192 compute::Expression partition_expression_ = compute::literal(true);
193 };
194
195 /// \addtogroup dataset-implementations
196 ///
197 /// @{
198
199 /// \brief A Source which yields fragments wrapping a stream of record batches.
200 ///
201 /// The record batches must match the schema provided to the source at construction.
202 class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
203 public:
204 class RecordBatchGenerator {
205 public:
206 virtual ~RecordBatchGenerator() = default;
207 virtual RecordBatchIterator Get() const = 0;
208 };
209
210 /// Construct a dataset from a schema and a factory of record batch iterators.
211 InMemoryDataset(std::shared_ptr<Schema> schema,
212 std::shared_ptr<RecordBatchGenerator> get_batches)
213 : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
214
215 /// Convenience constructor taking a fixed list of batches
216 InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);
217
218 /// Convenience constructor taking a Table
219 explicit InMemoryDataset(std::shared_ptr<Table> table);
220
221 std::string type_name() const override { return "in-memory"; }
222
223 Result<std::shared_ptr<Dataset>> ReplaceSchema(
224 std::shared_ptr<Schema> schema) const override;
225
226 protected:
227 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
228
229 std::shared_ptr<RecordBatchGenerator> get_batches_;
230 };
231
232 /// \brief A Dataset wrapping child Datasets.
233 class ARROW_DS_EXPORT UnionDataset : public Dataset {
234 public:
235 /// \brief Construct a UnionDataset wrapping child Datasets.
236 ///
237 /// \param[in] schema the schema of the resulting dataset.
238 /// \param[in] children one or more child Datasets. Their schemas must be identical to
239 /// schema.
240 static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
241 DatasetVector children);
242
243 const DatasetVector& children() const { return children_; }
244
245 std::string type_name() const override { return "union"; }
246
247 Result<std::shared_ptr<Dataset>> ReplaceSchema(
248 std::shared_ptr<Schema> schema) const override;
249
250 protected:
251 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
252
253 explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
254 : Dataset(std::move(schema)), children_(std::move(children)) {}
255
256 DatasetVector children_;
257
258 friend class UnionDatasetFactory;
259 };
260
261 /// @}
262
263 } // namespace dataset
264 } // namespace arrow