// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include <cstdlib>
#include <functional>

#include "db/db_test_util.h"
#include "db/read_callback.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/wal_filter.h"
#include "test_util/fault_injection_test_env.h"

namespace ROCKSDB_NAMESPACE {

class DBTest2 : public DBTestBase {
 public:
  DBTest2() : DBTestBase("/db_test2") {}
};

class PrefixFullBloomWithReverseComparator
    : public DBTestBase,
      public ::testing::WithParamInterface<bool> {
 public:
  PrefixFullBloomWithReverseComparator()
      : DBTestBase("/prefix_bloom_reverse") {}
  void SetUp() override { if_cache_filter_ = GetParam(); }
  bool if_cache_filter_;
};

TEST_P(PrefixFullBloomWithReverseComparator,
       PrefixFullBloomWithReverseComparator) {
  Options options = last_options_;
  options.comparator = ReverseBytewiseComparator();
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  BlockBasedTableOptions bbto;
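  // When the test is parameterized to cache filter blocks, route index and
  // filter blocks through a deliberately tiny (1-byte) LRU block cache, so
  // the cache path is exercised without retaining blocks for long.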
  if (if_cache_filter_) {
    bbto.no_block_cache = false;
    bbto.cache_index_and_filter_blocks = true;
    bbto.block_cache = NewLRUCache(1);
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));

  dbfull()->Flush(FlushOptions());

  if (bbto.block_cache) {
    bbto.block_cache->EraseUnRefEntries();
  }

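  // With unreferenced cache entries dropped, the seeks below must re-read
  // the filter block. Under ReverseBytewiseComparator, Seek() positions on
  // the first key at or after the target in *reverse* byte order, so seeking
  // "bar345" lands on "bar234" and Next() proceeds to "bar123".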
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  iter->Seek("bar345");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar234", iter->key().ToString());
  ASSERT_EQ("foo2", iter->value().ToString());
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar123", iter->key().ToString());
  ASSERT_EQ("foo", iter->value().ToString());

  iter->Seek("foo234");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo123", iter->key().ToString());
  ASSERT_EQ("foo3", iter->value().ToString());

  iter->Seek("bar");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(!iter->Valid());
}

INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
                        PrefixFullBloomWithReverseComparator, testing::Bool());

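// "rocksdb.iterator.super-version-number" reports the super version an
// iterator is pinned to. A flush installs a new super version, so the number
// grows across a flush; a plain Put() does not, and an existing iterator
// keeps the number it was created with, as the assertions below verify.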
TEST_F(DBTest2, IteratorPropertyVersionNumber) {
  Put("", "");
  Iterator* iter1 = db_->NewIterator(ReadOptions());
  std::string prop_value;
  ASSERT_OK(
      iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number1 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  Put("", "");
  Flush();

  Iterator* iter2 = db_->NewIterator(ReadOptions());
  ASSERT_OK(
      iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number2 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  ASSERT_GT(version_number2, version_number1);

  Put("", "");

  Iterator* iter3 = db_->NewIterator(ReadOptions());
  ASSERT_OK(
      iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number3 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  ASSERT_EQ(version_number2, version_number3);

  iter1->SeekToFirst();
  ASSERT_OK(
      iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number1_new =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  ASSERT_EQ(version_number1, version_number1_new);

  delete iter1;
  delete iter2;
  delete iter3;
}

TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  Put(1, "a", "begin");
  Put(1, "z", "end");
  ASSERT_OK(Flush(1));
  TryReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::string value;
  value = Get(1, "a");
}

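// max_successive_merges limits how many consecutive merge operands a key
// accumulates in the memtable before a new merge is resolved into a full
// value at write time. This test checks that recovery still succeeds when
// the database is reopened with a smaller limit than the one the merges
// were written under.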
TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.max_successive_merges = 3;
  options.merge_operator = MergeOperators::CreatePutOperator();
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  Put("poi", "Finch");
  db_->Merge(WriteOptions(), "poi", "Reese");
  db_->Merge(WriteOptions(), "poi", "Shaw");
  db_->Merge(WriteOptions(), "poi", "Root");
  options.max_successive_merges = 2;
  Reopen(options);
}

#ifndef ROCKSDB_LITE
class DBTestSharedWriteBufferAcrossCFs
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  DBTestSharedWriteBufferAcrossCFs()
      : DBTestBase("/db_test_shared_write_buffer") {}
  void SetUp() override {
    use_old_interface_ = std::get<0>(GetParam());
    cost_cache_ = std::get<1>(GetParam());
  }
  bool use_old_interface_;
  bool cost_cache_;
};

TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;

  // Avoid a nondeterministic value from malloc_usable_size():
  // force the arena block size to 1.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Arena::Arena:0", [&](void* arg) {
        size_t* block_size = static_cast<size_t*>(arg);
        *block_size = 1;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Arena::AllocateNewBlock:0", [&](void* arg) {
        std::pair<size_t*, size_t*>* pair =
            static_cast<std::pair<size_t*, size_t*>*>(arg);
        *std::get<0>(*pair) = *std::get<1>(*pair);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // The total soft write buffer size is about 105000
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);

  if (use_old_interface_) {
    options.db_write_buffer_size = 120000;  // this is the real limit
  } else if (!cost_cache_) {
    options.write_buffer_manager.reset(new WriteBufferManager(114285));
  } else {
    options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
  }
  options.write_buffer_size = 500000;  // this is never hit
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

  WriteOptions wo;
  wo.disableWAL = true;

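  // Helper: wait for any scheduled memtable flush on each of the four column
  // families to finish, so the SST file counts asserted below are stable.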
  std::function<void()> wait_flush = [&]() {
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  };

  // Create some data and flush "default" and "nikitich" so that they
  // become the most recently flushed CFs.
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  Flush(3);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  Flush(0);
  ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
            static_cast<uint64_t>(1));
  ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
            static_cast<uint64_t>(1));

  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 256 * 1024);
    ASSERT_LE(cache->GetUsage(), 2 * 256 * 1024);
  }
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(60000), wo));
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 256 * 1024);
    ASSERT_LE(cache->GetUsage(), 2 * 256 * 1024);
  }
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  // No flush should trigger
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(1));
  }

  // Trigger a flush. Flushing "nikitich".
  ASSERT_OK(Put(3, Key(2), DummyString(30000), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // Without hitting the threshold, no flush should trigger.
  ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // Hit the write buffer limit again. "default"
  // will have been flushed.
  ASSERT_OK(Put(2, Key(2), DummyString(10000), wo));
  wait_flush();
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // Trigger another flush. This time "dobrynia". "pikachu" should not
  // be flushed, although it has never been flushed.
  ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(80000), wo));
  wait_flush();
  ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();

  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 256 * 1024);
    Close();
    options.write_buffer_manager.reset();
    last_options_.write_buffer_manager.reset();
    ASSERT_LT(cache->GetUsage(), 256 * 1024);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
                        DBTestSharedWriteBufferAcrossCFs,
                        ::testing::Values(std::make_tuple(true, false),
                                          std::make_tuple(false, false),
                                          std::make_tuple(false, true)));

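// A single WriteBufferManager can also be shared across DB instances: writes
// to either database count against the same memory budget, so a write to one
// DB may trigger a flush in the other.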
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
  std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  // Avoid a nondeterministic value from malloc_usable_size():
  // force the arena block size to 1.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Arena::Arena:0", [&](void* arg) {
        size_t* block_size = static_cast<size_t*>(arg);
        *block_size = 1;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Arena::AllocateNewBlock:0", [&](void* arg) {
        std::pair<size_t*, size_t*>* pair =
            static_cast<std::pair<size_t*, size_t*>*>(arg);
        *std::get<0>(*pair) = *std::get<1>(*pair);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 500000;  // this is never hit
  // Use a write buffer total size so that the soft limit is about
  // 105000.
  options.write_buffer_manager.reset(new WriteBufferManager(120000));
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  ASSERT_OK(DestroyDB(dbname2, options));
  DB* db2 = nullptr;
  ASSERT_OK(DB::Open(options, dbname2, &db2));

  WriteOptions wo;
  wo.disableWAL = true;

  std::function<void()> wait_flush = [&]() {
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
    static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  };

  // Trigger a flush on cf2
  ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
  wait_flush();

  // Insert to DB2
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
  wait_flush();

  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
                  GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
                  GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(0));
  }

  // Trigger a flush of another CF in DB1
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(0));
  }

  // Trigger a flush in DB2.
  ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
  wait_flush();
  ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
  wait_flush();
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(1));
  }

  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  std::shared_ptr<Cache> cache =
      NewLRUCache(LRUCacheOptions(10000000, 1, false, 0.0));
  options.write_buffer_size = 50000;  // this is never hit
  // A zero buffer size means no write buffer limit; the manager is used
  // only to charge memtable memory to the cache.
  options.write_buffer_manager.reset(new WriteBufferManager(0, cache));
  Reopen(options);

  ASSERT_OK(Put("foo", "bar"));
  // One dummy entry is 256KB.
  ASSERT_GT(cache->GetUsage(), 128000);
}

namespace {
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
                          const std::vector<Slice>& keys_must_not_exist) {
  // Ensure that expected keys exist
  std::vector<std::string> values;
  if (keys_must_exist.size() > 0) {
    std::vector<Status> status_list =
        db->MultiGet(ReadOptions(), keys_must_exist, &values);
    for (size_t i = 0; i < keys_must_exist.size(); i++) {
      ASSERT_OK(status_list[i]);
    }
  }

  // Ensure that given keys don't exist
  if (keys_must_not_exist.size() > 0) {
    std::vector<Status> status_list =
        db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
    for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
      ASSERT_TRUE(status_list[i].IsNotFound());
    }
  }
}

}  // namespace

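// During recovery a WalFilter sees every WAL record (write batch) in order
// and can let it through, drop it, stop replay, or report it as corrupted.
// The tests below cover each WalProcessingOption in turn.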
TEST_F(DBTest2, WalFilterTest) {
  class TestWalFilter : public WalFilter {
   private:
    // Processing option that is requested to be applied at the given index
    WalFilter::WalProcessingOption wal_processing_option_;
    // Index at which to apply wal_processing_option_.
    // At other indexes the default, WalProcessingOption::kContinueProcessing,
    // is returned.
    size_t apply_option_at_record_index_;
    // Current record index, incremented with each record encountered.
    size_t current_record_index_;

   public:
    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
                  size_t apply_option_for_record_index)
        : wal_processing_option_(wal_processing_option),
          apply_option_at_record_index_(apply_option_for_record_index),
          current_record_index_(0) {}

    WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
                                  WriteBatch* /*new_batch*/,
                                  bool* /*batch_changed*/) const override {
      WalFilter::WalProcessingOption option_to_return;

      if (current_record_index_ == apply_option_at_record_index_) {
        option_to_return = wal_processing_option_;
      } else {
        option_to_return = WalProcessingOption::kContinueProcessing;
      }

      // The filter is passed as const so that RocksDB does not modify it;
      // we need to advance our record index here, so cast the constness away.
      (const_cast<TestWalFilter*>(this)->current_record_index_)++;

      return option_to_return;
    }

    const char* Name() const override { return "TestWalFilter"; }
  };

  // Create 3 batches with two keys each
  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  // Test with all WAL processing options
  for (int option = 0;
       option < static_cast<int>(
                    WalFilter::WalProcessingOption::kWalProcessingOptionMax);
       option++) {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // Write given keys in given batches
    for (size_t i = 0; i < batch_keys.size(); i++) {
      WriteBatch batch;
      for (size_t j = 0; j < batch_keys[i].size(); j++) {
        batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
      }
      dbfull()->Write(WriteOptions(), &batch);
    }

    WalFilter::WalProcessingOption wal_processing_option =
        static_cast<WalFilter::WalProcessingOption>(option);

    // Create a test filter that applies wal_processing_option at record
    // index 1 (the second record)
    size_t apply_option_for_record_index = 1;
    TestWalFilter test_wal_filter(wal_processing_option,
                                  apply_option_for_record_index);

    // Reopen database with option to use WAL filter
    options = OptionsForLogIterTest();
    options.wal_filter = &test_wal_filter;
    Status status =
        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
    if (wal_processing_option ==
        WalFilter::WalProcessingOption::kCorruptedRecord) {
      assert(!status.ok());
      // In case of corruption we can turn off paranoid_checks to reopen
      // the database
      options.paranoid_checks = false;
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
    } else {
      assert(status.ok());
    }

    // Compute which keys we expect to be found
    // and which we expect not to be found after recovery.
    std::vector<Slice> keys_must_exist;
    std::vector<Slice> keys_must_not_exist;
    switch (wal_processing_option) {
      case WalFilter::WalProcessingOption::kCorruptedRecord:
      case WalFilter::WalProcessingOption::kContinueProcessing: {
        fprintf(stderr, "Testing with complete WAL processing\n");
        // we expect all records to be processed
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            keys_must_exist.push_back(Slice(batch_keys[i][j]));
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
        fprintf(stderr,
                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
                apply_option_for_record_index);
        // We expect the record with apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i == apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kStopReplay: {
        fprintf(stderr,
                "Testing with stopping replay from record %" ROCKSDB_PRIszt
                "\n",
                apply_option_for_record_index);
        // We expect records beyond apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i >= apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      default:
        assert(false);  // unhandled case
    }

    bool checked_after_reopen = false;

    while (true) {
      // Ensure that expected keys exist and unexpected keys don't exist
      // after recovery
      ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);

      if (checked_after_reopen) {
        break;
      }

      // Reopen the database (this time without the WAL filter) to make sure
      // the previous log(s) are not replayed again, even if they were
      // skipped on the first recovery
      options = OptionsForLogIterTest();
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      checked_after_reopen = true;
    }
  }
}

TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
  class ChangeBatchHandler : public WriteBatch::Handler {
   private:
    // Batch to insert keys in
    WriteBatch* new_write_batch_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Number of keys added to new batch
    size_t num_keys_added_;

   public:
    ChangeBatchHandler(WriteBatch* new_write_batch,
                       size_t num_keys_to_add_in_new_batch)
        : new_write_batch_(new_write_batch),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          num_keys_added_(0) {}
    void Put(const Slice& key, const Slice& value) override {
      if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
        new_write_batch_->Put(key, value);
        ++num_keys_added_;
      }
    }
  };

  class TestWalFilterWithChangeBatch : public WalFilter {
   private:
    // Index at which to start changing records
    size_t change_records_from_index_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Current record index, incremented with each record encountered.
    size_t current_record_index_;

   public:
    TestWalFilterWithChangeBatch(size_t change_records_from_index,
                                 size_t num_keys_to_add_in_new_batch)
        : change_records_from_index_(change_records_from_index),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          current_record_index_(0) {}

    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      if (current_record_index_ >= change_records_from_index_) {
        ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
        batch.Iterate(&handler);
        *batch_changed = true;
      }

      // The filter is passed as const so that RocksDB does not modify it;
      // we need to advance our record index here, so cast the constness away.
      (const_cast<TestWalFilterWithChangeBatch*>(this)
           ->current_record_index_)++;

      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override { return "TestWalFilterWithChangeBatch"; }
  };

  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Create a test filter that rewrites batches starting at the given record
  // index, keeping only the first key of each rewritten batch
  size_t change_records_from_index = 1;
  size_t num_keys_to_add_in_new_batch = 1;
  TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
      change_records_from_index, num_keys_to_add_in_new_batch);

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_with_change_batch;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Ensure that all keys exist before change_records_from_index_,
  // and that after that index only a single key per batch exists,
  // as our filter keeps only a single key from each rewritten batch
  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;

  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
        keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
      } else {
        keys_must_exist.push_back(Slice(batch_keys[i][j]));
      }
    }
  }

  bool checked_after_reopen = false;

  while (true) {
    // Ensure that expected keys exist and unexpected keys don't exist
    // after recovery
    ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);

    if (checked_after_reopen) {
      break;
    }

    // Reopen the database (this time without the WAL filter) to make sure
    // the previous log(s) are not replayed again, even if they were
    // skipped on the first recovery
    options = OptionsForLogIterTest();
    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    checked_after_reopen = true;
  }
}

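// A WAL filter may rewrite a batch during recovery, but it must not *grow*
// it: replaying a batch with more keys than were originally written is
// rejected, and the reopen below fails with Status::NotSupported.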
TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
  class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
   public:
    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      *new_batch = batch;
      new_batch->Put("key_extra", "value_extra");
      *batch_changed = true;
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithChangeBatchExtraKeys";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Create a test filter that would add extra keys
  TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_extra_keys;
  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.IsNotSupported());

  // Reopen without filter, now reopen should succeed - previous
  // attempt to open must not have altered the db.
  options = OptionsForLogIterTest();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;  // empty vector

  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      keys_must_exist.push_back(Slice(batch_keys[i][j]));
    }
  }

  ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
}

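// RocksDB tells the WAL filter, per column family, the earliest log number
// that is still relevant (ColumnFamilyLogNumberMap). Records in older logs
// were already persisted to SST files for that CF and are skipped for it
// during recovery; the filter below records exactly the keys replayed for
// each CF so the test can verify this.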
TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
  class TestWalFilterWithColumnFamilies : public WalFilter {
   private:
    // column_family_id -> log_number map (provided to WALFilter)
    std::map<uint32_t, uint64_t> cf_log_number_map_;
    // column_family_name -> column_family_id map (provided to WALFilter)
    std::map<std::string, uint32_t> cf_name_id_map_;
    // column_family_id -> keys_found_in_wal map
    // We store keys that are applicable to the column_family
    // during recovery (i.e. aren't already flushed to SST file(s))
    // for verification against the keys we expect.
    std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
   public:
    void ColumnFamilyLogNumberMap(
        const std::map<uint32_t, uint64_t>& cf_lognumber_map,
        const std::map<std::string, uint32_t>& cf_name_id_map) override {
      cf_log_number_map_ = cf_lognumber_map;
      cf_name_id_map_ = cf_name_id_map;
    }

    WalProcessingOption LogRecordFound(unsigned long long log_number,
                                       const std::string& /*log_file_name*/,
                                       const WriteBatch& batch,
                                       WriteBatch* /*new_batch*/,
                                       bool* /*batch_changed*/) override {
      class LogRecordBatchHandler : public WriteBatch::Handler {
       private:
        const std::map<uint32_t, uint64_t>& cf_log_number_map_;
        std::map<uint32_t, std::vector<std::string>>& cf_wal_keys_;
        unsigned long long log_number_;

       public:
        LogRecordBatchHandler(
            unsigned long long current_log_number,
            const std::map<uint32_t, uint64_t>& cf_log_number_map,
            std::map<uint32_t, std::vector<std::string>>& cf_wal_keys)
            : cf_log_number_map_(cf_log_number_map),
              cf_wal_keys_(cf_wal_keys),
              log_number_(current_log_number) {}

        Status PutCF(uint32_t column_family_id, const Slice& key,
                     const Slice& /*value*/) override {
          auto it = cf_log_number_map_.find(column_family_id);
          assert(it != cf_log_number_map_.end());
          unsigned long long log_number_for_cf = it->second;
          // If the current record is applicable for column_family_id
          // (i.e. isn't flushed to SST file(s) for column_family_id)
          // add it to the cf_wal_keys_ map for verification.
          if (log_number_ >= log_number_for_cf) {
            cf_wal_keys_[column_family_id].push_back(
                std::string(key.data(), key.size()));
          }
          return Status::OK();
        }
      } handler(log_number, cf_log_number_map_, cf_wal_keys_);

      batch.Iterate(&handler);

      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithColumnFamilies";
    }

    const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
      return cf_wal_keys_;
    }

    const std::map<std::string, uint32_t>& GetColumnFamilyNameIdMap() {
      return cf_name_id_map_;
    }
  };

  std::vector<std::vector<std::string>> batch_keys_pre_flush(3);

  batch_keys_pre_flush[0].push_back("key1");
  batch_keys_pre_flush[0].push_back("key2");
  batch_keys_pre_flush[1].push_back("key3");
  batch_keys_pre_flush[1].push_back("key4");
  batch_keys_pre_flush[2].push_back("key5");
  batch_keys_pre_flush[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
      batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Flush default column-family
  db_->Flush(FlushOptions(), handles_[0]);

  // Do some more writes
  std::vector<std::vector<std::string>> batch_keys_post_flush(3);

  batch_keys_post_flush[0].push_back("key7");
  batch_keys_post_flush[0].push_back("key8");
  batch_keys_post_flush[1].push_back("key9");
  batch_keys_post_flush[1].push_back("key10");
  batch_keys_post_flush[2].push_back("key11");
  batch_keys_post_flush[2].push_back("key12");

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
      batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // On recovery we should only find the second batch applicable to default CF
  // but both batches applicable to pikachu CF

  // Create a test filter that records the keys replayed per column family
  TestWalFilterWithColumnFamilies test_wal_filter_column_families;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_column_families;
  Status status =
      TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.ok());

  // Verify that handles_[0] only has post_flush keys
  // while handles_[1] has pre and post flush keys
  auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
  auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
  size_t index = 0;
  auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
  // default column-family, only post_flush keys are expected
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }
  ASSERT_TRUE(index == keys_cf.size());

  index = 0;
  keys_cf = cf_wal_keys[name_id_map["pikachu"]];
  // pikachu column-family, all keys are expected
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_pre_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }

  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }
  ASSERT_TRUE(index == keys_cf.size());
}

TEST_F(DBTest2, PresetCompressionDict) {
  // Verifies that compression ratio improves when dictionary is enabled, and
  // improves even further when the dictionary is trained by ZSTD.
  const size_t kBlockSizeBytes = 4 << 10;
  const size_t kL0FileBytes = 128 << 10;
  const size_t kApproxPerBlockOverheadBytes = 50;
  const int kNumL0Files = 5;

  Options options;
  // Make sure to use any custom env that the test is configured with.
  options.env = CurrentOptions().env;
  options.allow_concurrent_memtable_write = false;
  options.arena_block_size = kBlockSizeBytes;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
  options.num_levels = 2;
  options.target_file_size_base = kL0FileBytes;
  options.target_file_size_multiplier = 2;
  options.write_buffer_size = kL0FileBytes;
  BlockBasedTableOptions table_options;
  table_options.block_size = kBlockSizeBytes;
  std::vector<CompressionType> compression_types;
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif  // LZ4_VERSION_NUMBER >= 10400
  if (ZSTD_Supported()) {
    compression_types.push_back(kZSTD);
  }

  enum DictionaryTypes : int {
    kWithoutDict,
    kWithDict,
    kWithZSTDTrainedDict,
    kDictEnd,
  };

  for (auto compression_type : compression_types) {
    options.compression = compression_type;
    size_t bytes_without_dict = 0;
    size_t bytes_with_dict = 0;
    size_t bytes_with_zstd_trained_dict = 0;
    for (int i = kWithoutDict; i < kDictEnd; i++) {
      // First iteration: compress without preset dictionary
      // Second iteration: compress with preset dictionary
      // Third iteration (zstd only): compress with zstd-trained dictionary
      //
      // To make sure the compression dictionary has the intended effect, we
      // verify the compressed size is smaller in successive iterations. Also
      // in the non-first iterations, verify the data we get out is the same
      // data we put in.
      switch (i) {
        case kWithoutDict:
          options.compression_opts.max_dict_bytes = 0;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case kWithDict:
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case kWithZSTDTrainedDict:
          if (compression_type != kZSTD) {
            continue;
          }
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
          break;
        default:
          assert(false);
      }

      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      CreateAndReopenWithCF({"pikachu"}, options);
      Random rnd(301);
      std::string seq_datas[10];
      for (int j = 0; j < 10; ++j) {
        seq_datas[j] =
            RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
      }

      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      for (int j = 0; j < kNumL0Files; ++j) {
        for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
          auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
          ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
                        seq_datas[(key_num / 10) % 10]));
        }
        dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
        ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
      }
      dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                                  true /* disallow_trivial_move */);
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

      // Get the live sst files size
      size_t total_sst_bytes = TotalSize(1);
      if (i == kWithoutDict) {
        bytes_without_dict = total_sst_bytes;
      } else if (i == kWithDict) {
        bytes_with_dict = total_sst_bytes;
      } else if (i == kWithZSTDTrainedDict) {
        bytes_with_zstd_trained_dict = total_sst_bytes;
      }

      for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
           j++) {
        ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
      }
      if (i == kWithDict) {
        ASSERT_GT(bytes_without_dict, bytes_with_dict);
      } else if (i == kWithZSTDTrainedDict) {
        // In zstd compression, it is sometimes possible that using a trained
        // dictionary does not get as good a compression ratio as without
        // training.
        // But using a dictionary (with or without training) should always get
        // better compression ratio than not using one.
        ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_trained_dict ||
                    bytes_without_dict > bytes_with_zstd_trained_dict);
      }

      DestroyAndReopen(options);
    }
  }
}

TEST_F(DBTest2, PresetCompressionDictLocality) {
  if (!ZSTD_Supported()) {
    return;
  }
  // Verifies that compression dictionary is generated from local data. The
  // verification simply checks all output SSTs have different compression
  // dictionaries. We do not verify effectiveness as that'd likely be flaky in
  // the future.
  const int kNumEntriesPerFile = 1 << 10;  // 1024 entries
  const int kNumBytesPerEntry = 1 << 10;  // 1KB
  const int kNumFiles = 4;
  Options options = CurrentOptions();
  options.compression = kZSTD;
  options.compression_opts.max_dict_bytes = 1 << 14;        // 16KB
  options.compression_opts.zstd_max_train_bytes = 1 << 18;  // 256KB
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  Reopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumEntriesPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
                    RandomString(&rnd, kNumBytesPerEntry)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(1);
    ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
  }

  // Store all the dictionaries generated during a full compaction.
  std::vector<std::string> compression_dicts;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
      [&](void* arg) {
        compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  CompactRangeOptions compact_range_opts;
  compact_range_opts.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));

  // Dictionary compression should not be so good as to compress four totally
  // random files into one. If it does then there's probably something wrong
  // with the test.
  ASSERT_GT(NumTableFilesAtLevel(1), 1);

  // Furthermore, there should be one compression dictionary generated per
  // file. And they should all be different from each other.
  ASSERT_EQ(NumTableFilesAtLevel(1),
            static_cast<int>(compression_dicts.size()));
  for (size_t i = 1; i < compression_dicts.size(); ++i) {
    std::string& a = compression_dicts[i - 1];
    std::string& b = compression_dicts[i];
    size_t alen = a.size();
    size_t blen = b.size();
    ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
  }
}

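// Listener that checks the compression type actually used by each compaction
// against the configured options: bottommost_compression applies on the last
// non-empty level, compression_per_level (when set) applies per output
// level, and plain `compression` applies otherwise.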
class CompactionCompressionListener : public EventListener {
 public:
  explicit CompactionCompressionListener(Options* db_options)
      : db_options_(db_options) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    // Figure out last level with files
    int bottommost_level = 0;
    for (int level = 0; level < db->NumberLevels(); level++) {
      std::string files_at_level;
      ASSERT_TRUE(
          db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
                          &files_at_level));
      if (files_at_level != "0") {
        bottommost_level = level;
      }
    }

    if (db_options_->bottommost_compression != kDisableCompressionOption &&
        ci.output_level == bottommost_level) {
      ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
    } else if (db_options_->compression_per_level.size() != 0) {
      ASSERT_EQ(ci.compression,
                db_options_->compression_per_level[ci.output_level]);
    } else {
      ASSERT_EQ(ci.compression, db_options_->compression);
    }
    max_level_checked = std::max(max_level_checked, ci.output_level);
  }

  int max_level_checked = 0;
  const Options* db_options_;
};

TEST_F(DBTest2, CompressionOptions) {
  if (!Zlib_Supported() || !Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 100;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;

  CompactionCompressionListener* listener =
      new CompactionCompressionListener(&options);
  options.listeners.emplace_back(listener);

  const int kKeySize = 5;
  const int kValSize = 20;
  Random rnd(301);

  for (int iter = 0; iter <= 2; iter++) {
    listener->max_level_checked = 0;

    if (iter == 0) {
      // Use different compression algorithms for different levels but
      // always use Zlib for bottommost level
      options.compression_per_level = {kNoCompression, kNoCompression,
                                       kNoCompression, kSnappyCompression,
                                       kSnappyCompression, kSnappyCompression,
                                       kZlibCompression};
      options.compression = kNoCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 1) {
      // Use Snappy except for the bottommost level, which uses Zlib
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 2) {
      // Use Snappy everywhere
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kDisableCompressionOption;
    }

    DestroyAndReopen(options);
    // Write 10 random files
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 5; j++) {
        ASSERT_OK(
            Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
      }
      ASSERT_OK(Flush());
      dbfull()->TEST_WaitForCompact();
    }

    // Make sure that we wrote enough to check all 7 levels
    ASSERT_EQ(listener->max_level_checked, 6);
  }
}

class CompactionStallTestListener : public EventListener {
 public:
  CompactionStallTestListener()
      : compacting_files_cnt_(0), compacted_files_cnt_(0) {}

  void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
    compacting_files_cnt_ += ci.input_files.size();
  }

  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
    compacted_files_cnt_ += ci.input_files.size();
  }

  std::atomic<size_t> compacting_files_cnt_;
  std::atomic<size_t> compacted_files_cnt_;
};

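// Each {"A", "B"} pair passed to LoadDependency makes sync point B block
// until A has fired; this is how the test below holds background compactions
// and the listener notifications at specific points.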
TEST_F(DBTest2, CompactionStall) {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
       {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
       {"DBTest2::CompactionStall:2",
        "DBImpl::NotifyOnCompactionBegin::UnlockMutex"},
       {"DBTest2::CompactionStall:3",
        "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_background_compactions = 40;
  CompactionStallTestListener* listener = new CompactionStallTestListener();
  options.listeners.emplace_back(listener);
  DestroyAndReopen(options);
  // make sure all background compaction jobs can be scheduled
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  Random rnd(301);

  // 4 Files in L0
  for (int i = 0; i < 4; i++) {
    for (int j = 0; j < 10; j++) {
      ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
    }
    ASSERT_OK(Flush());
  }

  // Wait for compaction to be triggered
  TEST_SYNC_POINT("DBTest2::CompactionStall:0");

  // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
  // at DBTest2::CompactionStall:1
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();

  // Another 6 L0 files to trigger compaction again
  for (int i = 0; i < 6; i++) {
    for (int j = 0; j < 10; j++) {
      ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
    }
    ASSERT_OK(Flush());
  }

  // Wait for another compaction to be triggered
  TEST_SYNC_POINT("DBTest2::CompactionStall:1");

  // Hold NotifyOnCompactionBegin in the unlock mutex section
  TEST_SYNC_POINT("DBTest2::CompactionStall:2");

  // Hold NotifyOnCompactionCompleted in the unlock mutex section
  TEST_SYNC_POINT("DBTest2::CompactionStall:3");

  dbfull()->TEST_WaitForCompact();
  ASSERT_LT(NumTableFilesAtLevel(0),
            options.level0_file_num_compaction_trigger);
  ASSERT_GT(listener->compacted_files_cnt_.load(),
            10 - options.level0_file_num_compaction_trigger);
  ASSERT_EQ(listener->compacting_files_cnt_.load(),
            listener->compacted_files_cnt_.load());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

#endif  // ROCKSDB_LITE

TEST_F(DBTest2, FirstSnapshotTest) {
  Options options;
  options.write_buffer_size = 100000;  // Small write buffer
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // This snapshot will have sequence number 0, which is the expected
  // behavior.
  const Snapshot* s1 = db_->GetSnapshot();

  Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
  Put(1, "k2", std::string(100000, 'y'));  // Trigger flush

  db_->ReleaseSnapshot(s1);
}

#ifndef ROCKSDB_LITE
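// Snapshots taken at the same sequence number share a single entry in the
// snapshot list (hence "duplicates are not counted" below), and
// GetSnapshotForWriteConflictBoundary() produces snapshots that are tracked
// so the oldest write-conflict snapshot can be reported.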
TEST_F(DBTest2, DuplicateSnapshot) {
  Options options;
  options = CurrentOptions(options);
  std::vector<const Snapshot*> snapshots;
  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
  SequenceNumber oldest_ww_snap, first_ww_snap;

  Put("k", "v");  // inc seq
  snapshots.push_back(db_->GetSnapshot());
  snapshots.push_back(db_->GetSnapshot());
  Put("k", "v");  // inc seq
  snapshots.push_back(db_->GetSnapshot());
  snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
  first_ww_snap = snapshots.back()->GetSequenceNumber();
  Put("k", "v");  // inc seq
  snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
  snapshots.push_back(db_->GetSnapshot());
  Put("k", "v");  // inc seq
  snapshots.push_back(db_->GetSnapshot());

  {
    InstrumentedMutexLock l(dbi->mutex());
    auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
    ASSERT_EQ(seqs.size(), 4);  // duplicates are not counted
    ASSERT_EQ(oldest_ww_snap, first_ww_snap);
  }

  for (auto s : snapshots) {
    db_->ReleaseSnapshot(s);
  }
}
#endif  // ROCKSDB_LITE

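// pin_l0_filter_and_index_blocks_in_cache keeps index and filter blocks of
// L0 files pinned in the block cache once loaded, so repeated reads of an L0
// file should not add further block-cache misses or hits for those blocks.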
1468 class PinL0IndexAndFilterBlocksTest
1469 : public DBTestBase,
1470 public testing::WithParamInterface<std::tuple<bool, bool>> {
1471 public:
PinL0IndexAndFilterBlocksTest()1472 PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
SetUp()1473 void SetUp() override {
1474 infinite_max_files_ = std::get<0>(GetParam());
1475 disallow_preload_ = std::get<1>(GetParam());
1476 }
1477
CreateTwoLevels(Options * options,bool close_afterwards)1478 void CreateTwoLevels(Options* options, bool close_afterwards) {
1479 if (infinite_max_files_) {
1480 options->max_open_files = -1;
1481 }
1482 options->create_if_missing = true;
1483 options->statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1484 BlockBasedTableOptions table_options;
1485 table_options.cache_index_and_filter_blocks = true;
1486 table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1487 table_options.filter_policy.reset(NewBloomFilterPolicy(20));
1488 options->table_factory.reset(new BlockBasedTableFactory(table_options));
1489 CreateAndReopenWithCF({"pikachu"}, *options);
1490
1491 Put(1, "a", "begin");
1492 Put(1, "z", "end");
1493 ASSERT_OK(Flush(1));
1494 // move this table to L1
1495 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
1496
1497 // reset block cache
1498 table_options.block_cache = NewLRUCache(64 * 1024);
1499 options->table_factory.reset(NewBlockBasedTableFactory(table_options));
1500 TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
1501 // create new table at L0
1502 Put(1, "a2", "begin2");
1503 Put(1, "z2", "end2");
1504 ASSERT_OK(Flush(1));
1505
1506 if (close_afterwards) {
1507 Close(); // This ensures that there is no ref to block cache entries
1508 }
1509 table_options.block_cache->EraseUnRefEntries();
1510 }
1511
1512 bool infinite_max_files_;
1513 bool disallow_preload_;
1514 };

TEST_P(PinL0IndexAndFilterBlocksTest,
       IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
  Options options = CurrentOptions();
  if (infinite_max_files_) {
    options.max_open_files = -1;
  }
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "key", "val"));
  // Create a new table.
  ASSERT_OK(Flush(1));

  // Index/filter blocks are added to the block cache right after table
  // creation.
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // Only the index/filter blocks were added.
  ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));

  std::string value;
  // Miss and hit counts should remain the same; the blocks are all pinned.
  db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // Miss and hit counts should remain the same; the blocks are all pinned.
  value = Get(1, "key");
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}

TEST_P(PinL0IndexAndFilterBlocksTest,
       MultiLevelIndexAndFilterBlocksCachedWithPinning) {
  Options options = CurrentOptions();
  PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false);
  // Get base cache values.
  uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
  uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
  uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
  uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);

  std::string value;
  // This should be read from L0, so the cache values don't change.
  value = Get(1, "a2");
  ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // This should be read from L1: when the file is opened, prefetching results
  // in a cache filter miss; the block is loaded and added to the cache, and
  // the Get() then results in a cache hit for L1. Even with infinite
  // max_open_files there is still a cache miss, because we have reset the
  // block cache.
  value = Get(1, "a");
  ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
}

TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
  Options options = CurrentOptions();
  // This ensures that the db does not ref anything in the block cache, so
  // EraseUnRefEntries can clear the entries up.
  bool close_afterwards = true;
  PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards);

  // Get base cache values.
  uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
  uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
  uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
  uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);

  if (disallow_preload_) {
    // Now we have two files. We narrow the max open files to allow 3 entries
    // so that preloading SST files won't happen.
    options.max_open_files = 13;
    // RocksDB sanitizes max_open_files to at least 20; the sync point callback
    // modifies it back.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
          int* max_open_files = static_cast<int*>(arg);
          *max_open_files = 13;
        });
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Reopen the database. If max_open_files is set to -1, table readers will
  // be preloaded. This will trigger a BlockBasedTable::Open() and prefetch the
  // L0 index and filter. Level 1's prefetching is disabled in DB::Open().
  TryReopenWithColumnFamilies({"default", "pikachu"}, options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  if (!disallow_preload_) {
    // After reopen, the cache misses are increased by one because we read
    // (and only read) the filter and index on L0.
    ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  } else {
    // If max_open_files is not -1, we do not preload table readers, so there
    // is no change.
    ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  }
  std::string value;
  // This should be read from L0.
  value = Get(1, "a2");
  // If max_open_files is -1, we have pinned the index and filter in Rep, so
  // there will be no changes in index and filter misses or hits. If
  // max_open_files is not -1, Get() will open a TableReader and prefetch the
  // index and filter.
  ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // This should be read from L1.
  value = Get(1, "a");
  if (!disallow_preload_) {
    // In the infinite max_open_files case, there is a cache miss when
    // executing Get() because the index and filter were not prefetched
    // before.
    ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  } else {
    // In this case, the cache miss count is increased by one in
    // BlockBasedTable::Open() because this is not in the DB::Open() code
    // path, so we do prefetch L1's index and filter. The cache hit count is
    // also increased by one because Get() reads the index and filter from the
    // block cache prefetched by the previous Open() call.
    ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  }

  // Force a full compaction to one single file. There will be a block cache
  // read for both the index and the filter. If the prefetch doesn't happen
  // explicitly, it will happen when verifying the file.
  Compact(1, "a", "zzzzz");
  dbfull()->TEST_WaitForCompact();

  if (!disallow_preload_) {
    ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  } else {
    ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  }

  // Bloom filter and index hits happen when a Get() happens.
  value = Get(1, "a");
  if (!disallow_preload_) {
    ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  } else {
    ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  }
}

INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
                        PinL0IndexAndFilterBlocksTest,
                        ::testing::Values(std::make_tuple(true, false),
                                          std::make_tuple(false, false),
                                          std::make_tuple(false, true)));
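
// Editorial note: the tuples above map to (infinite_max_files_,
// disallow_preload_). The (true, true) combination is omitted, presumably
// because max_open_files == -1 always preloads table readers, which would
// contradict disallow_preload_.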

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, MaxCompactionBytesTest) {
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 200 << 10;
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.compression = kNoCompression;
  options.max_bytes_for_level_base = 450 << 10;
  options.target_file_size_base = 100 << 10;
  // Effectively infinite, for the full compaction below.
  options.max_compaction_bytes = options.target_file_size_base * 100;

  Reopen(options);

  Random rnd(301);

  for (int num = 0; num < 8; num++) {
    GenerateNewRandomFile(&rnd);
  }
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,8", FilesPerLevel(0));

  // When compacting from Ln to Ln+1, cut an output file if it would overlap
  // with more than three files in Ln+1.
  options.max_compaction_bytes = options.target_file_size_base * 3;
  Reopen(options);

  GenerateNewRandomFile(&rnd);
  // Add three more small files that overlap with the previous file.
  for (int i = 0; i < 3; i++) {
    Put("a", "z");
    ASSERT_OK(Flush());
  }
  dbfull()->TEST_WaitForCompact();

  // The output files to L1 are cut into three pieces, according to
  // options.max_compaction_bytes.
  ASSERT_EQ("0,3,8", FilesPerLevel(0));
}
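
// Back-of-envelope check for the test above (editorial note): with
// target_file_size_base == 100KB, max_compaction_bytes == 3 * 100KB limits
// how much data in the next level each output file may overlap, roughly three
// of the ~100KB files now sitting below, so the single overlapping input
// range ends up cut into three L1 files.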

static void UniqueIdCallback(void* arg) {
  int* result = reinterpret_cast<int*>(arg);
  if (*result == -1) {
    *result = 0;
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
}

class MockPersistentCache : public PersistentCache {
 public:
  explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
      : is_compressed_(is_compressed), max_size_(max_size) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
  }

  ~MockPersistentCache() override {}

  PersistentCache::StatsType Stats() override {
    return PersistentCache::StatsType();
  }

  Status Insert(const Slice& page_key, const char* data,
                const size_t size) override {
    MutexLock _(&lock_);

    if (size_ > max_size_) {
      size_ -= data_.begin()->second.size();
      data_.erase(data_.begin());
    }

    data_.insert(std::make_pair(page_key.ToString(), std::string(data, size)));
    size_ += size;
    return Status::OK();
  }

  Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                size_t* size) override {
    MutexLock _(&lock_);
    auto it = data_.find(page_key.ToString());
    if (it == data_.end()) {
      return Status::NotFound();
    }

    assert(page_key.ToString() == it->first);
    data->reset(new char[it->second.size()]);
    memcpy(data->get(), it->second.c_str(), it->second.size());
    *size = it->second.size();
    return Status::OK();
  }

  bool IsCompressed() override { return is_compressed_; }

  std::string GetPrintableOptions() const override {
    return "MockPersistentCache";
  }

  port::Mutex lock_;
  std::map<std::string, std::string> data_;
  const bool is_compressed_ = true;
  size_t size_ = 0;
  const size_t max_size_ = 10 * 1024;  // 10KiB
};
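
// Note on the mock above (editorial): Insert() evicts at most one entry per
// call once max_size_ is exceeded, namely the entry with the smallest key,
// since data_ is an ordered std::map. That is enough bookkeeping for these
// tests; it is not meant to model a realistic eviction policy.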

#ifdef OS_LINUX
// Make sure that for the CPU time perf context counters, Env::NowCPUNanos()
// is used rather than Env::NowNanos().
TEST_F(DBTest2, TestPerfContextGetCpuTime) {
  // Force resizing the table cache so the table handle is not preloaded, so
  // that we can measure find_table_nanos during Get().
  dbfull()->TEST_table_cache()->SetCapacity(0);
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  env_->now_cpu_count_.store(0);

  // CPU timing is not enabled with kEnableTimeExceptForMutex.
  SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_EQ(0, get_perf_context()->get_cpu_nanos);
  ASSERT_EQ(0, env_->now_cpu_count_.load());

  uint64_t kDummyAddonTime = uint64_t{1000000000000};

  // Add time to the NowNanos() reading.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TableCache::FindTable:0",
      [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_GT(env_->now_cpu_count_.load(), 2);
  ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime);
  ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);

  SetPerfLevel(PerfLevel::kDisable);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, TestPerfContextIterCpuTime) {
  DestroyAndReopen(CurrentOptions());
  // Force resizing the table cache so the table handle is not preloaded, so
  // that we can measure find_table_nanos during iteration.
  dbfull()->TEST_table_cache()->SetCapacity(0);

  const size_t kNumEntries = 10;
  for (size_t i = 0; i < kNumEntries; ++i) {
    ASSERT_OK(Put("k" + ToString(i), "v" + ToString(i)));
  }
  ASSERT_OK(Flush());
  for (size_t i = 0; i < kNumEntries; ++i) {
    ASSERT_EQ("v" + ToString(i), Get("k" + ToString(i)));
  }
  std::string last_key = "k" + ToString(kNumEntries - 1);
  std::string last_value = "v" + ToString(kNumEntries - 1);
  env_->now_cpu_count_.store(0);

  // CPU timing is not enabled with kEnableTimeExceptForMutex.
  SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
  Iterator* iter = db_->NewIterator(ReadOptions());
  iter->Seek("k0");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  iter->SeekForPrev(last_key);
  ASSERT_TRUE(iter->Valid());
  iter->SeekToLast();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(last_value, iter->value().ToString());
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_EQ(0, get_perf_context()->iter_seek_cpu_nanos);
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v1", iter->value().ToString());
  ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
  iter->Prev();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
  ASSERT_EQ(0, env_->now_cpu_count_.load());
  delete iter;

  uint64_t kDummyAddonTime = uint64_t{1000000000000};

  // Add time to the NowNanos() reading.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TableCache::FindTable:0",
      [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
  iter = db_->NewIterator(ReadOptions());
  iter->Seek("k0");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  iter->SeekForPrev(last_key);
  ASSERT_TRUE(iter->Valid());
  iter->SeekToLast();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(last_value, iter->value().ToString());
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0);
  ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonTime);
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v1", iter->value().ToString());
  ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0);
  ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonTime);
  iter->Prev();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
  ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonTime);
  ASSERT_GE(env_->now_cpu_count_.load(), 12);
  ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);

  SetPerfLevel(PerfLevel::kDisable);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  delete iter;
}
#endif  // OS_LINUX

// GetUniqueIdFromFile is not implemented on these platforms. Persistent cache
// breaks when that function is not implemented and no regular block cache is
// provided.
#if !defined(OS_SOLARIS) && !defined(OS_WIN)
TEST_F(DBTest2, PersistentCache) {
  int num_iter = 80;

  Options options;
  options.write_buffer_size = 64 * 1024;  // small write buffer
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options = CurrentOptions(options);

  auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
  auto types = {/*compressed*/ 1, /*uncompressed*/ 0};
  for (auto bsize : bsizes) {
    for (auto type : types) {
      BlockBasedTableOptions table_options;
      table_options.persistent_cache.reset(
          new MockPersistentCache(type, 10 * 1024));
      table_options.no_block_cache = true;
      table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr;
      table_options.block_cache_compressed = nullptr;
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));

      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
      // The default column family doesn't have a block cache.
      Options no_block_cache_opts;
      no_block_cache_opts.statistics = options.statistics;
      no_block_cache_opts = CurrentOptions(no_block_cache_opts);
      BlockBasedTableOptions table_options_no_bc;
      table_options_no_bc.no_block_cache = true;
      no_block_cache_opts.table_factory.reset(
          NewBlockBasedTableFactory(table_options_no_bc));
      ReopenWithColumnFamilies(
          {"default", "pikachu"},
          std::vector<Options>({no_block_cache_opts, options}));

      Random rnd(301);

      // Write 80 values of ~1KB each; every fourth value starts a new random
      // string, so consecutive values repeat and compress well.
      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
      std::vector<std::string> values;
      std::string str;
      for (int i = 0; i < num_iter; i++) {
        if (i % 4 == 0) {  // high compression ratio
          str = RandomString(&rnd, 1000);
        }
        values.push_back(str);
        ASSERT_OK(Put(1, Key(i), values[i]));
      }

      // Flush all data from the memtable so that reads come from the table
      // files, i.e. through the caches.
      ASSERT_OK(Flush(1));

      for (int i = 0; i < num_iter; i++) {
        ASSERT_EQ(Get(1, Key(i)), values[i]);
      }

      auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
      auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);

      ASSERT_GT(hit, 0);
      ASSERT_GT(miss, 0);
    }
  }
}
#endif  // !defined(OS_SOLARIS) && !defined(OS_WIN)

namespace {
void CountSyncPoint() {
  TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
}
}  // namespace

TEST_F(DBTest2, SyncPointMarker) {
  std::atomic<int> sync_point_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBTest2::MarkedPoint",
      [&](void* /*arg*/) { sync_point_called.fetch_add(1); });

  // The dependency forces MarkedPoint to wait for Thread1First, and the
  // marker ties MarkedPoint to the thread that passed Marker, so thread 1's
  // MarkedPoint is disabled here.
  // Execution order:
  // | Thread 1     | Thread 2    |
  // |              | Marker      |
  // | MarkedPoint  |             |
  // | Thread1First |             |
  // |              | MarkedPoint |
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
      {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
      {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::function<void()> func1 = [&]() {
    CountSyncPoint();
    TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
  };

  std::function<void()> func2 = [&]() {
    TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
    CountSyncPoint();
  };

  auto thread1 = port::Thread(func1);
  auto thread2 = port::Thread(func2);
  thread1.join();
  thread2.join();

  // The callback is executed only once, by thread 2's MarkedPoint.
  ASSERT_EQ(sync_point_called.load(), 1);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
#endif  // ROCKSDB_LITE

size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
  std::string buffer;

  PutVarint32(&buffer, static_cast<uint32_t>(0));
  PutVarint32(&buffer, static_cast<uint32_t>(key_size));
  PutVarint32(&buffer, static_cast<uint32_t>(value_size));

  return buffer.size() + key_size + value_size;
}
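
// A quick worked example of the helper above (an editorial addition; the test
// name is illustrative, not part of the original suite): with delta encoding
// disabled the shared size is 0 and, for small inputs, each varint fits in one
// byte, so an 8-byte key with a 100-byte value encodes to 3 + 8 + 100 bytes.
TEST_F(DBTest2, GetEncodedEntrySizeExample) {
  ASSERT_EQ(111u, GetEncodedEntrySize(8, 100));
}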

TEST_F(DBTest2, ReadAmpBitmap) {
  Options options = CurrentOptions();
  BlockBasedTableOptions bbto;
  uint32_t bytes_per_bit[2] = {1, 16};
  for (size_t k = 0; k < 2; k++) {
    // Disable delta encoding to make it easier to calculate read
    // amplification.
    bbto.use_delta_encoding = false;
    // Huge block cache to make it easier to calculate read amplification.
    bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
    bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    DestroyAndReopen(options);

    const size_t kNumEntries = 10000;

    Random rnd(301);
    for (size_t i = 0; i < kNumEntries; i++) {
      ASSERT_OK(Put(Key(static_cast<int>(i)), RandomString(&rnd, 100)));
    }
    ASSERT_OK(Flush());

    Close();
    Reopen(options);

    // Read keys/values randomly and verify that the reported read amp error
    // is less than 2%.
    uint64_t total_useful_bytes = 0;
    std::set<int> read_keys;
    std::string value;
    for (size_t i = 0; i < kNumEntries * 5; i++) {
      int key_idx = rnd.Next() % kNumEntries;
      std::string key = Key(key_idx);
      ASSERT_OK(db_->Get(ReadOptions(), key, &value));

      if (read_keys.find(key_idx) == read_keys.end()) {
        auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
        total_useful_bytes +=
            GetEncodedEntrySize(internal_key.size(), value.size());
        read_keys.insert(key_idx);
      }

      double expected_read_amp =
          static_cast<double>(total_useful_bytes) /
          options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

      double read_amp =
          static_cast<double>(options.statistics->getTickerCount(
              READ_AMP_ESTIMATE_USEFUL_BYTES)) /
          options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

      double error_pct = fabs(expected_read_amp - read_amp) * 100;
      // The error between the reported read amp and the real read amp should
      // be less than 2%.
      EXPECT_LE(error_pct, 2);
    }

    // Make sure we read everything in the DB (which is smaller than our
    // cache).
    Iterator* iter = db_->NewIterator(ReadOptions());
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
    }
    delete iter;

    // Read amp is on average 100%, since we read everything we loaded into
    // memory.
    if (k == 0) {
      ASSERT_EQ(
          options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
          options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
    } else {
      ASSERT_NEAR(
          options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES) *
              1.0f /
              options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
          1, .01);
    }
  }
}
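
// For reference (editorial note): read_amp_bytes_per_bit == 1 tracks block
// usefulness byte for byte, hence the strict equality asserted in the k == 0
// case above, while 16 tracks it at 16-byte granularity, which is why that
// case only asserts the ratio is within 1% of 1.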

#ifndef OS_SOLARIS  // GetUniqueIdFromFile is not implemented
TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
  {
    const int kIdBufLen = 100;
    char id_buf[kIdBufLen];
#ifndef OS_WIN
    // You can't open a directory on Windows using a random access file.
    std::unique_ptr<RandomAccessFile> file;
    ASSERT_OK(env_->NewRandomAccessFile(dbname_, &file, EnvOptions()));
    if (file->GetUniqueId(id_buf, kIdBufLen) == 0) {
      // The filesystem holding the db directory doesn't support getting a
      // unique file id; this test would fail because lru_cache would load the
      // blocks again even though they are already in the cache.
      return;
    }
#else
    std::unique_ptr<Directory> dir;
    ASSERT_OK(env_->NewDirectory(dbname_, &dir));
    if (dir->GetUniqueId(id_buf, kIdBufLen) == 0) {
      // The filesystem holding the db directory doesn't support getting a
      // unique file id; this test would fail because lru_cache would load the
      // blocks again even though they are already in the cache.
      return;
    }
#endif
  }
  uint32_t bytes_per_bit[2] = {1, 16};
  for (size_t k = 0; k < 2; k++) {
    std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
    std::shared_ptr<Statistics> stats = ROCKSDB_NAMESPACE::CreateDBStatistics();

    Options options = CurrentOptions();
    BlockBasedTableOptions bbto;
    // Disable delta encoding to make it easier to calculate read
    // amplification.
    bbto.use_delta_encoding = false;
    // Huge block cache to make it easier to calculate read amplification.
    bbto.block_cache = lru_cache;
    bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    options.statistics = stats;
    DestroyAndReopen(options);

    const int kNumEntries = 10000;

    Random rnd(301);
    for (int i = 0; i < kNumEntries; i++) {
      ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
    }
    ASSERT_OK(Flush());

    Close();
    Reopen(options);

    uint64_t total_useful_bytes = 0;
    std::set<int> read_keys;
    std::string value;
    // Iter1: Read half the DB, i.e. the even keys:
    // Key(0), Key(2), Key(4), Key(6), Key(8), ...
    for (int i = 0; i < kNumEntries; i += 2) {
      std::string key = Key(i);
      ASSERT_OK(db_->Get(ReadOptions(), key, &value));

      if (read_keys.find(i) == read_keys.end()) {
        auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
        total_useful_bytes +=
            GetEncodedEntrySize(internal_key.size(), value.size());
        read_keys.insert(i);
      }
    }

    size_t total_useful_bytes_iter1 =
        options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
    size_t total_loaded_bytes_iter1 =
        options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

    Close();
    std::shared_ptr<Statistics> new_statistics =
        ROCKSDB_NAMESPACE::CreateDBStatistics();
    // Destroy the old statistics object that the blocks in lru_cache are
    // pointing to.
    options.statistics.reset();
    // Use the statistics object that we just created.
    options.statistics = new_statistics;
    Reopen(options);

    // Iter2: Read the other half of the DB, i.e. the odd keys:
    // Key(1), Key(3), Key(5), Key(7), Key(9), ...
    for (int i = 1; i < kNumEntries; i += 2) {
      std::string key = Key(i);
      ASSERT_OK(db_->Get(ReadOptions(), key, &value));

      if (read_keys.find(i) == read_keys.end()) {
        auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
        total_useful_bytes +=
            GetEncodedEntrySize(internal_key.size(), value.size());
        read_keys.insert(i);
      }
    }

    size_t total_useful_bytes_iter2 =
        options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
    size_t total_loaded_bytes_iter2 =
        options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

    // Read amp is on average 100%, since we read everything we loaded into
    // memory.
    if (k == 0) {
      ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
                total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
    } else {
      ASSERT_NEAR((total_useful_bytes_iter1 + total_useful_bytes_iter2) * 1.0f /
                      (total_loaded_bytes_iter1 + total_loaded_bytes_iter2),
                  1, .01);
    }
  }
}
#endif  // !OS_SOLARIS

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
  Options options = CurrentOptions();
  options.num_levels = 3;
  options.IncreaseParallelism(20);
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "a"));
  ASSERT_OK(Put(Key(5), "a"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(Key(10), "a"));
  ASSERT_OK(Put(Key(15), "a"));
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 2;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  auto get_stat = [](std::string level_str, LevelStatType type,
                     std::map<std::string, std::string> props) {
    auto prop_str =
        "compaction." + level_str + "." +
        InternalStats::compaction_level_stats.at(type).property_name.c_str();
    auto prop_item = props.find(prop_str);
    return prop_item == props.end() ? 0 : std::stod(prop_item->second);
  };

  // Trivial move the 2 files to L2.
  ASSERT_EQ("0,0,2", FilesPerLevel());
  // Also test that the stats GetMapProperty API reports the same result.
  {
    std::map<std::string, std::string> prop;
    ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
    ASSERT_EQ(0, get_stat("L0", LevelStatType::NUM_FILES, prop));
    ASSERT_EQ(0, get_stat("L1", LevelStatType::NUM_FILES, prop));
    ASSERT_EQ(2, get_stat("L2", LevelStatType::NUM_FILES, prop));
    ASSERT_EQ(2, get_stat("Sum", LevelStatType::NUM_FILES, prop));
  }

  // While the compaction is running, we will create 2 new files that can fit
  // in L2. If these 2 files were moved to L2, they would overlap with the
  // running compaction and break the LSM consistency.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* /*arg*/) {
        ASSERT_OK(
            dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
                                  {"max_bytes_for_level_base", "1"}}));
        ASSERT_OK(Put(Key(6), "a"));
        ASSERT_OK(Put(Key(7), "a"));
        ASSERT_OK(Flush());

        ASSERT_OK(Put(Key(8), "a"));
        ASSERT_OK(Put(Key(9), "a"));
        ASSERT_OK(Flush());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Run a manual compaction that will compact the 2 files in L2
  // into 1 file in L2.
  cro.exclusive_manual_compaction = false;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // Test that the stats GetMapProperty API reports 1 file in L2.
  {
    std::map<std::string, std::string> prop;
    ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
    ASSERT_EQ(1, get_stat("L2", LevelStatType::NUM_FILES, prop));
  }
}

TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
  Options options = CurrentOptions();
  options.num_levels = 2;
  options.IncreaseParallelism(20);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "a"));
  ASSERT_OK(Put(Key(5), "a"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(Key(10), "a"));
  ASSERT_OK(Put(Key(15), "a"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Trivial move the 2 files to L1.
  ASSERT_EQ("0,2", FilesPerLevel());

  std::function<void()> bg_manual_compact = [&]() {
    std::string k1 = Key(6);
    std::string k2 = Key(9);
    Slice k1s(k1);
    Slice k2s(k2);
    CompactRangeOptions cro;
    cro.exclusive_manual_compaction = false;
    ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s));
  };
  ROCKSDB_NAMESPACE::port::Thread bg_thread;

  // While the compaction is running, we will create 2 new files that can fit
  // in L1. If these 2 files were moved to L1, they would overlap with the
  // running compaction and break the LSM consistency.
  std::atomic<bool> flag(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* /*arg*/) {
        if (flag.exchange(true)) {
          // We want to make sure to call this callback only once.
          return;
        }
        ASSERT_OK(Put(Key(6), "a"));
        ASSERT_OK(Put(Key(7), "a"));
        ASSERT_OK(Flush());

        ASSERT_OK(Put(Key(8), "a"));
        ASSERT_OK(Put(Key(9), "a"));
        ASSERT_OK(Flush());

        // Start a non-exclusive manual compaction in a bg thread.
        bg_thread = port::Thread(bg_manual_compact);
        // This manual compaction conflicts with the other manual compaction,
        // so it should wait until the first compaction finishes.
        env_->SleepForMicroseconds(1000000);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Run a manual compaction that will compact the 2 files in L1
  // into 1 file in L1.
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = false;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  bg_thread.join();

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, PausingManualCompaction1) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.num_levels = 7;

  DestroyAndReopen(options);
  Random rnd(301);
  // Generate a file containing 10 keys.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
  }
  ASSERT_OK(Flush());

  // Generate another file containing the same keys.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
  }
  ASSERT_OK(Flush());

  int manual_compactions_paused = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
        auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
        ASSERT_FALSE(paused->load(std::memory_order_acquire));
        paused->store(true, std::memory_order_release);
        manual_compactions_paused += 1;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<std::string> files_before_compact, files_after_compact;
  // Remember the file names before the compaction is triggered.
  std::vector<LiveFileMetaData> files_meta;
  dbfull()->GetLiveFilesMetaData(&files_meta);
  for (auto file : files_meta) {
    files_before_compact.push_back(file.name);
  }

  // OK, now trigger a manual compaction.
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  // Wait for compactions to get scheduled and stopped.
  dbfull()->TEST_WaitForCompact(true);

  // Get the file names after the compaction is stopped.
  files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&files_meta);
  for (auto file : files_meta) {
    files_after_compact.push_back(file.name);
  }

  // As if nothing happened.
  ASSERT_EQ(files_before_compact, files_after_compact);
  ASSERT_EQ(manual_compactions_paused, 1);

  manual_compactions_paused = 0;
  // Now make sure CompactFiles also does not run.
  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
                         files_before_compact, 0);
  // Wait for the manual compaction to get scheduled and finish.
  dbfull()->TEST_WaitForCompact(true);

  files_meta.clear();
  files_after_compact.clear();
  dbfull()->GetLiveFilesMetaData(&files_meta);
  for (auto file : files_meta) {
    files_after_compact.push_back(file.name);
  }

  ASSERT_EQ(files_before_compact, files_after_compact);
  // CompactFiles returns at its entry point, before the sync point is hit.
  ASSERT_EQ(manual_compactions_paused, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Disabling manual compaction does not affect automatic compaction.
TEST_F(DBTest2, PausingManualCompaction2) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.disable_auto_compactions = false;

  DestroyAndReopen(options);
  dbfull()->DisableManualCompaction();

  Random rnd(301);
  for (int i = 0; i < 2; i++) {
    // Generate a file containing 100 keys.
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 50)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));

  std::vector<LiveFileMetaData> files_meta;
  dbfull()->GetLiveFilesMetaData(&files_meta);
  ASSERT_EQ(files_meta.size(), 1);
}

TEST_F(DBTest2, PausingManualCompaction3) {
  CompactRangeOptions compact_options;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.num_levels = 7;

  Random rnd(301);
  auto generate_files = [&]() {
    for (int i = 0; i < options.num_levels; i++) {
      for (int j = 0; j < options.num_levels - i + 1; j++) {
        for (int k = 0; k < 1000; k++) {
          ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50)));
        }
        Flush();
      }

      for (int l = 1; l < options.num_levels - i; l++) {
        MoveFilesToLevel(l);
      }
    }
  };
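
  // Editorial note: generate_files() writes (num_levels - i + 1) overlapping
  // files per round and moves each round one level lower than the previous
  // one, leaving 2 files in L0, 3 in L1, and so on up to 8 in L6, hence the
  // "2,3,4,5,6,7,8" expectation below.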

  DestroyAndReopen(options);
  generate_files();
#ifndef ROCKSDB_LITE
  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif  // !ROCKSDB_LITE
  int run_manual_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():PausingManualCompaction:1",
      [&](void* /*arg*/) { run_manual_compactions++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  dbfull()->DisableManualCompaction();
  dbfull()->CompactRange(compact_options, nullptr, nullptr);
  dbfull()->TEST_WaitForCompact(true);
  // As manual compaction is disabled, the sync point is never even reached.
  ASSERT_EQ(run_manual_compactions, 0);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif  // !ROCKSDB_LITE

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "CompactionJob::Run():PausingManualCompaction:1");
  dbfull()->EnableManualCompaction();
  dbfull()->CompactRange(compact_options, nullptr, nullptr);
  dbfull()->TEST_WaitForCompact(true);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif  // !ROCKSDB_LITE

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, PausingManualCompaction4) {
  CompactRangeOptions compact_options;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.num_levels = 7;

  Random rnd(301);
  auto generate_files = [&]() {
    for (int i = 0; i < options.num_levels; i++) {
      for (int j = 0; j < options.num_levels - i + 1; j++) {
        for (int k = 0; k < 1000; k++) {
          ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50)));
        }
        Flush();
      }

      for (int l = 1; l < options.num_levels - i; l++) {
        MoveFilesToLevel(l);
      }
    }
  };

  DestroyAndReopen(options);
  generate_files();
#ifndef ROCKSDB_LITE
  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif  // !ROCKSDB_LITE
  int run_manual_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
        auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
        ASSERT_FALSE(paused->load(std::memory_order_acquire));
        paused->store(true, std::memory_order_release);
        run_manual_compactions++;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  dbfull()->EnableManualCompaction();
  dbfull()->CompactRange(compact_options, nullptr, nullptr);
  dbfull()->TEST_WaitForCompact(true);
  ASSERT_EQ(run_manual_compactions, 1);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif  // !ROCKSDB_LITE

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "CompactionJob::Run():PausingManualCompaction:2");
  dbfull()->EnableManualCompaction();
  dbfull()->CompactRange(compact_options, nullptr, nullptr);
  dbfull()->TEST_WaitForCompact(true);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif  // !ROCKSDB_LITE

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, OptimizeForPointLookup) {
  Options options = CurrentOptions();
  Close();
  options.OptimizeForPointLookup(2);
  ASSERT_OK(DB::Open(options, dbname_, &db_));

  ASSERT_OK(Put("foo", "v1"));
  ASSERT_EQ("v1", Get("foo"));
  Flush();
  ASSERT_EQ("v1", Get("foo"));
}

TEST_F(DBTest2, OptimizeForSmallDB) {
  Options options = CurrentOptions();
  Close();
  options.OptimizeForSmallDb();

  // Find the cache object.
  ASSERT_EQ(std::string(BlockBasedTableFactory::kName),
            std::string(options.table_factory->Name()));
  BlockBasedTableOptions* table_options =
      reinterpret_cast<BlockBasedTableOptions*>(
          options.table_factory->GetOptions());
  ASSERT_TRUE(table_options != nullptr);
  std::shared_ptr<Cache> cache = table_options->block_cache;

  ASSERT_EQ(0, cache->GetUsage());
  ASSERT_OK(DB::Open(options, dbname_, &db_));
  ASSERT_OK(Put("foo", "v1"));

  // The memtable size is charged to the block cache.
  ASSERT_NE(0, cache->GetUsage());

  ASSERT_EQ("v1", Get("foo"));
  Flush();

  size_t prev_size = cache->GetUsage();
  // Remember the block cache size so that we can verify it grows after Get().
  // Use a pinnable slice to pin the block so that it is not evicted when we
  // check the size.
  PinnableSlice value;
  ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &value));
  ASSERT_GT(cache->GetUsage(), prev_size);
  value.Reset();
}
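
// Editorial note on the assertion above: cache usage becomes non-zero after a
// single Put() and before any read because OptimizeForSmallDb() wires the
// memtable's memory accounting into the block cache (presumably through a
// WriteBufferManager that charges the cache; the exact mechanism is not shown
// in this file).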

#endif  // ROCKSDB_LITE

TEST_F(DBTest2, GetRaceFlush1) {
  ASSERT_OK(Put("foo", "v1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::GetImpl:1", "DBTest2::GetRaceFlush:1"},
       {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:2"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    Flush();
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
  });

  // Get() is issued after the first Put(), so it should see either
  // "v1" or "v2".
  ASSERT_NE("NOT_FOUND", Get("foo"));
  t1.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, GetRaceFlush2) {
  ASSERT_OK(Put("foo", "v1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::GetImpl:3", "DBTest2::GetRaceFlush:1"},
       {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:4"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    Flush();
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
  });

  // Get() is issued after the first Put(), so it should see either
  // "v1" or "v2".
  ASSERT_NE("NOT_FOUND", Get("foo"));
  t1.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, DirectIO) {
  if (!IsDirectIOSupported()) {
    return;
  }
  Options options = CurrentOptions();
  options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
      true;
  options.allow_mmap_reads = options.allow_mmap_writes = false;
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "a"));
  ASSERT_OK(Put(Key(5), "a"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(Key(10), "a"));
  ASSERT_OK(Put(Key(15), "a"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  Reopen(options);
}

TEST_F(DBTest2, MemtableOnlyIterator) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "first"));
  ASSERT_OK(Put(1, "bar", "second"));

  ReadOptions ropt;
  ropt.read_tier = kMemtableTier;
  std::string value;
  Iterator* it = nullptr;

  // Before flushing:
  // point lookups
  ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
  ASSERT_EQ("first", value);
  ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
  ASSERT_EQ("second", value);

  // Memtable-only iterator (read_tier=kMemtableTier); data not flushed yet.
  it = db_->NewIterator(ropt, handles_[1]);
  int count = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ASSERT_TRUE(it->Valid());
    count++;
  }
  ASSERT_TRUE(!it->Valid());
  ASSERT_EQ(2, count);
  delete it;

  Flush(1);

  // After flushing:
  // point lookups
  ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
  ASSERT_EQ("first", value);
  ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
  ASSERT_EQ("second", value);
  // Nothing should be returned by the memtable-only iterator after flushing.
  it = db_->NewIterator(ropt, handles_[1]);
  count = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ASSERT_TRUE(it->Valid());
    count++;
  }
  ASSERT_TRUE(!it->Valid());
  ASSERT_EQ(0, count);
  delete it;

  // Add a key to the memtable.
  ASSERT_OK(Put(1, "foobar", "third"));
  it = db_->NewIterator(ropt, handles_[1]);
  count = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("foobar", it->key().ToString());
    ASSERT_EQ("third", it->value().ToString());
    count++;
  }
  ASSERT_TRUE(!it->Valid());
  ASSERT_EQ(1, count);
  delete it;
}

TEST_F(DBTest2, LowPriWrite) {
  Options options = CurrentOptions();
  // Compaction pressure should build up once there are 6 L0 files (the
  // compaction trigger is 4 and compaction is blocked below).
  options.level0_file_num_compaction_trigger = 4;
  options.level0_slowdown_writes_trigger = 12;
  options.level0_stop_writes_trigger = 30;
  options.delayed_write_rate = 8 * 1024 * 1024;
  Reopen(options);

  std::atomic<int> rate_limit_count(0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "GenericRateLimiter::Request:1", [&](void* arg) {
        rate_limit_count.fetch_add(1);
        int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
        ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
      });
  // Block compaction.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  WriteOptions wo;
  for (int i = 0; i < 6; i++) {
    wo.low_pri = false;
    Put("", "", wo);
    wo.low_pri = true;
    Put("", "", wo);
    Flush();
  }
  ASSERT_EQ(0, rate_limit_count.load());
  wo.low_pri = true;
  Put("", "", wo);
  ASSERT_EQ(1, rate_limit_count.load());
  wo.low_pri = false;
  Put("", "", wo);
  ASSERT_EQ(1, rate_limit_count.load());

  TEST_SYNC_POINT("DBTest.LowPriWrite:0");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  dbfull()->TEST_WaitForCompact();
  wo.low_pri = true;
  Put("", "", wo);
  ASSERT_EQ(1, rate_limit_count.load());
  wo.low_pri = false;
  Put("", "", wo);
  ASSERT_EQ(1, rate_limit_count.load());
}
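
// Editorial note: the callback above also checks the throttle rate applied to
// low-pri writes (1MB/s in this scenario); writes with low_pri == false bypass
// the rate limiter entirely, which is why rate_limit_count does not move for
// them, and once compaction catches up the low-pri writes stop being throttled
// as well.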

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, RateLimitedCompactionReads) {
  // The compaction input has 512KB of data.
  const int kNumKeysPerFile = 128;
  const int kBytesPerKey = 1024;
  const int kNumL0Files = 4;

  for (auto use_direct_io : {false, true}) {
    if (use_direct_io && !IsDirectIOSupported()) {
      continue;
    }
    Options options = CurrentOptions();
    options.compression = kNoCompression;
    options.level0_file_num_compaction_trigger = kNumL0Files;
    options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
    options.new_table_reader_for_compaction_inputs = true;
    // The compaction takes roughly one second, split into 100 x 10ms
    // intervals. Each interval permits 5.12KB, which is smaller than the
    // block size, so this test exercises the code for chunking reads.
    options.rate_limiter.reset(NewGenericRateLimiter(
        static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
                             kBytesPerKey) /* rate_bytes_per_sec */,
        10 * 1000 /* refill_period_us */, 10 /* fairness */,
        RateLimiter::Mode::kReadsOnly));
    options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
        use_direct_io;
    BlockBasedTableOptions bbto;
    bbto.block_size = 16384;
    bbto.no_block_cache = true;
    options.table_factory.reset(new BlockBasedTableFactory(bbto));
    DestroyAndReopen(options);

    for (int i = 0; i < kNumL0Files; ++i) {
      for (int j = 0; j <= kNumKeysPerFile; ++j) {
        ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
      }
      dbfull()->TEST_WaitForFlushMemTable();
      ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
    }
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ(0, NumTableFilesAtLevel(0));

    ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
    // Should be slightly above 512KB due to non-data blocks being read; 1MB
    // is chosen (arbitrarily) as the upper bound on the total bytes read.
    size_t rate_limited_bytes =
        options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
    // Include the explicit prefetch of the footer in the direct I/O case.
    size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
    ASSERT_GE(
        rate_limited_bytes,
        static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
    ASSERT_LT(
        rate_limited_bytes,
        static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
                            direct_io_extra));

    Iterator* iter = db_->NewIterator(ReadOptions());
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
    }
    delete iter;
    // Bytes read by the user iterator shouldn't count against the rate limit.
    ASSERT_EQ(rate_limited_bytes,
              static_cast<size_t>(
                  options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
  }
}
#endif  // ROCKSDB_LITE

// Make sure the DB can be reopened with a reduced number of levels, given no
// file is on a level higher than the new num_levels.
TEST_F(DBTest2, ReduceLevel) {
  Options options;
  options.disable_auto_compactions = true;
  options.num_levels = 7;
  Reopen(options);
  Put("foo", "bar");
  Flush();
  MoveFilesToLevel(6);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
#endif  // !ROCKSDB_LITE
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 1;
  dbfull()->CompactRange(compact_options, nullptr, nullptr);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("0,1", FilesPerLevel());
#endif  // !ROCKSDB_LITE
  options.num_levels = 3;
  Reopen(options);
#ifndef ROCKSDB_LITE
  ASSERT_EQ("0,1", FilesPerLevel());
#endif  // !ROCKSDB_LITE
}
2947
2948 // Test that ReadCallback is actually used in both memtbale and sst tables
TEST_F(DBTest2,ReadCallbackTest)2949 TEST_F(DBTest2, ReadCallbackTest) {
2950 Options options;
2951 options.disable_auto_compactions = true;
2952 options.num_levels = 7;
2953 Reopen(options);
2954 std::vector<const Snapshot*> snapshots;
2955 // Try to create a db with multiple layers and a memtable
2956 const std::string key = "foo";
2957 const std::string value = "bar";
2958 // This test assumes that the seq start with 1 and increased by 1 after each
2959 // write batch of size 1. If that behavior changes, the test needs to be
2960 // updated as well.
2961 // TODO(myabandeh): update this test to use the seq number that is returned by
2962 // the DB instead of assuming what seq the DB used.
2963 int i = 1;
2964 for (; i < 10; i++) {
2965 Put(key, value + std::to_string(i));
2966 // Take a snapshot to avoid the value being removed during compaction
2967 auto snapshot = dbfull()->GetSnapshot();
2968 snapshots.push_back(snapshot);
2969 }
2970 Flush();
2971 for (; i < 20; i++) {
2972 Put(key, value + std::to_string(i));
2973 // Take a snapshot to avoid the value being removed during compaction
2974 auto snapshot = dbfull()->GetSnapshot();
2975 snapshots.push_back(snapshot);
2976 }
2977 Flush();
2978 MoveFilesToLevel(6);
2979 #ifndef ROCKSDB_LITE
2980 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
2981 #endif // !ROCKSDB_LITE
2982 for (; i < 30; i++) {
2983 Put(key, value + std::to_string(i));
2984 auto snapshot = dbfull()->GetSnapshot();
2985 snapshots.push_back(snapshot);
2986 }
2987 Flush();
2988 #ifndef ROCKSDB_LITE
2989 ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());
2990 #endif // !ROCKSDB_LITE
2991 // And also add some values to the memtable
2992 for (; i < 40; i++) {
2993 Put(key, value + std::to_string(i));
2994 auto snapshot = dbfull()->GetSnapshot();
2995 snapshots.push_back(snapshot);
2996 }
2997
2998 class TestReadCallback : public ReadCallback {
2999 public:
3000 explicit TestReadCallback(SequenceNumber snapshot)
3001 : ReadCallback(snapshot), snapshot_(snapshot) {}
3002 bool IsVisibleFullCheck(SequenceNumber seq) override {
3003 return seq <= snapshot_;
3004 }
3005
3006 private:
3007 SequenceNumber snapshot_;
3008 };
3009
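// How the callback is exercised below (a sketch of our understanding, not a
// documented contract): DBImpl::GetImpl consults the ReadCallback for each
// candidate version of the key, in both memtable and SST lookups, and skips
// entries for which IsVisibleFullCheck(seq) returns false. A callback
// constructed with snapshot == N therefore behaves like a read at sequence
// number N and should return the value written by the Put that was assigned
// sequence number N.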
3010 for (int seq = 1; seq < i; seq++) {
3011 PinnableSlice pinnable_val;
3012 ReadOptions roptions;
3013 TestReadCallback callback(seq);
3014 bool dont_care = true;
3015 DBImpl::GetImplOptions get_impl_options;
3016 get_impl_options.column_family = dbfull()->DefaultColumnFamily();
3017 get_impl_options.value = &pinnable_val;
3018 get_impl_options.value_found = &dont_care;
3019 get_impl_options.callback = &callback;
3020 Status s = dbfull()->GetImpl(roptions, key, get_impl_options);
3021 ASSERT_TRUE(s.ok());
3022 // Assuming that after each Put the DB increases seq by one, the value and
3023 // seq number must be equal since we also increment the value by 1 per Put.
3024 ASSERT_EQ(value + std::to_string(seq), pinnable_val.ToString());
3025 }
3026
3027 for (auto snapshot : snapshots) {
3028 dbfull()->ReleaseSnapshot(snapshot);
3029 }
3030 }
3031
3032 #ifndef ROCKSDB_LITE
3033
3034 TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
3035 // Regression test for race condition where an obsolete file is returned to
3036 // user as a "live file" but then deleted, all while file deletions are
3037 // disabled.
3038 //
3039 // It happened like this:
3040 //
3041 // 1. [flush thread] Log file "x.log" found by FindObsoleteFiles
3042 // 2. [user thread] DisableFileDeletions, GetSortedWalFiles are called and the
3043 // latter returned "x.log"
3044 // 3. [flush thread] PurgeObsoleteFiles deleted "x.log"
3045 // 4. [user thread] Reading "x.log" failed
3046 //
3047 // Unfortunately the only regression test I can come up with involves sleep.
3048 // We cannot set SyncPoints to repro since, once the fix is applied, the
3049 // SyncPoints would cause a deadlock as the repro's sequence of events is now
3050 // prohibited.
3051 //
3052 // Instead, if we sleep for a second between Find and Purge, and ensure the
3053 // read attempt happens after purge, then the sequence of events will almost
3054 // certainly happen on the old code.
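// A note on the SyncPoint API used here: LoadDependency takes {predecessor,
// successor} pairs, and a thread reaching a successor marker blocks until
// some thread has passed the matching predecessor marker. SetCallBack, by
// contrast, runs the given lambda every time its marker is hit.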
3055 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3056 {"DBImpl::BackgroundCallFlush:FilesFound",
3057 "DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered"},
3058 {"DBImpl::PurgeObsoleteFiles:End",
3059 "DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured"},
3060 });
3061 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3062 "DBImpl::PurgeObsoleteFiles:Begin",
3063 [&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
3064 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3065
3066 Put("key", "val");
3067 FlushOptions flush_opts;
3068 flush_opts.wait = false;
3069 db_->Flush(flush_opts);
3070 TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
3071
3072 db_->DisableFileDeletions();
3073 VectorLogPtr log_files;
3074 db_->GetSortedWalFiles(log_files);
3075 TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
3076 for (const auto& log_file : log_files) {
3077 ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
3078 }
3079
3080 db_->EnableFileDeletions();
3081 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3082 }
3083
3084 TEST_F(DBTest2, TestNumPread) {
3085 Options options = CurrentOptions();
3086 // disable block cache
3087 BlockBasedTableOptions table_options;
3088 table_options.no_block_cache = true;
3089 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
3090 Reopen(options);
3091 env_->count_random_reads_ = true;
3092
3093 env_->random_file_open_counter_.store(0);
3094 ASSERT_OK(Put("bar", "foo"));
3095 ASSERT_OK(Put("foo", "bar"));
3096 ASSERT_OK(Flush());
3097 // After flush, we'll open the file and read footer, meta block,
3098 // property block and index block.
3099 ASSERT_EQ(4, env_->random_read_counter_.Read());
3100 ASSERT_EQ(1, env_->random_file_open_counter_.load());
3101
3102 // One pread per normal data block read
3103 env_->random_file_open_counter_.store(0);
3104 env_->random_read_counter_.Reset();
3105 ASSERT_EQ("bar", Get("foo"));
3106 ASSERT_EQ(1, env_->random_read_counter_.Read());
3107 // All files are already opened.
3108 ASSERT_EQ(0, env_->random_file_open_counter_.load());
3109
3110 env_->random_file_open_counter_.store(0);
3111 env_->random_read_counter_.Reset();
3112 ASSERT_OK(Put("bar2", "foo2"));
3113 ASSERT_OK(Put("foo2", "bar2"));
3114 ASSERT_OK(Flush());
3115 // After flush, we'll open the file and read footer, meta block,
3116 // property block and index block.
3117 ASSERT_EQ(4, env_->random_read_counter_.Read());
3118 ASSERT_EQ(1, env_->random_file_open_counter_.load());
3119
3120 // Compaction needs two input blocks, which requires 2 preads, and
3121 // generate a new SST file which needs 4 preads (footer, meta block,
3122 // property block and index block). In total 6.
3123 env_->random_file_open_counter_.store(0);
3124 env_->random_read_counter_.Reset();
3125 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3126 ASSERT_EQ(6, env_->random_read_counter_.Read());
3127 // All compaction input files should have already been opened.
3128 ASSERT_EQ(1, env_->random_file_open_counter_.load());
3129
3130 // One pread per normal data block read
3131 env_->random_file_open_counter_.store(0);
3132 env_->random_read_counter_.Reset();
3133 ASSERT_EQ("foo2", Get("bar2"));
3134 ASSERT_EQ(1, env_->random_read_counter_.Read());
3135 // SST files are already opened.
3136 ASSERT_EQ(0, env_->random_file_open_counter_.load());
3137 }
3138
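// Outline of the tracing workflow exercised below: create a TraceWriter
// pointed at a file, call DB::StartTrace() so that subsequent operations are
// recorded, run the workload, then call EndTrace(). Replay is the mirror
// image: open the trace with a TraceReader and hand it to a Replayer bound
// to a second DB, whose Replay() re-executes the recorded operations.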
3139 TEST_F(DBTest2, TraceAndReplay) {
3140 Options options = CurrentOptions();
3141 options.merge_operator = MergeOperators::CreatePutOperator();
3142 ReadOptions ro;
3143 WriteOptions wo;
3144 TraceOptions trace_opts;
3145 EnvOptions env_opts;
3146 CreateAndReopenWithCF({"pikachu"}, options);
3147 Random rnd(301);
3148 Iterator* single_iter = nullptr;
3149
3150 ASSERT_TRUE(db_->EndTrace().IsIOError());
3151
3152 std::string trace_filename = dbname_ + "/rocksdb.trace";
3153 std::unique_ptr<TraceWriter> trace_writer;
3154 ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3155 ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3156
3157 ASSERT_OK(Put(0, "a", "1"));
3158 ASSERT_OK(Merge(0, "b", "2"));
3159 ASSERT_OK(Delete(0, "c"));
3160 ASSERT_OK(SingleDelete(0, "d"));
3161 ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
3162
3163 WriteBatch batch;
3164 ASSERT_OK(batch.Put("f", "11"));
3165 ASSERT_OK(batch.Merge("g", "12"));
3166 ASSERT_OK(batch.Delete("h"));
3167 ASSERT_OK(batch.SingleDelete("i"));
3168 ASSERT_OK(batch.DeleteRange("j", "k"));
3169 ASSERT_OK(db_->Write(wo, &batch));
3170
3171 single_iter = db_->NewIterator(ro);
3172 single_iter->Seek("f");
3173 single_iter->SeekForPrev("g");
3174 delete single_iter;
3175
3176 ASSERT_EQ("1", Get(0, "a"));
3177 ASSERT_EQ("12", Get(0, "g"));
3178
3179 ASSERT_OK(Put(1, "foo", "bar"));
3180 ASSERT_OK(Put(1, "rocksdb", "rocks"));
3181 ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
3182
3183 ASSERT_OK(db_->EndTrace());
3184 // These should not get into the trace file as it is after EndTrace.
3185 Put("hello", "world");
3186 Merge("foo", "bar");
3187
3188 // Open another db, replay, and verify the data
3189 std::string value;
3190 std::string dbname2 = test::TmpDir(env_) + "/db_replay";
3191 ASSERT_OK(DestroyDB(dbname2, options));
3192
3193 // Using a different name than db2, to pacify infer's use-after-lifetime
3194 // warnings (http://fbinfer.com).
3195 DB* db2_init = nullptr;
3196 options.create_if_missing = true;
3197 ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3198 ColumnFamilyHandle* cf;
3199 ASSERT_OK(
3200 db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3201 delete cf;
3202 delete db2_init;
3203
3204 DB* db2 = nullptr;
3205 std::vector<ColumnFamilyDescriptor> column_families;
3206 ColumnFamilyOptions cf_options;
3207 cf_options.merge_operator = MergeOperators::CreatePutOperator();
3208 column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3209 column_families.push_back(
3210 ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3211 std::vector<ColumnFamilyHandle*> handles;
3212 ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3213
3214 env_->SleepForMicroseconds(100);
3215 // Verify that the keys don't already exist
3216 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3217 ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
3218
3219 std::unique_ptr<TraceReader> trace_reader;
3220 ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3221 Replayer replayer(db2, handles_, std::move(trace_reader));
3222 ASSERT_OK(replayer.Replay());
3223
3224 ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
3225 ASSERT_EQ("1", value);
3226 ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
3227 ASSERT_EQ("12", value);
3228 ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
3229 ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
3230
3231 ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
3232 ASSERT_EQ("bar", value);
3233 ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
3234 ASSERT_EQ("rocks", value);
3235
3236 for (auto handle : handles) {
3237 delete handle;
3238 }
3239 delete db2;
3240 ASSERT_OK(DestroyDB(dbname2, options));
3241 }
3242
3243 TEST_F(DBTest2, TraceWithLimit) {
3244 Options options = CurrentOptions();
3245 options.merge_operator = MergeOperators::CreatePutOperator();
3246 ReadOptions ro;
3247 WriteOptions wo;
3248 TraceOptions trace_opts;
3249 EnvOptions env_opts;
3250 CreateAndReopenWithCF({"pikachu"}, options);
3251 Random rnd(301);
3252
3253 // test the max trace file size options
3254 trace_opts.max_trace_file_size = 5;
3255 std::string trace_filename = dbname_ + "/rocksdb.trace1";
3256 std::unique_ptr<TraceWriter> trace_writer;
3257 ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3258 ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3259 ASSERT_OK(Put(0, "a", "1"));
3260 ASSERT_OK(Put(0, "b", "1"));
3261 ASSERT_OK(Put(0, "c", "1"));
3262 ASSERT_OK(db_->EndTrace());
3263
3264 std::string dbname2 = test::TmpDir(env_) + "/db_replay2";
3265 std::string value;
3266 ASSERT_OK(DestroyDB(dbname2, options));
3267
3268 // Using a different name than db2, to pacify infer's use-after-lifetime
3269 // warnings (http://fbinfer.com).
3270 DB* db2_init = nullptr;
3271 options.create_if_missing = true;
3272 ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3273 ColumnFamilyHandle* cf;
3274 ASSERT_OK(
3275 db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3276 delete cf;
3277 delete db2_init;
3278
3279 DB* db2 = nullptr;
3280 std::vector<ColumnFamilyDescriptor> column_families;
3281 ColumnFamilyOptions cf_options;
3282 cf_options.merge_operator = MergeOperators::CreatePutOperator();
3283 column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3284 column_families.push_back(
3285 ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3286 std::vector<ColumnFamilyHandle*> handles;
3287 ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3288
3289 env_->SleepForMicroseconds(100);
3290 // Verify that the keys don't already exist
3291 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3292 ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3293 ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3294
3295 std::unique_ptr<TraceReader> trace_reader;
3296 ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3297 Replayer replayer(db2, handles_, std::move(trace_reader));
3298 ASSERT_OK(replayer.Replay());
3299
3300 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3301 ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3302 ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3303
3304 for (auto handle : handles) {
3305 delete handle;
3306 }
3307 delete db2;
3308 ASSERT_OK(DestroyDB(dbname2, options));
3309 }
3310
3311 TEST_F(DBTest2, TraceWithSampling) {
3312 Options options = CurrentOptions();
3313 ReadOptions ro;
3314 WriteOptions wo;
3315 TraceOptions trace_opts;
3316 EnvOptions env_opts;
3317 CreateAndReopenWithCF({"pikachu"}, options);
3318 Random rnd(301);
3319
3320 // test the trace file sampling options
3321 trace_opts.sampling_frequency = 2;
3322 std::string trace_filename = dbname_ + "/rocksdb.trace_sampling";
3323 std::unique_ptr<TraceWriter> trace_writer;
3324 ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3325 ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3326 ASSERT_OK(Put(0, "a", "1"));
3327 ASSERT_OK(Put(0, "b", "2"));
3328 ASSERT_OK(Put(0, "c", "3"));
3329 ASSERT_OK(Put(0, "d", "4"));
3330 ASSERT_OK(Put(0, "e", "5"));
3331 ASSERT_OK(db_->EndTrace());
3332
3333 std::string dbname2 = test::TmpDir(env_) + "/db_replay_sampling";
3334 std::string value;
3335 ASSERT_OK(DestroyDB(dbname2, options));
3336
3337 // Using a different name than db2, to pacify infer's use-after-lifetime
3338 // warnings (http://fbinfer.com).
3339 DB* db2_init = nullptr;
3340 options.create_if_missing = true;
3341 ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3342 ColumnFamilyHandle* cf;
3343 ASSERT_OK(
3344 db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3345 delete cf;
3346 delete db2_init;
3347
3348 DB* db2 = nullptr;
3349 std::vector<ColumnFamilyDescriptor> column_families;
3350 ColumnFamilyOptions cf_options;
3351 column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3352 column_families.push_back(
3353 ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3354 std::vector<ColumnFamilyHandle*> handles;
3355 ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3356
3357 env_->SleepForMicroseconds(100);
3358 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3359 ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3360 ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3361 ASSERT_TRUE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
3362 ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
3363
3364 std::unique_ptr<TraceReader> trace_reader;
3365 ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3366 Replayer replayer(db2, handles_, std::move(trace_reader));
3367 ASSERT_OK(replayer.Replay());
3368
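// With sampling_frequency = 2 only every other operation is recorded, so,
// judging from the assertions below, the 2nd and 4th Puts ("b" and "d")
// survive the replay while the others are missing.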
3369 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3370 ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
3371 ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
3372 ASSERT_FALSE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
3373 ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
3374
3375 for (auto handle : handles) {
3376 delete handle;
3377 }
3378 delete db2;
3379 ASSERT_OK(DestroyDB(dbname2, options));
3380 }
3381
3382 TEST_F(DBTest2, TraceWithFilter) {
3383 Options options = CurrentOptions();
3384 options.merge_operator = MergeOperators::CreatePutOperator();
3385 ReadOptions ro;
3386 WriteOptions wo;
3387 TraceOptions trace_opts;
3388 EnvOptions env_opts;
3389 CreateAndReopenWithCF({"pikachu"}, options);
3390 Random rnd(301);
3391 Iterator* single_iter = nullptr;
3392
3393 trace_opts.filter = TraceFilterType::kTraceFilterWrite;
3394
3395 std::string trace_filename = dbname_ + "/rocksdb.trace";
3396 std::unique_ptr<TraceWriter> trace_writer;
3397 ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
3398 ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
3399
3400 ASSERT_OK(Put(0, "a", "1"));
3401 ASSERT_OK(Merge(0, "b", "2"));
3402 ASSERT_OK(Delete(0, "c"));
3403 ASSERT_OK(SingleDelete(0, "d"));
3404 ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
3405
3406 WriteBatch batch;
3407 ASSERT_OK(batch.Put("f", "11"));
3408 ASSERT_OK(batch.Merge("g", "12"));
3409 ASSERT_OK(batch.Delete("h"));
3410 ASSERT_OK(batch.SingleDelete("i"));
3411 ASSERT_OK(batch.DeleteRange("j", "k"));
3412 ASSERT_OK(db_->Write(wo, &batch));
3413
3414 single_iter = db_->NewIterator(ro);
3415 single_iter->Seek("f");
3416 single_iter->SeekForPrev("g");
3417 delete single_iter;
3418
3419 ASSERT_EQ("1", Get(0, "a"));
3420 ASSERT_EQ("12", Get(0, "g"));
3421
3422 ASSERT_OK(Put(1, "foo", "bar"));
3423 ASSERT_OK(Put(1, "rocksdb", "rocks"));
3424 ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
3425
3426 ASSERT_OK(db_->EndTrace());
3427 // These should not get into the trace file as it is after EndTrace.
3428 Put("hello", "world");
3429 Merge("foo", "bar");
3430
3431 // Open another db, replay, and verify the data
3432 std::string value;
3433 std::string dbname2 = test::TmpDir(env_) + "/db_replay";
3434 ASSERT_OK(DestroyDB(dbname2, options));
3435
3436 // Using a different name than db2, to pacify infer's use-after-lifetime
3437 // warnings (http://fbinfer.com).
3438 DB* db2_init = nullptr;
3439 options.create_if_missing = true;
3440 ASSERT_OK(DB::Open(options, dbname2, &db2_init));
3441 ColumnFamilyHandle* cf;
3442 ASSERT_OK(
3443 db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
3444 delete cf;
3445 delete db2_init;
3446
3447 DB* db2 = nullptr;
3448 std::vector<ColumnFamilyDescriptor> column_families;
3449 ColumnFamilyOptions cf_options;
3450 cf_options.merge_operator = MergeOperators::CreatePutOperator();
3451 column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3452 column_families.push_back(
3453 ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3454 std::vector<ColumnFamilyHandle*> handles;
3455 ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
3456
3457 env_->SleepForMicroseconds(100);
3458 // Verify that the keys don't already exist
3459 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3460 ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
3461
3462 std::unique_ptr<TraceReader> trace_reader;
3463 ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
3464 Replayer replayer(db2, handles_, std::move(trace_reader));
3465 ASSERT_OK(replayer.Replay());
3466
3467 // None of the key-values should be present since we filtered out the WRITE ops.
3468 ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
3469 ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
3470 ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
3471 ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
3472 ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound());
3473 ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound());
3474
3475 for (auto handle : handles) {
3476 delete handle;
3477 }
3478 delete db2;
3479 ASSERT_OK(DestroyDB(dbname2, options));
3480
3481 // Set up a new db.
3482 std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
3483 ASSERT_OK(DestroyDB(dbname3, options));
3484
3485 DB* db3_init = nullptr;
3486 options.create_if_missing = true;
3487 ColumnFamilyHandle* cf3;
3488 ASSERT_OK(DB::Open(options, dbname3, &db3_init));
3489 ASSERT_OK(
3490 db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3));
3491 delete cf3;
3492 delete db3_init;
3493
3494 column_families.clear();
3495 column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
3496 column_families.push_back(
3497 ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
3498 handles.clear();
3499
3500 DB* db3 = nullptr;
3501 ASSERT_OK(DB::Open(DBOptions(), dbname3, column_families, &handles, &db3));
3502
3503 env_->SleepForMicroseconds(100);
3504 // Verify that the keys don't already exist
3505 ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
3506 ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());
3507
3508 // The tracer will not record the READ ops.
3509 trace_opts.filter = TraceFilterType::kTraceFilterGet;
3510 std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
3511 std::unique_ptr<TraceWriter> trace_writer3;
3512 ASSERT_OK(
3513 NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
3514 ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));
3515
3516 ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
3517 ASSERT_OK(db3->Merge(wo, handles[0], "b", "2"));
3518 ASSERT_OK(db3->Delete(wo, handles[0], "c"));
3519 ASSERT_OK(db3->SingleDelete(wo, handles[0], "d"));
3520
3521 ASSERT_OK(db3->Get(ro, handles[0], "a", &value));
3522 ASSERT_EQ(value, "1");
3523 ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound());
3524
3525 ASSERT_OK(db3->EndTrace());
3526
3527 for (auto handle : handles) {
3528 delete handle;
3529 }
3530 delete db3;
3531 ASSERT_OK(DestroyDB(dbname3, options));
3532
3533 std::unique_ptr<TraceReader> trace_reader3;
3534 ASSERT_OK(
3535 NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
3536
3537 // Count the number of records in the trace file.
3538 int count = 0;
3539 std::string data;
3540 Status s;
3541 while (true) {
3542 s = trace_reader3->Read(&data);
3543 if (!s.ok()) {
3544 break;
3545 }
3546 count += 1;
3547 }
3548 // We also need to count the header and footer
3549 // 4 WRITE + HEADER + FOOTER = 6
3550 ASSERT_EQ(count, 6);
3551 }
3552
3553 #endif // ROCKSDB_LITE
3554
3555 TEST_F(DBTest2, PinnableSliceAndMmapReads) {
3556 Options options = CurrentOptions();
3557 options.allow_mmap_reads = true;
3558 options.max_open_files = 100;
3559 options.compression = kNoCompression;
3560 Reopen(options);
3561
3562 ASSERT_OK(Put("foo", "bar"));
3563 ASSERT_OK(Flush());
3564
3565 PinnableSlice pinned_value;
3566 ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
3567 // It is not safe to pin mmap files as they might disappear by compaction
3568 ASSERT_FALSE(pinned_value.IsPinned());
3569 ASSERT_EQ(pinned_value.ToString(), "bar");
3570
3571 dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
3572 nullptr /* end */, nullptr /* column_family */,
3573 true /* disallow_trivial_move */);
3574
3575 // Ensure pinned_value doesn't rely on memory munmap'd by the above
3576 // compaction. It crashes if it does.
3577 ASSERT_EQ(pinned_value.ToString(), "bar");
3578
3579 #ifndef ROCKSDB_LITE
3580 pinned_value.Reset();
3581 // Unsafe to pin mmap files when they could be kicked out of table cache
3582 Close();
3583 ASSERT_OK(ReadOnlyReopen(options));
3584 ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
3585 ASSERT_FALSE(pinned_value.IsPinned());
3586 ASSERT_EQ(pinned_value.ToString(), "bar");
3587
3588 pinned_value.Reset();
3589 // In read-only mode with infinite capacity on table cache it should pin the
3590 // value and avoid the memcpy
3591 Close();
3592 options.max_open_files = -1;
3593 ASSERT_OK(ReadOnlyReopen(options));
3594 ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
3595 ASSERT_TRUE(pinned_value.IsPinned());
3596 ASSERT_EQ(pinned_value.ToString(), "bar");
3597 #endif
3598 }
3599
3600 TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
3601 Options options = CurrentOptions();
3602 options.create_if_missing = true;
3603 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
3604 BlockBasedTableOptions bbto;
3605 bbto.no_block_cache = false;
3606 bbto.cache_index_and_filter_blocks = false;
3607 bbto.block_cache = NewLRUCache(100000);
3608 bbto.block_size = 400; // small block size
3609 options.table_factory.reset(new BlockBasedTableFactory(bbto));
3610 Reopen(options);
3611
3612 Random rnd(301);
3613 std::string v = RandomString(&rnd, 400);
3614
3615 // Since v is the size of a block, each key should take a block
3616 // of 400+ bytes.
3617 Put("1", v);
3618 Put("3", v);
3619 Put("5", v);
3620 Put("7", v);
3621 ASSERT_OK(Flush());
3622
3623 ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
3624
3625 // Verify that iterators don't pin more than one data block in block cache
3626 // at a time.
3627 {
3628 std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
3629 iter->SeekToFirst();
3630
3631 for (int i = 0; i < 4; i++) {
3632 ASSERT_TRUE(iter->Valid());
3633 // Block cache should contain exactly one block.
3634 ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
3635 ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
3636 iter->Next();
3637 }
3638 ASSERT_FALSE(iter->Valid());
3639
3640 iter->Seek("4");
3641 ASSERT_TRUE(iter->Valid());
3642
3643 ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
3644 ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
3645
3646 iter->Seek("3");
3647 ASSERT_TRUE(iter->Valid());
3648
3649 ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
3650 ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
3651 }
3652 ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
3653
3654 // Test compaction case
3655 Put("2", v);
3656 Put("5", v);
3657 Put("6", v);
3658 Put("8", v);
3659 ASSERT_OK(Flush());
3660
3661 // Clear existing data in block cache
3662 bbto.block_cache->SetCapacity(0);
3663 bbto.block_cache->SetCapacity(100000);
3664
3665 // Verify compaction input iterators don't hold more than one data block
3666 // at a time.
3667 std::atomic<bool> finished(false);
3668 std::atomic<int> block_newed(0);
3669 std::atomic<int> block_destroyed(0);
3670 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3671 "Block::Block:0", [&](void* /*arg*/) {
3672 if (finished) {
3673 return;
3674 }
3675 // Two iterators. At most 2 outstanding blocks.
3676 EXPECT_GE(block_newed.load(), block_destroyed.load());
3677 EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
3678 block_newed.fetch_add(1);
3679 });
3680 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3681 "Block::~Block", [&](void* /*arg*/) {
3682 if (finished) {
3683 return;
3684 }
3685 // Two iterators. At most 2 outstanding blocks.
3686 EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
3687 EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
3688 block_destroyed.fetch_add(1);
3689 });
3690 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3691 "CompactionJob::Run:BeforeVerify",
3692 [&](void* /*arg*/) { finished = true; });
3693 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3694
3695 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3696
3697 // Two input files. Each of them has 4 data blocks.
3698 ASSERT_EQ(8, block_newed.load());
3699 ASSERT_EQ(8, block_destroyed.load());
3700
3701 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3702 }
3703
3704 TEST_F(DBTest2, TestBBTTailPrefetch) {
3705 std::atomic<bool> called(false);
3706 size_t expected_lower_bound = 512 * 1024;
3707 size_t expected_higher_bound = 512 * 1024;
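// Rough idea of the behavior under test, as far as this test relies on it:
// on the very first table open there is no history, so the reader prefetches
// the full 512KB maximum for the SST tail (footer plus meta blocks); once
// TailPrefetchStats has recorded how large tails actually are, later opens
// shrink the prefetch to roughly the observed size.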
3708 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3709 "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
3710 size_t* prefetch_size = static_cast<size_t*>(arg);
3711 EXPECT_LE(expected_lower_bound, *prefetch_size);
3712 EXPECT_GE(expected_higher_bound, *prefetch_size);
3713 called = true;
3714 });
3715 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3716
3717 Put("1", "1");
3718 Put("9", "1");
3719 Flush();
3720
3721 expected_lower_bound = 0;
3722 expected_higher_bound = 8 * 1024;
3723
3724 Put("1", "1");
3725 Put("9", "1");
3726 Flush();
3727
3728 Put("1", "1");
3729 Put("9", "1");
3730 Flush();
3731
3732 // Full compaction to make sure there is no L0 file after the open.
3733 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3734
3735 ASSERT_TRUE(called.load());
3736 called = false;
3737
3738 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3739 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3740
3741 std::atomic<bool> first_call(true);
3742 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3743 "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
3744 size_t* prefetch_size = static_cast<size_t*>(arg);
3745 if (first_call) {
3746 EXPECT_EQ(4 * 1024, *prefetch_size);
3747 first_call = false;
3748 } else {
3749 EXPECT_GE(4 * 1024, *prefetch_size);
3750 }
3751 called = true;
3752 });
3753 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3754
3755 Options options = CurrentOptions();
3756 options.max_file_opening_threads = 1; // one thread
3757 BlockBasedTableOptions table_options;
3758 table_options.cache_index_and_filter_blocks = true;
3759 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
3760 options.max_open_files = -1;
3761 Reopen(options);
3762
3763 Put("1", "1");
3764 Put("9", "1");
3765 Flush();
3766
3767 Put("1", "1");
3768 Put("9", "1");
3769 Flush();
3770
3771 ASSERT_TRUE(called.load());
3772 called = false;
3773
3774 // Parallel loading SST files
3775 options.max_file_opening_threads = 16;
3776 Reopen(options);
3777
3778 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3779
3780 ASSERT_TRUE(called.load());
3781
3782 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3783 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3784 }
3785
3786 TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
3787 // Setup sync point dependency to reproduce the race condition of
3788 // DBImpl::GetColumnFamilyHandleUnlocked
3789 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3790 {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1",
3791 "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2"},
3792 {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2",
3793 "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1"},
3794 });
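// The two dependencies force this interleaving: thread1 looks up its handle,
// then thread2 performs its own lookup, and only then does thread1 use the
// handle it obtained earlier. If GetColumnFamilyHandleUnlocked handed out a
// reference into shared mutable state, thread2's lookup would invalidate
// thread1's handle between its two uses.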
3795 SyncPoint::GetInstance()->EnableProcessing();
3796
3797 CreateColumnFamilies({"test1", "test2"}, Options());
3798 ASSERT_EQ(handles_.size(), 2);
3799
3800 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
3801 port::Thread user_thread1([&]() {
3802 auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
3803 ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
3804 TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
3805 TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
3806 ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
3807 });
3808
3809 port::Thread user_thread2([&]() {
3810 TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
3811 auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
3812 ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
3813 TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
3814 ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
3815 });
3816
3817 user_thread1.join();
3818 user_thread2.join();
3819
3820 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3821 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3822 }
3823
3824 #ifndef ROCKSDB_LITE
3825 TEST_F(DBTest2, TestCompactFiles) {
3826 // Setup sync point dependency to sequence the concurrent CompactFiles and
3827 // IngestExternalFile calls below
3828 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3829 {"TestCompactFiles::IngestExternalFile1",
3830 "TestCompactFiles::IngestExternalFile2"},
3831 });
3832 SyncPoint::GetInstance()->EnableProcessing();
3833
3834 Options options;
3835 options.num_levels = 2;
3836 options.disable_auto_compactions = true;
3837 Reopen(options);
3838 auto* handle = db_->DefaultColumnFamily();
3839 ASSERT_EQ(db_->NumberLevels(handle), 2);
3840
3841 ROCKSDB_NAMESPACE::SstFileWriter sst_file_writer{
3842 ROCKSDB_NAMESPACE::EnvOptions(), options};
3843 std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
3844 std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
3845 std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";
3846
3847 ASSERT_OK(sst_file_writer.Open(external_file1));
3848 ASSERT_OK(sst_file_writer.Put("1", "1"));
3849 ASSERT_OK(sst_file_writer.Put("2", "2"));
3850 ASSERT_OK(sst_file_writer.Finish());
3851
3852 ASSERT_OK(sst_file_writer.Open(external_file2));
3853 ASSERT_OK(sst_file_writer.Put("3", "3"));
3854 ASSERT_OK(sst_file_writer.Put("4", "4"));
3855 ASSERT_OK(sst_file_writer.Finish());
3856
3857 ASSERT_OK(sst_file_writer.Open(external_file3));
3858 ASSERT_OK(sst_file_writer.Put("5", "5"));
3859 ASSERT_OK(sst_file_writer.Put("6", "6"));
3860 ASSERT_OK(sst_file_writer.Finish());
3861
3862 ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
3863 IngestExternalFileOptions()));
3864 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
3865 std::vector<std::string> files;
3866 GetSstFiles(env_, dbname_, &files);
3867 ASSERT_EQ(files.size(), 2);
3868
3869 port::Thread user_thread1(
3870 [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });
3871
3872 port::Thread user_thread2([&]() {
3873 ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
3874 IngestExternalFileOptions()));
3875 TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
3876 });
3877
3878 user_thread1.join();
3879 user_thread2.join();
3880
3881 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3882 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3883 }
3884 #endif // ROCKSDB_LITE
3885
3886 // TODO: figure out why this test fails in appveyor
3887 #ifndef OS_WIN
3888 TEST_F(DBTest2, MultiDBParallelOpenTest) {
3889 const int kNumDbs = 2;
3890 Options options = CurrentOptions();
3891 std::vector<std::string> dbnames;
3892 for (int i = 0; i < kNumDbs; ++i) {
3893 dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
3894 ASSERT_OK(DestroyDB(dbnames.back(), options));
3895 }
3896
3897 // Verify empty DBs can be created in parallel
3898 std::vector<std::thread> open_threads;
3899 std::vector<DB*> dbs{static_cast<unsigned int>(kNumDbs), nullptr};
3900 options.create_if_missing = true;
3901 for (int i = 0; i < kNumDbs; ++i) {
3902 open_threads.emplace_back(
3903 [&](int dbnum) {
3904 ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
3905 },
3906 i);
3907 }
3908
3909 // Now add some data and close, so next we can verify non-empty DBs can be
3910 // recovered in parallel
3911 for (int i = 0; i < kNumDbs; ++i) {
3912 open_threads[i].join();
3913 ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua"));
3914 delete dbs[i];
3915 }
3916
3917 // Verify non-empty DBs can be recovered in parallel
3918 dbs.clear();
3919 open_threads.clear();
3920 for (int i = 0; i < kNumDbs; ++i) {
3921 open_threads.emplace_back(
3922 [&](int dbnum) {
3923 ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
3924 },
3925 i);
3926 }
3927
3928 // Wait and cleanup
3929 for (int i = 0; i < kNumDbs; ++i) {
3930 open_threads[i].join();
3931 delete dbs[i];
3932 ASSERT_OK(DestroyDB(dbnames[i], options));
3933 }
3934 }
3935 #endif // OS_WIN
3936
3937 namespace {
3938 class DummyOldStats : public Statistics {
3939 public:
3940 uint64_t getTickerCount(uint32_t /*ticker_type*/) const override { return 0; }
3941 void recordTick(uint32_t /* ticker_type */, uint64_t /* count */) override {
3942 num_rt++;
3943 }
3944 void setTickerCount(uint32_t /*ticker_type*/, uint64_t /*count*/) override {}
3945 uint64_t getAndResetTickerCount(uint32_t /*ticker_type*/) override {
3946 return 0;
3947 }
3948 void measureTime(uint32_t /*histogram_type*/, uint64_t /*count*/) override {
3949 num_mt++;
3950 }
3951 void histogramData(
3952 uint32_t /*histogram_type*/,
3953 ROCKSDB_NAMESPACE::HistogramData* const /*data*/) const override {}
3954 std::string getHistogramString(uint32_t /*type*/) const override {
3955 return "";
3956 }
3957 bool HistEnabledForType(uint32_t /*type*/) const override { return false; }
3958 std::string ToString() const override { return ""; }
3959 int num_rt = 0;
3960 int num_mt = 0;
3961 };
3962 } // namespace
3963
3964 TEST_F(DBTest2, OldStatsInterface) {
3965 DummyOldStats* dos = new DummyOldStats();
3966 std::shared_ptr<Statistics> stats(dos);
3967 Options options = CurrentOptions();
3968 options.create_if_missing = true;
3969 options.statistics = stats;
3970 Reopen(options);
3971
3972 Put("foo", "bar");
3973 ASSERT_EQ("bar", Get("foo"));
3974 ASSERT_OK(Flush());
3975 ASSERT_EQ("bar", Get("foo"));
3976
3977 ASSERT_GT(dos->num_rt, 0);
3978 ASSERT_GT(dos->num_mt, 0);
3979 }
3980
3981 TEST_F(DBTest2, CloseWithUnreleasedSnapshot) {
3982 const Snapshot* ss = db_->GetSnapshot();
3983
3984 for (auto h : handles_) {
3985 db_->DestroyColumnFamilyHandle(h);
3986 }
3987 handles_.clear();
3988
3989 ASSERT_NOK(db_->Close());
3990 db_->ReleaseSnapshot(ss);
3991 ASSERT_OK(db_->Close());
3992 delete db_;
3993 db_ = nullptr;
3994 }
3995
3996 TEST_F(DBTest2, PrefixBloomReseek) {
3997 Options options = CurrentOptions();
3998 options.create_if_missing = true;
3999 options.prefix_extractor.reset(NewCappedPrefixTransform(3));
4000 BlockBasedTableOptions bbto;
4001 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
4002 bbto.whole_key_filtering = false;
4003 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
4004 DestroyAndReopen(options);
4005
4006 // Construct two L1 files with keys:
4007 // f1:[aaa1 ccc1] f2:[ddd0]
4008 ASSERT_OK(Put("aaa1", ""));
4009 ASSERT_OK(Put("ccc1", ""));
4010 ASSERT_OK(Flush());
4011 ASSERT_OK(Put("ddd0", ""));
4012 ASSERT_OK(Flush());
4013 CompactRangeOptions cro;
4014 cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
4015 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
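// With kSkip, CompactRange moves the two flushed files down without
// rewriting the bottommost level, so f1 and f2 are expected to remain
// separate files after the compaction (our reading of the option, matching
// the comment above).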
4016
4017 ASSERT_OK(Put("bbb1", ""));
4018
4019 Iterator* iter = db_->NewIterator(ReadOptions());
4020
4021 // Seeking into f1, the iterator will check the bloom filter, which causes
4022 // the file iterator to be invalidated, and the cursor will be put into f2,
4023 // with the next key being "ddd0".
4024 iter->Seek("bbb1");
4025 ASSERT_TRUE(iter->Valid());
4026 ASSERT_EQ("bbb1", iter->key().ToString());
4027
4028 // Reseek ccc1, the L1 iterator needs to go back to f1 and reseek.
4029 iter->Seek("ccc1");
4030 ASSERT_TRUE(iter->Valid());
4031 ASSERT_EQ("ccc1", iter->key().ToString());
4032
4033 delete iter;
4034 }
4035
4036 TEST_F(DBTest2, PrefixBloomFilteredOut) {
4037 Options options = CurrentOptions();
4038 options.create_if_missing = true;
4039 options.prefix_extractor.reset(NewCappedPrefixTransform(3));
4040 BlockBasedTableOptions bbto;
4041 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
4042 bbto.whole_key_filtering = false;
4043 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
4044 DestroyAndReopen(options);
4045
4046 // Construct two L1 files with keys:
4047 // f1:[aaa1 ccc1] f2:[ddd0]
4048 ASSERT_OK(Put("aaa1", ""));
4049 ASSERT_OK(Put("ccc1", ""));
4050 ASSERT_OK(Flush());
4051 ASSERT_OK(Put("ddd0", ""));
4052 ASSERT_OK(Flush());
4053 CompactRangeOptions cro;
4054 cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
4055 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
4056
4057 Iterator* iter = db_->NewIterator(ReadOptions());
4058
4059 // The seek key is filtered out by f1's bloom filter.
4060 // This is just one of several valid positions following the contract.
4061 // Positioning to ccc1 or ddd0 is also valid. This is just to validate
4062 // the behavior of the current implementation. If the underlying
4063 // implementation changes, the test might fail here.
4064 iter->Seek("bbb1");
4065 ASSERT_FALSE(iter->Valid());
4066
4067 delete iter;
4068 }
4069
4070 #ifndef ROCKSDB_LITE
4071 TEST_F(DBTest2, RowCacheSnapshot) {
4072 Options options = CurrentOptions();
4073 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
4074 options.row_cache = NewLRUCache(8 * 8192);
4075 DestroyAndReopen(options);
4076
4077 ASSERT_OK(Put("foo", "bar1"));
4078
4079 const Snapshot* s1 = db_->GetSnapshot();
4080
4081 ASSERT_OK(Put("foo", "bar2"));
4082 ASSERT_OK(Flush());
4083
4084 ASSERT_OK(Put("foo2", "bar"));
4085 const Snapshot* s2 = db_->GetSnapshot();
4086 ASSERT_OK(Put("foo3", "bar"));
4087 const Snapshot* s3 = db_->GetSnapshot();
4088
4089 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
4090 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
4091 ASSERT_EQ(Get("foo"), "bar2");
4092 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
4093 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
4094 ASSERT_EQ(Get("foo"), "bar2");
4095 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
4096 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
4097 ASSERT_EQ(Get("foo", s1), "bar1");
4098 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
4099 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4100 ASSERT_EQ(Get("foo", s2), "bar2");
4101 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 2);
4102 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4103 ASSERT_EQ(Get("foo", s1), "bar1");
4104 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 3);
4105 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4106 ASSERT_EQ(Get("foo", s3), "bar2");
4107 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 4);
4108 ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
4109
4110 db_->ReleaseSnapshot(s1);
4111 db_->ReleaseSnapshot(s2);
4112 db_->ReleaseSnapshot(s3);
4113 }
4114 #endif // ROCKSDB_LITE
4115
4116 // When the DB is reopened with multiple column families, the manifest file
4117 // is written after the first CF is flushed, and it is written again
4118 // after each flush. If the DB crashes between the flushes, the already
4119 // flushed CF's log number will pass the latest log file, which we now
4120 // require not to be corrupted, triggering a corruption report otherwise.
4121 // We need to fix the bug and enable the test.
4122 TEST_F(DBTest2, CrashInRecoveryMultipleCF) {
4123 const std::vector<std::string> sync_points = {
4124 "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable",
4125 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0"};
4126 for (const auto& test_sync_point : sync_points) {
4127 Options options = CurrentOptions();
4128 // First destroy original db to ensure a clean start.
4129 DestroyAndReopen(options);
4130 options.create_if_missing = true;
4131 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
4132 CreateAndReopenWithCF({"pikachu"}, options);
4133 ASSERT_OK(Put("foo", "bar"));
4134 ASSERT_OK(Flush());
4135 ASSERT_OK(Put(1, "foo", "bar"));
4136 ASSERT_OK(Flush(1));
4137 ASSERT_OK(Put("foo", "bar"));
4138 ASSERT_OK(Put(1, "foo", "bar"));
4139 // The value is large enough to be divided into two blocks.
4140 std::string large_value(400, ' ');
4141 ASSERT_OK(Put("foo1", large_value));
4142 ASSERT_OK(Put("foo2", large_value));
4143 Close();
4144
4145 // Corrupt the log file in the middle, so that it is not corrupted
4146 // in the tail.
4147 std::vector<std::string> filenames;
4148 ASSERT_OK(env_->GetChildren(dbname_, &filenames));
4149 for (const auto& f : filenames) {
4150 uint64_t number;
4151 FileType type;
4152 if (ParseFileName(f, &number, &type) && type == FileType::kLogFile) {
4153 std::string fname = dbname_ + "/" + f;
4154 std::string file_content;
4155 ASSERT_OK(ReadFileToString(env_, fname, &file_content));
4156 file_content[400] = 'h';
4157 file_content[401] = 'a';
4158 ASSERT_OK(WriteStringToFile(env_, file_content, fname));
4159 break;
4160 }
4161 }
4162
4163 // Reopen and freeze the file system after the first manifest write.
4164 FaultInjectionTestEnv fit_env(options.env);
4165 options.env = &fit_env;
4166 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
4167 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4168 test_sync_point,
4169 [&](void* /*arg*/) { fit_env.SetFilesystemActive(false); });
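// Deactivating the filesystem at that sync point makes every write that
// recovery attempts afterwards fail, simulating a crash right after the
// first manifest write, so the reopen below is expected to fail.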
4170 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4171 ASSERT_NOK(TryReopenWithColumnFamilies(
4172 {kDefaultColumnFamilyName, "pikachu"}, options));
4173 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4174
4175 fit_env.SetFilesystemActive(true);
4176 // If we continue using the failure-injection Env, it will complain when
4177 // renaming the CURRENT file, which is not expected. Need to investigate
4178 // why.
4179 options.env = env_;
4180 ASSERT_OK(TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
4181 options));
4182 }
4183 }
4184
4185 TEST_F(DBTest2, SeekFileRangeDeleteTail) {
4186 Options options = CurrentOptions();
4187 options.prefix_extractor.reset(NewCappedPrefixTransform(1));
4188 options.num_levels = 3;
4189 DestroyAndReopen(options);
4190
4191 ASSERT_OK(Put("a", "a"));
4192 const Snapshot* s1 = db_->GetSnapshot();
4193 ASSERT_OK(
4194 db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "f"));
4195 ASSERT_OK(Put("b", "a"));
4196 ASSERT_OK(Flush());
4197
4198 ASSERT_OK(Put("x", "a"));
4199 ASSERT_OK(Put("z", "a"));
4200 ASSERT_OK(Flush());
4201
4202 CompactRangeOptions cro;
4203 cro.change_level = true;
4204 cro.target_level = 2;
4205 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
4206
4207 {
4208 ReadOptions ro;
4209 ro.total_order_seek = true;
4210 std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
4211 iter->Seek("e");
4212 ASSERT_TRUE(iter->Valid());
4213 ASSERT_EQ("x", iter->key().ToString());
4214 }
4215 db_->ReleaseSnapshot(s1);
4216 }
4217
4218 TEST_F(DBTest2, BackgroundPurgeTest) {
4219 Options options = CurrentOptions();
4220 options.write_buffer_manager =
4221 std::make_shared<ROCKSDB_NAMESPACE::WriteBufferManager>(1 << 20);
4222 options.avoid_unnecessary_blocking_io = true;
4223 DestroyAndReopen(options);
4224 size_t base_value = options.write_buffer_manager->memory_usage();
4225
4226 ASSERT_OK(Put("a", "a"));
4227 Iterator* iter = db_->NewIterator(ReadOptions());
4228 ASSERT_OK(Flush());
4229 size_t value = options.write_buffer_manager->memory_usage();
4230 ASSERT_GT(value, base_value);
4231
4232 db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);
4233 test::SleepingBackgroundTask sleeping_task_after;
4234 db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
4235 &sleeping_task_after, Env::Priority::HIGH);
4236 delete iter;
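// With avoid_unnecessary_blocking_io set, deleting the iterator should not
// free its superversion inline; the purge is queued on the HIGH-priority
// pool behind the sleeping task above, so memory usage is expected to stay
// elevated until that task is woken and the queue drains.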
4237
4238 Env::Default()->SleepForMicroseconds(100000);
4239 value = options.write_buffer_manager->memory_usage();
4240 ASSERT_GT(value, base_value);
4241
4242 sleeping_task_after.WakeUp();
4243 sleeping_task_after.WaitUntilDone();
4244
4245 test::SleepingBackgroundTask sleeping_task_after2;
4246 db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
4247 &sleeping_task_after2, Env::Priority::HIGH);
4248 sleeping_task_after2.WakeUp();
4249 sleeping_task_after2.WaitUntilDone();
4250
4251 value = options.write_buffer_manager->memory_usage();
4252 ASSERT_EQ(base_value, value);
4253 }
4254
4255 TEST_F(DBTest2, SwitchMemtableRaceWithNewManifest) {
4256 Options options = CurrentOptions();
4257 DestroyAndReopen(options);
4258 options.max_manifest_file_size = 10;
4259 options.create_if_missing = true;
4260 CreateAndReopenWithCF({"pikachu"}, options);
4261 ASSERT_EQ(2, handles_.size());
4262
4263 ASSERT_OK(Put("foo", "value"));
4264 const int kL0Files = options.level0_file_num_compaction_trigger;
4265 for (int i = 0; i < kL0Files; ++i) {
4266 ASSERT_OK(Put(/*cf=*/1, "a", std::to_string(i)));
4267 ASSERT_OK(Flush(/*cf=*/1));
4268 }
4269
4270 port::Thread thread([&]() { ASSERT_OK(Flush()); });
4271 ASSERT_OK(dbfull()->TEST_WaitForCompact());
4272 thread.join();
4273 }
4274
4275 TEST_F(DBTest2, SameSmallestInSameLevel) {
4276 // This test validates fractional cascading logic when several files at
4277 // one level contain only the same user key.
4278 Options options = CurrentOptions();
4279 options.merge_operator = MergeOperators::CreateStringAppendOperator();
4280 DestroyAndReopen(options);
4281
4282 ASSERT_OK(Put("key", "1"));
4283 ASSERT_OK(Put("key", "2"));
4284 ASSERT_OK(db_->Merge(WriteOptions(), "key", "3"));
4285 ASSERT_OK(db_->Merge(WriteOptions(), "key", "4"));
4286 Flush();
4287 CompactRangeOptions cro;
4288 cro.change_level = true;
4289 cro.target_level = 2;
4290 ASSERT_OK(dbfull()->CompactRange(cro, db_->DefaultColumnFamily(), nullptr,
4291 nullptr));
4292
4293 ASSERT_OK(db_->Merge(WriteOptions(), "key", "5"));
4294 Flush();
4295 ASSERT_OK(db_->Merge(WriteOptions(), "key", "6"));
4296 Flush();
4297 ASSERT_OK(db_->Merge(WriteOptions(), "key", "7"));
4298 Flush();
4299 ASSERT_OK(db_->Merge(WriteOptions(), "key", "8"));
4300 Flush();
4301 dbfull()->TEST_WaitForCompact(true);
4302 #ifndef ROCKSDB_LITE
4303 ASSERT_EQ("0,4,1", FilesPerLevel());
4304 #endif // ROCKSDB_LITE
4305
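// Expected value: the second Put("key", "2") hides the first, and the
// string-append merge operator then concatenates each merge operand with
// ',', producing "2,3,4,5,6,7,8".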
4306 ASSERT_EQ("2,3,4,5,6,7,8", Get("key"));
4307 }
4308
4309 TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
4310 // create a DB with block prefix index
4311 BlockBasedTableOptions table_options;
4312 Options options = CurrentOptions();
4313 table_options.block_size = 300;
4314 table_options.index_type = BlockBasedTableOptions::kHashSearch;
4315 table_options.index_shortening =
4316 BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
4317 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4318 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
4319
4320 Reopen(options);
4321
4322 Random rnd(301);
4323 std::string large_value = RandomString(&rnd, 500);
4324
4325 ASSERT_OK(Put("a1", large_value));
4326 ASSERT_OK(Put("x1", large_value));
4327 ASSERT_OK(Put("y1", large_value));
4328 Flush();
4329
4330 {
4331 std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
4332 iterator->SeekForPrev("x3");
4333 ASSERT_TRUE(iterator->Valid());
4334 ASSERT_EQ("x1", iterator->key().ToString());
4335
4336 iterator->SeekForPrev("a3");
4337 ASSERT_TRUE(iterator->Valid());
4338 ASSERT_EQ("a1", iterator->key().ToString());
4339
4340 iterator->SeekForPrev("y3");
4341 ASSERT_TRUE(iterator->Valid());
4342 ASSERT_EQ("y1", iterator->key().ToString());
4343
4344 // Query more than one non-existing prefix to cover both the case of an
4345 // empty hash bucket and that of a hash bucket conflict.
4346 iterator->SeekForPrev("b1");
4347 // Result should be not valid or "a1".
4348 if (iterator->Valid()) {
4349 ASSERT_EQ("a1", iterator->key().ToString());
4350 }
4351
4352 iterator->SeekForPrev("c1");
4353 // Result should be not valid or "a1".
4354 if (iterator->Valid()) {
4355 ASSERT_EQ("a1", iterator->key().ToString());
4356 }
4357
4358 iterator->SeekForPrev("d1");
4359 // Result should be not valid or "a1".
4360 if (iterator->Valid()) {
4361 ASSERT_EQ("a1", iterator->key().ToString());
4362 }
4363
4364 iterator->SeekForPrev("y3");
4365 ASSERT_TRUE(iterator->Valid());
4366 ASSERT_EQ("y1", iterator->key().ToString());
4367 }
4368 }
4369
4370 TEST_F(DBTest2, ChangePrefixExtractor) {
4371 for (bool use_partitioned_filter : {true, false}) {
4372 // create a DB with block prefix index
4373 BlockBasedTableOptions table_options;
4374 Options options = CurrentOptions();
4375
4376 // Sometimes the filter is checked based on the upper bound. Assert
4377 // counters for that case. Otherwise, only check data correctness.
4378 #ifndef ROCKSDB_LITE
4379 bool expect_filter_check = !use_partitioned_filter;
4380 #else
4381 bool expect_filter_check = false;
4382 #endif
4383 table_options.partition_filters = use_partitioned_filter;
4384 if (use_partitioned_filter) {
4385 table_options.index_type =
4386 BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
4387 }
4388 table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
4389
4390 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
4391 options.statistics = CreateDBStatistics();
4392
4393 options.prefix_extractor.reset(NewFixedPrefixTransform(2));
4394 DestroyAndReopen(options);
4395
4396 Random rnd(301);
4397
4398 ASSERT_OK(Put("aa", ""));
4399 ASSERT_OK(Put("xb", ""));
4400 ASSERT_OK(Put("xx1", ""));
4401 ASSERT_OK(Put("xz1", ""));
4402 ASSERT_OK(Put("zz", ""));
4403 Flush();
4404
4405 // After reopening the DB with prefix size 2 => 1, the prefix extractor
4406 // won't take effect unless it wouldn't change the results based
4407 // on the upper bound and seek key.
4408 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
4409 Reopen(options);
4410
4411 {
4412 std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
4413 iterator->Seek("xa");
4414 ASSERT_TRUE(iterator->Valid());
4415 ASSERT_EQ("xb", iterator->key().ToString());
4416 // It's a bug that the counter BLOOM_FILTER_PREFIX_CHECKED is not
4417 // correct in this case. So don't check counters in this case.
4418 if (expect_filter_check) {
4419 ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4420 }
4421
4422 iterator->Seek("xz");
4423 ASSERT_TRUE(iterator->Valid());
4424 ASSERT_EQ("xz1", iterator->key().ToString());
4425 if (expect_filter_check) {
4426 ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4427 }
4428 }
4429
4430 std::string ub_str = "xg9";
4431 Slice ub(ub_str);
4432 ReadOptions ro;
4433 ro.iterate_upper_bound = &ub;
4434
4435 {
4436 std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4437
4438 // SeekForPrev() never uses prefix bloom if it is changed.
4439 iterator->SeekForPrev("xg0");
4440 ASSERT_TRUE(iterator->Valid());
4441 ASSERT_EQ("xb", iterator->key().ToString());
4442 if (expect_filter_check) {
4443 ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4444 }
4445 }
4446
4447 ub_str = "xx9";
4448 ub = Slice(ub_str);
4449 {
4450 std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
4451
4452 iterator->Seek("x");
4453 ASSERT_TRUE(iterator->Valid());
4454 ASSERT_EQ("xb", iterator->key().ToString());
4455 if (expect_filter_check) {
4456 ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4457 }
4458
4459 iterator->Seek("xx0");
4460 ASSERT_TRUE(iterator->Valid());
4461 ASSERT_EQ("xx1", iterator->key().ToString());
4462 if (expect_filter_check) {
4463 ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
4464 }
4465 }
4466
4467 CompactRangeOptions compact_range_opts;
4468 compact_range_opts.bottommost_level_compaction =
4469 BottommostLevelCompaction::kForce;
4470 ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
4471 ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
4472
4473 // Re-execute similar queries after a full compaction
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));

      iterator->Seek("x");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        ASSERT_EQ(2, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
      }

      iterator->Seek("xg");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xx1", iterator->key().ToString());
      if (expect_filter_check) {
        ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
      }

      iterator->Seek("xz");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xz1", iterator->key().ToString());
      if (expect_filter_check) {
        ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
      }
    }
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));

      iterator->SeekForPrev("xx0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        ASSERT_EQ(5, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
      }

      iterator->Seek("xx0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xx1", iterator->key().ToString());
      if (expect_filter_check) {
        ASSERT_EQ(6, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
      }
    }

    ub_str = "xg9";
    ub = Slice(ub_str);
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
      iterator->SeekForPrev("xg0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        ASSERT_EQ(7, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
      }
    }
  }
}

TEST_F(DBTest2, BlockBasedTablePrefixGetIndexNotFound) {
  // Create a DB with a block-based hash prefix index.
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  table_options.block_size = 300;
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  table_options.index_shortening =
      BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  options.level0_file_num_compaction_trigger = 8;
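  // The L0 compaction trigger (8) is higher than the number of flushes
  // below, so all files stay in level 0 and Get() probes each file's
  // hash index in turn.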

  Reopen(options);

  ASSERT_OK(Put("b1", "ok"));
  Flush();

  // Flush several files so that the chance that the hash bucket for "b"
  // is empty in at least one of the files is high.
  ASSERT_OK(Put("a1", ""));
  ASSERT_OK(Put("c1", ""));
  Flush();

  ASSERT_OK(Put("a2", ""));
  ASSERT_OK(Put("c2", ""));
  Flush();

  ASSERT_OK(Put("a3", ""));
  ASSERT_OK(Put("c3", ""));
  Flush();

  ASSERT_OK(Put("a4", ""));
  ASSERT_OK(Put("c4", ""));
  Flush();

  ASSERT_OK(Put("a5", ""));
  ASSERT_OK(Put("c5", ""));
  Flush();

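  // "b1" must still be found in the first file even though the later
  // files' hash indexes have no bucket for prefix "b".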
  ASSERT_EQ("ok", Get("b1"));
}

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, AutoPrefixMode1) {
  // Create a DB with a prefix bloom filter and exercise auto_prefix_mode.
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  options.statistics = CreateDBStatistics();

  Reopen(options);

  Random rnd(301);
  std::string large_value = RandomString(&rnd, 500);

  ASSERT_OK(Put("a1", large_value));
  ASSERT_OK(Put("x1", large_value));
  ASSERT_OK(Put("y1", large_value));
  Flush();

  ReadOptions ro;
  ro.total_order_seek = false;
  ro.auto_prefix_mode = true;
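  // With auto_prefix_mode the iterator decides per seek whether prefix
  // bloom filtering is safe, i.e. whether it cannot change the result of
  // an equivalent total-order seek, based on the seek key and upper bound.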
  {
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
    iterator->Seek("b1");
    ASSERT_TRUE(iterator->Valid());
    ASSERT_EQ("x1", iterator->key().ToString());
    ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
  }

  std::string ub_str = "b9";
  Slice ub(ub_str);
  ro.iterate_upper_bound = &ub;

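  // Every key in ["b1", "b9") shares the prefix "b", so the filter can be
  // consulted; no stored key has that prefix, so the seek should come
  // back invalid.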
  {
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
    iterator->Seek("b1");
    ASSERT_FALSE(iterator->Valid());
    ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
  }

  ub_str = "z";
  ub = Slice(ub_str);
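  // The range ["b1", "z") spans multiple prefixes, so the filter cannot
  // be used and the counter stays unchanged.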
  {
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
    iterator->Seek("b1");
    ASSERT_TRUE(iterator->Valid());
    ASSERT_EQ("x1", iterator->key().ToString());
    ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
  }

  ub_str = "c";
  ub = Slice(ub_str);
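  // "c" is the immediate successor of prefix "b", so every key below the
  // bound still shares the prefix; the filter check is expected to remain
  // safe and the counter to advance.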
  {
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
    iterator->Seek("b1");
    ASSERT_FALSE(iterator->Valid());
    ASSERT_EQ(2, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
  }

  // The same queries without recreating the iterator.
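  // The iterator holds a pointer to `ub`, so updating the Slice in place
  // changes the effective upper bound seen by subsequent seeks.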
  {
    ub_str = "b9";
    ub = Slice(ub_str);
    ro.iterate_upper_bound = &ub;

    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
    iterator->Seek("b1");
    ASSERT_FALSE(iterator->Valid());
    ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));

    ub_str = "z";
    ub = Slice(ub_str);

    iterator->Seek("b1");
    ASSERT_TRUE(iterator->Valid());
    ASSERT_EQ("x1", iterator->key().ToString());
    ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));

    ub_str = "c";
    ub = Slice(ub_str);

    iterator->Seek("b1");
    ASSERT_FALSE(iterator->Valid());
    ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));

    ub_str = "b9";
    ub = Slice(ub_str);
    ro.iterate_upper_bound = &ub;
    iterator->SeekForPrev("b1");
    ASSERT_TRUE(iterator->Valid());
    ASSERT_EQ("a1", iterator->key().ToString());
    ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));

    ub_str = "zz";
    ub = Slice(ub_str);
    ro.iterate_upper_bound = &ub;
    iterator->SeekToLast();
    ASSERT_TRUE(iterator->Valid());
    ASSERT_EQ("y1", iterator->key().ToString());

    iterator->SeekToFirst();
    ASSERT_TRUE(iterator->Valid());
    ASSERT_EQ("a1", iterator->key().ToString());
  }
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}