]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/dataset/dataset.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / dataset / dataset.h
CommitLineData
1d09f67e
TL
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

20#pragma once
21
22#include <functional>
23#include <memory>
24#include <string>
25#include <utility>
26#include <vector>
27
28#include "arrow/compute/exec/expression.h"
29#include "arrow/dataset/type_fwd.h"
30#include "arrow/dataset/visibility.h"
31#include "arrow/util/macros.h"
32#include "arrow/util/mutex.h"
33#include "arrow/util/optional.h"
34
35namespace arrow {
36namespace dataset {
37
38using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
39
40/// \brief A granular piece of a Dataset, such as an individual file.
41///
42/// A Fragment can be read/scanned separately from other fragments. It yields a
43/// collection of RecordBatches when scanned, encapsulated in one or more
44/// ScanTasks.
45///
46/// Note that Fragments have well defined physical schemas which are reconciled by
47/// the Datasets which contain them; these physical schemas may differ from a parent
48/// Dataset's schema and the physical schemas of sibling Fragments.
49class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
50 public:
51 /// \brief Return the physical schema of the Fragment.
52 ///
53 /// The physical schema is also called the writer schema.
54 /// This method is blocking and may suffer from high latency filesystem.
55 /// The schema is cached after being read once, or may be specified at construction.
56 Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
57
58 /// \brief Scan returns an iterator of ScanTasks, each of which yields
59 /// RecordBatches from this Fragment.
60 ///
61 /// Note that batches yielded using this method will not be filtered and may not align
62 /// with the Fragment's schema. In particular, note that columns referenced by the
63 /// filter may be present in yielded batches even if they are not projected (so that
64 /// they are available when a filter is applied). Additionally, explicitly projected
65 /// columns may be absent if they were not present in this fragment.
66 ///
67 /// To receive a record batch stream which is fully filtered and projected, use Scanner.
68 virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0;
69
70 /// An asynchronous version of Scan
71 virtual Result<RecordBatchGenerator> ScanBatchesAsync(
72 const std::shared_ptr<ScanOptions>& options) = 0;
73
74 /// \brief Count the number of rows in this fragment matching the filter using metadata
75 /// only. That is, this method may perform I/O, but will not load data.
76 ///
77 /// If this is not possible, resolve with an empty optional. The fragment can perform
78 /// I/O (e.g. to read metadata) before it deciding whether it can satisfy the request.
79 virtual Future<util::optional<int64_t>> CountRows(
80 compute::Expression predicate, const std::shared_ptr<ScanOptions>& options);
81
82 virtual std::string type_name() const = 0;
83 virtual std::string ToString() const { return type_name(); }
84
85 /// \brief An expression which evaluates to true for all data viewed by this
86 /// Fragment.
87 const compute::Expression& partition_expression() const {
88 return partition_expression_;
89 }
90
91 virtual ~Fragment() = default;
92
93 protected:
94 Fragment() = default;
95 explicit Fragment(compute::Expression partition_expression,
96 std::shared_ptr<Schema> physical_schema);
97
98 virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
99
100 util::Mutex physical_schema_mutex_;
101 compute::Expression partition_expression_ = compute::literal(true);
102 std::shared_ptr<Schema> physical_schema_;
103};
104
105/// \brief Per-scan options for fragment(s) in a dataset.
106///
107/// These options are not intrinsic to the format or fragment itself, but do affect
108/// the results of a scan. These are options which make sense to change between
109/// repeated reads of the same dataset, such as format-specific conversion options
110/// (that do not affect the schema).
111///
112/// \ingroup dataset-scanning
113class ARROW_DS_EXPORT FragmentScanOptions {
114 public:
115 virtual std::string type_name() const = 0;
116 virtual std::string ToString() const { return type_name(); }
117 virtual ~FragmentScanOptions() = default;
118};
119
120/// \defgroup dataset-implementations Concrete implementations
121///
122/// @{
123
124/// \brief A trivial Fragment that yields ScanTask out of a fixed set of
125/// RecordBatch.
126class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
127 public:
128 InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
129 compute::Expression = compute::literal(true));
130 explicit InMemoryFragment(RecordBatchVector record_batches,
131 compute::Expression = compute::literal(true));
132
133 Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
134 Result<RecordBatchGenerator> ScanBatchesAsync(
135 const std::shared_ptr<ScanOptions>& options) override;
136 Future<util::optional<int64_t>> CountRows(
137 compute::Expression predicate,
138 const std::shared_ptr<ScanOptions>& options) override;
139
140 std::string type_name() const override { return "in-memory"; }
141
142 protected:
143 Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
144
145 RecordBatchVector record_batches_;
146};
147
148/// @}
149
150/// \brief A container of zero or more Fragments.
151///
152/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
153/// directory. A Dataset has a schema to which Fragments must align during a
154/// scan operation. This is analogous to Avro's reader and writer schema.
155class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
156 public:
157 /// \brief Begin to build a new Scan operation against this Dataset
158 Result<std::shared_ptr<ScannerBuilder>> NewScan();
159
160 /// \brief GetFragments returns an iterator of Fragments given a predicate.
161 Result<FragmentIterator> GetFragments(compute::Expression predicate);
162 Result<FragmentIterator> GetFragments();
163
164 const std::shared_ptr<Schema>& schema() const { return schema_; }
165
166 /// \brief An expression which evaluates to true for all data viewed by this Dataset.
167 /// May be null, which indicates no information is available.
168 const compute::Expression& partition_expression() const {
169 return partition_expression_;
170 }
171
172 /// \brief The name identifying the kind of Dataset
173 virtual std::string type_name() const = 0;
174
175 /// \brief Return a copy of this Dataset with a different schema.
176 ///
177 /// The copy will view the same Fragments. If the new schema is not compatible with the
178 /// original dataset's schema then an error will be raised.
179 virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
180 std::shared_ptr<Schema> schema) const = 0;
181
182 virtual ~Dataset() = default;
183
184 protected:
185 explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
186
187 Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression);
188
189 virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0;
190
191 std::shared_ptr<Schema> schema_;
192 compute::Expression partition_expression_ = compute::literal(true);
193};
194
195/// \addtogroup dataset-implementations
196///
197/// @{
198
199/// \brief A Source which yields fragments wrapping a stream of record batches.
200///
201/// The record batches must match the schema provided to the source at construction.
202class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
203 public:
204 class RecordBatchGenerator {
205 public:
206 virtual ~RecordBatchGenerator() = default;
207 virtual RecordBatchIterator Get() const = 0;
208 };
209
210 /// Construct a dataset from a schema and a factory of record batch iterators.
211 InMemoryDataset(std::shared_ptr<Schema> schema,
212 std::shared_ptr<RecordBatchGenerator> get_batches)
213 : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
214
215 /// Convenience constructor taking a fixed list of batches
216 InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);
217
218 /// Convenience constructor taking a Table
219 explicit InMemoryDataset(std::shared_ptr<Table> table);
220
221 std::string type_name() const override { return "in-memory"; }
222
223 Result<std::shared_ptr<Dataset>> ReplaceSchema(
224 std::shared_ptr<Schema> schema) const override;
225
226 protected:
227 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
228
229 std::shared_ptr<RecordBatchGenerator> get_batches_;
230};
231
232/// \brief A Dataset wrapping child Datasets.
233class ARROW_DS_EXPORT UnionDataset : public Dataset {
234 public:
235 /// \brief Construct a UnionDataset wrapping child Datasets.
236 ///
237 /// \param[in] schema the schema of the resulting dataset.
238 /// \param[in] children one or more child Datasets. Their schemas must be identical to
239 /// schema.
240 static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
241 DatasetVector children);
242
243 const DatasetVector& children() const { return children_; }
244
245 std::string type_name() const override { return "union"; }
246
247 Result<std::shared_ptr<Dataset>> ReplaceSchema(
248 std::shared_ptr<Schema> schema) const override;
249
250 protected:
251 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
252
253 explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
254 : Dataset(std::move(schema)), children_(std::move(children)) {}
255
256 DatasetVector children_;
257
258 friend class UnionDatasetFactory;
259};
260
261/// @}
262
263} // namespace dataset
264} // namespace arrow