1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_test_util.h"
7 #include "test_util/sync_point.h"
8
9 namespace ROCKSDB_NAMESPACE {
10
11 class MockFS;
12
13 class MockRandomAccessFile : public FSRandomAccessFileWrapper {
14 public:
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile> & file,bool support_prefetch,std::atomic_int & prefetch_count)15 MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
16 bool support_prefetch, std::atomic_int& prefetch_count)
17 : FSRandomAccessFileWrapper(file.get()),
18 file_(std::move(file)),
19 support_prefetch_(support_prefetch),
20 prefetch_count_(prefetch_count) {}
21
Prefetch(uint64_t offset,size_t n,const IOOptions & options,IODebugContext * dbg)22 IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
23 IODebugContext* dbg) override {
24 if (support_prefetch_) {
25 prefetch_count_.fetch_add(1);
26 return target()->Prefetch(offset, n, options, dbg);
27 } else {
28 return IOStatus::NotSupported("Prefetch not supported");
29 }
30 }
31
32 private:
33 std::unique_ptr<FSRandomAccessFile> file_;
34 const bool support_prefetch_;
35 std::atomic_int& prefetch_count_;
36 };
37
38 class MockFS : public FileSystemWrapper {
39 public:
MockFS(const std::shared_ptr<FileSystem> & wrapped,bool support_prefetch)40 explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
41 bool support_prefetch)
42 : FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {}
43
NewRandomAccessFile(const std::string & fname,const FileOptions & opts,std::unique_ptr<FSRandomAccessFile> * result,IODebugContext * dbg)44 IOStatus NewRandomAccessFile(const std::string& fname,
45 const FileOptions& opts,
46 std::unique_ptr<FSRandomAccessFile>* result,
47 IODebugContext* dbg) override {
48 std::unique_ptr<FSRandomAccessFile> file;
49 IOStatus s;
50 s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
51 result->reset(
52 new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
53 return s;
54 }
55
ClearPrefetchCount()56 void ClearPrefetchCount() { prefetch_count_ = 0; }
57
IsPrefetchCalled()58 bool IsPrefetchCalled() { return prefetch_count_ > 0; }
59
GetPrefetchCount()60 int GetPrefetchCount() {
61 return prefetch_count_.load(std::memory_order_relaxed);
62 }
63
64 private:
65 const bool support_prefetch_;
66 std::atomic_int prefetch_count_{0};
67 };
68
69 class PrefetchTest
70 : public DBTestBase,
71 public ::testing::WithParamInterface<std::tuple<bool, bool>> {
72 public:
PrefetchTest()73 PrefetchTest() : DBTestBase("/prefetch_test", true) {}
74 };
75
76 INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
77 ::testing::Combine(::testing::Bool(),
78 ::testing::Bool()));
79
// Builds the test key "my_key_<num><postfix>". `postfix` defaults to the
// empty string; it is taken by const reference to avoid copying it on every
// call.
std::string BuildKey(int num, const std::string& postfix = "") {
  return "my_key_" + std::to_string(num) + postfix;
}
83
// Writes two key ranges, deletes the second one, compacts, and then iterates
// over the DB. After the compaction and after the iteration it checks which
// prefetch path was exercised: the file system's Prefetch() (counted by
// MockFS) or FilePrefetchBuffer (counted through a sync point callback).
TEST_P(PrefetchTest, Basic) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());
  const int kNumKeys = 1100;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  // Counts how often FilePrefetchBuffer::Prefetch is entered. The callback
  // captures this local by reference, so it must be unregistered before the
  // test body returns.
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test. Unregister the callback
    // first so it does not outlive buff_prefetch_count.
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    return;
  }
  ASSERT_OK(s);

  // create first key range
  WriteBatch batch;
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // create second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // delete second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Delete(BuildKey(i, "key2")));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // compact database
  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  // NOTE(review): the comment that used to sit here said that commenting out
  // the CompactRange call "causes the example to work correctly"; it looks
  // like a leftover from a bug report. The checks right below expect prefetch
  // activity to have happened by this point.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  if (support_prefetch && !use_direct_io) {
    // If the underlying file system supports prefetch, and directIO is not
    // enabled, make sure prefetch() is called and FilePrefetchBuffer is not
    // used.
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    // If the underlying file system doesn't support prefetch, or directIO is
    // enabled, make sure prefetch() is not called and FilePrefetchBuffer is
    // used.
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }

  // count the keys: only the first range is still visible, because every key
  // of the second range was deleted above.
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      num_keys++;
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(kNumKeys, num_keys);
  }

  // Make sure prefetch is called only if file system support prefetch.
  if (support_prefetch && !use_direct_io) {
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }

  // Same teardown as the other tests in this file: drop the callback that
  // references buff_prefetch_count before leaving the test body.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
185
186 #ifndef ROCKSDB_LITE
// Places 100 keys on each of levels 0, 1 and 2, then seeks through each level
// with a different max_auto_readahead_size applied through SetOptions(), and
// checks whether prefetching happened for that level.
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.disable_auto_compactions = true;
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.max_auto_readahead_size = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  // Counts how often FilePrefetchBuffer::Prefetch is entered. The callback
  // captures this local by reference, so it must be unregistered before the
  // test body returns.
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });

  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to minimum of 20. Table
  // cache is allocated with max_open_files - 10 as capacity. Override
  // max_open_files to shrink the table cache; this is meant to prevent file
  // open during DB open and force the file to be opened during iteration.
  // NOTE(review): the original comment said the override is 10 (capacity 0),
  // but the value written below is 11 - confirm which one is intended.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });

  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);

  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test. Unregister the callbacks
    // first so they do not outlive the locals they capture.
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    return;
  }
  ASSERT_OK(s);

  Random rnd(309);
  int key_count = 0;
  const int num_keys_per_level = 100;
  // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
  for (int level = 2; level >= 0; level--) {
    key_count = level * num_keys_per_level;
    for (int i = 0; i < num_keys_per_level; ++i) {
      ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  Close();

  // FilePrefetchBuffer prefetch count recorded for each level, indexed by
  // level number.
  std::vector<int> buff_prefetch_level_count = {0, 0, 0};
  // The reopen must succeed, otherwise db_ cannot be used below.
  ASSERT_OK(TryReopen(options));
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    fs->ClearPrefetchCount();
    buff_prefetch_count = 0;

    for (int level = 2; level >= 0; level--) {
      key_count = level * num_keys_per_level;
      switch (level) {
        case 0:
          // max_auto_readahead_size is set 0 so data and index blocks are not
          // prefetched.
          ASSERT_OK(db_->SetOptions(
              {{"block_based_table_factory", "{max_auto_readahead_size=0;}"}}));
          break;
        case 1:
          // max_auto_readahead_size is set less than
          // BlockBasedTable::kInitAutoReadaheadSize. So readahead_size remains
          // equal to max_auto_readahead_size.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{max_auto_readahead_size=4096;}"}}));
          break;
        case 2:
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{max_auto_readahead_size=65536;}"}}));
          break;
        default:
          assert(false);
      }

      for (int i = 0; i < num_keys_per_level; ++i) {
        iter->Seek(Key(key_count++));
        iter->Next();
      }

      buff_prefetch_level_count[level] = buff_prefetch_count;
      if (support_prefetch && !use_direct_io) {
        // File system prefetch is expected for every level except level 0,
        // where max_auto_readahead_size is 0.
        if (level == 0) {
          ASSERT_FALSE(fs->IsPrefetchCalled());
        } else {
          ASSERT_TRUE(fs->IsPrefetchCalled());
        }
        fs->ClearPrefetchCount();
      } else {
        // FilePrefetchBuffer is expected to do the prefetching instead, again
        // for every level except level 0.
        ASSERT_FALSE(fs->IsPrefetchCalled());
        if (level == 0) {
          ASSERT_EQ(buff_prefetch_count, 0);
        } else {
          ASSERT_GT(buff_prefetch_count, 0);
        }
        buff_prefetch_count = 0;
      }
    }
  }

  if (!support_prefetch) {
    // Level 1 (4096) is expected to issue more buffer prefetches than level 2
    // (65536).
    ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
323 #endif // !ROCKSDB_LITE
324
// Writes 2000 keys with 1000-byte values, compacts them, and then runs a
// series of Seek() patterns on fresh iterators. After each pattern it checks
// the exact number of prefetch calls, either on the mock file system or on
// FilePrefetchBuffer, depending on the test parameters.
TEST_P(PrefetchTest, PrefetchWhenReseek) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  // Counts how often FilePrefetchBuffer::Prefetch is entered. The callback
  // captures this local by reference, so it must be unregistered before the
  // test body returns.
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test. Unregister the callback
    // first so it does not outlive buff_prefetch_count.
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    return;
  }
  ASSERT_OK(s);

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  // Ignore whatever prefetching happened while loading and compacting.
  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential Data Blocks within same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more
     * initially (2 more data blocks).
     */
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1000));
    iter->Seek(BuildKey(1004));  // Prefetch Data
    iter->Seek(BuildKey(1008));
    iter->Seek(BuildKey(1011));
    iter->Seek(BuildKey(1015));  // Prefetch Data
    iter->Seek(BuildKey(1019));
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));   // Already in buffer.
    iter->Seek(BuildKey(1033));  // Prefetch Data
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 3);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from non sequential data blocks within same partitioned
     * index. buff_prefetch_count will be 0 in that case.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1008));
    iter->Seek(BuildKey(1019));
    iter->Seek(BuildKey(1033));
    iter->Seek(BuildKey(1048));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from a single Data Block.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1));
    iter->Seek(BuildKey(10));
    iter->Seek(BuildKey(100));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from sequential data blocks to set implicit auto readahead
     * and prefetch data but after that iterate over different (non sequential)
     * data blocks which won't prefetch any data further. So buff_prefetch_count
     * will be 1 for the first one.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1000));
    iter->Seek(BuildKey(1004));  // This iteration will prefetch buffer
    iter->Seek(BuildKey(1008));
    iter->Seek(
        BuildKey(996));  // Reseek won't prefetch any data and
                         // readahead_size will be initialized to 8*1024.
    iter->Seek(BuildKey(992));
    iter->Seek(BuildKey(989));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }

    // Read sequentially to confirm readahead_size is reset to initial value (2
    // more data blocks)
    iter->Seek(BuildKey(1011));
    iter->Seek(BuildKey(1015));
    iter->Seek(BuildKey(1019));  // Prefetch Data
    iter->Seek(BuildKey(1022));
    iter->Seek(BuildKey(1026));
    iter->Seek(BuildKey(103));  // Prefetch Data
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 2);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Reseek keys from sequential partitioned index block. Since partitioned
     * index fetch are sequential, buff_prefetch_count will be 1.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1167));
    iter->Seek(BuildKey(1334));  // This iteration will prefetch buffer
    iter->Seek(BuildKey(1499));
    iter->Seek(BuildKey(1667));
    iter->Seek(BuildKey(1847));
    iter->Seek(BuildKey(1999));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek over different keys from different blocks. buff_prefetch_count is
     * set 0.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int i = 0;
    int j = 1000;
    // Alternate between the lower and the upper half of the key space so
    // that consecutive seeks never land on neighbouring keys.
    do {
      iter->Seek(BuildKey(i));
      if (!iter->Valid()) {
        break;
      }
      i = i + 100;
      iter->Seek(BuildKey(j));
      j = j + 100;
    } while (i < 1000 && j < kNumKeys && iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Iterates sequentially over all keys. It will prefetch the buffer.*/
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    }
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 13);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 13);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
554
// Same setup as PrefetchWhenReseek, but with an LRU block cache configured.
// It first warms the cache with a few seeks and then checks how many prefetch
// calls a second iterator issues when some of the blocks it visits are
// already cached.
TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 4MB
  table_options.block_cache = cache;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  // Counts how often FilePrefetchBuffer::Prefetch is entered. The callback
  // captures this local by reference, so it must be unregistered before the
  // test body returns.
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test. Unregister the callback
    // first so it does not outlive buff_prefetch_count.
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    return;
  }
  ASSERT_OK(s);

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  // Ignore whatever prefetching happened while loading and compacting.
  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    /*
     * Reseek keys from sequential Data Blocks within same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more
     * initially (2 more data blocks).
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    // Warm up the cache
    iter->Seek(BuildKey(1011));
    iter->Seek(BuildKey(1015));
    iter->Seek(BuildKey(1019));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    // After caching, blocks will be read from cache (Sequential blocks)
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1000));
    iter->Seek(BuildKey(1004));  // Prefetch data (not in cache).
    // Missed one sequential block but the next one is already in the buffer,
    // so readahead will not be reset.
    iter->Seek(BuildKey(1011));
    // Prefetch data but blocks are in cache so no prefetch and reset.
    iter->Seek(BuildKey(1015));
    iter->Seek(BuildKey(1019));
    iter->Seek(BuildKey(1022));
    // Prefetch data with readahead_size = 4 blocks.
    iter->Seek(BuildKey(1026));
    iter->Seek(BuildKey(103));
    iter->Seek(BuildKey(1033));
    iter->Seek(BuildKey(1037));

    // The two prefetch paths are expected to report different counts here.
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
671
672 } // namespace ROCKSDB_NAMESPACE
673
main(int argc,char ** argv)674 int main(int argc, char** argv) {
675 ::testing::InitGoogleTest(&argc, argv);
676
677 return RUN_ALL_TESTS();
678 }
679