// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cstdint>
#include <cstring>
#include <thread>
#include <unordered_map>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
#include "arrow/util/thread_pool.h"

#if defined(__clang__) || defined(__GNUC__)
#define BYTESWAP(x) __builtin_bswap64(x)
#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
#elif defined(_MSC_VER)
#include <intrin.h>
#define BYTESWAP(x) _byteswap_uint64(x)
#define ROTL(x, n) _rotl((x), (n))
#endif
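
// Example (illustrative, not in the original header): ROTL is a 32-bit
// rotate-left, so for uint32_t x = 0x80000001u, ROTL(x, 1) == 0x00000003u.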

namespace arrow {
namespace util {

template <typename T>
inline void CheckAlignment(const void* ptr) {
  ARROW_DCHECK(reinterpret_cast<uint64_t>(ptr) % sizeof(T) == 0);
}
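
// Usage sketch (illustrative, not in the original header): DCHECK that a
// buffer pointer is suitably aligned for 8-byte loads.
//
//   CheckAlignment<uint64_t>(buffer->data());  // fails if data() % 8 != 0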

// Some platforms typedef int64_t as long int instead of long long int,
// which breaks the _mm256_i64gather_epi64 and _mm256_i32gather_epi64 intrinsics
// which need long long.
// We use the cast to the type below in these intrinsics to make the code
// compile in all cases.
using int64_for_gather_t = const long long int;  // NOLINT runtime-int
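
// Example (illustrative, not in the original header): casting the base
// pointer of an AVX2 gather through int64_for_gather_t so the call compiles
// regardless of how the platform spells int64_t.
//
//   const int64_t* values = ...;
//   __m256i offsets = ...;
//   __m256i rows = _mm256_i64gather_epi64(
//       reinterpret_cast<int64_for_gather_t*>(values), offsets, 8);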

/// Storage used to allocate temporary vectors of a batch size.
/// Temporary vectors should resemble allocating temporary variables on the stack
/// but in the context of vectorized processing where we need to store a vector of
/// temporaries instead of a single value.
class TempVectorStack {
  template <typename>
  friend class TempVectorHolder;

 public:
  Status Init(MemoryPool* pool, int64_t size) {
    num_vectors_ = 0;
    top_ = 0;
    buffer_size_ = size;
    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
    // Ensure later operations don't accidentally read uninitialized memory.
    std::memset(buffer->mutable_data(), 0xFF, size);
    buffer_ = std::move(buffer);
    return Status::OK();
  }

 private:
  int64_t PaddedAllocationSize(int64_t num_bytes) {
    // Round up the allocation size to a multiple of 8 bytes
    // to avoid returning temp vectors with unaligned addresses.
    //
    // Also add padding at the end to facilitate loads and stores
    // using SIMD when the number of vector elements is not divisible
    // by the number of SIMD lanes.
    return ::arrow::BitUtil::RoundUp(num_bytes, sizeof(int64_t)) + kPadding;
  }
  void alloc(uint32_t num_bytes, uint8_t** data, int* id) {
    int64_t old_top = top_;
    top_ += PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t);
    // Stack overflow check
    ARROW_DCHECK(top_ <= buffer_size_);
    *data = buffer_->mutable_data() + old_top + sizeof(uint64_t);
    // We write 8 known bytes before the beginning of the allocated range and
    // 8 bytes after its end to check for stack overflow (which would
    // result in those known bytes being corrupted).
    reinterpret_cast<uint64_t*>(buffer_->mutable_data() + old_top)[0] = kGuard1;
    reinterpret_cast<uint64_t*>(buffer_->mutable_data() + top_)[-1] = kGuard2;
    *id = num_vectors_++;
  }
  void release(int id, uint32_t num_bytes) {
    ARROW_DCHECK(num_vectors_ == id + 1);
    int64_t size = PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t);
    ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[-1] ==
                 kGuard2);
    ARROW_DCHECK(top_ >= size);
    top_ -= size;
    ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[0] ==
                 kGuard1);
    --num_vectors_;
  }
  static constexpr uint64_t kGuard1 = 0x3141592653589793ULL;
  static constexpr uint64_t kGuard2 = 0x0577215664901532ULL;
  static constexpr int64_t kPadding = 64;
  int num_vectors_;
  int64_t top_;
  std::unique_ptr<Buffer> buffer_;
  int64_t buffer_size_;
};

template <typename T>
class TempVectorHolder {
  friend class TempVectorStack;

 public:
  ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); }
  T* mutable_data() { return reinterpret_cast<T*>(data_); }
  TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) {
    stack_ = stack;
    num_elements_ = num_elements;
    stack_->alloc(num_elements * sizeof(T), &data_, &id_);
  }

 private:
  TempVectorStack* stack_;
  uint8_t* data_;
  int id_;
  uint32_t num_elements_;
};

class BitUtil {
 public:
  static void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
                              const int num_bits, const uint8_t* bits, int* num_indexes,
                              uint16_t* indexes, int bit_offset = 0);

  static void bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
                                  const int num_bits, const uint8_t* bits,
                                  const uint16_t* input_indexes, int* num_indexes,
                                  uint16_t* indexes, int bit_offset = 0);
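
  // Example for bits_filter_indexes (illustrative, assuming entry i of
  // input_indexes corresponds to bit i): bits = {0b00000110},
  // input_indexes = {10, 11, 12, 13} and bit_to_search = 1 keep the entries
  // at set-bit positions, so *num_indexes == 2 and indexes = {11, 12}.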

  // Input and output indexes may be pointing to the same data (in-place filtering).
  static void bits_split_indexes(int64_t hardware_flags, const int num_bits,
                                 const uint8_t* bits, int* num_indexes_bit0,
                                 uint16_t* indexes_bit0, uint16_t* indexes_bit1,
                                 int bit_offset = 0);

  // Bit 1 is replaced with byte 0xFF.
  static void bits_to_bytes(int64_t hardware_flags, const int num_bits,
                            const uint8_t* bits, uint8_t* bytes, int bit_offset = 0);

  // Return highest bit of each byte.
  static void bytes_to_bits(int64_t hardware_flags, const int num_bits,
                            const uint8_t* bytes, uint8_t* bits, int bit_offset = 0);
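
  // Example (illustrative, not in the original header): the two calls are
  // inverses for 0x00/0xFF data. bits = {0b00000101} expands through
  // bits_to_bytes into bytes = {0xFF, 0x00, 0xFF, 0x00, ...}; feeding
  // bytes = {0xFF, 0x00, 0x80, 0x7F} to bytes_to_bits keeps each byte's
  // highest bit, giving bits = {0b0101} (LSB-first) for num_bits = 4.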

  static bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
                                 uint32_t num_bytes);

 private:
  inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index,
                                            int* num_indexes, uint16_t* indexes);
  inline static void bits_filter_indexes_helper(uint64_t word,
                                                const uint16_t* input_indexes,
                                                int* num_indexes, uint16_t* indexes);
  template <int bit_to_search, bool filter_input_indexes>
  static void bits_to_indexes_internal(int64_t hardware_flags, const int num_bits,
                                       const uint8_t* bits, const uint16_t* input_indexes,
                                       int* num_indexes, uint16_t* indexes,
                                       uint16_t base_index = 0);

#if defined(ARROW_HAVE_AVX2)
  static void bits_to_indexes_avx2(int bit_to_search, const int num_bits,
                                   const uint8_t* bits, int* num_indexes,
                                   uint16_t* indexes, uint16_t base_index = 0);
  static void bits_filter_indexes_avx2(int bit_to_search, const int num_bits,
                                       const uint8_t* bits, const uint16_t* input_indexes,
                                       int* num_indexes, uint16_t* indexes);
  template <int bit_to_search>
  static void bits_to_indexes_imp_avx2(const int num_bits, const uint8_t* bits,
                                       int* num_indexes, uint16_t* indexes,
                                       uint16_t base_index = 0);
  template <int bit_to_search>
  static void bits_filter_indexes_imp_avx2(const int num_bits, const uint8_t* bits,
                                           const uint16_t* input_indexes,
                                           int* num_indexes, uint16_t* indexes);
  static void bits_to_bytes_avx2(const int num_bits, const uint8_t* bits, uint8_t* bytes);
  static void bytes_to_bits_avx2(const int num_bits, const uint8_t* bytes, uint8_t* bits);
  static bool are_all_bytes_zero_avx2(const uint8_t* bytes, uint32_t num_bytes);
#endif
};

}  // namespace util
namespace compute {

ARROW_EXPORT
Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector<ExecNode*>& inputs,
                              int expected_num_inputs, const char* kind_name);

ARROW_EXPORT
Result<std::shared_ptr<Table>> TableFromExecBatches(
    const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);
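
// Usage sketch (illustrative, not in the original header): validating inputs
// in a node factory, and collecting results in a test helper.
//
//   ARROW_RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs,
//                                              /*expected_num_inputs=*/1,
//                                              "MyNode"));
//   ARROW_ASSIGN_OR_RAISE(auto table, TableFromExecBatches(schema, batches));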

class AtomicCounter {
 public:
  AtomicCounter() = default;

  int count() const { return count_.load(); }

  util::optional<int> total() const {
    int total = total_.load();
    if (total == -1) return {};
    return total;
  }

  // return true if the counter is complete
  bool Increment() {
    DCHECK_NE(count_.load(), total_.load());
    int count = count_.fetch_add(1) + 1;
    if (count != total_.load()) return false;
    return DoneOnce();
  }

  // return true if the counter is complete
  bool SetTotal(int total) {
    total_.store(total);
    if (count_.load() != total) return false;
    return DoneOnce();
  }

  // return true if the counter has not already been completed
  bool Cancel() { return DoneOnce(); }

  // return true if the counter has finished or been cancelled
  bool Completed() { return complete_.load(); }

 private:
  // ensure there is only one true return from Increment(), SetTotal(), or Cancel()
  bool DoneOnce() {
    bool expected = false;
    return complete_.compare_exchange_strong(expected, true);
  }

  std::atomic<int> count_{0}, total_{-1};
  std::atomic<bool> complete_{false};
};
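
// Usage sketch (illustrative, not in the original header): count batches as
// they arrive on multiple threads. Exactly one caller, whichever completes
// the counter (the final Increment, a matching SetTotal, or a Cancel),
// observes true and runs the finish logic.
//
//   AtomicCounter batches;
//   // per received batch, on any thread:
//   if (batches.Increment()) Finish();
//   // once the producer learns how many batches there are in total:
//   if (batches.SetTotal(num_batches)) Finish();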

class ThreadIndexer {
 public:
  size_t operator()();

  static size_t Capacity();

 private:
  static size_t Check(size_t thread_index);

  util::Mutex mutex_;
  std::unordered_map<std::thread::id, size_t> id_to_index_;
};
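
// Usage sketch (illustrative, not in the original header; LocalState is a
// hypothetical per-thread struct): map the calling OS thread to a small
// dense index, e.g. to pick a per-thread scratch area.
//
//   ThreadIndexer thread_indexer;
//   std::vector<LocalState> states(ThreadIndexer::Capacity());
//   size_t thread_index = thread_indexer();  // stable for this thread
//   LocalState& local = states[thread_index];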

}  // namespace compute
}  // namespace arrow