// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "benchmark/benchmark.h"

#include "arrow/api.h"
#include "arrow/compute/api.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/plan.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/test_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"

namespace arrow {
namespace compute {

constexpr auto kSeed = 0x0ff1ce;

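// Fill `out_batches` with `num_batches` random ExecBatches of `batch_size` rows each
// (a single empty batch when `num_batches` is 0), then append `multiplicity - 1`
// copies of the generated batches.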
void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema, size_t num_batches,
                               BatchesWithSchema* out_batches, int multiplicity = 1,
                               int64_t batch_size = 4) {
  ::arrow::random::RandomArrayGenerator rng_(kSeed);
  if (num_batches == 0) {
    auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
    out_batches->batches.push_back(empty_record_batch);
  } else {
    for (size_t j = 0; j < num_batches; j++) {
      out_batches->batches.push_back(
          ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
    }
  }

  size_t batch_count = out_batches->batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out_batches->batches.push_back(out_batches->batches[i]);
    }
  }
  out_batches->schema = schema;
}

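// Generate `num_batches` random ExecBatches for `schema` and convert them to
// RecordBatches suitable for wrapping in an InMemoryDataset.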
RecordBatchVector GenerateBatches(const std::shared_ptr<Schema>& schema,
                                  size_t num_batches, size_t batch_size) {
  BatchesWithSchema input_batches;

  RecordBatchVector batches;
  GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size);

  for (const auto& batch : input_batches.batches) {
    batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe());
  }
  return batches;
}

}  // namespace compute

namespace dataset {

static std::map<std::pair<size_t, size_t>, RecordBatchVector> datasets;

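// Input batches are generated once at benchmark registration time and cached in
// `datasets`, keyed by (num_batches, batch_size), so data generation stays outside
// the timed benchmark loop.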
void StoreBatches(size_t num_batches, size_t batch_size,
                  const RecordBatchVector& batches) {
  datasets[std::make_pair(num_batches, batch_size)] = batches;
}

RecordBatchVector GetBatches(size_t num_batches, size_t batch_size) {
  auto iter = datasets.find(std::make_pair(num_batches, batch_size));
  if (iter == datasets.end()) {
    return RecordBatchVector{};
  }
  return iter->second;
}

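// The benchmark scans a two-column schema {a: int32, b: boolean}; GetBytesForSchema()
// is the per-row width in bytes (4 + 1 = 5) used to report bytes processed.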
std::shared_ptr<Schema> GetSchema() {
  static std::shared_ptr<Schema> s = schema({field("a", int32()), field("b", boolean())});
  return s;
}

size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); }

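// Build and run a minimal ExecPlan over an InMemoryDataset:
//   scan -> filter ("b") -> project ("a * 2") -> sink
// then drain the sink generator into a Table.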
void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) {
  // NB: This test is here for didactic purposes

  // Specify a MemoryPool and ThreadPool for the ExecPlan
  compute::ExecContext exec_context(default_memory_pool(),
                                    ::arrow::internal::GetCpuThreadPool());

  // ensure arrow::dataset node factories are in the registry
  ::arrow::dataset::internal::Initialize();

  // A ScanNode is constructed from an ExecPlan (into which it is inserted),
  // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
  // predicate pushdown, a projection to skip materialization of unnecessary columns,
  // ...)
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
                       compute::ExecPlan::Make(&exec_context));

  RecordBatchVector batches = GetBatches(num_batches, batch_size);

  std::shared_ptr<Dataset> dataset =
      std::make_shared<InMemoryDataset>(GetSchema(), batches);

  auto options = std::make_shared<ScanOptions>();
  // sync scanning is not supported by ScanNode
  options->use_async = true;
  // specify the filter
  compute::Expression b_is_true = field_ref("b");
  options->filter = b_is_true;
  // for now, specify the projection as the full project expression (eventually this can
  // just be a list of materialized field names)
  compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)});
  options->projection =
      call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}});
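  // NB: the make_struct projection names its single field "a * 2"; this matches the
  // schema passed to MakeGeneratorReader for the sink reader below.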

  // construct the scan node
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode * scan,
      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));

  // pipe the scan node into a filter node
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode * filter,
      compute::MakeExecNode("filter", plan.get(), {scan},
                            compute::FilterNodeOptions{b_is_true, async_mode}));

  // pipe the filter node into a project node
  // NB: we're using the project node factory which preserves fragment/batch index
  // tagging, so we *can* reorder later if we choose. The tags will not appear in
  // our output.
  ASSERT_OK_AND_ASSIGN(
      compute::ExecNode * project,
      compute::MakeExecNode("augmented_project", plan.get(), {filter},
                            compute::ProjectNodeOptions{{a_times_2}, {}, async_mode}));

  // finally, pipe the project node into a sink node
  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
  ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink,
                       compute::MakeExecNode("sink", plan.get(), {project},
                                             compute::SinkNodeOptions{&sink_gen}));

  ASSERT_NE(sink, nullptr);

  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
      schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool());

  // start the ExecPlan
  ASSERT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get()));

  ASSERT_GT(collected->num_rows(), 0);

  // wait 1s for completion
  ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s";
}

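// Benchmark driver: pulls (num_batches, batch_size, async_mode) from the registered
// arguments and reports batches (items) and bytes processed per iteration.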
static void MinimalEndToEndBench(benchmark::State& state) {
  size_t num_batches = state.range(0);
  size_t batch_size = state.range(1);
  bool async_mode = state.range(2);

  for (auto _ : state) {
    MinimalEndToEndScan(num_batches, batch_size, async_mode);
  }
  state.SetItemsProcessed(state.iterations() * num_batches);
  state.SetBytesProcessed(state.iterations() * num_batches * batch_size *
                          GetBytesForSchema());
}

static const std::vector<int32_t> kWorkload = {100, 1000, 10000, 100000};

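// Register the full argument matrix (num_batches x batch_size x async_mode) and
// pre-generate the input batches for each combination. UseRealTime() reports
// wall-clock time, which accounts for work done on the ExecPlan's thread pool.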
static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) {
  for (const int32_t num_batches : kWorkload) {
    for (const int batch_size : {10, 100, 1000}) {
      for (const bool async_mode : {true, false}) {
        b->Args({num_batches, batch_size, async_mode});
        RecordBatchVector batches =
            ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size);
        StoreBatches(num_batches, batch_size, batches);
      }
    }
  }
  b->ArgNames({"num_batches", "batch_size", "async_mode"});
  b->UseRealTime();
}

BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize);
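
// Example invocation, assuming the usual Arrow CMake benchmark target name for this
// source file (adjust the binary name to your build):
//   ./arrow-dataset-scanner-benchmark --benchmark_filter=MinimalEndToEndBench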

}  // namespace dataset
}  // namespace arrow