// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/file_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
IOStatus RandomAccessFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts,
    std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
  std::unique_ptr<FSRandomAccessFile> file;
  IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
  if (io_s.ok()) {
    reader->reset(new RandomAccessFileReader(std::move(file), fname));
  }
  return io_s;
}

IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
                                      size_t n, Slice* result, char* scratch,
                                      AlignedBuf* aligned_buf,
                                      bool for_compaction) const {
  (void)aligned_buf;

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
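      // Direct I/O requires the read offset and length to be multiples of
      // the file's buffer alignment, so widen the requested range to the
      // enclosing aligned bounds. E.g. with alignment=512, offset=1000,
      // n=100: aligned_offset=512, offset_advance=488, read_size=1024.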
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout, and user
          // reads are not subject to the rate limiter, so they go through
          // exactly one iteration of this loop. Hence there is no need to
          // check and adjust opts.timeout before calling file_->Read.
          assert(!opts.timeout.count() || allowed == read_size);
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                             &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 io_s);
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!io_s.ok() || tmp.size() < allowed) {
          break;
        }
      }
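      // Copy the requested sub-range out of the aligned buffer, or, if the
      // caller supplied `aligned_buf`, hand over ownership of the buffer and
      // point `scratch` into it to avoid the copy.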
      size_t res_len = 0;
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart() + offset_advance;
          aligned_buf->reset(buf.Release());
        }
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
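      // Buffered (non-direct) path: read in rate-limited chunks directly
      // into `scratch`. Unless this is a rate-limited compaction read, the
      // whole range is read in a single iteration.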
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                Env::IOPriority::IO_LOW, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }
#endif

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout, and user
          // reads are not subject to the rate limiter, so they go through
          // exactly one iteration of this loop. Hence there is no need to
          // check and adjust opts.timeout before calling file_->Read.
          assert(!opts.timeout.count() || allowed == n);
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                             scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, io_s);
        }
#endif

        if (res_scratch == nullptr) {
          // We can't simply use `scratch` because reads of mmap'd files
          // return data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // Make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!io_s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}

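// Exclusive end offset of a read request.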
size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

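// Widens `r` to the enclosing alignment boundaries. E.g. with alignment=512,
// a request for [1000, 1100) becomes [512, 1536).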
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
      TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}

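// Merges `src` into `dest` if the two requests overlap or touch (no gap
// between them), returning true on success. E.g. [0, 512) and [512, 1024)
// merge into [0, 1024).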
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}

IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
                                           FSReadRequest* read_reqs,
                                           size_t num_reqs,
                                           AlignedBuf* aligned_buf) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);
  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is an upper bound on the number of aligned requests;
      // reserving it avoids std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
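      // Consecutive aligned requests that overlap or touch collapse into a
      // single merged request; this relies on read_reqs being given in
      // increasing offset order. E.g. with alignment=512, reads at
      // [100, 200) and [250, 300) both align to [0, 512) and merge.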
      for (size_t i = 0; i < num_reqs; i++) {
        const auto& r = Align(read_reqs[i], alignment);
        if (i == 0) {
          // head
          aligned_reqs.push_back(r);

        } else if (!TryMerge(&aligned_reqs.back(), r)) {
          // head + n
          aligned_reqs.push_back(r);

        } else {
          // unused
          r.status.PermitUncheckedError();
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate one aligned buffer and carve it up so that each aligned
      // request's scratch pointer refers to its own slice of the buffer.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      aligned_buf->reset(buf.Release());
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
#endif  // ROCKSDB_LITE

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
    }

#ifndef ROCKSDB_LITE
    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
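      // Each original request lies entirely within one merged aligned
      // request, and both lists are in offset order, so advance `aligned_i`
      // once the current merged request ends before `r` begins.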
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset));
          r.result = Slice(fs_r.scratch + offset, len);
        } else {
          r.result = Slice();
        }
      }
    }
#endif  // ROCKSDB_LITE

    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}

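// Translates the read options' deadline/timeout into IOOptions, using the
// reader's clock if one is set and the default system clock otherwise.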
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
                                                  IOOptions& opts) {
  if (clock_ != nullptr) {
    return PrepareIOFromReadOptions(ro, clock_, opts);
  } else {
    return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
  }
}
}  // namespace ROCKSDB_NAMESPACE