// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/gpu/cuda_memory.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>  // memcpy
#include <memory>
#include <mutex>
#include <utility>

#include <cuda.h>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/gpu/cuda_context.h"
#include "arrow/gpu/cuda_internal.h"

namespace arrow {
namespace cuda {

using internal::ContextSaver;

// ----------------------------------------------------------------------
// CUDA IPC memory handle

struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
  explicit CudaIpcMemHandleImpl(const uint8_t* handle) {
    memcpy(&memory_size, handle, sizeof(memory_size));
    if (memory_size != 0) {
      memcpy(&ipc_handle, handle + sizeof(memory_size), sizeof(CUipcMemHandle));
    }
  }

  CudaIpcMemHandleImpl(int64_t memory_size, const void* cu_handle)
      : memory_size(memory_size) {
    if (memory_size != 0) {
      memcpy(&ipc_handle, cu_handle, sizeof(CUipcMemHandle));
    }
  }

  CUipcMemHandle ipc_handle;  /// Initialized only when memory_size != 0
  int64_t memory_size;        /// Size of the memory that ipc_handle refers to
};

CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
  impl_.reset(new CudaIpcMemHandleImpl(reinterpret_cast<const uint8_t*>(handle)));
}

CudaIpcMemHandle::CudaIpcMemHandle(int64_t memory_size, const void* cu_handle) {
  impl_.reset(new CudaIpcMemHandleImpl(memory_size, cu_handle));
}

CudaIpcMemHandle::~CudaIpcMemHandle() {}

Result<std::shared_ptr<CudaIpcMemHandle>> CudaIpcMemHandle::FromBuffer(
    const void* opaque_handle) {
  return std::shared_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
}

Result<std::shared_ptr<Buffer>> CudaIpcMemHandle::Serialize(MemoryPool* pool) const {
  int64_t size = impl_->memory_size;
  const size_t handle_size =
      (size > 0 ? sizeof(int64_t) + sizeof(CUipcMemHandle) : sizeof(int64_t));

  ARROW_ASSIGN_OR_RAISE(auto buffer,
                        AllocateBuffer(static_cast<int64_t>(handle_size), pool));
  memcpy(buffer->mutable_data(), &impl_->memory_size, sizeof(impl_->memory_size));
  if (size > 0) {
    memcpy(buffer->mutable_data() + sizeof(impl_->memory_size), &impl_->ipc_handle,
           sizeof(impl_->ipc_handle));
  }
  return std::move(buffer);
}

const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }

int64_t CudaIpcMemHandle::memory_size() const { return impl_->memory_size; }
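
// A minimal round-trip sketch (illustrative only; error handling elided).
// Serialize() lays out [int64_t memory_size][CUipcMemHandle, present only when
// memory_size > 0], and FromBuffer() parses exactly that layout back in the
// importing process:
//
//   ARROW_ASSIGN_OR_RAISE(auto payload, handle->Serialize(default_memory_pool()));
//   // ... transmit payload->data() to the importing process, then there:
//   ARROW_ASSIGN_OR_RAISE(auto received,
//                         CudaIpcMemHandle::FromBuffer(payload->data()));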

// ----------------------------------------------------------------------

CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
  is_mutable_ = true;
  mutable_data_ = data;
  SetMemoryManager(context_->memory_manager());
}

CudaBuffer::CudaBuffer(uintptr_t address, int64_t size,
                       const std::shared_ptr<CudaContext>& context, bool own_data,
                       bool is_ipc)
    : CudaBuffer(reinterpret_cast<uint8_t*>(address), size, context, own_data, is_ipc) {}

CudaBuffer::~CudaBuffer() { ARROW_CHECK_OK(Close()); }

Status CudaBuffer::Close() {
  if (own_data_) {
    if (is_ipc_) {
      return context_->CloseIpcBuffer(this);
    } else {
      return context_->Free(mutable_data_, size_);
    }
  }
  return Status::OK();
}

CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                       const int64_t size)
    : Buffer(parent, offset, size),
      context_(parent->context()),
      own_data_(false),
      is_ipc_(false) {
  if (parent->is_mutable()) {
    is_mutable_ = true;
    mutable_data_ = const_cast<uint8_t*>(data_);
  }
}

Result<std::shared_ptr<CudaBuffer>> CudaBuffer::FromBuffer(
    std::shared_ptr<Buffer> buffer) {
  int64_t offset = 0, size = buffer->size();
  bool is_mutable = buffer->is_mutable();
  std::shared_ptr<CudaBuffer> cuda_buffer;

  // The original CudaBuffer may have been wrapped in another Buffer
  // (for example through slicing).
  // TODO check device instead
  while (!(cuda_buffer = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
    const std::shared_ptr<Buffer> parent = buffer->parent();
    if (!parent) {
      return Status::TypeError("buffer is not backed by a CudaBuffer");
    }
    offset += buffer->address() - parent->address();
    buffer = parent;
  }
  // Re-slice to represent the same memory area
  if (offset != 0 || cuda_buffer->size() != size || !is_mutable) {
    cuda_buffer = std::make_shared<CudaBuffer>(std::move(cuda_buffer), offset, size);
    cuda_buffer->is_mutable_ = is_mutable;
  }
  return cuda_buffer;
}
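
// Illustrative sketch of the unwrapping above (variable names are hypothetical):
// a slice of a device buffer still resolves to the original CudaBuffer, with the
// accumulated offset re-applied.
//
//   std::shared_ptr<Buffer> sliced = SliceBuffer(device_buffer, 64, 128);
//   ARROW_ASSIGN_OR_RAISE(auto as_cuda, CudaBuffer::FromBuffer(sliced));
//   // as_cuda->address() == device_buffer->address() + 64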

Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                              void* out) const {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyDeviceToHost(out, data_ + position, nbytes);
}

Status CudaBuffer::CopyFromHost(const int64_t position, const void* data,
                                int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
}

Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
                                  int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes);
}

Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
                                         const int64_t position, const void* data,
                                         int64_t nbytes) {
  if (nbytes > size_ - position) {
    return Status::Invalid("Copy would overflow buffer");
  }
  return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ + position, data,
                                            nbytes);
}

Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
  if (is_ipc_) {
    return Status::Invalid("Buffer has already been exported for IPC");
  }
  ARROW_ASSIGN_OR_RAISE(auto handle, context_->ExportIpcBuffer(mutable_data_, size_));
  own_data_ = false;
  return handle;
}
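
// Note on ownership: a successful ExportForIpc() clears own_data_, so Close()
// above becomes a no-op for this buffer. The exporting process remains
// responsible for keeping the device allocation alive while importers use it.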

CudaHostBuffer::~CudaHostBuffer() {
  auto maybe_manager = CudaDeviceManager::Instance();
  ARROW_CHECK_OK(maybe_manager.status());
  ARROW_CHECK_OK((*maybe_manager)->FreeHost(mutable_data_, size_));
}

Result<uintptr_t> CudaHostBuffer::GetDeviceAddress(
    const std::shared_ptr<CudaContext>& ctx) {
  return ::arrow::cuda::GetDeviceAddress(data(), ctx);
}

// ----------------------------------------------------------------------
// CudaBufferReader

CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
    : address_(buffer->address()), size_(buffer->size()), position_(0), is_open_(true) {
  auto maybe_buffer = CudaBuffer::FromBuffer(buffer);
  if (ARROW_PREDICT_FALSE(!maybe_buffer.ok())) {
    throw std::bad_cast();
  }
  buffer_ = *std::move(maybe_buffer);
  context_ = buffer_->context();
}

Status CudaBufferReader::DoClose() {
  is_open_ = false;
  return Status::OK();
}

bool CudaBufferReader::closed() const { return !is_open_; }

// XXX "Zero copy" only in a limited sense: the Buffer-returning reads share
// device memory without copying, but the data stays on the GPU, not the CPU.
bool CudaBufferReader::supports_zero_copy() const { return true; }

Result<int64_t> CudaBufferReader::DoTell() const {
  RETURN_NOT_OK(CheckClosed());
  return position_;
}

Result<int64_t> CudaBufferReader::DoGetSize() {
  RETURN_NOT_OK(CheckClosed());
  return size_;
}

Status CudaBufferReader::DoSeek(int64_t position) {
  RETURN_NOT_OK(CheckClosed());

  if (position < 0 || position > size_) {
    return Status::IOError("Seek out of bounds");
  }

  position_ = position;
  return Status::OK();
}

Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
                                           void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  nbytes = std::min(nbytes, size_ - position);
  RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
  return nbytes;
}

Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
  RETURN_NOT_OK(CheckClosed());

  ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
  position_ += bytes_read;
  return bytes_read;
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
                                                           int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position);
  return std::make_shared<CudaBuffer>(buffer_, position, size);
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
  RETURN_NOT_OK(CheckClosed());

  int64_t size = std::min(nbytes, size_ - position_);
  auto buffer = std::make_shared<CudaBuffer>(buffer_, position_, size);
  position_ += size;
  return buffer;
}
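
// Reads come in two flavors (illustrative sketch; variable names hypothetical):
// the Buffer-returning overloads hand back zero-copy device slices, while the
// void*-output overloads copy device memory into the caller's host pointer.
//
//   CudaBufferReader reader(device_buffer);
//   ARROW_ASSIGN_OR_RAISE(auto slice, reader.Read(128));  // device slice, no copy
//   uint8_t host_bytes[128];
//   ARROW_ASSIGN_OR_RAISE(int64_t n, reader.Read(128, host_bytes));  // D2H copy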

// ----------------------------------------------------------------------
// CudaBufferWriter

class CudaBufferWriter::CudaBufferWriterImpl {
 public:
  explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
      : context_(buffer->context()),
        buffer_(buffer),
        buffer_size_(0),
        buffer_position_(0) {
    ARROW_CHECK(buffer->is_mutable()) << "Must pass mutable buffer";
    address_ = buffer->mutable_address();
    size_ = buffer->size();
    position_ = 0;
    closed_ = false;
  }

#define CHECK_CLOSED()                                              \
  if (closed_) {                                                    \
    return Status::Invalid("Operation on closed CudaBufferWriter"); \
  }

  Status Seek(int64_t position) {
    CHECK_CLOSED();
    if (position < 0 || position >= size_) {
      return Status::IOError("position out of bounds");
    }
    position_ = position;
    return Status::OK();
  }

  Status Close() {
    if (!closed_) {
      closed_ = true;
      RETURN_NOT_OK(FlushInternal());
    }
    return Status::OK();
  }

  Status Flush() {
    CHECK_CLOSED();
    return FlushInternal();
  }

  Status FlushInternal() {
    if (buffer_size_ > 0 && buffer_position_ > 0) {
      // Only need to flush when writes have actually been buffered
      RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_ - buffer_position_,
                                               host_buffer_data_, buffer_position_));
      buffer_position_ = 0;
    }
    return Status::OK();
  }

  bool closed() const { return closed_; }

  Result<int64_t> Tell() const {
    CHECK_CLOSED();
    return position_;
  }

  Status Write(const void* data, int64_t nbytes) {
    CHECK_CLOSED();
    if (nbytes == 0) {
      return Status::OK();
    }

    if (buffer_size_ > 0) {
      if (nbytes + buffer_position_ >= buffer_size_) {
        // The write would fill or overflow the staging buffer: flush what is
        // buffered, then copy this write directly to the device
        RETURN_NOT_OK(Flush());
        RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
      } else {
        // Stage the bytes in the pinned host buffer
        std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
        buffer_position_ += nbytes;
      }
    } else {
      // Unbuffered write straight to device memory
      RETURN_NOT_OK(context_->CopyHostToDevice(address_ + position_, data, nbytes));
    }
    position_ += nbytes;
    return Status::OK();
  }

  Status WriteAt(int64_t position, const void* data, int64_t nbytes) {
    std::lock_guard<std::mutex> guard(lock_);
    CHECK_CLOSED();
    RETURN_NOT_OK(Seek(position));
    return Write(data, nbytes);
  }

  Status SetBufferSize(const int64_t buffer_size) {
    CHECK_CLOSED();
    if (buffer_position_ > 0) {
      // Flush any buffered data
      RETURN_NOT_OK(Flush());
    }
    ARROW_ASSIGN_OR_RAISE(
        host_buffer_, AllocateCudaHostBuffer(context_->device_number(), buffer_size));
    host_buffer_data_ = host_buffer_->mutable_data();
    buffer_size_ = buffer_size;
    return Status::OK();
  }

  int64_t buffer_size() const { return buffer_size_; }

  int64_t buffer_position() const { return buffer_position_; }

#undef CHECK_CLOSED

 private:
  std::shared_ptr<CudaContext> context_;
  std::shared_ptr<CudaBuffer> buffer_;
  std::mutex lock_;
  uintptr_t address_;
  int64_t size_;
  int64_t position_;
  bool closed_;

  // Pinned host buffer for staging writes on the CPU before they are copied
  // to device memory
  int64_t buffer_size_;
  int64_t buffer_position_;
  std::shared_ptr<CudaHostBuffer> host_buffer_;
  uint8_t* host_buffer_data_;
};

CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
  impl_.reset(new CudaBufferWriterImpl(buffer));
}

CudaBufferWriter::~CudaBufferWriter() {}

Status CudaBufferWriter::Close() { return impl_->Close(); }

bool CudaBufferWriter::closed() const { return impl_->closed(); }

Status CudaBufferWriter::Flush() { return impl_->Flush(); }

Status CudaBufferWriter::Seek(int64_t position) {
  if (impl_->buffer_position() > 0) {
    RETURN_NOT_OK(Flush());
  }
  return impl_->Seek(position);
}

Result<int64_t> CudaBufferWriter::Tell() const { return impl_->Tell(); }

Status CudaBufferWriter::Write(const void* data, int64_t nbytes) {
  return impl_->Write(data, nbytes);
}

Status CudaBufferWriter::WriteAt(int64_t position, const void* data, int64_t nbytes) {
  return impl_->WriteAt(position, data, nbytes);
}

Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
  return impl_->SetBufferSize(buffer_size);
}

int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }

int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }
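
// Illustrative usage sketch (hypothetical setup; error handling elided): with a
// pinned staging buffer configured, small writes accumulate on the host and are
// copied to the device in batches; Close() flushes whatever remains.
//
//   CudaBufferWriter writer(device_buffer);
//   ARROW_RETURN_NOT_OK(writer.SetBufferSize(1 << 16));  // 64 KiB pinned staging
//   ARROW_RETURN_NOT_OK(writer.Write(small_chunk, chunk_size));
//   ARROW_RETURN_NOT_OK(writer.Close());  // flushes buffered bytes to the device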

// ----------------------------------------------------------------------

Result<std::shared_ptr<CudaHostBuffer>> AllocateCudaHostBuffer(int device_number,
                                                               const int64_t size) {
  ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
  return manager->AllocateHost(device_number, size);
}

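// The two functions below translate between host and device views of pinned
// (page-locked) memory. On platforms with unified addressing the two addresses
// typically coincide, but these driver calls are the portable way to map one
// onto the other.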
Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
                                   const std::shared_ptr<CudaContext>& ctx) {
  ContextSaver context_saver(*ctx);
  CUdeviceptr ptr;
  // XXX should we use cuPointerGetAttribute(CU_POINTER_ATTRIBUTE_DEVICE_POINTER)
  // instead?
  CU_RETURN_NOT_OK("cuMemHostGetDevicePointer",
                   cuMemHostGetDevicePointer(&ptr, const_cast<uint8_t*>(cpu_data), 0));
  return static_cast<uintptr_t>(ptr);
}

Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
  void* ptr;
  CU_RETURN_NOT_OK(
      "cuPointerGetAttribute",
      cuPointerGetAttribute(&ptr, CU_POINTER_ATTRIBUTE_HOST_POINTER, device_ptr));
  return static_cast<uint8_t*>(ptr);
}

}  // namespace cuda
}  // namespace arrow