1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 // This API is EXPERIMENTAL.
28 #include "arrow/compute/exec/expression.h"
29 #include "arrow/compute/exec/options.h"
30 #include "arrow/compute/type_fwd.h"
31 #include "arrow/dataset/dataset.h"
32 #include "arrow/dataset/projector.h"
33 #include "arrow/dataset/type_fwd.h"
34 #include "arrow/dataset/visibility.h"
35 #include "arrow/io/interfaces.h"
36 #include "arrow/memory_pool.h"
37 #include "arrow/type_fwd.h"
38 #include "arrow/util/async_generator.h"
39 #include "arrow/util/iterator.h"
40 #include "arrow/util/thread_pool.h"
41 #include "arrow/util/type_fwd.h"
/// A pull-style stream of record batches: each call yields a Future that
/// resolves to the next batch.
using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

/// \defgroup dataset-scanning Scanning API

/// Default maximum number of rows per scanned batch (1 << 20 = 1Mi rows).
constexpr int64_t kDefaultBatchSize = 1 << 20;
/// Default number of batches to read ahead within a single file.
constexpr int32_t kDefaultBatchReadahead = 32;
/// Default number of files (fragments) to read ahead.
constexpr int32_t kDefaultFragmentReadahead = 8;
// NOTE(review): presumably the high/low water marks (in batches) for scan
// backpressure — pause above High, resume below Low. Confirm at the point of use.
constexpr int32_t kDefaultBackpressureHigh = 64;
constexpr int32_t kDefaultBackpressureLow = 32;
59 /// Scan-specific options, which can be changed between scans of the same dataset.
60 struct ARROW_DS_EXPORT ScanOptions
{
61 /// A row filter (which will be pushed down to partitioning/reading if supported).
62 compute::Expression filter
= compute::literal(true);
63 /// A projection expression (which can add/remove/rename columns).
64 compute::Expression projection
;
66 /// Schema with which batches will be read from fragments. This is also known as the
67 /// "reader schema" it will be used (for example) in constructing CSV file readers to
68 /// identify column types for parsing. Usually only a subset of its fields (see
69 /// MaterializedFields) will be materialized during a scan.
70 std::shared_ptr
<Schema
> dataset_schema
;
72 /// Schema of projected record batches. This is independent of dataset_schema as its
73 /// fields are derived from the projection. For example, let
75 /// dataset_schema = {"a": int32, "b": int32, "id": utf8}
76 /// projection = project({equal(field_ref("a"), field_ref("b"))}, {"a_plus_b"})
78 /// (no filter specified). In this case, the projected_schema would be
80 /// {"a_plus_b": int32}
81 std::shared_ptr
<Schema
> projected_schema
;
83 /// Maximum row count for scanned batches.
84 int64_t batch_size
= kDefaultBatchSize
;
86 /// How many batches to read ahead within a file
88 /// Set to 0 to disable batch readahead
90 /// Note: May not be supported by all formats
91 /// Note: May not be supported by all scanners
92 /// Note: Will be ignored if use_threads is set to false
93 int32_t batch_readahead
= kDefaultBatchReadahead
;
95 /// How many files to read ahead
97 /// Set to 0 to disable fragment readahead
99 /// Note: May not be enforced by all scanners
100 /// Note: Will be ignored if use_threads is set to false
101 int32_t fragment_readahead
= kDefaultFragmentReadahead
;
103 /// A pool from which materialized and scanned arrays will be allocated.
104 MemoryPool
* pool
= arrow::default_memory_pool();
106 /// IOContext for any IO tasks
108 /// Note: The IOContext executor will be ignored if use_threads is set to false
109 io::IOContext io_context
;
111 /// If true the scanner will scan in parallel
113 /// Note: If true, this will use threads from both the cpu_executor and the
114 /// io_context.executor
115 /// Note: This must be true in order for any readahead to happen
116 bool use_threads
= false;
118 /// If true then an asycnhronous implementation of the scanner will be used.
119 /// This implementation is newer and generally performs better. However, it
120 /// makes extensive use of threading and is still considered experimental
121 bool use_async
= false;
123 /// Fragment-specific scan options.
124 std::shared_ptr
<FragmentScanOptions
> fragment_scan_options
;
126 // Return a vector of fields that requires materialization.
128 // This is usually the union of the fields referenced in the projection and the
129 // filter expression. Examples:
131 // - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
132 // - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"]
134 // This is needed for expression where a field may not be directly
135 // used in the final projection but is still required to evaluate the
138 // This is used by Fragment implementations to apply the column
139 // sub-selection optimization.
140 std::vector
<std::string
> MaterializedFields() const;
142 // Return a threaded or serial TaskGroup according to use_threads.
143 std::shared_ptr
<::arrow::internal::TaskGroup
> TaskGroup() const;
146 /// \brief Read record batches from a range of a single data fragment. A
147 /// ScanTask is meant to be a unit of work to be dispatched. The implementation
148 /// must be thread and concurrent safe.
149 class ARROW_DS_EXPORT ScanTask
{
151 /// \brief Iterate through sequence of materialized record batches
152 /// resulting from the Scan. Execution semantics are encapsulated in the
153 /// particular ScanTask implementation
154 virtual Result
<RecordBatchIterator
> Execute() = 0;
155 virtual Future
<RecordBatchVector
> SafeExecute(::arrow::internal::Executor
* executor
);
156 virtual Future
<> SafeVisit(::arrow::internal::Executor
* executor
,
157 std::function
<Status(std::shared_ptr
<RecordBatch
>)> visitor
);
159 virtual ~ScanTask() = default;
161 const std::shared_ptr
<ScanOptions
>& options() const { return options_
; }
162 const std::shared_ptr
<Fragment
>& fragment() const { return fragment_
; }
165 ScanTask(std::shared_ptr
<ScanOptions
> options
, std::shared_ptr
<Fragment
> fragment
)
166 : options_(std::move(options
)), fragment_(std::move(fragment
)) {}
168 std::shared_ptr
<ScanOptions
> options_
;
169 std::shared_ptr
<Fragment
> fragment_
;
172 /// \brief Combines a record batch with the fragment that the record batch originated
175 /// Knowing the source fragment can be useful for debugging & understanding loaded data
176 struct TaggedRecordBatch
{
177 std::shared_ptr
<RecordBatch
> record_batch
;
178 std::shared_ptr
<Fragment
> fragment
;
180 using TaggedRecordBatchGenerator
= std::function
<Future
<TaggedRecordBatch
>()>;
181 using TaggedRecordBatchIterator
= Iterator
<TaggedRecordBatch
>;
183 /// \brief Combines a tagged batch with positional information
185 /// This is returned when scanning batches in an unordered fashion. This information is
186 /// needed if you ever want to reassemble the batches in order
187 struct EnumeratedRecordBatch
{
188 Enumerated
<std::shared_ptr
<RecordBatch
>> record_batch
;
189 Enumerated
<std::shared_ptr
<Fragment
>> fragment
;
191 using EnumeratedRecordBatchGenerator
= std::function
<Future
<EnumeratedRecordBatch
>()>;
192 using EnumeratedRecordBatchIterator
= Iterator
<EnumeratedRecordBatch
>;
196 } // namespace dataset
199 struct IterationTraits
<dataset::TaggedRecordBatch
> {
200 static dataset::TaggedRecordBatch
End() {
201 return dataset::TaggedRecordBatch
{NULLPTR
, NULLPTR
};
203 static bool IsEnd(const dataset::TaggedRecordBatch
& val
) {
204 return val
.record_batch
== NULLPTR
;
209 struct IterationTraits
<dataset::EnumeratedRecordBatch
> {
210 static dataset::EnumeratedRecordBatch
End() {
211 return dataset::EnumeratedRecordBatch
{
212 IterationEnd
<Enumerated
<std::shared_ptr
<RecordBatch
>>>(),
213 IterationEnd
<Enumerated
<std::shared_ptr
<dataset::Fragment
>>>()};
215 static bool IsEnd(const dataset::EnumeratedRecordBatch
& val
) {
216 return IsIterationEnd(val
.fragment
);
222 /// \defgroup dataset-scanning Scanning API
226 /// \brief A scanner glues together several dataset classes to load in data.
227 /// The dataset contains a collection of fragments and partitioning rules.
229 /// The fragments identify independently loadable units of data (i.e. each fragment has
230 /// a potentially unique schema and possibly even format. It should be possible to read
231 /// fragments in parallel if desired).
/// The fragment's format contains the logic necessary to actually create a task to load
/// the fragment into memory. That task may or may not support parallel execution of
/// its own unit of work.
237 /// The scanner is then responsible for creating scan tasks from every fragment in the
238 /// dataset and (potentially) sequencing the loaded record batches together.
/// The scanner should not buffer the entire dataset in memory (unless asked); instead it
/// yields record batches as soon as they are ready to scan. Various readahead
/// properties control how much data is allowed to be scanned before pausing to let a
/// slow consumer catch up.
///
/// Today the scanner also handles projection & filtering, although that may change in
/// the future.
247 class ARROW_DS_EXPORT Scanner
{
249 virtual ~Scanner() = default;
251 /// \brief The Scan operator returns a stream of ScanTask. The caller is
252 /// responsible to dispatch/schedule said tasks. Tasks should be safe to run
253 /// in a concurrent fashion and outlive the iterator.
255 /// Note: Not supported by the async scanner
256 /// Planned for removal from the public API in ARROW-11782.
257 ARROW_DEPRECATED("Deprecated in 4.0.0 for removal in 5.0.0. Use ScanBatches().")
258 virtual Result
<ScanTaskIterator
> Scan();
260 /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple threads
261 /// are used (via use_threads), the visitor will be invoked from those threads and is
262 /// responsible for any synchronization.
263 virtual Status
Scan(std::function
<Status(TaggedRecordBatch
)> visitor
) = 0;
264 /// \brief Convert a Scanner into a Table.
266 /// Use this convenience utility with care. This will serially materialize the
267 /// Scan result in memory before creating the Table.
268 virtual Result
<std::shared_ptr
<Table
>> ToTable() = 0;
269 /// \brief Scan the dataset into a stream of record batches. Each batch is tagged
270 /// with the fragment it originated from. The batches will arrive in order. The
271 /// order of fragments is determined by the dataset.
273 /// Note: The scanner will perform some readahead but will avoid materializing too
274 /// much in memory (this is goverended by the readahead options and use_threads option).
275 /// If the readahead queue fills up then I/O will pause until the calling thread catches
277 virtual Result
<TaggedRecordBatchIterator
> ScanBatches() = 0;
278 virtual Result
<TaggedRecordBatchGenerator
> ScanBatchesAsync() = 0;
279 /// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this
280 /// method may allow record batches to be returned out of order. This allows for more
281 /// efficient scanning: some fragments may be accessed more quickly than others (e.g.
282 /// may be cached in RAM or just happen to get scheduled earlier by the I/O)
284 /// To make up for the out-of-order iteration each batch is further tagged with
285 /// positional information.
286 virtual Result
<EnumeratedRecordBatchIterator
> ScanBatchesUnordered();
287 virtual Result
<EnumeratedRecordBatchGenerator
> ScanBatchesUnorderedAsync() = 0;
288 /// \brief A convenience to synchronously load the given rows by index.
290 /// Will only consume as many batches as needed from ScanBatches().
291 virtual Result
<std::shared_ptr
<Table
>> TakeRows(const Array
& indices
);
292 /// \brief Get the first N rows.
293 virtual Result
<std::shared_ptr
<Table
>> Head(int64_t num_rows
);
294 /// \brief Count rows matching a predicate.
296 /// This method will push down the predicate and compute the result based on fragment
297 /// metadata if possible.
298 virtual Result
<int64_t> CountRows();
299 /// \brief Convert the Scanner to a RecordBatchReader so it can be
300 /// easily used with APIs that expect a reader.
301 Result
<std::shared_ptr
<RecordBatchReader
>> ToRecordBatchReader();
303 /// \brief Get the options for this scan.
304 const std::shared_ptr
<ScanOptions
>& options() const { return scan_options_
; }
305 /// \brief Get the dataset that this scanner will scan
306 virtual const std::shared_ptr
<Dataset
>& dataset() const = 0;
309 explicit Scanner(std::shared_ptr
<ScanOptions
> scan_options
)
310 : scan_options_(std::move(scan_options
)) {}
312 Result
<EnumeratedRecordBatchIterator
> AddPositioningToInOrderScan(
313 TaggedRecordBatchIterator scan
);
315 const std::shared_ptr
<ScanOptions
> scan_options_
;
318 /// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
319 /// to pass information, notably a potential filter expression and a subset of
320 /// columns to materialize.
321 class ARROW_DS_EXPORT ScannerBuilder
{
323 explicit ScannerBuilder(std::shared_ptr
<Dataset
> dataset
);
325 ScannerBuilder(std::shared_ptr
<Dataset
> dataset
,
326 std::shared_ptr
<ScanOptions
> scan_options
);
328 ScannerBuilder(std::shared_ptr
<Schema
> schema
, std::shared_ptr
<Fragment
> fragment
,
329 std::shared_ptr
<ScanOptions
> scan_options
);
331 /// \brief Make a scanner from a record batch reader.
333 /// The resulting scanner can be scanned only once. This is intended
334 /// to support writing data from streaming sources or other sources
335 /// that can be iterated only once.
336 static std::shared_ptr
<ScannerBuilder
> FromRecordBatchReader(
337 std::shared_ptr
<RecordBatchReader
> reader
);
339 /// \brief Set the subset of columns to materialize.
341 /// Columns which are not referenced may not be read from fragments.
343 /// \param[in] columns list of columns to project. Order and duplicates will
346 /// \return Failure if any column name does not exists in the dataset's
348 Status
Project(std::vector
<std::string
> columns
);
350 /// \brief Set expressions which will be evaluated to produce the materialized
353 /// Columns which are not referenced may not be read from fragments.
355 /// \param[in] exprs expressions to evaluate to produce columns.
356 /// \param[in] names list of names for the resulting columns.
358 /// \return Failure if any referenced column does not exists in the dataset's
360 Status
Project(std::vector
<compute::Expression
> exprs
, std::vector
<std::string
> names
);
362 /// \brief Set the filter expression to return only rows matching the filter.
364 /// The predicate will be passed down to Sources and corresponding
365 /// Fragments to exploit predicate pushdown if possible using
366 /// partition information or Fragment internal metadata, e.g. Parquet statistics.
367 /// Columns which are not referenced may not be read from fragments.
369 /// \param[in] filter expression to filter rows with.
371 /// \return Failure if any referenced columns does not exist in the dataset's
373 Status
Filter(const compute::Expression
& filter
);
375 /// \brief Indicate if the Scanner should make use of the available
376 /// ThreadPool found in ScanOptions;
377 Status
UseThreads(bool use_threads
= true);
379 /// \brief Limit how many fragments the scanner will read at once
381 /// Note: This is only enforced in "async" mode
382 Status
FragmentReadahead(int fragment_readahead
);
384 /// \brief Indicate if the Scanner should run in experimental "async" mode
386 /// This mode should have considerably better performance on high-latency or parallel
387 /// filesystems but is still experimental
388 Status
UseAsync(bool use_async
= true);
390 /// \brief Set the maximum number of rows per RecordBatch.
392 /// \param[in] batch_size the maximum number of rows.
393 /// \returns An error if the number for batch is not greater than 0.
395 /// This option provides a control limiting the memory owned by any RecordBatch.
396 Status
BatchSize(int64_t batch_size
);
398 /// \brief Set the pool from which materialized and scanned arrays will be allocated.
399 Status
Pool(MemoryPool
* pool
);
401 /// \brief Set fragment-specific scan options.
402 Status
FragmentScanOptions(std::shared_ptr
<FragmentScanOptions
> fragment_scan_options
);
404 /// \brief Return the constructed now-immutable Scanner object
405 Result
<std::shared_ptr
<Scanner
>> Finish();
407 const std::shared_ptr
<Schema
>& schema() const;
408 const std::shared_ptr
<Schema
>& projected_schema() const;
411 std::shared_ptr
<Dataset
> dataset_
;
412 std::shared_ptr
<ScanOptions
> scan_options_
= std::make_shared
<ScanOptions
>();
415 /// \brief Construct a source ExecNode which yields batches from a dataset scan.
417 /// Does not construct associated filter or project nodes.
418 /// Yielded batches will be augmented with fragment/batch indices to enable stable
419 /// ordering for simple ExecPlans.
420 class ARROW_DS_EXPORT ScanNodeOptions
: public compute::ExecNodeOptions
{
422 explicit ScanNodeOptions(
423 std::shared_ptr
<Dataset
> dataset
, std::shared_ptr
<ScanOptions
> scan_options
,
424 std::shared_ptr
<util::AsyncToggle
> backpressure_toggle
= NULLPTR
,
425 bool require_sequenced_output
= false)
426 : dataset(std::move(dataset
)),
427 scan_options(std::move(scan_options
)),
428 backpressure_toggle(std::move(backpressure_toggle
)),
429 require_sequenced_output(require_sequenced_output
) {}
431 std::shared_ptr
<Dataset
> dataset
;
432 std::shared_ptr
<ScanOptions
> scan_options
;
433 std::shared_ptr
<util::AsyncToggle
> backpressure_toggle
;
434 bool require_sequenced_output
;
439 /// \brief A trivial ScanTask that yields the RecordBatch of an array.
440 class ARROW_DS_EXPORT InMemoryScanTask
: public ScanTask
{
442 InMemoryScanTask(std::vector
<std::shared_ptr
<RecordBatch
>> record_batches
,
443 std::shared_ptr
<ScanOptions
> options
,
444 std::shared_ptr
<Fragment
> fragment
)
445 : ScanTask(std::move(options
), std::move(fragment
)),
446 record_batches_(std::move(record_batches
)) {}
448 Result
<RecordBatchIterator
> Execute() override
;
451 std::vector
<std::shared_ptr
<RecordBatch
>> record_batches_
;
// NOTE(review): presumably registers the dataset scan ExecNode factory with the
// given registry so ExecPlans can create scan nodes — confirm in the .cc file.
ARROW_DS_EXPORT
void InitializeScanner(arrow::compute::ExecFactoryRegistry* registry);
456 } // namespace internal
457 } // namespace dataset