]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/compute/exec_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / exec_test.cc
diff --git a/ceph/src/arrow/cpp/src/arrow/compute/exec_test.cc b/ceph/src/arrow/cpp/src/arrow/compute/exec_test.cc
new file mode 100644 (file)
index 0000000..3769517
--- /dev/null
@@ -0,0 +1,891 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/data.h"
+#include "arrow/buffer.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/function_internal.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/compute/registry.h"
+#include "arrow/memory_pool.h"
+#include "arrow/scalar.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+namespace detail {
+
+using ::arrow::internal::BitmapEquals;
+using ::arrow::internal::CopyBitmap;
+using ::arrow::internal::CountSetBits;
+
+TEST(ExecContext, BasicWorkings) {
+  {
+    ExecContext ctx;
+    ASSERT_EQ(GetFunctionRegistry(), ctx.func_registry());
+    ASSERT_EQ(default_memory_pool(), ctx.memory_pool());
+    ASSERT_EQ(std::numeric_limits<int64_t>::max(), ctx.exec_chunksize());
+
+    ASSERT_TRUE(ctx.use_threads());
+    ASSERT_EQ(arrow::internal::CpuInfo::GetInstance(), ctx.cpu_info());
+  }
+
+  // Now, let's customize all the things
+  LoggingMemoryPool my_pool(default_memory_pool());
+  std::unique_ptr<FunctionRegistry> custom_reg = FunctionRegistry::Make();
+  ExecContext ctx(&my_pool, /*executor=*/nullptr, custom_reg.get());
+
+  ASSERT_EQ(custom_reg.get(), ctx.func_registry());
+  ASSERT_EQ(&my_pool, ctx.memory_pool());
+
+  ctx.set_exec_chunksize(1 << 20);
+  ASSERT_EQ(1 << 20, ctx.exec_chunksize());
+
+  ctx.set_use_threads(false);
+  ASSERT_FALSE(ctx.use_threads());
+}
+
+TEST(SelectionVector, Basics) {
+  auto indices = ArrayFromJSON(int32(), "[0, 3]");
+  auto sel_vector = std::make_shared<SelectionVector>(*indices);
+
+  ASSERT_EQ(indices->length(), sel_vector->length());
+  ASSERT_EQ(3, sel_vector->indices()[1]);
+}
+
+void AssertValidityZeroExtraBits(const ArrayData& arr) {
+  const Buffer& buf = *arr.buffers[0];
+
+  const int64_t bit_extent = ((arr.offset + arr.length + 7) / 8) * 8;
+  for (int64_t i = arr.offset + arr.length; i < bit_extent; ++i) {
+    EXPECT_FALSE(BitUtil::GetBit(buf.data(), i)) << i;
+  }
+}
+
+class TestComputeInternals : public ::testing::Test {
+ public:
+  void SetUp() {
+    rng_.reset(new random::RandomArrayGenerator(/*seed=*/0));
+    ResetContexts();
+  }
+
+  void ResetContexts() {
+    exec_ctx_.reset(new ExecContext(default_memory_pool()));
+    ctx_.reset(new KernelContext(exec_ctx_.get()));
+  }
+
+  std::shared_ptr<Array> GetUInt8Array(int64_t size, double null_probability = 0.1) {
+    return rng_->UInt8(size, /*min=*/0, /*max=*/100, null_probability);
+  }
+
+  std::shared_ptr<Array> GetInt32Array(int64_t size, double null_probability = 0.1) {
+    return rng_->Int32(size, /*min=*/0, /*max=*/1000, null_probability);
+  }
+
+  std::shared_ptr<Array> GetFloat64Array(int64_t size, double null_probability = 0.1) {
+    return rng_->Float64(size, /*min=*/0, /*max=*/1000, null_probability);
+  }
+
+  std::shared_ptr<ChunkedArray> GetInt32Chunked(const std::vector<int>& sizes) {
+    std::vector<std::shared_ptr<Array>> chunks;
+    for (auto size : sizes) {
+      chunks.push_back(GetInt32Array(size));
+    }
+    return std::make_shared<ChunkedArray>(std::move(chunks));
+  }
+
+ protected:
+  std::unique_ptr<ExecContext> exec_ctx_;
+  std::unique_ptr<KernelContext> ctx_;
+  std::unique_ptr<random::RandomArrayGenerator> rng_;
+};
+
+class TestPropagateNulls : public TestComputeInternals {};
+
+TEST_F(TestPropagateNulls, UnknownNullCountWithNullsZeroCopies) {
+  const int64_t length = 16;
+
+  constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+  auto nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+
+  ArrayData output(boolean(), length, {nullptr, nullptr});
+  ArrayData input(boolean(), length, {nulls, nullptr}, kUnknownNullCount);
+
+  ExecBatch batch({input}, length);
+  ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+  ASSERT_EQ(nulls.get(), output.buffers[0].get());
+  ASSERT_EQ(kUnknownNullCount, output.null_count);
+  ASSERT_EQ(9, output.GetNullCount());
+}
+
+TEST_F(TestPropagateNulls, UnknownNullCountWithoutNulls) {
+  const int64_t length = 16;
+  constexpr uint8_t validity_bitmap[8] = {255, 255, 0, 0, 0, 0, 0, 0};
+  auto nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+
+  ArrayData output(boolean(), length, {nullptr, nullptr});
+  ArrayData input(boolean(), length, {nulls, nullptr}, kUnknownNullCount);
+
+  ExecBatch batch({input}, length);
+  ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+  EXPECT_EQ(-1, output.null_count);
+  EXPECT_EQ(nulls.get(), output.buffers[0].get());
+}
+
+TEST_F(TestPropagateNulls, SetAllNulls) {
+  const int64_t length = 16;
+
+  auto CheckSetAllNull = [&](std::vector<Datum> values, bool preallocate) {
+    // Make fresh bitmap with all 1's
+    uint8_t bitmap_data[2] = {255, 255};
+    auto preallocated_mem = std::make_shared<MutableBuffer>(bitmap_data, 2);
+
+    std::vector<std::shared_ptr<Buffer>> buffers(2);
+    if (preallocate) {
+      buffers[0] = preallocated_mem;
+    }
+
+    ArrayData output(boolean(), length, buffers);
+
+    ExecBatch batch(values, length);
+    ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+
+    if (preallocate) {
+      // Ensure that buffer object the same when we pass in preallocated memory
+      ASSERT_EQ(preallocated_mem.get(), output.buffers[0].get());
+    }
+    ASSERT_NE(nullptr, output.buffers[0]);
+    uint8_t expected[2] = {0, 0};
+    const Buffer& out_buf = *output.buffers[0];
+    ASSERT_EQ(0, std::memcmp(out_buf.data(), expected, out_buf.size()));
+  };
+
+  // There is a null scalar
+  std::shared_ptr<Scalar> i32_val = std::make_shared<Int32Scalar>(3);
+  std::vector<Datum> vals = {i32_val, MakeNullScalar(boolean())};
+  CheckSetAllNull(vals, true);
+  CheckSetAllNull(vals, false);
+
+  const double true_prob = 0.5;
+
+  vals[0] = rng_->Boolean(length, true_prob);
+  CheckSetAllNull(vals, true);
+  CheckSetAllNull(vals, false);
+
+  auto arr_all_nulls = rng_->Boolean(length, true_prob, /*null_probability=*/1);
+
+  // One value is all null
+  vals = {rng_->Boolean(length, true_prob, /*null_probability=*/0.5), arr_all_nulls};
+  CheckSetAllNull(vals, true);
+  CheckSetAllNull(vals, false);
+
+  // A value is NullType
+  std::shared_ptr<Array> null_arr = std::make_shared<NullArray>(length);
+  vals = {rng_->Boolean(length, true_prob), null_arr};
+  CheckSetAllNull(vals, true);
+  CheckSetAllNull(vals, false);
+
+  // Other nitty-gritty scenarios
+  {
+    // An all-null bitmap is zero-copied over, even though there is a
+    // null-scalar earlier in the batch
+    ArrayData output(boolean(), length, {nullptr, nullptr});
+    ExecBatch batch({MakeNullScalar(boolean()), arr_all_nulls}, length);
+    ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+    ASSERT_EQ(arr_all_nulls->data()->buffers[0].get(), output.buffers[0].get());
+  }
+}
+
+TEST_F(TestPropagateNulls, SingleValueWithNulls) {
+  // Input offset is non-zero (0 mod 8 and nonzero mod 8 cases)
+  const int64_t length = 100;
+  auto arr = rng_->Boolean(length, 0.5, /*null_probability=*/0.5);
+
+  auto CheckSliced = [&](int64_t offset, bool preallocate = false,
+                         int64_t out_offset = 0) {
+    // Unaligned bitmap, zero copy not possible
+    auto sliced = arr->Slice(offset);
+    std::vector<Datum> vals = {sliced};
+
+    ArrayData output(boolean(), vals[0].length(), {nullptr, nullptr});
+    output.offset = out_offset;
+
+    ExecBatch batch(vals, vals[0].length());
+
+    std::shared_ptr<Buffer> preallocated_bitmap;
+    if (preallocate) {
+      ASSERT_OK_AND_ASSIGN(
+          preallocated_bitmap,
+          AllocateBuffer(BitUtil::BytesForBits(sliced->length() + out_offset)));
+      std::memset(preallocated_bitmap->mutable_data(), 0, preallocated_bitmap->size());
+      output.buffers[0] = preallocated_bitmap;
+    } else {
+      ASSERT_EQ(0, output.offset);
+    }
+
+    ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+
+    if (!preallocate) {
+      const Buffer* parent_buf = arr->data()->buffers[0].get();
+      if (offset == 0) {
+        // Validity bitmap same, no slice
+        ASSERT_EQ(parent_buf, output.buffers[0].get());
+      } else if (offset % 8 == 0) {
+        // Validity bitmap sliced
+        ASSERT_NE(parent_buf, output.buffers[0].get());
+        ASSERT_EQ(parent_buf, output.buffers[0]->parent().get());
+      } else {
+        // New memory for offset not 0 mod 8
+        ASSERT_NE(parent_buf, output.buffers[0].get());
+        ASSERT_EQ(nullptr, output.buffers[0]->parent());
+      }
+    } else {
+      // preallocated, so check that the validity bitmap is unbothered
+      ASSERT_EQ(preallocated_bitmap.get(), output.buffers[0].get());
+    }
+
+    ASSERT_EQ(arr->Slice(offset)->null_count(), output.GetNullCount());
+
+    ASSERT_TRUE(BitmapEquals(output.buffers[0]->data(), output.offset,
+                             sliced->null_bitmap_data(), sliced->offset(),
+                             output.length));
+    AssertValidityZeroExtraBits(output);
+  };
+
+  CheckSliced(8);
+  CheckSliced(7);
+  CheckSliced(8, /*preallocated=*/true);
+  CheckSliced(7, true);
+  CheckSliced(8, true, /*offset=*/4);
+  CheckSliced(7, true, 4);
+}
+
+TEST_F(TestPropagateNulls, ZeroCopyWhenZeroNullsOnOneInput) {
+  const int64_t length = 16;
+
+  constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+  auto nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+
+  ArrayData some_nulls(boolean(), 16, {nulls, nullptr}, /*null_count=*/9);
+  ArrayData no_nulls(boolean(), length, {nullptr, nullptr}, /*null_count=*/0);
+
+  ArrayData output(boolean(), length, {nullptr, nullptr});
+  ExecBatch batch({some_nulls, no_nulls}, length);
+  ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+  ASSERT_EQ(nulls.get(), output.buffers[0].get());
+  ASSERT_EQ(9, output.null_count);
+
+  // Flip order of args
+  output = ArrayData(boolean(), length, {nullptr, nullptr});
+  batch.values = {no_nulls, no_nulls, some_nulls};
+  ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+  ASSERT_EQ(nulls.get(), output.buffers[0].get());
+  ASSERT_EQ(9, output.null_count);
+
+  // Check that preallocated memory is not clobbered
+  uint8_t bitmap_data[2] = {0, 0};
+  auto preallocated_mem = std::make_shared<MutableBuffer>(bitmap_data, 2);
+  output.null_count = kUnknownNullCount;
+  output.buffers[0] = preallocated_mem;
+  ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+
+  ASSERT_EQ(preallocated_mem.get(), output.buffers[0].get());
+  ASSERT_EQ(9, output.null_count);
+  ASSERT_EQ(254, bitmap_data[0]);
+  ASSERT_EQ(0, bitmap_data[1]);
+}
+
+TEST_F(TestPropagateNulls, IntersectsNulls) {
+  const int64_t length = 16;
+
+  // 0b01111111 0b11001111
+  constexpr uint8_t bitmap1[8] = {127, 207, 0, 0, 0, 0, 0, 0};
+
+  // 0b11111110 0b01111111
+  constexpr uint8_t bitmap2[8] = {254, 127, 0, 0, 0, 0, 0, 0};
+
+  // 0b11101111 0b11111110
+  constexpr uint8_t bitmap3[8] = {239, 254, 0, 0, 0, 0, 0, 0};
+
+  ArrayData arr1(boolean(), length, {std::make_shared<Buffer>(bitmap1, 8), nullptr});
+  ArrayData arr2(boolean(), length, {std::make_shared<Buffer>(bitmap2, 8), nullptr});
+  ArrayData arr3(boolean(), length, {std::make_shared<Buffer>(bitmap3, 8), nullptr});
+
+  auto CheckCase = [&](std::vector<Datum> values, int64_t ex_null_count,
+                       const uint8_t* ex_bitmap, bool preallocate = false,
+                       int64_t output_offset = 0) {
+    ExecBatch batch(values, length);
+
+    std::shared_ptr<Buffer> nulls;
+    if (preallocate) {
+      // Make the buffer one byte bigger so we can have non-zero offsets
+      ASSERT_OK_AND_ASSIGN(nulls, AllocateBuffer(3));
+      std::memset(nulls->mutable_data(), 0, nulls->size());
+    } else {
+      // non-zero output offset not permitted unless the output memory is
+      // preallocated
+      ASSERT_EQ(0, output_offset);
+    }
+    ArrayData output(boolean(), length, {nulls, nullptr});
+    output.offset = output_offset;
+
+    ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+
+    // Preallocated memory used
+    if (preallocate) {
+      ASSERT_EQ(nulls.get(), output.buffers[0].get());
+    }
+
+    EXPECT_EQ(kUnknownNullCount, output.null_count);
+    EXPECT_EQ(ex_null_count, output.GetNullCount());
+
+    const auto& out_buffer = *output.buffers[0];
+
+    ASSERT_TRUE(BitmapEquals(out_buffer.data(), output_offset, ex_bitmap,
+                             /*ex_offset=*/0, length));
+
+    // Now check that the rest of the bits in out_buffer are still 0
+    AssertValidityZeroExtraBits(output);
+  };
+
+  // 0b01101110 0b01001110
+  uint8_t expected1[2] = {110, 78};
+  CheckCase({arr1, arr2, arr3}, 7, expected1);
+  CheckCase({arr1, arr2, arr3}, 7, expected1, /*preallocate=*/true);
+  CheckCase({arr1, arr2, arr3}, 7, expected1, /*preallocate=*/true,
+            /*output_offset=*/4);
+
+  // 0b01111110 0b01001111
+  uint8_t expected2[2] = {126, 79};
+  CheckCase({arr1, arr2}, 5, expected2);
+  CheckCase({arr1, arr2}, 5, expected2, /*preallocate=*/true,
+            /*output_offset=*/4);
+}
+
+TEST_F(TestPropagateNulls, NullOutputTypeNoop) {
+  // Ensure we leave the buffers alone when the output type is null()
+  const int64_t length = 100;
+  ExecBatch batch({rng_->Boolean(100, 0.5, 0.5)}, length);
+
+  ArrayData output(null(), length, {nullptr});
+  ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
+  ASSERT_EQ(nullptr, output.buffers[0]);
+}
+
+// ----------------------------------------------------------------------
+// ExecBatchIterator
+
+class TestExecBatchIterator : public TestComputeInternals {
+ public:
+  void SetupIterator(std::vector<Datum> args,
+                     int64_t max_chunksize = kDefaultMaxChunksize) {
+    ASSERT_OK_AND_ASSIGN(iterator_,
+                         ExecBatchIterator::Make(std::move(args), max_chunksize));
+  }
+  void CheckIteration(const std::vector<Datum>& args, int chunksize,
+                      const std::vector<int>& ex_batch_sizes) {
+    SetupIterator(args, chunksize);
+    ExecBatch batch;
+    int64_t position = 0;
+    for (size_t i = 0; i < ex_batch_sizes.size(); ++i) {
+      ASSERT_EQ(position, iterator_->position());
+      ASSERT_TRUE(iterator_->Next(&batch));
+      ASSERT_EQ(ex_batch_sizes[i], batch.length);
+
+      for (size_t j = 0; j < args.size(); ++j) {
+        switch (args[j].kind()) {
+          case Datum::SCALAR:
+            ASSERT_TRUE(args[j].scalar()->Equals(batch[j].scalar()));
+            break;
+          case Datum::ARRAY:
+            AssertArraysEqual(*args[j].make_array()->Slice(position, batch.length),
+                              *batch[j].make_array());
+            break;
+          case Datum::CHUNKED_ARRAY: {
+            const ChunkedArray& carr = *args[j].chunked_array();
+            if (batch.length == 0) {
+              ASSERT_EQ(0, carr.length());
+            } else {
+              auto arg_slice = carr.Slice(position, batch.length);
+              // The sliced ChunkedArrays should only ever be 1 chunk
+              ASSERT_EQ(1, arg_slice->num_chunks());
+              AssertArraysEqual(*arg_slice->chunk(0), *batch[j].make_array());
+            }
+          } break;
+          default:
+            break;
+        }
+      }
+      position += ex_batch_sizes[i];
+    }
+    // Ensure that the iterator is exhausted
+    ASSERT_FALSE(iterator_->Next(&batch));
+
+    ASSERT_EQ(iterator_->length(), iterator_->position());
+  }
+
+ protected:
+  std::unique_ptr<ExecBatchIterator> iterator_;
+};
+
+TEST_F(TestExecBatchIterator, Basics) {
+  const int64_t length = 100;
+
+  // Simple case with a single chunk
+  std::vector<Datum> args = {Datum(GetInt32Array(length)), Datum(GetFloat64Array(length)),
+                             Datum(std::make_shared<Int32Scalar>(3))};
+  SetupIterator(args);
+
+  ExecBatch batch;
+  ASSERT_TRUE(iterator_->Next(&batch));
+  ASSERT_EQ(3, batch.values.size());
+  ASSERT_EQ(3, batch.num_values());
+  ASSERT_EQ(length, batch.length);
+
+  std::vector<ValueDescr> descrs = batch.GetDescriptors();
+  ASSERT_EQ(ValueDescr::Array(int32()), descrs[0]);
+  ASSERT_EQ(ValueDescr::Array(float64()), descrs[1]);
+  ASSERT_EQ(ValueDescr::Scalar(int32()), descrs[2]);
+
+  AssertArraysEqual(*args[0].make_array(), *batch[0].make_array());
+  AssertArraysEqual(*args[1].make_array(), *batch[1].make_array());
+  ASSERT_TRUE(args[2].scalar()->Equals(batch[2].scalar()));
+
+  ASSERT_EQ(length, iterator_->position());
+  ASSERT_FALSE(iterator_->Next(&batch));
+
+  // Split into chunks of size 16
+  CheckIteration(args, /*chunksize=*/16, {16, 16, 16, 16, 16, 16, 4});
+}
+
+TEST_F(TestExecBatchIterator, InputValidation) {
+  std::vector<Datum> args = {Datum(GetInt32Array(10)), Datum(GetInt32Array(9))};
+  ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args));
+
+  args = {Datum(GetInt32Array(9)), Datum(GetInt32Array(10))};
+  ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args));
+
+  args = {Datum(GetInt32Array(10))};
+  ASSERT_OK_AND_ASSIGN(auto iterator, ExecBatchIterator::Make(args));
+  ASSERT_EQ(10, iterator->max_chunksize());
+}
+
+TEST_F(TestExecBatchIterator, ChunkedArrays) {
+  std::vector<Datum> args = {Datum(GetInt32Chunked({0, 20, 10})),
+                             Datum(GetInt32Chunked({15, 15})), Datum(GetInt32Array(30)),
+                             Datum(std::make_shared<Int32Scalar>(5)),
+                             Datum(MakeNullScalar(boolean()))};
+
+  CheckIteration(args, /*chunksize=*/10, {10, 5, 5, 10});
+  CheckIteration(args, /*chunksize=*/20, {15, 5, 10});
+  CheckIteration(args, /*chunksize=*/30, {15, 5, 10});
+}
+
+TEST_F(TestExecBatchIterator, ZeroLengthInputs) {
+  auto carr = std::shared_ptr<ChunkedArray>(new ChunkedArray({}, int32()));
+
+  auto CheckArgs = [&](const std::vector<Datum>& args) {
+    auto iterator = ExecBatchIterator::Make(args).ValueOrDie();
+    ExecBatch batch;
+    ASSERT_FALSE(iterator->Next(&batch));
+  };
+
+  // Zero-length ChunkedArray with zero chunks
+  std::vector<Datum> args = {Datum(carr)};
+  CheckArgs(args);
+
+  // Zero-length array
+  args = {Datum(GetInt32Array(0))};
+  CheckArgs(args);
+
+  // ChunkedArray with single empty chunk
+  args = {Datum(GetInt32Chunked({0}))};
+  CheckArgs(args);
+}
+
+// ----------------------------------------------------------------------
+// Scalar function execution
+
+Status ExecCopy(KernelContext*, const ExecBatch& batch, Datum* out) {
+  DCHECK_EQ(1, batch.num_values());
+  const auto& type = checked_cast<const FixedWidthType&>(*batch[0].type());
+  int value_size = type.bit_width() / 8;
+
+  const ArrayData& arg0 = *batch[0].array();
+  ArrayData* out_arr = out->mutable_array();
+  uint8_t* dst = out_arr->buffers[1]->mutable_data() + out_arr->offset * value_size;
+  const uint8_t* src = arg0.buffers[1]->data() + arg0.offset * value_size;
+  std::memcpy(dst, src, batch.length * value_size);
+  return Status::OK();
+}
+
+Status ExecComputedBitmap(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  // Propagate nulls not used. Check that the out bitmap isn't the same already
+  // as the input bitmap
+  const ArrayData& arg0 = *batch[0].array();
+  ArrayData* out_arr = out->mutable_array();
+
+  if (CountSetBits(arg0.buffers[0]->data(), arg0.offset, batch.length) > 0) {
+    // Check that the bitmap has not been already copied over
+    DCHECK(!BitmapEquals(arg0.buffers[0]->data(), arg0.offset,
+                         out_arr->buffers[0]->data(), out_arr->offset, batch.length));
+  }
+
+  CopyBitmap(arg0.buffers[0]->data(), arg0.offset, batch.length,
+             out_arr->buffers[0]->mutable_data(), out_arr->offset);
+  return ExecCopy(ctx, batch, out);
+}
+
+Status ExecNoPreallocatedData(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  // Validity preallocated, but not the data
+  ArrayData* out_arr = out->mutable_array();
+  DCHECK_EQ(0, out_arr->offset);
+  const auto& type = checked_cast<const FixedWidthType&>(*batch[0].type());
+  int value_size = type.bit_width() / 8;
+  Status s = (ctx->Allocate(out_arr->length * value_size).Value(&out_arr->buffers[1]));
+  DCHECK_OK(s);
+  return ExecCopy(ctx, batch, out);
+}
+
+Status ExecNoPreallocatedAnything(KernelContext* ctx, const ExecBatch& batch,
+                                  Datum* out) {
+  // Neither validity nor data preallocated
+  ArrayData* out_arr = out->mutable_array();
+  DCHECK_EQ(0, out_arr->offset);
+  Status s = (ctx->AllocateBitmap(out_arr->length).Value(&out_arr->buffers[0]));
+  DCHECK_OK(s);
+  const ArrayData& arg0 = *batch[0].array();
+  CopyBitmap(arg0.buffers[0]->data(), arg0.offset, batch.length,
+             out_arr->buffers[0]->mutable_data(), /*offset=*/0);
+
+  // Reuse the kernel that allocates the data
+  return ExecNoPreallocatedData(ctx, batch, out);
+}
+
+class ExampleOptions : public FunctionOptions {
+ public:
+  explicit ExampleOptions(std::shared_ptr<Scalar> value);
+  std::shared_ptr<Scalar> value;
+};
+
+class ExampleOptionsType : public FunctionOptionsType {
+ public:
+  static const FunctionOptionsType* GetInstance() {
+    static std::unique_ptr<FunctionOptionsType> instance(new ExampleOptionsType());
+    return instance.get();
+  }
+  const char* type_name() const override { return "example"; }
+  std::string Stringify(const FunctionOptions& options) const override {
+    return type_name();
+  }
+  bool Compare(const FunctionOptions& options,
+               const FunctionOptions& other) const override {
+    return true;
+  }
+  std::unique_ptr<FunctionOptions> Copy(const FunctionOptions& options) const override {
+    const auto& opts = static_cast<const ExampleOptions&>(options);
+    return arrow::internal::make_unique<ExampleOptions>(opts.value);
+  }
+};
+ExampleOptions::ExampleOptions(std::shared_ptr<Scalar> value)
+    : FunctionOptions(ExampleOptionsType::GetInstance()), value(std::move(value)) {}
+
+struct ExampleState : public KernelState {
+  std::shared_ptr<Scalar> value;
+  explicit ExampleState(std::shared_ptr<Scalar> value) : value(std::move(value)) {}
+};
+
+Result<std::unique_ptr<KernelState>> InitStateful(KernelContext*,
+                                                  const KernelInitArgs& args) {
+  auto func_options = static_cast<const ExampleOptions*>(args.options);
+  return std::unique_ptr<KernelState>(new ExampleState{func_options->value});
+}
+
+Status ExecStateful(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  // We take the value from the state and multiply the data in batch[0] with it
+  ExampleState* state = static_cast<ExampleState*>(ctx->state());
+  int32_t multiplier = checked_cast<const Int32Scalar&>(*state->value).value;
+
+  const ArrayData& arg0 = *batch[0].array();
+  ArrayData* out_arr = out->mutable_array();
+  const int32_t* arg0_data = arg0.GetValues<int32_t>(1);
+  int32_t* dst = out_arr->GetMutableValues<int32_t>(1);
+  for (int64_t i = 0; i < arg0.length; ++i) {
+    dst[i] = arg0_data[i] * multiplier;
+  }
+  return Status::OK();
+}
+
+Status ExecAddInt32(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  const Int32Scalar& arg0 = batch[0].scalar_as<Int32Scalar>();
+  const Int32Scalar& arg1 = batch[1].scalar_as<Int32Scalar>();
+  out->value = std::make_shared<Int32Scalar>(arg0.value + arg1.value);
+  return Status::OK();
+}
+
+class TestCallScalarFunction : public TestComputeInternals {
+ protected:
+  static bool initialized_;
+
+  void SetUp() {
+    TestComputeInternals::SetUp();
+
+    if (!initialized_) {
+      initialized_ = true;
+      AddCopyFunctions();
+      AddNoPreallocateFunctions();
+      AddStatefulFunction();
+      AddScalarFunction();
+    }
+  }
+
+  void AddCopyFunctions() {
+    auto registry = GetFunctionRegistry();
+
+    // This function simply copies memory from the input argument into the
+    // (preallocated) output
+    auto func =
+        std::make_shared<ScalarFunction>("test_copy", Arity::Unary(), /*doc=*/nullptr);
+
+    // Add a few kernels. Our implementation only accepts arrays
+    ASSERT_OK(func->AddKernel({InputType::Array(uint8())}, uint8(), ExecCopy));
+    ASSERT_OK(func->AddKernel({InputType::Array(int32())}, int32(), ExecCopy));
+    ASSERT_OK(func->AddKernel({InputType::Array(float64())}, float64(), ExecCopy));
+    ASSERT_OK(registry->AddFunction(func));
+
+    // A version which doesn't want the executor to call PropagateNulls
+    auto func2 = std::make_shared<ScalarFunction>("test_copy_computed_bitmap",
+                                                  Arity::Unary(), /*doc=*/nullptr);
+    ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecComputedBitmap);
+    kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
+    ASSERT_OK(func2->AddKernel(kernel));
+    ASSERT_OK(registry->AddFunction(func2));
+  }
+
+  void AddNoPreallocateFunctions() {
+    auto registry = GetFunctionRegistry();
+
+    // A function that allocates its own output memory. We have cases for both
+    // non-preallocated data and non-preallocated validity bitmap
+    auto f1 = std::make_shared<ScalarFunction>("test_nopre_data", Arity::Unary(),
+                                               /*doc=*/nullptr);
+    auto f2 = std::make_shared<ScalarFunction>("test_nopre_validity_or_data",
+                                               Arity::Unary(), /*doc=*/nullptr);
+
+    ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecNoPreallocatedData);
+    kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+    ASSERT_OK(f1->AddKernel(kernel));
+
+    kernel.exec = ExecNoPreallocatedAnything;
+    kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
+    ASSERT_OK(f2->AddKernel(kernel));
+
+    ASSERT_OK(registry->AddFunction(f1));
+    ASSERT_OK(registry->AddFunction(f2));
+  }
+
+  void AddStatefulFunction() {
+    auto registry = GetFunctionRegistry();
+
+    // This function's behavior depends on a static parameter that is made
+    // available to the kernel's execution function through its Options object
+    auto func = std::make_shared<ScalarFunction>("test_stateful", Arity::Unary(),
+                                                 /*doc=*/nullptr);
+
+    ScalarKernel kernel({InputType::Array(int32())}, int32(), ExecStateful, InitStateful);
+    ASSERT_OK(func->AddKernel(kernel));
+    ASSERT_OK(registry->AddFunction(func));
+  }
+
+  void AddScalarFunction() {
+    auto registry = GetFunctionRegistry();
+
+    auto func = std::make_shared<ScalarFunction>("test_scalar_add_int32", Arity::Binary(),
+                                                 /*doc=*/nullptr);
+    ASSERT_OK(func->AddKernel({InputType::Scalar(int32()), InputType::Scalar(int32())},
+                              int32(), ExecAddInt32));
+    ASSERT_OK(registry->AddFunction(func));
+  }
+};
+
+bool TestCallScalarFunction::initialized_ = false;
+
+TEST_F(TestCallScalarFunction, ArgumentValidation) {
+  // Copy accepts only a single array argument
+  Datum d1(GetInt32Array(10));
+
+  // Too many args
+  std::vector<Datum> args = {d1, d1};
+  ASSERT_RAISES(Invalid, CallFunction("test_copy", args));
+
+  // Too few
+  args = {};
+  ASSERT_RAISES(Invalid, CallFunction("test_copy", args));
+
+  // Cannot do scalar
+  args = {Datum(std::make_shared<Int32Scalar>(5))};
+  ASSERT_RAISES(NotImplemented, CallFunction("test_copy", args));
+}
+
+TEST_F(TestCallScalarFunction, PreallocationCases) {
+  double null_prob = 0.2;
+
+  auto arr = GetUInt8Array(1000, null_prob);
+
+  auto CheckFunction = [&](std::string func_name) {
+    ResetContexts();
+
+    // The default should be a single array output
+    {
+      std::vector<Datum> args = {Datum(arr)};
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args));
+      ASSERT_EQ(Datum::ARRAY, result.kind());
+      AssertArraysEqual(*arr, *result.make_array());
+    }
+
+    // Set the exec_chunksize to be smaller, so now we have several invocations
+    // of the kernel, but still the output is onee array
+    {
+      std::vector<Datum> args = {Datum(arr)};
+      exec_ctx_->set_exec_chunksize(80);
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
+      AssertArraysEqual(*arr, *result.make_array());
+    }
+
+    {
+      // Chunksize not multiple of 8
+      std::vector<Datum> args = {Datum(arr)};
+      exec_ctx_->set_exec_chunksize(111);
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
+      AssertArraysEqual(*arr, *result.make_array());
+    }
+
+    // Input is chunked, output has one big chunk
+    {
+      auto carr = std::shared_ptr<ChunkedArray>(
+          new ChunkedArray({arr->Slice(0, 100), arr->Slice(100)}));
+      std::vector<Datum> args = {Datum(carr)};
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
+      std::shared_ptr<ChunkedArray> actual = result.chunked_array();
+      ASSERT_EQ(1, actual->num_chunks());
+      AssertChunkedEquivalent(*carr, *actual);
+    }
+
+    // Preallocate independently for each batch
+    {
+      std::vector<Datum> args = {Datum(arr)};
+      exec_ctx_->set_preallocate_contiguous(false);
+      exec_ctx_->set_exec_chunksize(400);
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
+      ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
+      const ChunkedArray& carr = *result.chunked_array();
+      ASSERT_EQ(3, carr.num_chunks());
+      AssertArraysEqual(*arr->Slice(0, 400), *carr.chunk(0));
+      AssertArraysEqual(*arr->Slice(400, 400), *carr.chunk(1));
+      AssertArraysEqual(*arr->Slice(800), *carr.chunk(2));
+    }
+  };
+
+  CheckFunction("test_copy");
+  CheckFunction("test_copy_computed_bitmap");
+}
+
+TEST_F(TestCallScalarFunction, BasicNonStandardCases) {
+  // Test a handful of cases
+  //
+  // * Validity bitmap computed by kernel rather than using PropagateNulls
+  // * Data not pre-allocated
+  // * Validity bitmap not pre-allocated
+
+  double null_prob = 0.2;
+
+  auto arr = GetUInt8Array(1000, null_prob);
+  std::vector<Datum> args = {Datum(arr)};
+
+  auto CheckFunction = [&](std::string func_name) {
+    ResetContexts();
+
+    // The default should be a single array output
+    {
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args));
+      AssertArraysEqual(*arr, *result.make_array(), true);
+    }
+
+    // Split execution into 3 chunks
+    {
+      exec_ctx_->set_exec_chunksize(400);
+      ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
+      ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
+      const ChunkedArray& carr = *result.chunked_array();
+      ASSERT_EQ(3, carr.num_chunks());
+      AssertArraysEqual(*arr->Slice(0, 400), *carr.chunk(0));
+      AssertArraysEqual(*arr->Slice(400, 400), *carr.chunk(1));
+      AssertArraysEqual(*arr->Slice(800), *carr.chunk(2));
+    }
+  };
+
+  CheckFunction("test_nopre_data");
+  CheckFunction("test_nopre_validity_or_data");
+}
+
+TEST_F(TestCallScalarFunction, StatefulKernel) {
+  auto input = ArrayFromJSON(int32(), "[1, 2, 3, null, 5]");
+  auto multiplier = std::make_shared<Int32Scalar>(2);
+  auto expected = ArrayFromJSON(int32(), "[2, 4, 6, null, 10]");
+
+  ExampleOptions options(multiplier);
+  std::vector<Datum> args = {Datum(input)};
+  ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_stateful", args, &options));
+  AssertArraysEqual(*expected, *result.make_array());
+}
+
+TEST_F(TestCallScalarFunction, ScalarFunction) {
+  std::vector<Datum> args = {Datum(std::make_shared<Int32Scalar>(5)),
+                             Datum(std::make_shared<Int32Scalar>(7))};
+  ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_scalar_add_int32", args));
+  ASSERT_EQ(Datum::SCALAR, result.kind());
+
+  auto expected = std::make_shared<Int32Scalar>(12);
+  ASSERT_TRUE(expected->Equals(*result.scalar()));
+}
+
+}  // namespace detail
+}  // namespace compute
+}  // namespace arrow