import quincy 17.2.0
diff --git a/ceph/src/arrow/cpp/src/arrow/dataset/scanner_benchmark.cc b/ceph/src/arrow/cpp/src/arrow/dataset/scanner_benchmark.cc
new file mode 100644 (file)
index 0000000..e302179
--- /dev/null
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/plan.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+
+namespace arrow {
+namespace compute {
+
+constexpr auto kSeed = 0x0ff1ce;
+
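+// Fill `out_batches` with `num_batches` random batches of `batch_size` rows
+// each, drawn from `schema`; if multiplicity > 1, the generated set is
+// duplicated that many times to enlarge the workload without extra RNG work.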
+void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema, size_t num_batches,
+                               BatchesWithSchema* out_batches, int multiplicity = 1,
+                               int64_t batch_size = 4) {
+  ::arrow::random::RandomArrayGenerator rng(kSeed);
+  if (num_batches == 0) {
+    auto empty_record_batch = ExecBatch(*rng.BatchOf(schema->fields(), 0));
+    out_batches->batches.push_back(empty_record_batch);
+  } else {
+    for (size_t j = 0; j < num_batches; j++) {
+      out_batches->batches.push_back(
+          ExecBatch(*rng.BatchOf(schema->fields(), batch_size)));
+    }
+  }
+
+  size_t batch_count = out_batches->batches.size();
+  for (int repeat = 1; repeat < multiplicity; ++repeat) {
+    for (size_t i = 0; i < batch_count; ++i) {
+      out_batches->batches.push_back(out_batches->batches[i]);
+    }
+  }
+  out_batches->schema = schema;
+}
+
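+// Convenience wrapper around GenerateBatchesFromSchema that converts the
+// generated ExecBatches back to RecordBatches.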
+RecordBatchVector GenerateBatches(const std::shared_ptr<Schema>& schema,
+                                  size_t num_batches, size_t batch_size) {
+  BatchesWithSchema input_batches;
+
+  RecordBatchVector batches;
+  GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size);
+
+  for (const auto& batch : input_batches.batches) {
+    batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe());
+  }
+  return batches;
+}
+
+}  // namespace compute
+
+namespace dataset {
+
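+// Input batches are pre-generated at benchmark registration time (see
+// MinimalEndToEnd_Customize below) and cached here, keyed by
+// (num_batches, batch_size), so data generation stays out of the timed loop.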
+static std::map<std::pair<size_t, size_t>, RecordBatchVector> datasets;
+
+void StoreBatches(size_t num_batches, size_t batch_size,
+                  const RecordBatchVector& batches) {
+  datasets[std::make_pair(num_batches, batch_size)] = batches;
+}
+
+RecordBatchVector GetBatches(size_t num_batches, size_t batch_size) {
+  auto iter = datasets.find(std::make_pair(num_batches, batch_size));
+  if (iter == datasets.end()) {
+    return RecordBatchVector{};
+  }
+  return iter->second;
+}
+
+std::shared_ptr<Schema> GetSchema() {
+  static std::shared_ptr<Schema> s = schema({field("a", int32()), field("b", boolean())});
+  return s;
+}
+
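+// Approximate bytes per row of GetSchema(): one int32 plus one bool. (Arrow
+// bit-packs boolean arrays, so this slightly overstates the in-memory size.)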
+size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); }
+
+void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) {
+  // NB: This benchmark is spelled out step by step for didactic purposes
+
+  // Specify a MemoryPool and ThreadPool for the ExecPlan
+  compute::ExecContext exec_context(default_memory_pool(),
+                                    ::arrow::internal::GetCpuThreadPool());
+
+  // ensure arrow::dataset node factories are in the registry
+  ::arrow::dataset::internal::Initialize();
+
+  // A ScanNode is constructed from an ExecPlan (into which it is inserted),
+  // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
+  // predicate pushdown, a projection to skip materialization of unnecessary columns,
+  // ...)
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
+                       compute::ExecPlan::Make(&exec_context));
+
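+  // look up the pre-generated input batches and wrap them in an InMemoryDataset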
+  RecordBatchVector batches = GetBatches(num_batches, batch_size);
+
+  std::shared_ptr<Dataset> dataset =
+      std::make_shared<InMemoryDataset>(GetSchema(), batches);
+
+  auto options = std::make_shared<ScanOptions>();
+  // sync scanning is not supported by ScanNode
+  options->use_async = true;
+  // specify the filter
+  compute::Expression b_is_true = field_ref("b");
+  options->filter = b_is_true;
+  // for now, specify the projection as the full project expression (eventually this can
+  // just be a list of materialized field names)
+  compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)});
+  options->projection =
+      call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}});
+
+  // construct the scan node
+  ASSERT_OK_AND_ASSIGN(
+      compute::ExecNode * scan,
+      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
+
+  // pipe the scan node into a filter node
+  ASSERT_OK_AND_ASSIGN(
+      compute::ExecNode * filter,
+      compute::MakeExecNode("filter", plan.get(), {scan},
+                            compute::FilterNodeOptions{b_is_true, async_mode}));
+
+  // pipe the filter node into a project node
+  // NB: we're using the project node factory which preserves fragment/batch index
+  // tagging, so we *can* reorder later if we choose. The tags will not appear in
+  // our output.
+  ASSERT_OK_AND_ASSIGN(
+      compute::ExecNode * project,
+      compute::MakeExecNode("augmented_project", plan.get(), {filter},
+                            compute::ProjectNodeOptions{{a_times_2}, {}, async_mode}));
+
+  // finally, pipe the project node into a sink node
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+  ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink,
+                       compute::MakeExecNode("sink", plan.get(), {project},
+                                             compute::SinkNodeOptions{&sink_gen}));
+
+  ASSERT_NE(sink, nullptr);
+
+  // translate sink_gen (async) to sink_reader (sync)
+  std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+      schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool());
+
+  // start the ExecPlan
+  ASSERT_OK(plan->StartProducing());
+
+  // collect sink_reader into a Table
+  ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get()));
+
+  ASSERT_GT(collected->num_rows(), 0);
+
+  // wait 1s for completion
+  ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s";
+}
+
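+// Benchmark driver: each iteration builds and runs one complete
+// scan -> filter -> project -> sink plan over the pre-generated input.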
+static void MinimalEndToEndBench(benchmark::State& state) {
+  size_t num_batches = state.range(0);
+  size_t batch_size = state.range(1);
+  bool async_mode = state.range(2);
+
+  for (auto _ : state) {
+    MinimalEndToEndScan(num_batches, batch_size, async_mode);
+  }
+  state.SetItemsProcessed(state.iterations() * num_batches);
+  state.SetBytesProcessed(state.iterations() * num_batches * batch_size *
+                          GetBytesForSchema());
+}
+
+static const std::vector<int32_t> kWorkload = {100, 1000, 10000, 100000};
+
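+// Register every (num_batches, batch_size, async_mode) combination with the
+// benchmark and pre-generate the corresponding input data up front.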
+static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) {
+  for (const int32_t num_batches : kWorkload) {
+    for (const int batch_size : {10, 100, 1000}) {
+      // One generation per (num_batches, batch_size) pair is enough: the RNG
+      // is seeded with kSeed, so regenerating per async_mode value would
+      // yield identical batches and only waste registration time.
+      RecordBatchVector batches =
+          ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size);
+      StoreBatches(num_batches, batch_size, batches);
+      for (const bool async_mode : {true, false}) {
+        b->Args({num_batches, batch_size, async_mode});
+      }
+    }
+  }
+  b->ArgNames({"num_batches", "batch_size", "async_mode"});
+  b->UseRealTime();
+}
+
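+// The registered benchmarks can be narrowed with Google Benchmark's standard
+// flags, e.g. --benchmark_filter=MinimalEndToEndBench; the argument labels
+// (num_batches/batch_size/async_mode) appear in each reported line.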
+BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize);
+
+}  // namespace dataset
+}  // namespace arrow