1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/dataset/dataset.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/type.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
38 /// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
39 /// flattened FragmentIterator.
40 inline Result
<FragmentIterator
> GetFragmentsFromDatasets(const DatasetVector
& datasets
,
41 compute::Expression predicate
) {
43 auto datasets_it
= MakeVectorIterator(datasets
);
45 // Dataset -> Iterator<Fragment>
46 auto fn
= [predicate
](std::shared_ptr
<Dataset
> dataset
) -> Result
<FragmentIterator
> {
47 return dataset
->GetFragments(predicate
);
50 // Iterator<Iterator<Fragment>>
51 auto fragments_it
= MakeMaybeMapIterator(fn
, std::move(datasets_it
));
54 return MakeFlattenIterator(std::move(fragments_it
));
57 inline std::shared_ptr
<Schema
> SchemaFromColumnNames(
58 const std::shared_ptr
<Schema
>& input
, const std::vector
<std::string
>& column_names
) {
59 std::vector
<std::shared_ptr
<Field
>> columns
;
60 for (FieldRef ref
: column_names
) {
61 auto maybe_field
= ref
.GetOne(*input
);
62 if (maybe_field
.ok()) {
63 columns
.push_back(std::move(maybe_field
).ValueOrDie());
67 return schema(std::move(columns
))->WithMetadata(input
->metadata());
70 /// Get fragment scan options of the expected type.
71 /// \return Fragment scan options if provided on the scan options, else the default
72 /// options if set, else a default-constructed value. If options are provided
73 /// but of the wrong type, an error is returned.
75 arrow::Result
<std::shared_ptr
<T
>> GetFragmentScanOptions(
76 const std::string
& type_name
, const ScanOptions
* scan_options
,
77 const std::shared_ptr
<FragmentScanOptions
>& default_options
) {
78 auto source
= default_options
;
79 if (scan_options
&& scan_options
->fragment_scan_options
) {
80 source
= scan_options
->fragment_scan_options
;
83 return std::make_shared
<T
>();
85 if (source
->type_name() != type_name
) {
86 return Status::Invalid("FragmentScanOptions of type ", source
->type_name(),
87 " were provided for scanning a fragment of type ", type_name
);
89 return ::arrow::internal::checked_pointer_cast
<T
>(source
);
92 class FragmentDataset
: public Dataset
{
94 FragmentDataset(std::shared_ptr
<Schema
> schema
, FragmentVector fragments
)
95 : Dataset(std::move(schema
)), fragments_(std::move(fragments
)) {}
97 FragmentDataset(std::shared_ptr
<Schema
> schema
,
98 AsyncGenerator
<std::shared_ptr
<Fragment
>> fragments
)
99 : Dataset(std::move(schema
)), fragment_gen_(std::move(fragments
)) {}
101 std::string
type_name() const override
{ return "fragment"; }
103 Result
<std::shared_ptr
<Dataset
>> ReplaceSchema(
104 std::shared_ptr
<Schema
> schema
) const override
{
105 return std::make_shared
<FragmentDataset
>(std::move(schema
), fragments_
);
109 Result
<FragmentIterator
> GetFragmentsImpl(compute::Expression predicate
) override
{
111 // TODO(ARROW-8163): Async fragment scanning can be forwarded rather than waiting
112 // for the whole generator here. For now, all Dataset impls have a vector of
114 auto fragments_fut
= CollectAsyncGenerator(std::move(fragment_gen_
));
115 ARROW_ASSIGN_OR_RAISE(fragments_
, fragments_fut
.result());
118 // TODO(ARROW-12891) Provide subtree pruning for any vector of fragments
119 FragmentVector fragments
;
120 for (const auto& fragment
: fragments_
) {
121 ARROW_ASSIGN_OR_RAISE(
122 auto simplified_filter
,
123 compute::SimplifyWithGuarantee(predicate
, fragment
->partition_expression()));
125 if (simplified_filter
.IsSatisfiable()) {
126 fragments
.push_back(fragment
);
129 return MakeVectorIterator(std::move(fragments
));
132 FragmentVector fragments_
;
133 AsyncGenerator
<std::shared_ptr
<Fragment
>> fragment_gen_
;
136 // Given a record batch generator, creates a new generator that slices
137 // batches so individual batches have at most batch_size rows. The
138 // resulting generator is async-reentrant, but does not forward
139 // reentrant pulls, so apply readahead before using this helper.
140 inline RecordBatchGenerator
MakeChunkedBatchGenerator(RecordBatchGenerator gen
,
141 int64_t batch_size
) {
142 return MakeFlatMappedGenerator(
144 [batch_size
](const std::shared_ptr
<RecordBatch
>& batch
)
145 -> ::arrow::AsyncGenerator
<std::shared_ptr
<::arrow::RecordBatch
>> {
146 const int64_t rows
= batch
->num_rows();
147 if (rows
<= batch_size
) {
148 return ::arrow::MakeVectorGenerator
<std::shared_ptr
<RecordBatch
>>({batch
});
150 std::vector
<std::shared_ptr
<RecordBatch
>> slices
;
151 slices
.reserve(rows
/ batch_size
+ (rows
% batch_size
!= 0));
152 for (int64_t i
= 0; i
< rows
; i
+= batch_size
) {
153 slices
.push_back(batch
->Slice(i
, batch_size
));
155 return ::arrow::MakeVectorGenerator(std::move(slices
));
159 } // namespace dataset