1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "file/readahead_raf.h"
11 
12 #include <algorithm>
13 #include <mutex>
14 #include "file/read_write_util.h"
15 #include "util/aligned_buffer.h"
16 #include "util/rate_limiter.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 namespace {
// Wraps another RandomAccessFile and serves reads through an internal
// aligned buffer of (rounded-up) `readahead_size` bytes. On a buffer miss,
// a full readahead-sized, alignment-rounded chunk is read from the
// underlying file so that subsequent nearby reads can be satisfied from
// memory. All buffer state is guarded by `lock_`, so the const read path
// is safe to call concurrently.
class ReadaheadRandomAccessFile : public RandomAccessFile {
 public:
  // Takes ownership of `file`. The effective readahead size is rounded up
  // to the file's required buffer alignment so the buffer can always be
  // filled with sector-aligned reads (see asserts in ReadIntoBuffer()).
  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
                            size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  // Non-copyable: owns the underlying file and the readahead buffer.
  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;

  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
      delete;

  // Reads `n` bytes at `offset` into `scratch`, serving as much as possible
  // from the readahead buffer and refilling the buffer on a miss. `*result`
  // points into `scratch` and may be shorter than `n` at end of file.
  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    // Read-ahead only makes sense if we have some slack left after reading;
    // requests near (or above) the buffer size go straight to the file.
    if (n + alignment_ >= readahead_size_) {
      return file_->Read(offset, n, result, scratch);
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including end of file case when offset + n is greater than EOF, then
    // return. A buffer holding fewer than readahead_size_ bytes can only
    // mean the last fill hit EOF (Prefetch() refuses smaller fills).
    if (TryReadFromCache(offset, n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return Status::OK();
    }
    // Partial hit (or miss, cached_len == 0): continue reading at the first
    // byte not served from the buffer.
    size_t advanced_offset = static_cast<size_t>(offset + cached_len);
    // In the case of cache hit advanced_offset is already aligned, means that
    // chunk_offset equals to advanced_offset
    size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);

    // Refill the buffer with a full readahead chunk starting at the aligned
    // offset, then copy out the remainder of the request.
    Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
                       scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  // Fills the readahead buffer so it covers [offset, offset + n), aligned
  // outward to alignment_ boundaries. No-op if the aligned start already
  // matches the current buffer, or if the request is too small (see below).
  Status Prefetch(uint64_t offset, size_t n) override {
    if (n < readahead_size_) {
      // Don't allow smaller prefetches than the configured `readahead_size_`.
      // `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
      return Status::OK();
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t offset_ = static_cast<size_t>(offset);
    size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
    if (prefetch_offset == buffer_offset_) {
      return Status::OK();
    }
    // ReadIntoBuffer() clamps the length to the buffer capacity, so this
    // reads at most readahead_size_ bytes starting at the aligned offset.
    return ReadIntoBuffer(prefetch_offset,
                          Roundup(offset_ + n, alignment_) - prefetch_offset);
  }

  // Delegates to the wrapped file.
  size_t GetUniqueId(char* id, size_t max_size) const override {
    return file_->GetUniqueId(id, max_size);
  }

  // Delegates to the wrapped file.
  void Hint(AccessPattern pattern) override { file_->Hint(pattern); }

  // Drops the readahead buffer (it may overlap the invalidated range), then
  // forwards the invalidation to the wrapped file.
  Status InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  // Delegates to the wrapped file.
  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read from buffer_ n bytes starting at offset. If anything was read
  // from the cache, it sets cached_len to the number of bytes actually read,
  // copies these number of bytes to scratch and returns true.
  // If nothing was read sets cached_len to 0 and returns false.
  // Caller must hold lock_.
  bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
                        char* scratch) const {
    if (offset < buffer_offset_ ||
        offset >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = offset - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    return true;
  }

  // Reads into buffer_ the next n bytes from file_ starting at offset.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  // Caller must hold lock_; offset and n must be alignment_-aligned.
  Status ReadIntoBuffer(uint64_t offset, size_t n) const {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(offset, alignment_));
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
    if (s.ok()) {
      buffer_offset_ = offset;
      buffer_.Size(result.size());
      // The underlying read should have filled our own buffer in place.
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  // The wrapped file; all actual I/O goes through it.
  const std::unique_ptr<RandomAccessFile> file_;
  // Required buffer alignment of file_, cached at construction.
  const size_t alignment_;
  // Buffer capacity, rounded up to a multiple of alignment_.
  const size_t readahead_size_;

  // Guards buffer_ and buffer_offset_ (Read() is const but mutates them).
  mutable std::mutex lock_;
  // The buffer storing the prefetched data
  mutable AlignedBuffer buffer_;
  // The offset in file_, corresponding to data stored in buffer_
  mutable uint64_t buffer_offset_;
};
154 }  // namespace
155 
NewReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> && file,size_t readahead_size)156 std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
157     std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
158   std::unique_ptr<RandomAccessFile> result(
159       new ReadaheadRandomAccessFile(std::move(file), readahead_size));
160   return result;
161 }
162 }  // namespace ROCKSDB_NAMESPACE
163