1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifdef ROCKSDB_LIB_IO_POSIX
11 #include "env/io_posix.h"
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <algorithm>
15 #if defined(OS_LINUX)
16 #include <linux/fs.h>
17 #ifndef FALLOC_FL_KEEP_SIZE
18 #include <linux/falloc.h>
19 #endif
20 #endif
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/ioctl.h>
25 #include <sys/mman.h>
26 #include <sys/stat.h>
27 #include <sys/types.h>
28 #ifdef OS_LINUX
29 #include <sys/statfs.h>
30 #include <sys/sysmacros.h>
31 #endif
32 #include "monitoring/iostats_context_imp.h"
33 #include "port/port.h"
34 #include "port/stack_trace.h"
35 #include "rocksdb/slice.h"
36 #include "test_util/sync_point.h"
37 #include "util/autovector.h"
38 #include "util/coding.h"
39 #include "util/string_util.h"
40 
41 #if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
42 #define F_LINUX_SPECIFIC_BASE 1024
43 #define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
44 #endif
45 
46 namespace ROCKSDB_NAMESPACE {
47 
// Builds a human-readable error message, appending ": <file_name>" to the
// context when a file name was supplied.
std::string IOErrorMsg(const std::string& context,
                       const std::string& file_name) {
  return file_name.empty() ? context : context + ": " + file_name;
}
55 
// file_name can be left empty if it is not known.
IOError(const std::string & context,const std::string & file_name,int err_number)57 IOStatus IOError(const std::string& context, const std::string& file_name,
58                  int err_number) {
59   switch (err_number) {
60     case ENOSPC: {
61       IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name),
62                                      errnoStr(err_number).c_str());
63       s.SetRetryable(true);
64       return s;
65     }
66     case ESTALE:
67       return IOStatus::IOError(IOStatus::kStaleFile);
68     case ENOENT:
69       return IOStatus::PathNotFound(IOErrorMsg(context, file_name),
70                                     errnoStr(err_number).c_str());
71     default:
72       return IOStatus::IOError(IOErrorMsg(context, file_name),
73                                errnoStr(err_number).c_str());
74   }
75 }
76 
77 // A wrapper for fadvise, if the platform doesn't support fadvise,
78 // it will simply return 0.
// Thin wrapper around posix_fadvise(). On platforms without fadvise support
// it degrades to a successful no-op so callers need no platform checks.
int Fadvise(int fd, off_t offset, size_t len, int advice) {
#ifdef OS_LINUX
  return posix_fadvise(fd, offset, len, advice);
#else
  // No fadvise available: swallow the arguments and report success.
  (void)fd, (void)offset, (void)len, (void)advice;
  return 0;
#endif
}
90 
91 namespace {
92 
93 // On MacOS (and probably *BSD), the posix write and pwrite calls do not support
94 // buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
95 // cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep
96 // the writes aligned.
97 
// Writes `nbyte` bytes from `buf` to `fd`, looping until everything is
// written or a non-retryable error occurs. Each write(2) call is capped at
// 1GB because the posix write call on MacOS (and probably *BSD) rejects
// buffers larger than 2^31-1 bytes; the 1GB cap also keeps chunk boundaries
// aligned. Returns true iff all bytes were written.
bool PosixWrite(int fd, const char* buf, size_t nbyte) {
  constexpr size_t kMaxChunk = 1UL << 30;  // 1GB per syscall

  const char* cursor = buf;
  size_t remaining = nbyte;

  while (remaining > 0) {
    const size_t chunk = remaining < kMaxChunk ? remaining : kMaxChunk;
    const ssize_t written = write(fd, cursor, chunk);
    if (written < 0) {
      if (errno == EINTR) {
        // Interrupted by a signal before any data was written: retry.
        continue;
      }
      return false;
    }
    remaining -= written;
    cursor += written;
  }
  return true;
}
119 
// Positioned counterpart of PosixWrite(): writes `nbyte` bytes from `buf`
// at file position `offset` using pwrite(2), retrying interrupted calls and
// capping each syscall at 1GB (MacOS/*BSD reject larger buffers). Returns
// true iff all bytes were written.
bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
  constexpr size_t kMaxChunk = 1UL << 30;  // 1GB per syscall

  const char* cursor = buf;
  size_t remaining = nbyte;

  while (remaining > 0) {
    const size_t chunk = remaining < kMaxChunk ? remaining : kMaxChunk;
    const ssize_t written = pwrite(fd, cursor, chunk, offset);
    if (written < 0) {
      if (errno == EINTR) {
        // Interrupted by a signal before any data was written: retry.
        continue;
      }
      return false;
    }
    remaining -= written;
    offset += written;
    cursor += written;
  }

  return true;
}
143 
144 #ifdef ROCKSDB_RANGESYNC_PRESENT
145 
146 #if !defined(ZFS_SUPER_MAGIC)
147 // The magic number for ZFS was not exposed until recently. It should be fixed
148 // forever so we can just copy the magic number here.
149 #define ZFS_SUPER_MAGIC 0x2fc12fc1
150 #endif
151 
bool IsSyncFileRangeSupported(int fd) {
  // This function tracks and checks for cases where we know `sync_file_range`
  // definitely will not work properly despite passing the compile-time check
  // (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or if any of the checks
  // fail in unexpected ways, we allow `sync_file_range` to be used. This way
  // should minimize risk of impacting existing use cases.
  struct statfs buf;
  int ret = fstatfs(fd, &buf);
  assert(ret == 0);
  if (ret == 0 && buf.f_type == ZFS_SUPER_MAGIC) {
    // Testing on ZFS showed the writeback did not happen asynchronously when
    // `sync_file_range` was called, even though it returned success. Avoid it
    // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`,
    // even though this'll incur extra I/O for metadata.
    return false;
  }

  // Probe with a zero-length range: it requests no actual writeback but
  // reveals whether the kernel implements the syscall at all.
  ret = sync_file_range(fd, 0 /* offset */, 0 /* nbytes */, 0 /* flags */);
  assert(!(ret == -1 && errno != ENOSYS));
  if (ret == -1 && errno == ENOSYS) {
    // `sync_file_range` is not implemented on all platforms even if
    // compile-time checks pass and a supported filesystem is in-use. For
    // example, using ext4 on WSL (Windows Subsystem for Linux),
    // `sync_file_range()` returns `ENOSYS`
    // ("Function not implemented").
    return false;
  }
  // None of the known cases matched, so allow `sync_file_range` use.
  return true;
}
182 
183 #undef ZFS_SUPER_MAGIC
184 
185 #endif  // ROCKSDB_RANGESYNC_PRESENT
186 
187 }  // anonymous namespace
188 
189 /*
190  * DirectIOHelper
191  */
192 namespace {
193 
// Returns true when `off` is a multiple of `sector_size`. The bitmask trick
// below is only valid for power-of-two sector sizes, which the assert
// enforces in debug builds.
bool IsSectorAligned(const size_t off, size_t sector_size) {
  assert((sector_size & (sector_size - 1)) == 0);
  const size_t mask = sector_size - 1;
  return 0 == (off & mask);
}
198 
199 #ifndef NDEBUG
// Debug-only overload: checks that a buffer address is aligned to the
// sector size.
bool IsSectorAligned(const void* ptr, size_t sector_size) {
  const uintptr_t addr = reinterpret_cast<uintptr_t>(ptr);
  return addr % sector_size == 0;
}
203 #endif
204 }  // namespace
205 
206 /*
207  * PosixSequentialFile
208  */
// Takes ownership of either `file` (buffered reads) or `fd` (direct I/O
// reads); the destructor closes whichever handle is in use, selected by
// options.use_direct_reads.
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
                                         int fd, size_t logical_block_size,
                                         const EnvOptions& options)
    : filename_(fname),
      file_(file),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(logical_block_size) {
  // Direct I/O and mmap reads are mutually exclusive options.
  assert(!options.use_direct_reads || !options.use_mmap_reads);
}
219 
~PosixSequentialFile()220 PosixSequentialFile::~PosixSequentialFile() {
221   if (!use_direct_io()) {
222     assert(file_);
223     fclose(file_);
224   } else {
225     assert(fd_);
226     close(fd_);
227   }
228 }
229 
// Buffered sequential read of up to `n` bytes into `scratch`; on return
// `*result` points at the bytes actually read. Only valid when direct I/O
// is disabled (asserted below).
IOStatus PosixSequentialFile::Read(size_t n, const IOOptions& /*opts*/,
                                   Slice* result, char* scratch,
                                   IODebugContext* /*dbg*/) {
  assert(result != nullptr && !use_direct_io());
  IOStatus s;
  size_t r = 0;
  do {
    // Retry reads that returned nothing solely because a signal interrupted
    // them (EINTR). clearerr() resets the stream's error flag first so that
    // ferror() reflects only the most recent fread attempt.
    clearerr(file_);
    r = fread_unlocked(scratch, 1, n, file_);
  } while (r == 0 && ferror(file_) && errno == EINTR);
  *result = Slice(scratch, r);
  if (r < n) {
    if (feof(file_)) {
      // We leave status as ok if we hit the end of the file
      // We also clear the error so that the reads can continue
      // if a new data is written to the file
      clearerr(file_);
    } else {
      // A partial read with an error: return a non-ok status
      s = IOError("While reading file sequentially", filename_, errno);
    }
  }
  return s;
}
254 
// Positioned read used only in direct I/O mode: offset, length, and buffer
// address must all be aligned to the logical sector size (asserted below).
IOStatus PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
                                             const IOOptions& /*opts*/,
                                             Slice* result, char* scratch,
                                             IODebugContext* /*dbg*/) {
  assert(use_direct_io());
  assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));

  IOStatus s;
  ssize_t r = -1;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      // r == 0 is EOF; r == -1 with EINTR means a signal interrupted the
      // call before any data transfer, so retry; any other error aborts.
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (!IsSectorAligned(r, GetRequiredBufferAlignment())) {
      // Bytes reads don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    // NOTE(review): `offset` has already been advanced by any earlier
    // successful iterations, so the message reports where the failure
    // occurred rather than the original argument -- confirm intended.
    s = IOError(
        "While pread " + ToString(n) + " bytes from offset " + ToString(offset),
        filename_, errno);
  }
  // On error report zero bytes; otherwise report everything read so far.
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}
294 
// Advances the buffered read position by `n` bytes without reading them.
// NOTE(review): the cast narrows uint64_t to `long`; on ILP32 platforms a
// skip of >= 2^31 bytes would be truncated -- presumably callers never skip
// that far, but worth confirming.
IOStatus PosixSequentialFile::Skip(uint64_t n) {
  if (fseek(file_, static_cast<long int>(n), SEEK_CUR)) {
    return IOError("While fseek to skip " + ToString(n) + " bytes", filename_,
                   errno);
  }
  return IOStatus::OK();
}
302 
// Drops the OS page-cache pages backing [offset, offset+length) for this
// file. A no-op on non-Linux platforms and under direct I/O (which never
// populates the page cache).
IOStatus PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  if (!use_direct_io()) {
    // free OS pages
    int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
    if (ret != 0) {
      return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                         " len " + ToString(length),
                     filename_, errno);
    }
  }
  return IOStatus::OK();
#endif
}
321 
322 /*
323  * PosixRandomAccessFile
324  */
325 #if defined(OS_LINUX)
// Linux: builds a unique file id from (device, inode, inode generation),
// each encoded as a varint64 into `id`. Returns the encoded length, or 0
// when `max_size` cannot hold the worst case or any syscall fails.
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  // The generation number distinguishes files that reuse a recycled inode.
  long version = 0;
  result = ioctl(fd, FS_IOC_GETVERSION, &version);
  TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
  if (result == -1) {
    return 0;
  }
  uint64_t uversion = (uint64_t)version;

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, uversion);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
352 #endif
353 
354 #if defined(OS_MACOSX) || defined(OS_AIX)
// MacOS/AIX: builds a unique file id from (device, inode, generation). Here
// the inode generation is available directly as stat::st_gen, so no ioctl
// is needed. Returns the encoded length, or 0 on failure.
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, buf.st_gen);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
373 #endif
374 
375 #ifdef OS_LINUX
// Strips trailing '/' characters from `path` so equivalent directory
// spellings (e.g. "/a/b", "/a/b/", "/a/b//") all map to the same logical-
// block-size cache key. The filesystem root "/" is returned unchanged.
// Previously only a single trailing slash was removed, so "dir//" and "dir"
// produced distinct cache entries.
std::string RemoveTrailingSlash(const std::string& path) {
  std::string p = path;
  while (p.size() > 1 && p.back() == '/') {
    p.pop_back();
  }
  return p;
}
383 
// Caches (and reference-counts) the logical block size for each directory.
// The lookup is staged so the potentially slow filesystem query runs
// without any lock held.
Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize(
    const std::vector<std::string>& directories) {
  std::vector<std::string> dirs;
  dirs.reserve(directories.size());
  for (auto& d : directories) {
    dirs.emplace_back(RemoveTrailingSlash(d));
  }

  // Stage 1: under a read lock, collect directories not yet cached.
  std::map<std::string, size_t> dir_sizes;
  {
    ReadLock lock(&cache_mutex_);
    for (const auto& dir : dirs) {
      if (cache_.find(dir) == cache_.end()) {
        dir_sizes.emplace(dir, 0);
      }
    }
  }

  // Stage 2: query the filesystem for the missing sizes, lock-free.
  Status s;
  for (auto& dir_size : dir_sizes) {
    s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second);
    if (!s.ok()) {
      return s;
    }
  }

  // Stage 3: under the write lock, bump the refcount for every requested
  // directory and record the sizes fetched above. An entry inserted by a
  // concurrent caller in the meantime keeps its existing size.
  WriteLock lock(&cache_mutex_);
  for (const auto& dir : dirs) {
    auto& v = cache_[dir];
    v.ref++;
    auto dir_size = dir_sizes.find(dir);
    if (dir_size != dir_sizes.end()) {
      v.size = dir_size->second;
    }
  }
  return s;
}
421 
// Releases one reference per directory and erases cache entries whose
// refcount drops to zero.
void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize(
    const std::vector<std::string>& directories) {
  // Normalize paths the same way RefAndCacheLogicalBlockSize() does so the
  // cache keys match.
  std::vector<std::string> dirs;
  dirs.reserve(directories.size());
  for (auto& dir : directories) {
    dirs.emplace_back(RemoveTrailingSlash(dir));
  }

  WriteLock lock(&cache_mutex_);
  for (const auto& dir : dirs) {
    auto it = cache_.find(dir);
    // Drop the entry entirely once the last reference is released.
    if (it != cache_.end() && !(--(it->second.ref))) {
      cache_.erase(it);
    }
  }
}
438 
// Returns the logical block size for `fname`: first tries the per-directory
// cache, then falls back to querying the open descriptor directly (the
// fallback result is not cached).
size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname,
                                                  int fd) {
  // Derive the containing directory from the file path.
  // NOTE(review): if `fname` contains no '/', find_last_of returns npos and
  // substr(0, npos) yields the whole name as the "directory" -- presumably
  // callers always pass absolute paths; verify.
  std::string dir = fname.substr(0, fname.find_last_of("/"));
  if (dir.empty()) {
    dir = "/";
  }
  {
    ReadLock lock(&cache_mutex_);
    auto it = cache_.find(dir);
    if (it != cache_.end()) {
      return it->second.size;
    }
  }
  return get_logical_block_size_of_fd_(fd);
}
454 #endif
455 
GetLogicalBlockSizeOfDirectory(const std::string & directory,size_t * size)456 Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory,
457                                                    size_t* size) {
458   int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY);
459   if (fd == -1) {
460     close(fd);
461     return Status::IOError("Cannot open directory " + directory);
462   }
463   *size = PosixHelper::GetLogicalBlockSizeOfFd(fd);
464   close(fd);
465   return Status::OK();
466 }
467 
// Determines the logical block size of the device backing `fd` by reading
// /sys/dev/block/<major>:<minor>/../queue/logical_block_size on Linux.
// Falls back to kDefaultPageSize whenever anything is missing or malformed.
size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) {
#ifdef OS_LINUX
  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return kDefaultPageSize;
  }
  if (major(buf.st_dev) == 0) {
    // Unnamed devices (e.g. non-device mounts), reserved as null device number.
    // These don't have an entry in /sys/dev/block/. Return a sensible default.
    return kDefaultPageSize;
  }

  // Reading queue/logical_block_size does not require special permissions.
  const int kBufferSize = 100;
  char path[kBufferSize];
  char real_path[PATH_MAX + 1];
  snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
           minor(buf.st_dev));
  // Resolve the sysfs symlink to the concrete device directory.
  if (realpath(path, real_path) == nullptr) {
    return kDefaultPageSize;
  }
  std::string device_dir(real_path);
  if (!device_dir.empty() && device_dir.back() == '/') {
    device_dir.pop_back();
  }
  // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
  // and nvme0n1 have it.
  // $ ls -al '/sys/dev/block/8:3'
  // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
  // ../../block/sda/sda3
  // $ ls -al '/sys/dev/block/259:4'
  // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
  // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
  size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
  if (parent_end == std::string::npos) {
    return kDefaultPageSize;
  }
  size_t parent_begin = device_dir.rfind('/', parent_end - 1);
  if (parent_begin == std::string::npos) {
    return kDefaultPageSize;
  }
  std::string parent =
      device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
  std::string child = device_dir.substr(parent_end + 1, std::string::npos);
  // Strip the partition component (e.g. `sda3`, `nvme0n1p1`) to reach the
  // whole-device directory that actually contains `queue/`. A nonzero
  // compare() means the child does NOT start with "nvme".
  if (parent != "block" &&
      (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
    device_dir = device_dir.substr(0, parent_end);
  }
  std::string fname = device_dir + "/queue/logical_block_size";
  FILE* fp;
  size_t size = 0;
  fp = fopen(fname.c_str(), "r");
  if (fp != nullptr) {
    char* line = nullptr;
    size_t len = 0;
    if (getline(&line, &len, fp) != -1) {
      sscanf(line, "%zu", &size);
    }
    // getline() allocates; free even when it failed.
    free(line);
    fclose(fp);
  }
  // Only accept a sane power-of-two block size.
  if (size != 0 && (size & (size - 1)) == 0) {
    return size;
  }
#endif
  (void)fd;
  return kDefaultPageSize;
}
537 
538 /*
539  * PosixRandomAccessFile
540  *
541  * pread() based random-access
542  */
// Takes ownership of `fd` (closed in the destructor). When io_uring support
// is compiled in, `thread_local_io_urings` supplies per-thread rings used by
// MultiRead().
PosixRandomAccessFile::PosixRandomAccessFile(
    const std::string& fname, int fd, size_t logical_block_size,
    const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
    ,
    ThreadLocalPtr* thread_local_io_urings
#endif
    )
    : filename_(fname),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(logical_block_size)
#if defined(ROCKSDB_IOURING_PRESENT)
      ,
      thread_local_io_urings_(thread_local_io_urings)
#endif
{
  // Direct I/O and mmap reads are mutually exclusive options.
  assert(!options.use_direct_reads || !options.use_mmap_reads);
  // NOTE(review): this asserts mmap reads only reach this class on builds
  // with pointers narrower than 8 bytes -- presumably 64-bit mmap reads use
  // PosixMmapReadableFile instead; confirm.
  assert(!options.use_mmap_reads || sizeof(void*) < 8);
}
563 
// Closes the owned file descriptor.
PosixRandomAccessFile::~PosixRandomAccessFile() { close(fd_); }
565 
// Reads up to `n` bytes at `offset` into `scratch` via pread(2); `*result`
// reports the bytes actually read. Under direct I/O, offset, length, and
// buffer address must all be sector-aligned.
IOStatus PosixRandomAccessFile::Read(uint64_t offset, size_t n,
                                     const IOOptions& /*opts*/, Slice* result,
                                     char* scratch,
                                     IODebugContext* /*dbg*/) const {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
  }
  IOStatus s;
  ssize_t r = -1;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      // r == 0 is EOF; r == -1 with EINTR means the call was interrupted by
      // a signal, so retry; any other error aborts the loop.
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (use_direct_io() &&
        r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // Bytes reads don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread offset " + ToString(offset) + " len " + ToString(n),
        filename_, errno);
  }
  // On error report zero bytes; otherwise report everything read so far.
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}
606 
// Services multiple read requests in one call. With io_uring available,
// requests are submitted in batches of up to kIoUringDepth and completions
// are reaped synchronously; short reads are resubmitted for the remainder.
// Without io_uring (or if ring creation fails) this falls back to the
// serial FSRandomAccessFile::MultiRead implementation.
IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
                                          size_t num_reqs,
                                          const IOOptions& options,
                                          IODebugContext* dbg) {
  if (use_direct_io()) {
    for (size_t i = 0; i < num_reqs; i++) {
      assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment()));
      assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment()));
      assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment()));
    }
  }

#if defined(ROCKSDB_IOURING_PRESENT)
  // Lazily create one io_uring instance per thread and cache it.
  struct io_uring* iu = nullptr;
  if (thread_local_io_urings_) {
    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
    if (iu == nullptr) {
      iu = CreateIOUring();
      if (iu != nullptr) {
        thread_local_io_urings_->Reset(iu);
      }
    }
  }

  // Init failed, platform doesn't support io_uring. Fall back to
  // serialized reads
  if (iu == nullptr) {
    return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
  }

  IOStatus ios = IOStatus::OK();

  // Pairs each request with its iovec and the count of bytes completed so
  // far, so a short read can be resubmitted for the remaining range.
  struct WrappedReadRequest {
    FSReadRequest* req;
    struct iovec iov;
    size_t finished_len;
    explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
  };

  autovector<WrappedReadRequest, 32> req_wraps;
  autovector<WrappedReadRequest*, 4> incomplete_rq_list;
  // Tracks the wrapper pointers currently in flight, to validate cqe data.
  std::unordered_set<WrappedReadRequest*> wrap_cache;

  for (size_t i = 0; i < num_reqs; i++) {
    req_wraps.emplace_back(&reqs[i]);
  }

  size_t reqs_off = 0;
  while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
    size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();

    // If requests exceed depth, split it into batches
    if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;

    assert(incomplete_rq_list.size() <= this_reqs);
    for (size_t i = 0; i < this_reqs; i++) {
      WrappedReadRequest* rep_to_submit;
      // Partially-completed requests from the previous batch go first.
      if (i < incomplete_rq_list.size()) {
        rep_to_submit = incomplete_rq_list[i];
      } else {
        rep_to_submit = &req_wraps[reqs_off++];
      }
      assert(rep_to_submit->req->len > rep_to_submit->finished_len);
      rep_to_submit->iov.iov_base =
          rep_to_submit->req->scratch + rep_to_submit->finished_len;
      rep_to_submit->iov.iov_len =
          rep_to_submit->req->len - rep_to_submit->finished_len;

      struct io_uring_sqe* sqe;
      sqe = io_uring_get_sqe(iu);
      io_uring_prep_readv(
          sqe, fd_, &rep_to_submit->iov, 1,
          rep_to_submit->req->offset + rep_to_submit->finished_len);
      io_uring_sqe_set_data(sqe, rep_to_submit);
      wrap_cache.emplace(rep_to_submit);
    }
    incomplete_rq_list.clear();

    ssize_t ret =
        io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
    TEST_SYNC_POINT_CALLBACK(
        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
        &ret);
    TEST_SYNC_POINT_CALLBACK(
        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
        iu);

    if (static_cast<size_t>(ret) != this_reqs) {
      fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
      // If error happens and we submitted fewer than expected, it is an
      // exception case and we don't retry here. We should still consume
      // what is is submitted in the ring.
      for (ssize_t i = 0; i < ret; i++) {
        struct io_uring_cqe* cqe = nullptr;
        io_uring_wait_cqe(iu, &cqe);
        if (cqe != nullptr) {
          io_uring_cqe_seen(iu, cqe);
        }
      }
      return IOStatus::IOError("io_uring_submit_and_wait() requested " +
                               ToString(this_reqs) + " but returned " +
                               ToString(ret));
    }

    for (size_t i = 0; i < this_reqs; i++) {
      struct io_uring_cqe* cqe = nullptr;
      WrappedReadRequest* req_wrap;

      // We could use the peek variant here, but this seems safer in terms
      // of our initial wait not reaping all completions
      ret = io_uring_wait_cqe(iu, &cqe);
      TEST_SYNC_POINT_CALLBACK(
          "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
      if (ret) {
        ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret));

        if (cqe != nullptr) {
          io_uring_cqe_seen(iu, cqe);
        }
        continue;
      }

      req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
      // Reset cqe data to catch any stray reuse of it
      static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
      // Check that we got a valid unique cqe data
      auto wrap_check = wrap_cache.find(req_wrap);
      if (wrap_check == wrap_cache.end()) {
        fprintf(stderr,
                "PosixRandomAccessFile::MultiRead: "
                "Bad cqe data from IO uring - %p\n",
                req_wrap);
        port::PrintStack();
        ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
                                ToString((uint64_t)req_wrap));
        continue;
      }
      wrap_cache.erase(wrap_check);

      FSReadRequest* req = req_wrap->req;
      if (cqe->res < 0) {
        req->result = Slice(req->scratch, 0);
        req->status = IOError("Req failed", filename_, cqe->res);
      } else {
        size_t bytes_read = static_cast<size_t>(cqe->res);
        TEST_SYNC_POINT_CALLBACK(
            "PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read);
        if (bytes_read == req_wrap->iov.iov_len) {
          // This submission fully satisfied the remaining range.
          req->result = Slice(req->scratch, req->len);
          req->status = IOStatus::OK();
        } else if (bytes_read == 0) {
          // cqe->res == 0 can means EOF, or can mean partial results. See
          // comment
          // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
          // Fall back to pread in this case.
          if (use_direct_io() &&
              !IsSectorAligned(req_wrap->finished_len,
                               GetRequiredBufferAlignment())) {
            // Bytes reads don't fill sectors. Should only happen at the end
            // of the file.
            req->result = Slice(req->scratch, req_wrap->finished_len);
            req->status = IOStatus::OK();
          } else {
            Slice tmp_slice;
            req->status =
                Read(req->offset + req_wrap->finished_len,
                     req->len - req_wrap->finished_len, options, &tmp_slice,
                     req->scratch + req_wrap->finished_len, dbg);
            req->result =
                Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
          }
        } else if (bytes_read < req_wrap->iov.iov_len) {
          // Short read: remember progress and resubmit in the next batch.
          assert(bytes_read > 0);
          assert(bytes_read + req_wrap->finished_len < req->len);
          req_wrap->finished_len += bytes_read;
          incomplete_rq_list.push_back(req_wrap);
        } else {
          req->result = Slice(req->scratch, 0);
          req->status = IOError("Req returned more bytes than requested",
                                filename_, cqe->res);
        }
      }
      io_uring_cqe_seen(iu, cqe);
    }
    wrap_cache.clear();
  }
  return ios;
#else
  return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
#endif
}
798 
// Hints the OS to read `n` bytes at `offset` into the page cache ahead of
// use. A no-op under direct I/O (which bypasses the page cache) and on
// platforms with neither readahead(2) nor F_RDADVISE.
IOStatus PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n,
                                         const IOOptions& /*opts*/,
                                         IODebugContext* /*dbg*/) {
  IOStatus s;
  if (!use_direct_io()) {
    ssize_t r = 0;
#ifdef OS_LINUX
    r = readahead(fd_, offset, n);
#endif
#ifdef OS_MACOSX
    // MacOS has no readahead(2); use the F_RDADVISE fcntl instead.
    radvisory advice;
    advice.ra_offset = static_cast<off_t>(offset);
    advice.ra_count = static_cast<int>(n);
    r = fcntl(fd_, F_RDADVISE, &advice);
#endif
    if (r == -1) {
      s = IOError("While prefetching offset " + ToString(offset) + " len " +
                      ToString(n),
                  filename_, errno);
    }
  }
  return s;
}
822 
823 #if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
// Exposes the platform-specific unique file id (device/inode/generation)
// for this descriptor; see PosixHelper::GetUniqueIdFromFile above.
size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
}
827 #endif
828 
Hint(AccessPattern pattern)829 void PosixRandomAccessFile::Hint(AccessPattern pattern) {
830   if (use_direct_io()) {
831     return;
832   }
833   switch (pattern) {
834     case kNormal:
835       Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL);
836       break;
837     case kRandom:
838       Fadvise(fd_, 0, 0, POSIX_FADV_RANDOM);
839       break;
840     case kSequential:
841       Fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
842       break;
843     case kWillNeed:
844       Fadvise(fd_, 0, 0, POSIX_FADV_WILLNEED);
845       break;
846     case kWontNeed:
847       Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED);
848       break;
849     default:
850       assert(false);
851       break;
852   }
853 }
854 
// Drops the OS page-cache pages backing [offset, offset+length) for this
// file. A no-op under direct I/O and on non-Linux platforms.
IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
  // Direct I/O never populates the page cache, so there is nothing to drop.
  if (use_direct_io()) {
    return IOStatus::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return IOStatus::OK();
  }
  return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                     " len " + ToString(length),
                 filename_, errno);
#endif
}
874 
875 /*
876  * PosixMmapReadableFile
877  *
878  * mmap() based random-access
879  */
880 // base[0,length-1] contains the mmapped contents of the file.
// Takes ownership of the open descriptor `fd` and the existing mapping
// `base` of `length` bytes (both released in the destructor).
PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
                                             const std::string& fname,
                                             void* base, size_t length,
                                             const EnvOptions& options)
    : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
#ifdef NDEBUG
  // `options` is only touched by the asserts below, which compile away in
  // release builds.
  (void)options;
#endif
  fd_ = fd_ + 0;  // suppress the warning for used variables
  assert(options.use_mmap_reads);
  assert(!options.use_direct_reads);
}
893 
~PosixMmapReadableFile()894 PosixMmapReadableFile::~PosixMmapReadableFile() {
895   int ret = munmap(mmapped_region_, length_);
896   if (ret != 0) {
897     fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
898             mmapped_region_, length_);
899   }
900   close(fd_);
901 }
902 
Read(uint64_t offset,size_t n,const IOOptions &,Slice * result,char *,IODebugContext *) const903 IOStatus PosixMmapReadableFile::Read(uint64_t offset, size_t n,
904                                      const IOOptions& /*opts*/, Slice* result,
905                                      char* /*scratch*/,
906                                      IODebugContext* /*dbg*/) const {
907   IOStatus s;
908   if (offset > length_) {
909     *result = Slice();
910     return IOError("While mmap read offset " + ToString(offset) +
911                        " larger than file length " + ToString(length_),
912                    filename_, EINVAL);
913   } else if (offset + n > length_) {
914     n = static_cast<size_t>(length_ - offset);
915   }
916   *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
917   return s;
918 }
919 
InvalidateCache(size_t offset,size_t length)920 IOStatus PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
921 #ifndef OS_LINUX
922   (void)offset;
923   (void)length;
924   return IOStatus::OK();
925 #else
926   // free OS pages
927   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
928   if (ret == 0) {
929     return IOStatus::OK();
930   }
931   return IOError("While fadvise not needed. Offset " + ToString(offset) +
932                      " len" + ToString(length),
933                  filename_, errno);
934 #endif
935 }
936 
937 /*
938  * PosixMmapFile
939  *
940  * We preallocate up to an extra megabyte and use memcpy to append new
941  * data to the file.  This is safe since we either properly close the
942  * file before reading from it, or for log files, the reading code
943  * knows enough to skip zero suffixes.
944  */
UnmapCurrentRegion()945 IOStatus PosixMmapFile::UnmapCurrentRegion() {
946   TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0");
947   if (base_ != nullptr) {
948     int munmap_status = munmap(base_, limit_ - base_);
949     if (munmap_status != 0) {
950       return IOError("While munmap", filename_, munmap_status);
951     }
952     file_offset_ += limit_ - base_;
953     base_ = nullptr;
954     limit_ = nullptr;
955     last_sync_ = nullptr;
956     dst_ = nullptr;
957 
958     // Increase the amount we map the next time, but capped at 1MB
959     if (map_size_ < (1 << 20)) {
960       map_size_ *= 2;
961     }
962   }
963   return IOStatus::OK();
964 }
965 
MapNewRegion()966 IOStatus PosixMmapFile::MapNewRegion() {
967 #ifdef ROCKSDB_FALLOCATE_PRESENT
968   assert(base_ == nullptr);
969   TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0");
970   // we can't fallocate with FALLOC_FL_KEEP_SIZE here
971   if (allow_fallocate_) {
972     IOSTATS_TIMER_GUARD(allocate_nanos);
973     int alloc_status = fallocate(fd_, 0, file_offset_, map_size_);
974     if (alloc_status != 0) {
975       // fallback to posix_fallocate
976       alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
977     }
978     if (alloc_status != 0) {
979       return IOStatus::IOError("Error allocating space to file : " + filename_ +
980                                "Error : " + errnoStr(alloc_status).c_str());
981     }
982   }
983 
984   TEST_KILL_RANDOM("PosixMmapFile::Append:1");
985   void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_,
986                    file_offset_);
987   if (ptr == MAP_FAILED) {
988     return IOStatus::IOError("MMap failed on " + filename_);
989   }
990   TEST_KILL_RANDOM("PosixMmapFile::Append:2");
991 
992   base_ = reinterpret_cast<char*>(ptr);
993   limit_ = base_ + map_size_;
994   dst_ = base_;
995   last_sync_ = base_;
996   return IOStatus::OK();
997 #else
998   return IOStatus::NotSupported("This platform doesn't support fallocate()");
999 #endif
1000 }
1001 
// Flushes the not-yet-synced portion of the current mapping to disk with
// msync(MS_SYNC). Syncs whole pages covering [last_sync_, dst_); a no-op
// when nothing has been written since the last sync.
IOStatus PosixMmapFile::Msync() {
  if (dst_ == last_sync_) {
    return IOStatus::OK();
  }
  // Find the beginnings of the pages that contain the first and last
  // bytes to be synced.
  size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  last_sync_ = dst_;
  TEST_KILL_RANDOM("PosixMmapFile::Msync:0");
  // p2 is the start of the last dirty page, so the range extends one full
  // page past it.
  if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
    return IOError("While msync", filename_, errno);
  }
  return IOStatus::OK();
}
1017 
// Constructs an mmap-based writable file; takes ownership of `fd`. The
// initial mapping size is 64KB rounded up to a page-size multiple and
// doubles on each remap up to 1MB (see UnmapCurrentRegion).
PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      page_size_(page_size),
      map_size_(Roundup(65536, page_size)),
      base_(nullptr),
      limit_(nullptr),
      dst_(nullptr),
      last_sync_(nullptr),
      file_offset_(0) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#else
  (void)options;
#endif
  // page_size must be a power of two (required by the page-boundary math).
  assert((page_size & (page_size - 1)) == 0);
  // This class is only valid for mmap writes without direct I/O.
  assert(options.use_mmap_writes);
  assert(!options.use_direct_writes);
}
1039 
// Closes the file if the caller did not; the close status is deliberately
// ignored since a destructor cannot report it.
PosixMmapFile::~PosixMmapFile() {
  if (fd_ >= 0) {
    // Qualified call: avoids virtual dispatch from within a destructor.
    IOStatus s = PosixMmapFile::Close(IOOptions(), nullptr);
    s.PermitUncheckedError();
  }
}
1046 
Append(const Slice & data,const IOOptions &,IODebugContext *)1047 IOStatus PosixMmapFile::Append(const Slice& data, const IOOptions& /*opts*/,
1048                                IODebugContext* /*dbg*/) {
1049   const char* src = data.data();
1050   size_t left = data.size();
1051   while (left > 0) {
1052     assert(base_ <= dst_);
1053     assert(dst_ <= limit_);
1054     size_t avail = limit_ - dst_;
1055     if (avail == 0) {
1056       IOStatus s = UnmapCurrentRegion();
1057       if (!s.ok()) {
1058         return s;
1059       }
1060       s = MapNewRegion();
1061       if (!s.ok()) {
1062         return s;
1063       }
1064       TEST_KILL_RANDOM("PosixMmapFile::Append:0");
1065     }
1066 
1067     size_t n = (left <= avail) ? left : avail;
1068     assert(dst_);
1069     memcpy(dst_, src, n);
1070     dst_ += n;
1071     src += n;
1072     left -= n;
1073   }
1074   return IOStatus::OK();
1075 }
1076 
// Unmaps the current region, trims the mapped-but-unwritten tail off the
// file, and closes the descriptor. The first error wins, but every cleanup
// step is still attempted.
IOStatus PosixMmapFile::Close(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  IOStatus s;
  // Capture the unwritten tail length before the mapping is torn down.
  size_t unused = limit_ - dst_;

  s = UnmapCurrentRegion();
  if (!s.ok()) {
    s = IOError("While closing mmapped file", filename_, errno);
  } else if (unused > 0) {
    // Trim the extra space at the end of the file
    if (ftruncate(fd_, file_offset_ - unused) < 0) {
      s = IOError("While ftruncating mmaped file", filename_, errno);
    }
  }

  if (close(fd_) < 0) {
    // Only surface the close failure if nothing went wrong earlier.
    if (s.ok()) {
      s = IOError("While closing mmapped file", filename_, errno);
    }
  }

  fd_ = -1;
  base_ = nullptr;
  limit_ = nullptr;
  return s;
}
1103 
// No-op: appends go directly into the mapping, so there is no user-space
// buffer to flush.
IOStatus PosixMmapFile::Flush(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  return IOStatus::OK();
}
1108 
// Syncs file data to stable storage: fdatasync for data already written to
// the file, then Msync for bytes still only in the current mapping.
IOStatus PosixMmapFile::Sync(const IOOptions& /*opts*/,
                             IODebugContext* /*dbg*/) {
  if (fdatasync(fd_) < 0) {
    return IOError("While fdatasync mmapped file", filename_, errno);
  }

  return Msync();
}
1117 
1118 /**
1119  * Flush data as well as metadata to stable storage.
1120  */
// Like Sync() but uses fsync(), which also flushes file metadata.
IOStatus PosixMmapFile::Fsync(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  if (fsync(fd_) < 0) {
    return IOError("While fsync mmaped file", filename_, errno);
  }

  // Also flush the bytes sitting in the current mapping.
  return Msync();
}
1129 
1130 /**
1131  * Get the size of valid data in the file. This will not match the
1132  * size that is returned from the filesystem because we use mmap
1133  * to extend file by map_size every time.
1134  */
GetFileSize(const IOOptions &,IODebugContext *)1135 uint64_t PosixMmapFile::GetFileSize(const IOOptions& /*opts*/,
1136                                     IODebugContext* /*dbg*/) {
1137   size_t used = dst_ - base_;
1138   return file_offset_ + used;
1139 }
1140 
InvalidateCache(size_t offset,size_t length)1141 IOStatus PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
1142 #ifndef OS_LINUX
1143   (void)offset;
1144   (void)length;
1145   return IOStatus::OK();
1146 #else
1147   // free OS pages
1148   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
1149   if (ret == 0) {
1150     return IOStatus::OK();
1151   }
1152   return IOError("While fadvise NotNeeded mmapped file", filename_, errno);
1153 #endif
1154 }
1155 
1156 #ifdef ROCKSDB_FALLOCATE_PRESENT
Allocate(uint64_t offset,uint64_t len,const IOOptions &,IODebugContext *)1157 IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len,
1158                                  const IOOptions& /*opts*/,
1159                                  IODebugContext* /*dbg*/) {
1160   assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1161   assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1162   TEST_KILL_RANDOM("PosixMmapFile::Allocate:0");
1163   int alloc_status = 0;
1164   if (allow_fallocate_) {
1165     alloc_status =
1166         fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
1167                   static_cast<off_t>(offset), static_cast<off_t>(len));
1168   }
1169   if (alloc_status == 0) {
1170     return IOStatus::OK();
1171   } else {
1172     return IOError(
1173         "While fallocate offset " + ToString(offset) + " len " + ToString(len),
1174         filename_, errno);
1175   }
1176 }
1177 #endif
1178 
1179 /*
1180  * PosixWritableFile
1181  *
1182  * Use posix write to write data to a file.
1183  */
// Constructs a write(2)-based writable file; takes ownership of `fd`.
// NOTE(review): `logical_block_size` appears to back the direct-I/O
// alignment checks (see the IsSectorAligned asserts in Append) — confirm
// against the header.
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
                                     size_t logical_block_size,
                                     const EnvOptions& options)
    : FSWritableFile(options),
      filename_(fname),
      use_direct_io_(options.use_direct_writes),
      fd_(fd),
      filesize_(0),
      logical_sector_size_(logical_block_size) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
#ifdef ROCKSDB_RANGESYNC_PRESENT
  // Probe once whether sync_file_range() works for this fd.
  sync_file_range_supported_ = IsSyncFileRangeSupported(fd_);
#endif  // ROCKSDB_RANGESYNC_PRESENT
  assert(!options.use_mmap_writes);
}
1202 
// Closes the file if the caller did not; the close status is deliberately
// ignored since a destructor cannot report it.
PosixWritableFile::~PosixWritableFile() {
  if (fd_ >= 0) {
    // Qualified call: avoids virtual dispatch from within a destructor.
    IOStatus s = PosixWritableFile::Close(IOOptions(), nullptr);
    s.PermitUncheckedError();
  }
}
1209 
Append(const Slice & data,const IOOptions &,IODebugContext *)1210 IOStatus PosixWritableFile::Append(const Slice& data, const IOOptions& /*opts*/,
1211                                    IODebugContext* /*dbg*/) {
1212   if (use_direct_io()) {
1213     assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
1214     assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
1215   }
1216   const char* src = data.data();
1217   size_t nbytes = data.size();
1218 
1219   if (!PosixWrite(fd_, src, nbytes)) {
1220     return IOError("While appending to file", filename_, errno);
1221   }
1222 
1223   filesize_ += nbytes;
1224   return IOStatus::OK();
1225 }
1226 
// Writes `data` at absolute position `offset` using pwrite(2) and sets the
// cached file size to the end of this write. Under direct I/O, offset,
// size, and buffer must all be sector-aligned.
IOStatus PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset,
                                             const IOOptions& /*opts*/,
                                             IODebugContext* /*dbg*/) {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }
  // pwrite takes an off_t; guard against offsets that would overflow it.
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  const char* src = data.data();
  size_t nbytes = data.size();
  if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
    return IOError("While pwrite to file at offset " + ToString(offset),
                   filename_, errno);
  }
  filesize_ = offset + nbytes;
  return IOStatus::OK();
}
1245 
Truncate(uint64_t size,const IOOptions &,IODebugContext *)1246 IOStatus PosixWritableFile::Truncate(uint64_t size, const IOOptions& /*opts*/,
1247                                      IODebugContext* /*dbg*/) {
1248   IOStatus s;
1249   int r = ftruncate(fd_, size);
1250   if (r < 0) {
1251     s = IOError("While ftruncate file to size " + ToString(size), filename_,
1252                 errno);
1253   } else {
1254     filesize_ = size;
1255   }
1256   return s;
1257 }
1258 
// Trims unused preallocated space at the end of the file, then closes the
// descriptor. On filesystems where ftruncate does not release preallocated
// blocks, falls back to punching a hole over the preallocated tail.
IOStatus PosixWritableFile::Close(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  IOStatus s;

  size_t block_size;
  size_t last_allocated_block;
  GetPreallocationStatus(&block_size, &last_allocated_block);
  TEST_SYNC_POINT_CALLBACK("PosixWritableFile::Close", &last_allocated_block);
  if (last_allocated_block > 0) {
    // trim the extra space preallocated at the end of the file
    // NOTE(ljin): we probably don't want to surface failure as an IOError,
    // but it will be nice to log these errors.
    int dummy __attribute__((__unused__));
    dummy = ftruncate(fd_, filesize_);
#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \
    !defined(TRAVIS)
    // in some file systems, ftruncate only trims trailing space if the
    // new file size is smaller than the current size. Calling fallocate
    // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused
    // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following
    // filesystems:
    //   XFS (since Linux 2.6.38)
    //   ext4 (since Linux 3.0)
    //   Btrfs (since Linux 3.7)
    //   tmpfs (since Linux 3.5)
    // We ignore error since failure of this operation does not affect
    // correctness.
    // TRAVIS - this code does not work on TRAVIS filesystems.
    // the FALLOC_FL_KEEP_SIZE option is expected to not change the size
    // of the file, but it does. Simple strace report will show that.
    // While we work with Travis-CI team to figure out if this is a
    // quirk of Docker/AUFS, we will comment this out.
    struct stat file_stats;
    int result = fstat(fd_, &file_stats);
    // After ftruncate, we check whether ftruncate has the correct behavior.
    // If not, we should hack it with FALLOC_FL_PUNCH_HOLE
    // Compares pages implied by st_size against pages actually allocated
    // (st_blocks is in 512-byte units); a mismatch means blocks beyond EOF
    // are still allocated.
    if (result == 0 &&
        (file_stats.st_size + file_stats.st_blksize - 1) /
                file_stats.st_blksize !=
            file_stats.st_blocks / (file_stats.st_blksize / 512)) {
      IOSTATS_TIMER_GUARD(allocate_nanos);
      if (allow_fallocate_) {
        fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
                  block_size * last_allocated_block - filesize_);
      }
    }
#endif
  }

  if (close(fd_) < 0) {
    s = IOError("While closing file after writing", filename_, errno);
  }
  fd_ = -1;
  return s;
}
1314 
1315 // write out the cached data to the OS cache
// No-op: data is handed to the OS by write(2) in Append, so there is no
// user-space buffer to flush here.
IOStatus PosixWritableFile::Flush(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  return IOStatus::OK();
}
1320 
// Flushes file data to stable storage. fdatasync avoids flushing metadata
// (e.g. mtime) that is not required to read back the data; see Fsync for
// the full flush.
IOStatus PosixWritableFile::Sync(const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
  if (fdatasync(fd_) < 0) {
    return IOError("While fdatasync", filename_, errno);
  }
  return IOStatus::OK();
}
1328 
// Flushes file data and metadata to stable storage via fsync(2).
IOStatus PosixWritableFile::Fsync(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  if (fsync(fd_) < 0) {
    return IOError("While fsync", filename_, errno);
  }
  return IOStatus::OK();
}
1336 
IsSyncThreadSafe() const1337 bool PosixWritableFile::IsSyncThreadSafe() const { return true; }
1338 
// Returns the size tracked locally as appends complete (filesize_), not a
// value queried from the filesystem.
uint64_t PosixWritableFile::GetFileSize(const IOOptions& /*opts*/,
                                        IODebugContext* /*dbg*/) {
  return filesize_;
}
1343 
// Passes a write-lifetime hint to the kernel via fcntl(F_SET_RW_HINT)
// (Linux only). write_hint_ is updated only when the fcntl succeeds, so a
// failed hint can be retried later.
void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
#ifdef OS_LINUX
// Suppress Valgrind "Unimplemented functionality" error.
#ifndef ROCKSDB_VALGRIND_RUN
  if (hint == write_hint_) {
    return;  // hint unchanged; skip the syscall
  }
  if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) {
    write_hint_ = hint;
  }
#else
  (void)hint;
#endif  // ROCKSDB_VALGRIND_RUN
#else
  (void)hint;
#endif  // OS_LINUX
}
1361 
InvalidateCache(size_t offset,size_t length)1362 IOStatus PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
1363   if (use_direct_io()) {
1364     return IOStatus::OK();
1365   }
1366 #ifndef OS_LINUX
1367   (void)offset;
1368   (void)length;
1369   return IOStatus::OK();
1370 #else
1371   // free OS pages
1372   int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
1373   if (ret == 0) {
1374     return IOStatus::OK();
1375   }
1376   return IOError("While fadvise NotNeeded", filename_, errno);
1377 #endif
1378 }
1379 
1380 #ifdef ROCKSDB_FALLOCATE_PRESENT
// Preallocates `len` bytes at `offset` with fallocate(2); a no-op when
// allow_fallocate_ is false. With FALLOC_FL_KEEP_SIZE the reported file
// size is unchanged.
IOStatus PosixWritableFile::Allocate(uint64_t offset, uint64_t len,
                                     const IOOptions& /*opts*/,
                                     IODebugContext* /*dbg*/) {
  // fallocate takes off_t; guard against values that would overflow it.
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  TEST_KILL_RANDOM("PosixWritableFile::Allocate:0");
  IOSTATS_TIMER_GUARD(allocate_nanos);
  int alloc_status = 0;
  if (allow_fallocate_) {
    alloc_status =
        fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
                  static_cast<off_t>(offset), static_cast<off_t>(len));
  }
  if (alloc_status == 0) {
    return IOStatus::OK();
  } else {
    return IOError(
        "While fallocate offset " + ToString(offset) + " len " + ToString(len),
        filename_, errno);
  }
}
1402 #endif
1403 
// Initiates writeback of [offset, offset+nbytes) with sync_file_range(2)
// when supported; otherwise falls back to the generic FSWritableFile
// implementation. This schedules dirty pages for writeback but does not
// by itself guarantee durability (no metadata sync).
IOStatus PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
                                      const IOOptions& opts,
                                      IODebugContext* dbg) {
#ifdef ROCKSDB_RANGESYNC_PRESENT
  // sync_file_range takes off_t; guard against overflowing values.
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(nbytes <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  if (sync_file_range_supported_) {
    int ret;
    if (strict_bytes_per_sync_) {
      // Specifying `SYNC_FILE_RANGE_WAIT_BEFORE` together with an offset/length
      // that spans all bytes written so far tells `sync_file_range` to wait for
      // any outstanding writeback requests to finish before issuing a new one.
      ret =
          sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes),
                          SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE);
    } else {
      ret = sync_file_range(fd_, static_cast<off_t>(offset),
                            static_cast<off_t>(nbytes), SYNC_FILE_RANGE_WRITE);
    }
    if (ret != 0) {
      return IOError("While sync_file_range returned " + ToString(ret),
                     filename_, errno);
    }
    return IOStatus::OK();
  }
#endif  // ROCKSDB_RANGESYNC_PRESENT
  return FSWritableFile::RangeSync(offset, nbytes, opts, dbg);
}
1432 
1433 #ifdef OS_LINUX
// Writes a unique identifier for this file into `id` (at most `max_size`
// bytes) and returns its length, delegating to
// PosixHelper::GetUniqueIdFromFile on the underlying descriptor.
size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
}
1437 #endif
1438 
1439 /*
1440  * PosixRandomRWFile
1441  */
1442 
// Takes ownership of `fd` (closed in Close/destructor); the options
// parameter is currently unused.
PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
                                     const EnvOptions& /*options*/)
    : filename_(fname), fd_(fd) {}
1446 
~PosixRandomRWFile()1447 PosixRandomRWFile::~PosixRandomRWFile() {
1448   if (fd_ >= 0) {
1449     IOStatus s = Close(IOOptions(), nullptr);
1450     s.PermitUncheckedError();
1451   }
1452 }
1453 
Write(uint64_t offset,const Slice & data,const IOOptions &,IODebugContext *)1454 IOStatus PosixRandomRWFile::Write(uint64_t offset, const Slice& data,
1455                                   const IOOptions& /*opts*/,
1456                                   IODebugContext* /*dbg*/) {
1457   const char* src = data.data();
1458   size_t nbytes = data.size();
1459   if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
1460     return IOError(
1461         "While write random read/write file at offset " + ToString(offset),
1462         filename_, errno);
1463   }
1464 
1465   return IOStatus::OK();
1466 }
1467 
// Reads up to `n` bytes at `offset` into `scratch`, retrying on EINTR and
// continuing after short reads until `n` bytes are read or EOF is reached.
// On return, `*result` points into `scratch` and holds the bytes actually
// read (possibly fewer than `n`). A short result is not an error.
IOStatus PosixRandomRWFile::Read(uint64_t offset, size_t n,
                                 const IOOptions& /*opts*/, Slice* result,
                                 char* scratch, IODebugContext* /*dbg*/) const {
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    ssize_t done = pread(fd_, ptr, left, offset);
    if (done < 0) {
      // error while reading from file
      if (errno == EINTR) {
        // read was interrupted, try again.
        continue;
      }
      return IOError("While reading random read/write file offset " +
                         ToString(offset) + " len " + ToString(n),
                     filename_, errno);
    } else if (done == 0) {
      // Nothing more to read
      break;
    }

    // Read `done` bytes
    ptr += done;
    offset += done;
    left -= done;
  }

  *result = Slice(scratch, n - left);
  return IOStatus::OK();
}
1498 
// No-op: writes go straight to the OS via pwrite, so there is no
// user-space buffer to flush.
IOStatus PosixRandomRWFile::Flush(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  return IOStatus::OK();
}
1503 
// Flushes file data to stable storage via fdatasync(2) (metadata such as
// mtime may not be flushed; see Fsync).
IOStatus PosixRandomRWFile::Sync(const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
  if (fdatasync(fd_) < 0) {
    return IOError("While fdatasync random read/write file", filename_, errno);
  }
  return IOStatus::OK();
}
1511 
// Flushes file data and metadata to stable storage via fsync(2).
IOStatus PosixRandomRWFile::Fsync(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  if (fsync(fd_) < 0) {
    return IOError("While fsync random read/write file", filename_, errno);
  }
  return IOStatus::OK();
}
1519 
// Closes the descriptor. fd_ is reset only on success, so a failed close
// leaves the object in a state where the destructor will retry.
IOStatus PosixRandomRWFile::Close(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  if (close(fd_) < 0) {
    return IOError("While close random read/write file", filename_, errno);
  }
  fd_ = -1;
  return IOStatus::OK();
}
1528 
// Releases the mapping; a munmap failure is ignored since a destructor
// cannot report it.
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
  // TODO should have error handling though not much we can do...
  munmap(this->base_, length_);
}
1533 
1534 /*
1535  * PosixDirectory
1536  */
1537 
~PosixDirectory()1538 PosixDirectory::~PosixDirectory() { close(fd_); }
1539 
// Fsyncs the directory so that directory-entry changes (file creation,
// rename) become durable. Skipped on AIX, where fsync on a directory fd is
// not supported.
IOStatus PosixDirectory::Fsync(const IOOptions& /*opts*/,
                               IODebugContext* /*dbg*/) {
#ifndef OS_AIX
  if (fsync(fd_) == -1) {
    return IOError("While fsync", "a directory", errno);
  }
#endif
  return IOStatus::OK();
}
1549 }  // namespace ROCKSDB_NAMESPACE
1550 #endif
1551