1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "file/readahead_raf.h"
11
#include <algorithm>
#include <cstring>
#include <mutex>

#include "file/read_write_util.h"
#include "util/aligned_buffer.h"
#include "util/rate_limiter.h"
17
18 namespace ROCKSDB_NAMESPACE {
19 namespace {
20 class ReadaheadRandomAccessFile : public RandomAccessFile {
21 public:
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> && file,size_t readahead_size)22 ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
23 size_t readahead_size)
24 : file_(std::move(file)),
25 alignment_(file_->GetRequiredBufferAlignment()),
26 readahead_size_(Roundup(readahead_size, alignment_)),
27 buffer_(),
28 buffer_offset_(0) {
29 buffer_.Alignment(alignment_);
30 buffer_.AllocateNewBuffer(readahead_size_);
31 }
32
33 ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
34
35 ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
36 delete;
37
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const38 Status Read(uint64_t offset, size_t n, Slice* result,
39 char* scratch) const override {
40 // Read-ahead only make sense if we have some slack left after reading
41 if (n + alignment_ >= readahead_size_) {
42 return file_->Read(offset, n, result, scratch);
43 }
44
45 std::unique_lock<std::mutex> lk(lock_);
46
47 size_t cached_len = 0;
48 // Check if there is a cache hit, meaning that [offset, offset + n) is
49 // either completely or partially in the buffer. If it's completely cached,
50 // including end of file case when offset + n is greater than EOF, then
51 // return.
52 if (TryReadFromCache(offset, n, &cached_len, scratch) &&
53 (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
54 // We read exactly what we needed, or we hit end of file - return.
55 *result = Slice(scratch, cached_len);
56 return Status::OK();
57 }
58 size_t advanced_offset = static_cast<size_t>(offset + cached_len);
59 // In the case of cache hit advanced_offset is already aligned, means that
60 // chunk_offset equals to advanced_offset
61 size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
62
63 Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
64 if (s.ok()) {
65 // The data we need is now in cache, so we can safely read it
66 size_t remaining_len;
67 TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
68 scratch + cached_len);
69 *result = Slice(scratch, cached_len + remaining_len);
70 }
71 return s;
72 }
73
Prefetch(uint64_t offset,size_t n)74 Status Prefetch(uint64_t offset, size_t n) override {
75 if (n < readahead_size_) {
76 // Don't allow smaller prefetches than the configured `readahead_size_`.
77 // `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
78 return Status::OK();
79 }
80
81 std::unique_lock<std::mutex> lk(lock_);
82
83 size_t offset_ = static_cast<size_t>(offset);
84 size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
85 if (prefetch_offset == buffer_offset_) {
86 return Status::OK();
87 }
88 return ReadIntoBuffer(prefetch_offset,
89 Roundup(offset_ + n, alignment_) - prefetch_offset);
90 }
91
GetUniqueId(char * id,size_t max_size) const92 size_t GetUniqueId(char* id, size_t max_size) const override {
93 return file_->GetUniqueId(id, max_size);
94 }
95
Hint(AccessPattern pattern)96 void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
97
InvalidateCache(size_t offset,size_t length)98 Status InvalidateCache(size_t offset, size_t length) override {
99 std::unique_lock<std::mutex> lk(lock_);
100 buffer_.Clear();
101 return file_->InvalidateCache(offset, length);
102 }
103
use_direct_io() const104 bool use_direct_io() const override { return file_->use_direct_io(); }
105
106 private:
107 // Tries to read from buffer_ n bytes starting at offset. If anything was read
108 // from the cache, it sets cached_len to the number of bytes actually read,
109 // copies these number of bytes to scratch and returns true.
110 // If nothing was read sets cached_len to 0 and returns false.
TryReadFromCache(uint64_t offset,size_t n,size_t * cached_len,char * scratch) const111 bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
112 char* scratch) const {
113 if (offset < buffer_offset_ ||
114 offset >= buffer_offset_ + buffer_.CurrentSize()) {
115 *cached_len = 0;
116 return false;
117 }
118 uint64_t offset_in_buffer = offset - buffer_offset_;
119 *cached_len = std::min(
120 buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
121 memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
122 return true;
123 }
124
125 // Reads into buffer_ the next n bytes from file_ starting at offset.
126 // Can actually read less if EOF was reached.
127 // Returns the status of the read operastion on the file.
ReadIntoBuffer(uint64_t offset,size_t n) const128 Status ReadIntoBuffer(uint64_t offset, size_t n) const {
129 if (n > buffer_.Capacity()) {
130 n = buffer_.Capacity();
131 }
132 assert(IsFileSectorAligned(offset, alignment_));
133 assert(IsFileSectorAligned(n, alignment_));
134 Slice result;
135 Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
136 if (s.ok()) {
137 buffer_offset_ = offset;
138 buffer_.Size(result.size());
139 assert(result.size() == 0 || buffer_.BufferStart() == result.data());
140 }
141 return s;
142 }
143
144 const std::unique_ptr<RandomAccessFile> file_;
145 const size_t alignment_;
146 const size_t readahead_size_;
147
148 mutable std::mutex lock_;
149 // The buffer storing the prefetched data
150 mutable AlignedBuffer buffer_;
151 // The offset in file_, corresponding to data stored in buffer_
152 mutable uint64_t buffer_offset_;
153 };
154 } // namespace
155
NewReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> && file,size_t readahead_size)156 std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
157 std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
158 std::unique_ptr<RandomAccessFile> result(
159 new ReadaheadRandomAccessFile(std::move(file), readahead_size));
160 return result;
161 }
162 } // namespace ROCKSDB_NAMESPACE
163