// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/gpu/cuda_memory.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <utility>

#include <cuda.h>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/gpu/cuda_context.h"
#include "arrow/gpu/cuda_internal.h"
namespace arrow {
namespace cuda {

using internal::ContextSaver;

// ----------------------------------------------------------------------
// CUDA IPC memory handle

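// Serialized layout (written by Serialize() below and parsed back by the
// deserializing constructor):
//   [int64_t memory_size][CUipcMemHandle ipc_handle]
// where the ipc_handle field is present only when memory_size is nonzero.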
struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
  explicit CudaIpcMemHandleImpl(const uint8_t* handle) {
    memcpy(&memory_size, handle, sizeof(memory_size));
    if (memory_size != 0)
      memcpy(&ipc_handle, handle + sizeof(memory_size), sizeof(CUipcMemHandle));
  }

  explicit CudaIpcMemHandleImpl(int64_t memory_size, const void* cu_handle)
      : memory_size(memory_size) {
    if (memory_size != 0) {
      memcpy(&ipc_handle, cu_handle, sizeof(CUipcMemHandle));
    }
  }

  CUipcMemHandle ipc_handle;  /// initialized only when memory_size != 0
  int64_t memory_size;        /// size of the memory that ipc_handle refers to
};

CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
  impl_.reset(new CudaIpcMemHandleImpl(reinterpret_cast<const uint8_t*>(handle)));
}

CudaIpcMemHandle::CudaIpcMemHandle(int64_t memory_size, const void* cu_handle) {
  impl_.reset(new CudaIpcMemHandleImpl(memory_size, cu_handle));
}

CudaIpcMemHandle::~CudaIpcMemHandle() {}

Result<std::shared_ptr<CudaIpcMemHandle>> CudaIpcMemHandle::FromBuffer(
    const void* opaque_handle) {
  return std::shared_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
}

Result<std::shared_ptr<Buffer>> CudaIpcMemHandle::Serialize(MemoryPool* pool) const {
  int64_t size = impl_->memory_size;
  const size_t handle_size =
      (size > 0 ? sizeof(int64_t) + sizeof(CUipcMemHandle) : sizeof(int64_t));

  ARROW_ASSIGN_OR_RAISE(auto buffer,
                        AllocateBuffer(static_cast<int64_t>(handle_size), pool));
  memcpy(buffer->mutable_data(), &impl_->memory_size, sizeof(impl_->memory_size));
  if (size > 0) {
    memcpy(buffer->mutable_data() + sizeof(impl_->memory_size), &impl_->ipc_handle,
           sizeof(impl_->ipc_handle));
  }
  return std::move(buffer);
}

const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }

int64_t CudaIpcMemHandle::memory_size() const { return impl_->memory_size; }
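
// A minimal round-trip sketch (the names cuda_buffer, pool, bytes and context
// are placeholders; the serialized bytes are assumed to travel between
// processes out of band, e.g. over a socket):
//
//   // Exporting process:
//   ARROW_ASSIGN_OR_RAISE(auto handle, cuda_buffer->ExportForIpc());
//   ARROW_ASSIGN_OR_RAISE(auto payload, handle->Serialize(pool));
//   // ... send payload->data() / payload->size() to the peer ...
//
//   // Receiving process:
//   ARROW_ASSIGN_OR_RAISE(auto received, CudaIpcMemHandle::FromBuffer(bytes));
//   ARROW_ASSIGN_OR_RAISE(auto buffer, context->OpenIpcBuffer(*received));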

// ----------------------------------------------------------------------

CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
  is_mutable_ = true;
  mutable_data_ = data;
  SetMemoryManager(context_->memory_manager());
}

CudaBuffer::CudaBuffer(uintptr_t address, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : CudaBuffer(reinterpret_cast<uint8_t*>(address), size, context, own_data, is_ipc) {}

CudaBuffer::~CudaBuffer() { ARROW_CHECK_OK(Close()); }

Status CudaBuffer::Close() {
  if (own_data_) {
    if (is_ipc_) {
      return context_->CloseIpcBuffer(this);
    } else {
      return context_->Free(mutable_data_, size_);
    }
  }
  return Status::OK();
}

CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                       const int64_t size)
    : Buffer(parent, offset, size),
      context_(parent->context()),
      own_data_(false),
      is_ipc_(false) {
  if (parent->is_mutable()) {
    is_mutable_ = true;
    mutable_data_ = const_cast<uint8_t*>(data_);
  }
}

Result<std::shared_ptr<CudaBuffer>> CudaBuffer::FromBuffer(
    std::shared_ptr<Buffer> buffer) {
  int64_t offset = 0, size = buffer->size();
  bool is_mutable = buffer->is_mutable();
  std::shared_ptr<CudaBuffer> cuda_buffer;

  // The original CudaBuffer may have been wrapped in another Buffer
  // (for example through slicing).
  // TODO check device instead
  while (!(cuda_buffer = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
    const std::shared_ptr<Buffer> parent = buffer->parent();
    if (!parent) {
      return Status::TypeError("buffer is not backed by a CudaBuffer");
    }
    offset += buffer->address() - parent->address();
    buffer = parent;
  }
  // Re-slice to represent the same memory area
  if (offset != 0 || cuda_buffer->size() != size || !is_mutable) {
    cuda_buffer = std::make_shared<CudaBuffer>(std::move(cuda_buffer), offset, size);
    cuda_buffer->is_mutable_ = is_mutable;
  }
  return cuda_buffer;
}
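
// Sketch: slicing a CudaBuffer yields a plain Buffer whose parent chain leads
// back to device memory; FromBuffer() recovers a device-aware view of the
// same bytes (`cuda_buffer` is a placeholder std::shared_ptr<CudaBuffer>):
//
//   auto sliced = SliceBuffer(cuda_buffer, 64, 128);
//   ARROW_ASSIGN_OR_RAISE(auto device_view, CudaBuffer::FromBuffer(sliced));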

Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                              void* out) const {
  return context_->CopyDeviceToHost(out, data_ + position, nbytes);
}

Status CudaBuffer::CopyFromHost(const int64_t position, const void* data,
                                int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
}

Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
                                  int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes);
}

Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
                                         const int64_t position, const void* data,
                                         int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ + position, data,
                                            nbytes);
}

Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
  if (is_ipc_) {
    return Status::Invalid("Buffer has already been exported for IPC");
  }
  ARROW_ASSIGN_OR_RAISE(auto handle, context_->ExportIpcBuffer(mutable_data_, size_));
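  // After a successful export this buffer no longer owns the device memory,
  // so Close() will not free it out from under the importing process.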
  own_data_ = false;
  return handle;
}

CudaHostBuffer::~CudaHostBuffer() {
  auto maybe_manager = CudaDeviceManager::Instance();
  ARROW_CHECK_OK(maybe_manager.status());
  ARROW_CHECK_OK((*maybe_manager)->FreeHost(mutable_data_, size_));
}

Result<uintptr_t> CudaHostBuffer::GetDeviceAddress(
    const std::shared_ptr<CudaContext>& ctx) {
  return ::arrow::cuda::GetDeviceAddress(data(), ctx);
}

// ----------------------------------------------------------------------
// CudaBufferReader

CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
    : address_(buffer->address()), size_(buffer->size()), position_(0), is_open_(true) {
  auto maybe_buffer = CudaBuffer::FromBuffer(buffer);
  if (ARROW_PREDICT_FALSE(!maybe_buffer.ok())) {
    throw std::bad_cast();
  }
  buffer_ = *std::move(maybe_buffer);
  context_ = buffer_->context();
}

Status CudaBufferReader::DoClose() {
  is_open_ = false;
  return Status::OK();
}

bool CudaBufferReader::closed() const { return !is_open_; }

// XXX Only in a certain sense (not on the CPU)...
bool CudaBufferReader::supports_zero_copy() const { return true; }
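// (The Buffer-returning Read overloads below return device-resident slices
// without copying; the raw-pointer overloads still copy device-to-host.)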

Result<int64_t> CudaBufferReader::DoTell() const {
  RETURN_NOT_OK(CheckClosed());
  return position_;
}

Result<int64_t> CudaBufferReader::DoGetSize() {
  RETURN_NOT_OK(CheckClosed());
  return size_;
}

Status CudaBufferReader::DoSeek(int64_t position) {
  RETURN_NOT_OK(CheckClosed());

  if (position < 0 || position > size_) {
    return Status::IOError("Seek out of bounds");
  }

  position_ = position;
  return Status::OK();
}

Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
                                           void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  nbytes = std::min(nbytes, size_ - position);
  RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
  return nbytes;
}

Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
  position_ += bytes_read;
  return bytes_read;
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
                                                           int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position);
  return std::make_shared<CudaBuffer>(buffer_, position, size);
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position_);
  auto buffer = std::make_shared<CudaBuffer>(buffer_, position_, size);
  position_ += size;
  return buffer;
}
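
// Sketch: copy the first 64 bytes of a device buffer into host memory
// (`cuda_buffer` is a placeholder std::shared_ptr<CudaBuffer>):
//
//   CudaBufferReader reader(cuda_buffer);
//   uint8_t host_bytes[64];
//   ARROW_ASSIGN_OR_RAISE(int64_t n, reader.Read(64, host_bytes));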

// ----------------------------------------------------------------------
// CudaBufferWriter

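// Writes may be staged in a pinned (page-locked) host buffer (see
// SetBufferSize()) and flushed to the device in larger chunks; without a
// staging buffer, every Write() issues a host-to-device copy directly.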
class CudaBufferWriter::CudaBufferWriterImpl {
 public:
  explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
      : context_(buffer->context()),
        buffer_(buffer),
        buffer_size_(0),
        buffer_position_(0) {
    ARROW_CHECK(buffer->is_mutable()) << "Must pass mutable buffer";
    address_ = buffer->mutable_address();
    size_ = buffer->size();
    position_ = 0;
    closed_ = false;
  }

#define CHECK_CLOSED()                                              \
  if (closed_) {                                                    \
    return Status::Invalid("Operation on closed CudaBufferWriter"); \
  }

  Status Seek(int64_t position) {
    CHECK_CLOSED();
    if (position < 0 || position >= size_) {
      return Status::IOError("position out of bounds");
    }
    position_ = position;
    return Status::OK();
  }

  Status Close() {
    if (!closed_) {
      closed_ = true;
      RETURN_NOT_OK(FlushInternal());
    }
    return Status::OK();
  }

  Status Flush() {
    CHECK_CLOSED();
    return FlushInternal();
  }

  Status FlushInternal() {
    if (buffer_size_ > 0 && buffer_position_ > 0) {
      // Only need to flush when the write has been buffered
      RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_ - buffer_position_,
                                               host_buffer_data_, buffer_position_));
      buffer_position_ = 0;
    }
    return Status::OK();
  }

  bool closed() const { return closed_; }

  Result<int64_t> Tell() const {
    CHECK_CLOSED();
    return position_;
  }

  Status Write(const void* data, int64_t nbytes) {
    CHECK_CLOSED();
    if (nbytes == 0) {
      return Status::OK();
    }

    if (buffer_size_ > 0) {
      if (nbytes + buffer_position_ >= buffer_size_) {
        // The write would fill the staging buffer: flush what is buffered
        // and copy this write directly to the device
        RETURN_NOT_OK(Flush());
        RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
      } else {
        // Stage the bytes in the pinned host buffer
        std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
        buffer_position_ += nbytes;
      }
    } else {
      // Unbuffered write
      RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
    }
    position_ += nbytes;
    return Status::OK();
  }

  Status WriteAt(int64_t position, const void* data, int64_t nbytes) {
    std::lock_guard<std::mutex> guard(lock_);
    CHECK_CLOSED();
    RETURN_NOT_OK(Seek(position));
    return Write(data, nbytes);
  }

  Status SetBufferSize(const int64_t buffer_size) {
    CHECK_CLOSED();
    if (buffer_position_ > 0) {
      // Flush any buffered data
      RETURN_NOT_OK(Flush());
    }
    ARROW_ASSIGN_OR_RAISE(
        host_buffer_, AllocateCudaHostBuffer(context_->device_number(), buffer_size));
    host_buffer_data_ = host_buffer_->mutable_data();
    buffer_size_ = buffer_size;
    return Status::OK();
  }

  int64_t buffer_size() const { return buffer_size_; }

  int64_t buffer_position() const { return buffer_position_; }

#undef CHECK_CLOSED

 private:
  std::shared_ptr<CudaContext> context_;
  std::shared_ptr<CudaBuffer> buffer_;
  std::mutex lock_;
  uintptr_t address_;
  int64_t size_;
  int64_t position_;
  bool closed_;

  // Pinned host buffer for staging writes on the CPU before copying them
  // to device memory
  int64_t buffer_size_;
  int64_t buffer_position_;
  std::shared_ptr<CudaHostBuffer> host_buffer_;
  uint8_t* host_buffer_data_;
};

CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
  impl_.reset(new CudaBufferWriterImpl(buffer));
}

CudaBufferWriter::~CudaBufferWriter() {}

Status CudaBufferWriter::Close() { return impl_->Close(); }

bool CudaBufferWriter::closed() const { return impl_->closed(); }

Status CudaBufferWriter::Flush() { return impl_->Flush(); }

Status CudaBufferWriter::Seek(int64_t position) {
  if (impl_->buffer_position() > 0) {
    RETURN_NOT_OK(Flush());
  }
  return impl_->Seek(position);
}

Result<int64_t> CudaBufferWriter::Tell() const { return impl_->Tell(); }

Status CudaBufferWriter::Write(const void* data, int64_t nbytes) {
  return impl_->Write(data, nbytes);
}

Status CudaBufferWriter::WriteAt(int64_t position, const void* data, int64_t nbytes) {
  return impl_->WriteAt(position, data, nbytes);
}

Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
  return impl_->SetBufferSize(buffer_size);
}

int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }

int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }
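
// Sketch: stage small writes in pinned host memory, then flush to the device
// (`cuda_buffer`, `data` and `nbytes` are placeholders; the buffer must be
// mutable):
//
//   CudaBufferWriter writer(cuda_buffer);
//   RETURN_NOT_OK(writer.SetBufferSize(1 << 16));  // 64 KiB staging buffer
//   RETURN_NOT_OK(writer.Write(data, nbytes));
//   RETURN_NOT_OK(writer.Close());  // flushes any remaining buffered bytes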

// ----------------------------------------------------------------------

Result<std::shared_ptr<CudaHostBuffer>> AllocateCudaHostBuffer(int device_number,
                                                               const int64_t size) {
  ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
  return manager->AllocateHost(device_number, size);
}

Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
                                   const std::shared_ptr<CudaContext>& ctx) {
  ContextSaver context_saver(*ctx);
  CUdeviceptr ptr;
  // XXX should we use cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_DEVICE_POINTER)
  // instead?
  CU_RETURN_NOT_OK("cuMemHostGetDevicePointer",
                   cuMemHostGetDevicePointer(&ptr, const_cast<uint8_t*>(cpu_data), 0));
  return static_cast<uintptr_t>(ptr);
}

Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
  void* ptr;
  CU_RETURN_NOT_OK(
      "cuPointerGetAttribute",
      cuPointerGetAttribute(&ptr, CU_POINTER_ATTRIBUTE_HOST_POINTER, device_ptr));
  return static_cast<uint8_t*>(ptr);
}
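
// Note: both helpers assume the host memory is pinned and mapped into the
// device address space (e.g. host buffers from AllocateCudaHostBuffer); the
// underlying driver calls fail for ordinary pageable allocations.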

}  // namespace cuda
}  // namespace arrow