// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/gpu/cuda_memory.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <utility>

#include <cuda.h>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/gpu/cuda_context.h"
#include "arrow/gpu/cuda_internal.h"

namespace arrow {
namespace cuda {

using internal::ContextSaver;

// ----------------------------------------------------------------------
// CUDA IPC memory handle

struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
  explicit CudaIpcMemHandleImpl(const uint8_t* handle) {
    memcpy(&memory_size, handle, sizeof(memory_size));
    if (memory_size != 0)
      memcpy(&ipc_handle, handle + sizeof(memory_size), sizeof(CUipcMemHandle));
  }

  explicit CudaIpcMemHandleImpl(int64_t memory_size, const void* cu_handle)
      : memory_size(memory_size) {
    if (memory_size != 0) {
      memcpy(&ipc_handle, cu_handle, sizeof(CUipcMemHandle));
    }
  }

  CUipcMemHandle ipc_handle;  /// initialized only when memory_size != 0
  int64_t memory_size;        /// size of the memory that ipc_handle refers to
};

CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
  impl_.reset(new CudaIpcMemHandleImpl(reinterpret_cast<const uint8_t*>(handle)));
}

CudaIpcMemHandle::CudaIpcMemHandle(int64_t memory_size, const void* cu_handle) {
  impl_.reset(new CudaIpcMemHandleImpl(memory_size, cu_handle));
}

CudaIpcMemHandle::~CudaIpcMemHandle() {}

Result<std::shared_ptr<CudaIpcMemHandle>> CudaIpcMemHandle::FromBuffer(
    const void* opaque_handle) {
  return std::shared_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
}

Result<std::shared_ptr<Buffer>> CudaIpcMemHandle::Serialize(MemoryPool* pool) const {
  int64_t size = impl_->memory_size;
  const size_t handle_size =
      (size > 0 ? sizeof(int64_t) + sizeof(CUipcMemHandle) : sizeof(int64_t));

  ARROW_ASSIGN_OR_RAISE(auto buffer,
                        AllocateBuffer(static_cast<int64_t>(handle_size), pool));
  memcpy(buffer->mutable_data(), &impl_->memory_size, sizeof(impl_->memory_size));
  if (size > 0) {
    memcpy(buffer->mutable_data() + sizeof(impl_->memory_size), &impl_->ipc_handle,
           sizeof(impl_->ipc_handle));
  }
  return std::move(buffer);
}

const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }

int64_t CudaIpcMemHandle::memory_size() const { return impl_->memory_size; }

// ----------------------------------------------------------------------

CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
  is_mutable_ = true;
  SetMemoryManager(context_->memory_manager());
}

CudaBuffer::CudaBuffer(uintptr_t address, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : CudaBuffer(reinterpret_cast<uint8_t*>(address), size, context, own_data, is_ipc) {}

CudaBuffer::~CudaBuffer() { ARROW_CHECK_OK(Close()); }

Status CudaBuffer::Close() {
  if (own_data_) {
    if (is_ipc_) {
      return context_->CloseIpcBuffer(this);
    } else {
      return context_->Free(const_cast<uint8_t*>(data_), size_);
    }
  }
  return Status::OK();
}

CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                       const int64_t size)
    : Buffer(parent, offset, size),
      context_(parent->context()),
      own_data_(false),
      is_ipc_(false) {
  is_mutable_ = parent->is_mutable();
}

Result<std::shared_ptr<CudaBuffer>> CudaBuffer::FromBuffer(
    std::shared_ptr<Buffer> buffer) {
  int64_t offset = 0, size = buffer->size();
  bool is_mutable = buffer->is_mutable();
  std::shared_ptr<CudaBuffer> cuda_buffer;

  // The original CudaBuffer may have been wrapped in another Buffer
  // (for example through slicing).
  // TODO check device instead
  while (!(cuda_buffer = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
    const std::shared_ptr<Buffer> parent = buffer->parent();
    if (!parent) {
      return Status::TypeError("buffer is not backed by a CudaBuffer");
    }
    offset += buffer->address() - parent->address();
    buffer = parent;
  }
  // Re-slice to represent the same memory area
  if (offset != 0 || cuda_buffer->size() != size || !is_mutable) {
    cuda_buffer = std::make_shared<CudaBuffer>(std::move(cuda_buffer), offset, size);
    cuda_buffer->is_mutable_ = is_mutable;
  }
  return cuda_buffer;
}

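// A minimal usage sketch for FromBuffer: recovering the underlying CudaBuffer
// from a plain Buffer slice, assuming a valid CudaContext `ctx` and an
// enclosing function returning arrow::Status or arrow::Result.
//
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<CudaBuffer> device_buf,
//                         ctx->Allocate(1024));
//   // SliceBuffer returns a plain Buffer that merely wraps device memory.
//   std::shared_ptr<Buffer> slice = SliceBuffer(device_buf, 128, 256);
//   // FromBuffer walks the parent chain and re-slices, yielding a CudaBuffer
//   // view of the same 256 bytes at offset 128.
//   ARROW_ASSIGN_OR_RAISE(auto cuda_slice, CudaBuffer::FromBuffer(slice));
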
Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                              void* out) const {
  return context_->CopyDeviceToHost(out, data_ + position, nbytes);
}

Status CudaBuffer::CopyFromHost(const int64_t position, const void* data,
                                int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyHostToDevice(const_cast<uint8_t*>(data_) + position, data, nbytes);
}

Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
                                  int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyDeviceToDevice(const_cast<uint8_t*>(data_) + position, data,
                                      nbytes);
}

Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
                                         const int64_t position, const void* data,
                                         int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return src_ctx->CopyDeviceToAnotherDevice(
      context_, const_cast<uint8_t*>(data_) + position, data, nbytes);
}

Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
  if (is_ipc_) {
    return Status::Invalid("Buffer has already been exported for IPC");
  }
  ARROW_ASSIGN_OR_RAISE(auto handle, context_->ExportIpcBuffer(data_, size_));
  own_data_ = false;
  return handle;
}

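// A sketch of the IPC round trip ExportForIpc enables, assuming CUDA contexts
// `ctx_a` and `ctx_b` in two processes and some transport for the serialized
// handle bytes:
//
//   // Process A: export device memory and serialize the handle.
//   ARROW_ASSIGN_OR_RAISE(auto device_buf, ctx_a->Allocate(4096));
//   ARROW_ASSIGN_OR_RAISE(auto handle, device_buf->ExportForIpc());
//   ARROW_ASSIGN_OR_RAISE(auto payload, handle->Serialize(default_memory_pool()));
//   // ... ship payload->data(), payload->size() to process B ...
//
//   // Process B: rebuild the handle and open the shared device memory.
//   ARROW_ASSIGN_OR_RAISE(auto handle, CudaIpcMemHandle::FromBuffer(payload_bytes));
//   ARROW_ASSIGN_OR_RAISE(auto shared_buf, ctx_b->OpenIpcBuffer(*handle));
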
CudaHostBuffer::~CudaHostBuffer() {
  auto maybe_manager = CudaDeviceManager::Instance();
  ARROW_CHECK_OK(maybe_manager.status());
  ARROW_CHECK_OK((*maybe_manager)->FreeHost(const_cast<uint8_t*>(data_), size_));
}

Result<uintptr_t> CudaHostBuffer::GetDeviceAddress(
    const std::shared_ptr<CudaContext>& ctx) {
  return ::arrow::cuda::GetDeviceAddress(data(), ctx);
}

// ----------------------------------------------------------------------
// CudaBufferReader

CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
    : address_(buffer->address()), size_(buffer->size()), position_(0), is_open_(true) {
  auto maybe_buffer = CudaBuffer::FromBuffer(buffer);
  if (ARROW_PREDICT_FALSE(!maybe_buffer.ok())) {
    throw std::bad_cast();
  }
  buffer_ = *std::move(maybe_buffer);
  context_ = buffer_->context();
}

Status CudaBufferReader::DoClose() {
  is_open_ = false;
  return Status::OK();
}

bool CudaBufferReader::closed() const { return !is_open_; }

// XXX Zero-copy only in a limited sense: no device-to-host transfer is made,
// but the returned data still resides in GPU memory, not on the CPU.
bool CudaBufferReader::supports_zero_copy() const { return true; }

Result<int64_t> CudaBufferReader::DoTell() const {
  RETURN_NOT_OK(CheckClosed());
  return position_;
}

Result<int64_t> CudaBufferReader::DoGetSize() {
  RETURN_NOT_OK(CheckClosed());
  return size_;
}

Status CudaBufferReader::DoSeek(int64_t position) {
  RETURN_NOT_OK(CheckClosed());

  if (position < 0 || position > size_) {
    return Status::IOError("Seek out of bounds");
  }

  position_ = position;
  return Status::OK();
}

Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
                                           void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  nbytes = std::min(nbytes, size_ - position);
  RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
  return nbytes;
}

Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
  position_ += bytes_read;
  return bytes_read;
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
                                                           int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position);
  return std::make_shared<CudaBuffer>(buffer_, position, size);
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position_);
  auto buffer = std::make_shared<CudaBuffer>(buffer_, position_, size);
  position_ += size;
  return buffer;
}

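// A sketch of the two read paths, given a std::shared_ptr<CudaBuffer>
// `device_buf`: reads into a caller-supplied pointer copy device-to-host,
// while the Buffer-returning variants hand back zero-copy device slices.
//
//   CudaBufferReader reader(device_buf);
//   uint8_t host_bytes[64];
//   // Copies 64 bytes from the GPU into host_bytes.
//   ARROW_ASSIGN_OR_RAISE(int64_t n, reader.ReadAt(0, 64, host_bytes));
//   // No transfer: returns a CudaBuffer slice still resident on the GPU.
//   ARROW_ASSIGN_OR_RAISE(auto device_slice, reader.Read(64));
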
// ----------------------------------------------------------------------
// CudaBufferWriter

class CudaBufferWriter::CudaBufferWriterImpl {
 public:
  explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
      : context_(buffer->context()),
        buffer_(buffer),
        buffer_size_(0),
        buffer_position_(0) {
    ARROW_CHECK(buffer->is_mutable()) << "Must pass mutable buffer";
    address_ = buffer->mutable_address();
    size_ = buffer->size();
    position_ = 0;
    closed_ = false;
  }

#define CHECK_CLOSED()                                               \
  if (closed_) {                                                     \
    return Status::Invalid("Operation on closed CudaBufferWriter"); \
  }

  Status Seek(int64_t position) {
    CHECK_CLOSED();
    if (position < 0 || position >= size_) {
      return Status::IOError("position out of bounds");
    }
    position_ = position;
    return Status::OK();
  }

  Status Close() {
    if (!closed_) {
      closed_ = true;
      RETURN_NOT_OK(FlushInternal());
    }
    return Status::OK();
  }

  Status Flush() {
    CHECK_CLOSED();
    return FlushInternal();
  }

  Status FlushInternal() {
    if (buffer_size_ > 0 && buffer_position_ > 0) {
      // Only need to flush when the write has been buffered
      RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_ - buffer_position_,
                                               host_buffer_data_, buffer_position_));
      buffer_position_ = 0;
    }
    return Status::OK();
  }

  bool closed() const { return closed_; }

  Result<int64_t> Tell() const {
    CHECK_CLOSED();
    return position_;
  }

  Status Write(const void* data, int64_t nbytes) {
    CHECK_CLOSED();
    if (nbytes == 0) {
      return Status::OK();
    }

    if (buffer_size_ > 0) {
      if (nbytes + buffer_position_ >= buffer_size_) {
        // The staging buffer would overflow: flush it, then write directly
        RETURN_NOT_OK(Flush());
        RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
      } else {
        // Stage the bytes in the pinned host buffer
        std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
        buffer_position_ += nbytes;
      }
    } else {
      // Unbuffered write
      RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
    }
    position_ += nbytes;
    return Status::OK();
  }

  Status WriteAt(int64_t position, const void* data, int64_t nbytes) {
    std::lock_guard<std::mutex> guard(lock_);
    CHECK_CLOSED();
    RETURN_NOT_OK(Seek(position));
    return Write(data, nbytes);
  }

  Status SetBufferSize(const int64_t buffer_size) {
    CHECK_CLOSED();
    if (buffer_position_ > 0) {
      // Flush any buffered data
      RETURN_NOT_OK(Flush());
    }
    ARROW_ASSIGN_OR_RAISE(
        host_buffer_, AllocateCudaHostBuffer(context_->device_number(), buffer_size));
    host_buffer_data_ = host_buffer_->mutable_data();
    buffer_size_ = buffer_size;
    return Status::OK();
  }

  int64_t buffer_size() const { return buffer_size_; }

  int64_t buffer_position() const { return buffer_position_; }

#undef CHECK_CLOSED

 private:
  std::shared_ptr<CudaContext> context_;
  std::shared_ptr<CudaBuffer> buffer_;
  std::mutex lock_;
  uintptr_t address_;
  int64_t size_;
  int64_t position_;
  bool closed_;

  // Pinned host buffer for staging writes on the CPU before they are
  // copied to device memory
  int64_t buffer_size_;
  int64_t buffer_position_;
  std::shared_ptr<CudaHostBuffer> host_buffer_;
  uint8_t* host_buffer_data_;
};

CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
  impl_.reset(new CudaBufferWriterImpl(buffer));
}

CudaBufferWriter::~CudaBufferWriter() {}

Status CudaBufferWriter::Close() { return impl_->Close(); }

bool CudaBufferWriter::closed() const { return impl_->closed(); }

Status CudaBufferWriter::Flush() { return impl_->Flush(); }

Status CudaBufferWriter::Seek(int64_t position) {
  if (impl_->buffer_position() > 0) {
    RETURN_NOT_OK(Flush());
  }
  return impl_->Seek(position);
}

Result<int64_t> CudaBufferWriter::Tell() const { return impl_->Tell(); }

Status CudaBufferWriter::Write(const void* data, int64_t nbytes) {
  return impl_->Write(data, nbytes);
}

Status CudaBufferWriter::WriteAt(int64_t position, const void* data, int64_t nbytes) {
  return impl_->WriteAt(position, data, nbytes);
}

Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
  return impl_->SetBufferSize(buffer_size);
}

int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }

int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }

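// A sketch of buffered writing, given a mutable std::shared_ptr<CudaBuffer>
// `device_buf`; the header/payload names are placeholders. With a nonzero
// buffer size, small writes accumulate in pinned host memory and reach the
// device only on flush, close, or when the staging buffer would overflow.
//
//   CudaBufferWriter writer(device_buf);
//   RETURN_NOT_OK(writer.SetBufferSize(1 << 16));  // 64 KiB pinned staging buffer
//   RETURN_NOT_OK(writer.Write(header, header_size));    // staged on host
//   RETURN_NOT_OK(writer.Write(payload, payload_size));  // staged or direct
//   RETURN_NOT_OK(writer.Close());  // flushes any remaining staged bytes
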
// ----------------------------------------------------------------------

Result<std::shared_ptr<CudaHostBuffer>> AllocateCudaHostBuffer(int device_number,
                                                               const int64_t size) {
  ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
  return manager->AllocateHost(device_number, size);
}

Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
                                   const std::shared_ptr<CudaContext>& ctx) {
  ContextSaver context_saver(*ctx);
  CUdeviceptr ptr;
  // XXX should we use cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_DEVICE_POINTER)
  // instead?
  CU_RETURN_NOT_OK("cuMemHostGetDevicePointer",
                   cuMemHostGetDevicePointer(&ptr, const_cast<uint8_t*>(cpu_data), 0));
  return static_cast<uintptr_t>(ptr);
}

Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
  void* ptr;
  CU_RETURN_NOT_OK(
      "cuPointerGetAttribute",
      cuPointerGetAttribute(&ptr, CU_POINTER_ATTRIBUTE_HOST_POINTER, device_ptr));
  return static_cast<uint8_t*>(ptr);
}

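// A sketch combining these helpers, assuming a valid CudaContext `ctx`:
// allocate pinned host memory, obtain the device-side alias for it, then
// map the device pointer back to a host pointer.
//
//   ARROW_ASSIGN_OR_RAISE(auto host_buf,
//                         AllocateCudaHostBuffer(ctx->device_number(), 4096));
//   ARROW_ASSIGN_OR_RAISE(uintptr_t dev_addr, host_buf->GetDeviceAddress(ctx));
//   ARROW_ASSIGN_OR_RAISE(uint8_t* host_addr, GetHostAddress(dev_addr));
//   // host_addr aliases the same pinned allocation as host_buf->data().
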
}  // namespace cuda
}  // namespace arrow