]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/compute/exec/plan_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / exec / plan_test.cc
diff --git a/ceph/src/arrow/cpp/src/arrow/compute/exec/plan_test.cc b/ceph/src/arrow/cpp/src/arrow/compute/exec/plan_test.cc
new file mode 100644 (file)
index 0000000..54d807e
--- /dev/null
@@ -0,0 +1,1226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <functional>
+#include <memory>
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/util_internal.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+#include "arrow/util/vector.h"
+
+using testing::ElementsAre;
+using testing::ElementsAreArray;
+using testing::HasSubstr;
+using testing::Optional;
+using testing::UnorderedElementsAreArray;
+
+namespace arrow {
+
+namespace compute {
+
+TEST(ExecPlanConstruction, Empty) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+  ASSERT_THAT(plan->Validate(), Raises(StatusCode::Invalid));
+}
+
+TEST(ExecPlanConstruction, SingleNode) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  auto node = MakeDummyNode(plan.get(), "dummy", /*inputs=*/{}, /*num_outputs=*/0);
+  ASSERT_OK(plan->Validate());
+  ASSERT_THAT(plan->sources(), ElementsAre(node));
+  ASSERT_THAT(plan->sinks(), ElementsAre(node));
+
+  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
+  node = MakeDummyNode(plan.get(), "dummy", /*inputs=*/{}, /*num_outputs=*/1);
+  // Output not bound
+  ASSERT_THAT(plan->Validate(), Raises(StatusCode::Invalid));
+}
+
+TEST(ExecPlanConstruction, SourceSink) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  auto source = MakeDummyNode(plan.get(), "source", /*inputs=*/{}, /*num_outputs=*/1);
+  auto sink = MakeDummyNode(plan.get(), "sink", /*inputs=*/{source}, /*num_outputs=*/0);
+
+  ASSERT_OK(plan->Validate());
+  EXPECT_THAT(plan->sources(), ElementsAre(source));
+  EXPECT_THAT(plan->sinks(), ElementsAre(sink));
+}
+
+TEST(ExecPlanConstruction, MultipleNode) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+  auto source1 = MakeDummyNode(plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2);
+
+  auto source2 = MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1);
+
+  auto process1 =
+      MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2);
+
+  auto process2 = MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1, source2},
+                                /*num_outputs=*/1);
+
+  auto process3 =
+      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, process2, process1},
+                    /*num_outputs=*/1);
+
+  auto sink = MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0);
+
+  ASSERT_OK(plan->Validate());
+  ASSERT_THAT(plan->sources(), ElementsAre(source1, source2));
+  ASSERT_THAT(plan->sinks(), ElementsAre(sink));
+}
+
+TEST(ExecPlanConstruction, AutoLabel) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  auto source1 = MakeDummyNode(plan.get(), "", /*inputs=*/{}, /*num_outputs=*/2);
+  auto source2 =
+      MakeDummyNode(plan.get(), "some_label", /*inputs=*/{}, /*num_outputs=*/1);
+  auto source3 = MakeDummyNode(plan.get(), "", /*inputs=*/{}, /*num_outputs=*/2);
+
+  ASSERT_EQ("0", source1->label());
+  ASSERT_EQ("some_label", source2->label());
+  ASSERT_EQ("2", source3->label());
+}
+
+struct StartStopTracker {
+  std::vector<std::string> started, stopped;
+
+  StartProducingFunc start_producing_func(Status st = Status::OK()) {
+    return [this, st](ExecNode* node) {
+      started.push_back(node->label());
+      return st;
+    };
+  }
+
+  StopProducingFunc stop_producing_func() {
+    return [this](ExecNode* node) { stopped.push_back(node->label()); };
+  }
+};
+
+TEST(ExecPlan, DummyStartProducing) {
+  StartStopTracker t;
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+  auto source1 = MakeDummyNode(plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2,
+                               t.start_producing_func(), t.stop_producing_func());
+
+  auto source2 = MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1,
+                               t.start_producing_func(), t.stop_producing_func());
+
+  auto process1 =
+      MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2,
+                    t.start_producing_func(), t.stop_producing_func());
+
+  auto process2 =
+      MakeDummyNode(plan.get(), "process2", /*inputs=*/{process1, source2},
+                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());
+
+  auto process3 =
+      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, source1, process2},
+                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());
+
+  MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0,
+                t.start_producing_func(), t.stop_producing_func());
+
+  ASSERT_OK(plan->Validate());
+  ASSERT_EQ(t.started.size(), 0);
+  ASSERT_EQ(t.stopped.size(), 0);
+
+  ASSERT_OK(plan->StartProducing());
+  // Note that any correct reverse topological order may do
+  ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
+                                     "source2", "source1"));
+
+  plan->StopProducing();
+  ASSERT_THAT(plan->finished(), Finishes(Ok()));
+  // Note that any correct topological order may do
+  ASSERT_THAT(t.stopped, ElementsAre("source1", "source2", "process1", "process2",
+                                     "process3", "sink"));
+
+  ASSERT_THAT(plan->StartProducing(),
+              Raises(StatusCode::Invalid, HasSubstr("restarted")));
+}
+
+TEST(ExecPlan, DummyStartProducingError) {
+  StartStopTracker t;
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  auto source1 = MakeDummyNode(
+      plan.get(), "source1", /*num_inputs=*/{}, /*num_outputs=*/2,
+      t.start_producing_func(Status::NotImplemented("zzz")), t.stop_producing_func());
+
+  auto source2 =
+      MakeDummyNode(plan.get(), "source2", /*num_inputs=*/{}, /*num_outputs=*/1,
+                    t.start_producing_func(), t.stop_producing_func());
+
+  auto process1 = MakeDummyNode(
+      plan.get(), "process1", /*num_inputs=*/{source1}, /*num_outputs=*/2,
+      t.start_producing_func(Status::IOError("xxx")), t.stop_producing_func());
+
+  auto process2 =
+      MakeDummyNode(plan.get(), "process2", /*num_inputs=*/{process1, source2},
+                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());
+
+  auto process3 =
+      MakeDummyNode(plan.get(), "process3", /*num_inputs=*/{process1, source1, process2},
+                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());
+
+  MakeDummyNode(plan.get(), "sink", /*num_inputs=*/{process3}, /*num_outputs=*/0,
+                t.start_producing_func(), t.stop_producing_func());
+
+  ASSERT_OK(plan->Validate());
+  ASSERT_EQ(t.started.size(), 0);
+  ASSERT_EQ(t.stopped.size(), 0);
+
+  // `process1` raises IOError
+  ASSERT_THAT(plan->StartProducing(), Raises(StatusCode::IOError));
+  ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1"));
+  // Nodes that started successfully were stopped in reverse order
+  ASSERT_THAT(t.stopped, ElementsAre("process2", "process3", "sink"));
+}
+
+TEST(ExecPlanExecution, SourceSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto basic_data = MakeBasicBatches();
+
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{basic_data.schema,
+                                                     basic_data.gen(parallel, slow)}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                  Finishes(ResultWith(UnorderedElementsAreArray(basic_data.batches))));
+    }
+  }
+}
+
+TEST(ExecPlanExecution, SinkNodeBackpressure) {
+  constexpr uint32_t kPauseIfAbove = 4;
+  constexpr uint32_t kResumeIfBelow = 2;
+  EXPECT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
+  PushGenerator<util::optional<ExecBatch>> batch_producer;
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+  util::BackpressureOptions backpressure_options =
+      util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove);
+  std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
+  ARROW_EXPECT_OK(compute::Declaration::Sequence(
+                      {
+                          {"source", SourceNodeOptions(schema_, batch_producer)},
+                          {"sink", SinkNodeOptions{&sink_gen, backpressure_options}},
+                      })
+                      .AddToPlan(plan.get()));
+  ARROW_EXPECT_OK(plan->StartProducing());
+
+  EXPECT_OK_AND_ASSIGN(util::optional<ExecBatch> batch, ExecBatch::Make({MakeScalar(0)}));
+  ASSERT_TRUE(backpressure_options.toggle->IsOpen());
+
+  // Should be able to push kPauseIfAbove batches without triggering back pressure
+  for (uint32_t i = 0; i < kPauseIfAbove; i++) {
+    batch_producer.producer().Push(batch);
+  }
+  SleepABit();
+  ASSERT_TRUE(backpressure_options.toggle->IsOpen());
+
+  // One more batch should trigger back pressure
+  batch_producer.producer().Push(batch);
+  BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); });
+  ASSERT_FALSE(backpressure_options.toggle->IsOpen());
+
+  // Reading as much as we can while keeping it paused
+  for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) {
+    ASSERT_FINISHES_OK(sink_gen());
+  }
+  SleepABit();
+  ASSERT_FALSE(backpressure_options.toggle->IsOpen());
+
+  // Reading one more item should open up backpressure
+  ASSERT_FINISHES_OK(sink_gen());
+  BusyWait(10, [&] { return backpressure_options.toggle->IsOpen(); });
+  ASSERT_TRUE(backpressure_options.toggle->IsOpen());
+
+  // Cleanup
+  batch_producer.producer().Push(IterationEnd<util::optional<ExecBatch>>());
+  plan->StopProducing();
+  ASSERT_FINISHES_OK(plan->finished());
+}
+
+TEST(ExecPlan, ToString) {
+  auto basic_data = MakeBasicBatches();
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{basic_data.schema,
+                                                 basic_data.gen(/*parallel=*/false,
+                                                                /*slow=*/false)}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+  EXPECT_EQ(plan->sources()[0]->ToString(), R"(SourceNode{"source", outputs=["sink"]})");
+  EXPECT_EQ(plan->sinks()[0]->ToString(),
+            R"(SinkNode{"sink", inputs=[collected: "source"]})");
+  EXPECT_EQ(plan->ToString(), R"(ExecPlan with 2 nodes:
+SourceNode{"source", outputs=["sink"]}
+SinkNode{"sink", inputs=[collected: "source"]}
+)");
+
+  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
+  CountOptions options(CountOptions::ONLY_VALID);
+  ASSERT_OK(
+      Declaration::Sequence(
+          {
+              {"source",
+               SourceNodeOptions{basic_data.schema,
+                                 basic_data.gen(/*parallel=*/false, /*slow=*/false)}},
+              {"filter", FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
+              {"project", ProjectNodeOptions{{
+                              field_ref("bool"),
+                              call("multiply", {field_ref("i32"), literal(2)}),
+                          }}},
+              {"aggregate",
+               AggregateNodeOptions{
+                   /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count", &options}},
+                   /*targets=*/{"multiply(i32, 2)", "multiply(i32, 2)"},
+                   /*names=*/{"sum(multiply(i32, 2))", "count(multiply(i32, 2))"},
+                   /*keys=*/{"bool"}}},
+              {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
+                                                   literal(10))}},
+              {"order_by_sink",
+               OrderBySinkNodeOptions{
+                   SortOptions({SortKey{"sum(multiply(i32, 2))", SortOrder::Ascending}}),
+                   &sink_gen}},
+          })
+          .AddToPlan(plan.get()));
+  EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 6 nodes:
+SourceNode{"source", outputs=["filter"]}
+FilterNode{"filter", inputs=[target: "source"], outputs=["project"], filter=(i32 >= 0)}
+ProjectNode{"project", inputs=[target: "filter"], outputs=["aggregate"], projection=[bool, multiply(i32, 2)]}
+GroupByNode{"aggregate", inputs=[groupby: "project"], outputs=["filter"], keys=["bool"], aggregates=[
+       hash_sum(multiply(i32, 2)),
+       hash_count(multiply(i32, 2), {mode=NON_NULL}),
+]}
+FilterNode{"filter", inputs=[target: "aggregate"], outputs=["order_by_sink"], filter=(sum(multiply(i32, 2)) > 10)}
+OrderBySinkNode{"order_by_sink", inputs=[collected: "filter"], by={sort_keys=[sum(multiply(i32, 2)) ASC], null_placement=AtEnd}}
+)a");
+
+  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
+  Declaration union_node{"union", ExecNodeOptions{}};
+  Declaration lhs{"source",
+                  SourceNodeOptions{basic_data.schema,
+                                    basic_data.gen(/*parallel=*/false, /*slow=*/false)}};
+  lhs.label = "lhs";
+  Declaration rhs{"source",
+                  SourceNodeOptions{basic_data.schema,
+                                    basic_data.gen(/*parallel=*/false, /*slow=*/false)}};
+  rhs.label = "rhs";
+  union_node.inputs.emplace_back(lhs);
+  union_node.inputs.emplace_back(rhs);
+  ASSERT_OK(
+      Declaration::Sequence(
+          {
+              union_node,
+              {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count", &options}},
+                                                 /*targets=*/{"i32"},
+                                                 /*names=*/{"count(i32)"},
+                                                 /*keys=*/{}}},
+              {"sink", SinkNodeOptions{&sink_gen}},
+          })
+          .AddToPlan(plan.get()));
+  EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
+SourceNode{"lhs", outputs=["union"]}
+SourceNode{"rhs", outputs=["union"]}
+UnionNode{"union", inputs=[input_0_label: "lhs", input_1_label: "rhs"], outputs=["aggregate"]}
+ScalarAggregateNode{"aggregate", inputs=[target: "union"], outputs=["sink"], aggregates=[
+       count(i32, {mode=NON_NULL}),
+]}
+SinkNode{"sink", inputs=[collected: "aggregate"]}
+)a");
+}
+
+TEST(ExecPlanExecution, SourceOrderBy) {
+  std::vector<ExecBatch> expected = {
+      ExecBatchFromJSON({int32(), boolean()},
+                        "[[4, false], [5, null], [6, false], [7, false], [null, true]]")};
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto basic_data = MakeBasicBatches();
+
+      SortOptions options({SortKey("i32", SortOrder::Ascending)});
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{basic_data.schema,
+                                                     basic_data.gen(parallel, slow)}},
+                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                  Finishes(ResultWith(ElementsAreArray(expected))));
+    }
+  }
+}
+
+TEST(ExecPlanExecution, SourceSinkError) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  auto basic_data = MakeBasicBatches();
+  auto it = basic_data.batches.begin();
+  AsyncGenerator<util::optional<ExecBatch>> error_source_gen =
+      [&]() -> Result<util::optional<ExecBatch>> {
+    if (it == basic_data.batches.end()) {
+      return Status::Invalid("Artificial error");
+    }
+    return util::make_optional(*it++);
+  };
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{basic_data.schema, error_source_gen}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(Raises(StatusCode::Invalid, HasSubstr("Artificial"))));
+}
+
+TEST(ExecPlanExecution, SourceConsumingSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      std::atomic<uint32_t> batches_seen{0};
+      Future<> finish = Future<>::Make();
+      struct TestConsumer : public SinkNodeConsumer {
+        TestConsumer(std::atomic<uint32_t>* batches_seen, Future<> finish)
+            : batches_seen(batches_seen), finish(std::move(finish)) {}
+
+        Status Consume(ExecBatch batch) override {
+          (*batches_seen)++;
+          return Status::OK();
+        }
+
+        Future<> Finish() override { return finish; }
+
+        std::atomic<uint32_t>* batches_seen;
+        Future<> finish;
+      };
+      std::shared_ptr<TestConsumer> consumer =
+          std::make_shared<TestConsumer>(&batches_seen, finish);
+
+      auto basic_data = MakeBasicBatches();
+      ASSERT_OK_AND_ASSIGN(
+          auto source, MakeExecNode("source", plan.get(), {},
+                                    SourceNodeOptions(basic_data.schema,
+                                                      basic_data.gen(parallel, slow))));
+      ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
+                             ConsumingSinkNodeOptions(consumer)));
+      ASSERT_OK(plan->StartProducing());
+      // Source should finish fairly quickly
+      ASSERT_FINISHES_OK(source->finished());
+      SleepABit();
+      ASSERT_EQ(2, batches_seen);
+      // Consumer isn't finished and so plan shouldn't have finished
+      AssertNotFinished(plan->finished());
+      // Mark consumption complete, plan should finish
+      finish.MarkFinished();
+      ASSERT_FINISHES_OK(plan->finished());
+    }
+  }
+}
+
+TEST(ExecPlanExecution, ConsumingSinkError) {
+  struct ConsumeErrorConsumer : public SinkNodeConsumer {
+    Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
+    Future<> Finish() override { return Future<>::MakeFinished(); }
+  };
+  struct FinishErrorConsumer : public SinkNodeConsumer {
+    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
+  };
+  std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
+      std::make_shared<ConsumeErrorConsumer>(), std::make_shared<FinishErrorConsumer>()};
+
+  for (auto& consumer : consumers) {
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    auto basic_data = MakeBasicBatches();
+    ASSERT_OK(Declaration::Sequence(
+                  {{"source",
+                    SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
+                   {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
+                  .AddToPlan(plan.get()));
+    ASSERT_OK_AND_ASSIGN(
+        auto source,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
+    ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
+                           ConsumingSinkNodeOptions(consumer)));
+    ASSERT_OK(plan->StartProducing());
+    ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
+  }
+}
+
+TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  struct FinishErrorConsumer : public SinkNodeConsumer {
+    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
+  };
+  std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();
+
+  auto basic_data = MakeBasicBatches();
+  ASSERT_OK(
+      Declaration::Sequence(
+          {{"source", SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
+           {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
+          .AddToPlan(plan.get()));
+  ASSERT_OK_AND_ASSIGN(
+      auto source,
+      MakeExecNode("source", plan.get(), {},
+                   SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
+  ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
+                         ConsumingSinkNodeOptions(consumer)));
+  ASSERT_OK(plan->StartProducing());
+  ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
+}
+
+TEST(ExecPlanExecution, StressSourceSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = (slow && !parallel) ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto random_data = MakeRandomBatches(
+          schema({field("a", int32()), field("b", boolean())}), num_batches);
+
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{random_data.schema,
+                                                     random_data.gen(parallel, slow)}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                  Finishes(ResultWith(UnorderedElementsAreArray(random_data.batches))));
+    }
+  }
+}
+
+TEST(ExecPlanExecution, StressSourceOrderBy) {
+  auto input_schema = schema({field("a", int32()), field("b", boolean())});
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = (slow && !parallel) ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto random_data = MakeRandomBatches(input_schema, num_batches);
+
+      SortOptions options({SortKey("a", SortOrder::Ascending)});
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{random_data.schema,
+                                                     random_data.gen(parallel, slow)}},
+                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      // Check that data is sorted appropriately
+      ASSERT_FINISHES_OK_AND_ASSIGN(auto exec_batches,
+                                    StartAndCollect(plan.get(), sink_gen));
+      ASSERT_OK_AND_ASSIGN(auto actual, TableFromExecBatches(input_schema, exec_batches));
+      ASSERT_OK_AND_ASSIGN(auto original,
+                           TableFromExecBatches(input_schema, random_data.batches));
+      ASSERT_OK_AND_ASSIGN(auto sort_indices, SortIndices(original, options));
+      ASSERT_OK_AND_ASSIGN(auto expected, Take(original, sort_indices));
+      AssertTablesEqual(*actual, *expected.table());
+    }
+  }
+}
+
+TEST(ExecPlanExecution, StressSourceGroupedSumStop) {
+  auto input_schema = schema({field("a", int32()), field("b", boolean())});
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = (slow && !parallel) ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto random_data = MakeRandomBatches(input_schema, num_batches);
+
+      SortOptions options({SortKey("a", SortOrder::Ascending)});
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{random_data.schema,
+                                                     random_data.gen(parallel, slow)}},
+                        {"aggregate",
+                         AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+                                              /*targets=*/{"a"}, /*names=*/{"sum(a)"},
+                                              /*keys=*/{"b"}}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_OK(plan->Validate());
+      ASSERT_OK(plan->StartProducing());
+      plan->StopProducing();
+      ASSERT_FINISHES_OK(plan->finished());
+    }
+  }
+}
+
+TEST(ExecPlanExecution, StressSourceSinkStopped) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = (slow && !parallel) ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto random_data = MakeRandomBatches(
+          schema({field("a", int32()), field("b", boolean())}), num_batches);
+
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{random_data.schema,
+                                                     random_data.gen(parallel, slow)}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_OK(plan->Validate());
+      ASSERT_OK(plan->StartProducing());
+
+      EXPECT_THAT(sink_gen(), Finishes(ResultWith(Optional(random_data.batches[0]))));
+
+      plan->StopProducing();
+      ASSERT_THAT(plan->finished(), Finishes(Ok()));
+    }
+  }
+}
+
+TEST(ExecPlanExecution, SourceFilterSink) {
+  auto basic_data = MakeBasicBatches();
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{basic_data.schema,
+                                                 basic_data.gen(/*parallel=*/false,
+                                                                /*slow=*/false)}},
+                    {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray(
+                  {ExecBatchFromJSON({int32(), boolean()}, "[]"),
+                   ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}))));
+}
+
+TEST(ExecPlanExecution, SourceProjectSink) {
+  auto basic_data = MakeBasicBatches();
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{basic_data.schema,
+                                                 basic_data.gen(/*parallel=*/false,
+                                                                /*slow=*/false)}},
+                    {"project",
+                     ProjectNodeOptions{{
+                                            not_(field_ref("bool")),
+                                            call("add", {field_ref("i32"), literal(1)}),
+                                        },
+                                        {"!bool", "i32 + 1"}}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray(
+                  {ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"),
+                   ExecBatchFromJSON({boolean(), int32()},
+                                     "[[null, 6], [true, 7], [true, 8]]")}))));
+}
+
+namespace {
+
+BatchesWithSchema MakeGroupableBatches(int multiplicity = 1) {
+  BatchesWithSchema out;
+
+  out.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([
+                   [12, "alfa"],
+                   [7,  "beta"],
+                   [3,  "alfa"]
+                 ])"),
+                 ExecBatchFromJSON({int32(), utf8()}, R"([
+                   [-2, "alfa"],
+                   [-1, "gama"],
+                   [3,  "alfa"]
+                 ])"),
+                 ExecBatchFromJSON({int32(), utf8()}, R"([
+                   [5,  "gama"],
+                   [3,  "beta"],
+                   [-8, "alfa"]
+                 ])")};
+
+  size_t batch_count = out.batches.size();
+  for (int repeat = 1; repeat < multiplicity; ++repeat) {
+    for (size_t i = 0; i < batch_count; ++i) {
+      out.batches.push_back(out.batches[i]);
+    }
+  }
+
+  out.schema = schema({field("i32", int32()), field("str", utf8())});
+
+  return out;
+}
+
+}  // namespace
+
+TEST(ExecPlanExecution, SourceGroupedSum) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source", SourceNodeOptions{input.schema,
+                                                   input.gen(parallel, /*slow=*/false)}},
+                      {"aggregate",
+                       AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+                                            /*targets=*/{"i32"}, /*names=*/{"sum(i32)"},
+                                            /*keys=*/{"str"}}},
+                      {"sink", SinkNodeOptions{&sink_gen}},
+                  })
+                  .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
+                    {int64(), utf8()},
+                    parallel ? R"([[800, "alfa"], [1000, "beta"], [400, "gama"]])"
+                             : R"([[8, "alfa"], [10, "beta"], [4, "gama"]])")}))));
+  }
+}
+
+TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    int batch_multiplicity = parallel ? 100 : 1;
+    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK(
+        Declaration::Sequence(
+            {
+                {"source",
+                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
+                {"filter",
+                 FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
+                {"project", ProjectNodeOptions{{
+                                field_ref("str"),
+                                call("multiply", {field_ref("i32"), literal(2)}),
+                            }}},
+                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+                                                   /*targets=*/{"multiply(i32, 2)"},
+                                                   /*names=*/{"sum(multiply(i32, 2))"},
+                                                   /*keys=*/{"str"}}},
+                {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
+                                                     literal(10 * batch_multiplicity))}},
+                {"sink", SinkNodeOptions{&sink_gen}},
+            })
+            .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
+                    {int64(), utf8()}, parallel ? R"([[3600, "alfa"], [2000, "beta"]])"
+                                                : R"([[36, "alfa"], [20, "beta"]])")}))));
+  }
+}
+
+TEST(ExecPlanExecution, SourceFilterProjectGroupedSumOrderBy) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    int batch_multiplicity = parallel ? 100 : 1;
+    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    SortOptions options({SortKey("str", SortOrder::Descending)});
+    ASSERT_OK(
+        Declaration::Sequence(
+            {
+                {"source",
+                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
+                {"filter",
+                 FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
+                {"project", ProjectNodeOptions{{
+                                field_ref("str"),
+                                call("multiply", {field_ref("i32"), literal(2)}),
+                            }}},
+                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+                                                   /*targets=*/{"multiply(i32, 2)"},
+                                                   /*names=*/{"sum(multiply(i32, 2))"},
+                                                   /*keys=*/{"str"}}},
+                {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
+                                                     literal(10 * batch_multiplicity))}},
+                {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+            })
+            .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
+                    {int64(), utf8()}, parallel ? R"([[2000, "beta"], [3600, "alfa"]])"
+                                                : R"([[20, "beta"], [36, "alfa"]])")}))));
+  }
+}
+
+TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    int batch_multiplicity = parallel ? 100 : 1;
+    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    SelectKOptions options = SelectKOptions::TopKDefault(/*k=*/1, {"str"});
+    ASSERT_OK(
+        Declaration::Sequence(
+            {
+                {"source",
+                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
+                {"project", ProjectNodeOptions{{
+                                field_ref("str"),
+                                call("multiply", {field_ref("i32"), literal(2)}),
+                            }}},
+                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+                                                   /*targets=*/{"multiply(i32, 2)"},
+                                                   /*names=*/{"sum(multiply(i32, 2))"},
+                                                   /*keys=*/{"str"}}},
+                {"select_k_sink", SelectKSinkNodeOptions{options, &sink_gen}},
+            })
+            .AddToPlan(plan.get()));
+
+    ASSERT_THAT(
+        StartAndCollect(plan.get(), sink_gen),
+        Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
+            {int64(), utf8()}, parallel ? R"([[800, "gama"]])" : R"([[8, "gama"]])")}))));
+  }
+}
+
+TEST(ExecPlanExecution, SourceScalarAggSink) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  auto basic_data = MakeBasicBatches();
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{basic_data.schema,
+                                                 basic_data.gen(/*parallel=*/false,
+                                                                /*slow=*/false)}},
+                    {"aggregate", AggregateNodeOptions{
+                                      /*aggregates=*/{{"sum", nullptr}, {"any", nullptr}},
+                                      /*targets=*/{"i32", "bool"},
+                                      /*names=*/{"sum(i32)", "any(bool)"}}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(
+      StartAndCollect(plan.get(), sink_gen),
+      Finishes(ResultWith(UnorderedElementsAreArray({
+          ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(boolean())},
+                            "[[22, true]]"),
+      }))));
+}
+
+TEST(ExecPlanExecution, AggregationPreservesOptions) {
+  // ARROW-13638: aggregation nodes initialize per-thread kernel state lazily
+  // and need to keep a copy/strong reference to function options
+  {
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto basic_data = MakeBasicBatches();
+
+    {
+      auto options = std::make_shared<TDigestOptions>(TDigestOptions::Defaults());
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{basic_data.schema,
+                                                     basic_data.gen(/*parallel=*/false,
+                                                                    /*slow=*/false)}},
+                        {"aggregate",
+                         AggregateNodeOptions{/*aggregates=*/{{"tdigest", options.get()}},
+                                              /*targets=*/{"i32"},
+                                              /*names=*/{"tdigest(i32)"}}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+    }
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                Finishes(ResultWith(UnorderedElementsAreArray({
+                    ExecBatchFromJSON({ValueDescr::Array(float64())}, "[[5.5]]"),
+                }))));
+  }
+  {
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto data = MakeGroupableBatches(/*multiplicity=*/100);
+
+    {
+      auto options = std::make_shared<CountOptions>(CountOptions::Defaults());
+      ASSERT_OK(
+          Declaration::Sequence(
+              {
+                  {"source", SourceNodeOptions{data.schema, data.gen(/*parallel=*/false,
+                                                                     /*slow=*/false)}},
+                  {"aggregate",
+                   AggregateNodeOptions{/*aggregates=*/{{"hash_count", options.get()}},
+                                        /*targets=*/{"i32"},
+                                        /*names=*/{"count(i32)"},
+                                        /*keys=*/{"str"}}},
+                  {"sink", SinkNodeOptions{&sink_gen}},
+              })
+              .AddToPlan(plan.get()));
+    }
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                Finishes(ResultWith(UnorderedElementsAreArray({
+                    ExecBatchFromJSON({int64(), utf8()},
+                                      R"([[500, "alfa"], [200, "beta"], [200, "gama"]])"),
+                }))));
+  }
+}
+
+TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
+  // ARROW-9056: scalar aggregation can be done over scalars, taking
+  // into account batch.length > 1 (e.g. a partition column)
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  BatchesWithSchema scalar_data;
+  scalar_data.batches = {
+      ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(boolean())},
+                        "[[5, false], [5, false], [5, false]]"),
+      ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
+  scalar_data.schema = schema({field("a", int32()), field("b", boolean())});
+
+  // index can't be tested as it's order-dependent
+  // mode/quantile can't be tested as they're technically vector kernels
+  ASSERT_OK(
+      Declaration::Sequence(
+          {
+              {"source",
+               SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
+                                                                     /*slow=*/false)}},
+              {"aggregate", AggregateNodeOptions{
+                                /*aggregates=*/{{"all", nullptr},
+                                                {"any", nullptr},
+                                                {"count", nullptr},
+                                                {"mean", nullptr},
+                                                {"product", nullptr},
+                                                {"stddev", nullptr},
+                                                {"sum", nullptr},
+                                                {"tdigest", nullptr},
+                                                {"variance", nullptr}},
+                                /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"},
+                                /*names=*/
+                                {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)",
+                                 "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}},
+              {"sink", SinkNodeOptions{&sink_gen}},
+          })
+          .AddToPlan(plan.get()));
+
+  ASSERT_THAT(
+      StartAndCollect(plan.get(), sink_gen),
+      Finishes(ResultWith(UnorderedElementsAreArray({
+          ExecBatchFromJSON(
+              {ValueDescr::Scalar(boolean()), ValueDescr::Scalar(boolean()),
+               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
+               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
+               ValueDescr::Scalar(int64()), ValueDescr::Array(float64()),
+               ValueDescr::Scalar(float64())},
+              R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
+      }))));
+}
+
+TEST(ExecPlanExecution, ScalarSourceGroupedSum) {
+  // ARROW-14630: ensure grouped aggregation with a scalar key/array input doesn't error
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  BatchesWithSchema scalar_data;
+  scalar_data.batches = {
+      ExecBatchFromJSON({int32(), ValueDescr::Scalar(boolean())},
+                        "[[5, false], [6, false], [7, false]]"),
+      ExecBatchFromJSON({int32(), ValueDescr::Scalar(boolean())},
+                        "[[1, true], [2, true], [3, true]]"),
+  };
+  scalar_data.schema = schema({field("a", int32()), field("b", boolean())});
+
+  SortOptions options({SortKey("b", SortOrder::Descending)});
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{scalar_data.schema,
+                                                 scalar_data.gen(/*parallel=*/false,
+                                                                 /*slow=*/false)}},
+                    {"aggregate",
+                     AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+                                          /*targets=*/{"a"}, /*names=*/{"hash_sum(a)"},
+                                          /*keys=*/{"b"}}},
+                    {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray({
+                  ExecBatchFromJSON({int64(), boolean()}, R"([[6, true], [18, false]])"),
+              }))));
+}
+
+TEST(ExecPlanExecution, SelfInnerHashJoinSink) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    auto input = MakeGroupableBatches();
+
+    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
+        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ExecNode* left_source;
+    ExecNode* right_source;
+    for (auto source : {&left_source, &right_source}) {
+      ASSERT_OK_AND_ASSIGN(
+          *source, MakeExecNode("source", plan.get(), {},
+                                SourceNodeOptions{input.schema,
+                                                  input.gen(parallel, /*slow=*/false)}));
+    }
+    ASSERT_OK_AND_ASSIGN(
+        auto left_filter,
+        MakeExecNode("filter", plan.get(), {left_source},
+                     FilterNodeOptions{greater_equal(field_ref("i32"), literal(-1))}));
+    ASSERT_OK_AND_ASSIGN(
+        auto right_filter,
+        MakeExecNode("filter", plan.get(), {right_source},
+                     FilterNodeOptions{less_equal(field_ref("i32"), literal(2))}));
+
+    // left side: [3,  "alfa"], [3,  "alfa"], [12, "alfa"], [3,  "beta"], [7,  "beta"],
+    // [-1, "gama"], [5,  "gama"]
+    // right side: [-2, "alfa"], [-8, "alfa"], [-1, "gama"]
+
+    HashJoinNodeOptions join_opts{JoinType::INNER,
+                                  /*left_keys=*/{"str"},
+                                  /*right_keys=*/{"str"}, "l_", "r_"};
+
+    ASSERT_OK_AND_ASSIGN(
+        auto hashjoin,
+        MakeExecNode("hashjoin", plan.get(), {left_filter, right_filter}, join_opts));
+
+    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
+                                                   SinkNodeOptions{&sink_gen}));
+
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+
+    std::vector<ExecBatch> expected = {
+        ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
+            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
+            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
+            [12, "alfa", -2, "alfa"], [12, "alfa", -8, "alfa"],
+            [-1, "gama", -1, "gama"], [5, "gama", -1, "gama"]])")};
+
+    AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
+  }
+}
+
+TEST(ExecPlanExecution, SelfOuterHashJoinSink) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    auto input = MakeGroupableBatches();
+
+    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
+        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ExecNode* left_source;
+    ExecNode* right_source;
+    for (auto source : {&left_source, &right_source}) {
+      ASSERT_OK_AND_ASSIGN(
+          *source, MakeExecNode("source", plan.get(), {},
+                                SourceNodeOptions{input.schema,
+                                                  input.gen(parallel, /*slow=*/false)}));
+    }
+    ASSERT_OK_AND_ASSIGN(
+        auto left_filter,
+        MakeExecNode("filter", plan.get(), {left_source},
+                     FilterNodeOptions{greater_equal(field_ref("i32"), literal(-1))}));
+    ASSERT_OK_AND_ASSIGN(
+        auto right_filter,
+        MakeExecNode("filter", plan.get(), {right_source},
+                     FilterNodeOptions{less_equal(field_ref("i32"), literal(2))}));
+
+    // left side: [3,  "alfa"], [3,  "alfa"], [12, "alfa"], [3,  "beta"], [7,  "beta"],
+    // [-1, "gama"], [5,  "gama"]
+    // right side: [-2, "alfa"], [-8, "alfa"], [-1, "gama"]
+
+    HashJoinNodeOptions join_opts{JoinType::FULL_OUTER,
+                                  /*left_keys=*/{"str"},
+                                  /*right_keys=*/{"str"}, "l_", "r_"};
+
+    ASSERT_OK_AND_ASSIGN(
+        auto hashjoin,
+        MakeExecNode("hashjoin", plan.get(), {left_filter, right_filter}, join_opts));
+
+    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
+                                                   SinkNodeOptions{&sink_gen}));
+
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+
+    std::vector<ExecBatch> expected = {
+        ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
+            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
+            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
+            [12, "alfa", -2, "alfa"], [12, "alfa", -8, "alfa"],
+            [3,  "beta", null, null], [7,  "beta", null, null],
+            [-1, "gama", -1, "gama"], [5, "gama", -1, "gama"]])")};
+
+    AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
+  }
+}
+
+TEST(ExecPlan, RecordBatchReaderSourceSink) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  // set up a RecordBatchReader:
+  auto input = MakeBasicBatches();
+
+  RecordBatchVector batches;
+  for (const ExecBatch& exec_batch : input.batches) {
+    ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema));
+    batches.push_back(batch);
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(*table);
+
+  // Map the RecordBatchReader to a SourceNode
+  ASSERT_OK_AND_ASSIGN(
+      auto batch_gen,
+      MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool()));
+
+  ASSERT_OK(
+      Declaration::Sequence({
+                                {"source", SourceNodeOptions{table->schema(), batch_gen}},
+                                {"sink", SinkNodeOptions{&sink_gen}},
+                            })
+          .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray(input.batches))));
+}
+
+}  // namespace compute
+}  // namespace arrow