]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/compute/exec_internal.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / exec_internal.h
CommitLineData
1d09f67e
TL
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18#pragma once
19
20#include <cstdint>
21#include <limits>
22#include <memory>
23#include <string>
24#include <vector>
25
26#include "arrow/array.h"
27#include "arrow/buffer.h"
28#include "arrow/compute/exec.h"
29#include "arrow/compute/kernel.h"
30#include "arrow/status.h"
31#include "arrow/util/visibility.h"
32
33namespace arrow {
34namespace compute {
35
36class Function;
37
/// Default value for the `max_chunksize` argument of ExecBatchIterator::Make:
/// the largest representable int64_t, i.e. effectively no cap on batch length.
static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max();
39
40namespace detail {
41
42/// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
43/// execution
44class ARROW_EXPORT ExecBatchIterator {
45 public:
46 /// \brief Construct iterator and do basic argument validation
47 ///
48 /// \param[in] args the Datum argument, must be all array-like or scalar
49 /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending
50 /// on the chunk layout of ChunkedArray.
51 static Result<std::unique_ptr<ExecBatchIterator>> Make(
52 std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize);
53
54 /// \brief Compute the next batch. Always returns at least one batch. Return
55 /// false if the iterator is exhausted
56 bool Next(ExecBatch* batch);
57
58 int64_t length() const { return length_; }
59
60 int64_t position() const { return position_; }
61
62 int64_t max_chunksize() const { return max_chunksize_; }
63
64 private:
65 ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
66
67 std::vector<Datum> args_;
68 std::vector<int> chunk_indexes_;
69 std::vector<int64_t> chunk_positions_;
70 int64_t position_;
71 int64_t length_;
72 int64_t max_chunksize_;
73};
74
75// "Push" / listener API like IPC reader so that consumers can receive
76// processed chunks as soon as they're available.
77
78class ARROW_EXPORT ExecListener {
79 public:
80 virtual ~ExecListener() = default;
81
82 virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); }
83};
84
85class DatumAccumulator : public ExecListener {
86 public:
87 DatumAccumulator() = default;
88
89 Status OnResult(Datum value) override {
90 values_.emplace_back(value);
91 return Status::OK();
92 }
93
94 std::vector<Datum> values() { return std::move(values_); }
95
96 private:
97 std::vector<Datum> values_;
98};
99
100/// \brief Check that each Datum is of a "value" type, which means either
101/// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these
102/// inputs will be split into non-chunked ExecBatch values for execution
103Status CheckAllValues(const std::vector<Datum>& values);
104
105class ARROW_EXPORT KernelExecutor {
106 public:
107 virtual ~KernelExecutor() = default;
108
109 /// The Kernel's `init` method must be called and any KernelState set in the
110 /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
111 /// the case where init may be expensive and does not need to be called again for
112 /// each execution of the kernel, for example the same lookup table can be re-used
113 /// for all scanned batches in a dataset filter.
114 virtual Status Init(KernelContext*, KernelInitArgs) = 0;
115
116 /// XXX: Better configurability for listener
117 /// Not thread-safe
118 virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;
119
120 virtual Datum WrapResults(const std::vector<Datum>& args,
121 const std::vector<Datum>& outputs) = 0;
122
123 /// \brief Check the actual result type against the resolved output type
124 virtual Status CheckResultType(const Datum& out, const char* function_name) = 0;
125
126 static std::unique_ptr<KernelExecutor> MakeScalar();
127 static std::unique_ptr<KernelExecutor> MakeVector();
128 static std::unique_ptr<KernelExecutor> MakeScalarAggregate();
129};
130
131/// \brief Populate validity bitmap with the intersection of the nullity of the
132/// arguments. If a preallocated bitmap is not provided, then one will be
133/// allocated if needed (in some cases a bitmap can be zero-copied from the
134/// arguments). If any Scalar value is null, then the entire validity bitmap
135/// will be set to null.
136///
137/// \param[in] ctx kernel execution context, for memory allocation etc.
138/// \param[in] batch the data batch
139/// \param[in] out the output ArrayData, must not be null
140ARROW_EXPORT
141Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out);
142
143} // namespace detail
144} // namespace compute
145} // namespace arrow