]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/util/memory.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / memory.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <vector>
19
20 #include "arrow/util/logging.h"
21 #include "arrow/util/memory.h"
22 #include "arrow/util/thread_pool.h"
23
24 namespace arrow {
25 namespace internal {
26
// Applies a bitwise AND between the integer value of `address` and `bits`,
// and hands the masked value back as a (non-const) byte pointer.
inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
  const uintptr_t masked = reinterpret_cast<uintptr_t>(address) & bits;
  return reinterpret_cast<uint8_t*>(masked);
}
31
// Plain forwarding wrapper around memcpy. It exists only to work around a
// crash seen with 32-bit MinGW-w64.
// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
void* wrap_memcpy(void* dst, const void* src, size_t n) {
  void* const copied_to = memcpy(dst, src, n);
  return copied_to;
}
35
36 void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
37 uintptr_t block_size, int num_threads) {
38 // XXX This function is really using `num_threads + 1` threads.
39 auto pool = GetCpuThreadPool();
40
41 uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
42 uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
43 int64_t num_blocks = (right - left) / block_size;
44
45 // Update right address
46 right = right - (num_blocks % num_threads) * block_size;
47
48 // Now we divide these blocks between available threads. The remainder is
49 // handled separately.
50 size_t chunk_size = (right - left) / num_threads;
51 int64_t prefix = left - src;
52 int64_t suffix = src + nbytes - right;
53 // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
54 // We have chunk_size = k * block_size, therefore the data layout is
55 // | prefix | num_threads * chunk_size | suffix |.
56 // Each thread gets a "chunk" of k blocks.
57
58 // Start all parallel memcpy tasks and handle leftovers while threads run.
59 std::vector<Future<void*>> futures;
60
61 for (int i = 0; i < num_threads; i++) {
62 futures.push_back(*pool->Submit(wrap_memcpy, dst + prefix + i * chunk_size,
63 left + i * chunk_size, chunk_size));
64 }
65 memcpy(dst, src, prefix);
66 memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
67
68 for (auto& fut : futures) {
69 ARROW_CHECK_OK(fut.status());
70 }
71 }
72
73 } // namespace internal
74 } // namespace arrow