//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

class MockFS;
13 class MockRandomAccessFile : public FSRandomAccessFileWrapper {
14  public:
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile> & file,bool support_prefetch,std::atomic_int & prefetch_count)15   MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
16                        bool support_prefetch, std::atomic_int& prefetch_count)
17       : FSRandomAccessFileWrapper(file.get()),
18         file_(std::move(file)),
19         support_prefetch_(support_prefetch),
20         prefetch_count_(prefetch_count) {}
21 
Prefetch(uint64_t offset,size_t n,const IOOptions & options,IODebugContext * dbg)22   IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
23                     IODebugContext* dbg) override {
24     if (support_prefetch_) {
25       prefetch_count_.fetch_add(1);
26       return target()->Prefetch(offset, n, options, dbg);
27     } else {
28       return IOStatus::NotSupported("Prefetch not supported");
29     }
30   }
31 
32  private:
33   std::unique_ptr<FSRandomAccessFile> file_;
34   const bool support_prefetch_;
35   std::atomic_int& prefetch_count_;
36 };
38 class MockFS : public FileSystemWrapper {
39  public:
MockFS(const std::shared_ptr<FileSystem> & wrapped,bool support_prefetch)40   explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
41                   bool support_prefetch)
42       : FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {}
43 
NewRandomAccessFile(const std::string & fname,const FileOptions & opts,std::unique_ptr<FSRandomAccessFile> * result,IODebugContext * dbg)44   IOStatus NewRandomAccessFile(const std::string& fname,
45                                const FileOptions& opts,
46                                std::unique_ptr<FSRandomAccessFile>* result,
47                                IODebugContext* dbg) override {
48     std::unique_ptr<FSRandomAccessFile> file;
49     IOStatus s;
50     s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
51     result->reset(
52         new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
53     return s;
54   }
55 
ClearPrefetchCount()56   void ClearPrefetchCount() { prefetch_count_ = 0; }
57 
IsPrefetchCalled()58   bool IsPrefetchCalled() { return prefetch_count_ > 0; }
59 
GetPrefetchCount()60   int GetPrefetchCount() {
61     return prefetch_count_.load(std::memory_order_relaxed);
62   }
63 
64  private:
65   const bool support_prefetch_;
66   std::atomic_int prefetch_count_{0};
67 };
// Test fixture parameterized over a (bool, bool) tuple:
//   <0>: whether the mock file system supports native Prefetch().
//   <1>: whether direct IO is enabled.
class PrefetchTest
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  PrefetchTest() : DBTestBase("/prefetch_test", true) {}
};
// Run every test with all four combinations of
// (support_prefetch, use_direct_io).
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
                        ::testing::Combine(::testing::Bool(),
                                           ::testing::Bool()));
// Builds the test key for index `num`, optionally suffixed (e.g. "key2"
// distinguishes a second key range over the same indices).
// `postfix` is taken by const reference to avoid copying on every call.
std::string BuildKey(int num, const std::string& postfix = "") {
  return "my_key_" + std::to_string(num) + postfix;
}
// Verifies that prefetching happens on exactly one path: the file system's
// native Prefetch() when supported (and direct IO is off), otherwise the
// internal FilePrefetchBuffer.
TEST_P(PrefetchTest, Basic) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());
  const int kNumKeys = 1100;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  // Counts FilePrefetchBuffer usage (the buffered-readahead fallback).
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  // create first key range
  WriteBatch batch;
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // create second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // delete second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Delete(BuildKey(i, "key2")));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // compact database
  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  // Compaction reads the table files sequentially, which should trigger
  // readahead on whichever prefetch path (native or buffered) is available.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  if (support_prefetch && !use_direct_io) {
    // If underlying file system supports prefetch, and directIO is not
    // enabled, make sure prefetch() is called and FilePrefetchBuffer is not
    // used.
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    // If underlying file system doesn't support prefetch, or directIO is
    // enabled, make sure prefetch() is not called and FilePrefetchBuffer is
    // used.
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }

  // Full sequential iteration over all keys; should again trigger prefetch
  // on the appropriate path.
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      num_keys++;
    }
  }

  // Make sure prefetch is called only if file system support prefetch.
  if (support_prefetch && !use_direct_io) {
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }
  Close();
}

#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest,ConfigureAutoMaxReadaheadSize)187 TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
188   // First param is if the mockFS support_prefetch or not
189   bool support_prefetch =
190       std::get<0>(GetParam()) &&
191       test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
192 
193   // Second param is if directIO is enabled or not
194   bool use_direct_io = std::get<1>(GetParam());
195 
196   std::shared_ptr<MockFS> fs =
197       std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
198   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
199 
200   Options options = CurrentOptions();
201   options.write_buffer_size = 1024;
202   options.create_if_missing = true;
203   options.compression = kNoCompression;
204   options.env = env.get();
205   options.disable_auto_compactions = true;
206   if (use_direct_io) {
207     options.use_direct_reads = true;
208     options.use_direct_io_for_flush_and_compaction = true;
209   }
210   BlockBasedTableOptions table_options;
211   table_options.no_block_cache = true;
212   table_options.cache_index_and_filter_blocks = false;
213   table_options.metadata_block_size = 1024;
214   table_options.index_type =
215       BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
216   table_options.max_auto_readahead_size = 0;
217   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
218 
219   int buff_prefetch_count = 0;
220   SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
221                                         [&](void*) { buff_prefetch_count++; });
222 
223   // DB open will create table readers unless we reduce the table cache
224   // capacity. SanitizeOptions will set max_open_files to minimum of 20. Table
225   // cache is allocated with max_open_files - 10 as capacity. So override
226   // max_open_files to 10 so table cache capacity will become 0. This will
227   // prevent file open during DB open and force the file to be opened during
228   // Iteration.
229   SyncPoint::GetInstance()->SetCallBack(
230       "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
231         int* max_open_files = (int*)arg;
232         *max_open_files = 11;
233       });
234 
235   SyncPoint::GetInstance()->EnableProcessing();
236 
237   Status s = TryReopen(options);
238 
239   if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
240     // If direct IO is not supported, skip the test
241     return;
242   } else {
243     ASSERT_OK(s);
244   }
245 
246   Random rnd(309);
247   int key_count = 0;
248   const int num_keys_per_level = 100;
249   // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
250   for (int level = 2; level >= 0; level--) {
251     key_count = level * num_keys_per_level;
252     for (int i = 0; i < num_keys_per_level; ++i) {
253       ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
254     }
255     ASSERT_OK(Flush());
256     MoveFilesToLevel(level);
257   }
258   Close();
259   std::vector<int> buff_prefectch_level_count = {0, 0, 0};
260   TryReopen(options);
261   {
262     auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
263     fs->ClearPrefetchCount();
264     buff_prefetch_count = 0;
265 
266     for (int level = 2; level >= 0; level--) {
267       key_count = level * num_keys_per_level;
268       switch (level) {
269         case 0:
270           // max_auto_readahead_size is set 0 so data and index blocks are not
271           // prefetched.
272           ASSERT_OK(db_->SetOptions(
273               {{"block_based_table_factory", "{max_auto_readahead_size=0;}"}}));
274           break;
275         case 1:
276           // max_auto_readahead_size is set less than
277           // BlockBasedTable::kInitAutoReadaheadSize. So readahead_size remains
278           // equal to max_auto_readahead_size.
279           ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
280                                       "{max_auto_readahead_size=4096;}"}}));
281           break;
282         case 2:
283           ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
284                                       "{max_auto_readahead_size=65536;}"}}));
285           break;
286         default:
287           assert(false);
288       }
289 
290       for (int i = 0; i < num_keys_per_level; ++i) {
291         iter->Seek(Key(key_count++));
292         iter->Next();
293       }
294 
295       buff_prefectch_level_count[level] = buff_prefetch_count;
296       if (support_prefetch && !use_direct_io) {
297         if (level == 0) {
298           ASSERT_FALSE(fs->IsPrefetchCalled());
299         } else {
300           ASSERT_TRUE(fs->IsPrefetchCalled());
301         }
302         fs->ClearPrefetchCount();
303       } else {
304         ASSERT_FALSE(fs->IsPrefetchCalled());
305         if (level == 0) {
306           ASSERT_EQ(buff_prefetch_count, 0);
307         } else {
308           ASSERT_GT(buff_prefetch_count, 0);
309         }
310         buff_prefetch_count = 0;
311       }
312     }
313   }
314 
315   if (!support_prefetch) {
316     ASSERT_GT(buff_prefectch_level_count[1], buff_prefectch_level_count[2]);
317   }
318 
319   SyncPoint::GetInstance()->DisableProcessing();
320   SyncPoint::GetInstance()->ClearAllCallBacks();
321   Close();
322 }
#endif  // !ROCKSDB_LITE

// Exercises implicit auto-readahead on iterator re-seeks: sequential seeks
// across neighboring data blocks should trigger prefetch (native or
// buffered), while non-sequential seeks should not.
TEST_P(PrefetchTest, PrefetchWhenReseek) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  // Partitioned index with small metadata blocks so seeks span multiple
  // index partitions; no block cache so every read hits the file.
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  // Counts FilePrefetchBuffer usage (the buffered-readahead fallback).
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  // Reset both counters; only the iterator reads below are measured.
  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential Data Blocks within same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more
     * initially (2 more data blocks).
     */
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1000));
    iter->Seek(BuildKey(1004));  // Prefetch Data
    iter->Seek(BuildKey(1008));
    iter->Seek(BuildKey(1011));
    iter->Seek(BuildKey(1015));  // Prefetch Data
    iter->Seek(BuildKey(1019));
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));   // Already in buffer.
    iter->Seek(BuildKey(1033));  // Prefetch Data
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 3);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from  non sequential data blocks within same partitioned
     * index. buff_prefetch_count will be 0 in that case.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1008));
    iter->Seek(BuildKey(1019));
    iter->Seek(BuildKey(1033));
    iter->Seek(BuildKey(1048));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from Single Data Block.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1));
    iter->Seek(BuildKey(10));
    iter->Seek(BuildKey(100));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from  sequential data blocks to set implicit auto readahead
     * and prefetch data but after that iterate over different (non sequential)
     * data blocks which won't prefetch any data further. So buff_prefetch_count
     * will be 1 for the first one.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1000));
    iter->Seek(BuildKey(1004));  // This iteration will prefetch buffer
    iter->Seek(BuildKey(1008));
    iter->Seek(
        BuildKey(996));  // Reseek won't prefetch any data and
                         // readahead_size will be initialized to 8*1024.
    iter->Seek(BuildKey(992));
    iter->Seek(BuildKey(989));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }

    // Read sequentially to confirm readahead_size is reset to initial value (2
    // more data blocks)
    iter->Seek(BuildKey(1011));
    iter->Seek(BuildKey(1015));
    iter->Seek(BuildKey(1019));  // Prefetch Data
    iter->Seek(BuildKey(1022));
    iter->Seek(BuildKey(1026));
    iter->Seek(BuildKey(103));  // Prefetch Data
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 2);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Reseek keys from sequential partitioned index block. Since partitioned
     * index fetch are sequential, buff_prefetch_count will be 1.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1167));
    iter->Seek(BuildKey(1334));  // This iteration will prefetch buffer
    iter->Seek(BuildKey(1499));
    iter->Seek(BuildKey(1667));
    iter->Seek(BuildKey(1847));
    iter->Seek(BuildKey(1999));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek over different keys from different blocks. buff_prefetch_count is
     * set 0.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int i = 0;
    int j = 1000;
    do {
      iter->Seek(BuildKey(i));
      if (!iter->Valid()) {
        break;
      }
      i = i + 100;
      iter->Seek(BuildKey(j));
      j = j + 100;
    } while (i < 1000 && j < kNumKeys && iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Iterates sequentially over all keys. It will prefetch the buffer.*/
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    }
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 13);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 13);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
// Same as PrefetchWhenReseek, but with a block cache enabled: blocks that
// are already cached should neither trigger nor reset readahead.
TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  // 4 MB capacity, 2 shard bits (4 shards).
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  table_options.block_cache = cache;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  // Counts FilePrefetchBuffer usage (the buffered-readahead fallback).
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  // Reset both counters; only the iterator reads below are measured.
  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    /*
     * Reseek keys from sequential Data Blocks within same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more
     * initially (2 more data blocks).
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    // Warm up the cache
    iter->Seek(BuildKey(1011));
    iter->Seek(BuildKey(1015));
    iter->Seek(BuildKey(1019));
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    // After caching, blocks will be read from cache (Sequential blocks)
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    iter->Seek(BuildKey(1000));
    iter->Seek(BuildKey(1004));  // Prefetch data (not in cache).
    // Missed one sequential block but next is in already in buffer so readahead
    // will not be reset.
    iter->Seek(BuildKey(1011));
    // Prefetch data but blocks are in cache so no prefetch and reset.
    iter->Seek(BuildKey(1015));
    iter->Seek(BuildKey(1019));
    iter->Seek(BuildKey(1022));
    // Prefetch data with readahead_size = 4 blocks.
    iter->Seek(BuildKey(1026));
    iter->Seek(BuildKey(103));
    iter->Seek(BuildKey(1033));
    iter->Seek(BuildKey(1037));

    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

}  // namespace ROCKSDB_NAMESPACE

// Standard gtest entry point: parse gtest flags, then run all tests.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);

  return RUN_ALL_TESTS();
}