]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/compute/exec_internal.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / exec_internal.h
diff --git a/ceph/src/arrow/cpp/src/arrow/compute/exec_internal.h b/ceph/src/arrow/cpp/src/arrow/compute/exec_internal.h
new file mode 100644 (file)
index 0000000..74124f0
--- /dev/null
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+
+class Function;
+
+static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max();
+
+namespace detail {
+
+/// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
+/// execution
+class ARROW_EXPORT ExecBatchIterator {
+ public:
+  /// \brief Construct iterator and do basic argument validation
+  ///
+  /// \param[in] args the Datum argument, must be all array-like or scalar
+  /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending
+  /// on the chunk layout of ChunkedArray.
+  static Result<std::unique_ptr<ExecBatchIterator>> Make(
+      std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize);
+
+  /// \brief Compute the next batch. Always returns at least one batch. Return
+  /// false if the iterator is exhausted
+  bool Next(ExecBatch* batch);
+
+  int64_t length() const { return length_; }
+
+  int64_t position() const { return position_; }
+
+  int64_t max_chunksize() const { return max_chunksize_; }
+
+ private:
+  ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
+
+  std::vector<Datum> args_;
+  std::vector<int> chunk_indexes_;
+  std::vector<int64_t> chunk_positions_;
+  int64_t position_;
+  int64_t length_;
+  int64_t max_chunksize_;
+};
+
+// "Push" / listener API like IPC reader so that consumers can receive
+// processed chunks as soon as they're available.
+
+class ARROW_EXPORT ExecListener {
+ public:
+  virtual ~ExecListener() = default;
+
+  virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); }
+};
+
+class DatumAccumulator : public ExecListener {
+ public:
+  DatumAccumulator() = default;
+
+  Status OnResult(Datum value) override {
+    values_.emplace_back(value);
+    return Status::OK();
+  }
+
+  std::vector<Datum> values() { return std::move(values_); }
+
+ private:
+  std::vector<Datum> values_;
+};
+
+/// \brief Check that each Datum is of a "value" type, which means either
+/// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these
+/// inputs will be split into non-chunked ExecBatch values for execution
+Status CheckAllValues(const std::vector<Datum>& values);
+
+class ARROW_EXPORT KernelExecutor {
+ public:
+  virtual ~KernelExecutor() = default;
+
+  /// The Kernel's `init` method must be called and any KernelState set in the
+  /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
+  /// the case where init may be expensive and does not need to be called again for
+  /// each execution of the kernel, for example the same lookup table can be re-used
+  /// for all scanned batches in a dataset filter.
+  virtual Status Init(KernelContext*, KernelInitArgs) = 0;
+
+  /// XXX: Better configurability for listener
+  /// Not thread-safe
+  virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;
+
+  virtual Datum WrapResults(const std::vector<Datum>& args,
+                            const std::vector<Datum>& outputs) = 0;
+
+  /// \brief Check the actual result type against the resolved output type
+  virtual Status CheckResultType(const Datum& out, const char* function_name) = 0;
+
+  static std::unique_ptr<KernelExecutor> MakeScalar();
+  static std::unique_ptr<KernelExecutor> MakeVector();
+  static std::unique_ptr<KernelExecutor> MakeScalarAggregate();
+};
+
+/// \brief Populate validity bitmap with the intersection of the nullity of the
+/// arguments. If a preallocated bitmap is not provided, then one will be
+/// allocated if needed (in some cases a bitmap can be zero-copied from the
+/// arguments). If any Scalar value is null, then the entire validity bitmap
+/// will be set to null.
+///
+/// \param[in] ctx kernel execution context, for memory allocation etc.
+/// \param[in] batch the data batch
+/// \param[in] out the output ArrayData, must not be null
+ARROW_EXPORT
+Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out);
+
+}  // namespace detail
+}  // namespace compute
+}  // namespace arrow