// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#include "benchmark/benchmark.h"

#include "arrow/api.h"
#include "arrow/compute/api.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/plan.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/test_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"

namespace arrow {
namespace compute {

constexpr auto kSeed = 0x0ff1ce;
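
// Generate `num_batches` batches of `batch_size` random rows each for the given
// schema, appending them to `out_batches`. With `multiplicity` > 1, the generated
// batches are appended again (by value) until `out_batches` holds `multiplicity`
// copies of each batch.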
void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema, size_t num_batches,
                               BatchesWithSchema* out_batches, int multiplicity = 1,
                               int64_t batch_size = 4) {
  ::arrow::random::RandomArrayGenerator rng_(kSeed);
  if (num_batches == 0) {
    auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
    out_batches->batches.push_back(empty_record_batch);
  } else {
    for (size_t j = 0; j < num_batches; j++) {
      out_batches->batches.push_back(
          ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
    }
  }

  size_t batch_count = out_batches->batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out_batches->batches.push_back(out_batches->batches[i]);
    }
  }

  out_batches->schema = schema;
}
RecordBatchVector GenerateBatches(const std::shared_ptr<Schema>& schema,
                                  size_t num_batches, size_t batch_size) {
  BatchesWithSchema input_batches;
  RecordBatchVector batches;
  GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size);

  for (const auto& batch : input_batches.batches) {
    batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe());
  }
  return batches;
}

}  // namespace compute

namespace dataset {
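
// Pre-generated input batches, keyed by (num_batches, batch_size). Generating the
// data once up front keeps random batch generation out of the timed benchmark loop.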
static std::map<std::pair<size_t, size_t>, RecordBatchVector> datasets;

void StoreBatches(size_t num_batches, size_t batch_size,
                  const RecordBatchVector& batches) {
  datasets[std::make_pair(num_batches, batch_size)] = batches;
}

RecordBatchVector GetBatches(size_t num_batches, size_t batch_size) {
  auto iter = datasets.find(std::make_pair(num_batches, batch_size));
  if (iter == datasets.end()) {
    return RecordBatchVector{};
  }
  return iter->second;
}
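
// The benchmark schema: an int32 column "a" (used in the projection) and a
// boolean column "b" (used as the filter predicate).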
std::shared_ptr<Schema> GetSchema() {
  static std::shared_ptr<Schema> s =
      schema({field("a", int32()), field("b", boolean())});
  return s;
}

size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); }
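
// Run a minimal end-to-end ExecPlan: scan an InMemoryDataset, filter on "b",
// project "a" * 2, and drain the results through a sink node into a Table.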
void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) {
  // NB: This test is here for didactic purposes

  // Specify a MemoryPool and ThreadPool for the ExecPlan
  compute::ExecContext exec_context(default_memory_pool(),
                                    ::arrow::internal::GetCpuThreadPool());

  // ensure arrow::dataset node factories are in the registry
  ::arrow::dataset::internal::Initialize();

  // A ScanNode is constructed from an ExecPlan (into which it is inserted),
  // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
  // predicate pushdown, a projection to skip materialization of unnecessary columns,
  // ...)
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
                       compute::ExecPlan::Make(&exec_context));

  RecordBatchVector batches = GetBatches(num_batches, batch_size);

  std::shared_ptr<Dataset> dataset =
      std::make_shared<InMemoryDataset>(GetSchema(), batches);

  auto options = std::make_shared<ScanOptions>();
  // sync scanning is not supported by ScanNode
  options->use_async = true;
  // specify the filter
  compute::Expression b_is_true = field_ref("b");
  options->filter = b_is_true;
  // for now, specify the projection as the full project expression (eventually this can
  // just be a list of materialized field names)
  compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)});
  options->projection =
      call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}});

  // construct the scan node
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode* scan,
      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));

  // pipe the scan node into a filter node
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode* filter,
      compute::MakeExecNode("filter", plan.get(), {scan},
                            compute::FilterNodeOptions{b_is_true, async_mode}));

  // pipe the filter node into a project node
  // NB: we're using the project node factory which preserves fragment/batch index
  // tagging, so we *can* reorder later if we choose. The tags will not appear in
  // the output.
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode* project,
      compute::MakeExecNode("augmented_project", plan.get(), {filter},
                            compute::ProjectNodeOptions{{a_times_2}, {}, async_mode}));

  // finally, pipe the project node into a sink node
  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
  ASSERT_OK_AND_ASSIGN(compute::ExecNode* sink,
                       compute::MakeExecNode("sink", plan.get(), {project},
                                             compute::SinkNodeOptions{&sink_gen}));

  ASSERT_NE(sink, nullptr);

  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
      schema({field("a * 2", int32())}), std::move(sink_gen),
      exec_context.memory_pool());

  // start the ExecPlan
  ASSERT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get()));

  ASSERT_GT(collected->num_rows(), 0);

  // wait 1s for completion
  ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s";
}
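
// Benchmark driver: runs MinimalEndToEndScan once per iteration and reports
// items/sec (batches scanned) and bytes/sec (rows * bytes per row of the schema).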
static void MinimalEndToEndBench(benchmark::State& state) {
  size_t num_batches = state.range(0);
  size_t batch_size = state.range(1);
  bool async_mode = state.range(2);

  for (auto _ : state) {
    MinimalEndToEndScan(num_batches, batch_size, async_mode);
  }
  state.SetItemsProcessed(state.iterations() * num_batches);
  state.SetBytesProcessed(state.iterations() * num_batches * batch_size *
                          GetBytesForSchema());
}

static const std::vector<int32_t> kWorkload = {100, 1000, 10000, 100000};
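
// Register the argument matrix and pre-generate the input batches for every
// (num_batches, batch_size) combination, so that data generation happens at
// benchmark registration time rather than inside the timed loop.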
static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) {
  for (const int32_t num_batches : kWorkload) {
    for (const int batch_size : {10, 100, 1000}) {
      for (const bool async_mode : {true, false}) {
        b->Args({num_batches, batch_size, async_mode});
        RecordBatchVector batches =
            ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size);
        StoreBatches(num_batches, batch_size, batches);
      }
    }
  }
  b->ArgNames({"num_batches", "batch_size", "async_mode"});
}
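
// Register the end-to-end scan benchmark with the argument matrix above.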
BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize);

}  // namespace dataset
}  // namespace arrow