]>
git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/util/memory.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
20 #include "arrow/util/logging.h"
21 #include "arrow/util/memory.h"
22 #include "arrow/util/thread_pool.h"
// Returns the pointer whose numeric value is `address` bitwise-ANDed with
// `bits`.
//
// The result is a mutable `uint8_t*` even though the input is const: the
// pointer is rebuilt from an integer, so constness is not carried over.
inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
  uintptr_t value = reinterpret_cast<uintptr_t>(address);
  return reinterpret_cast<uint8_t*>(value & bits);
}
// This function is just for avoiding MinGW-w64 32bit crash.
// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
//
// Thin forwarding wrapper: copies `n` bytes from `src` to `dst` with memcpy
// and returns memcpy's result (`dst`).
void* wrap_memcpy(void* dst, const void* src, size_t n) {
  return memcpy(dst, src, n);
}
36 void parallel_memcopy(uint8_t* dst
, const uint8_t* src
, int64_t nbytes
,
37 uintptr_t block_size
, int num_threads
) {
38 // XXX This function is really using `num_threads + 1` threads.
39 auto pool
= GetCpuThreadPool();
41 uint8_t* left
= pointer_logical_and(src
+ block_size
- 1, ~(block_size
- 1));
42 uint8_t* right
= pointer_logical_and(src
+ nbytes
, ~(block_size
- 1));
43 int64_t num_blocks
= (right
- left
) / block_size
;
45 // Update right address
46 right
= right
- (num_blocks
% num_threads
) * block_size
;
48 // Now we divide these blocks between available threads. The remainder is
49 // handled separately.
50 size_t chunk_size
= (right
- left
) / num_threads
;
51 int64_t prefix
= left
- src
;
52 int64_t suffix
= src
+ nbytes
- right
;
53 // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
54 // We have chunk_size = k * block_size, therefore the data layout is
55 // | prefix | num_threads * chunk_size | suffix |.
56 // Each thread gets a "chunk" of k blocks.
58 // Start all parallel memcpy tasks and handle leftovers while threads run.
59 std::vector
<Future
<void*>> futures
;
61 for (int i
= 0; i
< num_threads
; i
++) {
62 futures
.push_back(*pool
->Submit(wrap_memcpy
, dst
+ prefix
+ i
* chunk_size
,
63 left
+ i
* chunk_size
, chunk_size
));
65 memcpy(dst
, src
, prefix
);
66 memcpy(dst
+ prefix
+ num_threads
* chunk_size
, right
, suffix
);
68 for (auto& fut
: futures
) {
69 ARROW_CHECK_OK(fut
.status());
73 } // namespace internal