1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 // This API is EXPERIMENTAL.
28 #include "arrow/compute/exec/expression.h"
29 #include "arrow/dataset/type_fwd.h"
30 #include "arrow/dataset/visibility.h"
31 #include "arrow/util/macros.h"
32 #include "arrow/util/mutex.h"
33 #include "arrow/util/optional.h"
/// \brief A callable taking no arguments which, on each invocation, returns a
/// Future of a shared_ptr<RecordBatch>.
///
/// This is the return type of Fragment::ScanBatchesAsync.
38 using RecordBatchGenerator
= std::function
<Future
<std::shared_ptr
<RecordBatch
>>()>;
40 /// \brief A granular piece of a Dataset, such as an individual file.
42 /// A Fragment can be read/scanned separately from other fragments. It yields a
43 /// collection of RecordBatches when scanned, encapsulated in one or more
/// ScanTasks.
46 /// Note that Fragments have well defined physical schemas which are reconciled by
47 /// the Datasets which contain them; these physical schemas may differ from a parent
48 /// Dataset's schema and the physical schemas of sibling Fragments.
49 class ARROW_DS_EXPORT Fragment
: public std::enable_shared_from_this
<Fragment
> {
51 /// \brief Return the physical schema of the Fragment.
53 /// The physical schema is also called the writer schema.
54 /// This method is blocking and may suffer from high-latency filesystems.
55 /// The schema is cached after being read once, or may be specified at construction.
56 Result
<std::shared_ptr
<Schema
>> ReadPhysicalSchema();
58 /// \brief Scan returns an iterator of ScanTasks, each of which yields
59 /// RecordBatches from this Fragment.
61 /// Note that batches yielded using this method will not be filtered and may not align
62 /// with the Fragment's schema. In particular, note that columns referenced by the
63 /// filter may be present in yielded batches even if they are not projected (so that
64 /// they are available when a filter is applied). Additionally, explicitly projected
65 /// columns may be absent if they were not present in this fragment.
67 /// To receive a record batch stream which is fully filtered and projected, use Scanner.
68 virtual Result
<ScanTaskIterator
> Scan(std::shared_ptr
<ScanOptions
> options
) = 0;
70 /// \brief An asynchronous version of Scan.
///
/// Returns a RecordBatchGenerator instead of an iterator of ScanTasks.
71 virtual Result
<RecordBatchGenerator
> ScanBatchesAsync(
72 const std::shared_ptr
<ScanOptions
>& options
) = 0;
74 /// \brief Count the number of rows in this fragment matching the filter using metadata
75 /// only. That is, this method may perform I/O, but will not load data.
77 /// If this is not possible, resolve with an empty optional. The fragment can perform
78 /// I/O (e.g. to read metadata) before deciding whether it can satisfy the request.
///
/// Unlike Scan() and ScanBatchesAsync(), this method is not pure virtual; the base
/// definition is not part of this header.
79 virtual Future
<util::optional
<int64_t>> CountRows(
80 compute::Expression predicate
, const std::shared_ptr
<ScanOptions
>& options
);
/// \brief A name for the concrete Fragment type; every subclass must provide one
/// (for example, InMemoryFragment returns "in-memory").
82 virtual std::string
type_name() const = 0;
/// \brief A string representation of this Fragment. The default implementation
/// returns type_name().
83 virtual std::string
ToString() const { return type_name(); }
85 /// \brief An expression which evaluates to true for all data viewed by this
/// Fragment.
///
/// Returned by const reference to the partition_expression_ member.
87 const compute::Expression
& partition_expression() const {
88 return partition_expression_
;
91 virtual ~Fragment() = default;
/// \brief Construct a Fragment from a partition expression and a physical schema.
///
/// NOTE(review): the definition is not in this header; presumably it stores the
/// arguments in partition_expression_ and physical_schema_ -- confirm.
95 explicit Fragment(compute::Expression partition_expression
,
96 std::shared_ptr
<Schema
> physical_schema
);
/// \brief Subclass hook which produces the physical schema.
///
/// NOTE(review): presumably invoked by ReadPhysicalSchema() when no schema has been
/// cached yet -- confirm against the implementation file.
98 virtual Result
<std::shared_ptr
<Schema
>> ReadPhysicalSchemaImpl() = 0;
// NOTE(review): presumably guards access to physical_schema_ -- confirm.
100 util::Mutex physical_schema_mutex_
;
// Defaults to literal(true) unless replaced.
101 compute::Expression partition_expression_
= compute::literal(true);
// The physical schema; see ReadPhysicalSchema() for how it is documented to be
// cached or supplied at construction.
102 std::shared_ptr
<Schema
> physical_schema_
;
105 /// \brief Per-scan options for fragment(s) in a dataset.
107 /// These options are not intrinsic to the format or fragment itself, but do affect
108 /// the results of a scan. These are options which make sense to change between
109 /// repeated reads of the same dataset, such as format-specific conversion options
110 /// (that do not affect the schema).
112 /// \ingroup dataset-scanning
113 class ARROW_DS_EXPORT FragmentScanOptions
{
/// \brief A name for the concrete options type; every subclass must provide one.
115 virtual std::string
type_name() const = 0;
/// \brief A string representation of these options. The default implementation
/// returns type_name().
116 virtual std::string
ToString() const { return type_name(); }
// Virtual so that subclasses can be destroyed through a FragmentScanOptions pointer.
117 virtual ~FragmentScanOptions() = default;
120 /// \defgroup dataset-implementations Concrete implementations
124 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of
/// RecordBatch.
126 class ARROW_DS_EXPORT InMemoryFragment
: public Fragment
{
/// \brief Construct from an explicit schema, a vector of record batches and an
/// optional partition expression (defaults to literal(true)).
128 InMemoryFragment(std::shared_ptr
<Schema
> schema
, RecordBatchVector record_batches
,
129 compute::Expression
= compute::literal(true));
/// \brief Construct from a vector of record batches and an optional partition
/// expression (defaults to literal(true)).
///
/// NOTE(review): no schema is passed here; presumably it is derived from the
/// batches -- confirm against the implementation file.
130 explicit InMemoryFragment(RecordBatchVector record_batches
,
131 compute::Expression
= compute::literal(true));
/// \brief Override of Fragment::Scan.
133 Result
<ScanTaskIterator
> Scan(std::shared_ptr
<ScanOptions
> options
) override
;
/// \brief Override of Fragment::ScanBatchesAsync.
134 Result
<RecordBatchGenerator
> ScanBatchesAsync(
135 const std::shared_ptr
<ScanOptions
>& options
) override
;
/// \brief Override of Fragment::CountRows.
136 Future
<util::optional
<int64_t>> CountRows(
137 compute::Expression predicate
,
138 const std::shared_ptr
<ScanOptions
>& options
) override
;
/// \brief Always returns "in-memory".
140 std::string
type_name() const override
{ return "in-memory"; }
/// \brief Override of the Fragment::ReadPhysicalSchemaImpl hook.
143 Result
<std::shared_ptr
<Schema
>> ReadPhysicalSchemaImpl() override
;
// The record batches held by this fragment.
145 RecordBatchVector record_batches_
;
150 /// \brief A container of zero or more Fragments.
152 /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
153 /// directory. A Dataset has a schema to which Fragments must align during a
154 /// scan operation. This is analogous to Avro's reader and writer schema.
155 class ARROW_DS_EXPORT Dataset
: public std::enable_shared_from_this
<Dataset
> {
157 /// \brief Begin to build a new Scan operation against this Dataset
158 Result
<std::shared_ptr
<ScannerBuilder
>> NewScan();
160 /// \brief GetFragments returns an iterator of Fragments given a predicate.
161 Result
<FragmentIterator
> GetFragments(compute::Expression predicate
);
/// \brief Overload of GetFragments which takes no predicate.
///
/// NOTE(review): presumably equivalent to passing literal(true) -- confirm against
/// the implementation file.
162 Result
<FragmentIterator
> GetFragments();
/// \brief The schema of this Dataset, returned by const reference to schema_.
164 const std::shared_ptr
<Schema
>& schema() const { return schema_
; }
166 /// \brief An expression which evaluates to true for all data viewed by this Dataset.
167 /// May be null, which indicates no information is available.
///
/// NOTE(review): the backing member is a compute::Expression value which defaults to
/// literal(true); whether a null expression can actually be observed here is not
/// visible from this header -- confirm or update the sentence above.
168 const compute::Expression
& partition_expression() const {
169 return partition_expression_
;
172 /// \brief The name identifying the kind of Dataset
173 virtual std::string
type_name() const = 0;
175 /// \brief Return a copy of this Dataset with a different schema.
177 /// The copy will view the same Fragments. If the new schema is not compatible with the
178 /// original dataset's schema then an error will be raised.
179 virtual Result
<std::shared_ptr
<Dataset
>> ReplaceSchema(
180 std::shared_ptr
<Schema
> schema
) const = 0;
182 virtual ~Dataset() = default;
/// \brief Construct a Dataset with the given schema.
///
/// The schema is moved into schema_; partition_expression_ keeps its default of
/// literal(true).
185 explicit Dataset(std::shared_ptr
<Schema
> schema
) : schema_(std::move(schema
)) {}
/// \brief Construct a Dataset with the given schema and partition expression.
///
/// The definition is not part of this header.
187 Dataset(std::shared_ptr
<Schema
> schema
, compute::Expression partition_expression
);
/// \brief Subclass hook which produces the Fragments for a predicate.
///
/// NOTE(review): presumably called by the GetFragments overloads -- confirm against
/// the implementation file.
189 virtual Result
<FragmentIterator
> GetFragmentsImpl(compute::Expression predicate
) = 0;
// The schema returned by schema().
191 std::shared_ptr
<Schema
> schema_
;
// The expression returned by partition_expression(); defaults to literal(true).
192 compute::Expression partition_expression_
= compute::literal(true);
195 /// \addtogroup dataset-implementations
199 /// \brief A Dataset which yields fragments wrapping a stream of record batches.
201 /// The record batches must match the schema provided to the dataset at construction.
202 class ARROW_DS_EXPORT InMemoryDataset
: public Dataset
{
/// \brief Abstract factory of record batch iterators.
///
/// This nested class is distinct from the namespace-level RecordBatchGenerator alias
/// (a std::function returning a Future); within InMemoryDataset the unqualified name
/// refers to this class.
204 class RecordBatchGenerator
{
206 virtual ~RecordBatchGenerator() = default;
/// \brief Return an iterator of record batches. Declared const and pure virtual.
207 virtual RecordBatchIterator
Get() const = 0;
210 /// Construct a dataset from a schema and a factory of record batch iterators.
211 InMemoryDataset(std::shared_ptr
<Schema
> schema
,
212 std::shared_ptr
<RecordBatchGenerator
> get_batches
)
213 : Dataset(std::move(schema
)), get_batches_(std::move(get_batches
)) {}
215 /// Convenience constructor taking a fixed list of batches
216 InMemoryDataset(std::shared_ptr
<Schema
> schema
, RecordBatchVector batches
);
218 /// Convenience constructor taking a Table
219 explicit InMemoryDataset(std::shared_ptr
<Table
> table
);
/// \brief Always returns "in-memory".
221 std::string
type_name() const override
{ return "in-memory"; }
/// \brief Override of Dataset::ReplaceSchema.
223 Result
<std::shared_ptr
<Dataset
>> ReplaceSchema(
224 std::shared_ptr
<Schema
> schema
) const override
;
/// \brief Override of the Dataset::GetFragmentsImpl hook.
227 Result
<FragmentIterator
> GetFragmentsImpl(compute::Expression predicate
) override
;
// The factory of record batch iterators supplied at construction.
229 std::shared_ptr
<RecordBatchGenerator
> get_batches_
;
232 /// \brief A Dataset wrapping child Datasets.
233 class ARROW_DS_EXPORT UnionDataset
: public Dataset
{
235 /// \brief Construct a UnionDataset wrapping child Datasets.
237 /// \param[in] schema the schema of the resulting dataset.
238 /// \param[in] children one or more child Datasets. Their schemas must be identical to
/// schema.
///
/// NOTE(review): the tail of the sentence above was missing and has been completed
/// from the parameter list; confirm that Make() enforces this requirement.
240 static Result
<std::shared_ptr
<UnionDataset
>> Make(std::shared_ptr
<Schema
> schema
,
241 DatasetVector children
);
/// \brief The child Datasets, returned by const reference to children_.
243 const DatasetVector
& children() const { return children_
; }
/// \brief Always returns "union".
245 std::string
type_name() const override
{ return "union"; }
/// \brief Override of Dataset::ReplaceSchema.
247 Result
<std::shared_ptr
<Dataset
>> ReplaceSchema(
248 std::shared_ptr
<Schema
> schema
) const override
;
/// \brief Override of the Dataset::GetFragmentsImpl hook.
251 Result
<FragmentIterator
> GetFragmentsImpl(compute::Expression predicate
) override
;
/// \brief Construct directly from a schema and children.
///
/// The schema is moved into the Dataset base and the children into children_; no
/// validation is performed in this constructor.
253 explicit UnionDataset(std::shared_ptr
<Schema
> schema
, DatasetVector children
)
254 : Dataset(std::move(schema
)), children_(std::move(children
)) {}
// The wrapped child Datasets.
256 DatasetVector children_
;
// Grants UnionDatasetFactory access to the non-public members of this class.
258 friend class UnionDatasetFactory
;
263 } // namespace dataset