--- /dev/null
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+
+class Function;
+
+static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max();
+
+namespace detail {
+
+/// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
+/// execution
+class ARROW_EXPORT ExecBatchIterator {
+ public:
+ /// \brief Construct iterator and do basic argument validation
+ ///
+ /// \param[in] args the Datum argument, must be all array-like or scalar
+ /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending
+ /// on the chunk layout of ChunkedArray.
+ static Result<std::unique_ptr<ExecBatchIterator>> Make(
+ std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize);
+
+ /// \brief Compute the next batch. Always returns at least one batch. Return
+ /// false if the iterator is exhausted
+ bool Next(ExecBatch* batch);
+
+ int64_t length() const { return length_; }
+
+ int64_t position() const { return position_; }
+
+ int64_t max_chunksize() const { return max_chunksize_; }
+
+ private:
+ ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
+
+ std::vector<Datum> args_;
+ std::vector<int> chunk_indexes_;
+ std::vector<int64_t> chunk_positions_;
+ int64_t position_;
+ int64_t length_;
+ int64_t max_chunksize_;
+};
+
+// "Push" / listener API like IPC reader so that consumers can receive
+// processed chunks as soon as they're available.
+
+class ARROW_EXPORT ExecListener {
+ public:
+ virtual ~ExecListener() = default;
+
+ virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); }
+};
+
+class DatumAccumulator : public ExecListener {
+ public:
+ DatumAccumulator() = default;
+
+ Status OnResult(Datum value) override {
+ values_.emplace_back(value);
+ return Status::OK();
+ }
+
+ std::vector<Datum> values() { return std::move(values_); }
+
+ private:
+ std::vector<Datum> values_;
+};
+
+/// \brief Check that each Datum is of a "value" type, which means either
+/// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these
+/// inputs will be split into non-chunked ExecBatch values for execution
+Status CheckAllValues(const std::vector<Datum>& values);
+
+class ARROW_EXPORT KernelExecutor {
+ public:
+ virtual ~KernelExecutor() = default;
+
+ /// The Kernel's `init` method must be called and any KernelState set in the
+ /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
+ /// the case where init may be expensive and does not need to be called again for
+ /// each execution of the kernel, for example the same lookup table can be re-used
+ /// for all scanned batches in a dataset filter.
+ virtual Status Init(KernelContext*, KernelInitArgs) = 0;
+
+ /// XXX: Better configurability for listener
+ /// Not thread-safe
+ virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;
+
+ virtual Datum WrapResults(const std::vector<Datum>& args,
+ const std::vector<Datum>& outputs) = 0;
+
+ /// \brief Check the actual result type against the resolved output type
+ virtual Status CheckResultType(const Datum& out, const char* function_name) = 0;
+
+ static std::unique_ptr<KernelExecutor> MakeScalar();
+ static std::unique_ptr<KernelExecutor> MakeVector();
+ static std::unique_ptr<KernelExecutor> MakeScalarAggregate();
+};
+
+/// \brief Populate validity bitmap with the intersection of the nullity of the
+/// arguments. If a preallocated bitmap is not provided, then one will be
+/// allocated if needed (in some cases a bitmap can be zero-copied from the
+/// arguments). If any Scalar value is null, then the entire validity bitmap
+/// will be set to null.
+///
+/// \param[in] ctx kernel execution context, for memory allocation etc.
+/// \param[in] batch the data batch
+/// \param[in] out the output ArrayData, must not be null
+ARROW_EXPORT
+Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out);
+
+} // namespace detail
+} // namespace compute
+} // namespace arrow