// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/kernel.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace compute {

class Function;

static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max();

namespace detail {

/// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
/// execution
class ARROW_EXPORT ExecBatchIterator {
 public:
  /// \brief Construct iterator and do basic argument validation
  ///
  /// \param[in] args the Datum arguments; must all be array-like or scalar
  /// \param[in] max_chunksize the maximum length of each ExecBatch; actual
  /// batch lengths may be shorter depending on the chunk layout of any
  /// ChunkedArray arguments
  static Result<std::unique_ptr<ExecBatchIterator>> Make(
      std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize);

  /// \brief Compute the next batch. At least one batch is always produced;
  /// returns false once the iterator is exhausted
  bool Next(ExecBatch* batch);

  int64_t length() const { return length_; }

  int64_t position() const { return position_; }

  int64_t max_chunksize() const { return max_chunksize_; }

 private:
  ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);

  std::vector<Datum> args_;
  std::vector<int> chunk_indexes_;
  std::vector<int64_t> chunk_positions_;
  int64_t position_;
  int64_t length_;
  int64_t max_chunksize_;
};
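
// A minimal usage sketch (illustrative only, not part of this header). It
// assumes a caller inside a function returning Status/Result, with `args`
// holding array-like and/or scalar Datums:
//
//   std::vector<Datum> args = {/* array-like and/or scalar Datums */};
//   ARROW_ASSIGN_OR_RAISE(auto batch_it, ExecBatchIterator::Make(std::move(args)));
//   ExecBatch batch;
//   while (batch_it->Next(&batch)) {
//     // batch.values holds only non-chunked (Array/Scalar) Datums, sliced to
//     // at most max_chunksize rows and aligned across all arguments.
//   }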

// "Push" / listener API like IPC reader so that consumers can receive
// processed chunks as soon as they're available.

class ARROW_EXPORT ExecListener {
 public:
  virtual ~ExecListener() = default;

  virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); }
};

class DatumAccumulator : public ExecListener {
 public:
  DatumAccumulator() = default;

  Status OnResult(Datum value) override {
    values_.emplace_back(value);
    return Status::OK();
  }

  std::vector<Datum> values() { return std::move(values_); }

 private:
  std::vector<Datum> values_;
};
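
// A sketch of how a listener is typically consumed (illustrative; `executor`
// and `args` are assumed to exist, with the executor already initialized):
//
//   DatumAccumulator accumulator;
//   RETURN_NOT_OK(executor->Execute(args, &accumulator));
//   Datum result = executor->WrapResults(args, accumulator.values());
//
// Note that values() moves the accumulated Datums out, so call it at most once
// per execution.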

/// \brief Check that each Datum is of a "value" type, which means either
/// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these
/// inputs will be split into non-chunked ExecBatch values for execution
Status CheckAllValues(const std::vector<Datum>& values);
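
// For example, a caller might validate inputs up front before building an
// ExecBatchIterator (illustrative; `args` is assumed):
//
//   RETURN_NOT_OK(CheckAllValues(args));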

class ARROW_EXPORT KernelExecutor {
 public:
  virtual ~KernelExecutor() = default;

  /// The Kernel's `init` method must be called and any KernelState set in the
  /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
  /// the case where init may be expensive and does not need to be called again for
  /// each execution of the kernel; for example, the same lookup table can be
  /// reused for all scanned batches in a dataset filter.
  virtual Status Init(KernelContext*, KernelInitArgs) = 0;

  /// XXX: Better configurability for listener
  /// Not thread-safe
  virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;

  virtual Datum WrapResults(const std::vector<Datum>& args,
                            const std::vector<Datum>& outputs) = 0;

  /// \brief Check the actual result type against the resolved output type
  virtual Status CheckResultType(const Datum& out, const char* function_name) = 0;

  static std::unique_ptr<KernelExecutor> MakeScalar();
  static std::unique_ptr<KernelExecutor> MakeVector();
  static std::unique_ptr<KernelExecutor> MakeScalarAggregate();
};
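
// A sketch of the intended call order for a scalar kernel (illustrative only;
// `exec_ctx`, `kernel`, `inputs`, `options`, and `args` are assumptions, and
// the KernelInitArgs field layout follows arrow/compute/kernel.h):
//
//   KernelContext kernel_ctx(exec_ctx);
//   KernelInitArgs init_args{kernel, inputs, options};
//   std::unique_ptr<KernelState> state;
//   if (kernel->init) {
//     // Per the comment on Init() above: run the kernel's own init first and
//     // attach the resulting state to the KernelContext.
//     ARROW_ASSIGN_OR_RAISE(state, kernel->init(&kernel_ctx, init_args));
//     kernel_ctx.SetState(state.get());  // `state` must outlive execution
//   }
//   auto executor = KernelExecutor::MakeScalar();
//   RETURN_NOT_OK(executor->Init(&kernel_ctx, init_args));
//   DatumAccumulator accumulator;
//   RETURN_NOT_OK(executor->Execute(args, &accumulator));
//   Datum out = executor->WrapResults(args, accumulator.values());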

/// \brief Populate the validity bitmap with the intersection (AND) of the
/// validity of the arguments. If a preallocated bitmap is not provided, then
/// one will be allocated if needed (in some cases a bitmap can be zero-copied
/// from the arguments). If any Scalar value is null, then the entire validity
/// bitmap will be set to all-null.
///
/// \param[in] ctx kernel execution context, for memory allocation etc.
/// \param[in] batch the data batch
/// \param[out] out the output ArrayData, must not be null
ARROW_EXPORT
Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out);
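
// A sketch of typical use inside a kernel exec function (illustrative;
// `MyKernelExec` and the preallocated output in `out` are assumptions):
//
//   Status MyKernelExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
//     // Fill the output's validity bitmap from the arguments' null bitmaps
//     // before computing values.
//     RETURN_NOT_OK(PropagateNulls(ctx, batch, out->mutable_array()));
//     // ... compute the output values ...
//     return Status::OK();
//   }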

}  // namespace detail
}  // namespace compute
}  // namespace arrow