// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/sequence_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/read_write_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status SequentialFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
    IODebugContext* dbg) {
  std::unique_ptr<FSSequentialFile> file;
  Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
  if (s.ok()) {
    reader->reset(new SequentialFileReader(std::move(file), fname));
  }
  return s;
}

Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
  Status s;
  if (use_direct_io()) {
#ifndef ROCKSDB_LITE
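    // Direct I/O requires sector-aligned reads: round the logical offset down
    // to the alignment boundary and round the total length up, then read the
    // enclosing aligned window into a suitably aligned scratch buffer.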
    size_t offset = offset_.fetch_add(n);
    size_t alignment = file_->GetRequiredBufferAlignment();
    size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
    size_t offset_advance = offset - aligned_offset;
    size_t size = Roundup(offset + n, alignment) - aligned_offset;
    size_t r = 0;
    AlignedBuffer buf;
    buf.Alignment(alignment);
    buf.AllocateNewBuffer(size);
    Slice tmp;
    s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
                              buf.BufferStart(), nullptr);
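    // Copy only the caller's range out of the aligned window, dropping the
    // leading offset_advance bytes that were read purely for alignment.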
    if (s.ok() && offset_advance < tmp.size()) {
      buf.Size(tmp.size());
      r = buf.Read(scratch, offset_advance,
                   std::min(tmp.size() - offset_advance, n));
    }
    *result = Slice(scratch, r);
#endif  // !ROCKSDB_LITE
  } else {
    s = file_->Read(n, IOOptions(), result, scratch, nullptr);
  }
  IOSTATS_ADD(bytes_read, result->size());
  return s;
}

Status SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
  if (use_direct_io()) {
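    // With direct I/O every read goes through PositionedRead() against
    // offset_, so skipping only needs to advance the tracked offset.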
    offset_ += static_cast<size_t>(n);
    return Status::OK();
  }
#endif  // !ROCKSDB_LITE
  return file_->Skip(n);
}

namespace {
// This class wraps a SequentialFile, exposing the same API, with the
// difference of being able to prefetch up to readahead_size bytes and then
// serve them from memory, avoiding the entire round-trip if, for example,
// the data for the file is actually remote.
class ReadaheadSequentialFile : public FSSequentialFile {
 public:
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
                          size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0),
        read_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;

  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;

  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
                IODebugContext* dbg) override {
    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely
    // cached, including the end-of-file case when offset + n is greater than
    // EOF, then return.
    if (TryReadFromCache(n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file (a buffer
      // holding less than readahead_size_ bytes means the last refill ran
      // into EOF) - return.
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }
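    // Partial hit: the first cached_len bytes came from the buffer; the rest
    // has to come from the file.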
    n -= cached_len;

    IOStatus s;
    // Read-ahead only makes sense if we have some slack left after reading
    if (n + alignment_ >= readahead_size_) {
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
      if (s.ok()) {
        read_offset_ += result->size();
        *result = Slice(scratch, cached_len + result->size());
      }
      buffer_.Clear();
      return s;
    }

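    // Refill the buffer with the next readahead_size_ bytes and then serve
    // the remainder of the request from it.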
    s = ReadIntoBuffer(readahead_size_, opts, dbg);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  IOStatus Skip(uint64_t n) override {
    std::unique_lock<std::mutex> lk(lock_);
    IOStatus s = IOStatus::OK();
    // First check if we need to skip already cached data
    if (buffer_.CurrentSize() > 0) {
      // Do we need to skip beyond cached data?
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
        // Yes. Skip whatever is in memory and adjust the offset accordingly
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
      } else {
        // No. The section to be skipped is entirely in cache.
        read_offset_ += n;
        n = 0;
      }
    }
    if (n > 0) {
      // We still need to skip more, so call the file API for skipping
      s = file_->Skip(n);
      if (s.ok()) {
        read_offset_ += n;
      }
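      // The cached data no longer lines up with read_offset_ after the
      // file-level skip, so drop it.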
      buffer_.Clear();
    }
    return s;
  }

  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
                          Slice* result, char* scratch,
                          IODebugContext* dbg) override {
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  }

  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read up to n bytes from buffer_. If anything was read from the
  // cache, sets cached_len to the number of bytes actually read, copies that
  // many bytes to scratch, and returns true.
  // If nothing was read, sets cached_len to 0 and returns false.
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
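    // Cache miss if read_offset_ falls outside the currently buffered range.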
    if (read_offset_ < buffer_offset_ ||
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    read_offset_ += *cached_len;
    return true;
  }

  // Reads into buffer_ the next n bytes from file_.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
                          IODebugContext* dbg) {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
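    // Both readahead_size_ and the buffer capacity were rounded up to
    // alignment_ in the constructor, so the (possibly clamped) n stays
    // sector-aligned.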
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      buffer_offset_ = read_offset_;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<FSSequentialFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

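  // Guards the mutable prefetch state below (buffer_, buffer_offset_ and
  // read_offset_) against concurrent Read()/Skip()/InvalidateCache() calls.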
  std::mutex lock_;
  // The buffer storing the prefetched data
  AlignedBuffer buffer_;
  // The offset in file_ corresponding to the data stored in buffer_
  uint64_t buffer_offset_;
  // The offset up to which data was read from file_. It can in fact be larger
  // than the actual file size, since file_->Skip(n) doesn't return the number
  // of bytes actually skipped, which can be less than n. This is not a
  // problem, since read_offset_ is monotonically increasing and its only use
  // is to figure out whether the next piece of data should be read from
  // buffer_ or from file_ directly.
  uint64_t read_offset_;
};
}  // namespace

std::unique_ptr<FSSequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  if (file->GetRequiredBufferAlignment() >= readahead_size) {
    // Short-circuit and return the original file: readahead_size is too
    // small to make prefetching worthwhile.
    return std::move(file);
  }
  std::unique_ptr<FSSequentialFile> result(
      new ReadaheadSequentialFile(std::move(file), readahead_size));
  return result;
}
}  // namespace ROCKSDB_NAMESPACE