1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifdef ROCKSDB_LIB_IO_POSIX
11 #include "env/io_posix.h"
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <algorithm>
15 #if defined(OS_LINUX)
16 #include <linux/fs.h>
17 #ifndef FALLOC_FL_KEEP_SIZE
18 #include <linux/falloc.h>
19 #endif
20 #endif
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/ioctl.h>
25 #include <sys/mman.h>
26 #include <sys/stat.h>
27 #include <sys/types.h>
28 #ifdef OS_LINUX
29 #include <sys/statfs.h>
30 #include <sys/sysmacros.h>
31 #endif
32 #include "monitoring/iostats_context_imp.h"
33 #include "port/port.h"
34 #include "rocksdb/slice.h"
35 #include "test_util/sync_point.h"
36 #include "util/autovector.h"
37 #include "util/coding.h"
38 #include "util/string_util.h"
39 
40 #if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
41 #define F_LINUX_SPECIFIC_BASE 1024
42 #define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
43 #endif
44 
45 namespace ROCKSDB_NAMESPACE {
46 
47 std::string IOErrorMsg(const std::string& context,
48                        const std::string& file_name) {
49   if (file_name.empty()) {
50     return context;
51   }
52   return context + ": " + file_name;
53 }
54 
55 // file_name can be left empty if it is not known.
56 IOStatus IOError(const std::string& context, const std::string& file_name,
57                  int err_number) {
58   switch (err_number) {
59     case ENOSPC: {
60       IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name),
61                                      errnoStr(err_number).c_str());
62       s.SetRetryable(true);
63       return s;
64     }
65     case ESTALE:
66       return IOStatus::IOError(IOStatus::kStaleFile);
67     case ENOENT:
68       return IOStatus::PathNotFound(IOErrorMsg(context, file_name),
69                                     errnoStr(err_number).c_str());
70     default:
71       return IOStatus::IOError(IOErrorMsg(context, file_name),
72                                errnoStr(err_number).c_str());
73   }
74 }
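// Example (illustrative): callers in this file wrap the failing syscall's
// errno, e.g.
//   return IOError("While pread offset " + ToString(offset), filename_, errno);
// ENOSPC is reported as a retryable NoSpace status, ENOENT as PathNotFound,
// ESTALE as a stale-file IOError, and everything else as a generic IOError.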
75 
76 // A wrapper for fadvise, if the platform doesn't support fadvise,
77 // it will simply return 0.
78 int Fadvise(int fd, off_t offset, size_t len, int advice) {
79 #ifdef OS_LINUX
80   return posix_fadvise(fd, offset, len, advice);
81 #else
82   (void)fd;
83   (void)offset;
84   (void)len;
85   (void)advice;
86   return 0;  // simply do nothing.
87 #endif
88 }
89 
90 namespace {
91 
92 // On MacOS (and probably *BSD), the posix write and pwrite calls do not support
93 // buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
94 // splitting the buffer into 1GB chunks. We use this chunk size to be sure to
95 // keep the writes aligned.
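// For example, a single 2.5 GiB write request is issued as two 1 GiB write()
// calls followed by one 0.5 GiB call; EINTR simply retries the current chunk.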
96 
97 bool PosixWrite(int fd, const char* buf, size_t nbyte) {
98   const size_t kLimit1Gb = 1UL << 30;
99 
100   const char* src = buf;
101   size_t left = nbyte;
102 
103   while (left != 0) {
104     size_t bytes_to_write = std::min(left, kLimit1Gb);
105 
106     ssize_t done = write(fd, src, bytes_to_write);
107     if (done < 0) {
108       if (errno == EINTR) {
109         continue;
110       }
111       return false;
112     }
113     left -= done;
114     src += done;
115   }
116   return true;
117 }
118 
119 bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
120   const size_t kLimit1Gb = 1UL << 30;
121 
122   const char* src = buf;
123   size_t left = nbyte;
124 
125   while (left != 0) {
126     size_t bytes_to_write = std::min(left, kLimit1Gb);
127 
128     ssize_t done = pwrite(fd, src, bytes_to_write, offset);
129     if (done < 0) {
130       if (errno == EINTR) {
131         continue;
132       }
133       return false;
134     }
135     left -= done;
136     offset += done;
137     src += done;
138   }
139 
140   return true;
141 }
142 
143 #ifdef ROCKSDB_RANGESYNC_PRESENT
144 
145 #if !defined(ZFS_SUPER_MAGIC)
146 // The magic number for ZFS was not exposed until recently. Its value is
147 // fixed forever, so we can just copy the magic number here.
148 #define ZFS_SUPER_MAGIC 0x2fc12fc1
149 #endif
150 
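// Probed once per writable file (see the PosixWritableFile constructor below);
// the result is cached in sync_file_range_supported_ and consulted by
// RangeSync() before issuing sync_file_range() calls.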
151 bool IsSyncFileRangeSupported(int fd) {
152   // This function tracks and checks for cases where we know `sync_file_range`
153   // definitely will not work properly despite passing the compile-time check
154   // (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or if any of the checks
155   // fail in unexpected ways, we allow `sync_file_range` to be used. This way
156   // should minimize risk of impacting existing use cases.
157   struct statfs buf;
158   int ret = fstatfs(fd, &buf);
159   assert(ret == 0);
160   if (ret == 0 && buf.f_type == ZFS_SUPER_MAGIC) {
161     // Testing on ZFS showed the writeback did not happen asynchronously when
162     // `sync_file_range` was called, even though it returned success. Avoid it
163     // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`,
164     // even though this'll incur extra I/O for metadata.
165     return false;
166   }
167 
168   ret = sync_file_range(fd, 0 /* offset */, 0 /* nbytes */, 0 /* flags */);
169   assert(!(ret == -1 && errno != ENOSYS));
170   if (ret == -1 && errno == ENOSYS) {
171     // `sync_file_range` is not implemented on all platforms even if
172     // compile-time checks pass and a supported filesystem is in-use. For
173     // example, using ext4 on WSL (Windows Subsystem for Linux),
174     // `sync_file_range()` returns `ENOSYS`
175     // ("Function not implemented").
176     return false;
177   }
178   // None of the known cases matched, so allow `sync_file_range` use.
179   return true;
180 }
181 
182 #undef ZFS_SUPER_MAGIC
183 
184 #endif  // ROCKSDB_RANGESYNC_PRESENT
185 
186 }  // anonymous namespace
187 
188 /*
189  * DirectIOHelper
190  */
191 namespace {
192 
193 bool IsSectorAligned(const size_t off, size_t sector_size) {
194   assert((sector_size & (sector_size - 1)) == 0);
195   return (off & (sector_size - 1)) == 0;
196 }
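// Example (illustrative): with a 4096-byte sector size, off = 8192 is aligned
// since (8192 & 4095) == 0, while off = 4100 is not. The bitmask trick relies
// on sector_size being a power of two, which the assert above checks.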
197 
198 #ifndef NDEBUG
199 bool IsSectorAligned(const void* ptr, size_t sector_size) {
200   return uintptr_t(ptr) % sector_size == 0;
201 }
202 #endif
203 }  // namespace
204 
205 /*
206  * PosixSequentialFile
207  */
208 PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
209                                          int fd, size_t logical_block_size,
210                                          const EnvOptions& options)
211     : filename_(fname),
212       file_(file),
213       fd_(fd),
214       use_direct_io_(options.use_direct_reads),
215       logical_sector_size_(logical_block_size) {
216   assert(!options.use_direct_reads || !options.use_mmap_reads);
217 }
218 
219 PosixSequentialFile::~PosixSequentialFile() {
220   if (!use_direct_io()) {
221     assert(file_);
222     fclose(file_);
223   } else {
224     assert(fd_);
225     close(fd_);
226   }
227 }
228 
229 IOStatus PosixSequentialFile::Read(size_t n, const IOOptions& /*opts*/,
230                                    Slice* result, char* scratch,
231                                    IODebugContext* /*dbg*/) {
232   assert(result != nullptr && !use_direct_io());
233   IOStatus s;
234   size_t r = 0;
235   do {
236     clearerr(file_);
237     r = fread_unlocked(scratch, 1, n, file_);
238   } while (r == 0 && ferror(file_) && errno == EINTR);
239   *result = Slice(scratch, r);
240   if (r < n) {
241     if (feof(file_)) {
242       // We leave status as ok if we hit the end of the file
243       // We also clear the error so that the reads can continue
244       // if new data is written to the file
245       clearerr(file_);
246     } else {
247       // A partial read with an error: return a non-ok status
248       s = IOError("While reading file sequentially", filename_, errno);
249     }
250   }
251   return s;
252 }
253 
254 IOStatus PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
255                                              const IOOptions& /*opts*/,
256                                              Slice* result, char* scratch,
257                                              IODebugContext* /*dbg*/) {
258   assert(use_direct_io());
259   assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
260   assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
261   assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
262 
263   IOStatus s;
264   ssize_t r = -1;
265   size_t left = n;
266   char* ptr = scratch;
267   while (left > 0) {
268     r = pread(fd_, ptr, left, static_cast<off_t>(offset));
269     if (r <= 0) {
270       if (r == -1 && errno == EINTR) {
271         continue;
272       }
273       break;
274     }
275     ptr += r;
276     offset += r;
277     left -= r;
278     if (!IsSectorAligned(r, GetRequiredBufferAlignment())) {
279       // Bytes read don't fill sectors. Should only happen at the end
280       // of the file.
281       break;
282     }
283   }
284   if (r < 0) {
285     // An error: return a non-ok status
286     s = IOError(
287         "While pread " + ToString(n) + " bytes from offset " + ToString(offset),
288         filename_, errno);
289   }
290   *result = Slice(scratch, (r < 0) ? 0 : n - left);
291   return s;
292 }
293 
294 IOStatus PosixSequentialFile::Skip(uint64_t n) {
295   if (fseek(file_, static_cast<long int>(n), SEEK_CUR)) {
296     return IOError("While fseek to skip " + ToString(n) + " bytes", filename_,
297                    errno);
298   }
299   return IOStatus::OK();
300 }
301 
302 IOStatus PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
303 #ifndef OS_LINUX
304   (void)offset;
305   (void)length;
306   return IOStatus::OK();
307 #else
308   if (!use_direct_io()) {
309     // free OS pages
310     int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
311     if (ret != 0) {
312       return IOError("While fadvise NotNeeded offset " + ToString(offset) +
313                          " len " + ToString(length),
314                      filename_, errno);
315     }
316   }
317   return IOStatus::OK();
318 #endif
319 }
320 
321 /*
322  * PosixRandomAccessFile
323  */
324 #if defined(OS_LINUX)
325 size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
326   if (max_size < kMaxVarint64Length * 3) {
327     return 0;
328   }
329 
330   struct stat buf;
331   int result = fstat(fd, &buf);
332   if (result == -1) {
333     return 0;
334   }
335 
336   long version = 0;
337   result = ioctl(fd, FS_IOC_GETVERSION, &version);
338   TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
339   if (result == -1) {
340     return 0;
341   }
342   uint64_t uversion = (uint64_t)version;
343 
344   char* rid = id;
345   rid = EncodeVarint64(rid, buf.st_dev);
346   rid = EncodeVarint64(rid, buf.st_ino);
347   rid = EncodeVarint64(rid, uversion);
348   assert(rid >= id);
349   return static_cast<size_t>(rid - id);
350 }
351 #endif
352 
353 #if defined(OS_MACOSX) || defined(OS_AIX)
354 size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
355   if (max_size < kMaxVarint64Length * 3) {
356     return 0;
357   }
358 
359   struct stat buf;
360   int result = fstat(fd, &buf);
361   if (result == -1) {
362     return 0;
363   }
364 
365   char* rid = id;
366   rid = EncodeVarint64(rid, buf.st_dev);
367   rid = EncodeVarint64(rid, buf.st_ino);
368   rid = EncodeVarint64(rid, buf.st_gen);
369   assert(rid >= id);
370   return static_cast<size_t>(rid - id);
371 }
372 #endif
373 
374 #ifdef OS_LINUX
375 std::string RemoveTrailingSlash(const std::string& path) {
376   std::string p = path;
377   if (p.size() > 1 && p.back() == '/') {
378     p.pop_back();
379   }
380   return p;
381 }
382 
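// Takes (or bumps) a reference on the cache entry for each directory, probing
// the logical block size outside the lock first and only publishing it under
// the write lock. GetLogicalBlockSize() below serves cached entries and falls
// back to probing the file descriptor for directories that were never cached.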
383 Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize(
384     const std::vector<std::string>& directories) {
385   std::vector<std::string> dirs;
386   dirs.reserve(directories.size());
387   for (auto& d : directories) {
388     dirs.emplace_back(RemoveTrailingSlash(d));
389   }
390 
391   std::map<std::string, size_t> dir_sizes;
392   {
393     ReadLock lock(&cache_mutex_);
394     for (const auto& dir : dirs) {
395       if (cache_.find(dir) == cache_.end()) {
396         dir_sizes.emplace(dir, 0);
397       }
398     }
399   }
400 
401   Status s;
402   for (auto& dir_size : dir_sizes) {
403     s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second);
404     if (!s.ok()) {
405       return s;
406     }
407   }
408 
409   WriteLock lock(&cache_mutex_);
410   for (const auto& dir : dirs) {
411     auto& v = cache_[dir];
412     v.ref++;
413     auto dir_size = dir_sizes.find(dir);
414     if (dir_size != dir_sizes.end()) {
415       v.size = dir_size->second;
416     }
417   }
418   return s;
419 }
420 
421 void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize(
422     const std::vector<std::string>& directories) {
423   std::vector<std::string> dirs;
424   dirs.reserve(directories.size());
425   for (auto& dir : directories) {
426     dirs.emplace_back(RemoveTrailingSlash(dir));
427   }
428 
429   WriteLock lock(&cache_mutex_);
430   for (const auto& dir : dirs) {
431     auto it = cache_.find(dir);
432     if (it != cache_.end() && !(--(it->second.ref))) {
433       cache_.erase(it);
434     }
435   }
436 }
437 
438 size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname,
439                                                   int fd) {
440   std::string dir = fname.substr(0, fname.find_last_of("/"));
441   if (dir.empty()) {
442     dir = "/";
443   }
444   {
445     ReadLock lock(&cache_mutex_);
446     auto it = cache_.find(dir);
447     if (it != cache_.end()) {
448       return it->second.size;
449     }
450   }
451   return get_logical_block_size_of_fd_(fd);
452 }
453 #endif
454 
455 Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory,
456                                                    size_t* size) {
457   int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY);
458   if (fd == -1) {
459     close(fd);
460     return Status::IOError("Cannot open directory " + directory);
461   }
462   *size = PosixHelper::GetLogicalBlockSizeOfFd(fd);
463   close(fd);
464   return Status::OK();
465 }
466 
467 size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) {
468 #ifdef OS_LINUX
469   struct stat buf;
470   int result = fstat(fd, &buf);
471   if (result == -1) {
472     return kDefaultPageSize;
473   }
474   if (major(buf.st_dev) == 0) {
475     // Unnamed devices (e.g. non-device mounts) use the reserved null device
476     // number and have no entry in /sys/dev/block/. Return a sensible default.
477     return kDefaultPageSize;
478   }
479 
480   // Reading queue/logical_block_size does not require special permissions.
481   const int kBufferSize = 100;
482   char path[kBufferSize];
483   char real_path[PATH_MAX + 1];
484   snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
485            minor(buf.st_dev));
486   if (realpath(path, real_path) == nullptr) {
487     return kDefaultPageSize;
488   }
489   std::string device_dir(real_path);
490   if (!device_dir.empty() && device_dir.back() == '/') {
491     device_dir.pop_back();
492   }
493   // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
494   // and nvme0n1 have it.
495   // $ ls -al '/sys/dev/block/8:3'
496   // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
497   // ../../block/sda/sda3
498   // $ ls -al '/sys/dev/block/259:4'
499   // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
500   // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
501   size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
502   if (parent_end == std::string::npos) {
503     return kDefaultPageSize;
504   }
505   size_t parent_begin = device_dir.rfind('/', parent_end - 1);
506   if (parent_begin == std::string::npos) {
507     return kDefaultPageSize;
508   }
509   std::string parent =
510       device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
511   std::string child = device_dir.substr(parent_end + 1, std::string::npos);
512   if (parent != "block" &&
513       (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
514     device_dir = device_dir.substr(0, parent_end);
515   }
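  // e.g. ".../block/sda/sda3" is trimmed to ".../block/sda" and
  // ".../nvme0n1/nvme0n1p1" to ".../nvme0n1", while whole-disk paths such as
  // ".../block/sda" or ".../nvme0n1" are kept as-is, so that
  // "<device_dir>/queue/logical_block_size" exists.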
516   std::string fname = device_dir + "/queue/logical_block_size";
517   FILE* fp;
518   size_t size = 0;
519   fp = fopen(fname.c_str(), "r");
520   if (fp != nullptr) {
521     char* line = nullptr;
522     size_t len = 0;
523     if (getline(&line, &len, fp) != -1) {
524       sscanf(line, "%zu", &size);
525     }
526     free(line);
527     fclose(fp);
528   }
529   if (size != 0 && (size & (size - 1)) == 0) {
530     return size;
531   }
532 #endif
533   (void)fd;
534   return kDefaultPageSize;
535 }
536 
537 /*
538  * PosixRandomAccessFile
539  *
540  * pread() based random-access
541  */
542 PosixRandomAccessFile::PosixRandomAccessFile(
543     const std::string& fname, int fd, size_t logical_block_size,
544     const EnvOptions& options
545 #if defined(ROCKSDB_IOURING_PRESENT)
546     ,
547     ThreadLocalPtr* thread_local_io_urings
548 #endif
549     )
550     : filename_(fname),
551       fd_(fd),
552       use_direct_io_(options.use_direct_reads),
553       logical_sector_size_(logical_block_size)
554 #if defined(ROCKSDB_IOURING_PRESENT)
555       ,
556       thread_local_io_urings_(thread_local_io_urings)
557 #endif
558 {
559   assert(!options.use_direct_reads || !options.use_mmap_reads);
560   assert(!options.use_mmap_reads || sizeof(void*) < 8);
561 }
562 
563 PosixRandomAccessFile::~PosixRandomAccessFile() { close(fd_); }
564 
565 IOStatus PosixRandomAccessFile::Read(uint64_t offset, size_t n,
566                                      const IOOptions& /*opts*/, Slice* result,
567                                      char* scratch,
568                                      IODebugContext* /*dbg*/) const {
569   if (use_direct_io()) {
570     assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
571     assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
572     assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
573   }
574   IOStatus s;
575   ssize_t r = -1;
576   size_t left = n;
577   char* ptr = scratch;
578   while (left > 0) {
579     r = pread(fd_, ptr, left, static_cast<off_t>(offset));
580     if (r <= 0) {
581       if (r == -1 && errno == EINTR) {
582         continue;
583       }
584       break;
585     }
586     ptr += r;
587     offset += r;
588     left -= r;
589     if (use_direct_io() &&
590         r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
591       // Bytes read don't fill sectors. Should only happen at the end
592       // of the file.
593       break;
594     }
595   }
596   if (r < 0) {
597     // An error: return a non-ok status
598     s = IOError(
599         "While pread offset " + ToString(offset) + " len " + ToString(n),
600         filename_, errno);
601   }
602   *result = Slice(scratch, (r < 0) ? 0 : n - left);
603   return s;
604 }
605 
606 IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
607                                           size_t num_reqs,
608                                           const IOOptions& options,
609                                           IODebugContext* dbg) {
610   if (use_direct_io()) {
611     for (size_t i = 0; i < num_reqs; i++) {
612       assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment()));
613       assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment()));
614       assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment()));
615     }
616   }
617 
618 #if defined(ROCKSDB_IOURING_PRESENT)
619   struct io_uring* iu = nullptr;
620   if (thread_local_io_urings_) {
621     iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
622     if (iu == nullptr) {
623       iu = CreateIOUring();
624       if (iu != nullptr) {
625         thread_local_io_urings_->Reset(iu);
626       }
627     }
628   }
629 
630   // Init failed, platform doesn't support io_uring. Fall back to
631   // serialized reads
632   if (iu == nullptr) {
633     return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
634   }
635 
636   IOStatus ios = IOStatus::OK();
637 
638   struct WrappedReadRequest {
639     FSReadRequest* req;
640     struct iovec iov;
641     size_t finished_len;
642     explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
643   };
644 
645   autovector<WrappedReadRequest, 32> req_wraps;
646   autovector<WrappedReadRequest*, 4> incomplete_rq_list;
647 
648   for (size_t i = 0; i < num_reqs; i++) {
649     req_wraps.emplace_back(&reqs[i]);
650   }
651 
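  // Requests are submitted in batches of at most kIoUringDepth entries. A
  // completion that read fewer bytes than requested is requeued via
  // incomplete_rq_list, with finished_len recording how much of the scratch
  // buffer is already filled, until every request is fully satisfied.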
652   size_t reqs_off = 0;
653   while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
654     size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();
655 
656     // If requests exceed depth, split it into batches
657     if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;
658 
659     assert(incomplete_rq_list.size() <= this_reqs);
660     for (size_t i = 0; i < this_reqs; i++) {
661       WrappedReadRequest* rep_to_submit;
662       if (i < incomplete_rq_list.size()) {
663         rep_to_submit = incomplete_rq_list[i];
664       } else {
665         rep_to_submit = &req_wraps[reqs_off++];
666       }
667       assert(rep_to_submit->req->len > rep_to_submit->finished_len);
668       rep_to_submit->iov.iov_base =
669           rep_to_submit->req->scratch + rep_to_submit->finished_len;
670       rep_to_submit->iov.iov_len =
671           rep_to_submit->req->len - rep_to_submit->finished_len;
672 
673       struct io_uring_sqe* sqe;
674       sqe = io_uring_get_sqe(iu);
675       io_uring_prep_readv(
676           sqe, fd_, &rep_to_submit->iov, 1,
677           rep_to_submit->req->offset + rep_to_submit->finished_len);
678       io_uring_sqe_set_data(sqe, rep_to_submit);
679     }
680     incomplete_rq_list.clear();
681 
682     ssize_t ret =
683         io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
684     TEST_SYNC_POINT_CALLBACK(
685         "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
686         &ret);
687     TEST_SYNC_POINT_CALLBACK(
688         "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
689         iu);
690 
691     if (static_cast<size_t>(ret) != this_reqs) {
692       fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
693       // If an error happens and we submitted fewer requests than expected,
694       // it is an exceptional case and we don't retry here. We should still
695       // consume whatever was submitted to the ring.
696       for (ssize_t i = 0; i < ret; i++) {
697         struct io_uring_cqe* cqe = nullptr;
698         io_uring_wait_cqe(iu, &cqe);
699         if (cqe != nullptr) {
700           io_uring_cqe_seen(iu, cqe);
701         }
702       }
703       return IOStatus::IOError("io_uring_submit_and_wait() requested " +
704                                ToString(this_reqs) + " but returned " +
705                                ToString(ret));
706     }
707 
708     for (size_t i = 0; i < this_reqs; i++) {
709       struct io_uring_cqe* cqe = nullptr;
710       WrappedReadRequest* req_wrap;
711 
712       // We could use the peek variant here, but this seems safer in terms
713       // of our initial wait not reaping all completions
714       ret = io_uring_wait_cqe(iu, &cqe);
715       TEST_SYNC_POINT_CALLBACK(
716           "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
717       if (ret) {
718         ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret));
719 
720         if (cqe != nullptr) {
721           io_uring_cqe_seen(iu, cqe);
722         }
723         continue;
724       }
725 
726       req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
727       FSReadRequest* req = req_wrap->req;
728       if (cqe->res < 0) {
729         req->result = Slice(req->scratch, 0);
730         req->status = IOError("Req failed", filename_, cqe->res);
731       } else {
732         size_t bytes_read = static_cast<size_t>(cqe->res);
733         TEST_SYNC_POINT_CALLBACK(
734             "PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read);
735         if (bytes_read == req_wrap->iov.iov_len) {
736           req->result = Slice(req->scratch, req->len);
737           req->status = IOStatus::OK();
738         } else if (bytes_read == 0) {
739           // cqe->res == 0 can mean EOF, or can mean partial results. See
740           // comment
741           // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
742           // Fall back to pread in this case.
743           if (use_direct_io() &&
744               !IsSectorAligned(req_wrap->finished_len,
745                                GetRequiredBufferAlignment())) {
746             // Bytes read don't fill sectors. Should only happen at the end
747             // of the file.
748             req->result = Slice(req->scratch, req_wrap->finished_len);
749             req->status = IOStatus::OK();
750           } else {
751             Slice tmp_slice;
752             req->status =
753                 Read(req->offset + req_wrap->finished_len,
754                      req->len - req_wrap->finished_len, options, &tmp_slice,
755                      req->scratch + req_wrap->finished_len, dbg);
756             req->result =
757                 Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
758           }
759         } else if (bytes_read < req_wrap->iov.iov_len) {
760           assert(bytes_read > 0);
761           assert(bytes_read + req_wrap->finished_len < req->len);
762           req_wrap->finished_len += bytes_read;
763           incomplete_rq_list.push_back(req_wrap);
764         } else {
765           req->result = Slice(req->scratch, 0);
766           req->status = IOError("Req returned more bytes than requested",
767                                 filename_, cqe->res);
768         }
769       }
770       io_uring_cqe_seen(iu, cqe);
771     }
772   }
773   return ios;
774 #else
775   return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
776 #endif
777 }
778 
779 IOStatus PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n,
780                                          const IOOptions& /*opts*/,
781                                          IODebugContext* /*dbg*/) {
782   IOStatus s;
783   if (!use_direct_io()) {
784     ssize_t r = 0;
785 #ifdef OS_LINUX
786     r = readahead(fd_, offset, n);
787 #endif
788 #ifdef OS_MACOSX
789     radvisory advice;
790     advice.ra_offset = static_cast<off_t>(offset);
791     advice.ra_count = static_cast<int>(n);
792     r = fcntl(fd_, F_RDADVISE, &advice);
793 #endif
794     if (r == -1) {
795       s = IOError("While prefetching offset " + ToString(offset) + " len " +
796                       ToString(n),
797                   filename_, errno);
798     }
799   }
800   return s;
801 }
802 
803 #if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
804 size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
805   return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
806 }
807 #endif
808 
809 void PosixRandomAccessFile::Hint(AccessPattern pattern) {
810   if (use_direct_io()) {
811     return;
812   }
813   switch (pattern) {
814     case kNormal:
815       Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL);
816       break;
817     case kRandom:
818       Fadvise(fd_, 0, 0, POSIX_FADV_RANDOM);
819       break;
820     case kSequential:
821       Fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
822       break;
823     case kWillNeed:
824       Fadvise(fd_, 0, 0, POSIX_FADV_WILLNEED);
825       break;
826     case kWontNeed:
827       Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED);
828       break;
829     default:
830       assert(false);
831       break;
832   }
833 }
834 
835 IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
836   if (use_direct_io()) {
837     return IOStatus::OK();
838   }
839 #ifndef OS_LINUX
840   (void)offset;
841   (void)length;
842   return IOStatus::OK();
843 #else
844   // free OS pages
845   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
846   if (ret == 0) {
847     return IOStatus::OK();
848   }
849   return IOError("While fadvise NotNeeded offset " + ToString(offset) +
850                      " len " + ToString(length),
851                  filename_, errno);
852 #endif
853 }
854 
855 /*
856  * PosixMmapReadableFile
857  *
858  * mmap() based random-access
859  */
860 // base[0,length-1] contains the mmapped contents of the file.
861 PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
862                                              const std::string& fname,
863                                              void* base, size_t length,
864                                              const EnvOptions& options)
865     : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
866 #ifdef NDEBUG
867   (void)options;
868 #endif
869   fd_ = fd_ + 0;  // suppress the warning for unused variables
870   assert(options.use_mmap_reads);
871   assert(!options.use_direct_reads);
872 }
873 
874 PosixMmapReadableFile::~PosixMmapReadableFile() {
875   int ret = munmap(mmapped_region_, length_);
876   if (ret != 0) {
877     fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
878             mmapped_region_, length_);
879   }
880   close(fd_);
881 }
882 
883 IOStatus PosixMmapReadableFile::Read(uint64_t offset, size_t n,
884                                      const IOOptions& /*opts*/, Slice* result,
885                                      char* /*scratch*/,
886                                      IODebugContext* /*dbg*/) const {
887   IOStatus s;
888   if (offset > length_) {
889     *result = Slice();
890     return IOError("While mmap read offset " + ToString(offset) +
891                        " larger than file length " + ToString(length_),
892                    filename_, EINVAL);
893   } else if (offset + n > length_) {
894     n = static_cast<size_t>(length_ - offset);
895   }
896   *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
897   return s;
898 }
899 
900 IOStatus PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
901 #ifndef OS_LINUX
902   (void)offset;
903   (void)length;
904   return IOStatus::OK();
905 #else
906   // free OS pages
907   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
908   if (ret == 0) {
909     return IOStatus::OK();
910   }
911   return IOError("While fadvise not needed. Offset " + ToString(offset) +
912                      " len " + ToString(length),
913                  filename_, errno);
914 #endif
915 }
916 
917 /*
918  * PosixMmapFile
919  *
920  * We preallocate up to an extra megabyte and use memcpy to append new
921  * data to the file.  This is safe since we either properly close the
922  * file before reading from it, or for log files, the reading code
923  * knows enough to skip zero suffixes.
924  */
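// Rough write path (illustrative): Append() memcpy()s into the current
// mapping; when it fills up, UnmapCurrentRegion() releases it and
// MapNewRegion() preallocates and maps the next window (doubling map_size_ up
// to 1MB), and Close() finally ftruncate()s away the unused tail of the file.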
925 IOStatus PosixMmapFile::UnmapCurrentRegion() {
926   TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0");
927   if (base_ != nullptr) {
928     int munmap_status = munmap(base_, limit_ - base_);
929     if (munmap_status != 0) {
930       return IOError("While munmap", filename_, munmap_status);
931     }
932     file_offset_ += limit_ - base_;
933     base_ = nullptr;
934     limit_ = nullptr;
935     last_sync_ = nullptr;
936     dst_ = nullptr;
937 
938     // Increase the amount we map the next time, but capped at 1MB
939     if (map_size_ < (1 << 20)) {
940       map_size_ *= 2;
941     }
942   }
943   return IOStatus::OK();
944 }
945 
946 IOStatus PosixMmapFile::MapNewRegion() {
947 #ifdef ROCKSDB_FALLOCATE_PRESENT
948   assert(base_ == nullptr);
949   TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0");
950   // we can't fallocate with FALLOC_FL_KEEP_SIZE here
951   if (allow_fallocate_) {
952     IOSTATS_TIMER_GUARD(allocate_nanos);
953     int alloc_status = fallocate(fd_, 0, file_offset_, map_size_);
954     if (alloc_status != 0) {
955       // fallback to posix_fallocate
956       alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
957     }
958     if (alloc_status != 0) {
959       return IOStatus::IOError("Error allocating space to file : " + filename_ +
960                                " Error : " + errnoStr(alloc_status).c_str());
961     }
962   }
963 
964   TEST_KILL_RANDOM("PosixMmapFile::Append:1");
965   void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_,
966                    file_offset_);
967   if (ptr == MAP_FAILED) {
968     return IOStatus::IOError("MMap failed on " + filename_);
969   }
970   TEST_KILL_RANDOM("PosixMmapFile::Append:2");
971 
972   base_ = reinterpret_cast<char*>(ptr);
973   limit_ = base_ + map_size_;
974   dst_ = base_;
975   last_sync_ = base_;
976   return IOStatus::OK();
977 #else
978   return IOStatus::NotSupported("This platform doesn't support fallocate()");
979 #endif
980 }
981 
982 IOStatus PosixMmapFile::Msync() {
983   if (dst_ == last_sync_) {
984     return IOStatus::OK();
985   }
986   // Find the beginnings of the pages that contain the first and last
987   // bytes to be synced.
988   size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
989   size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
990   last_sync_ = dst_;
991   TEST_KILL_RANDOM("PosixMmapFile::Msync:0");
992   if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
993     return IOError("While msync", filename_, errno);
994   }
995   return IOStatus::OK();
996 }
997 
998 PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
999                              const EnvOptions& options)
1000     : filename_(fname),
1001       fd_(fd),
1002       page_size_(page_size),
1003       map_size_(Roundup(65536, page_size)),
1004       base_(nullptr),
1005       limit_(nullptr),
1006       dst_(nullptr),
1007       last_sync_(nullptr),
1008       file_offset_(0) {
1009 #ifdef ROCKSDB_FALLOCATE_PRESENT
1010   allow_fallocate_ = options.allow_fallocate;
1011   fallocate_with_keep_size_ = options.fallocate_with_keep_size;
1012 #else
1013   (void)options;
1014 #endif
1015   assert((page_size & (page_size - 1)) == 0);
1016   assert(options.use_mmap_writes);
1017   assert(!options.use_direct_writes);
1018 }
1019 
1020 PosixMmapFile::~PosixMmapFile() {
1021   if (fd_ >= 0) {
1022     IOStatus s = PosixMmapFile::Close(IOOptions(), nullptr);
1023     s.PermitUncheckedError();
1024   }
1025 }
1026 
1027 IOStatus PosixMmapFile::Append(const Slice& data, const IOOptions& /*opts*/,
1028                                IODebugContext* /*dbg*/) {
1029   const char* src = data.data();
1030   size_t left = data.size();
1031   while (left > 0) {
1032     assert(base_ <= dst_);
1033     assert(dst_ <= limit_);
1034     size_t avail = limit_ - dst_;
1035     if (avail == 0) {
1036       IOStatus s = UnmapCurrentRegion();
1037       if (!s.ok()) {
1038         return s;
1039       }
1040       s = MapNewRegion();
1041       if (!s.ok()) {
1042         return s;
1043       }
1044       TEST_KILL_RANDOM("PosixMmapFile::Append:0");
1045     }
1046 
1047     size_t n = (left <= avail) ? left : avail;
1048     assert(dst_);
1049     memcpy(dst_, src, n);
1050     dst_ += n;
1051     src += n;
1052     left -= n;
1053   }
1054   return IOStatus::OK();
1055 }
1056 
1057 IOStatus PosixMmapFile::Close(const IOOptions& /*opts*/,
1058                               IODebugContext* /*dbg*/) {
1059   IOStatus s;
1060   size_t unused = limit_ - dst_;
1061 
1062   s = UnmapCurrentRegion();
1063   if (!s.ok()) {
1064     s = IOError("While closing mmapped file", filename_, errno);
1065   } else if (unused > 0) {
1066     // Trim the extra space at the end of the file
1067     if (ftruncate(fd_, file_offset_ - unused) < 0) {
1068       s = IOError("While ftruncating mmapped file", filename_, errno);
1069     }
1070   }
1071 
1072   if (close(fd_) < 0) {
1073     if (s.ok()) {
1074       s = IOError("While closing mmapped file", filename_, errno);
1075     }
1076   }
1077 
1078   fd_ = -1;
1079   base_ = nullptr;
1080   limit_ = nullptr;
1081   return s;
1082 }
1083 
1084 IOStatus PosixMmapFile::Flush(const IOOptions& /*opts*/,
1085                               IODebugContext* /*dbg*/) {
1086   return IOStatus::OK();
1087 }
1088 
1089 IOStatus PosixMmapFile::Sync(const IOOptions& /*opts*/,
1090                              IODebugContext* /*dbg*/) {
1091   if (fdatasync(fd_) < 0) {
1092     return IOError("While fdatasync mmapped file", filename_, errno);
1093   }
1094 
1095   return Msync();
1096 }
1097 
1098 /**
1099  * Flush data as well as metadata to stable storage.
1100  */
1101 IOStatus PosixMmapFile::Fsync(const IOOptions& /*opts*/,
1102                               IODebugContext* /*dbg*/) {
1103   if (fsync(fd_) < 0) {
1104     return IOError("While fsync mmapped file", filename_, errno);
1105   }
1106 
1107   return Msync();
1108 }
1109 
1110 /**
1111  * Get the size of valid data in the file. This will not match the
1112  * size that is returned from the filesystem because we use mmap
1113  * to extend the file by map_size every time.
1114  */
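// Example (illustrative): if 131072 bytes' worth of regions have already been
// unmapped (file_offset_) and 100 bytes have been copied into the current
// mapping (dst_ - base_), GetFileSize() reports 131172, even though the file
// on disk is typically still extended to the end of the preallocated region.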
1115 uint64_t PosixMmapFile::GetFileSize(const IOOptions& /*opts*/,
1116                                     IODebugContext* /*dbg*/) {
1117   size_t used = dst_ - base_;
1118   return file_offset_ + used;
1119 }
1120 
1121 IOStatus PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
1122 #ifndef OS_LINUX
1123   (void)offset;
1124   (void)length;
1125   return IOStatus::OK();
1126 #else
1127   // free OS pages
1128   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
1129   if (ret == 0) {
1130     return IOStatus::OK();
1131   }
1132   return IOError("While fadvise NotNeeded mmapped file", filename_, errno);
1133 #endif
1134 }
1135 
1136 #ifdef ROCKSDB_FALLOCATE_PRESENT
1137 IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len,
1138                                  const IOOptions& /*opts*/,
1139                                  IODebugContext* /*dbg*/) {
1140   assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1141   assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1142   TEST_KILL_RANDOM("PosixMmapFile::Allocate:0");
1143   int alloc_status = 0;
1144   if (allow_fallocate_) {
1145     alloc_status =
1146         fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
1147                   static_cast<off_t>(offset), static_cast<off_t>(len));
1148   }
1149   if (alloc_status == 0) {
1150     return IOStatus::OK();
1151   } else {
1152     return IOError(
1153         "While fallocate offset " + ToString(offset) + " len " + ToString(len),
1154         filename_, errno);
1155   }
1156 }
1157 #endif
1158 
1159 /*
1160  * PosixWritableFile
1161  *
1162  * Use posix write to write data to a file.
1163  */
1164 PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
1165                                      size_t logical_block_size,
1166                                      const EnvOptions& options)
1167     : FSWritableFile(options),
1168       filename_(fname),
1169       use_direct_io_(options.use_direct_writes),
1170       fd_(fd),
1171       filesize_(0),
1172       logical_sector_size_(logical_block_size) {
1173 #ifdef ROCKSDB_FALLOCATE_PRESENT
1174   allow_fallocate_ = options.allow_fallocate;
1175   fallocate_with_keep_size_ = options.fallocate_with_keep_size;
1176 #endif
1177 #ifdef ROCKSDB_RANGESYNC_PRESENT
1178   sync_file_range_supported_ = IsSyncFileRangeSupported(fd_);
1179 #endif  // ROCKSDB_RANGESYNC_PRESENT
1180   assert(!options.use_mmap_writes);
1181 }
1182 
1183 PosixWritableFile::~PosixWritableFile() {
1184   if (fd_ >= 0) {
1185     IOStatus s = PosixWritableFile::Close(IOOptions(), nullptr);
1186     s.PermitUncheckedError();
1187   }
1188 }
1189 
1190 IOStatus PosixWritableFile::Append(const Slice& data, const IOOptions& /*opts*/,
1191                                    IODebugContext* /*dbg*/) {
1192   if (use_direct_io()) {
1193     assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
1194     assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
1195   }
1196   const char* src = data.data();
1197   size_t nbytes = data.size();
1198 
1199   if (!PosixWrite(fd_, src, nbytes)) {
1200     return IOError("While appending to file", filename_, errno);
1201   }
1202 
1203   filesize_ += nbytes;
1204   return IOStatus::OK();
1205 }
1206 
1207 IOStatus PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset,
1208                                              const IOOptions& /*opts*/,
1209                                              IODebugContext* /*dbg*/) {
1210   if (use_direct_io()) {
1211     assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
1212     assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
1213     assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
1214   }
1215   assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1216   const char* src = data.data();
1217   size_t nbytes = data.size();
1218   if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
1219     return IOError("While pwrite to file at offset " + ToString(offset),
1220                    filename_, errno);
1221   }
1222   filesize_ = offset + nbytes;
1223   return IOStatus::OK();
1224 }
1225 
1226 IOStatus PosixWritableFile::Truncate(uint64_t size, const IOOptions& /*opts*/,
1227                                      IODebugContext* /*dbg*/) {
1228   IOStatus s;
1229   int r = ftruncate(fd_, size);
1230   if (r < 0) {
1231     s = IOError("While ftruncate file to size " + ToString(size), filename_,
1232                 errno);
1233   } else {
1234     filesize_ = size;
1235   }
1236   return s;
1237 }
1238 
1239 IOStatus PosixWritableFile::Close(const IOOptions& /*opts*/,
1240                                   IODebugContext* /*dbg*/) {
1241   IOStatus s;
1242 
1243   size_t block_size;
1244   size_t last_allocated_block;
1245   GetPreallocationStatus(&block_size, &last_allocated_block);
1246   TEST_SYNC_POINT_CALLBACK("PosixWritableFile::Close", &last_allocated_block);
1247   if (last_allocated_block > 0) {
1248     // trim the extra space preallocated at the end of the file
1249     // NOTE(ljin): we probably don't want to surface failure as an IOError,
1250     // but it would be nice to log these errors.
1251     int dummy __attribute__((__unused__));
1252     dummy = ftruncate(fd_, filesize_);
1253 #if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \
1254     !defined(TRAVIS)
1255     // in some file systems, ftruncate only trims trailing space if the
1256     // new file size is smaller than the current size. Calling fallocate
1257     // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused
1258     // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following
1259     // filesystems:
1260     //   XFS (since Linux 2.6.38)
1261     //   ext4 (since Linux 3.0)
1262     //   Btrfs (since Linux 3.7)
1263     //   tmpfs (since Linux 3.5)
1264     // We ignore error since failure of this operation does not affect
1265     // correctness.
1266     // TRAVIS - this code does not work on TRAVIS filesystems.
1267     // the FALLOC_FL_KEEP_SIZE option is expected to not change the size
1268     // of the file, but it does. Simple strace report will show that.
1269     // While we work with Travis-CI team to figure out if this is a
1270     // quirk of Docker/AUFS, we will comment this out.
1271     struct stat file_stats;
1272     int result = fstat(fd_, &file_stats);
1273     // After ftruncate, we check whether ftruncate has the correct behavior.
1274     // If not, we should hack it with FALLOC_FL_PUNCH_HOLE
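    // The check below compares the logical size rounded up to whole
    // st_blksize blocks with the blocks actually allocated (st_blocks is in
    // 512-byte units); a mismatch means preallocated space survived the
    // ftruncate and gets punched out.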
1275     if (result == 0 &&
1276         (file_stats.st_size + file_stats.st_blksize - 1) /
1277                 file_stats.st_blksize !=
1278             file_stats.st_blocks / (file_stats.st_blksize / 512)) {
1279       IOSTATS_TIMER_GUARD(allocate_nanos);
1280       if (allow_fallocate_) {
1281         fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
1282                   block_size * last_allocated_block - filesize_);
1283       }
1284     }
1285 #endif
1286   }
1287 
1288   if (close(fd_) < 0) {
1289     s = IOError("While closing file after writing", filename_, errno);
1290   }
1291   fd_ = -1;
1292   return s;
1293 }
1294 
1295 // write out the cached data to the OS cache
1296 IOStatus PosixWritableFile::Flush(const IOOptions& /*opts*/,
1297                                   IODebugContext* /*dbg*/) {
1298   return IOStatus::OK();
1299 }
1300 
1301 IOStatus PosixWritableFile::Sync(const IOOptions& /*opts*/,
1302                                  IODebugContext* /*dbg*/) {
1303   if (fdatasync(fd_) < 0) {
1304     return IOError("While fdatasync", filename_, errno);
1305   }
1306   return IOStatus::OK();
1307 }
1308 
1309 IOStatus PosixWritableFile::Fsync(const IOOptions& /*opts*/,
1310                                   IODebugContext* /*dbg*/) {
1311   if (fsync(fd_) < 0) {
1312     return IOError("While fsync", filename_, errno);
1313   }
1314   return IOStatus::OK();
1315 }
1316 
1317 bool PosixWritableFile::IsSyncThreadSafe() const { return true; }
1318 
1319 uint64_t PosixWritableFile::GetFileSize(const IOOptions& /*opts*/,
1320                                         IODebugContext* /*dbg*/) {
1321   return filesize_;
1322 }
1323 
1324 void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
1325 #ifdef OS_LINUX
1326 // Suppress Valgrind "Unimplemented functionality" error.
1327 #ifndef ROCKSDB_VALGRIND_RUN
1328   if (hint == write_hint_) {
1329     return;
1330   }
1331   if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) {
1332     write_hint_ = hint;
1333   }
1334 #else
1335   (void)hint;
1336 #endif  // ROCKSDB_VALGRIND_RUN
1337 #else
1338   (void)hint;
1339 #endif  // OS_LINUX
1340 }
1341 
1342 IOStatus PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
1343   if (use_direct_io()) {
1344     return IOStatus::OK();
1345   }
1346 #ifndef OS_LINUX
1347   (void)offset;
1348   (void)length;
1349   return IOStatus::OK();
1350 #else
1351   // free OS pages
1352   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
1353   if (ret == 0) {
1354     return IOStatus::OK();
1355   }
1356   return IOError("While fadvise NotNeeded", filename_, errno);
1357 #endif
1358 }
1359 
1360 #ifdef ROCKSDB_FALLOCATE_PRESENT
1361 IOStatus PosixWritableFile::Allocate(uint64_t offset, uint64_t len,
1362                                      const IOOptions& /*opts*/,
1363                                      IODebugContext* /*dbg*/) {
1364   assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1365   assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1366   TEST_KILL_RANDOM("PosixWritableFile::Allocate:0");
1367   IOSTATS_TIMER_GUARD(allocate_nanos);
1368   int alloc_status = 0;
1369   if (allow_fallocate_) {
1370     alloc_status =
1371         fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
1372                   static_cast<off_t>(offset), static_cast<off_t>(len));
1373   }
1374   if (alloc_status == 0) {
1375     return IOStatus::OK();
1376   } else {
1377     return IOError(
1378         "While fallocate offset " + ToString(offset) + " len " + ToString(len),
1379         filename_, errno);
1380   }
1381 }
1382 #endif
1383 
1384 IOStatus PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
1385                                       const IOOptions& opts,
1386                                       IODebugContext* dbg) {
1387 #ifdef ROCKSDB_RANGESYNC_PRESENT
1388   assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1389   assert(nbytes <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1390   if (sync_file_range_supported_) {
1391     int ret;
1392     if (strict_bytes_per_sync_) {
1393       // Specifying `SYNC_FILE_RANGE_WAIT_BEFORE` together with an offset/length
1394       // that spans all bytes written so far tells `sync_file_range` to wait for
1395       // any outstanding writeback requests to finish before issuing a new one.
1396       ret =
1397           sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes),
1398                           SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE);
1399     } else {
1400       ret = sync_file_range(fd_, static_cast<off_t>(offset),
1401                             static_cast<off_t>(nbytes), SYNC_FILE_RANGE_WRITE);
1402     }
1403     if (ret != 0) {
1404       return IOError("While sync_file_range returned " + ToString(ret),
1405                      filename_, errno);
1406     }
1407     return IOStatus::OK();
1408   }
1409 #endif  // ROCKSDB_RANGESYNC_PRESENT
1410   return FSWritableFile::RangeSync(offset, nbytes, opts, dbg);
1411 }
1412 
1413 #ifdef OS_LINUX
1414 size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
1415   return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
1416 }
1417 #endif
1418 
1419 /*
1420  * PosixRandomRWFile
1421  */
1422 
1423 PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
1424                                      const EnvOptions& /*options*/)
1425     : filename_(fname), fd_(fd) {}
1426 
1427 PosixRandomRWFile::~PosixRandomRWFile() {
1428   if (fd_ >= 0) {
1429     IOStatus s = Close(IOOptions(), nullptr);
1430     s.PermitUncheckedError();
1431   }
1432 }
1433 
1434 IOStatus PosixRandomRWFile::Write(uint64_t offset, const Slice& data,
1435                                   const IOOptions& /*opts*/,
1436                                   IODebugContext* /*dbg*/) {
1437   const char* src = data.data();
1438   size_t nbytes = data.size();
1439   if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
1440     return IOError(
1441         "While write random read/write file at offset " + ToString(offset),
1442         filename_, errno);
1443   }
1444 
1445   return IOStatus::OK();
1446 }
1447 
1448 IOStatus PosixRandomRWFile::Read(uint64_t offset, size_t n,
1449                                  const IOOptions& /*opts*/, Slice* result,
1450                                  char* scratch, IODebugContext* /*dbg*/) const {
1451   size_t left = n;
1452   char* ptr = scratch;
1453   while (left > 0) {
1454     ssize_t done = pread(fd_, ptr, left, offset);
1455     if (done < 0) {
1456       // error while reading from file
1457       if (errno == EINTR) {
1458         // read was interrupted, try again.
1459         continue;
1460       }
1461       return IOError("While reading random read/write file offset " +
1462                          ToString(offset) + " len " + ToString(n),
1463                      filename_, errno);
1464     } else if (done == 0) {
1465       // Nothing more to read
1466       break;
1467     }
1468 
1469     // Read `done` bytes
1470     ptr += done;
1471     offset += done;
1472     left -= done;
1473   }
1474 
1475   *result = Slice(scratch, n - left);
1476   return IOStatus::OK();
1477 }
1478 
1479 IOStatus PosixRandomRWFile::Flush(const IOOptions& /*opts*/,
1480                                   IODebugContext* /*dbg*/) {
1481   return IOStatus::OK();
1482 }
1483 
1484 IOStatus PosixRandomRWFile::Sync(const IOOptions& /*opts*/,
1485                                  IODebugContext* /*dbg*/) {
1486   if (fdatasync(fd_) < 0) {
1487     return IOError("While fdatasync random read/write file", filename_, errno);
1488   }
1489   return IOStatus::OK();
1490 }
1491 
1492 IOStatus PosixRandomRWFile::Fsync(const IOOptions& /*opts*/,
1493                                   IODebugContext* /*dbg*/) {
1494   if (fsync(fd_) < 0) {
1495     return IOError("While fsync random read/write file", filename_, errno);
1496   }
1497   return IOStatus::OK();
1498 }
1499 
1500 IOStatus PosixRandomRWFile::Close(const IOOptions& /*opts*/,
1501                                   IODebugContext* /*dbg*/) {
1502   if (close(fd_) < 0) {
1503     return IOError("While close random read/write file", filename_, errno);
1504   }
1505   fd_ = -1;
1506   return IOStatus::OK();
1507 }
1508 
1509 PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
1510   // TODO should have error handling though not much we can do...
1511   munmap(this->base_, length_);
1512 }
1513 
1514 /*
1515  * PosixDirectory
1516  */
1517 
1518 PosixDirectory::~PosixDirectory() { close(fd_); }
1519 
1520 IOStatus PosixDirectory::Fsync(const IOOptions& /*opts*/,
1521                                IODebugContext* /*dbg*/) {
1522 #ifndef OS_AIX
1523   if (fsync(fd_) == -1) {
1524     return IOError("While fsync", "a directory", errno);
1525   }
1526 #endif
1527   return IOStatus::OK();
1528 }
1529 }  // namespace ROCKSDB_NAMESPACE
1530 #endif
1531