1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include <atomic>
10 #include <cstdlib>
11 #include <functional>
12 
13 #include "db/db_test_util.h"
14 #include "db/read_callback.h"
15 #include "port/port.h"
16 #include "port/stack_trace.h"
17 #include "rocksdb/persistent_cache.h"
18 #include "rocksdb/wal_filter.h"
19 #include "test_util/fault_injection_test_env.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 class DBTest2 : public DBTestBase {
24  public:
DBTest2()25   DBTest2() : DBTestBase("/db_test2") {}
26 };
27 
28 class PrefixFullBloomWithReverseComparator
29     : public DBTestBase,
30       public ::testing::WithParamInterface<bool> {
31  public:
PrefixFullBloomWithReverseComparator()32   PrefixFullBloomWithReverseComparator()
33       : DBTestBase("/prefix_bloom_reverse") {}
SetUp()34   void SetUp() override { if_cache_filter_ = GetParam(); }
35   bool if_cache_filter_;
36 };
37 
TEST_P(PrefixFullBloomWithReverseComparator,PrefixFullBloomWithReverseComparator)38 TEST_P(PrefixFullBloomWithReverseComparator,
39        PrefixFullBloomWithReverseComparator) {
40   Options options = last_options_;
41   options.comparator = ReverseBytewiseComparator();
42   options.prefix_extractor.reset(NewCappedPrefixTransform(3));
43   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
44   BlockBasedTableOptions bbto;
45   if (if_cache_filter_) {
46     bbto.no_block_cache = false;
47     bbto.cache_index_and_filter_blocks = true;
48     bbto.block_cache = NewLRUCache(1);
49   }
50   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
51   bbto.whole_key_filtering = false;
52   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
53   DestroyAndReopen(options);
54 
55   ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
56   ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
57   ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));
58 
59   dbfull()->Flush(FlushOptions());
60 
61   if (bbto.block_cache) {
62     bbto.block_cache->EraseUnRefEntries();
63   }
64 
65   std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
66   iter->Seek("bar345");
67   ASSERT_OK(iter->status());
68   ASSERT_TRUE(iter->Valid());
69   ASSERT_EQ("bar234", iter->key().ToString());
70   ASSERT_EQ("foo2", iter->value().ToString());
71   iter->Next();
72   ASSERT_TRUE(iter->Valid());
73   ASSERT_EQ("bar123", iter->key().ToString());
74   ASSERT_EQ("foo", iter->value().ToString());
75 
76   iter->Seek("foo234");
77   ASSERT_OK(iter->status());
78   ASSERT_TRUE(iter->Valid());
79   ASSERT_EQ("foo123", iter->key().ToString());
80   ASSERT_EQ("foo3", iter->value().ToString());
81 
82   iter->Seek("bar");
83   ASSERT_OK(iter->status());
84   ASSERT_TRUE(!iter->Valid());
85 }
86 
87 INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
88                         PrefixFullBloomWithReverseComparator, testing::Bool());
89 
TEST_F(DBTest2,IteratorPropertyVersionNumber)90 TEST_F(DBTest2, IteratorPropertyVersionNumber) {
91   Put("", "");
92   Iterator* iter1 = db_->NewIterator(ReadOptions());
93   std::string prop_value;
94   ASSERT_OK(
95       iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
96   uint64_t version_number1 =
97       static_cast<uint64_t>(std::atoi(prop_value.c_str()));
98 
99   Put("", "");
100   Flush();
101 
102   Iterator* iter2 = db_->NewIterator(ReadOptions());
103   ASSERT_OK(
104       iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
105   uint64_t version_number2 =
106       static_cast<uint64_t>(std::atoi(prop_value.c_str()));
107 
108   ASSERT_GT(version_number2, version_number1);
109 
110   Put("", "");
111 
112   Iterator* iter3 = db_->NewIterator(ReadOptions());
113   ASSERT_OK(
114       iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
115   uint64_t version_number3 =
116       static_cast<uint64_t>(std::atoi(prop_value.c_str()));
117 
118   ASSERT_EQ(version_number2, version_number3);
119 
120   iter1->SeekToFirst();
121   ASSERT_OK(
122       iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
123   uint64_t version_number1_new =
124       static_cast<uint64_t>(std::atoi(prop_value.c_str()));
125   ASSERT_EQ(version_number1, version_number1_new);
126 
127   delete iter1;
128   delete iter2;
129   delete iter3;
130 }
131 
TEST_F(DBTest2,CacheIndexAndFilterWithDBRestart)132 TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
133   Options options = CurrentOptions();
134   options.create_if_missing = true;
135   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
136   BlockBasedTableOptions table_options;
137   table_options.cache_index_and_filter_blocks = true;
138   table_options.filter_policy.reset(NewBloomFilterPolicy(20));
139   options.table_factory.reset(new BlockBasedTableFactory(table_options));
140   CreateAndReopenWithCF({"pikachu"}, options);
141 
142   Put(1, "a", "begin");
143   Put(1, "z", "end");
144   ASSERT_OK(Flush(1));
145   TryReopenWithColumnFamilies({"default", "pikachu"}, options);
146 
147   std::string value;
148   value = Get(1, "a");
149 }
150 
TEST_F(DBTest2,MaxSuccessiveMergesChangeWithDBRecovery)151 TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
152   Options options = CurrentOptions();
153   options.create_if_missing = true;
154   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
155   options.max_successive_merges = 3;
156   options.merge_operator = MergeOperators::CreatePutOperator();
157   options.disable_auto_compactions = true;
158   DestroyAndReopen(options);
159   Put("poi", "Finch");
160   db_->Merge(WriteOptions(), "poi", "Reese");
161   db_->Merge(WriteOptions(), "poi", "Shaw");
162   db_->Merge(WriteOptions(), "poi", "Root");
163   options.max_successive_merges = 2;
164   Reopen(options);
165 }
166 
167 #ifndef ROCKSDB_LITE
168 class DBTestSharedWriteBufferAcrossCFs
169     : public DBTestBase,
170       public testing::WithParamInterface<std::tuple<bool, bool>> {
171  public:
DBTestSharedWriteBufferAcrossCFs()172   DBTestSharedWriteBufferAcrossCFs()
173       : DBTestBase("/db_test_shared_write_buffer") {}
SetUp()174   void SetUp() override {
175     use_old_interface_ = std::get<0>(GetParam());
176     cost_cache_ = std::get<1>(GetParam());
177   }
178   bool use_old_interface_;
179   bool cost_cache_;
180 };
181 
TEST_P(DBTestSharedWriteBufferAcrossCFs,SharedWriteBufferAcrossCFs)182 TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
183   Options options = CurrentOptions();
184   options.arena_block_size = 4096;
185 
186   // Avoid undeterministic value by malloc_usable_size();
187   // Force arena block size to 1
188   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
189       "Arena::Arena:0", [&](void* arg) {
190         size_t* block_size = static_cast<size_t*>(arg);
191         *block_size = 1;
192       });
193 
194   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
195       "Arena::AllocateNewBlock:0", [&](void* arg) {
196         std::pair<size_t*, size_t*>* pair =
197             static_cast<std::pair<size_t*, size_t*>*>(arg);
198         *std::get<0>(*pair) = *std::get<1>(*pair);
199       });
200   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
201 
202   // The total soft write buffer size is about 105000
203   std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
204   ASSERT_LT(cache->GetUsage(), 256 * 1024);
205 
206   if (use_old_interface_) {
207     options.db_write_buffer_size = 120000;  // this is the real limit
208   } else if (!cost_cache_) {
209     options.write_buffer_manager.reset(new WriteBufferManager(114285));
210   } else {
211     options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
212   }
213   options.write_buffer_size = 500000;  // this is never hit
214   CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
215 
216   WriteOptions wo;
217   wo.disableWAL = true;
218 
219   std::function<void()> wait_flush = [&]() {
220     dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
221     dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
222     dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
223     dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
224   };
225 
226   // Create some data and flush "default" and "nikitich" so that they
227   // are newer CFs created.
228   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
229   Flush(3);
230   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
231   ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
232   Flush(0);
233   ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
234             static_cast<uint64_t>(1));
235   ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
236             static_cast<uint64_t>(1));
237 
238   ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
239   if (cost_cache_) {
240     ASSERT_GE(cache->GetUsage(), 256 * 1024);
241     ASSERT_LE(cache->GetUsage(), 2 * 256 * 1024);
242   }
243   wait_flush();
244   ASSERT_OK(Put(0, Key(1), DummyString(60000), wo));
245   if (cost_cache_) {
246     ASSERT_GE(cache->GetUsage(), 256 * 1024);
247     ASSERT_LE(cache->GetUsage(), 2 * 256 * 1024);
248   }
249   wait_flush();
250   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
251   // No flush should trigger
252   wait_flush();
253   {
254     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
255               static_cast<uint64_t>(1));
256     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
257               static_cast<uint64_t>(0));
258     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
259               static_cast<uint64_t>(0));
260     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
261               static_cast<uint64_t>(1));
262   }
263 
264   // Trigger a flush. Flushing "nikitich".
265   ASSERT_OK(Put(3, Key(2), DummyString(30000), wo));
266   wait_flush();
267   ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
268   wait_flush();
269   {
270     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
271               static_cast<uint64_t>(1));
272     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
273               static_cast<uint64_t>(0));
274     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
275               static_cast<uint64_t>(0));
276     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
277               static_cast<uint64_t>(2));
278   }
279 
280   // Without hitting the threshold, no flush should trigger.
281   ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
282   wait_flush();
283   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
284   wait_flush();
285   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
286   wait_flush();
287   {
288     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
289               static_cast<uint64_t>(1));
290     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
291               static_cast<uint64_t>(0));
292     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
293               static_cast<uint64_t>(0));
294     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
295               static_cast<uint64_t>(2));
296   }
297 
298   // Hit the write buffer limit again. "default"
299   // will have been flushed.
300   ASSERT_OK(Put(2, Key(2), DummyString(10000), wo));
301   wait_flush();
302   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
303   wait_flush();
304   ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
305   wait_flush();
306   ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
307   wait_flush();
308   ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
309   wait_flush();
310   {
311     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
312               static_cast<uint64_t>(2));
313     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
314               static_cast<uint64_t>(0));
315     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
316               static_cast<uint64_t>(0));
317     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
318               static_cast<uint64_t>(2));
319   }
320 
321   // Trigger another flush. This time "dobrynia". "pikachu" should not
322   // be flushed, althrough it was never flushed.
323   ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
324   wait_flush();
325   ASSERT_OK(Put(2, Key(1), DummyString(80000), wo));
326   wait_flush();
327   ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
328   wait_flush();
329   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
330   wait_flush();
331 
332   {
333     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
334               static_cast<uint64_t>(2));
335     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
336               static_cast<uint64_t>(0));
337     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
338               static_cast<uint64_t>(1));
339     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
340               static_cast<uint64_t>(2));
341   }
342   if (cost_cache_) {
343     ASSERT_GE(cache->GetUsage(), 256 * 1024);
344     Close();
345     options.write_buffer_manager.reset();
346     last_options_.write_buffer_manager.reset();
347     ASSERT_LT(cache->GetUsage(), 256 * 1024);
348   }
349   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
350 }
351 
352 INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
353                         DBTestSharedWriteBufferAcrossCFs,
354                         ::testing::Values(std::make_tuple(true, false),
355                                           std::make_tuple(false, false),
356                                           std::make_tuple(false, true)));
357 
TEST_F(DBTest2,SharedWriteBufferLimitAcrossDB)358 TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
359   std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
360   Options options = CurrentOptions();
361   options.arena_block_size = 4096;
362   // Avoid undeterministic value by malloc_usable_size();
363   // Force arena block size to 1
364   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
365       "Arena::Arena:0", [&](void* arg) {
366         size_t* block_size = static_cast<size_t*>(arg);
367         *block_size = 1;
368       });
369 
370   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
371       "Arena::AllocateNewBlock:0", [&](void* arg) {
372         std::pair<size_t*, size_t*>* pair =
373             static_cast<std::pair<size_t*, size_t*>*>(arg);
374         *std::get<0>(*pair) = *std::get<1>(*pair);
375       });
376   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
377 
378   options.write_buffer_size = 500000;  // this is never hit
379   // Use a write buffer total size so that the soft limit is about
380   // 105000.
381   options.write_buffer_manager.reset(new WriteBufferManager(120000));
382   CreateAndReopenWithCF({"cf1", "cf2"}, options);
383 
384   ASSERT_OK(DestroyDB(dbname2, options));
385   DB* db2 = nullptr;
386   ASSERT_OK(DB::Open(options, dbname2, &db2));
387 
388   WriteOptions wo;
389   wo.disableWAL = true;
390 
391   std::function<void()> wait_flush = [&]() {
392     dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
393     dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
394     dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
395     static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
396   };
397 
398   // Trigger a flush on cf2
399   ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
400   wait_flush();
401   ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
402   wait_flush();
403 
404   // Insert to DB2
405   ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
406   wait_flush();
407 
408   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
409   wait_flush();
410   static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
411   {
412     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
413                   GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
414                   GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
415               static_cast<uint64_t>(1));
416     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
417               static_cast<uint64_t>(0));
418   }
419 
420   // Triggering to flush another CF in DB1
421   ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
422   wait_flush();
423   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
424   wait_flush();
425   {
426     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
427               static_cast<uint64_t>(1));
428     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
429               static_cast<uint64_t>(0));
430     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
431               static_cast<uint64_t>(1));
432     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
433               static_cast<uint64_t>(0));
434   }
435 
436   // Triggering flush in DB2.
437   ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
438   wait_flush();
439   ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
440   wait_flush();
441   static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
442   {
443     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
444               static_cast<uint64_t>(1));
445     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
446               static_cast<uint64_t>(0));
447     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
448               static_cast<uint64_t>(1));
449     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
450               static_cast<uint64_t>(1));
451   }
452 
453   delete db2;
454   ASSERT_OK(DestroyDB(dbname2, options));
455 
456   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
457 }
458 
TEST_F(DBTest2,TestWriteBufferNoLimitWithCache)459 TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
460   Options options = CurrentOptions();
461   options.arena_block_size = 4096;
462   std::shared_ptr<Cache> cache =
463       NewLRUCache(LRUCacheOptions(10000000, 1, false, 0.0));
464   options.write_buffer_size = 50000;  // this is never hit
465   // Use a write buffer total size so that the soft limit is about
466   // 105000.
467   options.write_buffer_manager.reset(new WriteBufferManager(0, cache));
468   Reopen(options);
469 
470   ASSERT_OK(Put("foo", "bar"));
471   // One dummy entry is 256KB.
472   ASSERT_GT(cache->GetUsage(), 128000);
473 }
474 
475 namespace {
ValidateKeyExistence(DB * db,const std::vector<Slice> & keys_must_exist,const std::vector<Slice> & keys_must_not_exist)476   void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
477     const std::vector<Slice>& keys_must_not_exist) {
478     // Ensure that expected keys exist
479     std::vector<std::string> values;
480     if (keys_must_exist.size() > 0) {
481       std::vector<Status> status_list =
482         db->MultiGet(ReadOptions(), keys_must_exist, &values);
483       for (size_t i = 0; i < keys_must_exist.size(); i++) {
484         ASSERT_OK(status_list[i]);
485       }
486     }
487 
488     // Ensure that given keys don't exist
489     if (keys_must_not_exist.size() > 0) {
490       std::vector<Status> status_list =
491         db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
492       for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
493         ASSERT_TRUE(status_list[i].IsNotFound());
494       }
495     }
496   }
497 
498 }  // namespace
499 
TEST_F(DBTest2,WalFilterTest)500 TEST_F(DBTest2, WalFilterTest) {
501   class TestWalFilter : public WalFilter {
502   private:
503     // Processing option that is requested to be applied at the given index
504     WalFilter::WalProcessingOption wal_processing_option_;
505     // Index at which to apply wal_processing_option_
506     // At other indexes default wal_processing_option::kContinueProcessing is
507     // returned.
508     size_t apply_option_at_record_index_;
509     // Current record index, incremented with each record encountered.
510     size_t current_record_index_;
511 
512   public:
513     TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
514       size_t apply_option_for_record_index)
515       : wal_processing_option_(wal_processing_option),
516       apply_option_at_record_index_(apply_option_for_record_index),
517       current_record_index_(0) {}
518 
519     WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
520                                   WriteBatch* /*new_batch*/,
521                                   bool* /*batch_changed*/) const override {
522       WalFilter::WalProcessingOption option_to_return;
523 
524       if (current_record_index_ == apply_option_at_record_index_) {
525         option_to_return = wal_processing_option_;
526       }
527       else {
528         option_to_return = WalProcessingOption::kContinueProcessing;
529       }
530 
531       // Filter is passed as a const object for RocksDB to not modify the
532       // object, however we modify it for our own purpose here and hence
533       // cast the constness away.
534       (const_cast<TestWalFilter*>(this)->current_record_index_)++;
535 
536       return option_to_return;
537     }
538 
539     const char* Name() const override { return "TestWalFilter"; }
540   };
541 
542   // Create 3 batches with two keys each
543   std::vector<std::vector<std::string>> batch_keys(3);
544 
545   batch_keys[0].push_back("key1");
546   batch_keys[0].push_back("key2");
547   batch_keys[1].push_back("key3");
548   batch_keys[1].push_back("key4");
549   batch_keys[2].push_back("key5");
550   batch_keys[2].push_back("key6");
551 
552   // Test with all WAL processing options
553   for (int option = 0;
554     option < static_cast<int>(
555     WalFilter::WalProcessingOption::kWalProcessingOptionMax);
556   option++) {
557     Options options = OptionsForLogIterTest();
558     DestroyAndReopen(options);
559     CreateAndReopenWithCF({ "pikachu" }, options);
560 
561     // Write given keys in given batches
562     for (size_t i = 0; i < batch_keys.size(); i++) {
563       WriteBatch batch;
564       for (size_t j = 0; j < batch_keys[i].size(); j++) {
565         batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
566       }
567       dbfull()->Write(WriteOptions(), &batch);
568     }
569 
570     WalFilter::WalProcessingOption wal_processing_option =
571       static_cast<WalFilter::WalProcessingOption>(option);
572 
573     // Create a test filter that would apply wal_processing_option at the first
574     // record
575     size_t apply_option_for_record_index = 1;
576     TestWalFilter test_wal_filter(wal_processing_option,
577       apply_option_for_record_index);
578 
579     // Reopen database with option to use WAL filter
580     options = OptionsForLogIterTest();
581     options.wal_filter = &test_wal_filter;
582     Status status =
583       TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
584     if (wal_processing_option ==
585       WalFilter::WalProcessingOption::kCorruptedRecord) {
586       assert(!status.ok());
587       // In case of corruption we can turn off paranoid_checks to reopen
588       // databse
589       options.paranoid_checks = false;
590       ReopenWithColumnFamilies({ "default", "pikachu" }, options);
591     }
592     else {
593       assert(status.ok());
594     }
595 
596     // Compute which keys we expect to be found
597     // and which we expect not to be found after recovery.
598     std::vector<Slice> keys_must_exist;
599     std::vector<Slice> keys_must_not_exist;
600     switch (wal_processing_option) {
601     case WalFilter::WalProcessingOption::kCorruptedRecord:
602     case WalFilter::WalProcessingOption::kContinueProcessing: {
603       fprintf(stderr, "Testing with complete WAL processing\n");
604       // we expect all records to be processed
605       for (size_t i = 0; i < batch_keys.size(); i++) {
606         for (size_t j = 0; j < batch_keys[i].size(); j++) {
607           keys_must_exist.push_back(Slice(batch_keys[i][j]));
608         }
609       }
610       break;
611     }
612     case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
613       fprintf(stderr,
614         "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
615         apply_option_for_record_index);
616       // We expect the record with apply_option_for_record_index to be not
617       // found.
618       for (size_t i = 0; i < batch_keys.size(); i++) {
619         for (size_t j = 0; j < batch_keys[i].size(); j++) {
620           if (i == apply_option_for_record_index) {
621             keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
622           }
623           else {
624             keys_must_exist.push_back(Slice(batch_keys[i][j]));
625           }
626         }
627       }
628       break;
629     }
630     case WalFilter::WalProcessingOption::kStopReplay: {
631       fprintf(stderr,
632         "Testing with stopping replay from record %" ROCKSDB_PRIszt
633         "\n",
634         apply_option_for_record_index);
635       // We expect records beyond apply_option_for_record_index to be not
636       // found.
637       for (size_t i = 0; i < batch_keys.size(); i++) {
638         for (size_t j = 0; j < batch_keys[i].size(); j++) {
639           if (i >= apply_option_for_record_index) {
640             keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
641           }
642           else {
643             keys_must_exist.push_back(Slice(batch_keys[i][j]));
644           }
645         }
646       }
647       break;
648     }
649     default:
650       assert(false);  // unhandled case
651     }
652 
653     bool checked_after_reopen = false;
654 
655     while (true) {
656       // Ensure that expected keys exists
657       // and not expected keys don't exist after recovery
658       ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
659 
660       if (checked_after_reopen) {
661         break;
662       }
663 
664       // reopen database again to make sure previous log(s) are not used
665       //(even if they were skipped)
666       // reopn database with option to use WAL filter
667       options = OptionsForLogIterTest();
668       ReopenWithColumnFamilies({ "default", "pikachu" }, options);
669 
670       checked_after_reopen = true;
671     }
672   }
673 }
674 
TEST_F(DBTest2,WalFilterTestWithChangeBatch)675 TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
676   class ChangeBatchHandler : public WriteBatch::Handler {
677   private:
678     // Batch to insert keys in
679     WriteBatch* new_write_batch_;
680     // Number of keys to add in the new batch
681     size_t num_keys_to_add_in_new_batch_;
682     // Number of keys added to new batch
683     size_t num_keys_added_;
684 
685   public:
686     ChangeBatchHandler(WriteBatch* new_write_batch,
687       size_t num_keys_to_add_in_new_batch)
688       : new_write_batch_(new_write_batch),
689       num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
690       num_keys_added_(0) {}
691     void Put(const Slice& key, const Slice& value) override {
692       if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
693         new_write_batch_->Put(key, value);
694         ++num_keys_added_;
695       }
696     }
697   };
698 
699   class TestWalFilterWithChangeBatch : public WalFilter {
700   private:
701     // Index at which to start changing records
702     size_t change_records_from_index_;
703     // Number of keys to add in the new batch
704     size_t num_keys_to_add_in_new_batch_;
705     // Current record index, incremented with each record encountered.
706     size_t current_record_index_;
707 
708   public:
709     TestWalFilterWithChangeBatch(size_t change_records_from_index,
710       size_t num_keys_to_add_in_new_batch)
711       : change_records_from_index_(change_records_from_index),
712       num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
713       current_record_index_(0) {}
714 
715     WalProcessingOption LogRecord(const WriteBatch& batch,
716                                   WriteBatch* new_batch,
717                                   bool* batch_changed) const override {
718       if (current_record_index_ >= change_records_from_index_) {
719         ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
720         batch.Iterate(&handler);
721         *batch_changed = true;
722       }
723 
724       // Filter is passed as a const object for RocksDB to not modify the
725       // object, however we modify it for our own purpose here and hence
726       // cast the constness away.
727       (const_cast<TestWalFilterWithChangeBatch*>(this)
728         ->current_record_index_)++;
729 
730       return WalProcessingOption::kContinueProcessing;
731     }
732 
733     const char* Name() const override { return "TestWalFilterWithChangeBatch"; }
734   };
735 
736   std::vector<std::vector<std::string>> batch_keys(3);
737 
738   batch_keys[0].push_back("key1");
739   batch_keys[0].push_back("key2");
740   batch_keys[1].push_back("key3");
741   batch_keys[1].push_back("key4");
742   batch_keys[2].push_back("key5");
743   batch_keys[2].push_back("key6");
744 
745   Options options = OptionsForLogIterTest();
746   DestroyAndReopen(options);
747   CreateAndReopenWithCF({ "pikachu" }, options);
748 
749   // Write given keys in given batches
750   for (size_t i = 0; i < batch_keys.size(); i++) {
751     WriteBatch batch;
752     for (size_t j = 0; j < batch_keys[i].size(); j++) {
753       batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
754     }
755     dbfull()->Write(WriteOptions(), &batch);
756   }
757 
758   // Create a test filter that would apply wal_processing_option at the first
759   // record
760   size_t change_records_from_index = 1;
761   size_t num_keys_to_add_in_new_batch = 1;
762   TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
763     change_records_from_index, num_keys_to_add_in_new_batch);
764 
765   // Reopen database with option to use WAL filter
766   options = OptionsForLogIterTest();
767   options.wal_filter = &test_wal_filter_with_change_batch;
768   ReopenWithColumnFamilies({ "default", "pikachu" }, options);
769 
770   // Ensure that all keys exist before change_records_from_index_
771   // And after that index only single key exists
772   // as our filter adds only single key for each batch
773   std::vector<Slice> keys_must_exist;
774   std::vector<Slice> keys_must_not_exist;
775 
776   for (size_t i = 0; i < batch_keys.size(); i++) {
777     for (size_t j = 0; j < batch_keys[i].size(); j++) {
778       if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
779         keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
780       }
781       else {
782         keys_must_exist.push_back(Slice(batch_keys[i][j]));
783       }
784     }
785   }
786 
787   bool checked_after_reopen = false;
788 
789   while (true) {
790     // Ensure that expected keys exists
791     // and not expected keys don't exist after recovery
792     ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
793 
794     if (checked_after_reopen) {
795       break;
796     }
797 
798     // reopen database again to make sure previous log(s) are not used
799     //(even if they were skipped)
800     // reopn database with option to use WAL filter
801     options = OptionsForLogIterTest();
802     ReopenWithColumnFamilies({ "default", "pikachu" }, options);
803 
804     checked_after_reopen = true;
805   }
806 }
807 
TEST_F(DBTest2,WalFilterTestWithChangeBatchExtraKeys)808 TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
809   class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
810   public:
811    WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch,
812                                  bool* batch_changed) const override {
813      *new_batch = batch;
814      new_batch->Put("key_extra", "value_extra");
815      *batch_changed = true;
816      return WalProcessingOption::kContinueProcessing;
817    }
818 
819    const char* Name() const override {
820      return "WalFilterTestWithChangeBatchExtraKeys";
821    }
822   };
823 
824   std::vector<std::vector<std::string>> batch_keys(3);
825 
826   batch_keys[0].push_back("key1");
827   batch_keys[0].push_back("key2");
828   batch_keys[1].push_back("key3");
829   batch_keys[1].push_back("key4");
830   batch_keys[2].push_back("key5");
831   batch_keys[2].push_back("key6");
832 
833   Options options = OptionsForLogIterTest();
834   DestroyAndReopen(options);
835   CreateAndReopenWithCF({ "pikachu" }, options);
836 
837   // Write given keys in given batches
838   for (size_t i = 0; i < batch_keys.size(); i++) {
839     WriteBatch batch;
840     for (size_t j = 0; j < batch_keys[i].size(); j++) {
841       batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
842     }
843     dbfull()->Write(WriteOptions(), &batch);
844   }
845 
846   // Create a test filter that would add extra keys
847   TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;
848 
849   // Reopen database with option to use WAL filter
850   options = OptionsForLogIterTest();
851   options.wal_filter = &test_wal_filter_extra_keys;
852   Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
853   ASSERT_TRUE(status.IsNotSupported());
854 
855   // Reopen without filter, now reopen should succeed - previous
856   // attempt to open must not have altered the db.
857   options = OptionsForLogIterTest();
858   ReopenWithColumnFamilies({ "default", "pikachu" }, options);
859 
860   std::vector<Slice> keys_must_exist;
861   std::vector<Slice> keys_must_not_exist;  // empty vector
862 
863   for (size_t i = 0; i < batch_keys.size(); i++) {
864     for (size_t j = 0; j < batch_keys[i].size(); j++) {
865       keys_must_exist.push_back(Slice(batch_keys[i][j]));
866     }
867   }
868 
869   ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
870 }
871 
TEST_F(DBTest2,WalFilterTestWithColumnFamilies)872 TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
873   class TestWalFilterWithColumnFamilies : public WalFilter {
874   private:
875     // column_family_id -> log_number map (provided to WALFilter)
876     std::map<uint32_t, uint64_t> cf_log_number_map_;
877     // column_family_name -> column_family_id map (provided to WALFilter)
878     std::map<std::string, uint32_t> cf_name_id_map_;
879     // column_family_name -> keys_found_in_wal map
880     // We store keys that are applicable to the column_family
881     // during recovery (i.e. aren't already flushed to SST file(s))
882     // for verification against the keys we expect.
883     std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
884   public:
885    void ColumnFamilyLogNumberMap(
886        const std::map<uint32_t, uint64_t>& cf_lognumber_map,
887        const std::map<std::string, uint32_t>& cf_name_id_map) override {
888      cf_log_number_map_ = cf_lognumber_map;
889      cf_name_id_map_ = cf_name_id_map;
890    }
891 
892    WalProcessingOption LogRecordFound(unsigned long long log_number,
893                                       const std::string& /*log_file_name*/,
894                                       const WriteBatch& batch,
895                                       WriteBatch* /*new_batch*/,
896                                       bool* /*batch_changed*/) override {
897      class LogRecordBatchHandler : public WriteBatch::Handler {
898       private:
899         const std::map<uint32_t, uint64_t> & cf_log_number_map_;
900         std::map<uint32_t, std::vector<std::string>> & cf_wal_keys_;
901         unsigned long long log_number_;
902       public:
903         LogRecordBatchHandler(unsigned long long current_log_number,
904           const std::map<uint32_t, uint64_t> & cf_log_number_map,
905           std::map<uint32_t, std::vector<std::string>> & cf_wal_keys) :
906           cf_log_number_map_(cf_log_number_map),
907           cf_wal_keys_(cf_wal_keys),
908           log_number_(current_log_number){}
909 
910         Status PutCF(uint32_t column_family_id, const Slice& key,
911                      const Slice& /*value*/) override {
912           auto it = cf_log_number_map_.find(column_family_id);
913           assert(it != cf_log_number_map_.end());
914           unsigned long long log_number_for_cf = it->second;
915           // If the current record is applicable for column_family_id
916           // (i.e. isn't flushed to SST file(s) for column_family_id)
917           // add it to the cf_wal_keys_ map for verification.
918           if (log_number_ >= log_number_for_cf) {
919             cf_wal_keys_[column_family_id].push_back(std::string(key.data(),
920               key.size()));
921           }
922           return Status::OK();
923         }
924       } handler(log_number, cf_log_number_map_, cf_wal_keys_);
925 
926       batch.Iterate(&handler);
927 
928       return WalProcessingOption::kContinueProcessing;
929    }
930 
931    const char* Name() const override {
932      return "WalFilterTestWithColumnFamilies";
933    }
934 
935     const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
936       return cf_wal_keys_;
937     }
938 
939     const std::map<std::string, uint32_t> & GetColumnFamilyNameIdMap() {
940       return cf_name_id_map_;
941     }
942   };
943 
944   std::vector<std::vector<std::string>> batch_keys_pre_flush(3);
945 
946   batch_keys_pre_flush[0].push_back("key1");
947   batch_keys_pre_flush[0].push_back("key2");
948   batch_keys_pre_flush[1].push_back("key3");
949   batch_keys_pre_flush[1].push_back("key4");
950   batch_keys_pre_flush[2].push_back("key5");
951   batch_keys_pre_flush[2].push_back("key6");
952 
953   Options options = OptionsForLogIterTest();
954   DestroyAndReopen(options);
955   CreateAndReopenWithCF({ "pikachu" }, options);
956 
957   // Write given keys in given batches
958   for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
959     WriteBatch batch;
960     for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
961       batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
962       batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
963     }
964     dbfull()->Write(WriteOptions(), &batch);
965   }
966 
967   //Flush default column-family
968   db_->Flush(FlushOptions(), handles_[0]);
969 
970   // Do some more writes
971   std::vector<std::vector<std::string>> batch_keys_post_flush(3);
972 
973   batch_keys_post_flush[0].push_back("key7");
974   batch_keys_post_flush[0].push_back("key8");
975   batch_keys_post_flush[1].push_back("key9");
976   batch_keys_post_flush[1].push_back("key10");
977   batch_keys_post_flush[2].push_back("key11");
978   batch_keys_post_flush[2].push_back("key12");
979 
980   // Write given keys in given batches
981   for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
982     WriteBatch batch;
983     for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
984       batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
985       batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
986     }
987     dbfull()->Write(WriteOptions(), &batch);
988   }
989 
990   // On Recovery we should only find the second batch applicable to default CF
991   // But both batches applicable to pikachu CF
992 
993   // Create a test filter that would add extra keys
994   TestWalFilterWithColumnFamilies test_wal_filter_column_families;
995 
996   // Reopen database with option to use WAL filter
997   options = OptionsForLogIterTest();
998   options.wal_filter = &test_wal_filter_column_families;
999   Status status =
1000     TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
1001   ASSERT_TRUE(status.ok());
1002 
1003   // verify that handles_[0] only has post_flush keys
1004   // while handles_[1] has pre and post flush keys
1005   auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
1006   auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
1007   size_t index = 0;
1008   auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
1009   //default column-family, only post_flush keys are expected
1010   for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
1011     for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
1012       Slice key_from_the_log(keys_cf[index++]);
1013       Slice batch_key(batch_keys_post_flush[i][j]);
1014       ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
1015     }
1016   }
1017   ASSERT_TRUE(index == keys_cf.size());
1018 
1019   index = 0;
1020   keys_cf = cf_wal_keys[name_id_map["pikachu"]];
1021   //pikachu column-family, all keys are expected
1022   for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
1023     for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
1024       Slice key_from_the_log(keys_cf[index++]);
1025       Slice batch_key(batch_keys_pre_flush[i][j]);
1026       ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
1027     }
1028   }
1029 
1030   for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
1031     for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
1032       Slice key_from_the_log(keys_cf[index++]);
1033       Slice batch_key(batch_keys_post_flush[i][j]);
1034       ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
1035     }
1036   }
1037   ASSERT_TRUE(index == keys_cf.size());
1038 }
1039 
TEST_F(DBTest2,PresetCompressionDict)1040 TEST_F(DBTest2, PresetCompressionDict) {
1041   // Verifies that compression ratio improves when dictionary is enabled, and
1042   // improves even further when the dictionary is trained by ZSTD.
1043   const size_t kBlockSizeBytes = 4 << 10;
1044   const size_t kL0FileBytes = 128 << 10;
1045   const size_t kApproxPerBlockOverheadBytes = 50;
1046   const int kNumL0Files = 5;
1047 
1048   Options options;
1049   // Make sure to use any custom env that the test is configured with.
1050   options.env = CurrentOptions().env;
1051   options.allow_concurrent_memtable_write = false;
1052   options.arena_block_size = kBlockSizeBytes;
1053   options.create_if_missing = true;
1054   options.disable_auto_compactions = true;
1055   options.level0_file_num_compaction_trigger = kNumL0Files;
1056   options.memtable_factory.reset(
1057       new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
1058   options.num_levels = 2;
1059   options.target_file_size_base = kL0FileBytes;
1060   options.target_file_size_multiplier = 2;
1061   options.write_buffer_size = kL0FileBytes;
1062   BlockBasedTableOptions table_options;
1063   table_options.block_size = kBlockSizeBytes;
1064   std::vector<CompressionType> compression_types;
1065   if (Zlib_Supported()) {
1066     compression_types.push_back(kZlibCompression);
1067   }
1068 #if LZ4_VERSION_NUMBER >= 10400  // r124+
1069   compression_types.push_back(kLZ4Compression);
1070   compression_types.push_back(kLZ4HCCompression);
1071 #endif                          // LZ4_VERSION_NUMBER >= 10400
1072   if (ZSTD_Supported()) {
1073     compression_types.push_back(kZSTD);
1074   }
1075 
1076   enum DictionaryTypes : int {
1077     kWithoutDict,
1078     kWithDict,
1079     kWithZSTDTrainedDict,
1080     kDictEnd,
1081   };
1082 
1083   for (auto compression_type : compression_types) {
1084     options.compression = compression_type;
1085     size_t bytes_without_dict = 0;
1086     size_t bytes_with_dict = 0;
1087     size_t bytes_with_zstd_trained_dict = 0;
1088     for (int i = kWithoutDict; i < kDictEnd; i++) {
1089       // First iteration: compress without preset dictionary
1090       // Second iteration: compress with preset dictionary
1091       // Third iteration (zstd only): compress with zstd-trained dictionary
1092       //
1093       // To make sure the compression dictionary has the intended effect, we
1094       // verify the compressed size is smaller in successive iterations. Also in
1095       // the non-first iterations, verify the data we get out is the same data
1096       // we put in.
1097       switch (i) {
1098         case kWithoutDict:
1099           options.compression_opts.max_dict_bytes = 0;
1100           options.compression_opts.zstd_max_train_bytes = 0;
1101           break;
1102         case kWithDict:
1103           options.compression_opts.max_dict_bytes = kBlockSizeBytes;
1104           options.compression_opts.zstd_max_train_bytes = 0;
1105           break;
1106         case kWithZSTDTrainedDict:
1107           if (compression_type != kZSTD) {
1108             continue;
1109           }
1110           options.compression_opts.max_dict_bytes = kBlockSizeBytes;
1111           options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
1112           break;
1113         default:
1114           assert(false);
1115       }
1116 
1117       options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1118       options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1119       CreateAndReopenWithCF({"pikachu"}, options);
1120       Random rnd(301);
1121       std::string seq_datas[10];
1122       for (int j = 0; j < 10; ++j) {
1123         seq_datas[j] =
1124             RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
1125       }
1126 
1127       ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
1128       for (int j = 0; j < kNumL0Files; ++j) {
1129         for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
1130           auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
1131           ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
1132                         seq_datas[(key_num / 10) % 10]));
1133         }
1134         dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
1135         ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
1136       }
1137       dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
1138                                   true /* disallow_trivial_move */);
1139       ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
1140       ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
1141 
1142       // Get the live sst files size
1143       size_t total_sst_bytes = TotalSize(1);
1144       if (i == kWithoutDict) {
1145         bytes_without_dict = total_sst_bytes;
1146       } else if (i == kWithDict) {
1147         bytes_with_dict = total_sst_bytes;
1148       } else if (i == kWithZSTDTrainedDict) {
1149         bytes_with_zstd_trained_dict = total_sst_bytes;
1150       }
1151 
1152       for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
1153            j++) {
1154         ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
1155       }
1156       if (i == kWithDict) {
1157         ASSERT_GT(bytes_without_dict, bytes_with_dict);
1158       } else if (i == kWithZSTDTrainedDict) {
1159         // In zstd compression, it is sometimes possible that using a trained
1160         // dictionary does not get as good a compression ratio as without
1161         // training.
1162         // But using a dictionary (with or without training) should always get
1163         // better compression ratio than not using one.
1164         ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_trained_dict ||
1165                     bytes_without_dict > bytes_with_zstd_trained_dict);
1166       }
1167 
1168       DestroyAndReopen(options);
1169     }
1170   }
1171 }
1172 
TEST_F(DBTest2,PresetCompressionDictLocality)1173 TEST_F(DBTest2, PresetCompressionDictLocality) {
1174   if (!ZSTD_Supported()) {
1175     return;
1176   }
1177   // Verifies that compression dictionary is generated from local data. The
1178   // verification simply checks all output SSTs have different compression
1179   // dictionaries. We do not verify effectiveness as that'd likely be flaky in
1180   // the future.
1181   const int kNumEntriesPerFile = 1 << 10;  // 1KB
1182   const int kNumBytesPerEntry = 1 << 10;   // 1KB
1183   const int kNumFiles = 4;
1184   Options options = CurrentOptions();
1185   options.compression = kZSTD;
1186   options.compression_opts.max_dict_bytes = 1 << 14;        // 16KB
1187   options.compression_opts.zstd_max_train_bytes = 1 << 18;  // 256KB
1188   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1189   options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
1190   BlockBasedTableOptions table_options;
1191   table_options.cache_index_and_filter_blocks = true;
1192   options.table_factory.reset(new BlockBasedTableFactory(table_options));
1193   Reopen(options);
1194 
1195   Random rnd(301);
1196   for (int i = 0; i < kNumFiles; ++i) {
1197     for (int j = 0; j < kNumEntriesPerFile; ++j) {
1198       ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
1199                     RandomString(&rnd, kNumBytesPerEntry)));
1200     }
1201     ASSERT_OK(Flush());
1202     MoveFilesToLevel(1);
1203     ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
1204   }
1205 
1206   // Store all the dictionaries generated during a full compaction.
1207   std::vector<std::string> compression_dicts;
1208   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1209       "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1210       [&](void* arg) {
1211         compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
1212       });
1213   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1214   CompactRangeOptions compact_range_opts;
1215   compact_range_opts.bottommost_level_compaction =
1216       BottommostLevelCompaction::kForceOptimized;
1217   ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
1218 
1219   // Dictionary compression should not be so good as to compress four totally
1220   // random files into one. If it does then there's probably something wrong
1221   // with the test.
1222   ASSERT_GT(NumTableFilesAtLevel(1), 1);
1223 
1224   // Furthermore, there should be one compression dictionary generated per file.
1225   // And they should all be different from each other.
1226   ASSERT_EQ(NumTableFilesAtLevel(1),
1227             static_cast<int>(compression_dicts.size()));
1228   for (size_t i = 1; i < compression_dicts.size(); ++i) {
1229     std::string& a = compression_dicts[i - 1];
1230     std::string& b = compression_dicts[i];
1231     size_t alen = a.size();
1232     size_t blen = b.size();
1233     ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
1234   }
1235 }
1236 
1237 class CompactionCompressionListener : public EventListener {
1238  public:
CompactionCompressionListener(Options * db_options)1239   explicit CompactionCompressionListener(Options* db_options)
1240       : db_options_(db_options) {}
1241 
OnCompactionCompleted(DB * db,const CompactionJobInfo & ci)1242   void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
1243     // Figure out last level with files
1244     int bottommost_level = 0;
1245     for (int level = 0; level < db->NumberLevels(); level++) {
1246       std::string files_at_level;
1247       ASSERT_TRUE(
1248           db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
1249                           &files_at_level));
1250       if (files_at_level != "0") {
1251         bottommost_level = level;
1252       }
1253     }
1254 
1255     if (db_options_->bottommost_compression != kDisableCompressionOption &&
1256         ci.output_level == bottommost_level) {
1257       ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
1258     } else if (db_options_->compression_per_level.size() != 0) {
1259       ASSERT_EQ(ci.compression,
1260                 db_options_->compression_per_level[ci.output_level]);
1261     } else {
1262       ASSERT_EQ(ci.compression, db_options_->compression);
1263     }
1264     max_level_checked = std::max(max_level_checked, ci.output_level);
1265   }
1266 
1267   int max_level_checked = 0;
1268   const Options* db_options_;
1269 };
1270 
TEST_F(DBTest2,CompressionOptions)1271 TEST_F(DBTest2, CompressionOptions) {
1272   if (!Zlib_Supported() || !Snappy_Supported()) {
1273     return;
1274   }
1275 
1276   Options options = CurrentOptions();
1277   options.level0_file_num_compaction_trigger = 2;
1278   options.max_bytes_for_level_base = 100;
1279   options.max_bytes_for_level_multiplier = 2;
1280   options.num_levels = 7;
1281   options.max_background_compactions = 1;
1282 
1283   CompactionCompressionListener* listener =
1284       new CompactionCompressionListener(&options);
1285   options.listeners.emplace_back(listener);
1286 
1287   const int kKeySize = 5;
1288   const int kValSize = 20;
1289   Random rnd(301);
1290 
1291   for (int iter = 0; iter <= 2; iter++) {
1292     listener->max_level_checked = 0;
1293 
1294     if (iter == 0) {
1295       // Use different compression algorithms for different levels but
1296       // always use Zlib for bottommost level
1297       options.compression_per_level = {kNoCompression,     kNoCompression,
1298                                        kNoCompression,     kSnappyCompression,
1299                                        kSnappyCompression, kSnappyCompression,
1300                                        kZlibCompression};
1301       options.compression = kNoCompression;
1302       options.bottommost_compression = kZlibCompression;
1303     } else if (iter == 1) {
1304       // Use Snappy except for bottommost level use ZLib
1305       options.compression_per_level = {};
1306       options.compression = kSnappyCompression;
1307       options.bottommost_compression = kZlibCompression;
1308     } else if (iter == 2) {
1309       // Use Snappy everywhere
1310       options.compression_per_level = {};
1311       options.compression = kSnappyCompression;
1312       options.bottommost_compression = kDisableCompressionOption;
1313     }
1314 
1315     DestroyAndReopen(options);
1316     // Write 10 random files
1317     for (int i = 0; i < 10; i++) {
1318       for (int j = 0; j < 5; j++) {
1319         ASSERT_OK(
1320             Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
1321       }
1322       ASSERT_OK(Flush());
1323       dbfull()->TEST_WaitForCompact();
1324     }
1325 
1326     // Make sure that we wrote enough to check all 7 levels
1327     ASSERT_EQ(listener->max_level_checked, 6);
1328   }
1329 }
1330 
1331 class CompactionStallTestListener : public EventListener {
1332  public:
CompactionStallTestListener()1333   CompactionStallTestListener() : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
1334 
OnCompactionBegin(DB *,const CompactionJobInfo & ci)1335   void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
1336     ASSERT_EQ(ci.cf_name, "default");
1337     ASSERT_EQ(ci.base_input_level, 0);
1338     ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
1339     compacting_files_cnt_ += ci.input_files.size();
1340   }
1341 
OnCompactionCompleted(DB *,const CompactionJobInfo & ci)1342   void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
1343     ASSERT_EQ(ci.cf_name, "default");
1344     ASSERT_EQ(ci.base_input_level, 0);
1345     ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
1346     compacted_files_cnt_ += ci.input_files.size();
1347   }
1348 
1349   std::atomic<size_t> compacting_files_cnt_;
1350   std::atomic<size_t> compacted_files_cnt_;
1351 };
1352 
TEST_F(DBTest2,CompactionStall)1353 TEST_F(DBTest2, CompactionStall) {
1354   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1355       {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
1356        {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
1357        {"DBTest2::CompactionStall:2",
1358         "DBImpl::NotifyOnCompactionBegin::UnlockMutex"},
1359        {"DBTest2::CompactionStall:3",
1360         "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
1361   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1362 
1363   Options options = CurrentOptions();
1364   options.level0_file_num_compaction_trigger = 4;
1365   options.max_background_compactions = 40;
1366   CompactionStallTestListener* listener = new CompactionStallTestListener();
1367   options.listeners.emplace_back(listener);
1368   DestroyAndReopen(options);
1369   // make sure all background compaction jobs can be scheduled
1370   auto stop_token =
1371       dbfull()->TEST_write_controler().GetCompactionPressureToken();
1372 
1373   Random rnd(301);
1374 
1375   // 4 Files in L0
1376   for (int i = 0; i < 4; i++) {
1377     for (int j = 0; j < 10; j++) {
1378       ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
1379     }
1380     ASSERT_OK(Flush());
1381   }
1382 
1383   // Wait for compaction to be triggered
1384   TEST_SYNC_POINT("DBTest2::CompactionStall:0");
1385 
1386   // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
1387   // at DBTest2::CompactionStall::1
1388   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
1389 
1390   // Another 6 L0 files to trigger compaction again
1391   for (int i = 0; i < 6; i++) {
1392     for (int j = 0; j < 10; j++) {
1393       ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
1394     }
1395     ASSERT_OK(Flush());
1396   }
1397 
1398   // Wait for another compaction to be triggered
1399   TEST_SYNC_POINT("DBTest2::CompactionStall:1");
1400 
1401   // Hold NotifyOnCompactionBegin in the unlock mutex section
1402   TEST_SYNC_POINT("DBTest2::CompactionStall:2");
1403 
1404   // Hold NotifyOnCompactionCompleted in the unlock mutex section
1405   TEST_SYNC_POINT("DBTest2::CompactionStall:3");
1406 
1407   dbfull()->TEST_WaitForCompact();
1408   ASSERT_LT(NumTableFilesAtLevel(0),
1409             options.level0_file_num_compaction_trigger);
1410   ASSERT_GT(listener->compacted_files_cnt_.load(),
1411             10 - options.level0_file_num_compaction_trigger);
1412   ASSERT_EQ(listener->compacting_files_cnt_.load(), listener->compacted_files_cnt_.load());
1413 
1414   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1415 }
1416 
1417 #endif  // ROCKSDB_LITE
1418 
TEST_F(DBTest2,FirstSnapshotTest)1419 TEST_F(DBTest2, FirstSnapshotTest) {
1420   Options options;
1421   options.write_buffer_size = 100000;  // Small write buffer
1422   options = CurrentOptions(options);
1423   CreateAndReopenWithCF({"pikachu"}, options);
1424 
1425   // This snapshot will have sequence number 0 what is expected behaviour.
1426   const Snapshot* s1 = db_->GetSnapshot();
1427 
1428   Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
1429   Put(1, "k2", std::string(100000, 'y'));  // Trigger flush
1430 
1431   db_->ReleaseSnapshot(s1);
1432 }
1433 
1434 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,DuplicateSnapshot)1435 TEST_F(DBTest2, DuplicateSnapshot) {
1436   Options options;
1437   options = CurrentOptions(options);
1438   std::vector<const Snapshot*> snapshots;
1439   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
1440   SequenceNumber oldest_ww_snap, first_ww_snap;
1441 
1442   Put("k", "v");  // inc seq
1443   snapshots.push_back(db_->GetSnapshot());
1444   snapshots.push_back(db_->GetSnapshot());
1445   Put("k", "v");  // inc seq
1446   snapshots.push_back(db_->GetSnapshot());
1447   snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
1448   first_ww_snap = snapshots.back()->GetSequenceNumber();
1449   Put("k", "v");  // inc seq
1450   snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
1451   snapshots.push_back(db_->GetSnapshot());
1452   Put("k", "v");  // inc seq
1453   snapshots.push_back(db_->GetSnapshot());
1454 
1455   {
1456     InstrumentedMutexLock l(dbi->mutex());
1457     auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
1458     ASSERT_EQ(seqs.size(), 4);  // duplicates are not counted
1459     ASSERT_EQ(oldest_ww_snap, first_ww_snap);
1460   }
1461 
1462   for (auto s : snapshots) {
1463     db_->ReleaseSnapshot(s);
1464   }
1465 }
1466 #endif  // ROCKSDB_LITE
1467 
1468 class PinL0IndexAndFilterBlocksTest
1469     : public DBTestBase,
1470       public testing::WithParamInterface<std::tuple<bool, bool>> {
1471  public:
PinL0IndexAndFilterBlocksTest()1472   PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
SetUp()1473   void SetUp() override {
1474     infinite_max_files_ = std::get<0>(GetParam());
1475     disallow_preload_ = std::get<1>(GetParam());
1476   }
1477 
CreateTwoLevels(Options * options,bool close_afterwards)1478   void CreateTwoLevels(Options* options, bool close_afterwards) {
1479     if (infinite_max_files_) {
1480       options->max_open_files = -1;
1481     }
1482     options->create_if_missing = true;
1483     options->statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1484     BlockBasedTableOptions table_options;
1485     table_options.cache_index_and_filter_blocks = true;
1486     table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1487     table_options.filter_policy.reset(NewBloomFilterPolicy(20));
1488     options->table_factory.reset(new BlockBasedTableFactory(table_options));
1489     CreateAndReopenWithCF({"pikachu"}, *options);
1490 
1491     Put(1, "a", "begin");
1492     Put(1, "z", "end");
1493     ASSERT_OK(Flush(1));
1494     // move this table to L1
1495     dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
1496 
1497     // reset block cache
1498     table_options.block_cache = NewLRUCache(64 * 1024);
1499     options->table_factory.reset(NewBlockBasedTableFactory(table_options));
1500     TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
1501     // create new table at L0
1502     Put(1, "a2", "begin2");
1503     Put(1, "z2", "end2");
1504     ASSERT_OK(Flush(1));
1505 
1506     if (close_afterwards) {
1507       Close();  // This ensures that there is no ref to block cache entries
1508     }
1509     table_options.block_cache->EraseUnRefEntries();
1510   }
1511 
1512   bool infinite_max_files_;
1513   bool disallow_preload_;
1514 };
1515 
TEST_P(PinL0IndexAndFilterBlocksTest,IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning)1516 TEST_P(PinL0IndexAndFilterBlocksTest,
1517        IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
1518   Options options = CurrentOptions();
1519   if (infinite_max_files_) {
1520     options.max_open_files = -1;
1521   }
1522   options.create_if_missing = true;
1523   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1524   BlockBasedTableOptions table_options;
1525   table_options.cache_index_and_filter_blocks = true;
1526   table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1527   table_options.filter_policy.reset(NewBloomFilterPolicy(20));
1528   options.table_factory.reset(new BlockBasedTableFactory(table_options));
1529   CreateAndReopenWithCF({"pikachu"}, options);
1530 
1531   ASSERT_OK(Put(1, "key", "val"));
1532   // Create a new table.
1533   ASSERT_OK(Flush(1));
1534 
1535   // index/filter blocks added to block cache right after table creation.
1536   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1537   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1538   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1539   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1540 
1541   // only index/filter were added
1542   ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
1543   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
1544 
1545   std::string value;
1546   // Miss and hit count should remain the same, they're all pinned.
1547   db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
1548   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1549   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1550   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1551   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1552 
1553   // Miss and hit count should remain the same, they're all pinned.
1554   value = Get(1, "key");
1555   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1556   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1557   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1558   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1559 }
1560 
TEST_P(PinL0IndexAndFilterBlocksTest,MultiLevelIndexAndFilterBlocksCachedWithPinning)1561 TEST_P(PinL0IndexAndFilterBlocksTest,
1562        MultiLevelIndexAndFilterBlocksCachedWithPinning) {
1563   Options options = CurrentOptions();
1564   PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false);
1565   // get base cache values
1566   uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
1567   uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
1568   uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
1569   uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
1570 
1571   std::string value;
1572   // this should be read from L0
1573   // so cache values don't change
1574   value = Get(1, "a2");
1575   ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1576   ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1577   ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1578   ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1579 
1580   // this should be read from L1
1581   // the file is opened, prefetching results in a cache filter miss
1582   // the block is loaded and added to the cache,
1583   // then the get results in a cache hit for L1
1584   // When we have inifinite max_files, there is still cache miss because we have
1585   // reset the block cache
1586   value = Get(1, "a");
1587   ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1588   ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1589 }
1590 
TEST_P(PinL0IndexAndFilterBlocksTest,DisablePrefetchingNonL0IndexAndFilter)1591 TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
1592   Options options = CurrentOptions();
1593   // This ensures that db does not ref anything in the block cache, so
1594   // EraseUnRefEntries could clear them up.
1595   bool close_afterwards = true;
1596   PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards);
1597 
1598   // Get base cache values
1599   uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
1600   uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
1601   uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
1602   uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
1603 
1604   if (disallow_preload_) {
1605     // Now we have two files. We narrow the max open files to allow 3 entries
1606     // so that preloading SST files won't happen.
1607     options.max_open_files = 13;
1608     // RocksDB sanitize max open files to at least 20. Modify it back.
1609     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1610         "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
1611           int* max_open_files = static_cast<int*>(arg);
1612           *max_open_files = 13;
1613         });
1614   }
1615   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1616 
1617   // Reopen database. If max_open_files is set as -1, table readers will be
1618   // preloaded. This will trigger a BlockBasedTable::Open() and prefetch
1619   // L0 index and filter. Level 1's prefetching is disabled in DB::Open()
1620   TryReopenWithColumnFamilies({"default", "pikachu"}, options);
1621 
1622   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1623 
1624   if (!disallow_preload_) {
1625     // After reopen, cache miss are increased by one because we read (and only
1626     // read) filter and index on L0
1627     ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1628     ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1629     ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1630     ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1631   } else {
1632     // If max_open_files is not -1, we do not preload table readers, so there is
1633     // no change.
1634     ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1635     ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1636     ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1637     ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1638   }
1639   std::string value;
1640   // this should be read from L0
1641   value = Get(1, "a2");
1642   // If max_open_files is -1, we have pinned index and filter in Rep, so there
1643   // will not be changes in index and filter misses or hits. If max_open_files
1644   // is not -1, Get() will open a TableReader and prefetch index and filter.
1645   ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1646   ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1647   ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1648   ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1649 
1650   // this should be read from L1
1651   value = Get(1, "a");
1652   if (!disallow_preload_) {
1653     // In inifinite max files case, there's a cache miss in executing Get()
1654     // because index and filter are not prefetched before.
1655     ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1656     ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1657     ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1658     ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1659   } else {
1660     // In this case, cache miss will be increased by one in
1661     // BlockBasedTable::Open() because this is not in DB::Open() code path so we
1662     // will prefetch L1's index and filter. Cache hit will also be increased by
1663     // one because Get() will read index and filter from the block cache
1664     // prefetched in previous Open() call.
1665     ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1666     ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1667     ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1668     ASSERT_EQ(ih + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1669   }
1670 
1671   // Force a full compaction to one single file. There will be a block
1672   // cache read for both of index and filter. If prefetch doesn't explicitly
1673   // happen, it will happen when verifying the file.
1674   Compact(1, "a", "zzzzz");
1675   dbfull()->TEST_WaitForCompact();
1676 
1677   if (!disallow_preload_) {
1678     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1679     ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1680     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1681     ASSERT_EQ(ih + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1682   } else {
1683     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1684     ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1685     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1686     ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1687   }
1688 
1689   // Bloom and index hit will happen when a Get() happens.
1690   value = Get(1, "a");
1691   if (!disallow_preload_) {
1692     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1693     ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1694     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1695     ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1696   } else {
1697     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
1698     ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
1699     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
1700     ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
1701   }
1702 }
1703 
1704 INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
1705                         PinL0IndexAndFilterBlocksTest,
1706                         ::testing::Values(std::make_tuple(true, false),
1707                                           std::make_tuple(false, false),
1708                                           std::make_tuple(false, true)));
1709 
1710 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,MaxCompactionBytesTest)1711 TEST_F(DBTest2, MaxCompactionBytesTest) {
1712   Options options = CurrentOptions();
1713   options.memtable_factory.reset(
1714       new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
1715   options.compaction_style = kCompactionStyleLevel;
1716   options.write_buffer_size = 200 << 10;
1717   options.arena_block_size = 4 << 10;
1718   options.level0_file_num_compaction_trigger = 4;
1719   options.num_levels = 4;
1720   options.compression = kNoCompression;
1721   options.max_bytes_for_level_base = 450 << 10;
1722   options.target_file_size_base = 100 << 10;
1723   // Infinite for full compaction.
1724   options.max_compaction_bytes = options.target_file_size_base * 100;
1725 
1726   Reopen(options);
1727 
1728   Random rnd(301);
1729 
1730   for (int num = 0; num < 8; num++) {
1731     GenerateNewRandomFile(&rnd);
1732   }
1733   CompactRangeOptions cro;
1734   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
1735   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1736   ASSERT_EQ("0,0,8", FilesPerLevel(0));
1737 
1738   // When compact from Ln -> Ln+1, cut a file if the file overlaps with
1739   // more than three files in Ln+1.
1740   options.max_compaction_bytes = options.target_file_size_base * 3;
1741   Reopen(options);
1742 
1743   GenerateNewRandomFile(&rnd);
1744   // Add three more small files that overlap with the previous file
1745   for (int i = 0; i < 3; i++) {
1746     Put("a", "z");
1747     ASSERT_OK(Flush());
1748   }
1749   dbfull()->TEST_WaitForCompact();
1750 
1751   // Output files to L1 are cut to three pieces, according to
1752   // options.max_compaction_bytes
1753   ASSERT_EQ("0,3,8", FilesPerLevel(0));
1754 }
1755 
UniqueIdCallback(void * arg)1756 static void UniqueIdCallback(void* arg) {
1757   int* result = reinterpret_cast<int*>(arg);
1758   if (*result == -1) {
1759     *result = 0;
1760   }
1761 
1762   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
1763   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1764       "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
1765 }
1766 
1767 class MockPersistentCache : public PersistentCache {
1768  public:
MockPersistentCache(const bool is_compressed,const size_t max_size)1769   explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
1770       : is_compressed_(is_compressed), max_size_(max_size) {
1771     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1772     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1773         "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
1774   }
1775 
~MockPersistentCache()1776   ~MockPersistentCache() override {}
1777 
Stats()1778   PersistentCache::StatsType Stats() override {
1779     return PersistentCache::StatsType();
1780   }
1781 
Insert(const Slice & page_key,const char * data,const size_t size)1782   Status Insert(const Slice& page_key, const char* data,
1783                 const size_t size) override {
1784     MutexLock _(&lock_);
1785 
1786     if (size_ > max_size_) {
1787       size_ -= data_.begin()->second.size();
1788       data_.erase(data_.begin());
1789     }
1790 
1791     data_.insert(std::make_pair(page_key.ToString(), std::string(data, size)));
1792     size_ += size;
1793     return Status::OK();
1794   }
1795 
Lookup(const Slice & page_key,std::unique_ptr<char[]> * data,size_t * size)1796   Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
1797                 size_t* size) override {
1798     MutexLock _(&lock_);
1799     auto it = data_.find(page_key.ToString());
1800     if (it == data_.end()) {
1801       return Status::NotFound();
1802     }
1803 
1804     assert(page_key.ToString() == it->first);
1805     data->reset(new char[it->second.size()]);
1806     memcpy(data->get(), it->second.c_str(), it->second.size());
1807     *size = it->second.size();
1808     return Status::OK();
1809   }
1810 
IsCompressed()1811   bool IsCompressed() override { return is_compressed_; }
1812 
GetPrintableOptions() const1813   std::string GetPrintableOptions() const override {
1814     return "MockPersistentCache";
1815   }
1816 
1817   port::Mutex lock_;
1818   std::map<std::string, std::string> data_;
1819   const bool is_compressed_ = true;
1820   size_t size_ = 0;
1821   const size_t max_size_ = 10 * 1024;  // 10KiB
1822 };
1823 
1824 #ifdef OS_LINUX
1825 // Make sure that in CPU time perf context counters, Env::NowCPUNanos()
1826 // is used, rather than Env::CPUNanos();
TEST_F(DBTest2,TestPerfContextGetCpuTime)1827 TEST_F(DBTest2, TestPerfContextGetCpuTime) {
1828   // force resizing table cache so table handle is not preloaded so that
1829   // we can measure find_table_nanos during Get().
1830   dbfull()->TEST_table_cache()->SetCapacity(0);
1831   ASSERT_OK(Put("foo", "bar"));
1832   ASSERT_OK(Flush());
1833   env_->now_cpu_count_.store(0);
1834 
1835   // CPU timing is not enabled with kEnableTimeExceptForMutex
1836   SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
1837   ASSERT_EQ("bar", Get("foo"));
1838   ASSERT_EQ(0, get_perf_context()->get_cpu_nanos);
1839   ASSERT_EQ(0, env_->now_cpu_count_.load());
1840 
1841   uint64_t kDummyAddonTime = uint64_t{1000000000000};
1842 
1843   // Add time to NowNanos() reading.
1844   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1845       "TableCache::FindTable:0",
1846       [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
1847   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1848 
1849   SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1850   ASSERT_EQ("bar", Get("foo"));
1851   ASSERT_GT(env_->now_cpu_count_.load(), 2);
1852   ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime);
1853   ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);
1854 
1855   SetPerfLevel(PerfLevel::kDisable);
1856   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1857 }
1858 
TEST_F(DBTest2,TestPerfContextIterCpuTime)1859 TEST_F(DBTest2, TestPerfContextIterCpuTime) {
1860   DestroyAndReopen(CurrentOptions());
1861   // force resizing table cache so table handle is not preloaded so that
1862   // we can measure find_table_nanos during iteration
1863   dbfull()->TEST_table_cache()->SetCapacity(0);
1864 
1865   const size_t kNumEntries = 10;
1866   for (size_t i = 0; i < kNumEntries; ++i) {
1867     ASSERT_OK(Put("k" + ToString(i), "v" + ToString(i)));
1868   }
1869   ASSERT_OK(Flush());
1870   for (size_t i = 0; i < kNumEntries; ++i) {
1871     ASSERT_EQ("v" + ToString(i), Get("k" + ToString(i)));
1872   }
1873   std::string last_key = "k" + ToString(kNumEntries - 1);
1874   std::string last_value = "v" + ToString(kNumEntries - 1);
1875   env_->now_cpu_count_.store(0);
1876 
1877   // CPU timing is not enabled with kEnableTimeExceptForMutex
1878   SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
1879   Iterator* iter = db_->NewIterator(ReadOptions());
1880   iter->Seek("k0");
1881   ASSERT_TRUE(iter->Valid());
1882   ASSERT_EQ("v0", iter->value().ToString());
1883   iter->SeekForPrev(last_key);
1884   ASSERT_TRUE(iter->Valid());
1885   iter->SeekToLast();
1886   ASSERT_TRUE(iter->Valid());
1887   ASSERT_EQ(last_value, iter->value().ToString());
1888   iter->SeekToFirst();
1889   ASSERT_TRUE(iter->Valid());
1890   ASSERT_EQ("v0", iter->value().ToString());
1891   ASSERT_EQ(0, get_perf_context()->iter_seek_cpu_nanos);
1892   iter->Next();
1893   ASSERT_TRUE(iter->Valid());
1894   ASSERT_EQ("v1", iter->value().ToString());
1895   ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
1896   iter->Prev();
1897   ASSERT_TRUE(iter->Valid());
1898   ASSERT_EQ("v0", iter->value().ToString());
1899   ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
1900   ASSERT_EQ(0, env_->now_cpu_count_.load());
1901   delete iter;
1902 
1903   uint64_t kDummyAddonTime = uint64_t{1000000000000};
1904 
1905   // Add time to NowNanos() reading.
1906   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1907       "TableCache::FindTable:0",
1908       [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
1909   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1910 
1911   SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1912   iter = db_->NewIterator(ReadOptions());
1913   iter->Seek("k0");
1914   ASSERT_TRUE(iter->Valid());
1915   ASSERT_EQ("v0", iter->value().ToString());
1916   iter->SeekForPrev(last_key);
1917   ASSERT_TRUE(iter->Valid());
1918   iter->SeekToLast();
1919   ASSERT_TRUE(iter->Valid());
1920   ASSERT_EQ(last_value, iter->value().ToString());
1921   iter->SeekToFirst();
1922   ASSERT_TRUE(iter->Valid());
1923   ASSERT_EQ("v0", iter->value().ToString());
1924   ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0);
1925   ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonTime);
1926   iter->Next();
1927   ASSERT_TRUE(iter->Valid());
1928   ASSERT_EQ("v1", iter->value().ToString());
1929   ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0);
1930   ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonTime);
1931   iter->Prev();
1932   ASSERT_TRUE(iter->Valid());
1933   ASSERT_EQ("v0", iter->value().ToString());
1934   ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
1935   ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonTime);
1936   ASSERT_GE(env_->now_cpu_count_.load(), 12);
1937   ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);
1938 
1939   SetPerfLevel(PerfLevel::kDisable);
1940   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1941   delete iter;
1942 }
1943 #endif  // OS_LINUX
1944 
1945 // GetUniqueIdFromFile is not implemented on these platforms. Persistent cache
1946 // breaks when that function is not implemented and no regular block cache is
1947 // provided.
1948 #if !defined(OS_SOLARIS) && !defined(OS_WIN)
TEST_F(DBTest2,PersistentCache)1949 TEST_F(DBTest2, PersistentCache) {
1950   int num_iter = 80;
1951 
1952   Options options;
1953   options.write_buffer_size = 64 * 1024;  // small write buffer
1954   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1955   options = CurrentOptions(options);
1956 
1957   auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
1958   auto types = {/*compressed*/ 1, /*uncompressed*/ 0};
1959   for (auto bsize : bsizes) {
1960     for (auto type : types) {
1961       BlockBasedTableOptions table_options;
1962       table_options.persistent_cache.reset(
1963           new MockPersistentCache(type, 10 * 1024));
1964       table_options.no_block_cache = true;
1965       table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr;
1966       table_options.block_cache_compressed = nullptr;
1967       options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1968 
1969       DestroyAndReopen(options);
1970       CreateAndReopenWithCF({"pikachu"}, options);
1971       // default column family doesn't have block cache
1972       Options no_block_cache_opts;
1973       no_block_cache_opts.statistics = options.statistics;
1974       no_block_cache_opts = CurrentOptions(no_block_cache_opts);
1975       BlockBasedTableOptions table_options_no_bc;
1976       table_options_no_bc.no_block_cache = true;
1977       no_block_cache_opts.table_factory.reset(
1978           NewBlockBasedTableFactory(table_options_no_bc));
1979       ReopenWithColumnFamilies(
1980           {"default", "pikachu"},
1981           std::vector<Options>({no_block_cache_opts, options}));
1982 
1983       Random rnd(301);
1984 
1985       // Write 8MB (80 values, each 100K)
1986       ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
1987       std::vector<std::string> values;
1988       std::string str;
1989       for (int i = 0; i < num_iter; i++) {
1990         if (i % 4 == 0) {  // high compression ratio
1991           str = RandomString(&rnd, 1000);
1992         }
1993         values.push_back(str);
1994         ASSERT_OK(Put(1, Key(i), values[i]));
1995       }
1996 
1997       // flush all data from memtable so that reads are from block cache
1998       ASSERT_OK(Flush(1));
1999 
2000       for (int i = 0; i < num_iter; i++) {
2001         ASSERT_EQ(Get(1, Key(i)), values[i]);
2002       }
2003 
2004       auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
2005       auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);
2006 
2007       ASSERT_GT(hit, 0);
2008       ASSERT_GT(miss, 0);
2009     }
2010   }
2011 }
2012 #endif  // !defined(OS_SOLARIS) && !defined(OS_WIN)
2013 
2014 namespace {
CountSyncPoint()2015 void CountSyncPoint() {
2016   TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
2017 }
2018 }  // namespace
2019 
TEST_F(DBTest2,SyncPointMarker)2020 TEST_F(DBTest2, SyncPointMarker) {
2021   std::atomic<int> sync_point_called(0);
2022   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2023       "DBTest2::MarkedPoint",
2024       [&](void* /*arg*/) { sync_point_called.fetch_add(1); });
2025 
2026   // The first dependency enforces Marker can be loaded before MarkedPoint.
2027   // The second checks that thread 1's MarkedPoint should be disabled here.
2028   // Execution order:
2029   // |   Thread 1    |  Thread 2   |
2030   // |               |   Marker    |
2031   // |  MarkedPoint  |             |
2032   // | Thread1First  |             |
2033   // |               | MarkedPoint |
2034   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
2035       {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
2036       {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});
2037 
2038   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2039 
2040   std::function<void()> func1 = [&]() {
2041     CountSyncPoint();
2042     TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
2043   };
2044 
2045   std::function<void()> func2 = [&]() {
2046     TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
2047     CountSyncPoint();
2048   };
2049 
2050   auto thread1 = port::Thread(func1);
2051   auto thread2 = port::Thread(func2);
2052   thread1.join();
2053   thread2.join();
2054 
2055   // Callback is only executed once
2056   ASSERT_EQ(sync_point_called.load(), 1);
2057   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2058 }
2059 #endif
2060 
GetEncodedEntrySize(size_t key_size,size_t value_size)2061 size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
2062   std::string buffer;
2063 
2064   PutVarint32(&buffer, static_cast<uint32_t>(0));
2065   PutVarint32(&buffer, static_cast<uint32_t>(key_size));
2066   PutVarint32(&buffer, static_cast<uint32_t>(value_size));
2067 
2068   return buffer.size() + key_size + value_size;
2069 }
2070 
TEST_F(DBTest2,ReadAmpBitmap)2071 TEST_F(DBTest2, ReadAmpBitmap) {
2072   Options options = CurrentOptions();
2073   BlockBasedTableOptions bbto;
2074   uint32_t bytes_per_bit[2] = {1, 16};
2075   for (size_t k = 0; k < 2; k++) {
2076     // Disable delta encoding to make it easier to calculate read amplification
2077     bbto.use_delta_encoding = false;
2078     // Huge block cache to make it easier to calculate read amplification
2079     bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
2080     bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
2081     options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2082     options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
2083     DestroyAndReopen(options);
2084 
2085     const size_t kNumEntries = 10000;
2086 
2087     Random rnd(301);
2088     for (size_t i = 0; i < kNumEntries; i++) {
2089       ASSERT_OK(Put(Key(static_cast<int>(i)), RandomString(&rnd, 100)));
2090     }
2091     ASSERT_OK(Flush());
2092 
2093     Close();
2094     Reopen(options);
2095 
2096     // Read keys/values randomly and verify that reported read amp error
2097     // is less than 2%
2098     uint64_t total_useful_bytes = 0;
2099     std::set<int> read_keys;
2100     std::string value;
2101     for (size_t i = 0; i < kNumEntries * 5; i++) {
2102       int key_idx = rnd.Next() % kNumEntries;
2103       std::string key = Key(key_idx);
2104       ASSERT_OK(db_->Get(ReadOptions(), key, &value));
2105 
2106       if (read_keys.find(key_idx) == read_keys.end()) {
2107         auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
2108         total_useful_bytes +=
2109             GetEncodedEntrySize(internal_key.size(), value.size());
2110         read_keys.insert(key_idx);
2111       }
2112 
2113       double expected_read_amp =
2114           static_cast<double>(total_useful_bytes) /
2115           options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2116 
2117       double read_amp =
2118           static_cast<double>(options.statistics->getTickerCount(
2119               READ_AMP_ESTIMATE_USEFUL_BYTES)) /
2120           options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2121 
2122       double error_pct = fabs(expected_read_amp - read_amp) * 100;
2123       // Error between reported read amp and real read amp should be less than
2124       // 2%
2125       EXPECT_LE(error_pct, 2);
2126     }
2127 
2128     // Make sure we read every thing in the DB (which is smaller than our cache)
2129     Iterator* iter = db_->NewIterator(ReadOptions());
2130     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2131       ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
2132     }
2133     delete iter;
2134 
2135     // Read amp is on average 100% since we read all what we loaded in memory
2136     if (k == 0) {
2137       ASSERT_EQ(
2138           options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
2139           options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
2140     } else {
2141       ASSERT_NEAR(
2142           options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES) *
2143               1.0f /
2144               options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
2145           1, .01);
2146     }
2147   }
2148 }
2149 
2150 #ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
TEST_F(DBTest2,ReadAmpBitmapLiveInCacheAfterDBClose)2151 TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
2152   {
2153     const int kIdBufLen = 100;
2154     char id_buf[kIdBufLen];
2155 #ifndef OS_WIN
2156     // You can't open a directory on windows using random access file
2157     std::unique_ptr<RandomAccessFile> file;
2158     ASSERT_OK(env_->NewRandomAccessFile(dbname_, &file, EnvOptions()));
2159     if (file->GetUniqueId(id_buf, kIdBufLen) == 0) {
2160       // fs holding db directory doesn't support getting a unique file id,
2161       // this means that running this test will fail because lru_cache will load
2162       // the blocks again regardless of them being already in the cache
2163       return;
2164     }
2165 #else
2166     std::unique_ptr<Directory> dir;
2167     ASSERT_OK(env_->NewDirectory(dbname_, &dir));
2168     if (dir->GetUniqueId(id_buf, kIdBufLen) == 0) {
2169       // fs holding db directory doesn't support getting a unique file id,
2170       // this means that running this test will fail because lru_cache will load
2171       // the blocks again regardless of them being already in the cache
2172       return;
2173     }
2174 #endif
2175   }
2176   uint32_t bytes_per_bit[2] = {1, 16};
2177   for (size_t k = 0; k < 2; k++) {
2178     std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
2179     std::shared_ptr<Statistics> stats = ROCKSDB_NAMESPACE::CreateDBStatistics();
2180 
2181     Options options = CurrentOptions();
2182     BlockBasedTableOptions bbto;
2183     // Disable delta encoding to make it easier to calculate read amplification
2184     bbto.use_delta_encoding = false;
2185     // Huge block cache to make it easier to calculate read amplification
2186     bbto.block_cache = lru_cache;
2187     bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
2188     options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2189     options.statistics = stats;
2190     DestroyAndReopen(options);
2191 
2192     const int kNumEntries = 10000;
2193 
2194     Random rnd(301);
2195     for (int i = 0; i < kNumEntries; i++) {
2196       ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
2197     }
2198     ASSERT_OK(Flush());
2199 
2200     Close();
2201     Reopen(options);
2202 
2203     uint64_t total_useful_bytes = 0;
2204     std::set<int> read_keys;
2205     std::string value;
2206     // Iter1: Read half the DB, Read even keys
2207     // Key(0), Key(2), Key(4), Key(6), Key(8), ...
2208     for (int i = 0; i < kNumEntries; i += 2) {
2209       std::string key = Key(i);
2210       ASSERT_OK(db_->Get(ReadOptions(), key, &value));
2211 
2212       if (read_keys.find(i) == read_keys.end()) {
2213         auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
2214         total_useful_bytes +=
2215             GetEncodedEntrySize(internal_key.size(), value.size());
2216         read_keys.insert(i);
2217       }
2218     }
2219 
2220     size_t total_useful_bytes_iter1 =
2221         options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
2222     size_t total_loaded_bytes_iter1 =
2223         options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2224 
2225     Close();
2226     std::shared_ptr<Statistics> new_statistics =
2227         ROCKSDB_NAMESPACE::CreateDBStatistics();
2228     // Destroy old statistics obj that the blocks in lru_cache are pointing to
2229     options.statistics.reset();
2230     // Use the statistics object that we just created
2231     options.statistics = new_statistics;
2232     Reopen(options);
2233 
2234     // Iter2: Read half the DB, Read odd keys
2235     // Key(1), Key(3), Key(5), Key(7), Key(9), ...
2236     for (int i = 1; i < kNumEntries; i += 2) {
2237       std::string key = Key(i);
2238       ASSERT_OK(db_->Get(ReadOptions(), key, &value));
2239 
2240       if (read_keys.find(i) == read_keys.end()) {
2241         auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
2242         total_useful_bytes +=
2243             GetEncodedEntrySize(internal_key.size(), value.size());
2244         read_keys.insert(i);
2245       }
2246     }
2247 
2248     size_t total_useful_bytes_iter2 =
2249         options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
2250     size_t total_loaded_bytes_iter2 =
2251         options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
2252 
2253 
2254     // Read amp is on average 100% since we read all what we loaded in memory
2255     if (k == 0) {
2256       ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
2257                 total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
2258     } else {
2259       ASSERT_NEAR((total_useful_bytes_iter1 + total_useful_bytes_iter2) * 1.0f /
2260                       (total_loaded_bytes_iter1 + total_loaded_bytes_iter2),
2261                   1, .01);
2262     }
2263   }
2264 }
2265 #endif // !OS_SOLARIS
2266 
2267 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,AutomaticCompactionOverlapManualCompaction)2268 TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
2269   Options options = CurrentOptions();
2270   options.num_levels = 3;
2271   options.IncreaseParallelism(20);
2272   DestroyAndReopen(options);
2273 
2274   ASSERT_OK(Put(Key(0), "a"));
2275   ASSERT_OK(Put(Key(5), "a"));
2276   ASSERT_OK(Flush());
2277 
2278   ASSERT_OK(Put(Key(10), "a"));
2279   ASSERT_OK(Put(Key(15), "a"));
2280   ASSERT_OK(Flush());
2281 
2282   CompactRangeOptions cro;
2283   cro.change_level = true;
2284   cro.target_level = 2;
2285   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
2286 
2287   auto get_stat = [](std::string level_str, LevelStatType type,
2288                      std::map<std::string, std::string> props) {
2289     auto prop_str =
2290         "compaction." + level_str + "." +
2291         InternalStats::compaction_level_stats.at(type).property_name.c_str();
2292     auto prop_item = props.find(prop_str);
2293     return prop_item == props.end() ? 0 : std::stod(prop_item->second);
2294   };
2295 
2296   // Trivial move 2 files to L2
2297   ASSERT_EQ("0,0,2", FilesPerLevel());
2298   // Also test that the stats GetMapProperty API reporting the same result
2299   {
2300     std::map<std::string, std::string> prop;
2301     ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
2302     ASSERT_EQ(0, get_stat("L0", LevelStatType::NUM_FILES, prop));
2303     ASSERT_EQ(0, get_stat("L1", LevelStatType::NUM_FILES, prop));
2304     ASSERT_EQ(2, get_stat("L2", LevelStatType::NUM_FILES, prop));
2305     ASSERT_EQ(2, get_stat("Sum", LevelStatType::NUM_FILES, prop));
2306   }
2307 
2308   // While the compaction is running, we will create 2 new files that
2309   // can fit in L2, these 2 files will be moved to L2 and overlap with
2310   // the running compaction and break the LSM consistency.
2311   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2312       "CompactionJob::Run():Start", [&](void* /*arg*/) {
2313         ASSERT_OK(
2314             dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
2315                                   {"max_bytes_for_level_base", "1"}}));
2316         ASSERT_OK(Put(Key(6), "a"));
2317         ASSERT_OK(Put(Key(7), "a"));
2318         ASSERT_OK(Flush());
2319 
2320         ASSERT_OK(Put(Key(8), "a"));
2321         ASSERT_OK(Put(Key(9), "a"));
2322         ASSERT_OK(Flush());
2323       });
2324   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2325 
2326   // Run a manual compaction that will compact the 2 files in L2
2327   // into 1 file in L2
2328   cro.exclusive_manual_compaction = false;
2329   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
2330   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
2331 
2332   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2333 
2334   // Test that the stats GetMapProperty API reporting 1 file in L2
2335   {
2336     std::map<std::string, std::string> prop;
2337     ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
2338     ASSERT_EQ(1, get_stat("L2", LevelStatType::NUM_FILES, prop));
2339   }
2340 }
2341 
TEST_F(DBTest2,ManualCompactionOverlapManualCompaction)2342 TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
2343   Options options = CurrentOptions();
2344   options.num_levels = 2;
2345   options.IncreaseParallelism(20);
2346   options.disable_auto_compactions = true;
2347   DestroyAndReopen(options);
2348 
2349   ASSERT_OK(Put(Key(0), "a"));
2350   ASSERT_OK(Put(Key(5), "a"));
2351   ASSERT_OK(Flush());
2352 
2353   ASSERT_OK(Put(Key(10), "a"));
2354   ASSERT_OK(Put(Key(15), "a"));
2355   ASSERT_OK(Flush());
2356 
2357   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2358 
2359   // Trivial move 2 files to L1
2360   ASSERT_EQ("0,2", FilesPerLevel());
2361 
2362   std::function<void()> bg_manual_compact = [&]() {
2363     std::string k1 = Key(6);
2364     std::string k2 = Key(9);
2365     Slice k1s(k1);
2366     Slice k2s(k2);
2367     CompactRangeOptions cro;
2368     cro.exclusive_manual_compaction = false;
2369     ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s));
2370   };
2371   ROCKSDB_NAMESPACE::port::Thread bg_thread;
2372 
2373   // While the compaction is running, we will create 2 new files that
2374   // can fit in L1, these 2 files will be moved to L1 and overlap with
2375   // the running compaction and break the LSM consistency.
2376   std::atomic<bool> flag(false);
2377   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2378       "CompactionJob::Run():Start", [&](void* /*arg*/) {
2379         if (flag.exchange(true)) {
2380           // We want to make sure to call this callback only once
2381           return;
2382         }
2383         ASSERT_OK(Put(Key(6), "a"));
2384         ASSERT_OK(Put(Key(7), "a"));
2385         ASSERT_OK(Flush());
2386 
2387         ASSERT_OK(Put(Key(8), "a"));
2388         ASSERT_OK(Put(Key(9), "a"));
2389         ASSERT_OK(Flush());
2390 
2391         // Start a non-exclusive manual compaction in a bg thread
2392         bg_thread = port::Thread(bg_manual_compact);
2393         // This manual compaction conflict with the other manual compaction
2394         // so it should wait until the first compaction finish
2395         env_->SleepForMicroseconds(1000000);
2396       });
2397   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2398 
2399   // Run a manual compaction that will compact the 2 files in L1
2400   // into 1 file in L1
2401   CompactRangeOptions cro;
2402   cro.exclusive_manual_compaction = false;
2403   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
2404   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
2405   bg_thread.join();
2406 
2407   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2408 }
2409 
TEST_F(DBTest2,PausingManualCompaction1)2410 TEST_F(DBTest2, PausingManualCompaction1) {
2411   Options options = CurrentOptions();
2412   options.disable_auto_compactions = true;
2413   options.num_levels = 7;
2414 
2415   DestroyAndReopen(options);
2416   Random rnd(301);
2417   // Generate a file containing 10 keys.
2418   for (int i = 0; i < 10; i++) {
2419     ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
2420   }
2421   ASSERT_OK(Flush());
2422 
2423   // Generate another file containing same keys
2424   for (int i = 0; i < 10; i++) {
2425     ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
2426   }
2427   ASSERT_OK(Flush());
2428 
2429   int manual_compactions_paused = 0;
2430   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2431       "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
2432         auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
2433         ASSERT_FALSE(paused->load(std::memory_order_acquire));
2434         paused->store(true, std::memory_order_release);
2435         manual_compactions_paused += 1;
2436       });
2437   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2438 
2439   std::vector<std::string> files_before_compact, files_after_compact;
2440   // Remember file name before compaction is triggered
2441   std::vector<LiveFileMetaData> files_meta;
2442   dbfull()->GetLiveFilesMetaData(&files_meta);
2443   for (auto file : files_meta) {
2444     files_before_compact.push_back(file.name);
2445   }
2446 
2447   // OK, now trigger a manual compaction
2448   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
2449 
2450   // Wait for compactions to get scheduled and stopped
2451   dbfull()->TEST_WaitForCompact(true);
2452 
2453   // Get file names after compaction is stopped
2454   files_meta.clear();
2455   dbfull()->GetLiveFilesMetaData(&files_meta);
2456   for (auto file : files_meta) {
2457     files_after_compact.push_back(file.name);
2458   }
2459 
2460   // Like nothing happened
2461   ASSERT_EQ(files_before_compact, files_after_compact);
2462   ASSERT_EQ(manual_compactions_paused, 1);
2463 
2464   manual_compactions_paused = 0;
2465   // Now make sure CompactFiles also not run
2466   dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
2467                          files_before_compact, 0);
2468   // Wait for manual compaction to get scheduled and finish
2469   dbfull()->TEST_WaitForCompact(true);
2470 
2471   files_meta.clear();
2472   files_after_compact.clear();
2473   dbfull()->GetLiveFilesMetaData(&files_meta);
2474   for (auto file : files_meta) {
2475     files_after_compact.push_back(file.name);
2476   }
2477 
2478   ASSERT_EQ(files_before_compact, files_after_compact);
2479   // CompactFiles returns at entry point
2480   ASSERT_EQ(manual_compactions_paused, 0);
2481 
2482   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2483 }
2484 
2485 // PausingManualCompaction does not affect auto compaction
TEST_F(DBTest2,PausingManualCompaction2)2486 TEST_F(DBTest2, PausingManualCompaction2) {
2487   Options options = CurrentOptions();
2488   options.level0_file_num_compaction_trigger = 2;
2489   options.disable_auto_compactions = false;
2490 
2491   DestroyAndReopen(options);
2492   dbfull()->DisableManualCompaction();
2493 
2494   Random rnd(301);
2495   for (int i = 0; i < 2; i++) {
2496     // Generate a file containing 10 keys.
2497     for (int j = 0; j < 100; j++) {
2498       ASSERT_OK(Put(Key(j), RandomString(&rnd, 50)));
2499     }
2500     ASSERT_OK(Flush());
2501   }
2502   ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
2503 
2504   std::vector<LiveFileMetaData> files_meta;
2505   dbfull()->GetLiveFilesMetaData(&files_meta);
2506   ASSERT_EQ(files_meta.size(), 1);
2507 }
2508 
TEST_F(DBTest2,PausingManualCompaction3)2509 TEST_F(DBTest2, PausingManualCompaction3) {
2510   CompactRangeOptions compact_options;
2511   Options options = CurrentOptions();
2512   options.disable_auto_compactions = true;
2513   options.num_levels = 7;
2514 
2515   Random rnd(301);
2516   auto generate_files = [&]() {
2517     for (int i = 0; i < options.num_levels; i++) {
2518       for (int j = 0; j < options.num_levels - i + 1; j++) {
2519         for (int k = 0; k < 1000; k++) {
2520           ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50)));
2521         }
2522         Flush();
2523       }
2524 
2525       for (int l = 1; l < options.num_levels - i; l++) {
2526         MoveFilesToLevel(l);
2527       }
2528     }
2529   };
2530 
2531   DestroyAndReopen(options);
2532   generate_files();
2533 #ifndef ROCKSDB_LITE
2534   ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
2535 #endif  // !ROCKSDB_LITE
2536   int run_manual_compactions = 0;
2537   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2538       "CompactionJob::Run():PausingManualCompaction:1",
2539       [&](void* /*arg*/) { run_manual_compactions++; });
2540   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2541 
2542   dbfull()->DisableManualCompaction();
2543   dbfull()->CompactRange(compact_options, nullptr, nullptr);
2544   dbfull()->TEST_WaitForCompact(true);
2545   // As manual compaction disabled, not even reach sync point
2546   ASSERT_EQ(run_manual_compactions, 0);
2547 #ifndef ROCKSDB_LITE
2548   ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
2549 #endif  // !ROCKSDB_LITE
2550 
2551   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
2552       "CompactionJob::Run():PausingManualCompaction:1");
2553   dbfull()->EnableManualCompaction();
2554   dbfull()->CompactRange(compact_options, nullptr, nullptr);
2555   dbfull()->TEST_WaitForCompact(true);
2556 #ifndef ROCKSDB_LITE
2557   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
2558 #endif  // !ROCKSDB_LITE
2559 
2560   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2561 }
2562 
TEST_F(DBTest2,PausingManualCompaction4)2563 TEST_F(DBTest2, PausingManualCompaction4) {
2564   CompactRangeOptions compact_options;
2565   Options options = CurrentOptions();
2566   options.disable_auto_compactions = true;
2567   options.num_levels = 7;
2568 
2569   Random rnd(301);
2570   auto generate_files = [&]() {
2571     for (int i = 0; i < options.num_levels; i++) {
2572       for (int j = 0; j < options.num_levels - i + 1; j++) {
2573         for (int k = 0; k < 1000; k++) {
2574           ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50)));
2575         }
2576         Flush();
2577       }
2578 
2579       for (int l = 1; l < options.num_levels - i; l++) {
2580         MoveFilesToLevel(l);
2581       }
2582     }
2583   };
2584 
2585   DestroyAndReopen(options);
2586   generate_files();
2587 #ifndef ROCKSDB_LITE
2588   ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
2589 #endif  // !ROCKSDB_LITE
2590   int run_manual_compactions = 0;
2591   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2592       "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
2593         auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
2594         ASSERT_FALSE(paused->load(std::memory_order_acquire));
2595         paused->store(true, std::memory_order_release);
2596         run_manual_compactions++;
2597       });
2598   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2599 
2600   dbfull()->EnableManualCompaction();
2601   dbfull()->CompactRange(compact_options, nullptr, nullptr);
2602   dbfull()->TEST_WaitForCompact(true);
2603   ASSERT_EQ(run_manual_compactions, 1);
2604 #ifndef ROCKSDB_LITE
2605   ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
2606 #endif  // !ROCKSDB_LITE
2607 
2608   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
2609       "CompactionJob::Run():PausingManualCompaction:2");
2610   dbfull()->EnableManualCompaction();
2611   dbfull()->CompactRange(compact_options, nullptr, nullptr);
2612   dbfull()->TEST_WaitForCompact(true);
2613 #ifndef ROCKSDB_LITE
2614   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
2615 #endif  // !ROCKSDB_LITE
2616 
2617   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2618 }
2619 
TEST_F(DBTest2,OptimizeForPointLookup)2620 TEST_F(DBTest2, OptimizeForPointLookup) {
2621   Options options = CurrentOptions();
2622   Close();
2623   options.OptimizeForPointLookup(2);
2624   ASSERT_OK(DB::Open(options, dbname_, &db_));
2625 
2626   ASSERT_OK(Put("foo", "v1"));
2627   ASSERT_EQ("v1", Get("foo"));
2628   Flush();
2629   ASSERT_EQ("v1", Get("foo"));
2630 }
2631 
TEST_F(DBTest2,OptimizeForSmallDB)2632 TEST_F(DBTest2, OptimizeForSmallDB) {
2633   Options options = CurrentOptions();
2634   Close();
2635   options.OptimizeForSmallDb();
2636 
2637   // Find the cache object
2638   ASSERT_EQ(std::string(BlockBasedTableFactory::kName),
2639             std::string(options.table_factory->Name()));
2640   BlockBasedTableOptions* table_options =
2641       reinterpret_cast<BlockBasedTableOptions*>(
2642           options.table_factory->GetOptions());
2643   ASSERT_TRUE(table_options != nullptr);
2644   std::shared_ptr<Cache> cache = table_options->block_cache;
2645 
2646   ASSERT_EQ(0, cache->GetUsage());
2647   ASSERT_OK(DB::Open(options, dbname_, &db_));
2648   ASSERT_OK(Put("foo", "v1"));
2649 
2650   // memtable size is costed to the block cache
2651   ASSERT_NE(0, cache->GetUsage());
2652 
2653   ASSERT_EQ("v1", Get("foo"));
2654   Flush();
2655 
2656   size_t prev_size = cache->GetUsage();
2657   // Remember block cache size, so that we can find that
2658   // it is filled after Get().
2659   // Use pinnable slice so that it can ping the block so that
2660   // when we check the size it is not evicted.
2661   PinnableSlice value;
2662   ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &value));
2663   ASSERT_GT(cache->GetUsage(), prev_size);
2664   value.Reset();
2665 }
2666 
2667 #endif  // ROCKSDB_LITE
2668 
TEST_F(DBTest2,GetRaceFlush1)2669 TEST_F(DBTest2, GetRaceFlush1) {
2670   ASSERT_OK(Put("foo", "v1"));
2671 
2672   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2673       {{"DBImpl::GetImpl:1", "DBTest2::GetRaceFlush:1"},
2674        {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:2"}});
2675 
2676   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2677 
2678   ROCKSDB_NAMESPACE::port::Thread t1([&] {
2679     TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
2680     ASSERT_OK(Put("foo", "v2"));
2681     Flush();
2682     TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
2683   });
2684 
2685   // Get() is issued after the first Put(), so it should see either
2686   // "v1" or "v2".
2687   ASSERT_NE("NOT_FOUND", Get("foo"));
2688   t1.join();
2689   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2690 }
2691 
TEST_F(DBTest2,GetRaceFlush2)2692 TEST_F(DBTest2, GetRaceFlush2) {
2693   ASSERT_OK(Put("foo", "v1"));
2694 
2695   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2696       {{"DBImpl::GetImpl:3", "DBTest2::GetRaceFlush:1"},
2697        {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:4"}});
2698 
2699   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2700 
2701   port::Thread t1([&] {
2702     TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
2703     ASSERT_OK(Put("foo", "v2"));
2704     Flush();
2705     TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
2706   });
2707 
2708   // Get() is issued after the first Put(), so it should see either
2709   // "v1" or "v2".
2710   ASSERT_NE("NOT_FOUND", Get("foo"));
2711   t1.join();
2712   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2713 }
2714 
TEST_F(DBTest2,DirectIO)2715 TEST_F(DBTest2, DirectIO) {
2716   if (!IsDirectIOSupported()) {
2717     return;
2718   }
2719   Options options = CurrentOptions();
2720   options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
2721       true;
2722   options.allow_mmap_reads = options.allow_mmap_writes = false;
2723   DestroyAndReopen(options);
2724 
2725   ASSERT_OK(Put(Key(0), "a"));
2726   ASSERT_OK(Put(Key(5), "a"));
2727   ASSERT_OK(Flush());
2728 
2729   ASSERT_OK(Put(Key(10), "a"));
2730   ASSERT_OK(Put(Key(15), "a"));
2731   ASSERT_OK(Flush());
2732 
2733   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2734   Reopen(options);
2735 }
2736 
TEST_F(DBTest2,MemtableOnlyIterator)2737 TEST_F(DBTest2, MemtableOnlyIterator) {
2738   Options options = CurrentOptions();
2739   CreateAndReopenWithCF({"pikachu"}, options);
2740 
2741   ASSERT_OK(Put(1, "foo", "first"));
2742   ASSERT_OK(Put(1, "bar", "second"));
2743 
2744   ReadOptions ropt;
2745   ropt.read_tier = kMemtableTier;
2746   std::string value;
2747   Iterator* it = nullptr;
2748 
2749   // Before flushing
2750   // point lookups
2751   ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
2752   ASSERT_EQ("first", value);
2753   ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
2754   ASSERT_EQ("second", value);
2755 
2756   // Memtable-only iterator (read_tier=kMemtableTier); data not flushed yet.
2757   it = db_->NewIterator(ropt, handles_[1]);
2758   int count = 0;
2759   for (it->SeekToFirst(); it->Valid(); it->Next()) {
2760     ASSERT_TRUE(it->Valid());
2761     count++;
2762   }
2763   ASSERT_TRUE(!it->Valid());
2764   ASSERT_EQ(2, count);
2765   delete it;
2766 
2767   Flush(1);
2768 
2769   // After flushing
2770   // point lookups
2771   ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
2772   ASSERT_EQ("first", value);
2773   ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
2774   ASSERT_EQ("second", value);
2775   // nothing should be returned using memtable-only iterator after flushing.
2776   it = db_->NewIterator(ropt, handles_[1]);
2777   count = 0;
2778   for (it->SeekToFirst(); it->Valid(); it->Next()) {
2779     ASSERT_TRUE(it->Valid());
2780     count++;
2781   }
2782   ASSERT_TRUE(!it->Valid());
2783   ASSERT_EQ(0, count);
2784   delete it;
2785 
2786   // Add a key to memtable
2787   ASSERT_OK(Put(1, "foobar", "third"));
2788   it = db_->NewIterator(ropt, handles_[1]);
2789   count = 0;
2790   for (it->SeekToFirst(); it->Valid(); it->Next()) {
2791     ASSERT_TRUE(it->Valid());
2792     ASSERT_EQ("foobar", it->key().ToString());
2793     ASSERT_EQ("third", it->value().ToString());
2794     count++;
2795   }
2796   ASSERT_TRUE(!it->Valid());
2797   ASSERT_EQ(1, count);
2798   delete it;
2799 }
2800 
TEST_F(DBTest2,LowPriWrite)2801 TEST_F(DBTest2, LowPriWrite) {
2802   Options options = CurrentOptions();
2803   // Compaction pressure should trigger since 6 files
2804   options.level0_file_num_compaction_trigger = 4;
2805   options.level0_slowdown_writes_trigger = 12;
2806   options.level0_stop_writes_trigger = 30;
2807   options.delayed_write_rate = 8 * 1024 * 1024;
2808   Reopen(options);
2809 
2810   std::atomic<int> rate_limit_count(0);
2811 
2812   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2813       "GenericRateLimiter::Request:1", [&](void* arg) {
2814         rate_limit_count.fetch_add(1);
2815         int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
2816         ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
2817       });
2818   // Block compaction
2819   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
2820       {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
2821   });
2822   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2823   WriteOptions wo;
2824   for (int i = 0; i < 6; i++) {
2825     wo.low_pri = false;
2826     Put("", "", wo);
2827     wo.low_pri = true;
2828     Put("", "", wo);
2829     Flush();
2830   }
2831   ASSERT_EQ(0, rate_limit_count.load());
2832   wo.low_pri = true;
2833   Put("", "", wo);
2834   ASSERT_EQ(1, rate_limit_count.load());
2835   wo.low_pri = false;
2836   Put("", "", wo);
2837   ASSERT_EQ(1, rate_limit_count.load());
2838 
2839   TEST_SYNC_POINT("DBTest.LowPriWrite:0");
2840   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2841 
2842   dbfull()->TEST_WaitForCompact();
2843   wo.low_pri = true;
2844   Put("", "", wo);
2845   ASSERT_EQ(1, rate_limit_count.load());
2846   wo.low_pri = false;
2847   Put("", "", wo);
2848   ASSERT_EQ(1, rate_limit_count.load());
2849 }
2850 
2851 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,RateLimitedCompactionReads)2852 TEST_F(DBTest2, RateLimitedCompactionReads) {
2853   // compaction input has 512KB data
2854   const int kNumKeysPerFile = 128;
2855   const int kBytesPerKey = 1024;
2856   const int kNumL0Files = 4;
2857 
2858   for (auto use_direct_io : {false, true}) {
2859     if (use_direct_io && !IsDirectIOSupported()) {
2860       continue;
2861     }
2862     Options options = CurrentOptions();
2863     options.compression = kNoCompression;
2864     options.level0_file_num_compaction_trigger = kNumL0Files;
2865     options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
2866     options.new_table_reader_for_compaction_inputs = true;
2867     // takes roughly one second, split into 100 x 10ms intervals. Each interval
2868     // permits 5.12KB, which is smaller than the block size, so this test
2869     // exercises the code for chunking reads.
2870     options.rate_limiter.reset(NewGenericRateLimiter(
2871         static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
2872                              kBytesPerKey) /* rate_bytes_per_sec */,
2873         10 * 1000 /* refill_period_us */, 10 /* fairness */,
2874         RateLimiter::Mode::kReadsOnly));
2875     options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
2876         use_direct_io;
2877     BlockBasedTableOptions bbto;
2878     bbto.block_size = 16384;
2879     bbto.no_block_cache = true;
2880     options.table_factory.reset(new BlockBasedTableFactory(bbto));
2881     DestroyAndReopen(options);
2882 
2883     for (int i = 0; i < kNumL0Files; ++i) {
2884       for (int j = 0; j <= kNumKeysPerFile; ++j) {
2885         ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
2886       }
2887       dbfull()->TEST_WaitForFlushMemTable();
2888       ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
2889     }
2890     dbfull()->TEST_WaitForCompact();
2891     ASSERT_EQ(0, NumTableFilesAtLevel(0));
2892 
2893     ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
2894     // should be slightly above 512KB due to non-data blocks read. Arbitrarily
2895     // chose 1MB as the upper bound on the total bytes read.
2896     size_t rate_limited_bytes =
2897         options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
2898     // Include the explicit prefetch of the footer in direct I/O case.
2899     size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
2900     ASSERT_GE(
2901         rate_limited_bytes,
2902         static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
2903     ASSERT_LT(
2904         rate_limited_bytes,
2905         static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
2906                             direct_io_extra));
2907 
2908     Iterator* iter = db_->NewIterator(ReadOptions());
2909     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2910       ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
2911     }
2912     delete iter;
2913     // bytes read for user iterator shouldn't count against the rate limit.
2914     ASSERT_EQ(rate_limited_bytes,
2915               static_cast<size_t>(
2916                   options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
2917   }
2918 }
2919 #endif  // ROCKSDB_LITE
2920 
2921 // Make sure DB can be reopen with reduced number of levels, given no file
2922 // is on levels higher than the new num_levels.
TEST_F(DBTest2,ReduceLevel)2923 TEST_F(DBTest2, ReduceLevel) {
2924   Options options;
2925   options.disable_auto_compactions = true;
2926   options.num_levels = 7;
2927   Reopen(options);
2928   Put("foo", "bar");
2929   Flush();
2930   MoveFilesToLevel(6);
2931 #ifndef ROCKSDB_LITE
2932   ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
2933 #endif  // !ROCKSDB_LITE
2934   CompactRangeOptions compact_options;
2935   compact_options.change_level = true;
2936   compact_options.target_level = 1;
2937   dbfull()->CompactRange(compact_options, nullptr, nullptr);
2938 #ifndef ROCKSDB_LITE
2939   ASSERT_EQ("0,1", FilesPerLevel());
2940 #endif  // !ROCKSDB_LITE
2941   options.num_levels = 3;
2942   Reopen(options);
2943 #ifndef ROCKSDB_LITE
2944   ASSERT_EQ("0,1", FilesPerLevel());
2945 #endif  // !ROCKSDB_LITE
2946 }
2947 
2948 // Test that ReadCallback is actually used in both memtbale and sst tables
TEST_F(DBTest2,ReadCallbackTest)2949 TEST_F(DBTest2, ReadCallbackTest) {
2950   Options options;
2951   options.disable_auto_compactions = true;
2952   options.num_levels = 7;
2953   Reopen(options);
2954   std::vector<const Snapshot*> snapshots;
2955   // Try to create a db with multiple layers and a memtable
2956   const std::string key = "foo";
2957   const std::string value = "bar";
2958   // This test assumes that the seq start with 1 and increased by 1 after each
2959   // write batch of size 1. If that behavior changes, the test needs to be
2960   // updated as well.
2961   // TODO(myabandeh): update this test to use the seq number that is returned by
2962   // the DB instead of assuming what seq the DB used.
2963   int i = 1;
2964   for (; i < 10; i++) {
2965     Put(key, value + std::to_string(i));
2966     // Take a snapshot to avoid the value being removed during compaction
2967     auto snapshot = dbfull()->GetSnapshot();
2968     snapshots.push_back(snapshot);
2969   }
2970   Flush();
2971   for (; i < 20; i++) {
2972     Put(key, value + std::to_string(i));
2973     // Take a snapshot to avoid the value being removed during compaction
2974     auto snapshot = dbfull()->GetSnapshot();
2975     snapshots.push_back(snapshot);
2976   }
2977   Flush();
2978   MoveFilesToLevel(6);
2979 #ifndef ROCKSDB_LITE
2980   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
2981 #endif  // !ROCKSDB_LITE
2982   for (; i < 30; i++) {
2983     Put(key, value + std::to_string(i));
2984     auto snapshot = dbfull()->GetSnapshot();
2985     snapshots.push_back(snapshot);
2986   }
2987   Flush();
2988 #ifndef ROCKSDB_LITE
2989   ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());
2990 #endif  // !ROCKSDB_LITE
2991   // And also add some values to the memtable
2992   for (; i < 40; i++) {
2993     Put(key, value + std::to_string(i));
2994     auto snapshot = dbfull()->GetSnapshot();
2995     snapshots.push_back(snapshot);
2996   }
2997 
2998   class TestReadCallback : public ReadCallback {
2999    public:
3000     explicit TestReadCallback(SequenceNumber snapshot)
3001         : ReadCallback(snapshot), snapshot_(snapshot) {}
3002     bool IsVisibleFullCheck(SequenceNumber seq) override {
3003       return seq <= snapshot_;
3004     }
3005 
3006    private:
3007     SequenceNumber snapshot_;
3008   };
3009 
3010   for (int seq = 1; seq < i; seq++) {
3011     PinnableSlice pinnable_val;
3012     ReadOptions roptions;
3013     TestReadCallback callback(seq);
3014     bool dont_care = true;
3015     DBImpl::GetImplOptions get_impl_options;
3016     get_impl_options.column_family = dbfull()->DefaultColumnFamily();
3017     get_impl_options.value = &pinnable_val;
3018     get_impl_options.value_found = &dont_care;
3019     get_impl_options.callback = &callback;
3020     Status s = dbfull()->GetImpl(roptions, key, get_impl_options);
3021     ASSERT_TRUE(s.ok());
3022     // Assuming that after each Put the DB increased seq by one, the value and
3023     // seq number must be equal since we also inc value by 1 after each Put.
3024     ASSERT_EQ(value + std::to_string(seq), pinnable_val.ToString());
3025   }
3026 
3027   for (auto snapshot : snapshots) {
3028     dbfull()->ReleaseSnapshot(snapshot);
3029   }
3030 }
3031 
3032 #ifndef ROCKSDB_LITE
3033 
TEST_F(DBTest2,LiveFilesOmitObsoleteFiles)3034 TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
3035   // Regression test for race condition where an obsolete file is returned to
3036   // user as a "live file" but then deleted, all while file deletions are
3037   // disabled.
3038   //
3039   // It happened like this:
3040   //
3041   // 1. [flush thread] Log file "x.log" found by FindObsoleteFiles
3042   // 2. [user thread] DisableFileDeletions, GetSortedWalFiles are called and the
3043   //    latter returned "x.log"
3044   // 3. [flush thread] PurgeObsoleteFiles deleted "x.log"
3045   // 4. [user thread] Reading "x.log" failed
3046   //
3047   // Unfortunately the only regression test I can come up with involves sleep.
3048   // We cannot set SyncPoints to repro since, once the fix is applied, the
3049   // SyncPoints would cause a deadlock as the repro's sequence of events is now
3050   // prohibited.
3051   //
3052   // Instead, if we sleep for a second between Find and Purge, and ensure the
3053   // read attempt happens after purge, then the sequence of events will almost
3054   // certainly happen on the old code.
3055   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3056       {"DBImpl::BackgroundCallFlush:FilesFound",
3057        "DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered"},
3058       {"DBImpl::PurgeObsoleteFiles:End",
3059        "DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured"},
3060   });
3061   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3062       "DBImpl::PurgeObsoleteFiles:Begin",
3063       [&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
3064   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3065 
3066   Put("key", "val");
3067   FlushOptions flush_opts;
3068   flush_opts.wait = false;
3069   db_->Flush(flush_opts);
3070   TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
3071 
3072   db_->DisableFileDeletions();
3073   VectorLogPtr log_files;
3074   db_->GetSortedWalFiles(log_files);
3075   TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
3076   for (const auto& log_file : log_files) {
3077     ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
3078   }
3079 
3080   db_->EnableFileDeletions();
3081   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3082 }
3083 
TEST_F(DBTest2,TestNumPread)3084 TEST_F(DBTest2, TestNumPread) {
3085   Options options = CurrentOptions();
3086   // disable block cache
3087   BlockBasedTableOptions table_options;
3088   table_options.no_block_cache = true;
3089   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
3090   Reopen(options);
3091   env_->count_random_reads_ = true;
3092 
3093   env_->random_file_open_counter_.store(0);
3094   ASSERT_OK(Put("bar", "foo"));
3095   ASSERT_OK(Put("foo", "bar"));
3096   ASSERT_OK(Flush());
3097   // After flush, we'll open the file and read footer, meta block,
3098   // property block and index block.
3099   ASSERT_EQ(4, env_->random_read_counter_.Read());
3100   ASSERT_EQ(1, env_->random_file_open_counter_.load());
3101 
3102   // One pread per a normal data block read
3103   env_->random_file_open_counter_.store(0);
3104   env_->random_read_counter_.Reset();
3105   ASSERT_EQ("bar", Get("foo"));
3106   ASSERT_EQ(1, env_->random_read_counter_.Read());
3107   // All files are already opened.
3108   ASSERT_EQ(0, env_->random_file_open_counter_.load());
3109 
3110   env_->random_file_open_counter_.store(0);
3111   env_->random_read_counter_.Reset();
3112   ASSERT_OK(Put("bar2", "foo2"));
3113   ASSERT_OK(Put("foo2", "bar2"));
3114   ASSERT_OK(Flush());
3115   // After flush, we'll open the file and read footer, meta block,
3116   // property block and index block.
3117   ASSERT_EQ(4, env_->random_read_counter_.Read());
3118   ASSERT_EQ(1, env_->random_file_open_counter_.load());
3119 
3120   // Compaction needs two input blocks, which requires 2 preads, and
3121   // generate a new SST file which needs 4 preads (footer, meta block,
3122   // property block and index block). In total 6.
3123   env_->random_file_open_counter_.store(0);
3124   env_->random_read_counter_.Reset();
3125   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3126   ASSERT_EQ(6, env_->random_read_counter_.Read());
3127   // All compactin input files should have already been opened.
3128   ASSERT_EQ(1, env_->random_file_open_counter_.load());
3129 
3130   // One pread per a normal data block read
3131   env_->random_file_open_counter_.store(0);
3132   env_->random_read_counter_.Reset();
3133   ASSERT_EQ("foo2", Get("bar2"));
3134   ASSERT_EQ(1, env_->random_read_counter_.Read());
3135   // SST files are already opened.
3136   ASSERT_EQ(0, env_->random_file_open_counter_.load());
3137 }
3138 
TEST_F(DBTest2,TraceAndReplay)3139 TEST_F(DBTest2, TraceAndReplay) {
3140   Options options = CurrentOptions();
3141   options.merge_operator = MergeOperators::CreatePutOperator();
3142   ReadOptions ro;
3143   WriteOptions wo;
3144   TraceOptions trace_opts;
3145   EnvOptions env_opts;
3146   CreateAndReopenWithCF({"pikachu"}, options);
3147   Random rnd(301);
3148   Iterator* single_iter = nullptr;
3149 
3150   ASSERT_TRUE(db_->EndTrace().IsIOError());
3151 
3152   std::string trace_filename = dbname_ + "/rocksdb.trace";
3153   std::unique_ptr<TraceWriter> trace_writer;
3154   ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3155   ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3156 
3157   ASSERT_OK(Put(0, "a", "1"));
3158   ASSERT_OK(Merge(0, "b", "2"));
3159   ASSERT_OK(Delete(0, "c"));
3160   ASSERT_OK(SingleDelete(0, "d"));
3161   ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
3162 
3163   WriteBatch batch;
3164   ASSERT_OK(batch.Put("f", "11"));
3165   ASSERT_OK(batch.Merge("g", "12"));
3166   ASSERT_OK(batch.Delete("h"));
3167   ASSERT_OK(batch.SingleDelete("i"));
3168   ASSERT_OK(batch.DeleteRange("j", "k"));
3169   ASSERT_OK(db_->Write(wo, &batch));
3170 
3171   single_iter = db_->NewIterator(ro);
3172   single_iter->Seek("f");
3173   single_iter->SeekForPrev("g");
3174   delete single_iter;
3175 
3176   ASSERT_EQ("1", Get(0, "a"));
3177   ASSERT_EQ("12", Get(0, "g"));
3178 
3179   ASSERT_OK(Put(1, "foo", "bar"));
3180   ASSERT_OK(Put(1, "rocksdb", "rocks"));
3181   ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
3182 
3183   ASSERT_OK(db_->EndTrace());
3184   // These should not get into the trace file as it is after EndTrace.
3185   Put("hello", "world");
3186   Merge("foo", "bar");
3187 
3188   // Open another db, replay, and verify the data
3189   std::string value;
3190   std::string dbname2 = test::TmpDir(env_) + "/db_replay";
3191   ASSERT_OK(DestroyDB(dbname2, options));
3192 
3193   // Using a different name than db2, to pacify infer's use-after-lifetime
3194   // warnings (http://fbinfer.com).
3195   DB* db2_init = nullptr;
3196   options.create_if_missing = true;
3197   ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3198   ColumnFamilyHandle* cf;
3199   ASSERT_OK(
3200       db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3201   delete cf;
3202   delete db2_init;
3203 
3204   DB* db2 = nullptr;
3205   std::vector<ColumnFamilyDescriptor> column_families;
3206   ColumnFamilyOptions cf_options;
3207   cf_options.merge_operator = MergeOperators::CreatePutOperator();
3208   column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3209   column_families.push_back(
3210       ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3211   std::vector<ColumnFamilyHandle*> handles;
3212   ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3213 
3214   env_->SleepForMicroseconds(100);
3215   // Verify that the keys don't already exist
3216   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3217   ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
3218 
3219   std::unique_ptr<TraceReader> trace_reader;
3220   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3221   Replayer replayer(db2, handles_, std::move(trace_reader));
3222   ASSERT_OK(replayer.Replay());
3223 
3224   ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
3225   ASSERT_EQ("1", value);
3226   ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
3227   ASSERT_EQ("12", value);
3228   ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
3229   ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
3230 
3231   ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
3232   ASSERT_EQ("bar", value);
3233   ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
3234   ASSERT_EQ("rocks", value);
3235 
3236   for (auto handle : handles) {
3237     delete handle;
3238   }
3239   delete db2;
3240   ASSERT_OK(DestroyDB(dbname2, options));
3241 }
3242 
TEST_F(DBTest2,TraceWithLimit)3243 TEST_F(DBTest2, TraceWithLimit) {
3244   Options options = CurrentOptions();
3245   options.merge_operator = MergeOperators::CreatePutOperator();
3246   ReadOptions ro;
3247   WriteOptions wo;
3248   TraceOptions trace_opts;
3249   EnvOptions env_opts;
3250   CreateAndReopenWithCF({"pikachu"}, options);
3251   Random rnd(301);
3252 
3253   // test the max trace file size options
3254   trace_opts.max_trace_file_size = 5;
3255   std::string trace_filename = dbname_ + "/rocksdb.trace1";
3256   std::unique_ptr<TraceWriter> trace_writer;
3257   ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3258   ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3259   ASSERT_OK(Put(0, "a", "1"));
3260   ASSERT_OK(Put(0, "b", "1"));
3261   ASSERT_OK(Put(0, "c", "1"));
3262   ASSERT_OK(db_->EndTrace());
3263 
3264   std::string dbname2 = test::TmpDir(env_) + "/db_replay2";
3265   std::string value;
3266   ASSERT_OK(DestroyDB(dbname2, options));
3267 
3268   // Using a different name than db2, to pacify infer's use-after-lifetime
3269   // warnings (http://fbinfer.com).
3270   DB* db2_init = nullptr;
3271   options.create_if_missing = true;
3272   ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3273   ColumnFamilyHandle* cf;
3274   ASSERT_OK(
3275       db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3276   delete cf;
3277   delete db2_init;
3278 
3279   DB* db2 = nullptr;
3280   std::vector<ColumnFamilyDescriptor> column_families;
3281   ColumnFamilyOptions cf_options;
3282   cf_options.merge_operator = MergeOperators::CreatePutOperator();
3283   column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3284   column_families.push_back(
3285       ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3286   std::vector<ColumnFamilyHandle*> handles;
3287   ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3288 
3289   env_->SleepForMicroseconds(100);
3290   // Verify that the keys don't already exist
3291   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3292   ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3293   ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3294 
3295   std::unique_ptr<TraceReader> trace_reader;
3296   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3297   Replayer replayer(db2, handles_, std::move(trace_reader));
3298   ASSERT_OK(replayer.Replay());
3299 
3300   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3301   ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3302   ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3303 
3304   for (auto handle : handles) {
3305     delete handle;
3306   }
3307   delete db2;
3308   ASSERT_OK(DestroyDB(dbname2, options));
3309 }
3310 
TEST_F(DBTest2,TraceWithSampling)3311 TEST_F(DBTest2, TraceWithSampling) {
3312   Options options = CurrentOptions();
3313   ReadOptions ro;
3314   WriteOptions wo;
3315   TraceOptions trace_opts;
3316   EnvOptions env_opts;
3317   CreateAndReopenWithCF({"pikachu"}, options);
3318   Random rnd(301);
3319 
3320   // test the trace file sampling options
3321   trace_opts.sampling_frequency = 2;
3322   std::string trace_filename = dbname_ + "/rocksdb.trace_sampling";
3323   std::unique_ptr<TraceWriter> trace_writer;
3324   ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3325   ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3326   ASSERT_OK(Put(0, "a", "1"));
3327   ASSERT_OK(Put(0, "b", "2"));
3328   ASSERT_OK(Put(0, "c", "3"));
3329   ASSERT_OK(Put(0, "d", "4"));
3330   ASSERT_OK(Put(0, "e", "5"));
3331   ASSERT_OK(db_->EndTrace());
3332 
3333   std::string dbname2 = test::TmpDir(env_) + "/db_replay_sampling";
3334   std::string value;
3335   ASSERT_OK(DestroyDB(dbname2, options));
3336 
3337   // Using a different name than db2, to pacify infer's use-after-lifetime
3338   // warnings (http://fbinfer.com).
3339   DB* db2_init = nullptr;
3340   options.create_if_missing = true;
3341   ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3342   ColumnFamilyHandle* cf;
3343   ASSERT_OK(
3344       db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3345   delete cf;
3346   delete db2_init;
3347 
3348   DB* db2 = nullptr;
3349   std::vector<ColumnFamilyDescriptor> column_families;
3350   ColumnFamilyOptions cf_options;
3351   column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3352   column_families.push_back(
3353       ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3354   std::vector<ColumnFamilyHandle*> handles;
3355   ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3356 
3357   env_->SleepForMicroseconds(100);
3358   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3359   ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3360   ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3361   ASSERT_TRUE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
3362   ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
3363 
3364   std::unique_ptr<TraceReader> trace_reader;
3365   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3366   Replayer replayer(db2, handles_, std::move(trace_reader));
3367   ASSERT_OK(replayer.Replay());
3368 
3369   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3370   ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3371   ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3372   ASSERT_FALSE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
3373   ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
3374 
3375   for (auto handle : handles) {
3376     delete handle;
3377   }
3378   delete db2;
3379   ASSERT_OK(DestroyDB(dbname2, options));
3380 }
3381 
TEST_F(DBTest2,TraceWithFilter)3382 TEST_F(DBTest2, TraceWithFilter) {
3383   Options options = CurrentOptions();
3384   options.merge_operator = MergeOperators::CreatePutOperator();
3385   ReadOptions ro;
3386   WriteOptions wo;
3387   TraceOptions trace_opts;
3388   EnvOptions env_opts;
3389   CreateAndReopenWithCF({"pikachu"}, options);
3390   Random rnd(301);
3391   Iterator* single_iter = nullptr;
3392 
3393   trace_opts.filter = TraceFilterType::kTraceFilterWrite;
3394 
3395   std::string trace_filename = dbname_ + "/rocksdb.trace";
3396   std::unique_ptr<TraceWriter> trace_writer;
3397   ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3398   ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3399 
3400   ASSERT_OK(Put(0, "a", "1"));
3401   ASSERT_OK(Merge(0, "b", "2"));
3402   ASSERT_OK(Delete(0, "c"));
3403   ASSERT_OK(SingleDelete(0, "d"));
3404   ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
3405 
3406   WriteBatch batch;
3407   ASSERT_OK(batch.Put("f", "11"));
3408   ASSERT_OK(batch.Merge("g", "12"));
3409   ASSERT_OK(batch.Delete("h"));
3410   ASSERT_OK(batch.SingleDelete("i"));
3411   ASSERT_OK(batch.DeleteRange("j", "k"));
3412   ASSERT_OK(db_->Write(wo, &batch));
3413 
3414   single_iter = db_->NewIterator(ro);
3415   single_iter->Seek("f");
3416   single_iter->SeekForPrev("g");
3417   delete single_iter;
3418 
3419   ASSERT_EQ("1", Get(0, "a"));
3420   ASSERT_EQ("12", Get(0, "g"));
3421 
3422   ASSERT_OK(Put(1, "foo", "bar"));
3423   ASSERT_OK(Put(1, "rocksdb", "rocks"));
3424   ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
3425 
3426   ASSERT_OK(db_->EndTrace());
3427   // These should not get into the trace file as it is after EndTrace.
3428   Put("hello", "world");
3429   Merge("foo", "bar");
3430 
3431   // Open another db, replay, and verify the data
3432   std::string value;
3433   std::string dbname2 = test::TmpDir(env_) + "/db_replay";
3434   ASSERT_OK(DestroyDB(dbname2, options));
3435 
3436   // Using a different name than db2, to pacify infer's use-after-lifetime
3437   // warnings (http://fbinfer.com).
3438   DB* db2_init = nullptr;
3439   options.create_if_missing = true;
3440   ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3441   ColumnFamilyHandle* cf;
3442   ASSERT_OK(
3443       db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3444   delete cf;
3445   delete db2_init;
3446 
3447   DB* db2 = nullptr;
3448   std::vector<ColumnFamilyDescriptor> column_families;
3449   ColumnFamilyOptions cf_options;
3450   cf_options.merge_operator = MergeOperators::CreatePutOperator();
3451   column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3452   column_families.push_back(
3453       ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3454   std::vector<ColumnFamilyHandle*> handles;
3455   ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3456 
3457   env_->SleepForMicroseconds(100);
3458   // Verify that the keys don't already exist
3459   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3460   ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
3461 
3462   std::unique_ptr<TraceReader> trace_reader;
3463   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3464   Replayer replayer(db2, handles_, std::move(trace_reader));
3465   ASSERT_OK(replayer.Replay());
3466 
3467   // All the key-values should not present since we filter out the WRITE ops.
3468   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3469   ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
3470   ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
3471   ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
3472   ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound());
3473   ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound());
3474 
3475   for (auto handle : handles) {
3476     delete handle;
3477   }
3478   delete db2;
3479   ASSERT_OK(DestroyDB(dbname2, options));
3480 
3481   // Set up a new db.
3482   std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
3483   ASSERT_OK(DestroyDB(dbname3, options));
3484 
3485   DB* db3_init = nullptr;
3486   options.create_if_missing = true;
3487   ColumnFamilyHandle* cf3;
3488   ASSERT_OK(DB::Open(options, dbname3, &db3_init));
3489   ASSERT_OK(
3490       db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3));
3491   delete cf3;
3492   delete db3_init;
3493 
3494   column_families.clear();
3495   column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3496   column_families.push_back(
3497       ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3498   handles.clear();
3499 
3500   DB* db3 =  nullptr;
3501   ASSERT_OK(DB::Open(DBOptions(), dbname3, column_families, &handles, &db3));
3502 
3503   env_->SleepForMicroseconds(100);
3504   // Verify that the keys don't already exist
3505   ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
3506   ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());
3507 
3508   //The tracer will not record the READ ops.
3509   trace_opts.filter = TraceFilterType::kTraceFilterGet;
3510   std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
3511   std::unique_ptr<TraceWriter> trace_writer3;
3512   ASSERT_OK(
3513     NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
3514   ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));
3515 
3516   ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
3517   ASSERT_OK(db3->Merge(wo, handles[0], "b", "2"));
3518   ASSERT_OK(db3->Delete(wo, handles[0], "c"));
3519   ASSERT_OK(db3->SingleDelete(wo, handles[0], "d"));
3520 
3521   ASSERT_OK(db3->Get(ro, handles[0], "a", &value));
3522   ASSERT_EQ(value, "1");
3523   ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound());
3524 
3525   ASSERT_OK(db3->EndTrace());
3526 
3527   for (auto handle : handles) {
3528     delete handle;
3529   }
3530   delete db3;
3531   ASSERT_OK(DestroyDB(dbname3, options));
3532 
3533   std::unique_ptr<TraceReader> trace_reader3;
3534   ASSERT_OK(
3535     NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
3536 
3537   // Count the number of records in the trace file;
3538   int count = 0;
3539   std::string data;
3540   Status s;
3541   while (true) {
3542     s = trace_reader3->Read(&data);
3543     if (!s.ok()) {
3544       break;
3545     }
3546     count += 1;
3547   }
3548   // We also need to count the header and footer
3549   // 4 WRITE + HEADER + FOOTER = 6
3550   ASSERT_EQ(count, 6);
3551 }
3552 
3553 #endif  // ROCKSDB_LITE
3554 
TEST_F(DBTest2,PinnableSliceAndMmapReads)3555 TEST_F(DBTest2, PinnableSliceAndMmapReads) {
3556   Options options = CurrentOptions();
3557   options.allow_mmap_reads = true;
3558   options.max_open_files = 100;
3559   options.compression = kNoCompression;
3560   Reopen(options);
3561 
3562   ASSERT_OK(Put("foo", "bar"));
3563   ASSERT_OK(Flush());
3564 
3565   PinnableSlice pinned_value;
3566   ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
3567   // It is not safe to pin mmap files as they might disappear by compaction
3568   ASSERT_FALSE(pinned_value.IsPinned());
3569   ASSERT_EQ(pinned_value.ToString(), "bar");
3570 
3571   dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
3572                               nullptr /* end */, nullptr /* column_family */,
3573                               true /* disallow_trivial_move */);
3574 
3575   // Ensure pinned_value doesn't rely on memory munmap'd by the above
3576   // compaction. It crashes if it does.
3577   ASSERT_EQ(pinned_value.ToString(), "bar");
3578 
3579 #ifndef ROCKSDB_LITE
3580   pinned_value.Reset();
3581   // Unsafe to pin mmap files when they could be kicked out of table cache
3582   Close();
3583   ASSERT_OK(ReadOnlyReopen(options));
3584   ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
3585   ASSERT_FALSE(pinned_value.IsPinned());
3586   ASSERT_EQ(pinned_value.ToString(), "bar");
3587 
3588   pinned_value.Reset();
3589   // In read-only mode with infinite capacity on table cache it should pin the
3590   // value and avoid the memcpy
3591   Close();
3592   options.max_open_files = -1;
3593   ASSERT_OK(ReadOnlyReopen(options));
3594   ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
3595   ASSERT_TRUE(pinned_value.IsPinned());
3596   ASSERT_EQ(pinned_value.ToString(), "bar");
3597 #endif
3598 }
3599 
TEST_F(DBTest2,DISABLED_IteratorPinnedMemory)3600 TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
3601   Options options = CurrentOptions();
3602   options.create_if_missing = true;
3603   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
3604   BlockBasedTableOptions bbto;
3605   bbto.no_block_cache = false;
3606   bbto.cache_index_and_filter_blocks = false;
3607   bbto.block_cache = NewLRUCache(100000);
3608   bbto.block_size = 400;  // small block size
3609   options.table_factory.reset(new BlockBasedTableFactory(bbto));
3610   Reopen(options);
3611 
3612   Random rnd(301);
3613   std::string v = RandomString(&rnd, 400);
3614 
3615   // Since v is the size of a block, each key should take a block
3616   // of 400+ bytes.
3617   Put("1", v);
3618   Put("3", v);
3619   Put("5", v);
3620   Put("7", v);
3621   ASSERT_OK(Flush());
3622 
3623   ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
3624 
3625   // Verify that iterators don't pin more than one data block in block cache
3626   // at each time.
3627   {
3628     std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
3629     iter->SeekToFirst();
3630 
3631     for (int i = 0; i < 4; i++) {
3632       ASSERT_TRUE(iter->Valid());
3633       // Block cache should contain exactly one block.
3634       ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
3635       ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
3636       iter->Next();
3637     }
3638     ASSERT_FALSE(iter->Valid());
3639 
3640     iter->Seek("4");
3641     ASSERT_TRUE(iter->Valid());
3642 
3643     ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
3644     ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
3645 
3646     iter->Seek("3");
3647     ASSERT_TRUE(iter->Valid());
3648 
3649     ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
3650     ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
3651   }
3652   ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
3653 
3654   // Test compaction case
3655   Put("2", v);
3656   Put("5", v);
3657   Put("6", v);
3658   Put("8", v);
3659   ASSERT_OK(Flush());
3660 
3661   // Clear existing data in block cache
3662   bbto.block_cache->SetCapacity(0);
3663   bbto.block_cache->SetCapacity(100000);
3664 
3665   // Verify compaction input iterators don't hold more than one data blocks at
3666   // one time.
3667   std::atomic<bool> finished(false);
3668   std::atomic<int> block_newed(0);
3669   std::atomic<int> block_destroyed(0);
3670   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3671       "Block::Block:0", [&](void* /*arg*/) {
3672         if (finished) {
3673           return;
3674         }
3675         // Two iterators. At most 2 outstanding blocks.
3676         EXPECT_GE(block_newed.load(), block_destroyed.load());
3677         EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
3678         block_newed.fetch_add(1);
3679       });
3680   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3681       "Block::~Block", [&](void* /*arg*/) {
3682         if (finished) {
3683           return;
3684         }
3685         // Two iterators. At most 2 outstanding blocks.
3686         EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
3687         EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
3688         block_destroyed.fetch_add(1);
3689       });
3690   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3691       "CompactionJob::Run:BeforeVerify",
3692       [&](void* /*arg*/) { finished = true; });
3693   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3694 
3695   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3696 
3697   // Two input files. Each of them has 4 data blocks.
3698   ASSERT_EQ(8, block_newed.load());
3699   ASSERT_EQ(8, block_destroyed.load());
3700 
3701   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3702 }
3703 
TEST_F(DBTest2,TestBBTTailPrefetch)3704 TEST_F(DBTest2, TestBBTTailPrefetch) {
3705   std::atomic<bool> called(false);
3706   size_t expected_lower_bound = 512 * 1024;
3707   size_t expected_higher_bound = 512 * 1024;
3708   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3709       "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
3710         size_t* prefetch_size = static_cast<size_t*>(arg);
3711         EXPECT_LE(expected_lower_bound, *prefetch_size);
3712         EXPECT_GE(expected_higher_bound, *prefetch_size);
3713         called = true;
3714       });
3715   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3716 
3717   Put("1", "1");
3718   Put("9", "1");
3719   Flush();
3720 
3721   expected_lower_bound = 0;
3722   expected_higher_bound = 8 * 1024;
3723 
3724   Put("1", "1");
3725   Put("9", "1");
3726   Flush();
3727 
3728   Put("1", "1");
3729   Put("9", "1");
3730   Flush();
3731 
3732   // Full compaction to make sure there is no L0 file after the open.
3733   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3734 
3735   ASSERT_TRUE(called.load());
3736   called = false;
3737 
3738   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3739   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3740 
3741   std::atomic<bool> first_call(true);
3742   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3743       "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
3744         size_t* prefetch_size = static_cast<size_t*>(arg);
3745         if (first_call) {
3746           EXPECT_EQ(4 * 1024, *prefetch_size);
3747           first_call = false;
3748         } else {
3749           EXPECT_GE(4 * 1024, *prefetch_size);
3750         }
3751         called = true;
3752       });
3753   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3754 
3755   Options options = CurrentOptions();
3756   options.max_file_opening_threads = 1;  // one thread
3757   BlockBasedTableOptions table_options;
3758   table_options.cache_index_and_filter_blocks = true;
3759   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
3760   options.max_open_files = -1;
3761   Reopen(options);
3762 
3763   Put("1", "1");
3764   Put("9", "1");
3765   Flush();
3766 
3767   Put("1", "1");
3768   Put("9", "1");
3769   Flush();
3770 
3771   ASSERT_TRUE(called.load());
3772   called = false;
3773 
3774   // Parallel loading SST files
3775   options.max_file_opening_threads = 16;
3776   Reopen(options);
3777 
3778   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3779 
3780   ASSERT_TRUE(called.load());
3781 
3782   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3783   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3784 }
3785 
TEST_F(DBTest2,TestGetColumnFamilyHandleUnlocked)3786 TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
3787   // Setup sync point dependency to reproduce the race condition of
3788   // DBImpl::GetColumnFamilyHandleUnlocked
3789   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3790       {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1",
3791        "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2"},
3792       {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2",
3793        "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1"},
3794   });
3795   SyncPoint::GetInstance()->EnableProcessing();
3796 
3797   CreateColumnFamilies({"test1", "test2"}, Options());
3798   ASSERT_EQ(handles_.size(), 2);
3799 
3800   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
3801   port::Thread user_thread1([&]() {
3802     auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
3803     ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
3804     TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
3805     TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
3806     ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
3807   });
3808 
3809   port::Thread user_thread2([&]() {
3810     TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
3811     auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
3812     ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
3813     TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
3814     ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
3815   });
3816 
3817   user_thread1.join();
3818   user_thread2.join();
3819 
3820   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3821   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3822 }
3823 
3824 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,TestCompactFiles)3825 TEST_F(DBTest2, TestCompactFiles) {
3826   // Setup sync point dependency to reproduce the race condition of
3827   // DBImpl::GetColumnFamilyHandleUnlocked
3828   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3829       {"TestCompactFiles::IngestExternalFile1",
3830        "TestCompactFiles::IngestExternalFile2"},
3831   });
3832   SyncPoint::GetInstance()->EnableProcessing();
3833 
3834   Options options;
3835   options.num_levels = 2;
3836   options.disable_auto_compactions = true;
3837   Reopen(options);
3838   auto* handle = db_->DefaultColumnFamily();
3839   ASSERT_EQ(db_->NumberLevels(handle), 2);
3840 
3841   ROCKSDB_NAMESPACE::SstFileWriter sst_file_writer{
3842       ROCKSDB_NAMESPACE::EnvOptions(), options};
3843   std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
3844   std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
3845   std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";
3846 
3847   ASSERT_OK(sst_file_writer.Open(external_file1));
3848   ASSERT_OK(sst_file_writer.Put("1", "1"));
3849   ASSERT_OK(sst_file_writer.Put("2", "2"));
3850   ASSERT_OK(sst_file_writer.Finish());
3851 
3852   ASSERT_OK(sst_file_writer.Open(external_file2));
3853   ASSERT_OK(sst_file_writer.Put("3", "3"));
3854   ASSERT_OK(sst_file_writer.Put("4", "4"));
3855   ASSERT_OK(sst_file_writer.Finish());
3856 
3857   ASSERT_OK(sst_file_writer.Open(external_file3));
3858   ASSERT_OK(sst_file_writer.Put("5", "5"));
3859   ASSERT_OK(sst_file_writer.Put("6", "6"));
3860   ASSERT_OK(sst_file_writer.Finish());
3861 
3862   ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
3863                                     IngestExternalFileOptions()));
3864   ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
3865   std::vector<std::string> files;
3866   GetSstFiles(env_, dbname_, &files);
3867   ASSERT_EQ(files.size(), 2);
3868 
3869   port::Thread user_thread1(
3870       [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });
3871 
3872   port::Thread user_thread2([&]() {
3873     ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
3874                                       IngestExternalFileOptions()));
3875     TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
3876   });
3877 
3878   user_thread1.join();
3879   user_thread2.join();
3880 
3881   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3882   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3883 }
3884 #endif  // ROCKSDB_LITE
3885 
3886 // TODO: figure out why this test fails in appveyor
3887 #ifndef OS_WIN
TEST_F(DBTest2,MultiDBParallelOpenTest)3888 TEST_F(DBTest2, MultiDBParallelOpenTest) {
3889   const int kNumDbs = 2;
3890   Options options = CurrentOptions();
3891   std::vector<std::string> dbnames;
3892   for (int i = 0; i < kNumDbs; ++i) {
3893     dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
3894     ASSERT_OK(DestroyDB(dbnames.back(), options));
3895   }
3896 
3897   // Verify empty DBs can be created in parallel
3898   std::vector<std::thread> open_threads;
3899   std::vector<DB*> dbs{static_cast<unsigned int>(kNumDbs), nullptr};
3900   options.create_if_missing = true;
3901   for (int i = 0; i < kNumDbs; ++i) {
3902     open_threads.emplace_back(
3903         [&](int dbnum) {
3904           ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
3905         },
3906         i);
3907   }
3908 
3909   // Now add some data and close, so next we can verify non-empty DBs can be
3910   // recovered in parallel
3911   for (int i = 0; i < kNumDbs; ++i) {
3912     open_threads[i].join();
3913     ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua"));
3914     delete dbs[i];
3915   }
3916 
3917   // Verify non-empty DBs can be recovered in parallel
3918   dbs.clear();
3919   open_threads.clear();
3920   for (int i = 0; i < kNumDbs; ++i) {
3921     open_threads.emplace_back(
3922         [&](int dbnum) {
3923           ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
3924         },
3925         i);
3926   }
3927 
3928   // Wait and cleanup
3929   for (int i = 0; i < kNumDbs; ++i) {
3930     open_threads[i].join();
3931     delete dbs[i];
3932     ASSERT_OK(DestroyDB(dbnames[i], options));
3933   }
3934 }
3935 #endif  // OS_WIN
3936 
3937 namespace {
3938 class DummyOldStats : public Statistics {
3939  public:
getTickerCount(uint32_t) const3940   uint64_t getTickerCount(uint32_t /*ticker_type*/) const override { return 0; }
recordTick(uint32_t,uint64_t)3941   void recordTick(uint32_t /* ticker_type */, uint64_t /* count */) override {
3942     num_rt++;
3943   }
setTickerCount(uint32_t,uint64_t)3944   void setTickerCount(uint32_t /*ticker_type*/, uint64_t /*count*/) override {}
getAndResetTickerCount(uint32_t)3945   uint64_t getAndResetTickerCount(uint32_t /*ticker_type*/) override {
3946     return 0;
3947   }
measureTime(uint32_t,uint64_t)3948   void measureTime(uint32_t /*histogram_type*/, uint64_t /*count*/) override {
3949     num_mt++;
3950   }
histogramData(uint32_t,ROCKSDB_NAMESPACE::HistogramData * const) const3951   void histogramData(
3952       uint32_t /*histogram_type*/,
3953       ROCKSDB_NAMESPACE::HistogramData* const /*data*/) const override {}
getHistogramString(uint32_t) const3954   std::string getHistogramString(uint32_t /*type*/) const override {
3955     return "";
3956   }
HistEnabledForType(uint32_t) const3957   bool HistEnabledForType(uint32_t /*type*/) const override { return false; }
ToString() const3958   std::string ToString() const override { return ""; }
3959   int num_rt = 0;
3960   int num_mt = 0;
3961 };
3962 }  // namespace
3963 
TEST_F(DBTest2,OldStatsInterface)3964 TEST_F(DBTest2, OldStatsInterface) {
3965   DummyOldStats* dos = new DummyOldStats();
3966   std::shared_ptr<Statistics> stats(dos);
3967   Options options = CurrentOptions();
3968   options.create_if_missing = true;
3969   options.statistics = stats;
3970   Reopen(options);
3971 
3972   Put("foo", "bar");
3973   ASSERT_EQ("bar", Get("foo"));
3974   ASSERT_OK(Flush());
3975   ASSERT_EQ("bar", Get("foo"));
3976 
3977   ASSERT_GT(dos->num_rt, 0);
3978   ASSERT_GT(dos->num_mt, 0);
3979 }
3980 
TEST_F(DBTest2,CloseWithUnreleasedSnapshot)3981 TEST_F(DBTest2, CloseWithUnreleasedSnapshot) {
3982   const Snapshot* ss = db_->GetSnapshot();
3983 
3984   for (auto h : handles_) {
3985     db_->DestroyColumnFamilyHandle(h);
3986   }
3987   handles_.clear();
3988 
3989   ASSERT_NOK(db_->Close());
3990   db_->ReleaseSnapshot(ss);
3991   ASSERT_OK(db_->Close());
3992   delete db_;
3993   db_ = nullptr;
3994 }
3995 
TEST_F(DBTest2,PrefixBloomReseek)3996 TEST_F(DBTest2, PrefixBloomReseek) {
3997   Options options = CurrentOptions();
3998   options.create_if_missing = true;
3999   options.prefix_extractor.reset(NewCappedPrefixTransform(3));
4000   BlockBasedTableOptions bbto;
4001   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
4002   bbto.whole_key_filtering = false;
4003   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
4004   DestroyAndReopen(options);
4005 
4006   // Construct two L1 files with keys:
4007   // f1:[aaa1 ccc1] f2:[ddd0]
4008   ASSERT_OK(Put("aaa1", ""));
4009   ASSERT_OK(Put("ccc1", ""));
4010   ASSERT_OK(Flush());
4011   ASSERT_OK(Put("ddd0", ""));
4012   ASSERT_OK(Flush());
4013   CompactRangeOptions cro;
4014   cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
4015   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
4016 
4017   ASSERT_OK(Put("bbb1", ""));
4018 
4019   Iterator* iter = db_->NewIterator(ReadOptions());
4020 
4021   // Seeking into f1, the iterator will check bloom filter which returns the
4022   // file iterator ot be invalidate, and the cursor will put into f2, with
4023   // the next key to be "ddd0".
4024   iter->Seek("bbb1");
4025   ASSERT_TRUE(iter->Valid());
4026   ASSERT_EQ("bbb1", iter->key().ToString());
4027 
4028   // Reseek ccc1, the L1 iterator needs to go back to f1 and reseek.
4029   iter->Seek("ccc1");
4030   ASSERT_TRUE(iter->Valid());
4031   ASSERT_EQ("ccc1", iter->key().ToString());
4032 
4033   delete iter;
4034 }
4035 
TEST_F(DBTest2,PrefixBloomFilteredOut)4036 TEST_F(DBTest2, PrefixBloomFilteredOut) {
4037   Options options = CurrentOptions();
4038   options.create_if_missing = true;
4039   options.prefix_extractor.reset(NewCappedPrefixTransform(3));
4040   BlockBasedTableOptions bbto;
4041   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
4042   bbto.whole_key_filtering = false;
4043   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
4044   DestroyAndReopen(options);
4045 
4046   // Construct two L1 files with keys:
4047   // f1:[aaa1 ccc1] f2:[ddd0]
4048   ASSERT_OK(Put("aaa1", ""));
4049   ASSERT_OK(Put("ccc1", ""));
4050   ASSERT_OK(Flush());
4051   ASSERT_OK(Put("ddd0", ""));
4052   ASSERT_OK(Flush());
4053   CompactRangeOptions cro;
4054   cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
4055   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
4056 
4057   Iterator* iter = db_->NewIterator(ReadOptions());
4058 
4059   // Bloom filter is filterd out by f1.
4060   // This is just one of several valid position following the contract.
4061   // Postioning to ccc1 or ddd0 is also valid. This is just to validate
4062   // the behavior of the current implementation. If underlying implementation
4063   // changes, the test might fail here.
4064   iter->Seek("bbb1");
4065   ASSERT_FALSE(iter->Valid());
4066 
4067   delete iter;
4068 }
4069 
4070 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,RowCacheSnapshot)4071 TEST_F(DBTest2, RowCacheSnapshot) {
4072   Options options = CurrentOptions();
4073   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
4074   options.row_cache = NewLRUCache(8 * 8192);
4075   DestroyAndReopen(options);
4076 
4077   ASSERT_OK(Put("foo", "bar1"));
4078 
4079   const Snapshot* s1 = db_->GetSnapshot();
4080 
4081   ASSERT_OK(Put("foo", "bar2"));
4082   ASSERT_OK(Flush());
4083 
4084   ASSERT_OK(Put("foo2", "bar"));
4085   const Snapshot* s2 = db_->GetSnapshot();
4086   ASSERT_OK(Put("foo3", "bar"));
4087   const Snapshot* s3 = db_->GetSnapshot();
4088 
4089   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
4090   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
4091   ASSERT_EQ(Get("foo"), "bar2");
4092   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
4093   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
4094   ASSERT_EQ(Get("foo"), "bar2");
4095   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
4096   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
4097   ASSERT_EQ(Get("foo", s1), "bar1");
4098   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
4099   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4100   ASSERT_EQ(Get("foo", s2), "bar2");
4101   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 2);
4102   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4103   ASSERT_EQ(Get("foo", s1), "bar1");
4104   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 3);
4105   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4106   ASSERT_EQ(Get("foo", s3), "bar2");
4107   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 4);
4108   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4109 
4110   db_->ReleaseSnapshot(s1);
4111   db_->ReleaseSnapshot(s2);
4112   db_->ReleaseSnapshot(s3);
4113 }
4114 #endif  // ROCKSDB_LITE
4115 
4116 // When DB is reopened with multiple column families, the manifest file
4117 // is written after the first CF is flushed, and it is written again
4118 // after each flush. If DB crashes between the flushes, the flushed CF
4119 // flushed will pass the latest log file, and now we require it not
4120 // to be corrupted, and triggering a corruption report.
4121 // We need to fix the bug and enable the test.
TEST_F(DBTest2,CrashInRecoveryMultipleCF)4122 TEST_F(DBTest2, CrashInRecoveryMultipleCF) {
4123   const std::vector<std::string> sync_points = {
4124       "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable",
4125       "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0"};
4126   for (const auto& test_sync_point : sync_points) {
4127     Options options = CurrentOptions();
4128     // First destroy original db to ensure a clean start.
4129     DestroyAndReopen(options);
4130     options.create_if_missing = true;
4131     options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
4132     CreateAndReopenWithCF({"pikachu"}, options);
4133     ASSERT_OK(Put("foo", "bar"));
4134     ASSERT_OK(Flush());
4135     ASSERT_OK(Put(1, "foo", "bar"));
4136     ASSERT_OK(Flush(1));
4137     ASSERT_OK(Put("foo", "bar"));
4138     ASSERT_OK(Put(1, "foo", "bar"));
4139     // The value is large enough to be divided to two blocks.
4140     std::string large_value(400, ' ');
4141     ASSERT_OK(Put("foo1", large_value));
4142     ASSERT_OK(Put("foo2", large_value));
4143     Close();
4144 
4145     // Corrupt the log file in the middle, so that it is not corrupted
4146     // in the tail.
4147     std::vector<std::string> filenames;
4148     ASSERT_OK(env_->GetChildren(dbname_, &filenames));
4149     for (const auto& f : filenames) {
4150       uint64_t number;
4151       FileType type;
4152       if (ParseFileName(f, &number, &type) && type == FileType::kLogFile) {
4153         std::string fname = dbname_ + "/" + f;
4154         std::string file_content;
4155         ASSERT_OK(ReadFileToString(env_, fname, &file_content));
4156         file_content[400] = 'h';
4157         file_content[401] = 'a';
4158         ASSERT_OK(WriteStringToFile(env_, file_content, fname));
4159         break;
4160       }
4161     }
4162 
4163     // Reopen and freeze the file system after the first manifest write.
4164     FaultInjectionTestEnv fit_env(options.env);
4165     options.env = &fit_env;
4166     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4167     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4168         test_sync_point,
4169         [&](void* /*arg*/) { fit_env.SetFilesystemActive(false); });
4170     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4171     ASSERT_NOK(TryReopenWithColumnFamilies(
4172         {kDefaultColumnFamilyName, "pikachu"}, options));
4173     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4174 
4175     fit_env.SetFilesystemActive(true);
4176     // If we continue using failure ingestion Env, it will conplain something
4177     // when renaming current file, which is not expected. Need to investigate
4178     // why.
4179     options.env = env_;
4180     ASSERT_OK(TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
4181                                           options));
4182   }
4183 }
4184 
TEST_F(DBTest2,SeekFileRangeDeleteTail)4185 TEST_F(DBTest2, SeekFileRangeDeleteTail) {
4186   Options options = CurrentOptions();
4187   options.prefix_extractor.reset(NewCappedPrefixTransform(1));
4188   options.num_levels = 3;
4189   DestroyAndReopen(options);
4190 
4191   ASSERT_OK(Put("a", "a"));
4192   const Snapshot* s1 = db_->GetSnapshot();
4193   ASSERT_OK(
4194       db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "f"));
4195   ASSERT_OK(Put("b", "a"));
4196   ASSERT_OK(Flush());
4197 
4198   ASSERT_OK(Put("x", "a"));
4199   ASSERT_OK(Put("z", "a"));
4200   ASSERT_OK(Flush());
4201 
4202   CompactRangeOptions cro;
4203   cro.change_level = true;
4204   cro.target_level = 2;
4205   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
4206 
4207   {
4208     ReadOptions ro;
4209     ro.total_order_seek = true;
4210     std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
4211     iter->Seek("e");
4212     ASSERT_TRUE(iter->Valid());
4213     ASSERT_EQ("x", iter->key().ToString());
4214   }
4215   db_->ReleaseSnapshot(s1);
4216 }
4217 
TEST_F(DBTest2,BackgroundPurgeTest)4218 TEST_F(DBTest2, BackgroundPurgeTest) {
4219   Options options = CurrentOptions();
4220   options.write_buffer_manager =
4221       std::make_shared<ROCKSDB_NAMESPACE::WriteBufferManager>(1 << 20);
4222   options.avoid_unnecessary_blocking_io = true;
4223   DestroyAndReopen(options);
4224   size_t base_value = options.write_buffer_manager->memory_usage();
4225 
4226   ASSERT_OK(Put("a", "a"));
4227   Iterator* iter = db_->NewIterator(ReadOptions());
4228   ASSERT_OK(Flush());
4229   size_t value = options.write_buffer_manager->memory_usage();
4230   ASSERT_GT(value, base_value);
4231 
4232   db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);
4233   test::SleepingBackgroundTask sleeping_task_after;
4234   db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
4235                           &sleeping_task_after, Env::Priority::HIGH);
4236   delete iter;
4237 
4238   Env::Default()->SleepForMicroseconds(100000);
4239   value = options.write_buffer_manager->memory_usage();
4240   ASSERT_GT(value, base_value);
4241 
4242   sleeping_task_after.WakeUp();
4243   sleeping_task_after.WaitUntilDone();
4244 
4245   test::SleepingBackgroundTask sleeping_task_after2;
4246   db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
4247                           &sleeping_task_after2, Env::Priority::HIGH);
4248   sleeping_task_after2.WakeUp();
4249   sleeping_task_after2.WaitUntilDone();
4250 
4251   value = options.write_buffer_manager->memory_usage();
4252   ASSERT_EQ(base_value, value);
4253 }
4254 
TEST_F(DBTest2,SwitchMemtableRaceWithNewManifest)4255 TEST_F(DBTest2, SwitchMemtableRaceWithNewManifest) {
4256   Options options = CurrentOptions();
4257   DestroyAndReopen(options);
4258   options.max_manifest_file_size = 10;
4259   options.create_if_missing = true;
4260   CreateAndReopenWithCF({"pikachu"}, options);
4261   ASSERT_EQ(2, handles_.size());
4262 
4263   ASSERT_OK(Put("foo", "value"));
4264   const int kL0Files = options.level0_file_num_compaction_trigger;
4265   for (int i = 0; i < kL0Files; ++i) {
4266     ASSERT_OK(Put(/*cf=*/1, "a", std::to_string(i)));
4267     ASSERT_OK(Flush(/*cf=*/1));
4268   }
4269 
4270   port::Thread thread([&]() { ASSERT_OK(Flush()); });
4271   ASSERT_OK(dbfull()->TEST_WaitForCompact());
4272   thread.join();
4273 }
4274 
TEST_F(DBTest2,SameSmallestInSameLevel)4275 TEST_F(DBTest2, SameSmallestInSameLevel) {
4276   // This test validates fractional casacading logic when several files at one
4277   // one level only contains the same user key.
4278   Options options = CurrentOptions();
4279   options.merge_operator = MergeOperators::CreateStringAppendOperator();
4280   DestroyAndReopen(options);
4281 
4282   ASSERT_OK(Put("key", "1"));
4283   ASSERT_OK(Put("key", "2"));
4284   ASSERT_OK(db_->Merge(WriteOptions(), "key", "3"));
4285   ASSERT_OK(db_->Merge(WriteOptions(), "key", "4"));
4286   Flush();
4287   CompactRangeOptions cro;
4288   cro.change_level = true;
4289   cro.target_level = 2;
4290   ASSERT_OK(dbfull()->CompactRange(cro, db_->DefaultColumnFamily(), nullptr,
4291                                    nullptr));
4292 
4293   ASSERT_OK(db_->Merge(WriteOptions(), "key", "5"));
4294   Flush();
4295   ASSERT_OK(db_->Merge(WriteOptions(), "key", "6"));
4296   Flush();
4297   ASSERT_OK(db_->Merge(WriteOptions(), "key", "7"));
4298   Flush();
4299   ASSERT_OK(db_->Merge(WriteOptions(), "key", "8"));
4300   Flush();
4301   dbfull()->TEST_WaitForCompact(true);
4302 #ifndef ROCKSDB_LITE
4303   ASSERT_EQ("0,4,1", FilesPerLevel());
4304 #endif  // ROCKSDB_LITE
4305 
4306   ASSERT_EQ("2,3,4,5,6,7,8", Get("key"));
4307 }
4308 
TEST_F(DBTest2,BlockBasedTablePrefixIndexSeekForPrev)4309 TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
4310   // create a DB with block prefix index
4311   BlockBasedTableOptions table_options;
4312   Options options = CurrentOptions();
4313   table_options.block_size = 300;
4314   table_options.index_type = BlockBasedTableOptions::kHashSearch;
4315   table_options.index_shortening =
4316       BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
4317   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4318   options.prefix_extractor.reset(NewFixedPrefixTransform(1));
4319 
4320   Reopen(options);
4321 
4322   Random rnd(301);
4323   std::string large_value = RandomString(&rnd, 500);
4324 
4325   ASSERT_OK(Put("a1", large_value));
4326   ASSERT_OK(Put("x1", large_value));
4327   ASSERT_OK(Put("y1", large_value));
4328   Flush();
4329 
4330   {
4331     std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
4332     iterator->SeekForPrev("x3");
4333     ASSERT_TRUE(iterator->Valid());
4334     ASSERT_EQ("x1", iterator->key().ToString());
4335 
4336     iterator->SeekForPrev("a3");
4337     ASSERT_TRUE(iterator->Valid());
4338     ASSERT_EQ("a1", iterator->key().ToString());
4339 
4340     iterator->SeekForPrev("y3");
4341     ASSERT_TRUE(iterator->Valid());
4342     ASSERT_EQ("y1", iterator->key().ToString());
4343 
4344     // Query more than one non-existing prefix to cover the case both
4345     // of empty hash bucket and hash bucket conflict.
4346     iterator->SeekForPrev("b1");
4347     // Result should be not valid or "a1".
4348     if (iterator->Valid()) {
4349       ASSERT_EQ("a1", iterator->key().ToString());
4350     }
4351 
4352     iterator->SeekForPrev("c1");
4353     // Result should be not valid or "a1".
4354     if (iterator->Valid()) {
4355       ASSERT_EQ("a1", iterator->key().ToString());
4356     }
4357 
4358     iterator->SeekForPrev("d1");
4359     // Result should be not valid or "a1".
4360     if (iterator->Valid()) {
4361       ASSERT_EQ("a1", iterator->key().ToString());
4362     }
4363 
4364     iterator->SeekForPrev("y3");
4365     ASSERT_TRUE(iterator->Valid());
4366     ASSERT_EQ("y1", iterator->key().ToString());
4367   }
4368 }
4369 
TEST_F(DBTest2,ChangePrefixExtractor)4370 TEST_F(DBTest2, ChangePrefixExtractor) {
4371   for (bool use_partitioned_filter : {true, false}) {
4372     // create a DB with block prefix index
4373     BlockBasedTableOptions table_options;
4374     Options options = CurrentOptions();
4375 
4376     // Sometimes filter is checked based on upper bound. Assert counters
4377     // for that case. Otherwise, only check data correctness.
4378 #ifndef ROCKSDB_LITE
4379     bool expect_filter_check = !use_partitioned_filter;
4380 #else
4381     bool expect_filter_check = false;
4382 #endif
4383     table_options.partition_filters = use_partitioned_filter;
4384     if (use_partitioned_filter) {
4385       table_options.index_type =
4386           BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
4387     }
4388     table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
4389 
4390     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4391     options.statistics = CreateDBStatistics();
4392 
4393     options.prefix_extractor.reset(NewFixedPrefixTransform(2));
4394     DestroyAndReopen(options);
4395 
4396     Random rnd(301);
4397 
4398     ASSERT_OK(Put("aa", ""));
4399     ASSERT_OK(Put("xb", ""));
4400     ASSERT_OK(Put("xx1", ""));
4401     ASSERT_OK(Put("xz1", ""));
4402     ASSERT_OK(Put("zz", ""));
4403     Flush();
4404 
4405     // After reopening DB with prefix size 2 => 1, prefix extractor
4406     // won't take effective unless it won't change results based
4407     // on upper bound and seek key.
4408     options.prefix_extractor.reset(NewFixedPrefixTransform(1));
4409     Reopen(options);
4410 
4411     {
4412       std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
4413       iterator->Seek("xa");
4414       ASSERT_TRUE(iterator->Valid());
4415       ASSERT_EQ("xb", iterator->key().ToString());
4416       // It's a bug that the counter BLOOM_FILTER_PREFIX_CHECKED is not
4417       // correct in this case. So don't check counters in this case.
4418       if (expect_filter_check) {
4419         ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4420       }
4421 
4422       iterator->Seek("xz");
4423       ASSERT_TRUE(iterator->Valid());
4424       ASSERT_EQ("xz1", iterator->key().ToString());
4425       if (expect_filter_check) {
4426         ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4427       }
4428     }
4429 
4430     std::string ub_str = "xg9";
4431     Slice ub(ub_str);
4432     ReadOptions ro;
4433     ro.iterate_upper_bound = &ub;
4434 
4435     {
4436       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4437 
4438       // SeekForPrev() never uses prefix bloom if it is changed.
4439       iterator->SeekForPrev("xg0");
4440       ASSERT_TRUE(iterator->Valid());
4441       ASSERT_EQ("xb", iterator->key().ToString());
4442       if (expect_filter_check) {
4443         ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4444       }
4445     }
4446 
4447     ub_str = "xx9";
4448     ub = Slice(ub_str);
4449     {
4450       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4451 
4452       iterator->Seek("x");
4453       ASSERT_TRUE(iterator->Valid());
4454       ASSERT_EQ("xb", iterator->key().ToString());
4455       if (expect_filter_check) {
4456         ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4457       }
4458 
4459       iterator->Seek("xx0");
4460       ASSERT_TRUE(iterator->Valid());
4461       ASSERT_EQ("xx1", iterator->key().ToString());
4462       if (expect_filter_check) {
4463         ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4464       }
4465     }
4466 
4467     CompactRangeOptions compact_range_opts;
4468     compact_range_opts.bottommost_level_compaction =
4469         BottommostLevelCompaction::kForce;
4470     ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
4471     ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
4472 
4473     // Re-execute similar queries after a full compaction
4474     {
4475       std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
4476 
4477       iterator->Seek("x");
4478       ASSERT_TRUE(iterator->Valid());
4479       ASSERT_EQ("xb", iterator->key().ToString());
4480       if (expect_filter_check) {
4481         ASSERT_EQ(2, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4482       }
4483 
4484       iterator->Seek("xg");
4485       ASSERT_TRUE(iterator->Valid());
4486       ASSERT_EQ("xx1", iterator->key().ToString());
4487       if (expect_filter_check) {
4488         ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4489       }
4490 
4491       iterator->Seek("xz");
4492       ASSERT_TRUE(iterator->Valid());
4493       ASSERT_EQ("xz1", iterator->key().ToString());
4494       if (expect_filter_check) {
4495         ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4496       }
4497     }
4498     {
4499       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4500 
4501       iterator->SeekForPrev("xx0");
4502       ASSERT_TRUE(iterator->Valid());
4503       ASSERT_EQ("xb", iterator->key().ToString());
4504       if (expect_filter_check) {
4505         ASSERT_EQ(5, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4506       }
4507 
4508       iterator->Seek("xx0");
4509       ASSERT_TRUE(iterator->Valid());
4510       ASSERT_EQ("xx1", iterator->key().ToString());
4511       if (expect_filter_check) {
4512         ASSERT_EQ(6, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4513       }
4514     }
4515 
4516     ub_str = "xg9";
4517     ub = Slice(ub_str);
4518     {
4519       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4520       iterator->SeekForPrev("xg0");
4521       ASSERT_TRUE(iterator->Valid());
4522       ASSERT_EQ("xb", iterator->key().ToString());
4523       if (expect_filter_check) {
4524         ASSERT_EQ(7, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4525       }
4526     }
4527   }
4528 }
4529 
TEST_F(DBTest2,BlockBasedTablePrefixGetIndexNotFound)4530 TEST_F(DBTest2, BlockBasedTablePrefixGetIndexNotFound) {
4531   // create a DB with block prefix index
4532   BlockBasedTableOptions table_options;
4533   Options options = CurrentOptions();
4534   table_options.block_size = 300;
4535   table_options.index_type = BlockBasedTableOptions::kHashSearch;
4536   table_options.index_shortening =
4537       BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
4538   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4539   options.prefix_extractor.reset(NewFixedPrefixTransform(1));
4540   options.level0_file_num_compaction_trigger = 8;
4541 
4542   Reopen(options);
4543 
4544   ASSERT_OK(Put("b1", "ok"));
4545   Flush();
4546 
4547   // Flushing several files so that the chance that hash bucket
4548   // is empty fo "b" in at least one of the files is high.
4549   ASSERT_OK(Put("a1", ""));
4550   ASSERT_OK(Put("c1", ""));
4551   Flush();
4552 
4553   ASSERT_OK(Put("a2", ""));
4554   ASSERT_OK(Put("c2", ""));
4555   Flush();
4556 
4557   ASSERT_OK(Put("a3", ""));
4558   ASSERT_OK(Put("c3", ""));
4559   Flush();
4560 
4561   ASSERT_OK(Put("a4", ""));
4562   ASSERT_OK(Put("c4", ""));
4563   Flush();
4564 
4565   ASSERT_OK(Put("a5", ""));
4566   ASSERT_OK(Put("c5", ""));
4567   Flush();
4568 
4569   ASSERT_EQ("ok", Get("b1"));
4570 }
4571 
4572 #ifndef ROCKSDB_LITE
TEST_F(DBTest2,AutoPrefixMode1)4573 TEST_F(DBTest2, AutoPrefixMode1) {
4574   // create a DB with block prefix index
4575   BlockBasedTableOptions table_options;
4576   Options options = CurrentOptions();
4577   table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
4578   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4579   options.prefix_extractor.reset(NewFixedPrefixTransform(1));
4580   options.statistics = CreateDBStatistics();
4581 
4582   Reopen(options);
4583 
4584   Random rnd(301);
4585   std::string large_value = RandomString(&rnd, 500);
4586 
4587   ASSERT_OK(Put("a1", large_value));
4588   ASSERT_OK(Put("x1", large_value));
4589   ASSERT_OK(Put("y1", large_value));
4590   Flush();
4591 
4592   ReadOptions ro;
4593   ro.total_order_seek = false;
4594   ro.auto_prefix_mode = true;
4595   {
4596     std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4597     iterator->Seek("b1");
4598     ASSERT_TRUE(iterator->Valid());
4599     ASSERT_EQ("x1", iterator->key().ToString());
4600     ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4601   }
4602 
4603   std::string ub_str = "b9";
4604   Slice ub(ub_str);
4605   ro.iterate_upper_bound = &ub;
4606 
4607   {
4608     std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4609     iterator->Seek("b1");
4610     ASSERT_FALSE(iterator->Valid());
4611     ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4612   }
4613 
4614   ub_str = "z";
4615   ub = Slice(ub_str);
4616   {
4617     std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4618     iterator->Seek("b1");
4619     ASSERT_TRUE(iterator->Valid());
4620     ASSERT_EQ("x1", iterator->key().ToString());
4621     ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4622   }
4623 
4624   ub_str = "c";
4625   ub = Slice(ub_str);
4626   {
4627     std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4628     iterator->Seek("b1");
4629     ASSERT_FALSE(iterator->Valid());
4630     ASSERT_EQ(2, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4631   }
4632 
4633   // The same queries without recreating iterator
4634   {
4635     ub_str = "b9";
4636     ub = Slice(ub_str);
4637     ro.iterate_upper_bound = &ub;
4638 
4639     std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4640     iterator->Seek("b1");
4641     ASSERT_FALSE(iterator->Valid());
4642     ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4643 
4644     ub_str = "z";
4645     ub = Slice(ub_str);
4646 
4647     iterator->Seek("b1");
4648     ASSERT_TRUE(iterator->Valid());
4649     ASSERT_EQ("x1", iterator->key().ToString());
4650     ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4651 
4652     ub_str = "c";
4653     ub = Slice(ub_str);
4654 
4655     iterator->Seek("b1");
4656     ASSERT_FALSE(iterator->Valid());
4657     ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4658 
4659     ub_str = "b9";
4660     ub = Slice(ub_str);
4661     ro.iterate_upper_bound = &ub;
4662     iterator->SeekForPrev("b1");
4663     ASSERT_TRUE(iterator->Valid());
4664     ASSERT_EQ("a1", iterator->key().ToString());
4665     ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4666 
4667     ub_str = "zz";
4668     ub = Slice(ub_str);
4669     ro.iterate_upper_bound = &ub;
4670     iterator->SeekToLast();
4671     ASSERT_TRUE(iterator->Valid());
4672     ASSERT_EQ("y1", iterator->key().ToString());
4673 
4674     iterator->SeekToFirst();
4675     ASSERT_TRUE(iterator->Valid());
4676     ASSERT_EQ("a1", iterator->key().ToString());
4677   }
4678 }
4679 #endif  // ROCKSDB_LITE
4680 }  // namespace ROCKSDB_NAMESPACE
4681 
4682 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
4683 extern "C" {
4684 void RegisterCustomObjects(int argc, char** argv);
4685 }
4686 #else
RegisterCustomObjects(int,char **)4687 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
4688 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
4689 
main(int argc,char ** argv)4690 int main(int argc, char** argv) {
4691   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
4692   ::testing::InitGoogleTest(&argc, argv);
4693   RegisterCustomObjects(argc, argv);
4694   return RUN_ALL_TESTS();
4695 }
4696