// Retrieved from git.proxmox.com, ceph.git ("import quincy 17.2.0"):
// ceph/src/arrow/cpp/src/arrow/dataset/dataset_internal.h
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18 #pragma once
19
20 #include <memory>
21 #include <string>
22 #include <utility>
23 #include <vector>
24
25 #include "arrow/dataset/dataset.h"
26 #include "arrow/dataset/file_base.h"
27 #include "arrow/dataset/type_fwd.h"
28 #include "arrow/record_batch.h"
29 #include "arrow/scalar.h"
30 #include "arrow/type.h"
31 #include "arrow/util/async_generator.h"
32 #include "arrow/util/checked_cast.h"
33 #include "arrow/util/iterator.h"
34
35 namespace arrow {
36 namespace dataset {
37
38 /// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
39 /// flattened FragmentIterator.
40 inline Result<FragmentIterator> GetFragmentsFromDatasets(const DatasetVector& datasets,
41 compute::Expression predicate) {
42 // Iterator<Dataset>
43 auto datasets_it = MakeVectorIterator(datasets);
44
45 // Dataset -> Iterator<Fragment>
46 auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> Result<FragmentIterator> {
47 return dataset->GetFragments(predicate);
48 };
49
50 // Iterator<Iterator<Fragment>>
51 auto fragments_it = MakeMaybeMapIterator(fn, std::move(datasets_it));
52
53 // Iterator<Fragment>
54 return MakeFlattenIterator(std::move(fragments_it));
55 }
56
57 inline std::shared_ptr<Schema> SchemaFromColumnNames(
58 const std::shared_ptr<Schema>& input, const std::vector<std::string>& column_names) {
59 std::vector<std::shared_ptr<Field>> columns;
60 for (FieldRef ref : column_names) {
61 auto maybe_field = ref.GetOne(*input);
62 if (maybe_field.ok()) {
63 columns.push_back(std::move(maybe_field).ValueOrDie());
64 }
65 }
66
67 return schema(std::move(columns))->WithMetadata(input->metadata());
68 }
69
70 /// Get fragment scan options of the expected type.
71 /// \return Fragment scan options if provided on the scan options, else the default
72 /// options if set, else a default-constructed value. If options are provided
73 /// but of the wrong type, an error is returned.
74 template <typename T>
75 arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
76 const std::string& type_name, const ScanOptions* scan_options,
77 const std::shared_ptr<FragmentScanOptions>& default_options) {
78 auto source = default_options;
79 if (scan_options && scan_options->fragment_scan_options) {
80 source = scan_options->fragment_scan_options;
81 }
82 if (!source) {
83 return std::make_shared<T>();
84 }
85 if (source->type_name() != type_name) {
86 return Status::Invalid("FragmentScanOptions of type ", source->type_name(),
87 " were provided for scanning a fragment of type ", type_name);
88 }
89 return ::arrow::internal::checked_pointer_cast<T>(source);
90 }
91
92 class FragmentDataset : public Dataset {
93 public:
94 FragmentDataset(std::shared_ptr<Schema> schema, FragmentVector fragments)
95 : Dataset(std::move(schema)), fragments_(std::move(fragments)) {}
96
97 FragmentDataset(std::shared_ptr<Schema> schema,
98 AsyncGenerator<std::shared_ptr<Fragment>> fragments)
99 : Dataset(std::move(schema)), fragment_gen_(std::move(fragments)) {}
100
101 std::string type_name() const override { return "fragment"; }
102
103 Result<std::shared_ptr<Dataset>> ReplaceSchema(
104 std::shared_ptr<Schema> schema) const override {
105 return std::make_shared<FragmentDataset>(std::move(schema), fragments_);
106 }
107
108 protected:
109 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override {
110 if (fragment_gen_) {
111 // TODO(ARROW-8163): Async fragment scanning can be forwarded rather than waiting
112 // for the whole generator here. For now, all Dataset impls have a vector of
113 // Fragments anyway
114 auto fragments_fut = CollectAsyncGenerator(std::move(fragment_gen_));
115 ARROW_ASSIGN_OR_RAISE(fragments_, fragments_fut.result());
116 }
117
118 // TODO(ARROW-12891) Provide subtree pruning for any vector of fragments
119 FragmentVector fragments;
120 for (const auto& fragment : fragments_) {
121 ARROW_ASSIGN_OR_RAISE(
122 auto simplified_filter,
123 compute::SimplifyWithGuarantee(predicate, fragment->partition_expression()));
124
125 if (simplified_filter.IsSatisfiable()) {
126 fragments.push_back(fragment);
127 }
128 }
129 return MakeVectorIterator(std::move(fragments));
130 }
131
132 FragmentVector fragments_;
133 AsyncGenerator<std::shared_ptr<Fragment>> fragment_gen_;
134 };
135
136 // Given a record batch generator, creates a new generator that slices
137 // batches so individual batches have at most batch_size rows. The
138 // resulting generator is async-reentrant, but does not forward
139 // reentrant pulls, so apply readahead before using this helper.
140 inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen,
141 int64_t batch_size) {
142 return MakeFlatMappedGenerator(
143 std::move(gen),
144 [batch_size](const std::shared_ptr<RecordBatch>& batch)
145 -> ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> {
146 const int64_t rows = batch->num_rows();
147 if (rows <= batch_size) {
148 return ::arrow::MakeVectorGenerator<std::shared_ptr<RecordBatch>>({batch});
149 }
150 std::vector<std::shared_ptr<RecordBatch>> slices;
151 slices.reserve(rows / batch_size + (rows % batch_size != 0));
152 for (int64_t i = 0; i < rows; i += batch_size) {
153 slices.push_back(batch->Slice(i, batch_size));
154 }
155 return ::arrow::MakeVectorGenerator(std::move(slices));
156 });
157 }
158
159 } // namespace dataset
160 } // namespace arrow