1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "file/random_access_file_reader.h"
11 
12 #include <algorithm>
13 #include <mutex>
14 
15 #include "monitoring/histogram.h"
16 #include "monitoring/iostats_context_imp.h"
17 #include "port/port.h"
18 #include "test_util/sync_point.h"
19 #include "util/random.h"
20 #include "util/rate_limiter.h"
21 
22 namespace ROCKSDB_NAMESPACE {
Read(uint64_t offset,size_t n,Slice * result,char * scratch,bool for_compaction) const23 Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
24                                     char* scratch, bool for_compaction) const {
25   Status s;
26   uint64_t elapsed = 0;
27   {
28     StopWatch sw(env_, stats_, hist_type_,
29                  (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
30                  true /*delay_enabled*/);
31     auto prev_perf_level = GetPerfLevel();
32     IOSTATS_TIMER_GUARD(read_nanos);
33     if (use_direct_io()) {
34 #ifndef ROCKSDB_LITE
35       size_t alignment = file_->GetRequiredBufferAlignment();
36       size_t aligned_offset =
37           TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
38       size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
39       size_t read_size =
40           Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
41       AlignedBuffer buf;
42       buf.Alignment(alignment);
43       buf.AllocateNewBuffer(read_size);
44       while (buf.CurrentSize() < read_size) {
45         size_t allowed;
46         if (for_compaction && rate_limiter_ != nullptr) {
47           allowed = rate_limiter_->RequestToken(
48               buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
49               Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
50         } else {
51           assert(buf.CurrentSize() == 0);
52           allowed = read_size;
53         }
54         Slice tmp;
55 
56         FileOperationInfo::TimePoint start_ts;
57         uint64_t orig_offset = 0;
58         if (ShouldNotifyListeners()) {
59           start_ts = std::chrono::system_clock::now();
60           orig_offset = aligned_offset + buf.CurrentSize();
61         }
62         {
63           IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
64           s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
65                           IOOptions(), &tmp, buf.Destination(), nullptr);
66         }
67         if (ShouldNotifyListeners()) {
68           auto finish_ts = std::chrono::system_clock::now();
69           NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
70                                  s);
71         }
72 
73         buf.Size(buf.CurrentSize() + tmp.size());
74         if (!s.ok() || tmp.size() < allowed) {
75           break;
76         }
77       }
78       size_t res_len = 0;
79       if (s.ok() && offset_advance < buf.CurrentSize()) {
80         res_len = buf.Read(scratch, offset_advance,
81                            std::min(buf.CurrentSize() - offset_advance, n));
82       }
83       *result = Slice(scratch, res_len);
84 #endif  // !ROCKSDB_LITE
85     } else {
86       size_t pos = 0;
87       const char* res_scratch = nullptr;
88       while (pos < n) {
89         size_t allowed;
90         if (for_compaction && rate_limiter_ != nullptr) {
91           if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
92             sw.DelayStart();
93           }
94           allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
95                                                 Env::IOPriority::IO_LOW, stats_,
96                                                 RateLimiter::OpType::kRead);
97           if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
98             sw.DelayStop();
99           }
100         } else {
101           allowed = n;
102         }
103         Slice tmp_result;
104 
105 #ifndef ROCKSDB_LITE
106         FileOperationInfo::TimePoint start_ts;
107         if (ShouldNotifyListeners()) {
108           start_ts = std::chrono::system_clock::now();
109         }
110 #endif
111         {
112           IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
113           s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
114                           scratch + pos, nullptr);
115         }
116 #ifndef ROCKSDB_LITE
117         if (ShouldNotifyListeners()) {
118           auto finish_ts = std::chrono::system_clock::now();
119           NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
120                                  finish_ts, s);
121         }
122 #endif
123 
124         if (res_scratch == nullptr) {
125           // we can't simply use `scratch` because reads of mmap'd files return
126           // data in a different buffer.
127           res_scratch = tmp_result.data();
128         } else {
129           // make sure chunks are inserted contiguously into `res_scratch`.
130           assert(tmp_result.data() == res_scratch + pos);
131         }
132         pos += tmp_result.size();
133         if (!s.ok() || tmp_result.size() < allowed) {
134           break;
135         }
136       }
137       *result = Slice(res_scratch, s.ok() ? pos : 0);
138     }
139     IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
140     SetPerfLevel(prev_perf_level);
141   }
142   if (stats_ != nullptr && file_read_hist_ != nullptr) {
143     file_read_hist_->Add(elapsed);
144   }
145 
146   return s;
147 }
148 
MultiRead(FSReadRequest * read_reqs,size_t num_reqs) const149 Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
150                                          size_t num_reqs) const {
151   Status s;
152   uint64_t elapsed = 0;
153   assert(!use_direct_io());
154   {
155     StopWatch sw(env_, stats_, hist_type_,
156                  (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
157                  true /*delay_enabled*/);
158     auto prev_perf_level = GetPerfLevel();
159     IOSTATS_TIMER_GUARD(read_nanos);
160 
161 #ifndef ROCKSDB_LITE
162     FileOperationInfo::TimePoint start_ts;
163     if (ShouldNotifyListeners()) {
164       start_ts = std::chrono::system_clock::now();
165     }
166 #endif  // ROCKSDB_LITE
167     {
168       IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
169       s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr);
170     }
171     for (size_t i = 0; i < num_reqs; ++i) {
172 #ifndef ROCKSDB_LITE
173       if (ShouldNotifyListeners()) {
174         auto finish_ts = std::chrono::system_clock::now();
175         NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
176                                start_ts, finish_ts, read_reqs[i].status);
177       }
178 #endif  // ROCKSDB_LITE
179       IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
180     }
181     SetPerfLevel(prev_perf_level);
182   }
183   if (stats_ != nullptr && file_read_hist_ != nullptr) {
184     file_read_hist_->Add(elapsed);
185   }
186 
187   return s;
188 }
189 }  // namespace ROCKSDB_NAMESPACE
190