1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
26 #include "arrow/array.h"
27 #include "arrow/buffer.h"
28 #include "arrow/compute/exec.h"
29 #include "arrow/compute/kernel.h"
30 #include "arrow/status.h"
31 #include "arrow/util/visibility.h"
/// \brief Default upper bound on the length of each ExecBatch produced by
/// ExecBatchIterator::Make when no explicit max_chunksize is given: the
/// largest representable int64_t, so no practical length cap is imposed.
static constexpr int64_t kDefaultMaxChunksize{std::numeric_limits<int64_t>::max()};
42 /// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
44 class ARROW_EXPORT ExecBatchIterator
{
46 /// \brief Construct iterator and do basic argument validation
48 /// \param[in] args the Datum argument, must be all array-like or scalar
49 /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending
50 /// on the chunk layout of ChunkedArray.
51 static Result
<std::unique_ptr
<ExecBatchIterator
>> Make(
52 std::vector
<Datum
> args
, int64_t max_chunksize
= kDefaultMaxChunksize
);
54 /// \brief Compute the next batch. Always returns at least one batch. Return
55 /// false if the iterator is exhausted
56 bool Next(ExecBatch
* batch
);
58 int64_t length() const { return length_
; }
60 int64_t position() const { return position_
; }
62 int64_t max_chunksize() const { return max_chunksize_
; }
65 ExecBatchIterator(std::vector
<Datum
> args
, int64_t length
, int64_t max_chunksize
);
67 std::vector
<Datum
> args_
;
68 std::vector
<int> chunk_indexes_
;
69 std::vector
<int64_t> chunk_positions_
;
72 int64_t max_chunksize_
;
75 // "Push" / listener API like IPC reader so that consumers can receive
76 // processed chunks as soon as they're available.
78 class ARROW_EXPORT ExecListener
{
80 virtual ~ExecListener() = default;
82 virtual Status
OnResult(Datum
) { return Status::NotImplemented("OnResult"); }
85 class DatumAccumulator
: public ExecListener
{
87 DatumAccumulator() = default;
89 Status
OnResult(Datum value
) override
{
90 values_
.emplace_back(value
);
94 std::vector
<Datum
> values() { return std::move(values_
); }
97 std::vector
<Datum
> values_
;
100 /// \brief Check that each Datum is of a "value" type, which means either
101 /// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these
102 /// inputs will be split into non-chunked ExecBatch values for execution
103 Status
CheckAllValues(const std::vector
<Datum
>& values
);
105 class ARROW_EXPORT KernelExecutor
{
107 virtual ~KernelExecutor() = default;
109 /// The Kernel's `init` method must be called and any KernelState set in the
110 /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
111 /// the case where init may be expensive and does not need to be called again for
112 /// each execution of the kernel, for example the same lookup table can be re-used
113 /// for all scanned batches in a dataset filter.
114 virtual Status
Init(KernelContext
*, KernelInitArgs
) = 0;
116 /// XXX: Better configurability for listener
118 virtual Status
Execute(const std::vector
<Datum
>& args
, ExecListener
* listener
) = 0;
120 virtual Datum
WrapResults(const std::vector
<Datum
>& args
,
121 const std::vector
<Datum
>& outputs
) = 0;
123 /// \brief Check the actual result type against the resolved output type
124 virtual Status
CheckResultType(const Datum
& out
, const char* function_name
) = 0;
126 static std::unique_ptr
<KernelExecutor
> MakeScalar();
127 static std::unique_ptr
<KernelExecutor
> MakeVector();
128 static std::unique_ptr
<KernelExecutor
> MakeScalarAggregate();
131 /// \brief Populate validity bitmap with the intersection of the nullity of the
132 /// arguments. If a preallocated bitmap is not provided, then one will be
133 /// allocated if needed (in some cases a bitmap can be zero-copied from the
134 /// arguments). If any Scalar value is null, then the entire validity bitmap
135 /// will be set to null.
137 /// \param[in] ctx kernel execution context, for memory allocation etc.
138 /// \param[in] batch the data batch
139 /// \param[in] out the output ArrayData, must not be null
141 Status
PropagateNulls(KernelContext
* ctx
, const ExecBatch
& batch
, ArrayData
* out
);
143 } // namespace detail
144 } // namespace compute