]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #pragma once | |
19 | ||
20 | #include <cstdint> | |
21 | #include <limits> | |
22 | #include <memory> | |
23 | #include <string> | |
24 | #include <vector> | |
25 | ||
26 | #include "arrow/array.h" | |
27 | #include "arrow/buffer.h" | |
28 | #include "arrow/compute/exec.h" | |
29 | #include "arrow/compute/kernel.h" | |
30 | #include "arrow/status.h" | |
31 | #include "arrow/util/visibility.h" | |
32 | ||
33 | namespace arrow { | |
34 | namespace compute { | |
35 | ||
// Forward declaration only; Function is defined elsewhere.
class Function;

/// Default cap on ExecBatch length: the largest representable int64_t, i.e.
/// no explicit limit is imposed unless a caller asks for one.
constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max();
39 | ||
40 | namespace detail { | |
41 | ||
42 | /// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel | |
43 | /// execution | |
44 | class ARROW_EXPORT ExecBatchIterator { | |
45 | public: | |
46 | /// \brief Construct iterator and do basic argument validation | |
47 | /// | |
48 | /// \param[in] args the Datum argument, must be all array-like or scalar | |
49 | /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending | |
50 | /// on the chunk layout of ChunkedArray. | |
51 | static Result<std::unique_ptr<ExecBatchIterator>> Make( | |
52 | std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize); | |
53 | ||
54 | /// \brief Compute the next batch. Always returns at least one batch. Return | |
55 | /// false if the iterator is exhausted | |
56 | bool Next(ExecBatch* batch); | |
57 | ||
58 | int64_t length() const { return length_; } | |
59 | ||
60 | int64_t position() const { return position_; } | |
61 | ||
62 | int64_t max_chunksize() const { return max_chunksize_; } | |
63 | ||
64 | private: | |
65 | ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize); | |
66 | ||
67 | std::vector<Datum> args_; | |
68 | std::vector<int> chunk_indexes_; | |
69 | std::vector<int64_t> chunk_positions_; | |
70 | int64_t position_; | |
71 | int64_t length_; | |
72 | int64_t max_chunksize_; | |
73 | }; | |
74 | ||
75 | // "Push" / listener API like IPC reader so that consumers can receive | |
76 | // processed chunks as soon as they're available. | |
77 | ||
78 | class ARROW_EXPORT ExecListener { | |
79 | public: | |
80 | virtual ~ExecListener() = default; | |
81 | ||
82 | virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); } | |
83 | }; | |
84 | ||
85 | class DatumAccumulator : public ExecListener { | |
86 | public: | |
87 | DatumAccumulator() = default; | |
88 | ||
89 | Status OnResult(Datum value) override { | |
90 | values_.emplace_back(value); | |
91 | return Status::OK(); | |
92 | } | |
93 | ||
94 | std::vector<Datum> values() { return std::move(values_); } | |
95 | ||
96 | private: | |
97 | std::vector<Datum> values_; | |
98 | }; | |
99 | ||
100 | /// \brief Check that each Datum is of a "value" type, which means either | |
101 | /// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these | |
102 | /// inputs will be split into non-chunked ExecBatch values for execution | |
103 | Status CheckAllValues(const std::vector<Datum>& values); | |
104 | ||
105 | class ARROW_EXPORT KernelExecutor { | |
106 | public: | |
107 | virtual ~KernelExecutor() = default; | |
108 | ||
109 | /// The Kernel's `init` method must be called and any KernelState set in the | |
110 | /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate | |
111 | /// the case where init may be expensive and does not need to be called again for | |
112 | /// each execution of the kernel, for example the same lookup table can be re-used | |
113 | /// for all scanned batches in a dataset filter. | |
114 | virtual Status Init(KernelContext*, KernelInitArgs) = 0; | |
115 | ||
116 | /// XXX: Better configurability for listener | |
117 | /// Not thread-safe | |
118 | virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0; | |
119 | ||
120 | virtual Datum WrapResults(const std::vector<Datum>& args, | |
121 | const std::vector<Datum>& outputs) = 0; | |
122 | ||
123 | /// \brief Check the actual result type against the resolved output type | |
124 | virtual Status CheckResultType(const Datum& out, const char* function_name) = 0; | |
125 | ||
126 | static std::unique_ptr<KernelExecutor> MakeScalar(); | |
127 | static std::unique_ptr<KernelExecutor> MakeVector(); | |
128 | static std::unique_ptr<KernelExecutor> MakeScalarAggregate(); | |
129 | }; | |
130 | ||
131 | /// \brief Populate validity bitmap with the intersection of the nullity of the | |
132 | /// arguments. If a preallocated bitmap is not provided, then one will be | |
133 | /// allocated if needed (in some cases a bitmap can be zero-copied from the | |
134 | /// arguments). If any Scalar value is null, then the entire validity bitmap | |
135 | /// will be set to null. | |
136 | /// | |
137 | /// \param[in] ctx kernel execution context, for memory allocation etc. | |
138 | /// \param[in] batch the data batch | |
139 | /// \param[in] out the output ArrayData, must not be null | |
140 | ARROW_EXPORT | |
141 | Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out); | |
142 | ||
143 | } // namespace detail | |
144 | } // namespace compute | |
145 | } // namespace arrow |