]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/dataset/scanner.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / dataset / scanner.h
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 // This API is EXPERIMENTAL.
19
20 #pragma once
21
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "arrow/compute/exec/expression.h"
29 #include "arrow/compute/exec/options.h"
30 #include "arrow/compute/type_fwd.h"
31 #include "arrow/dataset/dataset.h"
32 #include "arrow/dataset/projector.h"
33 #include "arrow/dataset/type_fwd.h"
34 #include "arrow/dataset/visibility.h"
35 #include "arrow/io/interfaces.h"
36 #include "arrow/memory_pool.h"
37 #include "arrow/type_fwd.h"
38 #include "arrow/util/async_generator.h"
39 #include "arrow/util/iterator.h"
40 #include "arrow/util/thread_pool.h"
41 #include "arrow/util/type_fwd.h"
42
43 namespace arrow {
44
/// A pull-based asynchronous stream of record batches: each invocation returns a
/// future that resolves to the next batch.
using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
46
47 namespace dataset {
48
49 /// \defgroup dataset-scanning Scanning API
50 ///
51 /// @{
52
/// Default maximum number of rows in a scanned batch (1Mi rows).
constexpr int64_t kDefaultBatchSize = 1 << 20;
/// Default number of batches to read ahead within a single fragment.
constexpr int32_t kDefaultBatchReadahead = 32;
/// Default number of fragments (e.g. files) to read ahead concurrently.
constexpr int32_t kDefaultFragmentReadahead = 8;
/// Default backpressure high watermark.
/// NOTE(review): presumably the queued-batch count above which production pauses —
/// confirm against the scanner implementation.
constexpr int32_t kDefaultBackpressureHigh = 64;
/// Default backpressure low watermark.
/// NOTE(review): presumably the queued-batch count below which production resumes —
/// confirm against the scanner implementation.
constexpr int32_t kDefaultBackpressureLow = 32;
58
/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
  /// A row filter (which will be pushed down to partitioning/reading if supported).
  /// Defaults to `literal(true)`, i.e. keep all rows.
  compute::Expression filter = compute::literal(true);
  /// A projection expression (which can add/remove/rename columns).
  compute::Expression projection;

  /// Schema with which batches will be read from fragments. This is also known as the
  /// "reader schema"; it will be used (for example) in constructing CSV file readers to
  /// identify column types for parsing. Usually only a subset of its fields (see
  /// MaterializedFields) will be materialized during a scan.
  std::shared_ptr<Schema> dataset_schema;

  /// Schema of projected record batches. This is independent of dataset_schema as its
  /// fields are derived from the projection. For example, let
  ///
  ///   dataset_schema = {"a": int32, "b": int32, "id": utf8}
  ///   projection = project({equal(field_ref("a"), field_ref("b"))}, {"a_plus_b"})
  ///
  /// (no filter specified). In this case, the projected_schema would be
  ///
  ///   {"a_plus_b": int32}
  std::shared_ptr<Schema> projected_schema;

  /// Maximum row count for scanned batches.
  int64_t batch_size = kDefaultBatchSize;

  /// How many batches to read ahead within a file.
  ///
  /// Set to 0 to disable batch readahead.
  ///
  /// Note: May not be supported by all formats
  /// Note: May not be supported by all scanners
  /// Note: Will be ignored if use_threads is set to false
  int32_t batch_readahead = kDefaultBatchReadahead;

  /// How many files to read ahead.
  ///
  /// Set to 0 to disable fragment readahead.
  ///
  /// Note: May not be enforced by all scanners
  /// Note: Will be ignored if use_threads is set to false
  int32_t fragment_readahead = kDefaultFragmentReadahead;

  /// A pool from which materialized and scanned arrays will be allocated.
  /// Defaults to the process-wide default memory pool.
  MemoryPool* pool = arrow::default_memory_pool();

  /// IOContext for any IO tasks.
  ///
  /// Note: The IOContext executor will be ignored if use_threads is set to false
  io::IOContext io_context;

  /// If true the scanner will scan in parallel.
  ///
  /// Note: If true, this will use threads from both the cpu_executor and the
  /// io_context.executor
  /// Note: This must be true in order for any readahead to happen
  bool use_threads = false;

  /// If true then an asynchronous implementation of the scanner will be used.
  /// This implementation is newer and generally performs better. However, it
  /// makes extensive use of threading and is still considered experimental.
  bool use_async = false;

  /// Fragment-specific scan options (e.g. format-dependent reader settings).
  std::shared_ptr<FragmentScanOptions> fragment_scan_options;

  /// Return a vector of fields that require materialization.
  ///
  /// This is usually the union of the fields referenced in the projection and the
  /// filter expression. Examples:
  ///
  /// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
  /// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"]
  ///
  /// This is needed for expressions where a field may not be directly
  /// used in the final projection but is still required to evaluate the
  /// expression.
  ///
  /// This is used by Fragment implementations to apply the column
  /// sub-selection optimization.
  std::vector<std::string> MaterializedFields() const;

  /// Return a threaded or serial TaskGroup according to use_threads.
  std::shared_ptr<::arrow::internal::TaskGroup> TaskGroup() const;
};
145
/// \brief Read record batches from a range of a single data fragment. A
/// ScanTask is meant to be a unit of work to be dispatched. The implementation
/// must be thread and concurrent safe.
class ARROW_DS_EXPORT ScanTask {
 public:
  /// \brief Iterate through sequence of materialized record batches
  /// resulting from the Scan. Execution semantics are encapsulated in the
  /// particular ScanTask implementation.
  virtual Result<RecordBatchIterator> Execute() = 0;
  /// \brief Run the task, collecting every produced batch into a vector.
  /// NOTE(review): presumably executed on `executor` — confirm in the implementation.
  virtual Future<RecordBatchVector> SafeExecute(::arrow::internal::Executor* executor);
  /// \brief Run the task, applying `visitor` to each produced batch instead of
  /// collecting them; the returned future completes when visitation finishes.
  virtual Future<> SafeVisit(::arrow::internal::Executor* executor,
                             std::function<Status(std::shared_ptr<RecordBatch>)> visitor);

  virtual ~ScanTask() = default;

  /// \brief The scan options governing this task.
  const std::shared_ptr<ScanOptions>& options() const { return options_; }
  /// \brief The fragment this task reads from.
  const std::shared_ptr<Fragment>& fragment() const { return fragment_; }

 protected:
  ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<Fragment> fragment)
      : options_(std::move(options)), fragment_(std::move(fragment)) {}

  std::shared_ptr<ScanOptions> options_;
  std::shared_ptr<Fragment> fragment_;
};
171
/// \brief Combines a record batch with the fragment that the record batch originated
/// from.
///
/// Knowing the source fragment can be useful for debugging & understanding loaded data.
struct TaggedRecordBatch {
  /// The scanned batch of rows.
  std::shared_ptr<RecordBatch> record_batch;
  /// The fragment the batch was read from.
  std::shared_ptr<Fragment> fragment;
};
/// Asynchronous stream of tagged batches.
using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
/// Synchronous iterator of tagged batches.
using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
182
/// \brief Combines a tagged batch with positional information.
///
/// This is returned when scanning batches in an unordered fashion. This information is
/// needed if you ever want to reassemble the batches in order.
struct EnumeratedRecordBatch {
  /// The batch, along with its position within its fragment.
  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
  /// The source fragment, along with its position within the dataset.
  Enumerated<std::shared_ptr<Fragment>> fragment;
};
/// Asynchronous stream of enumerated batches.
using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
/// Synchronous iterator of enumerated batches.
using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
193
194 /// @}
195
196 } // namespace dataset
197
198 template <>
199 struct IterationTraits<dataset::TaggedRecordBatch> {
200 static dataset::TaggedRecordBatch End() {
201 return dataset::TaggedRecordBatch{NULLPTR, NULLPTR};
202 }
203 static bool IsEnd(const dataset::TaggedRecordBatch& val) {
204 return val.record_batch == NULLPTR;
205 }
206 };
207
208 template <>
209 struct IterationTraits<dataset::EnumeratedRecordBatch> {
210 static dataset::EnumeratedRecordBatch End() {
211 return dataset::EnumeratedRecordBatch{
212 IterationEnd<Enumerated<std::shared_ptr<RecordBatch>>>(),
213 IterationEnd<Enumerated<std::shared_ptr<dataset::Fragment>>>()};
214 }
215 static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
216 return IsIterationEnd(val.fragment);
217 }
218 };
219
220 namespace dataset {
221
222 /// \defgroup dataset-scanning Scanning API
223 ///
224 /// @{
225
/// \brief A scanner glues together several dataset classes to load in data.
/// The dataset contains a collection of fragments and partitioning rules.
///
/// The fragments identify independently loadable units of data (i.e. each fragment has
/// a potentially unique schema and possibly even format. It should be possible to read
/// fragments in parallel if desired).
///
/// The fragment's format contains the logic necessary to actually create a task to load
/// the fragment into memory. That task may or may not support parallel execution of
/// its own.
///
/// The scanner is then responsible for creating scan tasks from every fragment in the
/// dataset and (potentially) sequencing the loaded record batches together.
///
/// The scanner should not buffer the entire dataset in memory (unless asked), instead
/// yielding record batches as soon as they are ready to scan. Various readahead
/// properties control how much data is allowed to be scanned before pausing to let a
/// slow consumer catch up.
///
/// Today the scanner also handles projection & filtering, although that may change in
/// the future.
class ARROW_DS_EXPORT Scanner {
 public:
  virtual ~Scanner() = default;

  /// \brief The Scan operator returns a stream of ScanTask. The caller is
  /// responsible to dispatch/schedule said tasks. Tasks should be safe to run
  /// in a concurrent fashion and outlive the iterator.
  ///
  /// Note: Not supported by the async scanner
  /// Planned for removal from the public API in ARROW-11782.
  ARROW_DEPRECATED("Deprecated in 4.0.0 for removal in 5.0.0. Use ScanBatches().")
  virtual Result<ScanTaskIterator> Scan();

  /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple threads
  /// are used (via use_threads), the visitor will be invoked from those threads and is
  /// responsible for any synchronization.
  virtual Status Scan(std::function<Status(TaggedRecordBatch)> visitor) = 0;
  /// \brief Convert a Scanner into a Table.
  ///
  /// Use this convenience utility with care. This will serially materialize the
  /// Scan result in memory before creating the Table.
  virtual Result<std::shared_ptr<Table>> ToTable() = 0;
  /// \brief Scan the dataset into a stream of record batches. Each batch is tagged
  /// with the fragment it originated from. The batches will arrive in order. The
  /// order of fragments is determined by the dataset.
  ///
  /// Note: The scanner will perform some readahead but will avoid materializing too
  /// much in memory (this is governed by the readahead options and use_threads option).
  /// If the readahead queue fills up then I/O will pause until the calling thread
  /// catches up.
  virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
  /// \brief Asynchronous version of ScanBatches().
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync() = 0;
  /// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this
  /// method may allow record batches to be returned out of order. This allows for more
  /// efficient scanning: some fragments may be accessed more quickly than others (e.g.
  /// may be cached in RAM or just happen to get scheduled earlier by the I/O)
  ///
  /// To make up for the out-of-order iteration each batch is further tagged with
  /// positional information.
  virtual Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered();
  /// \brief Asynchronous version of ScanBatchesUnordered().
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync() = 0;
  /// \brief A convenience to synchronously load the given rows by index.
  ///
  /// Will only consume as many batches as needed from ScanBatches().
  virtual Result<std::shared_ptr<Table>> TakeRows(const Array& indices);
  /// \brief Get the first N rows.
  virtual Result<std::shared_ptr<Table>> Head(int64_t num_rows);
  /// \brief Count rows matching a predicate.
  ///
  /// This method will push down the predicate and compute the result based on fragment
  /// metadata if possible.
  virtual Result<int64_t> CountRows();
  /// \brief Convert the Scanner to a RecordBatchReader so it can be
  /// easily used with APIs that expect a reader.
  Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader();

  /// \brief Get the options for this scan.
  const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }
  /// \brief Get the dataset that this scanner will scan.
  virtual const std::shared_ptr<Dataset>& dataset() const = 0;

 protected:
  explicit Scanner(std::shared_ptr<ScanOptions> scan_options)
      : scan_options_(std::move(scan_options)) {}

  /// \brief Helper to convert an in-order tagged-batch iterator into an
  /// enumerated iterator by attaching positional information.
  Result<EnumeratedRecordBatchIterator> AddPositioningToInOrderScan(
      TaggedRecordBatchIterator scan);

  const std::shared_ptr<ScanOptions> scan_options_;
};
317
/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
/// to pass information, notably a potential filter expression and a subset of
/// columns to materialize.
class ARROW_DS_EXPORT ScannerBuilder {
 public:
  /// \brief Construct a builder that scans `dataset` with default ScanOptions.
  explicit ScannerBuilder(std::shared_ptr<Dataset> dataset);

  /// \brief Construct a builder that scans `dataset` with the given options.
  ScannerBuilder(std::shared_ptr<Dataset> dataset,
                 std::shared_ptr<ScanOptions> scan_options);

  /// \brief Construct a builder that scans a single fragment with the given
  /// schema and options.
  ScannerBuilder(std::shared_ptr<Schema> schema, std::shared_ptr<Fragment> fragment,
                 std::shared_ptr<ScanOptions> scan_options);

  /// \brief Make a scanner from a record batch reader.
  ///
  /// The resulting scanner can be scanned only once. This is intended
  /// to support writing data from streaming sources or other sources
  /// that can be iterated only once.
  static std::shared_ptr<ScannerBuilder> FromRecordBatchReader(
      std::shared_ptr<RecordBatchReader> reader);

  /// \brief Set the subset of columns to materialize.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] columns list of columns to project. Order and duplicates will
  ///            be preserved.
  ///
  /// \return Failure if any column name does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<std::string> columns);

  /// \brief Set expressions which will be evaluated to produce the materialized
  /// columns.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] exprs expressions to evaluate to produce columns.
  /// \param[in] names list of names for the resulting columns.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<compute::Expression> exprs, std::vector<std::string> names);

  /// \brief Set the filter expression to return only rows matching the filter.
  ///
  /// The predicate will be passed down to Sources and corresponding
  /// Fragments to exploit predicate pushdown if possible using
  /// partition information or Fragment internal metadata, e.g. Parquet statistics.
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] filter expression to filter rows with.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Filter(const compute::Expression& filter);

  /// \brief Indicate if the Scanner should make use of the available
  /// ThreadPool found in ScanOptions.
  Status UseThreads(bool use_threads = true);

  /// \brief Limit how many fragments the scanner will read at once.
  ///
  /// Note: This is only enforced in "async" mode
  Status FragmentReadahead(int fragment_readahead);

  /// \brief Indicate if the Scanner should run in experimental "async" mode.
  ///
  /// This mode should have considerably better performance on high-latency or parallel
  /// filesystems but is still experimental.
  Status UseAsync(bool use_async = true);

  /// \brief Set the maximum number of rows per RecordBatch.
  ///
  /// \param[in] batch_size the maximum number of rows.
  /// \returns An error if the number for batch is not greater than 0.
  ///
  /// This option provides a control limiting the memory owned by any RecordBatch.
  Status BatchSize(int64_t batch_size);

  /// \brief Set the pool from which materialized and scanned arrays will be allocated.
  Status Pool(MemoryPool* pool);

  /// \brief Set fragment-specific scan options.
  Status FragmentScanOptions(std::shared_ptr<FragmentScanOptions> fragment_scan_options);

  /// \brief Return the constructed now-immutable Scanner object.
  Result<std::shared_ptr<Scanner>> Finish();

  /// \brief The dataset schema the scan will read with.
  const std::shared_ptr<Schema>& schema() const;
  /// \brief The schema of batches produced by the projection.
  const std::shared_ptr<Schema>& projected_schema() const;

 private:
  std::shared_ptr<Dataset> dataset_;
  std::shared_ptr<ScanOptions> scan_options_ = std::make_shared<ScanOptions>();
};
414
/// \brief Construct a source ExecNode which yields batches from a dataset scan.
///
/// Does not construct associated filter or project nodes.
/// Yielded batches will be augmented with fragment/batch indices to enable stable
/// ordering for simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions {
 public:
  /// \param dataset the dataset to scan
  /// \param scan_options options governing the scan (filter, projection, readahead...)
  /// \param backpressure_toggle optional toggle used to pause/resume production
  ///        (NOTE(review): semantics presumed from the name — confirm in scanner.cc)
  /// \param require_sequenced_output if true, batches must be emitted in order
  explicit ScanNodeOptions(
      std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> scan_options,
      std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR,
      bool require_sequenced_output = false)
      : dataset(std::move(dataset)),
        scan_options(std::move(scan_options)),
        backpressure_toggle(std::move(backpressure_toggle)),
        require_sequenced_output(require_sequenced_output) {}

  /// The dataset to scan.
  std::shared_ptr<Dataset> dataset;
  /// Options governing the scan.
  std::shared_ptr<ScanOptions> scan_options;
  /// Optional backpressure control; may be null.
  std::shared_ptr<util::AsyncToggle> backpressure_toggle;
  /// Whether output batches must be produced in sequence.
  bool require_sequenced_output;
};
436
437 /// @}
438
/// \brief A trivial ScanTask that yields record batches from a pre-existing
/// in-memory vector of batches.
class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask {
 public:
  /// \param record_batches the batches this task will yield
  /// \param options scan options (stored on the ScanTask base)
  /// \param fragment the fragment the batches are attributed to
  InMemoryScanTask(std::vector<std::shared_ptr<RecordBatch>> record_batches,
                   std::shared_ptr<ScanOptions> options,
                   std::shared_ptr<Fragment> fragment)
      : ScanTask(std::move(options), std::move(fragment)),
        record_batches_(std::move(record_batches)) {}

  /// \brief Return an iterator over the stored record batches.
  Result<RecordBatchIterator> Execute() override;

 protected:
  // The batches produced by Execute().
  std::vector<std::shared_ptr<RecordBatch>> record_batches_;
};
453
namespace internal {
/// \brief One-time initialization hook taking an ExecFactoryRegistry.
/// NOTE(review): presumably registers the dataset scan ExecNode factory with
/// `registry` — confirm against the definition in scanner.cc.
ARROW_DS_EXPORT void InitializeScanner(arrow::compute::ExecFactoryRegistry* registry);
}  // namespace internal
457 } // namespace dataset
458 } // namespace arrow