1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/util/windows_compatibility.h"  // IWYU pragma: keep
19 
20 // sys/mman.h not present in Visual Studio or Cygwin
21 #ifdef _WIN32
22 #ifndef NOMINMAX
23 #define NOMINMAX
24 #endif
25 #include "arrow/io/mman.h"
26 #undef Realloc
27 #undef Free
28 #else
29 #include <fcntl.h>
30 #include <sys/mman.h>
31 #include <unistd.h>  // IWYU pragma: keep
32 #endif
33 
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
44 
45 // ----------------------------------------------------------------------
46 // Other Arrow includes
47 
48 #include "arrow/io/file.h"
49 #include "arrow/io/interfaces.h"
50 #include "arrow/io/util_internal.h"
51 
52 #include "arrow/buffer.h"
53 #include "arrow/memory_pool.h"
54 #include "arrow/status.h"
55 #include "arrow/util/future.h"
56 #include "arrow/util/io_util.h"
57 #include "arrow/util/logging.h"
58 
59 namespace arrow {
60 
61 using internal::IOErrorFromErrno;
62 
63 namespace io {
64 
65 class OSFile {
66  public:
OSFile()67   OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
68 
~OSFile()69   ~OSFile() {}
70 
71   // Note: only one of the Open* methods below may be called on a given instance
72 
OpenWritable(const std::string & path,bool truncate,bool append,bool write_only)73   Status OpenWritable(const std::string& path, bool truncate, bool append,
74                       bool write_only) {
75     RETURN_NOT_OK(SetFileName(path));
76 
77     ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only,
78                                                                    truncate, append));
79     is_open_ = true;
80     mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
81 
82     if (!truncate) {
83       ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
84     } else {
85       size_ = 0;
86     }
87     return Status::OK();
88   }
89 
90   // This is different from OpenWritable(string, ...) in that it doesn't
91   // truncate nor mandate a seekable file
OpenWritable(int fd)92   Status OpenWritable(int fd) {
93     auto result = ::arrow::internal::FileGetSize(fd);
94     if (result.ok()) {
95       size_ = *result;
96     } else {
97       // Non-seekable file
98       size_ = -1;
99     }
100     RETURN_NOT_OK(SetFileName(fd));
101     is_open_ = true;
102     mode_ = FileMode::WRITE;
103     fd_ = fd;
104     return Status::OK();
105   }
106 
OpenReadable(const std::string & path)107   Status OpenReadable(const std::string& path) {
108     RETURN_NOT_OK(SetFileName(path));
109 
110     ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_));
111     ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
112 
113     is_open_ = true;
114     mode_ = FileMode::READ;
115     return Status::OK();
116   }
117 
OpenReadable(int fd)118   Status OpenReadable(int fd) {
119     ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
120     RETURN_NOT_OK(SetFileName(fd));
121     is_open_ = true;
122     mode_ = FileMode::READ;
123     fd_ = fd;
124     return Status::OK();
125   }
126 
CheckClosed() const127   Status CheckClosed() const {
128     if (!is_open_) {
129       return Status::Invalid("Invalid operation on closed file");
130     }
131     return Status::OK();
132   }
133 
Close()134   Status Close() {
135     if (is_open_) {
136       // Even if closing fails, the fd will likely be closed (perhaps it's
137       // already closed).
138       is_open_ = false;
139       int fd = fd_;
140       fd_ = -1;
141       RETURN_NOT_OK(::arrow::internal::FileClose(fd));
142     }
143     return Status::OK();
144   }
145 
Read(int64_t nbytes,void * out)146   Result<int64_t> Read(int64_t nbytes, void* out) {
147     RETURN_NOT_OK(CheckClosed());
148     RETURN_NOT_OK(CheckPositioned());
149     return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes);
150   }
151 
ReadAt(int64_t position,int64_t nbytes,void * out)152   Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
153     RETURN_NOT_OK(CheckClosed());
154     RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
155     // ReadAt() leaves the file position undefined, so require that we seek
156     // before calling Read() or Write().
157     need_seeking_.store(true);
158     return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position,
159                                          nbytes);
160   }
161 
Seek(int64_t pos)162   Status Seek(int64_t pos) {
163     RETURN_NOT_OK(CheckClosed());
164     if (pos < 0) {
165       return Status::Invalid("Invalid position");
166     }
167     Status st = ::arrow::internal::FileSeek(fd_, pos);
168     if (st.ok()) {
169       need_seeking_.store(false);
170     }
171     return st;
172   }
173 
Tell() const174   Result<int64_t> Tell() const {
175     RETURN_NOT_OK(CheckClosed());
176     return ::arrow::internal::FileTell(fd_);
177   }
178 
Write(const void * data,int64_t length)179   Status Write(const void* data, int64_t length) {
180     RETURN_NOT_OK(CheckClosed());
181 
182     std::lock_guard<std::mutex> guard(lock_);
183     RETURN_NOT_OK(CheckPositioned());
184     if (length < 0) {
185       return Status::IOError("Length must be non-negative");
186     }
187     return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data),
188                                         length);
189   }
190 
fd() const191   int fd() const { return fd_; }
192 
is_open() const193   bool is_open() const { return is_open_; }
194 
size() const195   int64_t size() const { return size_; }
196 
mode() const197   FileMode::type mode() const { return mode_; }
198 
lock()199   std::mutex& lock() { return lock_; }
200 
201  protected:
SetFileName(const std::string & file_name)202   Status SetFileName(const std::string& file_name) {
203     return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_);
204   }
205 
SetFileName(int fd)206   Status SetFileName(int fd) {
207     std::stringstream ss;
208     ss << "<fd " << fd << ">";
209     return SetFileName(ss.str());
210   }
211 
CheckPositioned()212   Status CheckPositioned() {
213     if (need_seeking_.load()) {
214       return Status::Invalid(
215           "Need seeking after ReadAt() before "
216           "calling implicitly-positioned operation");
217     }
218     return Status::OK();
219   }
220 
221   ::arrow::internal::PlatformFilename file_name_;
222 
223   std::mutex lock_;
224 
225   // File descriptor
226   int fd_;
227 
228   FileMode::type mode_;
229 
230   bool is_open_;
231   int64_t size_;
232   // Whether ReadAt made the file position non-deterministic.
233   std::atomic<bool> need_seeking_;
234 };
235 
236 // ----------------------------------------------------------------------
237 // ReadableFile implementation
238 
239 class ReadableFile::ReadableFileImpl : public OSFile {
240  public:
ReadableFileImpl(MemoryPool * pool)241   explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {}
242 
Open(const std::string & path)243   Status Open(const std::string& path) { return OpenReadable(path); }
Open(int fd)244   Status Open(int fd) { return OpenReadable(fd); }
245 
ReadBuffer(int64_t nbytes)246   Result<std::shared_ptr<Buffer>> ReadBuffer(int64_t nbytes) {
247     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
248 
249     ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
250     if (bytes_read < nbytes) {
251       RETURN_NOT_OK(buffer->Resize(bytes_read));
252       buffer->ZeroPadding();
253     }
254     return std::move(buffer);
255   }
256 
ReadBufferAt(int64_t position,int64_t nbytes)257   Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) {
258     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
259 
260     ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
261                           ReadAt(position, nbytes, buffer->mutable_data()));
262     if (bytes_read < nbytes) {
263       RETURN_NOT_OK(buffer->Resize(bytes_read));
264       buffer->ZeroPadding();
265     }
266     return std::move(buffer);
267   }
268 
WillNeed(const std::vector<ReadRange> & ranges)269   Status WillNeed(const std::vector<ReadRange>& ranges) {
270     RETURN_NOT_OK(CheckClosed());
271     for (const auto& range : ranges) {
272       RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
273 #if defined(POSIX_FADV_WILLNEED)
274       if (posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED)) {
275         return IOErrorFromErrno(errno, "posix_fadvise failed");
276       }
277 #elif defined(F_RDADVISE)  // macOS, BSD?
278       struct {
279         off_t ra_offset;
280         int ra_count;
281       } radvisory{range.offset, static_cast<int>(range.length)};
282       if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
283         return IOErrorFromErrno(errno, "fcntl(fd, F_RDADVISE, ...) failed");
284       }
285 #endif
286     }
287     return Status::OK();
288   }
289 
290  private:
291   MemoryPool* pool_;
292 };
293 
ReadableFile(MemoryPool * pool)294 ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(pool)); }
295 
~ReadableFile()296 ReadableFile::~ReadableFile() { internal::CloseFromDestructor(this); }
297 
Open(const std::string & path,MemoryPool * pool)298 Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(const std::string& path,
299                                                          MemoryPool* pool) {
300   auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool));
301   RETURN_NOT_OK(file->impl_->Open(path));
302   return file;
303 }
304 
Open(int fd,MemoryPool * pool)305 Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(int fd, MemoryPool* pool) {
306   auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool));
307   RETURN_NOT_OK(file->impl_->Open(fd));
308   return file;
309 }
310 
DoClose()311 Status ReadableFile::DoClose() { return impl_->Close(); }
312 
closed() const313 bool ReadableFile::closed() const { return !impl_->is_open(); }
314 
WillNeed(const std::vector<ReadRange> & ranges)315 Status ReadableFile::WillNeed(const std::vector<ReadRange>& ranges) {
316   return impl_->WillNeed(ranges);
317 }
318 
DoTell() const319 Result<int64_t> ReadableFile::DoTell() const { return impl_->Tell(); }
320 
DoRead(int64_t nbytes,void * out)321 Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) {
322   return impl_->Read(nbytes, out);
323 }
324 
DoReadAt(int64_t position,int64_t nbytes,void * out)325 Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) {
326   return impl_->ReadAt(position, nbytes, out);
327 }
328 
DoReadAt(int64_t position,int64_t nbytes)329 Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) {
330   return impl_->ReadBufferAt(position, nbytes);
331 }
332 
DoRead(int64_t nbytes)333 Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {
334   return impl_->ReadBuffer(nbytes);
335 }
336 
DoGetSize()337 Result<int64_t> ReadableFile::DoGetSize() { return impl_->size(); }
338 
DoSeek(int64_t pos)339 Status ReadableFile::DoSeek(int64_t pos) { return impl_->Seek(pos); }
340 
file_descriptor() const341 int ReadableFile::file_descriptor() const { return impl_->fd(); }
342 
343 // ----------------------------------------------------------------------
344 // FileOutputStream
345 
346 class FileOutputStream::FileOutputStreamImpl : public OSFile {
347  public:
Open(const std::string & path,bool append)348   Status Open(const std::string& path, bool append) {
349     const bool truncate = !append;
350     return OpenWritable(path, truncate, append, true /* write_only */);
351   }
Open(int fd)352   Status Open(int fd) { return OpenWritable(fd); }
353 };
354 
FileOutputStream()355 FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); }
356 
~FileOutputStream()357 FileOutputStream::~FileOutputStream() { internal::CloseFromDestructor(this); }
358 
Open(const std::string & path,bool append)359 Result<std::shared_ptr<FileOutputStream>> FileOutputStream::Open(const std::string& path,
360                                                                  bool append) {
361   auto stream = std::shared_ptr<FileOutputStream>(new FileOutputStream());
362   RETURN_NOT_OK(stream->impl_->Open(path, append));
363   return stream;
364 }
365 
Open(int fd)366 Result<std::shared_ptr<FileOutputStream>> FileOutputStream::Open(int fd) {
367   auto stream = std::shared_ptr<FileOutputStream>(new FileOutputStream());
368   RETURN_NOT_OK(stream->impl_->Open(fd));
369   return stream;
370 }
371 
Close()372 Status FileOutputStream::Close() { return impl_->Close(); }
373 
closed() const374 bool FileOutputStream::closed() const { return !impl_->is_open(); }
375 
Tell() const376 Result<int64_t> FileOutputStream::Tell() const { return impl_->Tell(); }
377 
Write(const void * data,int64_t length)378 Status FileOutputStream::Write(const void* data, int64_t length) {
379   return impl_->Write(data, length);
380 }
381 
file_descriptor() const382 int FileOutputStream::file_descriptor() const { return impl_->fd(); }
383 
384 // ----------------------------------------------------------------------
385 // Implement MemoryMappedFile
386 
387 class MemoryMappedFile::MemoryMap
388     : public std::enable_shared_from_this<MemoryMappedFile::MemoryMap> {
389  public:
390   // An object representing the entire memory-mapped region.
391   // It can be sliced in order to return individual subregions, which
392   // will then keep the original region alive as long as necessary.
393   class Region : public MutableBuffer {
394    public:
Region(std::shared_ptr<MemoryMappedFile::MemoryMap> memory_map,uint8_t * data,int64_t size)395     Region(std::shared_ptr<MemoryMappedFile::MemoryMap> memory_map, uint8_t* data,
396            int64_t size)
397         : MutableBuffer(data, size) {
398       is_mutable_ = memory_map->writable();
399       if (!is_mutable_) {
400         mutable_data_ = nullptr;
401       }
402     }
403 
~Region()404     ~Region() {
405       if (data_ != nullptr) {
406         int result = munmap(data(), static_cast<size_t>(size_));
407         ARROW_CHECK_EQ(result, 0) << "munmap failed";
408       }
409     }
410 
411     // For convenience
data()412     uint8_t* data() { return const_cast<uint8_t*>(data_); }
413 
Detach()414     void Detach() { data_ = nullptr; }
415   };
416 
MemoryMap()417   MemoryMap() : file_size_(0), map_len_(0) {}
418 
~MemoryMap()419   ~MemoryMap() { ARROW_CHECK_OK(Close()); }
420 
Close()421   Status Close() {
422     if (file_->is_open()) {
423       // Lose our reference to the MemoryMappedRegion, so that munmap()
424       // is called as soon as all buffer exports are released.
425       region_.reset();
426       return file_->Close();
427     } else {
428       return Status::OK();
429     }
430   }
431 
closed() const432   bool closed() const { return !file_->is_open(); }
433 
CheckClosed() const434   Status CheckClosed() const {
435     if (closed()) {
436       return Status::Invalid("Invalid operation on closed file");
437     }
438     return Status::OK();
439   }
440 
Open(const std::string & path,FileMode::type mode,const int64_t offset=0,const int64_t length=-1)441   Status Open(const std::string& path, FileMode::type mode, const int64_t offset = 0,
442               const int64_t length = -1) {
443     file_.reset(new OSFile());
444 
445     if (mode != FileMode::READ) {
446       // Memory mapping has permission failures if PROT_READ not set
447       prot_flags_ = PROT_READ | PROT_WRITE;
448       map_mode_ = MAP_SHARED;
449       constexpr bool append = false;
450       constexpr bool truncate = false;
451       constexpr bool write_only = false;
452       RETURN_NOT_OK(file_->OpenWritable(path, truncate, append, write_only));
453     } else {
454       prot_flags_ = PROT_READ;
455       map_mode_ = MAP_PRIVATE;  // Changes are not to be committed back to the file
456       RETURN_NOT_OK(file_->OpenReadable(path));
457     }
458     map_len_ = offset_ = 0;
459 
460     // Memory mapping fails when file size is 0
461     // delay it until the first resize
462     if (file_->size() > 0) {
463       RETURN_NOT_OK(InitMMap(file_->size(), false, offset, length));
464     }
465 
466     position_ = 0;
467 
468     return Status::OK();
469   }
470 
471   // Resize the mmap and file to the specified size.
472   // Resize on memory mapped file region is not supported.
Resize(const int64_t new_size)473   Status Resize(const int64_t new_size) {
474     if (!writable()) {
475       return Status::IOError("Cannot resize a readonly memory map");
476     }
477     if (map_len_ != file_size_) {
478       return Status::IOError("Cannot resize a partial memory map");
479     }
480     if (region_.use_count() > 1) {
481       // There are buffer exports currently, the MemoryMapRemap() call
482       // would make the buffers invalid
483       return Status::IOError("Cannot resize memory map while there are active readers");
484     }
485 
486     if (new_size == 0) {
487       if (map_len_ > 0) {
488         // Just unmap the mmap and truncate the file to 0 size
489         region_.reset();
490         RETURN_NOT_OK(::arrow::internal::FileTruncate(file_->fd(), 0));
491         map_len_ = offset_ = file_size_ = 0;
492       }
493       position_ = 0;
494       return Status::OK();
495     }
496 
497     if (map_len_ > 0) {
498       void* result;
499       auto data = region_->data();
500       RETURN_NOT_OK(::arrow::internal::MemoryMapRemap(data, map_len_, new_size,
501                                                       file_->fd(), &result));
502       region_->Detach();  // avoid munmap() on destruction
503       region_ = std::make_shared<Region>(shared_from_this(),
504                                          static_cast<uint8_t*>(result), new_size);
505       map_len_ = file_size_ = new_size;
506       offset_ = 0;
507       if (position_ > map_len_) {
508         position_ = map_len_;
509       }
510     } else {
511       DCHECK_EQ(position_, 0);
512       // the mmap is not yet initialized, resize the underlying
513       // file, since it might have been 0-sized
514       RETURN_NOT_OK(InitMMap(new_size, /*resize_file*/ true));
515     }
516     return Status::OK();
517   }
518 
Seek(int64_t position)519   Status Seek(int64_t position) {
520     if (position < 0) {
521       return Status::Invalid("position is out of bounds");
522     }
523     position_ = position;
524     return Status::OK();
525   }
526 
Slice(int64_t offset,int64_t length)527   Result<std::shared_ptr<Buffer>> Slice(int64_t offset, int64_t length) {
528     length = std::max<int64_t>(0, std::min(length, map_len_ - offset));
529 
530     if (length > 0) {
531       DCHECK_NE(region_, nullptr);
532       return SliceBuffer(region_, offset, length);
533     } else {
534       return std::make_shared<Buffer>(nullptr, 0);
535     }
536   }
537 
538   // map_len_ == file_size_ if memory mapping on the whole file
size() const539   int64_t size() const { return map_len_; }
540 
position()541   int64_t position() { return position_; }
542 
advance(int64_t nbytes)543   void advance(int64_t nbytes) { position_ = position_ + nbytes; }
544 
head()545   uint8_t* head() { return data() + position_; }
546 
data()547   uint8_t* data() { return region_ ? region_->data() : nullptr; }
548 
writable()549   bool writable() { return file_->mode() != FileMode::READ; }
550 
opened()551   bool opened() { return file_->is_open(); }
552 
fd() const553   int fd() const { return file_->fd(); }
554 
write_lock()555   std::mutex& write_lock() { return file_->lock(); }
556 
resize_lock()557   std::mutex& resize_lock() { return resize_lock_; }
558 
559  private:
560   // Initialize the mmap and set size, capacity and the data pointers
InitMMap(int64_t initial_size,bool resize_file=false,const int64_t offset=0,const int64_t length=-1)561   Status InitMMap(int64_t initial_size, bool resize_file = false,
562                   const int64_t offset = 0, const int64_t length = -1) {
563     DCHECK(!region_);
564 
565     if (resize_file) {
566       RETURN_NOT_OK(::arrow::internal::FileTruncate(file_->fd(), initial_size));
567     }
568 
569     size_t mmap_length = static_cast<size_t>(initial_size);
570     if (length > initial_size) {
571       return Status::Invalid("mapping length is beyond file size");
572     }
573     if (length >= 0 && length < initial_size) {
574       // memory mapping a file region
575       mmap_length = static_cast<size_t>(length);
576     }
577 
578     void* result = mmap(nullptr, mmap_length, prot_flags_, map_mode_, file_->fd(),
579                         static_cast<off_t>(offset));
580     if (result == MAP_FAILED) {
581       return Status::IOError("Memory mapping file failed: ",
582                              ::arrow::internal::ErrnoMessage(errno));
583     }
584     map_len_ = mmap_length;
585     offset_ = offset;
586     region_ = std::make_shared<Region>(shared_from_this(), static_cast<uint8_t*>(result),
587                                        map_len_);
588     file_size_ = initial_size;
589 
590     return Status::OK();
591   }
592 
593   std::unique_ptr<OSFile> file_;
594   int prot_flags_;
595   int map_mode_;
596 
597   std::shared_ptr<Region> region_;
598   int64_t file_size_;
599   int64_t position_;
600   int64_t offset_;
601   int64_t map_len_;
602   std::mutex resize_lock_;
603 };
604 
MemoryMappedFile()605 MemoryMappedFile::MemoryMappedFile() {}
606 
~MemoryMappedFile()607 MemoryMappedFile::~MemoryMappedFile() { internal::CloseFromDestructor(this); }
608 
Create(const std::string & path,int64_t size)609 Result<std::shared_ptr<MemoryMappedFile>> MemoryMappedFile::Create(
610     const std::string& path, int64_t size) {
611   ARROW_ASSIGN_OR_RAISE(auto file, FileOutputStream::Open(path));
612   RETURN_NOT_OK(::arrow::internal::FileTruncate(file->file_descriptor(), size));
613   RETURN_NOT_OK(file->Close());
614   return MemoryMappedFile::Open(path, FileMode::READWRITE);
615 }
616 
Open(const std::string & path,FileMode::type mode)617 Result<std::shared_ptr<MemoryMappedFile>> MemoryMappedFile::Open(const std::string& path,
618                                                                  FileMode::type mode) {
619   std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
620 
621   result->memory_map_.reset(new MemoryMap());
622   RETURN_NOT_OK(result->memory_map_->Open(path, mode));
623   return result;
624 }
625 
Open(const std::string & path,FileMode::type mode,const int64_t offset,const int64_t length)626 Result<std::shared_ptr<MemoryMappedFile>> MemoryMappedFile::Open(const std::string& path,
627                                                                  FileMode::type mode,
628                                                                  const int64_t offset,
629                                                                  const int64_t length) {
630   std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
631 
632   result->memory_map_.reset(new MemoryMap());
633   RETURN_NOT_OK(result->memory_map_->Open(path, mode, offset, length));
634   return result;
635 }
636 
GetSize()637 Result<int64_t> MemoryMappedFile::GetSize() {
638   RETURN_NOT_OK(memory_map_->CheckClosed());
639   return memory_map_->size();
640 }
641 
Tell() const642 Result<int64_t> MemoryMappedFile::Tell() const {
643   RETURN_NOT_OK(memory_map_->CheckClosed());
644   return memory_map_->position();
645 }
646 
Seek(int64_t position)647 Status MemoryMappedFile::Seek(int64_t position) {
648   RETURN_NOT_OK(memory_map_->CheckClosed());
649   return memory_map_->Seek(position);
650 }
651 
Close()652 Status MemoryMappedFile::Close() { return memory_map_->Close(); }
653 
closed() const654 bool MemoryMappedFile::closed() const { return memory_map_->closed(); }
655 
ReadAt(int64_t position,int64_t nbytes)656 Result<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAt(int64_t position,
657                                                          int64_t nbytes) {
658   RETURN_NOT_OK(memory_map_->CheckClosed());
659   // if the file is writable, we acquire the lock before creating any slices
660   // in case a resize is triggered concurrently, otherwise we wouldn't detect
661   // a change in the use count
662   auto guard_resize = memory_map_->writable()
663                           ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
664                           : std::unique_lock<std::mutex>();
665 
666   ARROW_ASSIGN_OR_RAISE(
667       nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size()));
668   // Arrange to page data in
669   RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
670       {{memory_map_->data() + position, static_cast<size_t>(nbytes)}}));
671   return memory_map_->Slice(position, nbytes);
672 }
673 
ReadAt(int64_t position,int64_t nbytes,void * out)674 Result<int64_t> MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
675   RETURN_NOT_OK(memory_map_->CheckClosed());
676   auto guard_resize = memory_map_->writable()
677                           ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
678                           : std::unique_lock<std::mutex>();
679 
680   ARROW_ASSIGN_OR_RAISE(
681       nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size()));
682   if (nbytes > 0) {
683     memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
684   }
685   return nbytes;
686 }
687 
Read(int64_t nbytes,void * out)688 Result<int64_t> MemoryMappedFile::Read(int64_t nbytes, void* out) {
689   RETURN_NOT_OK(memory_map_->CheckClosed());
690   ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(memory_map_->position(), nbytes, out));
691   memory_map_->advance(bytes_read);
692   return bytes_read;
693 }
694 
Read(int64_t nbytes)695 Result<std::shared_ptr<Buffer>> MemoryMappedFile::Read(int64_t nbytes) {
696   RETURN_NOT_OK(memory_map_->CheckClosed());
697   ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(memory_map_->position(), nbytes));
698   memory_map_->advance(buffer->size());
699   return buffer;
700 }
701 
ReadAsync(const AsyncContext &,int64_t position,int64_t nbytes)702 Future<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAsync(const AsyncContext&,
703                                                             int64_t position,
704                                                             int64_t nbytes) {
705   return Future<std::shared_ptr<Buffer>>::MakeFinished(ReadAt(position, nbytes));
706 }
707 
WillNeed(const std::vector<ReadRange> & ranges)708 Status MemoryMappedFile::WillNeed(const std::vector<ReadRange>& ranges) {
709   using ::arrow::internal::MemoryRegion;
710 
711   RETURN_NOT_OK(memory_map_->CheckClosed());
712   auto guard_resize = memory_map_->writable()
713                           ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
714                           : std::unique_lock<std::mutex>();
715 
716   std::vector<MemoryRegion> regions(ranges.size());
717   for (size_t i = 0; i < ranges.size(); ++i) {
718     const auto& range = ranges[i];
719     ARROW_ASSIGN_OR_RAISE(
720         auto size,
721         internal::ValidateReadRange(range.offset, range.length, memory_map_->size()));
722     DCHECK_NE(memory_map_->data(), nullptr);
723     regions[i] = {const_cast<uint8_t*>(memory_map_->data() + range.offset),
724                   static_cast<size_t>(size)};
725   }
726   return ::arrow::internal::MemoryAdviseWillNeed(regions);
727 }
728 
supports_zero_copy() const729 bool MemoryMappedFile::supports_zero_copy() const { return true; }
730 
WriteAt(int64_t position,const void * data,int64_t nbytes)731 Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) {
732   RETURN_NOT_OK(memory_map_->CheckClosed());
733   std::lock_guard<std::mutex> guard(memory_map_->write_lock());
734 
735   if (!memory_map_->opened() || !memory_map_->writable()) {
736     return Status::IOError("Unable to write");
737   }
738   RETURN_NOT_OK(internal::ValidateWriteRange(position, nbytes, memory_map_->size()));
739 
740   RETURN_NOT_OK(memory_map_->Seek(position));
741   return WriteInternal(data, nbytes);
742 }
743 
Write(const void * data,int64_t nbytes)744 Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
745   RETURN_NOT_OK(memory_map_->CheckClosed());
746   std::lock_guard<std::mutex> guard(memory_map_->write_lock());
747 
748   if (!memory_map_->opened() || !memory_map_->writable()) {
749     return Status::IOError("Unable to write");
750   }
751   RETURN_NOT_OK(
752       internal::ValidateWriteRange(memory_map_->position(), nbytes, memory_map_->size()));
753 
754   return WriteInternal(data, nbytes);
755 }
756 
WriteInternal(const void * data,int64_t nbytes)757 Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) {
758   memcpy(memory_map_->head(), data, static_cast<size_t>(nbytes));
759   memory_map_->advance(nbytes);
760   return Status::OK();
761 }
762 
Resize(int64_t new_size)763 Status MemoryMappedFile::Resize(int64_t new_size) {
764   RETURN_NOT_OK(memory_map_->CheckClosed());
765   std::unique_lock<std::mutex> write_guard(memory_map_->write_lock(), std::defer_lock);
766   std::unique_lock<std::mutex> resize_guard(memory_map_->resize_lock(), std::defer_lock);
767   std::lock(write_guard, resize_guard);
768   RETURN_NOT_OK(memory_map_->Resize(new_size));
769   return Status::OK();
770 }
771 
file_descriptor() const772 int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); }
773 
774 }  // namespace io
775 }  // namespace arrow
776