1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
#include "arrow/gpu/cuda_memory.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <typeinfo>
#include <utility>

#include <cuda.h>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/gpu/cuda_context.h"
#include "arrow/gpu/cuda_internal.h"
37 
38 namespace arrow {
39 namespace cuda {
40 
41 using internal::ContextSaver;
42 
43 // ----------------------------------------------------------------------
44 // CUDA IPC memory handle
45 
46 struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
CudaIpcMemHandleImplarrow::cuda::CudaIpcMemHandle::CudaIpcMemHandleImpl47   explicit CudaIpcMemHandleImpl(const uint8_t* handle) {
48     memcpy(&memory_size, handle, sizeof(memory_size));
49     if (memory_size != 0)
50       memcpy(&ipc_handle, handle + sizeof(memory_size), sizeof(CUipcMemHandle));
51   }
52 
CudaIpcMemHandleImplarrow::cuda::CudaIpcMemHandle::CudaIpcMemHandleImpl53   explicit CudaIpcMemHandleImpl(int64_t memory_size, const void* cu_handle)
54       : memory_size(memory_size) {
55     if (memory_size != 0) {
56       memcpy(&ipc_handle, cu_handle, sizeof(CUipcMemHandle));
57     }
58   }
59 
60   CUipcMemHandle ipc_handle;  /// initialized only when memory_size != 0
61   int64_t memory_size;        /// size of the memory that ipc_handle refers to
62 };
63 
CudaIpcMemHandle(const void * handle)64 CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
65   impl_.reset(new CudaIpcMemHandleImpl(reinterpret_cast<const uint8_t*>(handle)));
66 }
67 
CudaIpcMemHandle(int64_t memory_size,const void * cu_handle)68 CudaIpcMemHandle::CudaIpcMemHandle(int64_t memory_size, const void* cu_handle) {
69   impl_.reset(new CudaIpcMemHandleImpl(memory_size, cu_handle));
70 }
71 
~CudaIpcMemHandle()72 CudaIpcMemHandle::~CudaIpcMemHandle() {}
73 
FromBuffer(const void * opaque_handle)74 Result<std::shared_ptr<CudaIpcMemHandle>> CudaIpcMemHandle::FromBuffer(
75     const void* opaque_handle) {
76   return std::shared_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
77 }
78 
FromBuffer(const void * opaque_handle,std::shared_ptr<CudaIpcMemHandle> * handle)79 Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle,
80                                     std::shared_ptr<CudaIpcMemHandle>* handle) {
81   return FromBuffer(opaque_handle).Value(handle);
82 }
83 
Serialize(MemoryPool * pool) const84 Result<std::shared_ptr<Buffer>> CudaIpcMemHandle::Serialize(MemoryPool* pool) const {
85   int64_t size = impl_->memory_size;
86   const size_t handle_size =
87       (size > 0 ? sizeof(int64_t) + sizeof(CUipcMemHandle) : sizeof(int64_t));
88 
89   ARROW_ASSIGN_OR_RAISE(auto buffer,
90                         AllocateBuffer(static_cast<int64_t>(handle_size), pool));
91   memcpy(buffer->mutable_data(), &impl_->memory_size, sizeof(impl_->memory_size));
92   if (size > 0) {
93     memcpy(buffer->mutable_data() + sizeof(impl_->memory_size), &impl_->ipc_handle,
94            sizeof(impl_->ipc_handle));
95   }
96   return std::move(buffer);
97 }
98 
Serialize(MemoryPool * pool,std::shared_ptr<Buffer> * out) const99 Status CudaIpcMemHandle::Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
100   return Serialize(pool).Value(out);
101 }
102 
handle() const103 const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }
104 
memory_size() const105 int64_t CudaIpcMemHandle::memory_size() const { return impl_->memory_size; }
106 
107 // ----------------------------------------------------------------------
108 
CudaBuffer(uint8_t * data,int64_t size,const std::shared_ptr<CudaContext> & context,bool own_data,bool is_ipc)109 CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
110                        const std::shared_ptr<CudaContext>& context, bool own_data,
111                        bool is_ipc)
112     : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
113   is_mutable_ = true;
114   mutable_data_ = data;
115   SetMemoryManager(context_->memory_manager());
116 }
117 
CudaBuffer(uintptr_t address,int64_t size,const std::shared_ptr<CudaContext> & context,bool own_data,bool is_ipc)118 CudaBuffer::CudaBuffer(uintptr_t address, int64_t size,
119                        const std::shared_ptr<CudaContext>& context, bool own_data,
120                        bool is_ipc)
121     : CudaBuffer(reinterpret_cast<uint8_t*>(address), size, context, own_data, is_ipc) {}
122 
~CudaBuffer()123 CudaBuffer::~CudaBuffer() { ARROW_CHECK_OK(Close()); }
124 
Close()125 Status CudaBuffer::Close() {
126   if (own_data_) {
127     if (is_ipc_) {
128       return context_->CloseIpcBuffer(this);
129     } else {
130       return context_->Free(mutable_data_, size_);
131     }
132   }
133   return Status::OK();
134 }
135 
CudaBuffer(const std::shared_ptr<CudaBuffer> & parent,const int64_t offset,const int64_t size)136 CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
137                        const int64_t size)
138     : Buffer(parent, offset, size),
139       context_(parent->context()),
140       own_data_(false),
141       is_ipc_(false) {
142   if (parent->is_mutable()) {
143     is_mutable_ = true;
144     mutable_data_ = const_cast<uint8_t*>(data_);
145   }
146 }
147 
FromBuffer(std::shared_ptr<Buffer> buffer)148 Result<std::shared_ptr<CudaBuffer>> CudaBuffer::FromBuffer(
149     std::shared_ptr<Buffer> buffer) {
150   int64_t offset = 0, size = buffer->size();
151   bool is_mutable = buffer->is_mutable();
152   std::shared_ptr<CudaBuffer> cuda_buffer;
153 
154   // The original CudaBuffer may have been wrapped in another Buffer
155   // (for example through slicing).
156   // TODO check device instead
157   while (!(cuda_buffer = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
158     const std::shared_ptr<Buffer> parent = buffer->parent();
159     if (!parent) {
160       return Status::TypeError("buffer is not backed by a CudaBuffer");
161     }
162     offset += buffer->address() - parent->address();
163     buffer = parent;
164   }
165   // Re-slice to represent the same memory area
166   if (offset != 0 || cuda_buffer->size() != size || !is_mutable) {
167     cuda_buffer = std::make_shared<CudaBuffer>(std::move(cuda_buffer), offset, size);
168     cuda_buffer->is_mutable_ = is_mutable;
169   }
170   return cuda_buffer;
171 }
172 
FromBuffer(std::shared_ptr<Buffer> buffer,std::shared_ptr<CudaBuffer> * out)173 Status CudaBuffer::FromBuffer(std::shared_ptr<Buffer> buffer,
174                               std::shared_ptr<CudaBuffer>* out) {
175   return FromBuffer(std::move(buffer)).Value(out);
176 }
177 
CopyToHost(const int64_t position,const int64_t nbytes,void * out) const178 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
179                               void* out) const {
180   return context_->CopyDeviceToHost(out, data_ + position, nbytes);
181 }
182 
CopyFromHost(const int64_t position,const void * data,int64_t nbytes)183 Status CudaBuffer::CopyFromHost(const int64_t position, const void* data,
184                                 int64_t nbytes) {
185   if (nbytes > size_ - position) {
186     return Status::Invalid("Copy would overflow buffer");
187   }
188   return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
189 }
190 
CopyFromDevice(const int64_t position,const void * data,int64_t nbytes)191 Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
192                                   int64_t nbytes) {
193   if (nbytes > size_ - position) {
194     return Status::Invalid("Copy would overflow buffer");
195   }
196   return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes);
197 }
198 
CopyFromAnotherDevice(const std::shared_ptr<CudaContext> & src_ctx,const int64_t position,const void * data,int64_t nbytes)199 Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
200                                          const int64_t position, const void* data,
201                                          int64_t nbytes) {
202   if (nbytes > size_ - position) {
203     return Status::Invalid("Copy would overflow buffer");
204   }
205   return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ + position, data,
206                                             nbytes);
207 }
208 
ExportForIpc()209 Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
210   if (is_ipc_) {
211     return Status::Invalid("Buffer has already been exported for IPC");
212   }
213   ARROW_ASSIGN_OR_RAISE(auto handle, context_->ExportIpcBuffer(mutable_data_, size_));
214   own_data_ = false;
215   return handle;
216 }
217 
ExportForIpc(std::shared_ptr<CudaIpcMemHandle> * handle)218 Status CudaBuffer::ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle) {
219   return ExportForIpc().Value(handle);
220 }
221 
~CudaHostBuffer()222 CudaHostBuffer::~CudaHostBuffer() {
223   auto maybe_manager = CudaDeviceManager::Instance();
224   ARROW_CHECK_OK(maybe_manager.status());
225   ARROW_CHECK_OK((*maybe_manager)->FreeHost(mutable_data_, size_));
226 }
227 
GetDeviceAddress(const std::shared_ptr<CudaContext> & ctx)228 Result<uintptr_t> CudaHostBuffer::GetDeviceAddress(
229     const std::shared_ptr<CudaContext>& ctx) {
230   return ::arrow::cuda::GetDeviceAddress(data(), ctx);
231 }
232 
233 // ----------------------------------------------------------------------
234 // CudaBufferReader
235 
CudaBufferReader(const std::shared_ptr<Buffer> & buffer)236 CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
237     : address_(buffer->address()), size_(buffer->size()), position_(0), is_open_(true) {
238   auto maybe_buffer = CudaBuffer::FromBuffer(buffer);
239   if (ARROW_PREDICT_FALSE(!maybe_buffer.ok())) {
240     throw std::bad_cast();
241   }
242   buffer_ = *std::move(maybe_buffer);
243   context_ = buffer_->context();
244 }
245 
DoClose()246 Status CudaBufferReader::DoClose() {
247   is_open_ = false;
248   return Status::OK();
249 }
250 
closed() const251 bool CudaBufferReader::closed() const { return !is_open_; }
252 
253 // XXX Only in a certain sense (not on the CPU)...
supports_zero_copy() const254 bool CudaBufferReader::supports_zero_copy() const { return true; }
255 
DoTell() const256 Result<int64_t> CudaBufferReader::DoTell() const {
257   RETURN_NOT_OK(CheckClosed());
258   return position_;
259 }
260 
DoGetSize()261 Result<int64_t> CudaBufferReader::DoGetSize() {
262   RETURN_NOT_OK(CheckClosed());
263   return size_;
264 }
265 
DoSeek(int64_t position)266 Status CudaBufferReader::DoSeek(int64_t position) {
267   RETURN_NOT_OK(CheckClosed());
268 
269   if (position < 0 || position > size_) {
270     return Status::IOError("Seek out of bounds");
271   }
272 
273   position_ = position;
274   return Status::OK();
275 }
276 
DoReadAt(int64_t position,int64_t nbytes,void * buffer)277 Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
278                                            void* buffer) {
279   RETURN_NOT_OK(CheckClosed());
280 
281   nbytes = std::min(nbytes, size_ - position);
282   RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
283   return nbytes;
284 }
285 
DoRead(int64_t nbytes,void * buffer)286 Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
287   RETURN_NOT_OK(CheckClosed());
288 
289   ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
290   position_ += bytes_read;
291   return bytes_read;
292 }
293 
DoReadAt(int64_t position,int64_t nbytes)294 Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
295                                                            int64_t nbytes) {
296   RETURN_NOT_OK(CheckClosed());
297 
298   int64_t size = std::min(nbytes, size_ - position);
299   return std::make_shared<CudaBuffer>(buffer_, position, size);
300 }
301 
DoRead(int64_t nbytes)302 Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
303   RETURN_NOT_OK(CheckClosed());
304 
305   int64_t size = std::min(nbytes, size_ - position_);
306   auto buffer = std::make_shared<CudaBuffer>(buffer_, position_, size);
307   position_ += size;
308   return buffer;
309 }
310 
311 // ----------------------------------------------------------------------
312 // CudaBufferWriter
313 
314 class CudaBufferWriter::CudaBufferWriterImpl {
315  public:
CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer> & buffer)316   explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
317       : context_(buffer->context()),
318         buffer_(buffer),
319         buffer_size_(0),
320         buffer_position_(0) {
321     buffer_ = buffer;
322     ARROW_CHECK(buffer->is_mutable()) << "Must pass mutable buffer";
323     address_ = buffer->mutable_address();
324     size_ = buffer->size();
325     position_ = 0;
326     closed_ = false;
327   }
328 
329 #define CHECK_CLOSED()                                              \
330   if (closed_) {                                                    \
331     return Status::Invalid("Operation on closed CudaBufferWriter"); \
332   }
333 
Seek(int64_t position)334   Status Seek(int64_t position) {
335     CHECK_CLOSED();
336     if (position < 0 || position >= size_) {
337       return Status::IOError("position out of bounds");
338     }
339     position_ = position;
340     return Status::OK();
341   }
342 
Close()343   Status Close() {
344     if (!closed_) {
345       closed_ = true;
346       RETURN_NOT_OK(FlushInternal());
347     }
348     return Status::OK();
349   }
350 
Flush()351   Status Flush() {
352     CHECK_CLOSED();
353     return FlushInternal();
354   }
355 
FlushInternal()356   Status FlushInternal() {
357     if (buffer_size_ > 0 && buffer_position_ > 0) {
358       // Only need to flush when the write has been buffered
359       RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_ - buffer_position_,
360                                                host_buffer_data_, buffer_position_));
361       buffer_position_ = 0;
362     }
363     return Status::OK();
364   }
365 
closed() const366   bool closed() const { return closed_; }
367 
Tell() const368   Result<int64_t> Tell() const {
369     CHECK_CLOSED();
370     return position_;
371   }
372 
Write(const void * data,int64_t nbytes)373   Status Write(const void* data, int64_t nbytes) {
374     CHECK_CLOSED();
375     if (nbytes == 0) {
376       return Status::OK();
377     }
378 
379     if (buffer_size_ > 0) {
380       if (nbytes + buffer_position_ >= buffer_size_) {
381         // Reach end of buffer, write everything
382         RETURN_NOT_OK(Flush());
383         RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
384       } else {
385         // Write bytes to buffer
386         std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
387         buffer_position_ += nbytes;
388       }
389     } else {
390       // Unbuffered write
391       RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
392     }
393     position_ += nbytes;
394     return Status::OK();
395   }
396 
WriteAt(int64_t position,const void * data,int64_t nbytes)397   Status WriteAt(int64_t position, const void* data, int64_t nbytes) {
398     std::lock_guard<std::mutex> guard(lock_);
399     CHECK_CLOSED();
400     RETURN_NOT_OK(Seek(position));
401     return Write(data, nbytes);
402   }
403 
SetBufferSize(const int64_t buffer_size)404   Status SetBufferSize(const int64_t buffer_size) {
405     CHECK_CLOSED();
406     if (buffer_position_ > 0) {
407       // Flush any buffered data
408       RETURN_NOT_OK(Flush());
409     }
410     ARROW_ASSIGN_OR_RAISE(
411         host_buffer_,
412         AllocateCudaHostBuffer(context_.get()->device_number(), buffer_size));
413     host_buffer_data_ = host_buffer_->mutable_data();
414     buffer_size_ = buffer_size;
415     return Status::OK();
416   }
417 
buffer_size() const418   int64_t buffer_size() const { return buffer_size_; }
419 
buffer_position() const420   int64_t buffer_position() const { return buffer_position_; }
421 
422 #undef CHECK_CLOSED
423 
424  private:
425   std::shared_ptr<CudaContext> context_;
426   std::shared_ptr<CudaBuffer> buffer_;
427   std::mutex lock_;
428   uintptr_t address_;
429   int64_t size_;
430   int64_t position_;
431   bool closed_;
432 
433   // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
434   int64_t buffer_size_;
435   int64_t buffer_position_;
436   std::shared_ptr<CudaHostBuffer> host_buffer_;
437   uint8_t* host_buffer_data_;
438 };
439 
CudaBufferWriter(const std::shared_ptr<CudaBuffer> & buffer)440 CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
441   impl_.reset(new CudaBufferWriterImpl(buffer));
442 }
443 
~CudaBufferWriter()444 CudaBufferWriter::~CudaBufferWriter() {}
445 
Close()446 Status CudaBufferWriter::Close() { return impl_->Close(); }
447 
closed() const448 bool CudaBufferWriter::closed() const { return impl_->closed(); }
449 
Flush()450 Status CudaBufferWriter::Flush() { return impl_->Flush(); }
451 
Seek(int64_t position)452 Status CudaBufferWriter::Seek(int64_t position) {
453   if (impl_->buffer_position() > 0) {
454     RETURN_NOT_OK(Flush());
455   }
456   return impl_->Seek(position);
457 }
458 
Tell() const459 Result<int64_t> CudaBufferWriter::Tell() const { return impl_->Tell(); }
460 
Write(const void * data,int64_t nbytes)461 Status CudaBufferWriter::Write(const void* data, int64_t nbytes) {
462   return impl_->Write(data, nbytes);
463 }
464 
WriteAt(int64_t position,const void * data,int64_t nbytes)465 Status CudaBufferWriter::WriteAt(int64_t position, const void* data, int64_t nbytes) {
466   return impl_->WriteAt(position, data, nbytes);
467 }
468 
SetBufferSize(const int64_t buffer_size)469 Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
470   return impl_->SetBufferSize(buffer_size);
471 }
472 
buffer_size() const473 int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }
474 
num_bytes_buffered() const475 int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }
476 
477 // ----------------------------------------------------------------------
478 
AllocateCudaHostBuffer(int device_number,const int64_t size)479 Result<std::shared_ptr<CudaHostBuffer>> AllocateCudaHostBuffer(int device_number,
480                                                                const int64_t size) {
481   ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
482   return manager->AllocateHost(device_number, size);
483 }
484 
AllocateCudaHostBuffer(int device_number,const int64_t size,std::shared_ptr<CudaHostBuffer> * out)485 Status AllocateCudaHostBuffer(int device_number, const int64_t size,
486                               std::shared_ptr<CudaHostBuffer>* out) {
487   return AllocateCudaHostBuffer(device_number, size).Value(out);
488 }
489 
GetDeviceAddress(const uint8_t * cpu_data,const std::shared_ptr<CudaContext> & ctx)490 Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
491                                    const std::shared_ptr<CudaContext>& ctx) {
492   ContextSaver context_saver(*ctx);
493   CUdeviceptr ptr;
494   // XXX should we use cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_DEVICE_POINTER)
495   // instead?
496   CU_RETURN_NOT_OK("cuMemHostGetDevicePointer",
497                    cuMemHostGetDevicePointer(&ptr, const_cast<uint8_t*>(cpu_data), 0));
498   return static_cast<uintptr_t>(ptr);
499 }
500 
GetHostAddress(uintptr_t device_ptr)501 Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
502   void* ptr;
503   CU_RETURN_NOT_OK(
504       "cuPointerGetAttribute",
505       cuPointerGetAttribute(&ptr, CU_POINTER_ATTRIBUTE_HOST_POINTER, device_ptr));
506   return static_cast<uint8_t*>(ptr);
507 }
508 
509 }  // namespace cuda
510 }  // namespace arrow
511