1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "file/random_access_file_reader.h"
7
8 #include <algorithm>
9
10 #include "file/file_util.h"
11 #include "port/port.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/file_system.h"
14 #include "test_util/sync_point.h"
15 #include "test_util/testharness.h"
16 #include "test_util/testutil.h"
17 #include "util/random.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 class RandomAccessFileReaderTest : public testing::Test {
22 public:
SetUp()23 void SetUp() override {
24 SetupSyncPointsToMockDirectIO();
25 env_ = Env::Default();
26 fs_ = FileSystem::Default();
27 test_dir_ = test::PerThreadDBPath("random_access_file_reader_test");
28 ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
29 }
30
TearDown()31 void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
32
Write(const std::string & fname,const std::string & content)33 void Write(const std::string& fname, const std::string& content) {
34 std::unique_ptr<FSWritableFile> f;
35 ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr));
36 ASSERT_OK(f->Append(content, IOOptions(), nullptr));
37 ASSERT_OK(f->Close(IOOptions(), nullptr));
38 }
39
Read(const std::string & fname,const FileOptions & opts,std::unique_ptr<RandomAccessFileReader> * reader)40 void Read(const std::string& fname, const FileOptions& opts,
41 std::unique_ptr<RandomAccessFileReader>* reader) {
42 std::string fpath = Path(fname);
43 std::unique_ptr<FSRandomAccessFile> f;
44 ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
45 reader->reset(new RandomAccessFileReader(std::move(f), fpath,
46 env_->GetSystemClock().get()));
47 }
48
AssertResult(const std::string & content,const std::vector<FSReadRequest> & reqs)49 void AssertResult(const std::string& content,
50 const std::vector<FSReadRequest>& reqs) {
51 for (const auto& r : reqs) {
52 ASSERT_OK(r.status);
53 ASSERT_EQ(r.len, r.result.size());
54 ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString());
55 }
56 }
57
58 private:
59 Env* env_;
60 std::shared_ptr<FileSystem> fs_;
61 std::string test_dir_;
62
Path(const std::string & fname)63 std::string Path(const std::string& fname) {
64 return test_dir_ + "/" + fname;
65 }
66 };
67
68 // Skip the following tests in lite mode since direct I/O is unsupported.
69 #ifndef ROCKSDB_LITE
70
// Reads an unaligned range through a direct-I/O reader, both with and without
// the compaction flag, and checks that the returned bytes match the file
// content.
TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
  std::string fname = "read-direct-io";
  Random rand(0);
  std::string content = rand.RandomString(kDefaultPageSize);
  Write(fname, content);

  FileOptions opts;
  opts.use_direct_reads = true;
  std::unique_ptr<RandomAccessFileReader> r;
  Read(fname, opts, &r);
  // A fatal assertion inside Read() only aborts that helper, so make sure the
  // reader really exists before dereferencing it.
  ASSERT_TRUE(r != nullptr);
  ASSERT_TRUE(r->use_direct_io());

  const size_t page_size = r->file()->GetRequiredBufferAlignment();
  size_t offset = page_size / 2;
  size_t len = page_size / 3;
  // The range is derived from the file's alignment while the content was
  // sized with kDefaultPageSize; fail clearly if the two ever disagree enough
  // for the range to run past the end of the file.
  ASSERT_LE(offset + len, content.size());

  Slice result;
  AlignedBuf buf;
  for (bool for_compaction : {true, false}) {
    ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf,
                      for_compaction));
    ASSERT_EQ(result.ToString(), content.substr(offset, len));
  }
}
94
// Issues several MultiRead() batches against a 3-page file opened with direct
// I/O. For each batch it checks the returned data and, via a sync point, the
// aligned requests that MultiRead() built internally.
TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
  std::vector<FSReadRequest> aligned_reqs;

  // The callback registered below captures `aligned_reqs` by reference, so it
  // must be unregistered on every exit path, including the early returns
  // taken by failing ASSERT_* macros. Declared right after `aligned_reqs` so
  // that it is destroyed before the vector is.
  struct SyncPointCleanup {
    ~SyncPointCleanup() {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  } sync_point_cleanup;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* reqs) {
        // Copy reqs, since it's allocated on stack inside MultiRead, which will
        // be deallocated after MultiRead returns.
        aligned_reqs = *static_cast<std::vector<FSReadRequest>*>(reqs);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Creates a file with 3 pages.
  std::string fname = "multi-read-direct-io";
  Random rand(0);
  std::string content = rand.RandomString(3 * kDefaultPageSize);
  Write(fname, content);

  FileOptions opts;
  opts.use_direct_reads = true;
  std::unique_ptr<RandomAccessFileReader> r;
  Read(fname, opts, &r);
  // A fatal assertion inside Read() only aborts that helper, so make sure the
  // reader really exists before dereferencing it.
  ASSERT_TRUE(r != nullptr);
  ASSERT_TRUE(r->use_direct_io());

  const size_t page_size = r->file()->GetRequiredBufferAlignment();

  {
    // Reads 2 blocks in the 1st page.
    // The results should be SharedSlices of the same underlying buffer.
    //
    // Illustration (each x is a 1/4 page)
    // First page: xxxx
    // 1st block:  x
    // 2nd block:    xx
    FSReadRequest r0;
    r0.offset = 0;
    r0.len = page_size / 4;
    r0.scratch = nullptr;

    FSReadRequest r1;
    r1.offset = page_size / 2;
    r1.len = page_size / 2;
    r1.scratch = nullptr;

    std::vector<FSReadRequest> reqs;
    reqs.push_back(std::move(r0));
    reqs.push_back(std::move(r1));
    AlignedBuf aligned_buf;
    // Start from an empty capture so the checks below can only pass on what
    // this MultiRead() call reported.
    aligned_reqs.clear();
    ASSERT_OK(
        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

    AssertResult(content, reqs);

    // Reads the first page internally.
    ASSERT_EQ(aligned_reqs.size(), 1);
    const FSReadRequest& aligned_r = aligned_reqs[0];
    ASSERT_OK(aligned_r.status);
    ASSERT_EQ(aligned_r.offset, 0);
    ASSERT_EQ(aligned_r.len, page_size);
  }

  {
    // Reads 3 blocks:
    // 1st block in the 1st page;
    // 2nd block from the middle of the 1st page to the middle of the 2nd page;
    // 3rd block in the 2nd page.
    // The results should be SharedSlices of the same underlying buffer.
    //
    // Illustration (each x is a 1/4 page)
    // 2 pages:   xxxxxxxx
    // 1st block: x
    // 2nd block:   xxxx
    // 3rd block:        x
    FSReadRequest r0;
    r0.offset = 0;
    r0.len = page_size / 4;
    r0.scratch = nullptr;

    FSReadRequest r1;
    r1.offset = page_size / 2;
    r1.len = page_size;
    r1.scratch = nullptr;

    FSReadRequest r2;
    r2.offset = 2 * page_size - page_size / 4;
    r2.len = page_size / 4;
    r2.scratch = nullptr;

    std::vector<FSReadRequest> reqs;
    reqs.push_back(std::move(r0));
    reqs.push_back(std::move(r1));
    reqs.push_back(std::move(r2));
    AlignedBuf aligned_buf;
    aligned_reqs.clear();
    ASSERT_OK(
        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

    AssertResult(content, reqs);

    // Reads the first two pages in one request internally.
    ASSERT_EQ(aligned_reqs.size(), 1);
    const FSReadRequest& aligned_r = aligned_reqs[0];
    ASSERT_OK(aligned_r.status);
    ASSERT_EQ(aligned_r.offset, 0);
    ASSERT_EQ(aligned_r.len, 2 * page_size);
  }

  {
    // Reads 3 blocks:
    // 1st block in the middle of the 1st page;
    // 2nd block in the middle of the 2nd page;
    // 3rd block in the middle of the 3rd page.
    // The results should be SharedSlices of the same underlying buffer.
    //
    // Illustration (each x is a 1/4 page)
    // 3 pages:   xxxxxxxxxxxx
    // 1st block:  xx
    // 2nd block:      xx
    // 3rd block:          xx
    FSReadRequest r0;
    r0.offset = page_size / 4;
    r0.len = page_size / 2;
    r0.scratch = nullptr;

    FSReadRequest r1;
    r1.offset = page_size + page_size / 4;
    r1.len = page_size / 2;
    r1.scratch = nullptr;

    FSReadRequest r2;
    r2.offset = 2 * page_size + page_size / 4;
    r2.len = page_size / 2;
    r2.scratch = nullptr;

    std::vector<FSReadRequest> reqs;
    reqs.push_back(std::move(r0));
    reqs.push_back(std::move(r1));
    reqs.push_back(std::move(r2));
    AlignedBuf aligned_buf;
    aligned_reqs.clear();
    ASSERT_OK(
        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

    AssertResult(content, reqs);

    // Reads the first 3 pages in one request internally.
    ASSERT_EQ(aligned_reqs.size(), 1);
    const FSReadRequest& aligned_r = aligned_reqs[0];
    ASSERT_OK(aligned_r.status);
    ASSERT_EQ(aligned_r.offset, 0);
    ASSERT_EQ(aligned_r.len, 3 * page_size);
  }

  {
    // Reads 2 blocks:
    // 1st block in the middle of the 1st page;
    // 2nd block in the middle of the 3rd page.
    // The results are two different buffers.
    //
    // Illustration (each x is a 1/4 page)
    // 3 pages:   xxxxxxxxxxxx
    // 1st block:  xx
    // 2nd block:          xx
    FSReadRequest r0;
    r0.offset = page_size / 4;
    r0.len = page_size / 2;
    r0.scratch = nullptr;

    FSReadRequest r1;
    r1.offset = 2 * page_size + page_size / 4;
    r1.len = page_size / 2;
    r1.scratch = nullptr;

    std::vector<FSReadRequest> reqs;
    reqs.push_back(std::move(r0));
    reqs.push_back(std::move(r1));
    AlignedBuf aligned_buf;
    aligned_reqs.clear();
    ASSERT_OK(
        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

    AssertResult(content, reqs);

    // Reads the 1st and 3rd pages in two requests internally.
    ASSERT_EQ(aligned_reqs.size(), 2);
    const FSReadRequest& aligned_r0 = aligned_reqs[0];
    const FSReadRequest& aligned_r1 = aligned_reqs[1];
    ASSERT_OK(aligned_r0.status);
    ASSERT_EQ(aligned_r0.offset, 0);
    ASSERT_EQ(aligned_r0.len, page_size);
    ASSERT_OK(aligned_r1.status);
    ASSERT_EQ(aligned_r1.offset, 2 * page_size);
    ASSERT_EQ(aligned_r1.len, page_size);
  }

  // Sync point processing is disabled and all callbacks are cleared by
  // `sync_point_cleanup` when this scope ends.
}
288
289 #endif // ROCKSDB_LITE
290
// Align() must widen an unaligned request to the alignment boundaries that
// enclose it: [2000, 4000) with a 1024-byte alignment becomes [1024, 4096).
TEST(FSReadRequest, Align) {
  FSReadRequest unaligned;
  unaligned.scratch = nullptr;
  unaligned.len = 2000;
  unaligned.offset = 2000;
  ASSERT_OK(unaligned.status);

  FSReadRequest widened = Align(unaligned, 1024);

  // Neither the input nor the output may carry an error status.
  ASSERT_OK(unaligned.status);
  ASSERT_OK(widened.status);

  // Start rounded down to 1024; length covers up to the next boundary, 4096.
  ASSERT_EQ(widened.offset, 1024);
  ASSERT_EQ(widened.len, 3072);
}
304
// Runs TryMerge() over a table of request pairs, once as listed and once with
// the two requests swapped, checking whether they merge and what range the
// destination covers afterwards.
TEST(FSReadRequest, TryMerge) {
  // One scenario: two byte ranges plus the outcome expected from TryMerge().
  struct MergeCase {
    const char* description;
    uint64_t first_offset;
    size_t first_len;
    uint64_t second_offset;
    size_t second_len;
    bool mergeable;
    uint64_t merged_offset;  // Only checked when `mergeable` is true.
    size_t merged_len;       // Only checked when `mergeable` is true.
  };

  const MergeCase kCases[] = {
      // first: [0, 10), second: [15, 25)
      {"separated by a gap", 0, 10, 15, 10, false, 0, 0},
      // first: [0, 10), second: [10, 20)
      {"back to back", 0, 10, 10, 10, true, 0, 20},
      // first: [0, 10), second: [5, 15)
      {"partially overlapping", 0, 10, 5, 10, true, 0, 15},
      // first: [0, 10), second: [5, 10)
      {"second is the tail of first", 0, 10, 5, 5, true, 0, 10},
      // first: [0, 10), second: [5, 6)
      {"second strictly inside first", 0, 10, 5, 1, true, 0, 10},
      // first: [0, 10), second: [0, 10)
      {"identical ranges", 0, 10, 0, 10, true, 0, 10},
      // first: [0, 10), second: [0, 5)
      {"second is the head of first", 0, 10, 0, 5, true, 0, 10},
  };

  // reverse means merging dest into src.
  for (bool reverse : {true, false}) {
    for (const MergeCase& c : kCases) {
      // Only affects failure output: names the scenario that tripped.
      SCOPED_TRACE(std::string(c.description) +
                   (reverse ? " (reversed)" : ""));

      FSReadRequest dest;
      dest.offset = c.first_offset;
      dest.len = c.first_len;
      dest.scratch = nullptr;
      ASSERT_OK(dest.status);

      FSReadRequest src;
      src.offset = c.second_offset;
      src.len = c.second_len;
      src.scratch = nullptr;
      ASSERT_OK(src.status);

      if (reverse) {
        std::swap(dest, src);
      }

      const bool merged = TryMerge(&dest, src);
      if (c.mergeable) {
        ASSERT_TRUE(merged);
        ASSERT_EQ(dest.offset, c.merged_offset);
        ASSERT_EQ(dest.len, c.merged_len);
      } else {
        ASSERT_FALSE(merged);
      }
      ASSERT_OK(dest.status);
      ASSERT_OK(src.status);
    }
  }
}
476
477 } // namespace ROCKSDB_NAMESPACE
478
main(int argc,char ** argv)479 int main(int argc, char** argv) {
480 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
481 ::testing::InitGoogleTest(&argc, argv);
482 return RUN_ALL_TESTS();
483 }
484