//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/file_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
IOStatus RandomAccessFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts,
    std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
  std::unique_ptr<FSRandomAccessFile> file;
  IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
  if (io_s.ok()) {
    reader->reset(new RandomAccessFileReader(std::move(file), fname));
  }
  return io_s;
}

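// Reads `n` bytes starting at `offset` into `result`. With buffered I/O the
// data lands in `scratch` (or, for mmap'd files, in the mapped region); with
// direct I/O it is read into an internally allocated aligned buffer, which
// is either copied into `scratch` or handed to the caller via `aligned_buf`.
// Compaction reads are throttled by the rate limiter.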
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
                                      size_t n, Slice* result, char* scratch,
                                      AlignedBuf* aligned_buf,
                                      bool for_compaction) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
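      // Direct I/O requires the read to start and end on alignment
      // boundaries. Example (assuming alignment = 4096): a request for
      // offset = 5000, n = 300 becomes aligned_offset = 4096,
      // offset_advance = 904, and read_size = Roundup(5300, 4096) - 4096
      // = 4096; the caller's 300 bytes sit at offset_advance inside `buf`.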
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
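      // Read in chunks: compaction reads take what the rate limiter allows
      // per iteration; unthrottled reads request the whole read_size at once
      // and so take a single pass unless the file system returns short.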
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == read_size);
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                             &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 io_s);
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!io_s.ok() || tmp.size() < allowed) {
          break;
        }
      }
      size_t res_len = 0;
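      // Hand the result back: if the caller passed an `aligned_buf`, transfer
      // ownership of the internal buffer and point `result` into it
      // (zero-copy); otherwise copy the requested range into `scratch`.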
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart() + offset_advance;
          aligned_buf->reset(buf.Release());
        }
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
      size_t pos = 0;
      const char* res_scratch = nullptr;
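      // Buffered path: read up to `n` bytes in rate-limiter-sized chunks,
      // tracking progress in `pos` and stopping on error or short read.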
      while (pos < n) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                Env::IOPriority::IO_LOW, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }
#endif

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == n);
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                             scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, io_s);
        }
#endif

        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!io_s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}

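// Returns the first offset past the end of the request, i.e.
// r.offset + r.len.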
size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

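// Expands the request outward to the nearest alignment boundaries: the
// offset is rounded down and the end is rounded up. The returned request's
// scratch is left null; the caller assigns a buffer later.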
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
    TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}

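// Merges `src` into `dest` when the two ranges overlap or touch, e.g.
// [0, 4096) and [4096, 8192) merge into [0, 8192). Returns false (and
// leaves `dest` untouched) when there is a gap between them.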
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}

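// Issues a batch of reads. With direct I/O the requests are first aligned
// and coalesced into a smaller set of aligned requests backed by a single
// aligned allocation; the results are then scattered back into the caller's
// original requests.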
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
                                           FSReadRequest* read_reqs,
                                           size_t num_reqs,
                                           AlignedBuf* aligned_buf) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);
  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the maximum possible size; reserving it up front
      // avoids std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      for (size_t i = 0; i < num_reqs; i++) {
        const auto& r = Align(read_reqs[i], alignment);
        if (i == 0) {
          // head
          aligned_reqs.push_back(r);

        } else if (!TryMerge(&aligned_reqs.back(), r)) {
          // head + n
          aligned_reqs.push_back(r);

        } else {
          // unused
          r.status.PermitUncheckedError();
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate aligned buffer and let scratch buffers point to it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      aligned_buf->reset(buf.Release());
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
#endif  // ROCKSDB_LITE

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
    }

#ifndef ROCKSDB_LITE
    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
      size_t aligned_i = 0;
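      // Each original request is fully contained in one merged aligned
      // request. Assuming the incoming requests are sorted by offset (the
      // merge loop above relies on this too), a single forward pass matches
      // them up, advancing `aligned_i` once the current request starts past
      // the end of the current merged request.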
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset));
          r.result = Slice(fs_r.scratch + offset, len);
        } else {
          r.result = Slice();
        }
      }
    }
#endif  // ROCKSDB_LITE

    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}

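// Translates the relevant ReadOptions (such as the deadline) into IOOptions,
// using this reader's clock when one was provided and the default system
// clock otherwise.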
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
                                                  IOOptions& opts) {
  if (clock_ != nullptr) {
    return PrepareIOFromReadOptions(ro, clock_, opts);
  } else {
    return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
  }
}
}  // namespace ROCKSDB_NAMESPACE