1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "file/sequence_file_reader.h"
11 
12 #include <algorithm>
13 #include <mutex>
14 
15 #include "file/read_write_util.h"
16 #include "monitoring/histogram.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "port/port.h"
19 #include "test_util/sync_point.h"
20 #include "util/aligned_buffer.h"
21 #include "util/random.h"
22 #include "util/rate_limiter.h"
23 
24 namespace ROCKSDB_NAMESPACE {
Create(const std::shared_ptr<FileSystem> & fs,const std::string & fname,const FileOptions & file_opts,std::unique_ptr<SequentialFileReader> * reader,IODebugContext * dbg)25 Status SequentialFileReader::Create(
26     const std::shared_ptr<FileSystem>& fs, const std::string& fname,
27     const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
28     IODebugContext* dbg) {
29   std::unique_ptr<FSSequentialFile> file;
30   Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
31   if (s.ok()) {
32     reader->reset(new SequentialFileReader(std::move(file), fname));
33   }
34   return s;
35 }
36 
Read(size_t n,Slice * result,char * scratch)37 Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
38   Status s;
39   if (use_direct_io()) {
40 #ifndef ROCKSDB_LITE
41     size_t offset = offset_.fetch_add(n);
42     size_t alignment = file_->GetRequiredBufferAlignment();
43     size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
44     size_t offset_advance = offset - aligned_offset;
45     size_t size = Roundup(offset + n, alignment) - aligned_offset;
46     size_t r = 0;
47     AlignedBuffer buf;
48     buf.Alignment(alignment);
49     buf.AllocateNewBuffer(size);
50     Slice tmp;
51     s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
52                               buf.BufferStart(), nullptr);
53     if (s.ok() && offset_advance < tmp.size()) {
54       buf.Size(tmp.size());
55       r = buf.Read(scratch, offset_advance,
56                    std::min(tmp.size() - offset_advance, n));
57     }
58     *result = Slice(scratch, r);
59 #endif  // !ROCKSDB_LITE
60   } else {
61     s = file_->Read(n, IOOptions(), result, scratch, nullptr);
62   }
63   IOSTATS_ADD(bytes_read, result->size());
64   return s;
65 }
66 
Skip(uint64_t n)67 Status SequentialFileReader::Skip(uint64_t n) {
68 #ifndef ROCKSDB_LITE
69   if (use_direct_io()) {
70     offset_ += static_cast<size_t>(n);
71     return Status::OK();
72   }
73 #endif  // !ROCKSDB_LITE
74   return file_->Skip(n);
75 }
76 
namespace {
// This class wraps a SequentialFile, exposing the same API, with the
// difference of being able to prefetch up to readahead_size bytes and then
// serve them from memory, avoiding the entire round-trip if, for example, the
// data for the file is actually remote.
class ReadaheadSequentialFile : public FSSequentialFile {
 public:
  // Takes ownership of `file`. readahead_size is rounded up to the file's
  // required buffer alignment, and the prefetch buffer is allocated eagerly
  // at that rounded size.
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
                          size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0),
        read_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;

  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;

  // Serves the read from the prefetch buffer when possible; large reads
  // bypass the buffer entirely, smaller ones refill it first. All buffer
  // state is protected by lock_, so concurrent calls are serialized.
  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
                IODebugContext* dbg) override {
    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including the end-of-file case when offset + n is greater than EOF, then
    // return.
    if (TryReadFromCache(n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }
    n -= cached_len;

    IOStatus s;
    // Read-ahead only makes sense if we have some slack left after reading
    if (n + alignment_ >= readahead_size_) {
      // Request is (nearly) as large as the readahead window: read straight
      // into the caller's scratch and drop the now-stale buffer contents.
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
      if (s.ok()) {
        read_offset_ += result->size();
        *result = Slice(scratch, cached_len + result->size());
      }
      buffer_.Clear();
      return s;
    }

    s = ReadIntoBuffer(readahead_size_, opts, dbg);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  // Skips n bytes, consuming cached data first and only delegating to the
  // underlying file for whatever lies beyond the buffer.
  IOStatus Skip(uint64_t n) override {
    std::unique_lock<std::mutex> lk(lock_);
    IOStatus s = IOStatus::OK();
    // First check if we need to skip already cached data
    if (buffer_.CurrentSize() > 0) {
      // Do we need to skip beyond cached data?
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
        // Yes. Skip whatever is in memory and adjust offset accordingly
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
      } else {
        // No. The entire section to be skipped is entirely in cache.
        read_offset_ += n;
        n = 0;
      }
    }
    if (n > 0) {
      // We still need to skip more, so call the file API for skipping
      s = file_->Skip(n);
      if (s.ok()) {
        read_offset_ += n;
      }
      buffer_.Clear();
    }
    return s;
  }

  // Positioned reads do not interact with the sequential cursor, so they are
  // forwarded untouched (and do not consult or update the prefetch buffer).
  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
                          Slice* result, char* scratch,
                          IODebugContext* dbg) override {
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  }

  // Drops the prefetch buffer in addition to invalidating the underlying
  // file's cache, so no stale data can be served afterwards.
  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read from buffer_ n bytes. If anything was read from the cache, it
  // sets cached_len to the number of bytes actually read, copies these number
  // of bytes to scratch and returns true.
  // If nothing was read sets cached_len to 0 and returns false.
  // Caller must hold lock_.
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
    if (read_offset_ < buffer_offset_ ||
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    read_offset_ += *cached_len;
    return true;
  }

  // Reads into buffer_ the next n bytes from file_.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  // Caller must hold lock_.
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
                          IODebugContext* dbg) {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      buffer_offset_ = read_offset_;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<FSSequentialFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  // Guards buffer_, buffer_offset_ and read_offset_.
  std::mutex lock_;
  // The buffer storing the prefetched data
  AlignedBuffer buffer_;
  // The offset in file_, corresponding to data stored in buffer_
  uint64_t buffer_offset_;
  // The offset up to which data was read from file_. In fact, it can be larger
  // than the actual file size, since the file_->Skip(n) call doesn't return the
  // actual number of bytes that were skipped, which can be less than n.
  // This is not a problem since read_offset_ is monotonically increasing and
  // its only use is to figure out if next piece of data should be read from
  // buffer_ or file_ directly.
  uint64_t read_offset_;
};
}  // namespace
236 
237 std::unique_ptr<FSSequentialFile>
NewReadaheadSequentialFile(std::unique_ptr<FSSequentialFile> && file,size_t readahead_size)238 SequentialFileReader::NewReadaheadSequentialFile(
239     std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
240   if (file->GetRequiredBufferAlignment() >= readahead_size) {
241     // Short-circuit and return the original file if readahead_size is
242     // too small and hence doesn't make sense to be used for prefetching.
243     return std::move(file);
244   }
245   std::unique_ptr<FSSequentialFile> result(
246       new ReadaheadSequentialFile(std::move(file), readahead_size));
247   return result;
248 }
249 }  // namespace ROCKSDB_NAMESPACE
250