1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/util/windows_compatibility.h" // IWYU pragma: keep
19
20 // sys/mman.h not present in Visual Studio or Cygwin
21 #ifdef _WIN32
22 #ifndef NOMINMAX
23 #define NOMINMAX
24 #endif
25 #include "arrow/io/mman.h"
26 #undef Realloc
27 #undef Free
28 #else
29 #include <fcntl.h>
30 #include <sys/mman.h>
31 #include <unistd.h> // IWYU pragma: keep
32 #endif
33
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
44
45 // ----------------------------------------------------------------------
46 // Other Arrow includes
47
48 #include "arrow/io/file.h"
49 #include "arrow/io/interfaces.h"
50 #include "arrow/io/util_internal.h"
51
52 #include "arrow/buffer.h"
53 #include "arrow/memory_pool.h"
54 #include "arrow/status.h"
55 #include "arrow/util/future.h"
56 #include "arrow/util/io_util.h"
57 #include "arrow/util/logging.h"
58
59 namespace arrow {
60
61 using internal::IOErrorFromErrno;
62
63 namespace io {
64
65 class OSFile {
66 public:
OSFile()67 OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
68
~OSFile()69 ~OSFile() {}
70
71 // Note: only one of the Open* methods below may be called on a given instance
72
OpenWritable(const std::string & path,bool truncate,bool append,bool write_only)73 Status OpenWritable(const std::string& path, bool truncate, bool append,
74 bool write_only) {
75 RETURN_NOT_OK(SetFileName(path));
76
77 ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only,
78 truncate, append));
79 is_open_ = true;
80 mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
81
82 if (!truncate) {
83 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
84 } else {
85 size_ = 0;
86 }
87 return Status::OK();
88 }
89
90 // This is different from OpenWritable(string, ...) in that it doesn't
91 // truncate nor mandate a seekable file
OpenWritable(int fd)92 Status OpenWritable(int fd) {
93 auto result = ::arrow::internal::FileGetSize(fd);
94 if (result.ok()) {
95 size_ = *result;
96 } else {
97 // Non-seekable file
98 size_ = -1;
99 }
100 RETURN_NOT_OK(SetFileName(fd));
101 is_open_ = true;
102 mode_ = FileMode::WRITE;
103 fd_ = fd;
104 return Status::OK();
105 }
106
OpenReadable(const std::string & path)107 Status OpenReadable(const std::string& path) {
108 RETURN_NOT_OK(SetFileName(path));
109
110 ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_));
111 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
112
113 is_open_ = true;
114 mode_ = FileMode::READ;
115 return Status::OK();
116 }
117
OpenReadable(int fd)118 Status OpenReadable(int fd) {
119 ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
120 RETURN_NOT_OK(SetFileName(fd));
121 is_open_ = true;
122 mode_ = FileMode::READ;
123 fd_ = fd;
124 return Status::OK();
125 }
126
CheckClosed() const127 Status CheckClosed() const {
128 if (!is_open_) {
129 return Status::Invalid("Invalid operation on closed file");
130 }
131 return Status::OK();
132 }
133
Close()134 Status Close() {
135 if (is_open_) {
136 // Even if closing fails, the fd will likely be closed (perhaps it's
137 // already closed).
138 is_open_ = false;
139 int fd = fd_;
140 fd_ = -1;
141 RETURN_NOT_OK(::arrow::internal::FileClose(fd));
142 }
143 return Status::OK();
144 }
145
Read(int64_t nbytes,void * out)146 Result<int64_t> Read(int64_t nbytes, void* out) {
147 RETURN_NOT_OK(CheckClosed());
148 RETURN_NOT_OK(CheckPositioned());
149 return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes);
150 }
151
ReadAt(int64_t position,int64_t nbytes,void * out)152 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
153 RETURN_NOT_OK(CheckClosed());
154 RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
155 // ReadAt() leaves the file position undefined, so require that we seek
156 // before calling Read() or Write().
157 need_seeking_.store(true);
158 return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position,
159 nbytes);
160 }
161
Seek(int64_t pos)162 Status Seek(int64_t pos) {
163 RETURN_NOT_OK(CheckClosed());
164 if (pos < 0) {
165 return Status::Invalid("Invalid position");
166 }
167 Status st = ::arrow::internal::FileSeek(fd_, pos);
168 if (st.ok()) {
169 need_seeking_.store(false);
170 }
171 return st;
172 }
173
Tell() const174 Result<int64_t> Tell() const {
175 RETURN_NOT_OK(CheckClosed());
176 return ::arrow::internal::FileTell(fd_);
177 }
178
Write(const void * data,int64_t length)179 Status Write(const void* data, int64_t length) {
180 RETURN_NOT_OK(CheckClosed());
181
182 std::lock_guard<std::mutex> guard(lock_);
183 RETURN_NOT_OK(CheckPositioned());
184 if (length < 0) {
185 return Status::IOError("Length must be non-negative");
186 }
187 return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data),
188 length);
189 }
190
fd() const191 int fd() const { return fd_; }
192
is_open() const193 bool is_open() const { return is_open_; }
194
size() const195 int64_t size() const { return size_; }
196
mode() const197 FileMode::type mode() const { return mode_; }
198
lock()199 std::mutex& lock() { return lock_; }
200
201 protected:
SetFileName(const std::string & file_name)202 Status SetFileName(const std::string& file_name) {
203 return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_);
204 }
205
SetFileName(int fd)206 Status SetFileName(int fd) {
207 std::stringstream ss;
208 ss << "<fd " << fd << ">";
209 return SetFileName(ss.str());
210 }
211
CheckPositioned()212 Status CheckPositioned() {
213 if (need_seeking_.load()) {
214 return Status::Invalid(
215 "Need seeking after ReadAt() before "
216 "calling implicitly-positioned operation");
217 }
218 return Status::OK();
219 }
220
221 ::arrow::internal::PlatformFilename file_name_;
222
223 std::mutex lock_;
224
225 // File descriptor
226 int fd_;
227
228 FileMode::type mode_;
229
230 bool is_open_;
231 int64_t size_;
232 // Whether ReadAt made the file position non-deterministic.
233 std::atomic<bool> need_seeking_;
234 };
235
236 // ----------------------------------------------------------------------
237 // ReadableFile implementation
238
239 class ReadableFile::ReadableFileImpl : public OSFile {
240 public:
ReadableFileImpl(MemoryPool * pool)241 explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {}
242
Open(const std::string & path)243 Status Open(const std::string& path) { return OpenReadable(path); }
Open(int fd)244 Status Open(int fd) { return OpenReadable(fd); }
245
ReadBuffer(int64_t nbytes)246 Result<std::shared_ptr<Buffer>> ReadBuffer(int64_t nbytes) {
247 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
248
249 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
250 if (bytes_read < nbytes) {
251 RETURN_NOT_OK(buffer->Resize(bytes_read));
252 buffer->ZeroPadding();
253 }
254 return std::move(buffer);
255 }
256
ReadBufferAt(int64_t position,int64_t nbytes)257 Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) {
258 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
259
260 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
261 ReadAt(position, nbytes, buffer->mutable_data()));
262 if (bytes_read < nbytes) {
263 RETURN_NOT_OK(buffer->Resize(bytes_read));
264 buffer->ZeroPadding();
265 }
266 return std::move(buffer);
267 }
268
WillNeed(const std::vector<ReadRange> & ranges)269 Status WillNeed(const std::vector<ReadRange>& ranges) {
270 RETURN_NOT_OK(CheckClosed());
271 for (const auto& range : ranges) {
272 RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
273 #if defined(POSIX_FADV_WILLNEED)
274 if (posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED)) {
275 return IOErrorFromErrno(errno, "posix_fadvise failed");
276 }
277 #elif defined(F_RDADVISE) // macOS, BSD?
278 struct {
279 off_t ra_offset;
280 int ra_count;
281 } radvisory{range.offset, static_cast<int>(range.length)};
282 if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
283 return IOErrorFromErrno(errno, "fcntl(fd, F_RDADVISE, ...) failed");
284 }
285 #endif
286 }
287 return Status::OK();
288 }
289
290 private:
291 MemoryPool* pool_;
292 };
293
ReadableFile(MemoryPool * pool)294 ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(pool)); }
295
~ReadableFile()296 ReadableFile::~ReadableFile() { internal::CloseFromDestructor(this); }
297
Open(const std::string & path,MemoryPool * pool)298 Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(const std::string& path,
299 MemoryPool* pool) {
300 auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool));
301 RETURN_NOT_OK(file->impl_->Open(path));
302 return file;
303 }
304
Open(int fd,MemoryPool * pool)305 Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(int fd, MemoryPool* pool) {
306 auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool));
307 RETURN_NOT_OK(file->impl_->Open(fd));
308 return file;
309 }
310
DoClose()311 Status ReadableFile::DoClose() { return impl_->Close(); }
312
closed() const313 bool ReadableFile::closed() const { return !impl_->is_open(); }
314
WillNeed(const std::vector<ReadRange> & ranges)315 Status ReadableFile::WillNeed(const std::vector<ReadRange>& ranges) {
316 return impl_->WillNeed(ranges);
317 }
318
DoTell() const319 Result<int64_t> ReadableFile::DoTell() const { return impl_->Tell(); }
320
DoRead(int64_t nbytes,void * out)321 Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) {
322 return impl_->Read(nbytes, out);
323 }
324
DoReadAt(int64_t position,int64_t nbytes,void * out)325 Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) {
326 return impl_->ReadAt(position, nbytes, out);
327 }
328
DoReadAt(int64_t position,int64_t nbytes)329 Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) {
330 return impl_->ReadBufferAt(position, nbytes);
331 }
332
DoRead(int64_t nbytes)333 Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {
334 return impl_->ReadBuffer(nbytes);
335 }
336
DoGetSize()337 Result<int64_t> ReadableFile::DoGetSize() { return impl_->size(); }
338
DoSeek(int64_t pos)339 Status ReadableFile::DoSeek(int64_t pos) { return impl_->Seek(pos); }
340
file_descriptor() const341 int ReadableFile::file_descriptor() const { return impl_->fd(); }
342
343 // ----------------------------------------------------------------------
344 // FileOutputStream
345
346 class FileOutputStream::FileOutputStreamImpl : public OSFile {
347 public:
Open(const std::string & path,bool append)348 Status Open(const std::string& path, bool append) {
349 const bool truncate = !append;
350 return OpenWritable(path, truncate, append, true /* write_only */);
351 }
Open(int fd)352 Status Open(int fd) { return OpenWritable(fd); }
353 };
354
FileOutputStream()355 FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); }
356
~FileOutputStream()357 FileOutputStream::~FileOutputStream() { internal::CloseFromDestructor(this); }
358
Open(const std::string & path,bool append)359 Result<std::shared_ptr<FileOutputStream>> FileOutputStream::Open(const std::string& path,
360 bool append) {
361 auto stream = std::shared_ptr<FileOutputStream>(new FileOutputStream());
362 RETURN_NOT_OK(stream->impl_->Open(path, append));
363 return stream;
364 }
365
Open(int fd)366 Result<std::shared_ptr<FileOutputStream>> FileOutputStream::Open(int fd) {
367 auto stream = std::shared_ptr<FileOutputStream>(new FileOutputStream());
368 RETURN_NOT_OK(stream->impl_->Open(fd));
369 return stream;
370 }
371
Close()372 Status FileOutputStream::Close() { return impl_->Close(); }
373
closed() const374 bool FileOutputStream::closed() const { return !impl_->is_open(); }
375
Tell() const376 Result<int64_t> FileOutputStream::Tell() const { return impl_->Tell(); }
377
Write(const void * data,int64_t length)378 Status FileOutputStream::Write(const void* data, int64_t length) {
379 return impl_->Write(data, length);
380 }
381
file_descriptor() const382 int FileOutputStream::file_descriptor() const { return impl_->fd(); }
383
384 // ----------------------------------------------------------------------
385 // Implement MemoryMappedFile
386
387 class MemoryMappedFile::MemoryMap
388 : public std::enable_shared_from_this<MemoryMappedFile::MemoryMap> {
389 public:
390 // An object representing the entire memory-mapped region.
391 // It can be sliced in order to return individual subregions, which
392 // will then keep the original region alive as long as necessary.
393 class Region : public MutableBuffer {
394 public:
Region(std::shared_ptr<MemoryMappedFile::MemoryMap> memory_map,uint8_t * data,int64_t size)395 Region(std::shared_ptr<MemoryMappedFile::MemoryMap> memory_map, uint8_t* data,
396 int64_t size)
397 : MutableBuffer(data, size) {
398 is_mutable_ = memory_map->writable();
399 if (!is_mutable_) {
400 mutable_data_ = nullptr;
401 }
402 }
403
~Region()404 ~Region() {
405 if (data_ != nullptr) {
406 int result = munmap(data(), static_cast<size_t>(size_));
407 ARROW_CHECK_EQ(result, 0) << "munmap failed";
408 }
409 }
410
411 // For convenience
data()412 uint8_t* data() { return const_cast<uint8_t*>(data_); }
413
Detach()414 void Detach() { data_ = nullptr; }
415 };
416
MemoryMap()417 MemoryMap() : file_size_(0), map_len_(0) {}
418
~MemoryMap()419 ~MemoryMap() { ARROW_CHECK_OK(Close()); }
420
Close()421 Status Close() {
422 if (file_->is_open()) {
423 // Lose our reference to the MemoryMappedRegion, so that munmap()
424 // is called as soon as all buffer exports are released.
425 region_.reset();
426 return file_->Close();
427 } else {
428 return Status::OK();
429 }
430 }
431
closed() const432 bool closed() const { return !file_->is_open(); }
433
CheckClosed() const434 Status CheckClosed() const {
435 if (closed()) {
436 return Status::Invalid("Invalid operation on closed file");
437 }
438 return Status::OK();
439 }
440
Open(const std::string & path,FileMode::type mode,const int64_t offset=0,const int64_t length=-1)441 Status Open(const std::string& path, FileMode::type mode, const int64_t offset = 0,
442 const int64_t length = -1) {
443 file_.reset(new OSFile());
444
445 if (mode != FileMode::READ) {
446 // Memory mapping has permission failures if PROT_READ not set
447 prot_flags_ = PROT_READ | PROT_WRITE;
448 map_mode_ = MAP_SHARED;
449 constexpr bool append = false;
450 constexpr bool truncate = false;
451 constexpr bool write_only = false;
452 RETURN_NOT_OK(file_->OpenWritable(path, truncate, append, write_only));
453 } else {
454 prot_flags_ = PROT_READ;
455 map_mode_ = MAP_PRIVATE; // Changes are not to be committed back to the file
456 RETURN_NOT_OK(file_->OpenReadable(path));
457 }
458 map_len_ = offset_ = 0;
459
460 // Memory mapping fails when file size is 0
461 // delay it until the first resize
462 if (file_->size() > 0) {
463 RETURN_NOT_OK(InitMMap(file_->size(), false, offset, length));
464 }
465
466 position_ = 0;
467
468 return Status::OK();
469 }
470
471 // Resize the mmap and file to the specified size.
472 // Resize on memory mapped file region is not supported.
Resize(const int64_t new_size)473 Status Resize(const int64_t new_size) {
474 if (!writable()) {
475 return Status::IOError("Cannot resize a readonly memory map");
476 }
477 if (map_len_ != file_size_) {
478 return Status::IOError("Cannot resize a partial memory map");
479 }
480 if (region_.use_count() > 1) {
481 // There are buffer exports currently, the MemoryMapRemap() call
482 // would make the buffers invalid
483 return Status::IOError("Cannot resize memory map while there are active readers");
484 }
485
486 if (new_size == 0) {
487 if (map_len_ > 0) {
488 // Just unmap the mmap and truncate the file to 0 size
489 region_.reset();
490 RETURN_NOT_OK(::arrow::internal::FileTruncate(file_->fd(), 0));
491 map_len_ = offset_ = file_size_ = 0;
492 }
493 position_ = 0;
494 return Status::OK();
495 }
496
497 if (map_len_ > 0) {
498 void* result;
499 auto data = region_->data();
500 RETURN_NOT_OK(::arrow::internal::MemoryMapRemap(data, map_len_, new_size,
501 file_->fd(), &result));
502 region_->Detach(); // avoid munmap() on destruction
503 region_ = std::make_shared<Region>(shared_from_this(),
504 static_cast<uint8_t*>(result), new_size);
505 map_len_ = file_size_ = new_size;
506 offset_ = 0;
507 if (position_ > map_len_) {
508 position_ = map_len_;
509 }
510 } else {
511 DCHECK_EQ(position_, 0);
512 // the mmap is not yet initialized, resize the underlying
513 // file, since it might have been 0-sized
514 RETURN_NOT_OK(InitMMap(new_size, /*resize_file*/ true));
515 }
516 return Status::OK();
517 }
518
Seek(int64_t position)519 Status Seek(int64_t position) {
520 if (position < 0) {
521 return Status::Invalid("position is out of bounds");
522 }
523 position_ = position;
524 return Status::OK();
525 }
526
Slice(int64_t offset,int64_t length)527 Result<std::shared_ptr<Buffer>> Slice(int64_t offset, int64_t length) {
528 length = std::max<int64_t>(0, std::min(length, map_len_ - offset));
529
530 if (length > 0) {
531 DCHECK_NE(region_, nullptr);
532 return SliceBuffer(region_, offset, length);
533 } else {
534 return std::make_shared<Buffer>(nullptr, 0);
535 }
536 }
537
538 // map_len_ == file_size_ if memory mapping on the whole file
size() const539 int64_t size() const { return map_len_; }
540
position()541 int64_t position() { return position_; }
542
advance(int64_t nbytes)543 void advance(int64_t nbytes) { position_ = position_ + nbytes; }
544
head()545 uint8_t* head() { return data() + position_; }
546
data()547 uint8_t* data() { return region_ ? region_->data() : nullptr; }
548
writable()549 bool writable() { return file_->mode() != FileMode::READ; }
550
opened()551 bool opened() { return file_->is_open(); }
552
fd() const553 int fd() const { return file_->fd(); }
554
write_lock()555 std::mutex& write_lock() { return file_->lock(); }
556
resize_lock()557 std::mutex& resize_lock() { return resize_lock_; }
558
559 private:
560 // Initialize the mmap and set size, capacity and the data pointers
InitMMap(int64_t initial_size,bool resize_file=false,const int64_t offset=0,const int64_t length=-1)561 Status InitMMap(int64_t initial_size, bool resize_file = false,
562 const int64_t offset = 0, const int64_t length = -1) {
563 DCHECK(!region_);
564
565 if (resize_file) {
566 RETURN_NOT_OK(::arrow::internal::FileTruncate(file_->fd(), initial_size));
567 }
568
569 size_t mmap_length = static_cast<size_t>(initial_size);
570 if (length > initial_size) {
571 return Status::Invalid("mapping length is beyond file size");
572 }
573 if (length >= 0 && length < initial_size) {
574 // memory mapping a file region
575 mmap_length = static_cast<size_t>(length);
576 }
577
578 void* result = mmap(nullptr, mmap_length, prot_flags_, map_mode_, file_->fd(),
579 static_cast<off_t>(offset));
580 if (result == MAP_FAILED) {
581 return Status::IOError("Memory mapping file failed: ",
582 ::arrow::internal::ErrnoMessage(errno));
583 }
584 map_len_ = mmap_length;
585 offset_ = offset;
586 region_ = std::make_shared<Region>(shared_from_this(), static_cast<uint8_t*>(result),
587 map_len_);
588 file_size_ = initial_size;
589
590 return Status::OK();
591 }
592
593 std::unique_ptr<OSFile> file_;
594 int prot_flags_;
595 int map_mode_;
596
597 std::shared_ptr<Region> region_;
598 int64_t file_size_;
599 int64_t position_;
600 int64_t offset_;
601 int64_t map_len_;
602 std::mutex resize_lock_;
603 };
604
MemoryMappedFile()605 MemoryMappedFile::MemoryMappedFile() {}
606
~MemoryMappedFile()607 MemoryMappedFile::~MemoryMappedFile() { internal::CloseFromDestructor(this); }
608
Create(const std::string & path,int64_t size)609 Result<std::shared_ptr<MemoryMappedFile>> MemoryMappedFile::Create(
610 const std::string& path, int64_t size) {
611 ARROW_ASSIGN_OR_RAISE(auto file, FileOutputStream::Open(path));
612 RETURN_NOT_OK(::arrow::internal::FileTruncate(file->file_descriptor(), size));
613 RETURN_NOT_OK(file->Close());
614 return MemoryMappedFile::Open(path, FileMode::READWRITE);
615 }
616
Open(const std::string & path,FileMode::type mode)617 Result<std::shared_ptr<MemoryMappedFile>> MemoryMappedFile::Open(const std::string& path,
618 FileMode::type mode) {
619 std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
620
621 result->memory_map_.reset(new MemoryMap());
622 RETURN_NOT_OK(result->memory_map_->Open(path, mode));
623 return result;
624 }
625
Open(const std::string & path,FileMode::type mode,const int64_t offset,const int64_t length)626 Result<std::shared_ptr<MemoryMappedFile>> MemoryMappedFile::Open(const std::string& path,
627 FileMode::type mode,
628 const int64_t offset,
629 const int64_t length) {
630 std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
631
632 result->memory_map_.reset(new MemoryMap());
633 RETURN_NOT_OK(result->memory_map_->Open(path, mode, offset, length));
634 return result;
635 }
636
GetSize()637 Result<int64_t> MemoryMappedFile::GetSize() {
638 RETURN_NOT_OK(memory_map_->CheckClosed());
639 return memory_map_->size();
640 }
641
Tell() const642 Result<int64_t> MemoryMappedFile::Tell() const {
643 RETURN_NOT_OK(memory_map_->CheckClosed());
644 return memory_map_->position();
645 }
646
Seek(int64_t position)647 Status MemoryMappedFile::Seek(int64_t position) {
648 RETURN_NOT_OK(memory_map_->CheckClosed());
649 return memory_map_->Seek(position);
650 }
651
Close()652 Status MemoryMappedFile::Close() { return memory_map_->Close(); }
653
closed() const654 bool MemoryMappedFile::closed() const { return memory_map_->closed(); }
655
ReadAt(int64_t position,int64_t nbytes)656 Result<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAt(int64_t position,
657 int64_t nbytes) {
658 RETURN_NOT_OK(memory_map_->CheckClosed());
659 // if the file is writable, we acquire the lock before creating any slices
660 // in case a resize is triggered concurrently, otherwise we wouldn't detect
661 // a change in the use count
662 auto guard_resize = memory_map_->writable()
663 ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
664 : std::unique_lock<std::mutex>();
665
666 ARROW_ASSIGN_OR_RAISE(
667 nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size()));
668 // Arrange to page data in
669 RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
670 {{memory_map_->data() + position, static_cast<size_t>(nbytes)}}));
671 return memory_map_->Slice(position, nbytes);
672 }
673
ReadAt(int64_t position,int64_t nbytes,void * out)674 Result<int64_t> MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
675 RETURN_NOT_OK(memory_map_->CheckClosed());
676 auto guard_resize = memory_map_->writable()
677 ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
678 : std::unique_lock<std::mutex>();
679
680 ARROW_ASSIGN_OR_RAISE(
681 nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size()));
682 if (nbytes > 0) {
683 memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
684 }
685 return nbytes;
686 }
687
Read(int64_t nbytes,void * out)688 Result<int64_t> MemoryMappedFile::Read(int64_t nbytes, void* out) {
689 RETURN_NOT_OK(memory_map_->CheckClosed());
690 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(memory_map_->position(), nbytes, out));
691 memory_map_->advance(bytes_read);
692 return bytes_read;
693 }
694
Read(int64_t nbytes)695 Result<std::shared_ptr<Buffer>> MemoryMappedFile::Read(int64_t nbytes) {
696 RETURN_NOT_OK(memory_map_->CheckClosed());
697 ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(memory_map_->position(), nbytes));
698 memory_map_->advance(buffer->size());
699 return buffer;
700 }
701
ReadAsync(const AsyncContext &,int64_t position,int64_t nbytes)702 Future<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAsync(const AsyncContext&,
703 int64_t position,
704 int64_t nbytes) {
705 return Future<std::shared_ptr<Buffer>>::MakeFinished(ReadAt(position, nbytes));
706 }
707
WillNeed(const std::vector<ReadRange> & ranges)708 Status MemoryMappedFile::WillNeed(const std::vector<ReadRange>& ranges) {
709 using ::arrow::internal::MemoryRegion;
710
711 RETURN_NOT_OK(memory_map_->CheckClosed());
712 auto guard_resize = memory_map_->writable()
713 ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
714 : std::unique_lock<std::mutex>();
715
716 std::vector<MemoryRegion> regions(ranges.size());
717 for (size_t i = 0; i < ranges.size(); ++i) {
718 const auto& range = ranges[i];
719 ARROW_ASSIGN_OR_RAISE(
720 auto size,
721 internal::ValidateReadRange(range.offset, range.length, memory_map_->size()));
722 DCHECK_NE(memory_map_->data(), nullptr);
723 regions[i] = {const_cast<uint8_t*>(memory_map_->data() + range.offset),
724 static_cast<size_t>(size)};
725 }
726 return ::arrow::internal::MemoryAdviseWillNeed(regions);
727 }
728
supports_zero_copy() const729 bool MemoryMappedFile::supports_zero_copy() const { return true; }
730
WriteAt(int64_t position,const void * data,int64_t nbytes)731 Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) {
732 RETURN_NOT_OK(memory_map_->CheckClosed());
733 std::lock_guard<std::mutex> guard(memory_map_->write_lock());
734
735 if (!memory_map_->opened() || !memory_map_->writable()) {
736 return Status::IOError("Unable to write");
737 }
738 RETURN_NOT_OK(internal::ValidateWriteRange(position, nbytes, memory_map_->size()));
739
740 RETURN_NOT_OK(memory_map_->Seek(position));
741 return WriteInternal(data, nbytes);
742 }
743
Write(const void * data,int64_t nbytes)744 Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
745 RETURN_NOT_OK(memory_map_->CheckClosed());
746 std::lock_guard<std::mutex> guard(memory_map_->write_lock());
747
748 if (!memory_map_->opened() || !memory_map_->writable()) {
749 return Status::IOError("Unable to write");
750 }
751 RETURN_NOT_OK(
752 internal::ValidateWriteRange(memory_map_->position(), nbytes, memory_map_->size()));
753
754 return WriteInternal(data, nbytes);
755 }
756
WriteInternal(const void * data,int64_t nbytes)757 Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) {
758 memcpy(memory_map_->head(), data, static_cast<size_t>(nbytes));
759 memory_map_->advance(nbytes);
760 return Status::OK();
761 }
762
Resize(int64_t new_size)763 Status MemoryMappedFile::Resize(int64_t new_size) {
764 RETURN_NOT_OK(memory_map_->CheckClosed());
765 std::unique_lock<std::mutex> write_guard(memory_map_->write_lock(), std::defer_lock);
766 std::unique_lock<std::mutex> resize_guard(memory_map_->resize_lock(), std::defer_lock);
767 std::lock(write_guard, resize_guard);
768 RETURN_NOT_OK(memory_map_->Resize(new_size));
769 return Status::OK();
770 }
771
file_descriptor() const772 int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); }
773
774 } // namespace io
775 } // namespace arrow
776