]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/gpu/cuda_memory.h" | |
19 | ||
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <utility>
26 | ||
27 | #include <cuda.h> | |
28 | ||
29 | #include "arrow/buffer.h" | |
30 | #include "arrow/io/memory.h" | |
31 | #include "arrow/memory_pool.h" | |
32 | #include "arrow/status.h" | |
33 | #include "arrow/util/logging.h" | |
34 | ||
35 | #include "arrow/gpu/cuda_context.h" | |
36 | #include "arrow/gpu/cuda_internal.h" | |
37 | ||
38 | namespace arrow { | |
39 | namespace cuda { | |
40 | ||
41 | using internal::ContextSaver; | |
42 | ||
43 | // ---------------------------------------------------------------------- | |
44 | // CUDA IPC memory handle | |
45 | ||
46 | struct CudaIpcMemHandle::CudaIpcMemHandleImpl { | |
47 | explicit CudaIpcMemHandleImpl(const uint8_t* handle) { | |
48 | memcpy(&memory_size, handle, sizeof(memory_size)); | |
49 | if (memory_size != 0) | |
50 | memcpy(&ipc_handle, handle + sizeof(memory_size), sizeof(CUipcMemHandle)); | |
51 | } | |
52 | ||
53 | explicit CudaIpcMemHandleImpl(int64_t memory_size, const void* cu_handle) | |
54 | : memory_size(memory_size) { | |
55 | if (memory_size != 0) { | |
56 | memcpy(&ipc_handle, cu_handle, sizeof(CUipcMemHandle)); | |
57 | } | |
58 | } | |
59 | ||
60 | CUipcMemHandle ipc_handle; /// initialized only when memory_size != 0 | |
61 | int64_t memory_size; /// size of the memory that ipc_handle refers to | |
62 | }; | |
63 | ||
64 | CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) { | |
65 | impl_.reset(new CudaIpcMemHandleImpl(reinterpret_cast<const uint8_t*>(handle))); | |
66 | } | |
67 | ||
68 | CudaIpcMemHandle::CudaIpcMemHandle(int64_t memory_size, const void* cu_handle) { | |
69 | impl_.reset(new CudaIpcMemHandleImpl(memory_size, cu_handle)); | |
70 | } | |
71 | ||
72 | CudaIpcMemHandle::~CudaIpcMemHandle() {} | |
73 | ||
74 | Result<std::shared_ptr<CudaIpcMemHandle>> CudaIpcMemHandle::FromBuffer( | |
75 | const void* opaque_handle) { | |
76 | return std::shared_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle)); | |
77 | } | |
78 | ||
79 | Result<std::shared_ptr<Buffer>> CudaIpcMemHandle::Serialize(MemoryPool* pool) const { | |
80 | int64_t size = impl_->memory_size; | |
81 | const size_t handle_size = | |
82 | (size > 0 ? sizeof(int64_t) + sizeof(CUipcMemHandle) : sizeof(int64_t)); | |
83 | ||
84 | ARROW_ASSIGN_OR_RAISE(auto buffer, | |
85 | AllocateBuffer(static_cast<int64_t>(handle_size), pool)); | |
86 | memcpy(buffer->mutable_data(), &impl_->memory_size, sizeof(impl_->memory_size)); | |
87 | if (size > 0) { | |
88 | memcpy(buffer->mutable_data() + sizeof(impl_->memory_size), &impl_->ipc_handle, | |
89 | sizeof(impl_->ipc_handle)); | |
90 | } | |
91 | return std::move(buffer); | |
92 | } | |
93 | ||
// Pointer to the raw CUipcMemHandle.  Only meaningful when
// memory_size() != 0; otherwise the handle bytes are unspecified (see
// CudaIpcMemHandleImpl).
const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }

// Size in bytes of the device memory region the IPC handle refers to.
int64_t CudaIpcMemHandle::memory_size() const { return impl_->memory_size; }
97 | ||
98 | // ---------------------------------------------------------------------- | |
99 | ||
// Wrap a device pointer in a CudaBuffer.
// own_data: when true, the memory is freed through the context on Close().
// is_ipc: when true, the memory was imported over IPC and is released via
//         CloseIpcBuffer() instead of Free().
CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
  is_mutable_ = true;
  SetMemoryManager(context_->memory_manager());
}

// Convenience overload taking the device address as an integer.
CudaBuffer::CudaBuffer(uintptr_t address, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : CudaBuffer(reinterpret_cast<uint8_t*>(address), size, context, own_data, is_ipc) {}
112 | ||
113 | CudaBuffer::~CudaBuffer() { ARROW_CHECK_OK(Close()); } | |
114 | ||
115 | Status CudaBuffer::Close() { | |
116 | if (own_data_) { | |
117 | if (is_ipc_) { | |
118 | return context_->CloseIpcBuffer(this); | |
119 | } else { | |
120 | return context_->Free(const_cast<uint8_t*>(data_), size_); | |
121 | } | |
122 | } | |
123 | return Status::OK(); | |
124 | } | |
125 | ||
// Zero-copy slicing constructor: the child views [offset, offset + size) of
// the parent's device memory.  A slice never owns the memory and is never
// itself the IPC-imported buffer; mutability is inherited from the parent.
CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                       const int64_t size)
    : Buffer(parent, offset, size),
      context_(parent->context()),
      own_data_(false),
      is_ipc_(false) {
  is_mutable_ = parent->is_mutable();
}
134 | ||
// Recover the underlying CudaBuffer from a (possibly wrapped or sliced)
// Buffer.  Walks the parent chain until a CudaBuffer is found, accumulating
// each wrapper's byte offset relative to its parent, then re-slices the
// found CudaBuffer so the result covers exactly the same memory area, with
// the same mutability, as the input.
// Returns TypeError if no ancestor is a CudaBuffer.
Result<std::shared_ptr<CudaBuffer>> CudaBuffer::FromBuffer(
    std::shared_ptr<Buffer> buffer) {
  int64_t offset = 0, size = buffer->size();
  bool is_mutable = buffer->is_mutable();
  std::shared_ptr<CudaBuffer> cuda_buffer;

  // The original CudaBuffer may have been wrapped in another Buffer
  // (for example through slicing).
  // TODO check device instead
  while (!(cuda_buffer = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
    const std::shared_ptr<Buffer> parent = buffer->parent();
    if (!parent) {
      return Status::TypeError("buffer is not backed by a CudaBuffer");
    }
    // Accumulate this wrapper's offset into its parent.
    offset += buffer->address() - parent->address();
    buffer = parent;
  }
  // Re-slice to represent the same memory area
  if (offset != 0 || cuda_buffer->size() != size || !is_mutable) {
    cuda_buffer = std::make_shared<CudaBuffer>(std::move(cuda_buffer), offset, size);
    cuda_buffer->is_mutable_ = is_mutable;
  }
  return cuda_buffer;
}
159 | ||
160 | Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes, | |
161 | void* out) const { | |
162 | return context_->CopyDeviceToHost(out, data_ + position, nbytes); | |
163 | } | |
164 | ||
// Copy nbytes from host memory `data` into this buffer at `position`,
// rejecting writes that would overrun the buffer.
Status CudaBuffer::CopyFromHost(const int64_t position, const void* data,
                                int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyHostToDevice(const_cast<uint8_t*>(data_) + position, data, nbytes);
}

// Copy nbytes from device memory `data` (same device/context) into this
// buffer at `position`, with the same overflow check.
Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
                                  int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyDeviceToDevice(const_cast<uint8_t*>(data_) + position, data,
                                      nbytes);
}

// Copy nbytes from device memory owned by another context `src_ctx` into
// this buffer at `position`.  The source context performs the
// device-to-device transfer.
Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
                                         const int64_t position, const void* data,
                                         int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return src_ctx->CopyDeviceToAnotherDevice(
      context_, const_cast<uint8_t*>(data_) + position, data, nbytes);
}
191 | ||
// Export this device buffer for sharing with another process.
// Buffers that are themselves IPC-backed (is_ipc_) are rejected.  After a
// successful export, own_data_ is cleared: this buffer no longer frees the
// memory on Close(); the importing side / IPC machinery is responsible.
Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
  if (is_ipc_) {
    return Status::Invalid("Buffer has already been exported for IPC");
  }
  ARROW_ASSIGN_OR_RAISE(auto handle, context_->ExportIpcBuffer(data_, size_));
  own_data_ = false;
  return handle;
}
200 | ||
// Frees the pinned host allocation through the global CudaDeviceManager.
// Failures are fatal (ARROW_CHECK_OK) since a destructor cannot return a
// Status.
CudaHostBuffer::~CudaHostBuffer() {
  auto maybe_manager = CudaDeviceManager::Instance();
  ARROW_CHECK_OK(maybe_manager.status());
  ARROW_CHECK_OK((*maybe_manager)->FreeHost(const_cast<uint8_t*>(data_), size_));
}
206 | ||
// Device-side address of this pinned host buffer in the given context;
// forwards to the free function below.
Result<uintptr_t> CudaHostBuffer::GetDeviceAddress(
    const std::shared_ptr<CudaContext>& ctx) {
  return ::arrow::cuda::GetDeviceAddress(data(), ctx);
}
211 | ||
212 | // ---------------------------------------------------------------------- | |
213 | // CudaBufferReader | |
214 | ||
// Construct a reader over device memory.  The input may be any Buffer
// ultimately backed by a CudaBuffer (e.g. a slice or wrapper); if it is not,
// std::bad_cast is thrown, since a constructor cannot return a Status.
CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
    : address_(buffer->address()), size_(buffer->size()), position_(0), is_open_(true) {
  auto maybe_buffer = CudaBuffer::FromBuffer(buffer);
  if (ARROW_PREDICT_FALSE(!maybe_buffer.ok())) {
    throw std::bad_cast();
  }
  buffer_ = *std::move(maybe_buffer);
  context_ = buffer_->context();
}
224 | ||
// Closing only flips the flag; the underlying CudaBuffer stays alive until
// the reader is destroyed.
Status CudaBufferReader::DoClose() {
  is_open_ = false;
  return Status::OK();
}

bool CudaBufferReader::closed() const { return !is_open_; }

// XXX Only in a certain sense (not on the CPU)...
bool CudaBufferReader::supports_zero_copy() const { return true; }
234 | ||
// Current read position (bytes from the start of the device buffer).
Result<int64_t> CudaBufferReader::DoTell() const {
  RETURN_NOT_OK(CheckClosed());
  return position_;
}

// Total size in bytes of the underlying device buffer.
Result<int64_t> CudaBufferReader::DoGetSize() {
  RETURN_NOT_OK(CheckClosed());
  return size_;
}
244 | ||
245 | Status CudaBufferReader::DoSeek(int64_t position) { | |
246 | RETURN_NOT_OK(CheckClosed()); | |
247 | ||
248 | if (position < 0 || position > size_) { | |
249 | return Status::IOError("Seek out of bounds"); | |
250 | } | |
251 | ||
252 | position_ = position; | |
253 | return Status::OK(); | |
254 | } | |
255 | ||
256 | Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes, | |
257 | void* buffer) { | |
258 | RETURN_NOT_OK(CheckClosed()); | |
259 | ||
260 | nbytes = std::min(nbytes, size_ - position); | |
261 | RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes)); | |
262 | return nbytes; | |
263 | } | |
264 | ||
// Sequential read into host memory: delegates to DoReadAt at the current
// position and advances the cursor by the bytes actually read.
Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
  position_ += bytes_read;
  return bytes_read;
}
272 | ||
273 | Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position, | |
274 | int64_t nbytes) { | |
275 | RETURN_NOT_OK(CheckClosed()); | |
276 | ||
277 | int64_t size = std::min(nbytes, size_ - position); | |
278 | return std::make_shared<CudaBuffer>(buffer_, position, size); | |
279 | } | |
280 | ||
// Zero-copy sequential read: returns a device-memory slice at the current
// position and advances the cursor.  position_ never exceeds size_ (DoSeek
// bounds it and reads advance by at most size_ - position_), so `size` is
// non-negative here.
Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position_);
  auto buffer = std::make_shared<CudaBuffer>(buffer_, position_, size);
  position_ += size;
  return buffer;
}
289 | ||
290 | // ---------------------------------------------------------------------- | |
291 | // CudaBufferWriter | |
292 | ||
293 | class CudaBufferWriter::CudaBufferWriterImpl { | |
294 | public: | |
295 | explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer) | |
296 | : context_(buffer->context()), | |
297 | buffer_(buffer), | |
298 | buffer_size_(0), | |
299 | buffer_position_(0) { | |
300 | buffer_ = buffer; | |
301 | ARROW_CHECK(buffer->is_mutable()) << "Must pass mutable buffer"; | |
302 | address_ = buffer->mutable_address(); | |
303 | size_ = buffer->size(); | |
304 | position_ = 0; | |
305 | closed_ = false; | |
306 | } | |
307 | ||
308 | #define CHECK_CLOSED() \ | |
309 | if (closed_) { \ | |
310 | return Status::Invalid("Operation on closed CudaBufferWriter"); \ | |
311 | } | |
312 | ||
313 | Status Seek(int64_t position) { | |
314 | CHECK_CLOSED(); | |
315 | if (position < 0 || position >= size_) { | |
316 | return Status::IOError("position out of bounds"); | |
317 | } | |
318 | position_ = position; | |
319 | return Status::OK(); | |
320 | } | |
321 | ||
322 | Status Close() { | |
323 | if (!closed_) { | |
324 | closed_ = true; | |
325 | RETURN_NOT_OK(FlushInternal()); | |
326 | } | |
327 | return Status::OK(); | |
328 | } | |
329 | ||
330 | Status Flush() { | |
331 | CHECK_CLOSED(); | |
332 | return FlushInternal(); | |
333 | } | |
334 | ||
335 | Status FlushInternal() { | |
336 | if (buffer_size_ > 0 && buffer_position_ > 0) { | |
337 | // Only need to flush when the write has been buffered | |
338 | RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_ - buffer_position_, | |
339 | host_buffer_data_, buffer_position_)); | |
340 | buffer_position_ = 0; | |
341 | } | |
342 | return Status::OK(); | |
343 | } | |
344 | ||
345 | bool closed() const { return closed_; } | |
346 | ||
347 | Result<int64_t> Tell() const { | |
348 | CHECK_CLOSED(); | |
349 | return position_; | |
350 | } | |
351 | ||
352 | Status Write(const void* data, int64_t nbytes) { | |
353 | CHECK_CLOSED(); | |
354 | if (nbytes == 0) { | |
355 | return Status::OK(); | |
356 | } | |
357 | ||
358 | if (buffer_size_ > 0) { | |
359 | if (nbytes + buffer_position_ >= buffer_size_) { | |
360 | // Reach end of buffer, write everything | |
361 | RETURN_NOT_OK(Flush()); | |
362 | RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes)); | |
363 | } else { | |
364 | // Write bytes to buffer | |
365 | std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes); | |
366 | buffer_position_ += nbytes; | |
367 | } | |
368 | } else { | |
369 | // Unbuffered write | |
370 | RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes)); | |
371 | } | |
372 | position_ += nbytes; | |
373 | return Status::OK(); | |
374 | } | |
375 | ||
376 | Status WriteAt(int64_t position, const void* data, int64_t nbytes) { | |
377 | std::lock_guard<std::mutex> guard(lock_); | |
378 | CHECK_CLOSED(); | |
379 | RETURN_NOT_OK(Seek(position)); | |
380 | return Write(data, nbytes); | |
381 | } | |
382 | ||
383 | Status SetBufferSize(const int64_t buffer_size) { | |
384 | CHECK_CLOSED(); | |
385 | if (buffer_position_ > 0) { | |
386 | // Flush any buffered data | |
387 | RETURN_NOT_OK(Flush()); | |
388 | } | |
389 | ARROW_ASSIGN_OR_RAISE( | |
390 | host_buffer_, | |
391 | AllocateCudaHostBuffer(context_.get()->device_number(), buffer_size)); | |
392 | host_buffer_data_ = host_buffer_->mutable_data(); | |
393 | buffer_size_ = buffer_size; | |
394 | return Status::OK(); | |
395 | } | |
396 | ||
397 | int64_t buffer_size() const { return buffer_size_; } | |
398 | ||
399 | int64_t buffer_position() const { return buffer_position_; } | |
400 | ||
401 | #undef CHECK_CLOSED | |
402 | ||
403 | private: | |
404 | std::shared_ptr<CudaContext> context_; | |
405 | std::shared_ptr<CudaBuffer> buffer_; | |
406 | std::mutex lock_; | |
407 | uintptr_t address_; | |
408 | int64_t size_; | |
409 | int64_t position_; | |
410 | bool closed_; | |
411 | ||
412 | // Pinned host buffer for buffering writes on CPU before calling cudaMalloc | |
413 | int64_t buffer_size_; | |
414 | int64_t buffer_position_; | |
415 | std::shared_ptr<CudaHostBuffer> host_buffer_; | |
416 | uint8_t* host_buffer_data_; | |
417 | }; | |
418 | ||
// Public CudaBufferWriter surface: thin forwarders to the pimpl above.
CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
  impl_.reset(new CudaBufferWriterImpl(buffer));
}

CudaBufferWriter::~CudaBufferWriter() {}

Status CudaBufferWriter::Close() { return impl_->Close(); }

bool CudaBufferWriter::closed() const { return impl_->closed(); }

Status CudaBufferWriter::Flush() { return impl_->Flush(); }

// Seek flushes staged bytes first: the staging buffer's device destination
// is derived from the current position, so it must be drained before the
// position moves.
Status CudaBufferWriter::Seek(int64_t position) {
  if (impl_->buffer_position() > 0) {
    RETURN_NOT_OK(Flush());
  }
  return impl_->Seek(position);
}

Result<int64_t> CudaBufferWriter::Tell() const { return impl_->Tell(); }

Status CudaBufferWriter::Write(const void* data, int64_t nbytes) {
  return impl_->Write(data, nbytes);
}

Status CudaBufferWriter::WriteAt(int64_t position, const void* data, int64_t nbytes) {
  return impl_->WriteAt(position, data, nbytes);
}

// Enable (or resize) host-side write buffering; see CudaBufferWriterImpl.
Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
  return impl_->SetBufferSize(buffer_size);
}

int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }

// Number of bytes currently staged in host memory, not yet on the device.
int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }
455 | ||
456 | // ---------------------------------------------------------------------- | |
457 | ||
// Allocate `size` bytes of pinned (page-locked) host memory associated with
// the given device, via the global CudaDeviceManager.
Result<std::shared_ptr<CudaHostBuffer>> AllocateCudaHostBuffer(int device_number,
                                                               const int64_t size) {
  ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
  return manager->AllocateHost(device_number, size);
}
463 | ||
// Map a pinned host pointer to the address the device uses for it, within
// the given context.  cpu_data must point into page-locked memory mapped for
// device access (cuMemHostGetDevicePointer requirement); the ContextSaver
// makes ctx current for the duration of the call.
Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
                                   const std::shared_ptr<CudaContext>& ctx) {
  ContextSaver context_saver(*ctx);
  CUdeviceptr ptr;
  // XXX should we use cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_DEVICE_POINTER)
  // instead?
  CU_RETURN_NOT_OK("cuMemHostGetDevicePointer",
                   cuMemHostGetDevicePointer(&ptr, const_cast<uint8_t*>(cpu_data), 0));
  return static_cast<uintptr_t>(ptr);
}
474 | ||
// Inverse of GetDeviceAddress: recover the host pointer backing a mapped
// device address, via CU_POINTER_ATTRIBUTE_HOST_POINTER.  Fails if
// device_ptr does not refer to mapped host memory.
Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
  void* ptr;
  CU_RETURN_NOT_OK(
      "cuPointerGetAttribute",
      cuPointerGetAttribute(&ptr, CU_POINTER_ATTRIBUTE_HOST_POINTER, device_ptr));
  return static_cast<uint8_t*>(ptr);
}
482 | ||
483 | } // namespace cuda | |
484 | } // namespace arrow |