1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/gpu/cuda_memory.h"
19
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstdlib>
23 #include <memory>
24 #include <mutex>
25 #include <utility>
26
27 #include <cuda.h>
28
29 #include "arrow/buffer.h"
30 #include "arrow/io/memory.h"
31 #include "arrow/memory_pool.h"
32 #include "arrow/status.h"
33 #include "arrow/util/logging.h"
34
35 #include "arrow/gpu/cuda_context.h"
36 #include "arrow/gpu/cuda_internal.h"
37
38 namespace arrow {
39 namespace cuda {
40
41 using internal::ContextSaver;
42
43 // ----------------------------------------------------------------------
44 // CUDA IPC memory handle
45
struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
  // Deserialize from the wire format produced by Serialize(): a native-endian
  // int64 memory size, followed (only when the size is non-zero) by the raw
  // CUipcMemHandle bytes.
  explicit CudaIpcMemHandleImpl(const uint8_t* handle) {
    memcpy(&memory_size, handle, sizeof(memory_size));
    if (memory_size != 0)
      memcpy(&ipc_handle, handle + sizeof(memory_size), sizeof(CUipcMemHandle));
  }

  // Construct from an in-memory CUipcMemHandle (as returned by the CUDA
  // driver). cu_handle may be unread when memory_size == 0.
  explicit CudaIpcMemHandleImpl(int64_t memory_size, const void* cu_handle)
      : memory_size(memory_size) {
    if (memory_size != 0) {
      memcpy(&ipc_handle, cu_handle, sizeof(CUipcMemHandle));
    }
  }

  CUipcMemHandle ipc_handle;  /// initialized only when memory_size != 0
  int64_t memory_size;        /// size of the memory that ipc_handle refers to
};
63
CudaIpcMemHandle(const void * handle)64 CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
65 impl_.reset(new CudaIpcMemHandleImpl(reinterpret_cast<const uint8_t*>(handle)));
66 }
67
CudaIpcMemHandle(int64_t memory_size,const void * cu_handle)68 CudaIpcMemHandle::CudaIpcMemHandle(int64_t memory_size, const void* cu_handle) {
69 impl_.reset(new CudaIpcMemHandleImpl(memory_size, cu_handle));
70 }
71
~CudaIpcMemHandle()72 CudaIpcMemHandle::~CudaIpcMemHandle() {}
73
FromBuffer(const void * opaque_handle)74 Result<std::shared_ptr<CudaIpcMemHandle>> CudaIpcMemHandle::FromBuffer(
75 const void* opaque_handle) {
76 return std::shared_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
77 }
78
// Backward-compatible overload: delegates to the Result-returning FromBuffer
// and stores the handle through the out-parameter.
Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle,
                                    std::shared_ptr<CudaIpcMemHandle>* handle) {
  return FromBuffer(opaque_handle).Value(handle);
}
83
Serialize(MemoryPool * pool) const84 Result<std::shared_ptr<Buffer>> CudaIpcMemHandle::Serialize(MemoryPool* pool) const {
85 int64_t size = impl_->memory_size;
86 const size_t handle_size =
87 (size > 0 ? sizeof(int64_t) + sizeof(CUipcMemHandle) : sizeof(int64_t));
88
89 ARROW_ASSIGN_OR_RAISE(auto buffer,
90 AllocateBuffer(static_cast<int64_t>(handle_size), pool));
91 memcpy(buffer->mutable_data(), &impl_->memory_size, sizeof(impl_->memory_size));
92 if (size > 0) {
93 memcpy(buffer->mutable_data() + sizeof(impl_->memory_size), &impl_->ipc_handle,
94 sizeof(impl_->ipc_handle));
95 }
96 return std::move(buffer);
97 }
98
// Backward-compatible overload: delegates to the Result-returning Serialize.
Status CudaIpcMemHandle::Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
  return Serialize(pool).Value(out);
}
102
handle() const103 const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }
104
memory_size() const105 int64_t CudaIpcMemHandle::memory_size() const { return impl_->memory_size; }
106
107 // ----------------------------------------------------------------------
108
// Wrap a raw device pointer. `own_data` indicates the buffer should free the
// memory on Close(); `is_ipc` indicates it was imported over CUDA IPC and
// must be released through the context instead.
CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
  // Device memory wrapped through this constructor is always writable.
  is_mutable_ = true;
  mutable_data_ = data;
  SetMemoryManager(context_->memory_manager());
}
117
// Convenience overload taking the device address as an integer; delegates to
// the pointer-based constructor.
CudaBuffer::CudaBuffer(uintptr_t address, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : CudaBuffer(reinterpret_cast<uint8_t*>(address), size, context, own_data, is_ipc) {}
122
~CudaBuffer()123 CudaBuffer::~CudaBuffer() { ARROW_CHECK_OK(Close()); }
124
Close()125 Status CudaBuffer::Close() {
126 if (own_data_) {
127 if (is_ipc_) {
128 return context_->CloseIpcBuffer(this);
129 } else {
130 return context_->Free(mutable_data_, size_);
131 }
132 }
133 return Status::OK();
134 }
135
// Slice constructor: a non-owning view into `parent` at [offset, offset+size).
// The slice never owns memory and is never the IPC-exporting buffer.
CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                       const int64_t size)
    : Buffer(parent, offset, size),
      context_(parent->context()),
      own_data_(false),
      is_ipc_(false) {
  // Mutability is inherited from the parent buffer.
  if (parent->is_mutable()) {
    is_mutable_ = true;
    mutable_data_ = const_cast<uint8_t*>(data_);
  }
}
147
// Recover the CudaBuffer backing an arbitrary Buffer (which may be a chain of
// slices/wrappers around one). Fails with TypeError if no CudaBuffer is found
// in the parent chain.
Result<std::shared_ptr<CudaBuffer>> CudaBuffer::FromBuffer(
    std::shared_ptr<Buffer> buffer) {
  int64_t offset = 0, size = buffer->size();
  bool is_mutable = buffer->is_mutable();
  std::shared_ptr<CudaBuffer> cuda_buffer;

  // The original CudaBuffer may have been wrapped in another Buffer
  // (for example through slicing).
  // TODO check device instead
  while (!(cuda_buffer = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
    const std::shared_ptr<Buffer> parent = buffer->parent();
    if (!parent) {
      return Status::TypeError("buffer is not backed by a CudaBuffer");
    }
    // Accumulate each wrapper's slice offset relative to its parent.
    offset += buffer->address() - parent->address();
    buffer = parent;
  }
  // Re-slice to represent the same memory area
  if (offset != 0 || cuda_buffer->size() != size || !is_mutable) {
    cuda_buffer = std::make_shared<CudaBuffer>(std::move(cuda_buffer), offset, size);
    // Preserve the (possibly read-only) mutability of the input wrapper.
    cuda_buffer->is_mutable_ = is_mutable;
  }
  return cuda_buffer;
}
172
// Backward-compatible overload: delegates to the Result-returning FromBuffer.
Status CudaBuffer::FromBuffer(std::shared_ptr<Buffer> buffer,
                              std::shared_ptr<CudaBuffer>* out) {
  return FromBuffer(std::move(buffer)).Value(out);
}
177
CopyToHost(const int64_t position,const int64_t nbytes,void * out) const178 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
179 void* out) const {
180 return context_->CopyDeviceToHost(out, data_ + position, nbytes);
181 }
182
CopyFromHost(const int64_t position,const void * data,int64_t nbytes)183 Status CudaBuffer::CopyFromHost(const int64_t position, const void* data,
184 int64_t nbytes) {
185 if (nbytes > size_ - position) {
186 return Status::Invalid("Copy would overflow buffer");
187 }
188 return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
189 }
190
CopyFromDevice(const int64_t position,const void * data,int64_t nbytes)191 Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
192 int64_t nbytes) {
193 if (nbytes > size_ - position) {
194 return Status::Invalid("Copy would overflow buffer");
195 }
196 return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes);
197 }
198
CopyFromAnotherDevice(const std::shared_ptr<CudaContext> & src_ctx,const int64_t position,const void * data,int64_t nbytes)199 Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
200 const int64_t position, const void* data,
201 int64_t nbytes) {
202 if (nbytes > size_ - position) {
203 return Status::Invalid("Copy would overflow buffer");
204 }
205 return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ + position, data,
206 nbytes);
207 }
208
// Export this buffer's memory for sharing with another process via CUDA IPC.
// Side effect: on success, ownership of the device memory is transferred to
// the IPC machinery (own_data_ is cleared), so this buffer will no longer
// free it on Close(). A buffer can only be exported once.
Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
  if (is_ipc_) {
    return Status::Invalid("Buffer has already been exported for IPC");
  }
  ARROW_ASSIGN_OR_RAISE(auto handle, context_->ExportIpcBuffer(mutable_data_, size_));
  own_data_ = false;
  return handle;
}
217
// Backward-compatible overload: delegates to the Result-returning ExportForIpc.
Status CudaBuffer::ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle) {
  return ExportForIpc().Value(handle);
}
221
// Free the pinned host memory through the device manager.
// NOTE: ARROW_CHECK_OK aborts the process if the manager cannot be obtained
// or the free fails — destructors cannot propagate a Status.
CudaHostBuffer::~CudaHostBuffer() {
  auto maybe_manager = CudaDeviceManager::Instance();
  ARROW_CHECK_OK(maybe_manager.status());
  ARROW_CHECK_OK((*maybe_manager)->FreeHost(mutable_data_, size_));
}
227
// Resolve the device-side address through which this pinned host memory can
// be accessed in kernels running under `ctx`.
Result<uintptr_t> CudaHostBuffer::GetDeviceAddress(
    const std::shared_ptr<CudaContext>& ctx) {
  return ::arrow::cuda::GetDeviceAddress(data(), ctx);
}
232
233 // ----------------------------------------------------------------------
234 // CudaBufferReader
235
// Construct a reader over a (possibly wrapped/sliced) CUDA buffer.
// Throws std::bad_cast if `buffer` is not backed by a CudaBuffer, since a
// constructor cannot return a Status.
CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
    : address_(buffer->address()), size_(buffer->size()), position_(0), is_open_(true) {
  auto maybe_buffer = CudaBuffer::FromBuffer(buffer);
  if (ARROW_PREDICT_FALSE(!maybe_buffer.ok())) {
    throw std::bad_cast();
  }
  buffer_ = *std::move(maybe_buffer);
  context_ = buffer_->context();
}
245
// No device resources are held by the reader itself; closing just flags the
// stream so subsequent operations fail with CheckClosed().
Status CudaBufferReader::DoClose() {
  is_open_ = false;
  return Status::OK();
}
250
closed() const251 bool CudaBufferReader::closed() const { return !is_open_; }
252
// XXX Only zero-copy in a certain sense: the buffer-returning reads hand back
// slices of device memory, which is not directly CPU-accessible.
bool CudaBufferReader::supports_zero_copy() const { return true; }
255
// Current read position within the device buffer.
Result<int64_t> CudaBufferReader::DoTell() const {
  RETURN_NOT_OK(CheckClosed());
  return position_;
}
260
// Total size in bytes of the underlying device buffer.
Result<int64_t> CudaBufferReader::DoGetSize() {
  RETURN_NOT_OK(CheckClosed());
  return size_;
}
265
DoSeek(int64_t position)266 Status CudaBufferReader::DoSeek(int64_t position) {
267 RETURN_NOT_OK(CheckClosed());
268
269 if (position < 0 || position > size_) {
270 return Status::IOError("Seek out of bounds");
271 }
272
273 position_ = position;
274 return Status::OK();
275 }
276
DoReadAt(int64_t position,int64_t nbytes,void * buffer)277 Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
278 void* buffer) {
279 RETURN_NOT_OK(CheckClosed());
280
281 nbytes = std::min(nbytes, size_ - position);
282 RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
283 return nbytes;
284 }
285
DoRead(int64_t nbytes,void * buffer)286 Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
287 RETURN_NOT_OK(CheckClosed());
288
289 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
290 position_ += bytes_read;
291 return bytes_read;
292 }
293
DoReadAt(int64_t position,int64_t nbytes)294 Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
295 int64_t nbytes) {
296 RETURN_NOT_OK(CheckClosed());
297
298 int64_t size = std::min(nbytes, size_ - position);
299 return std::make_shared<CudaBuffer>(buffer_, position, size);
300 }
301
DoRead(int64_t nbytes)302 Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
303 RETURN_NOT_OK(CheckClosed());
304
305 int64_t size = std::min(nbytes, size_ - position_);
306 auto buffer = std::make_shared<CudaBuffer>(buffer_, position_, size);
307 position_ += size;
308 return buffer;
309 }
310
311 // ----------------------------------------------------------------------
312 // CudaBufferWriter
313
314 class CudaBufferWriter::CudaBufferWriterImpl {
315 public:
CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer> & buffer)316 explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
317 : context_(buffer->context()),
318 buffer_(buffer),
319 buffer_size_(0),
320 buffer_position_(0) {
321 buffer_ = buffer;
322 ARROW_CHECK(buffer->is_mutable()) << "Must pass mutable buffer";
323 address_ = buffer->mutable_address();
324 size_ = buffer->size();
325 position_ = 0;
326 closed_ = false;
327 }
328
329 #define CHECK_CLOSED() \
330 if (closed_) { \
331 return Status::Invalid("Operation on closed CudaBufferWriter"); \
332 }
333
Seek(int64_t position)334 Status Seek(int64_t position) {
335 CHECK_CLOSED();
336 if (position < 0 || position >= size_) {
337 return Status::IOError("position out of bounds");
338 }
339 position_ = position;
340 return Status::OK();
341 }
342
Close()343 Status Close() {
344 if (!closed_) {
345 closed_ = true;
346 RETURN_NOT_OK(FlushInternal());
347 }
348 return Status::OK();
349 }
350
Flush()351 Status Flush() {
352 CHECK_CLOSED();
353 return FlushInternal();
354 }
355
FlushInternal()356 Status FlushInternal() {
357 if (buffer_size_ > 0 && buffer_position_ > 0) {
358 // Only need to flush when the write has been buffered
359 RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_ - buffer_position_,
360 host_buffer_data_, buffer_position_));
361 buffer_position_ = 0;
362 }
363 return Status::OK();
364 }
365
closed() const366 bool closed() const { return closed_; }
367
Tell() const368 Result<int64_t> Tell() const {
369 CHECK_CLOSED();
370 return position_;
371 }
372
Write(const void * data,int64_t nbytes)373 Status Write(const void* data, int64_t nbytes) {
374 CHECK_CLOSED();
375 if (nbytes == 0) {
376 return Status::OK();
377 }
378
379 if (buffer_size_ > 0) {
380 if (nbytes + buffer_position_ >= buffer_size_) {
381 // Reach end of buffer, write everything
382 RETURN_NOT_OK(Flush());
383 RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
384 } else {
385 // Write bytes to buffer
386 std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
387 buffer_position_ += nbytes;
388 }
389 } else {
390 // Unbuffered write
391 RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
392 }
393 position_ += nbytes;
394 return Status::OK();
395 }
396
WriteAt(int64_t position,const void * data,int64_t nbytes)397 Status WriteAt(int64_t position, const void* data, int64_t nbytes) {
398 std::lock_guard<std::mutex> guard(lock_);
399 CHECK_CLOSED();
400 RETURN_NOT_OK(Seek(position));
401 return Write(data, nbytes);
402 }
403
SetBufferSize(const int64_t buffer_size)404 Status SetBufferSize(const int64_t buffer_size) {
405 CHECK_CLOSED();
406 if (buffer_position_ > 0) {
407 // Flush any buffered data
408 RETURN_NOT_OK(Flush());
409 }
410 ARROW_ASSIGN_OR_RAISE(
411 host_buffer_,
412 AllocateCudaHostBuffer(context_.get()->device_number(), buffer_size));
413 host_buffer_data_ = host_buffer_->mutable_data();
414 buffer_size_ = buffer_size;
415 return Status::OK();
416 }
417
buffer_size() const418 int64_t buffer_size() const { return buffer_size_; }
419
buffer_position() const420 int64_t buffer_position() const { return buffer_position_; }
421
422 #undef CHECK_CLOSED
423
424 private:
425 std::shared_ptr<CudaContext> context_;
426 std::shared_ptr<CudaBuffer> buffer_;
427 std::mutex lock_;
428 uintptr_t address_;
429 int64_t size_;
430 int64_t position_;
431 bool closed_;
432
433 // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
434 int64_t buffer_size_;
435 int64_t buffer_position_;
436 std::shared_ptr<CudaHostBuffer> host_buffer_;
437 uint8_t* host_buffer_data_;
438 };
439
CudaBufferWriter(const std::shared_ptr<CudaBuffer> & buffer)440 CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
441 impl_.reset(new CudaBufferWriterImpl(buffer));
442 }
443
~CudaBufferWriter()444 CudaBufferWriter::~CudaBufferWriter() {}
445
Close()446 Status CudaBufferWriter::Close() { return impl_->Close(); }
447
closed() const448 bool CudaBufferWriter::closed() const { return impl_->closed(); }
449
Flush()450 Status CudaBufferWriter::Flush() { return impl_->Flush(); }
451
// Move the write position. Any bytes staged in the host buffer must be
// flushed first so they land at their original (pre-seek) position.
Status CudaBufferWriter::Seek(int64_t position) {
  if (impl_->buffer_position() > 0) {
    RETURN_NOT_OK(Flush());
  }
  return impl_->Seek(position);
}
458
Tell() const459 Result<int64_t> CudaBufferWriter::Tell() const { return impl_->Tell(); }
460
// Write host bytes at the current position (buffered or direct, depending on
// SetBufferSize).
Status CudaBufferWriter::Write(const void* data, int64_t nbytes) {
  return impl_->Write(data, nbytes);
}
464
// Positioned write (seek + write), serialized internally against other
// WriteAt calls.
Status CudaBufferWriter::WriteAt(int64_t position, const void* data, int64_t nbytes) {
  return impl_->WriteAt(position, data, nbytes);
}
468
// Enable host-side write buffering with a pinned staging buffer of
// `buffer_size` bytes.
Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
  return impl_->SetBufferSize(buffer_size);
}
472
buffer_size() const473 int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }
474
num_bytes_buffered() const475 int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }
476
477 // ----------------------------------------------------------------------
478
// Allocate CUDA pinned (page-locked) host memory via the device manager.
Result<std::shared_ptr<CudaHostBuffer>> AllocateCudaHostBuffer(int device_number,
                                                               const int64_t size) {
  ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
  return manager->AllocateHost(device_number, size);
}
484
// Backward-compatible overload: delegates to the Result-returning variant.
Status AllocateCudaHostBuffer(int device_number, const int64_t size,
                              std::shared_ptr<CudaHostBuffer>* out) {
  return AllocateCudaHostBuffer(device_number, size).Value(out);
}
489
// Map a pinned host pointer to its device-visible address within `ctx`,
// using cuMemHostGetDevicePointer. The context is made current for the call
// via ContextSaver.
Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
                                   const std::shared_ptr<CudaContext>& ctx) {
  ContextSaver context_saver(*ctx);
  CUdeviceptr ptr;
  // XXX should we use cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_DEVICE_POINTER)
  // instead?
  CU_RETURN_NOT_OK("cuMemHostGetDevicePointer",
                   cuMemHostGetDevicePointer(&ptr, const_cast<uint8_t*>(cpu_data), 0));
  return static_cast<uintptr_t>(ptr);
}
500
// Map a device pointer back to its host-visible address, using
// cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_HOST_POINTER). Fails (returned
// as a Status) if the pointer has no host mapping.
Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
  void* ptr;
  CU_RETURN_NOT_OK(
      "cuPointerGetAttribute",
      cuPointerGetAttribute(&ptr, CU_POINTER_ATTRIBUTE_HOST_POINTER, device_ptr));
  return static_cast<uint8_t*>(ptr);
}
508
509 } // namespace cuda
510 } // namespace arrow
511