]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/device.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / device.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/device.h"
19
20 #include <cstring>
21 #include <utility>
22
23 #include "arrow/buffer.h"
24 #include "arrow/io/memory.h"
25 #include "arrow/result.h"
26 #include "arrow/util/logging.h"
27
28 namespace arrow {
29
30 MemoryManager::~MemoryManager() {}
31
32 Device::~Device() {}
33
// Helper macros shared by MemoryManager::CopyBuffer and
// MemoryManager::ViewBuffer.  Both are #undef'd once those functions have
// been defined.

// Evaluates to true when `maybe_buffer` is OK *and* holds a non-null pointer.
// An OK result holding nullptr is how a memory manager signals "operation not
// supported here" without raising an error.
// NOTE: the argument is evaluated twice; pass a plain variable.
#define COPY_BUFFER_SUCCESS(maybe_buffer) \
  ((maybe_buffer).ok() && *(maybe_buffer) != nullptr)

// Returns `maybe_buffer` from the enclosing function when it carries either
// an error or a non-null buffer; otherwise (OK but null) falls through so the
// caller can try its next strategy.  On success, a debug check verifies that
// the buffer's device equals the device of `to`.
//
// The body is wrapped in do { } while (false) so the macro behaves as a
// single statement (safe under an unbraced `if`/`else`), and both arguments
// are parenthesized so arbitrary expressions can be passed for them.
#define COPY_BUFFER_RETURN(maybe_buffer, to)                      \
  do {                                                            \
    if (!(maybe_buffer).ok()) {                                   \
      return maybe_buffer;                                        \
    }                                                             \
    if (COPY_BUFFER_SUCCESS(maybe_buffer)) {                      \
      DCHECK_EQ(*(**(maybe_buffer)).device(), *(to)->device());   \
      return maybe_buffer;                                        \
    }                                                             \
  } while (false)
45
46 Result<std::shared_ptr<Buffer>> MemoryManager::CopyBuffer(
47 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
48 const auto& from = buf->memory_manager();
49 auto maybe_buffer = to->CopyBufferFrom(buf, from);
50 COPY_BUFFER_RETURN(maybe_buffer, to);
51 // `to` doesn't support copying from `from`, try the other way
52 maybe_buffer = from->CopyBufferTo(buf, to);
53 COPY_BUFFER_RETURN(maybe_buffer, to);
54 if (!from->is_cpu() && !to->is_cpu()) {
55 // Try an intermediate view on the CPU
56 auto cpu_mm = default_cpu_memory_manager();
57 maybe_buffer = from->ViewBufferTo(buf, cpu_mm);
58 if (!COPY_BUFFER_SUCCESS(maybe_buffer)) {
59 // View failed, try a copy instead
60 // XXX should we have a MemoryManager::IsCopySupportedTo(MemoryManager)
61 // to avoid copying to CPU if copy from CPU to dest is unsupported?
62 maybe_buffer = from->CopyBufferTo(buf, cpu_mm);
63 }
64 if (COPY_BUFFER_SUCCESS(maybe_buffer)) {
65 // Copy from source to CPU succeeded, now try to copy from CPU into dest
66 maybe_buffer = to->CopyBufferFrom(*maybe_buffer, cpu_mm);
67 if (COPY_BUFFER_SUCCESS(maybe_buffer)) {
68 return maybe_buffer;
69 }
70 }
71 }
72
73 return Status::NotImplemented("Copying buffer from ", from->device()->ToString(),
74 " to ", to->device()->ToString(), " not supported");
75 }
76
77 Result<std::shared_ptr<Buffer>> MemoryManager::ViewBuffer(
78 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
79 if (buf->memory_manager() == to) {
80 return buf;
81 }
82 const auto& from = buf->memory_manager();
83 auto maybe_buffer = to->ViewBufferFrom(buf, from);
84 COPY_BUFFER_RETURN(maybe_buffer, to);
85 // `to` doesn't support viewing from `from`, try the other way
86 maybe_buffer = from->ViewBufferTo(buf, to);
87 COPY_BUFFER_RETURN(maybe_buffer, to);
88
89 return Status::NotImplemented("Viewing buffer from ", from->device()->ToString(),
90 " on ", to->device()->ToString(), " not supported");
91 }
92
// The buffer-transfer helper macros are only meant for CopyBuffer/ViewBuffer
// above; drop them so they are not visible in the rest of the file.
#undef COPY_BUFFER_SUCCESS
#undef COPY_BUFFER_RETURN
95
96 Result<std::shared_ptr<Buffer>> MemoryManager::CopyBufferFrom(
97 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
98 return nullptr;
99 }
100
101 Result<std::shared_ptr<Buffer>> MemoryManager::CopyBufferTo(
102 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
103 return nullptr;
104 }
105
106 Result<std::shared_ptr<Buffer>> MemoryManager::ViewBufferFrom(
107 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
108 return nullptr;
109 }
110
111 Result<std::shared_ptr<Buffer>> MemoryManager::ViewBufferTo(
112 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
113 return nullptr;
114 }
115
116 // ----------------------------------------------------------------------
117 // CPU backend implementation
118
namespace {

// Type name reported by CPUDevice::type_name() and checked by
// CPUDevice::Equals().
constexpr char kCPUDeviceTypeName[] = "arrow::CPUDevice";

}  // namespace
122
123 std::shared_ptr<MemoryManager> CPUMemoryManager::Make(
124 const std::shared_ptr<Device>& device, MemoryPool* pool) {
125 return std::shared_ptr<MemoryManager>(new CPUMemoryManager(device, pool));
126 }
127
128 Result<std::shared_ptr<io::RandomAccessFile>> CPUMemoryManager::GetBufferReader(
129 std::shared_ptr<Buffer> buf) {
130 return std::make_shared<io::BufferReader>(std::move(buf));
131 }
132
133 Result<std::shared_ptr<io::OutputStream>> CPUMemoryManager::GetBufferWriter(
134 std::shared_ptr<Buffer> buf) {
135 return std::make_shared<io::FixedSizeBufferWriter>(std::move(buf));
136 }
137
138 Result<std::shared_ptr<Buffer>> CPUMemoryManager::AllocateBuffer(int64_t size) {
139 return ::arrow::AllocateBuffer(size, pool_);
140 }
141
142 Result<std::shared_ptr<Buffer>> CPUMemoryManager::CopyBufferFrom(
143 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
144 if (!from->is_cpu()) {
145 return nullptr;
146 }
147 ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_));
148 if (buf->size() > 0) {
149 memcpy(dest->mutable_data(), buf->data(), static_cast<size_t>(buf->size()));
150 }
151 return std::move(dest);
152 }
153
154 Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferFrom(
155 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
156 if (!from->is_cpu()) {
157 return nullptr;
158 }
159 return buf;
160 }
161
162 Result<std::shared_ptr<Buffer>> CPUMemoryManager::CopyBufferTo(
163 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
164 if (!to->is_cpu()) {
165 return nullptr;
166 }
167 ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_));
168 if (buf->size() > 0) {
169 memcpy(dest->mutable_data(), buf->data(), static_cast<size_t>(buf->size()));
170 }
171 return std::move(dest);
172 }
173
174 Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferTo(
175 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
176 if (!to->is_cpu()) {
177 return nullptr;
178 }
179 return buf;
180 }
181
182 std::shared_ptr<MemoryManager> default_cpu_memory_manager() {
183 static auto instance =
184 CPUMemoryManager::Make(CPUDevice::Instance(), default_memory_pool());
185 return instance;
186 }
187
188 std::shared_ptr<Device> CPUDevice::Instance() {
189 static auto instance = std::shared_ptr<Device>(new CPUDevice());
190 return instance;
191 }
192
193 const char* CPUDevice::type_name() const { return kCPUDeviceTypeName; }
194
195 std::string CPUDevice::ToString() const { return "CPUDevice()"; }
196
197 bool CPUDevice::Equals(const Device& other) const {
198 return other.type_name() == kCPUDeviceTypeName;
199 }
200
201 std::shared_ptr<MemoryManager> CPUDevice::memory_manager(MemoryPool* pool) {
202 return CPUMemoryManager::Make(Instance(), pool);
203 }
204
205 std::shared_ptr<MemoryManager> CPUDevice::default_memory_manager() {
206 return default_cpu_memory_manager();
207 }
208
209 } // namespace arrow