// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cstdint>
#include <cstring>  // std::memset
#include <thread>
#include <unordered_map>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
#include "arrow/util/thread_pool.h"

// BYTESWAP reverses the bytes of a 64-bit value; ROTL is a 32-bit left rotate.
#if defined(__clang__) || defined(__GNUC__)
#define BYTESWAP(x) __builtin_bswap64(x)
#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
#elif defined(_MSC_VER)
#include <intrin.h>
#define BYTESWAP(x) _byteswap_uint64(x)
#define ROTL(x, n) _rotl((x), (n))
#endif
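
// Illustrative usage (not part of this header): the values below are what the
// macros evaluate to regardless of which compiler branch is selected.
//
//   uint64_t swapped = BYTESWAP(0x0102030405060708ULL);  // 0x0807060504030201
//   uint32_t rotated = ROTL(0x80000001u, 1);             // 0x00000003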

namespace arrow {
namespace util {

template <typename T>
inline void CheckAlignment(const void* ptr) {
  ARROW_DCHECK(reinterpret_cast<uint64_t>(ptr) % sizeof(T) == 0);
}

// Some platforms typedef int64_t as long int instead of long long int,
// which breaks the _mm256_i64gather_epi64 and _mm256_i32gather_epi64
// intrinsics, which require long long.
// We cast to the type below inside these intrinsics to make the code
// compile in all cases.
//
using int64_for_gather_t = const long long int;  // NOLINT runtime-int

/// Storage used to allocate temporary vectors of a batch size.
/// Temporary vectors resemble temporary variables allocated on the stack,
/// but in the context of vectorized processing, where we need to store a
/// vector of temporaries instead of a single value.
class TempVectorStack {
  template <typename>
  friend class TempVectorHolder;

 public:
  Status Init(MemoryPool* pool, int64_t size) {
    num_vectors_ = 0;
    top_ = 0;
    buffer_size_ = size;
    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
    // Ensure later operations don't accidentally read uninitialized memory.
    std::memset(buffer->mutable_data(), 0xFF, size);
    buffer_ = std::move(buffer);
    return Status::OK();
  }

 private:
  int64_t PaddedAllocationSize(int64_t num_bytes) {
    // Round up the allocation size to a multiple of 8 bytes
    // to avoid returning temp vectors with unaligned addresses.
    //
    // Also add padding at the end to facilitate loads and stores
    // using SIMD when the number of vector elements is not divisible
    // by the number of SIMD lanes.
    //
    return ::arrow::BitUtil::RoundUp(num_bytes, sizeof(int64_t)) + kPadding;
  }
  void alloc(uint32_t num_bytes, uint8_t** data, int* id) {
    int64_t old_top = top_;
    top_ += PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t);
    // Stack overflow check
    ARROW_DCHECK(top_ <= buffer_size_);
    *data = buffer_->mutable_data() + old_top + sizeof(uint64_t);
    // We write 8 known bytes immediately before the allocated range and
    // 8 known bytes immediately after it, so that a stack overflow is
    // detected later as corruption of those guard bytes.
    reinterpret_cast<uint64_t*>(buffer_->mutable_data() + old_top)[0] = kGuard1;
    reinterpret_cast<uint64_t*>(buffer_->mutable_data() + top_)[-1] = kGuard2;
    *id = num_vectors_++;
  }
  void release(int id, uint32_t num_bytes) {
    ARROW_DCHECK(num_vectors_ == id + 1);
    int64_t size = PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t);
    ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[-1] ==
                 kGuard2);
    ARROW_DCHECK(top_ >= size);
    top_ -= size;
    ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[0] ==
                 kGuard1);
    --num_vectors_;
  }
  static constexpr uint64_t kGuard1 = 0x3141592653589793ULL;
  static constexpr uint64_t kGuard2 = 0x0577215664901532ULL;
  static constexpr int64_t kPadding = 64;
  int num_vectors_;
  int64_t top_;
  std::unique_ptr<Buffer> buffer_;
  int64_t buffer_size_;
};

template <typename T>
class TempVectorHolder {
  friend class TempVectorStack;

 public:
  ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); }
  T* mutable_data() { return reinterpret_cast<T*>(data_); }
  TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) {
    stack_ = stack;
    num_elements_ = num_elements;
    stack_->alloc(num_elements * sizeof(T), &data_, &id_);
  }

 private:
  TempVectorStack* stack_;
  uint8_t* data_;
  int id_;
  uint32_t num_elements_;
};
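
// Illustrative usage sketch (not part of this header): a TempVectorStack is
// initialized once with enough room for all temporaries of a batch, and each
// TempVectorHolder then gives RAII-style allocation released in LIFO order.
//
//   TempVectorStack stack;
//   ARROW_RETURN_NOT_OK(stack.Init(default_memory_pool(), /*size=*/64 * 1024));
//   {
//     TempVectorHolder<uint16_t> ids(&stack, /*num_elements=*/1024);
//     uint16_t* data = ids.mutable_data();
//     // ... use data while processing one batch ...
//   }  // released here; holders must be destroyed in reverse allocation order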

class BitUtil {
 public:
  static void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
                              const int num_bits, const uint8_t* bits, int* num_indexes,
                              uint16_t* indexes, int bit_offset = 0);

  static void bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
                                  const int num_bits, const uint8_t* bits,
                                  const uint16_t* input_indexes, int* num_indexes,
                                  uint16_t* indexes, int bit_offset = 0);

  // Input and output indexes may point to the same data (in-place filtering).
  static void bits_split_indexes(int64_t hardware_flags, const int num_bits,
                                 const uint8_t* bits, int* num_indexes_bit0,
                                 uint16_t* indexes_bit0, uint16_t* indexes_bit1,
                                 int bit_offset = 0);

  // Bit 1 is replaced with byte 0xFF, bit 0 with byte 0x00.
  static void bits_to_bytes(int64_t hardware_flags, const int num_bits,
                            const uint8_t* bits, uint8_t* bytes, int bit_offset = 0);

  // Pack the highest bit of each byte into the output bit vector.
  static void bytes_to_bits(int64_t hardware_flags, const int num_bits,
                            const uint8_t* bytes, uint8_t* bits, int bit_offset = 0);

  static bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
                                 uint32_t num_bytes);

 private:
  inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index,
                                            int* num_indexes, uint16_t* indexes);
  inline static void bits_filter_indexes_helper(uint64_t word,
                                                const uint16_t* input_indexes,
                                                int* num_indexes, uint16_t* indexes);
  template <int bit_to_search, bool filter_input_indexes>
  static void bits_to_indexes_internal(int64_t hardware_flags, const int num_bits,
                                       const uint8_t* bits,
                                       const uint16_t* input_indexes, int* num_indexes,
                                       uint16_t* indexes, uint16_t base_index = 0);

#if defined(ARROW_HAVE_AVX2)
  static void bits_to_indexes_avx2(int bit_to_search, const int num_bits,
                                   const uint8_t* bits, int* num_indexes,
                                   uint16_t* indexes, uint16_t base_index = 0);
  static void bits_filter_indexes_avx2(int bit_to_search, const int num_bits,
                                       const uint8_t* bits,
                                       const uint16_t* input_indexes, int* num_indexes,
                                       uint16_t* indexes);
  template <int bit_to_search>
  static void bits_to_indexes_imp_avx2(const int num_bits, const uint8_t* bits,
                                       int* num_indexes, uint16_t* indexes,
                                       uint16_t base_index = 0);
  template <int bit_to_search>
  static void bits_filter_indexes_imp_avx2(const int num_bits, const uint8_t* bits,
                                           const uint16_t* input_indexes,
                                           int* num_indexes, uint16_t* indexes);
  static void bits_to_bytes_avx2(const int num_bits, const uint8_t* bits,
                                 uint8_t* bytes);
  static void bytes_to_bits_avx2(const int num_bits, const uint8_t* bytes,
                                 uint8_t* bits);
  static bool are_all_bytes_zero_avx2(const uint8_t* bytes, uint32_t num_bytes);
#endif
};
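
// Reference semantics of BitUtil::bits_to_indexes as a plain scalar sketch
// (the actual implementation dispatches to the AVX2 paths via hardware_flags);
// bits are addressed LSB-first within each byte, as elsewhere in Arrow:
//
//   int num_indexes = 0;
//   for (int i = 0; i < num_bits; ++i) {
//     int bit = (bits[(bit_offset + i) / 8] >> ((bit_offset + i) % 8)) & 1;
//     if (bit == bit_to_search) indexes[num_indexes++] = static_cast<uint16_t>(i);
//   }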

}  // namespace util
namespace compute {

ARROW_EXPORT
Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector<ExecNode*>& inputs,
                              int expected_num_inputs, const char* kind_name);

ARROW_EXPORT
Result<std::shared_ptr<Table>> TableFromExecBatches(
    const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);
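
// Illustrative call site (hypothetical; only the two declarations above are
// from this header): an ExecNode factory would typically validate its inputs
// first.
//
//   Result<ExecNode*> MakeMyNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
//                                const ExecNodeOptions& options) {
//     ARROW_RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs,
//                                                /*expected_num_inputs=*/1, "MyNode"));
//     // ... construct and return the node ...
//   }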

class AtomicCounter {
 public:
  AtomicCounter() = default;

  int count() const { return count_.load(); }

  util::optional<int> total() const {
    int total = total_.load();
    if (total == -1) return {};
    return total;
  }

  // return true if the counter is complete
  bool Increment() {
    DCHECK_NE(count_.load(), total_.load());
    int count = count_.fetch_add(1) + 1;
    if (count != total_.load()) return false;
    return DoneOnce();
  }

  // return true if the counter is complete
  bool SetTotal(int total) {
    total_.store(total);
    if (count_.load() != total) return false;
    return DoneOnce();
  }

  // return true if the counter has not already been completed
  bool Cancel() { return DoneOnce(); }

  // return true if the counter has finished or been cancelled
  bool Completed() { return complete_.load(); }

 private:
  // ensure there is only one true return from Increment(), SetTotal(), or Cancel()
  bool DoneOnce() {
    bool expected = false;
    return complete_.compare_exchange_strong(expected, true);
  }

  std::atomic<int> count_{0}, total_{-1};
  std::atomic<bool> complete_{false};
};
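
// Illustrative usage sketch (call sites are hypothetical): workers call
// Increment() once per item while the producer calls SetTotal() when the item
// count becomes known; exactly one of those calls observes completion.
//
//   AtomicCounter batch_count;
//   // on each delivered batch:
//   if (batch_count.Increment()) Finish();
//   // once the number of batches n is known:
//   if (batch_count.SetTotal(n)) Finish();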

class ThreadIndexer {
 public:
  size_t operator()();

  static size_t Capacity();

 private:
  static size_t Check(size_t thread_index);

  util::Mutex mutex_;
  std::unordered_map<std::thread::id, size_t> id_to_index_;
};
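
// Illustrative usage sketch (hypothetical): ThreadIndexer maps each calling
// thread's std::thread::id to a dense index below Capacity(), so per-thread
// state can live in a plain vector instead of a thread-keyed map.
//
//   ThreadIndexer thread_indexer;
//   std::vector<LocalState> states(thread_indexer.Capacity());  // LocalState assumed
//   // in a worker:
//   LocalState& local = states[thread_indexer()];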

}  // namespace compute
}  // namespace arrow