]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "benchmark/benchmark.h" | |
19 | ||
20 | #include "arrow/api.h" | |
21 | #include "arrow/compute/api.h" | |
22 | #include "arrow/compute/exec/options.h" | |
23 | #include "arrow/compute/exec/test_util.h" | |
24 | #include "arrow/dataset/dataset.h" | |
25 | #include "arrow/dataset/plan.h" | |
26 | #include "arrow/dataset/scanner.h" | |
27 | #include "arrow/dataset/test_util.h" | |
28 | #include "arrow/testing/gtest_util.h" | |
29 | #include "arrow/testing/matchers.h" | |
30 | #include "arrow/testing/random.h" | |
31 | ||
32 | namespace arrow { | |
33 | namespace compute { | |
34 | ||
35 | constexpr auto kSeed = 0x0ff1ce; | |
36 | ||
37 | void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema, size_t num_batches, | |
38 | BatchesWithSchema* out_batches, int multiplicity = 1, | |
39 | int64_t batch_size = 4) { | |
40 | ::arrow::random::RandomArrayGenerator rng_(kSeed); | |
41 | if (num_batches == 0) { | |
42 | auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0)); | |
43 | out_batches->batches.push_back(empty_record_batch); | |
44 | } else { | |
45 | for (size_t j = 0; j < num_batches; j++) { | |
46 | out_batches->batches.push_back( | |
47 | ExecBatch(*rng_.BatchOf(schema->fields(), batch_size))); | |
48 | } | |
49 | } | |
50 | ||
51 | size_t batch_count = out_batches->batches.size(); | |
52 | for (int repeat = 1; repeat < multiplicity; ++repeat) { | |
53 | for (size_t i = 0; i < batch_count; ++i) { | |
54 | out_batches->batches.push_back(out_batches->batches[i]); | |
55 | } | |
56 | } | |
57 | out_batches->schema = schema; | |
58 | } | |
59 | ||
60 | RecordBatchVector GenerateBatches(const std::shared_ptr<Schema>& schema, | |
61 | size_t num_batches, size_t batch_size) { | |
62 | BatchesWithSchema input_batches; | |
63 | ||
64 | RecordBatchVector batches; | |
65 | GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size); | |
66 | ||
67 | for (const auto& batch : input_batches.batches) { | |
68 | batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe()); | |
69 | } | |
70 | return batches; | |
71 | } | |
72 | ||
73 | } // namespace compute | |
74 | ||
75 | namespace dataset { | |
76 | ||
77 | static std::map<std::pair<size_t, size_t>, RecordBatchVector> datasets; | |
78 | ||
79 | void StoreBatches(size_t num_batches, size_t batch_size, | |
80 | const RecordBatchVector& batches) { | |
81 | datasets[std::make_pair(num_batches, batch_size)] = batches; | |
82 | } | |
83 | ||
84 | RecordBatchVector GetBatches(size_t num_batches, size_t batch_size) { | |
85 | auto iter = datasets.find(std::make_pair(num_batches, batch_size)); | |
86 | if (iter == datasets.end()) { | |
87 | return RecordBatchVector{}; | |
88 | } | |
89 | return iter->second; | |
90 | } | |
91 | ||
92 | std::shared_ptr<Schema> GetSchema() { | |
93 | static std::shared_ptr<Schema> s = schema({field("a", int32()), field("b", boolean())}); | |
94 | return s; | |
95 | } | |
96 | ||
97 | size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } | |
98 | ||
// Didactic end-to-end exec-plan example, also used as the benchmark workload:
// builds a scan -> filter -> project -> sink ExecPlan over the cached
// in-memory dataset registered for (num_batches, batch_size), drains the sink
// into a Table, and waits (up to 1s) for the plan to finish.
//
// num_batches / batch_size: key into the dataset cache (see StoreBatches).
// async_mode: toggles asynchronous operation of the filter/project nodes.
void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) {
  // NB: This test is here for didactic purposes

  // Specify a MemoryPool and ThreadPool for the ExecPlan
  compute::ExecContext exec_context(default_memory_pool(),
                                    ::arrow::internal::GetCpuThreadPool());

  // ensure arrow::dataset node factories are in the registry
  ::arrow::dataset::internal::Initialize();

  // A ScanNode is constructed from an ExecPlan (into which it is inserted),
  // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
  // predicate pushdown, a projection to skip materialization of unnecessary columns,
  // ...)
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
                       compute::ExecPlan::Make(&exec_context));

  RecordBatchVector batches = GetBatches(num_batches, batch_size);

  std::shared_ptr<Dataset> dataset =
      std::make_shared<InMemoryDataset>(GetSchema(), batches);

  auto options = std::make_shared<ScanOptions>();
  // sync scanning is not supported by ScanNode
  options->use_async = true;
  // specify the filter: keep rows where boolean column "b" is true
  compute::Expression b_is_true = field_ref("b");
  options->filter = b_is_true;
  // for now, specify the projection as the full project expression (eventually this can
  // just be a list of materialized field names)
  compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)});
  options->projection =
      call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}});

  // construct the scan node
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode * scan,
      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));

  // pipe the scan node into a filter node
  // (the filter is applied again here even though it was given to the scan for
  // pushdown; pushdown is best-effort and may not fully apply it)
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode * filter,
      compute::MakeExecNode("filter", plan.get(), {scan},
                            compute::FilterNodeOptions{b_is_true, async_mode}));

  // pipe the filter node into a project node
  // NB: we're using the project node factory which preserves fragment/batch index
  // tagging, so we *can* reorder later if we choose. The tags will not appear in
  // our output.
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode * project,
      compute::MakeExecNode("augmented_project", plan.get(), {filter},
                            compute::ProjectNodeOptions{{a_times_2}, {}, async_mode}));

  // finally, pipe the project node into a sink node
  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
  ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink,
                       compute::MakeExecNode("sink", plan.get(), {project},
                                             compute::SinkNodeOptions{&sink_gen}));

  ASSERT_NE(sink, nullptr);

  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
      schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool());

  // start the ExecPlan
  ASSERT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get()));

  // with seeded random data some rows should survive the "b" filter
  ASSERT_GT(collected->num_rows(), 0);

  // wait 1s for completion
  ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s";
}
176 | ||
177 | static void MinimalEndToEndBench(benchmark::State& state) { | |
178 | size_t num_batches = state.range(0); | |
179 | size_t batch_size = state.range(1); | |
180 | bool async_mode = state.range(2); | |
181 | ||
182 | for (auto _ : state) { | |
183 | MinimalEndToEndScan(num_batches, batch_size, async_mode); | |
184 | } | |
185 | state.SetItemsProcessed(state.iterations() * num_batches); | |
186 | state.SetBytesProcessed(state.iterations() * num_batches * batch_size * | |
187 | GetBytesForSchema()); | |
188 | } | |
189 | ||
190 | static const std::vector<int32_t> kWorkload = {100, 1000, 10000, 100000}; | |
191 | ||
192 | static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { | |
193 | for (const int32_t num_batches : kWorkload) { | |
194 | for (const int batch_size : {10, 100, 1000}) { | |
195 | for (const bool async_mode : {true, false}) { | |
196 | b->Args({num_batches, batch_size, async_mode}); | |
197 | RecordBatchVector batches = | |
198 | ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); | |
199 | StoreBatches(num_batches, batch_size, batches); | |
200 | } | |
201 | } | |
202 | } | |
203 | b->ArgNames({"num_batches", "batch_size", "async_mode"}); | |
204 | b->UseRealTime(); | |
205 | } | |
206 | ||
207 | BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize); | |
208 | ||
209 | } // namespace dataset | |
210 | } // namespace arrow |