]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/device.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / device.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/device.h"
19
20#include <cstring>
21#include <utility>
22
23#include "arrow/buffer.h"
24#include "arrow/io/memory.h"
25#include "arrow/result.h"
26#include "arrow/util/logging.h"
27
28namespace arrow {
29
30MemoryManager::~MemoryManager() {}
31
32Device::~Device() {}
33
34#define COPY_BUFFER_SUCCESS(maybe_buffer) \
35 ((maybe_buffer).ok() && *(maybe_buffer) != nullptr)
36
37#define COPY_BUFFER_RETURN(maybe_buffer, to) \
38 if (!maybe_buffer.ok()) { \
39 return maybe_buffer; \
40 } \
41 if (COPY_BUFFER_SUCCESS(maybe_buffer)) { \
42 DCHECK_EQ(*(**maybe_buffer).device(), *to->device()); \
43 return maybe_buffer; \
44 }
45
46Result<std::shared_ptr<Buffer>> MemoryManager::CopyBuffer(
47 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
48 const auto& from = buf->memory_manager();
49 auto maybe_buffer = to->CopyBufferFrom(buf, from);
50 COPY_BUFFER_RETURN(maybe_buffer, to);
51 // `to` doesn't support copying from `from`, try the other way
52 maybe_buffer = from->CopyBufferTo(buf, to);
53 COPY_BUFFER_RETURN(maybe_buffer, to);
54 if (!from->is_cpu() && !to->is_cpu()) {
55 // Try an intermediate view on the CPU
56 auto cpu_mm = default_cpu_memory_manager();
57 maybe_buffer = from->ViewBufferTo(buf, cpu_mm);
58 if (!COPY_BUFFER_SUCCESS(maybe_buffer)) {
59 // View failed, try a copy instead
60 // XXX should we have a MemoryManager::IsCopySupportedTo(MemoryManager)
61 // to avoid copying to CPU if copy from CPU to dest is unsupported?
62 maybe_buffer = from->CopyBufferTo(buf, cpu_mm);
63 }
64 if (COPY_BUFFER_SUCCESS(maybe_buffer)) {
65 // Copy from source to CPU succeeded, now try to copy from CPU into dest
66 maybe_buffer = to->CopyBufferFrom(*maybe_buffer, cpu_mm);
67 if (COPY_BUFFER_SUCCESS(maybe_buffer)) {
68 return maybe_buffer;
69 }
70 }
71 }
72
73 return Status::NotImplemented("Copying buffer from ", from->device()->ToString(),
74 " to ", to->device()->ToString(), " not supported");
75}
76
77Result<std::shared_ptr<Buffer>> MemoryManager::ViewBuffer(
78 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
79 if (buf->memory_manager() == to) {
80 return buf;
81 }
82 const auto& from = buf->memory_manager();
83 auto maybe_buffer = to->ViewBufferFrom(buf, from);
84 COPY_BUFFER_RETURN(maybe_buffer, to);
85 // `to` doesn't support viewing from `from`, try the other way
86 maybe_buffer = from->ViewBufferTo(buf, to);
87 COPY_BUFFER_RETURN(maybe_buffer, to);
88
89 return Status::NotImplemented("Viewing buffer from ", from->device()->ToString(),
90 " on ", to->device()->ToString(), " not supported");
91}
92
93#undef COPY_BUFFER_RETURN
94#undef COPY_BUFFER_SUCCESS
95
96Result<std::shared_ptr<Buffer>> MemoryManager::CopyBufferFrom(
97 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
98 return nullptr;
99}
100
101Result<std::shared_ptr<Buffer>> MemoryManager::CopyBufferTo(
102 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
103 return nullptr;
104}
105
106Result<std::shared_ptr<Buffer>> MemoryManager::ViewBufferFrom(
107 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
108 return nullptr;
109}
110
111Result<std::shared_ptr<Buffer>> MemoryManager::ViewBufferTo(
112 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
113 return nullptr;
114}
115
116// ----------------------------------------------------------------------
117// CPU backend implementation
118
119namespace {
120const char kCPUDeviceTypeName[] = "arrow::CPUDevice";
121}
122
123std::shared_ptr<MemoryManager> CPUMemoryManager::Make(
124 const std::shared_ptr<Device>& device, MemoryPool* pool) {
125 return std::shared_ptr<MemoryManager>(new CPUMemoryManager(device, pool));
126}
127
128Result<std::shared_ptr<io::RandomAccessFile>> CPUMemoryManager::GetBufferReader(
129 std::shared_ptr<Buffer> buf) {
130 return std::make_shared<io::BufferReader>(std::move(buf));
131}
132
133Result<std::shared_ptr<io::OutputStream>> CPUMemoryManager::GetBufferWriter(
134 std::shared_ptr<Buffer> buf) {
135 return std::make_shared<io::FixedSizeBufferWriter>(std::move(buf));
136}
137
138Result<std::shared_ptr<Buffer>> CPUMemoryManager::AllocateBuffer(int64_t size) {
139 return ::arrow::AllocateBuffer(size, pool_);
140}
141
142Result<std::shared_ptr<Buffer>> CPUMemoryManager::CopyBufferFrom(
143 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
144 if (!from->is_cpu()) {
145 return nullptr;
146 }
147 ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_));
148 if (buf->size() > 0) {
149 memcpy(dest->mutable_data(), buf->data(), static_cast<size_t>(buf->size()));
150 }
151 return std::move(dest);
152}
153
154Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferFrom(
155 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
156 if (!from->is_cpu()) {
157 return nullptr;
158 }
159 return buf;
160}
161
162Result<std::shared_ptr<Buffer>> CPUMemoryManager::CopyBufferTo(
163 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
164 if (!to->is_cpu()) {
165 return nullptr;
166 }
167 ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_));
168 if (buf->size() > 0) {
169 memcpy(dest->mutable_data(), buf->data(), static_cast<size_t>(buf->size()));
170 }
171 return std::move(dest);
172}
173
174Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferTo(
175 const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
176 if (!to->is_cpu()) {
177 return nullptr;
178 }
179 return buf;
180}
181
182std::shared_ptr<MemoryManager> default_cpu_memory_manager() {
183 static auto instance =
184 CPUMemoryManager::Make(CPUDevice::Instance(), default_memory_pool());
185 return instance;
186}
187
188std::shared_ptr<Device> CPUDevice::Instance() {
189 static auto instance = std::shared_ptr<Device>(new CPUDevice());
190 return instance;
191}
192
193const char* CPUDevice::type_name() const { return kCPUDeviceTypeName; }
194
195std::string CPUDevice::ToString() const { return "CPUDevice()"; }
196
197bool CPUDevice::Equals(const Device& other) const {
198 return other.type_name() == kCPUDeviceTypeName;
199}
200
201std::shared_ptr<MemoryManager> CPUDevice::memory_manager(MemoryPool* pool) {
202 return CPUMemoryManager::Make(Instance(), pool);
203}
204
205std::shared_ptr<MemoryManager> CPUDevice::default_memory_manager() {
206 return default_cpu_memory_manager();
207}
208
209} // namespace arrow