//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/debug.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "test_util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
#endif

namespace ROCKSDB_NAMESPACE {

class DBBasicTest : public DBTestBase {
 public:
  DBBasicTest() : DBTestBase("/db_basic_test") {}
};

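// Opening a DB that is already open should fail with an IOError that
// mentions the LOCK file.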
TEST_F(DBBasicTest, OpenWhenOpen) {
  Options options = CurrentOptions();
  options.env = env_;
  ROCKSDB_NAMESPACE::DB* db2 = nullptr;
  ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2);

  ASSERT_EQ(Status::Code::kIOError, s.code());
  ASSERT_EQ(Status::SubCode::kNone, s.subcode());
  ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);

  delete db2;
}

#ifndef ROCKSDB_LITE
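// ReadOnlyReopen reopens the DB via DB::OpenForReadOnly(); in that mode
// writes are rejected and SyncWAL() returns NotSupported.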
TEST_F(DBBasicTest, ReadOnlyDB) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  assert(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  Iterator* iter = db_->NewIterator(ReadOptions());
  int count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    ++count;
  }
  ASSERT_EQ(count, 2);
  delete iter;
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  Flush();
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
}

TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  options.write_dbid_to_manifest = true;
  assert(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  std::string db_id1;
  db_->GetDbIdentity(db_id1);
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  Iterator* iter = db_->NewIterator(ReadOptions());
  int count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    ++count;
  }
  ASSERT_EQ(count, 2);
  delete iter;
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  Flush();
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  std::string db_id2;
  db_->GetDbIdentity(db_id2);
  ASSERT_EQ(db_id1, db_id2);
}

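// With max_open_files == -1, a read-only open attempts the lightweight
// CompactedDB implementation, which only applies when the LSM is fully
// compacted (roughly: a single SST file, or files in a single level);
// otherwise it falls back to the regular read-only DB. The error messages
// below distinguish the two modes.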
TEST_F(DBBasicTest, CompactedDB) {
  const uint64_t kFileSize = 1 << 20;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = kFileSize;
  options.target_file_size_base = kFileSize;
  options.max_bytes_for_level_base = 1 << 30;
  options.compression = kNoCompression;
  Reopen(options);
  // 1 L0 file, use CompactedDB if max_open_files = -1
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
  Flush();
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  Status s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  Reopen(options);
  // Add more L0 files
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
  Flush();
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
  Flush();
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
  Flush();
  Close();

  ASSERT_OK(ReadOnlyReopen(options));
  // Fallback to read-only DB
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  Close();

  // Full compaction
  Reopen(options);
  // Add more keys
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
  ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
  ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(3, NumTableFilesAtLevel(1));
  Close();

  // CompactedDB
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  ASSERT_EQ("NOT_FOUND", Get("abc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
  ASSERT_EQ("NOT_FOUND", Get("ccc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
  ASSERT_EQ("NOT_FOUND", Get("ggg"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
  ASSERT_EQ("NOT_FOUND", Get("kkk"));

  // MultiGet
  std::vector<std::string> values;
  std::vector<Status> status_list = dbfull()->MultiGet(
      ReadOptions(),
      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
      &values);
  ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
  ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
  ASSERT_OK(status_list[0]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
  ASSERT_TRUE(status_list[1].IsNotFound());
  ASSERT_OK(status_list[2]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
  ASSERT_TRUE(status_list[3].IsNotFound());
  ASSERT_OK(status_list[4]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
  ASSERT_TRUE(status_list[5].IsNotFound());

  Reopen(options);
  // Add a key
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
}

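// Reopening with fewer levels than the existing LSM already uses must fail
// with InvalidArgument.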
TEST_F(DBBasicTest, LevelLimitReopen) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  const std::string value(1024 * 1024, ' ');
  int i = 0;
  while (NumTableFilesAtLevel(2, 1) == 0) {
    ASSERT_OK(Put(1, Key(i++), value));
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }

  options.num_levels = 1;
  options.max_bytes_for_level_multiplier_additional.resize(1, 1);
  Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(s.IsInvalidArgument());
  ASSERT_EQ(s.ToString(),
            "Invalid argument: db has more levels than options.num_levels");

  options.num_levels = 10;
  options.max_bytes_for_level_multiplier_additional.resize(10, 1);
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
#endif  // ROCKSDB_LITE

TEST_F(DBBasicTest, PutDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
  } while (ChangeOptions());
}

TEST_F(DBBasicTest, PutSingleDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo2", "v2"));
    ASSERT_EQ("v2", Get(1, "foo2"));
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip FIFO and universal compaction because they do not apply to the test
    // case. Skip MergePut because a single delete does not get removed when it
    // encounters a merge.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}

TEST_F(DBBasicTest, EmptyFlush) {
  // It is possible to produce empty flushes when using single deletes. Tests
  // whether empty flushes cause issues.
  do {
    Random rnd(301);

    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    Put(1, "a", Slice());
    SingleDelete(1, "a");
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
    // Skip FIFO and universal compaction as they do not apply to the test
    // case. Skip MergePut because merges cannot be combined with single
    // deletions.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}

TEST_F(DBBasicTest, GetFromVersions) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
  } while (ChangeOptions());
}

#ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, GetSnapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    // Try with both a short key and a long key
    for (int i = 0; i < 2; i++) {
      std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
      ASSERT_OK(Put(1, key, "v1"));
      const Snapshot* s1 = db_->GetSnapshot();
      ASSERT_OK(Put(1, key, "v2"));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, s1));
      ASSERT_OK(Flush(1));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, s1));
      db_->ReleaseSnapshot(s1);
    }
  } while (ChangeOptions());
}
#endif  // ROCKSDB_LITE

TEST_F(DBBasicTest, CheckLock) {
  do {
    DB* localdb;
    Options options = CurrentOptions();
    ASSERT_OK(TryReopen(options));

    // second open should fail
    ASSERT_TRUE(!(DB::Open(options, dbname_, &localdb)).ok());
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, FlushMultipleMemtable) {
  do {
    Options options = CurrentOptions();
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    options.max_write_buffer_number = 4;
    options.min_write_buffer_number_to_merge = 3;
    options.max_write_buffer_size_to_maintain = -1;
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));
    ASSERT_OK(Flush(1));
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask sleeping_task_high;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &sleeping_task_high, Env::Priority::HIGH);

  Options options = CurrentOptions();
  // disable compaction
  options.disable_auto_compactions = true;
  WriteOptions writeOpt = WriteOptions();
  writeOpt.disableWAL = true;
  options.max_write_buffer_number = 2;
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain =
      static_cast<int64_t>(options.write_buffer_size);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Flushing an empty column family can still go through even if no thread
  // is available to flush the memtable.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Insert can go through
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

  ASSERT_EQ("v1", Get(0, "foo"));
  ASSERT_EQ("v1", Get(1, "bar"));

  sleeping_task_high.WakeUp();
  sleeping_task_high.WaitUntilDone();

  // Flush can still go through.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}

TEST_F(DBBasicTest, FLUSH) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    SetPerfLevel(kEnableTime);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    // this will also flush the preceding write
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    get_perf_context()->Reset();
    Get(1, "foo");
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
    ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v2", Get(1, "bar"));
    get_perf_context()->Reset();
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);

    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // 'foo' should be there because its put
    // has WAL enabled.
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "bar"));

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, ManifestRollOver) {
  do {
    Options options;
    options.max_manifest_file_size = 10;  // 10 bytes
    options = CurrentOptions(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    {
      ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
      ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
      ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
      uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
      uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_GT(manifest_after_flush, manifest_before_flush);
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
      ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
      // Check that the keys written before the manifest rollover are still
      // readable.
      ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
      ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
      ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
    }
  } while (ChangeCompactOptions());
}

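// The DB identity normally lives in the IDENTITY file. Deleting that file
// forces a new identity to be generated on the next open, unless
// write_dbid_to_manifest is set, in which case the id is recovered from the
// MANIFEST (exercised by IdentityAcrossRestarts2 below).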
TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    if (options.write_dbid_to_manifest) {
      ASSERT_EQ(id1.compare(id3), 0);
    } else {
      // id1 should NOT match id3 because identity was regenerated
      ASSERT_NE(id1.compare(id3), 0);
    }
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    options.write_dbid_to_manifest = true;
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    // id1 should still match id3 because the identity is recovered from the
    // MANIFEST when write_dbid_to_manifest is set
    ASSERT_EQ(id1, id3);
  } while (ChangeCompactOptions());
}

#ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, Snapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    Put(0, "foo", "0v1");
    Put(1, "foo", "1v1");

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v2");
    Put(1, "foo", "1v2");

    env_->addon_time_.fetch_add(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v3");
    Put(1, "foo", "1v3");

    {
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());

      Put(0, "foo", "0v4");
      Put(1, "foo", "1v4");
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions());
}

#endif  // ROCKSDB_LITE

TEST_F(DBBasicTest, CompactBetweenSnapshots) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);
    Random rnd(301);
    FillLevels("a", "z", 1);

    Put(1, "foo", "first");
    const Snapshot* snapshot1 = db_->GetSnapshot();
    Put(1, "foo", "second");
    Put(1, "foo", "third");
    Put(1, "foo", "fourth");
    const Snapshot* snapshot2 = db_->GetSnapshot();
    Put(1, "foo", "fifth");
    Put(1, "foo", "sixth");

    // All entries (including duplicates) exist
    // before any compaction or flush is triggered.
    ASSERT_EQ(AllEntriesFor("foo", 1),
              "[ sixth, fifth, fourth, third, second, first ]");
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ("first", Get(1, "foo", snapshot1));

    // After a flush, "second", "third" and "fifth" should
    // be removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");

    // after we release the snapshot1, only two values left
    db_->ReleaseSnapshot(snapshot1);
    FillLevels("a", "z", 1);
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);

    // We have only one valid snapshot snapshot2. Since snapshot1 is
    // not valid anymore, "first" should be removed by a compaction.
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");

    // after we release the snapshot2, only one value should be left
    db_->ReleaseSnapshot(snapshot2);
    FillLevels("a", "z", 1);
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
  } while (ChangeOptions(kSkipFIFOCompaction));
}

TEST_F(DBBasicTest, DBOpen_Options) {
  Options options = CurrentOptions();
  Close();
  Destroy(options);

  // Does not exist, and create_if_missing == false: error
  DB* db = nullptr;
  options.create_if_missing = false;
  Status s = DB::Open(options, dbname_, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does not exist, and create_if_missing == true: OK
  options.create_if_missing = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;

  // Does exist, and error_if_exists == true: error
  options.create_if_missing = false;
  options.error_if_exists = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does exist, and error_if_exists == false: OK
  options.create_if_missing = true;
  options.error_if_exists = false;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;
}

TEST_F(DBBasicTest, CompactOnFlush) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    Put(1, "foo", "v1");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");

    // Write two new keys
    Put(1, "a", "begin");
    Put(1, "z", "end");
    Flush(1);

    // Case 1: Delete followed by a put
    Delete(1, "foo");
    Put(1, "foo", "v2");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");

    // After the current memtable is flushed, the DEL should
    // have been removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");

    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");

    // Case 2: Delete followed by another delete
    Delete(1, "foo");
    Delete(1, "foo");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 3: Put followed by a delete
    Put(1, "foo", "v3");
    Delete(1, "foo");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 4: Put followed by another Put
    Put(1, "foo", "v4");
    Put(1, "foo", "v5");
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");

    // clear database
    Delete(1, "foo");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 5: Put followed by snapshot followed by another Put
    // Both puts should remain.
    Put(1, "foo", "v6");
    const Snapshot* snapshot = db_->GetSnapshot();
    Put(1, "foo", "v7");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
    db_->ReleaseSnapshot(snapshot);

    // clear database
    Delete(1, "foo");
    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                           nullptr);
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 6: snapshot followed by a Put followed by another Put
    // Only the last put should remain.
    const Snapshot* snapshot1 = db_->GetSnapshot();
    Put(1, "foo", "v8");
    Put(1, "foo", "v9");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
    db_->ReleaseSnapshot(snapshot1);
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, FlushOneColumnFamily) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  ASSERT_OK(Put(0, "Default", "Default"));
  ASSERT_OK(Put(1, "pikachu", "pikachu"));
  ASSERT_OK(Put(2, "ilya", "ilya"));
  ASSERT_OK(Put(3, "muromec", "muromec"));
  ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
  ASSERT_OK(Put(5, "nikitich", "nikitich"));
  ASSERT_OK(Put(6, "alyosha", "alyosha"));
  ASSERT_OK(Put(7, "popovich", "popovich"));

  for (int i = 0; i < 8; ++i) {
    Flush(i);
    auto tables = ListTableFiles(env_, dbname_);
    ASSERT_EQ(tables.size(), i + 1U);
  }
}

TEST_F(DBBasicTest, MultiGetSimple) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});

    std::vector<std::string> values(20, "Temporary data to be overwritten");
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);

    get_perf_context()->Reset();
    std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(values[0], "v1");
    ASSERT_EQ(values[1], "v2");
    ASSERT_EQ(values[2], "v3");
    ASSERT_EQ(values[4], "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, MultiGetEmpty) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // Empty Key Set
    std::vector<Slice> keys;
    std::vector<std::string> values;
    std::vector<ColumnFamilyHandle*> cfs;
    std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(s.size(), 0U);

    // Empty Database, Empty Key Set
    Options options = CurrentOptions();
    options.create_if_missing = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(s.size(), 0U);

    // Empty Database, Search for Keys
    keys.resize(2);
    keys[0] = "a";
    keys[1] = "b";
    cfs.push_back(handles_[0]);
    cfs.push_back(handles_[1]);
    s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(static_cast<int>(s.size()), 2);
    ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, ChecksumTest) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  // update this when a new checksum type is added
  int max_checksum = static_cast<int>(kxxHash64);
  const int kNumPerFile = 2;

  // generate one table with each type of checksum
  for (int i = 0; i <= max_checksum; ++i) {
    table_options.checksum = static_cast<ChecksumType>(i);
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    for (int j = 0; j < kNumPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
    }
    ASSERT_OK(Flush());
  }

  // with each valid checksum type setting...
  for (int i = 0; i <= max_checksum; ++i) {
    table_options.checksum = static_cast<ChecksumType>(i);
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    // verify that data written with every checksum type is readable,
    // regardless of the currently configured type
    for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
      ASSERT_EQ(Key(j), Get(Key(j)));
    }
  }
}

// On Windows a file can be opened either memory-mapped or with unbuffered
// (direct) access, but not both, so this combination asserts and the test
// does not make sense to run there.
#ifndef OS_WIN
TEST_F(DBBasicTest, MmapAndBufferOptions) {
  if (!IsMemoryMappedAccessSupported()) {
    return;
  }
  Options options = CurrentOptions();

  options.use_direct_reads = true;
  options.allow_mmap_reads = true;
  ASSERT_NOK(TryReopen(options));

  // All other combinations are acceptable
  options.use_direct_reads = false;
  ASSERT_OK(TryReopen(options));

  if (IsDirectIOSupported()) {
    options.use_direct_reads = true;
    options.allow_mmap_reads = false;
    ASSERT_OK(TryReopen(options));
  }

  options.use_direct_reads = false;
  ASSERT_OK(TryReopen(options));
}
#endif

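// TestEnv hands out TestLogger instances whose Close() fails with IOError and
// counts how many times a logger was closed, so the tests below can check
// that DB::Close() and the DB destructor close the info log exactly once and
// propagate the error.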
class TestEnv : public EnvWrapper {
 public:
  explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}

  class TestLogger : public Logger {
   public:
    using Logger::Logv;
    explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
    ~TestLogger() override {
      if (!closed_) {
        CloseHelper();
      }
    }
    void Logv(const char* /*format*/, va_list /*ap*/) override {}

   protected:
    Status CloseImpl() override { return CloseHelper(); }

   private:
    Status CloseHelper() {
      env->CloseCountInc();
      return Status::IOError();
    }
    TestEnv* env;
  };

  void CloseCountInc() { close_count++; }

  int GetCloseCount() { return close_count; }

  Status NewLogger(const std::string& /*fname*/,
                   std::shared_ptr<Logger>* result) override {
    result->reset(new TestLogger(this));
    return Status::OK();
  }

 private:
  int close_count;
};

TEST_F(DBBasicTest, DBClose) {
  Options options = GetDefaultOptions();
  std::string dbname = test::PerThreadDBPath("db_close_test");
  ASSERT_OK(DestroyDB(dbname, options));

  DB* db = nullptr;
  TestEnv* env = new TestEnv(env_);
  std::unique_ptr<TestEnv> local_env_guard(env);
  options.create_if_missing = true;
  options.env = env;
  Status s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Close();
  ASSERT_EQ(env->GetCloseCount(), 1);
  ASSERT_EQ(s, Status::IOError());

  delete db;
  ASSERT_EQ(env->GetCloseCount(), 1);

  // Do not call DB::Close() and ensure our logger Close() still gets called
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);

  // Provide our own logger and ensure DB::Close() does not close it
  options.info_log.reset(new TestEnv::TestLogger(env));
  options.create_if_missing = false;
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Close();
  ASSERT_EQ(s, Status::OK());
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);
  options.info_log.reset();
  ASSERT_EQ(env->GetCloseCount(), 3);
}

TEST_F(DBBasicTest, DBCloseFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.manual_wal_flush = true;
  options.write_buffer_size = 100;
  options.env = fault_injection_env.get();

  Reopen(options);
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(Put("key3", "value3"));
  fault_injection_env->SetFilesystemActive(false);
  Status s = dbfull()->Close();
  fault_injection_env->SetFilesystemActive(true);
  ASSERT_NE(s, Status::OK());

  Destroy(options);
}

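// Param bool - If true, the DBTestBase::MultiGet helper uses the batched
//              MultiGet API; if false, it uses the non-batched overload.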
class DBMultiGetTestWithParam : public DBBasicTest,
                                public testing::WithParamInterface<bool> {};

TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);
  // <CF, key, value> tuples
  std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  static const int num_keys = 24;
  cf_kv_vec.reserve(num_keys);

  for (int i = 0; i < num_keys; ++i) {
    int cf = i / 3;
    int cf_key = i % 3;
    cf_kv_vec.emplace_back(std::make_tuple(
        cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
        "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
    ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                  std::get<2>(cf_kv_vec[i])));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
          // is forced to repeat the process
          for (int i = 0; i < num_keys; ++i) {
            int cf = i / 3;
            int cf_key = i % 8;
            if (cf_key == 0) {
              ASSERT_OK(Flush(cf));
            }
            ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                          std::get<2>(cf_kv_vec[i]) + "_2"));
          }
        }
        if (get_sv_count == 11) {
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < num_keys; ++i) {
    cfs.push_back(std::get<0>(cf_kv_vec[i]));
    keys.push_back(std::get<1>(cf_kv_vec[i]));
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values.size(), num_keys);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  }

  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[0]));
  keys.push_back(std::get<1>(cf_kv_vec[0]));
  cfs.push_back(std::get<0>(cf_kv_vec[3]));
  keys.push_back(std::get<1>(cf_kv_vec[3]));
  cfs.push_back(std::get<0>(cf_kv_vec[4]));
  keys.push_back(std::get<1>(cf_kv_vec[4]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");

  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[7]));
  keys.push_back(std::get<1>(cf_kv_vec[7]));
  cfs.push_back(std::get<0>(cf_kv_vec[6]));
  keys.push_back(std::get<1>(cf_kv_vec[6]));
  cfs.push_back(std::get<0>(cf_kv_vec[1]));
  keys.push_back(std::get<1>(cf_kv_vec[1]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");

  for (int cf = 0; cf < 8; ++cf) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  }
}

TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  int retries = 0;
  bool last_try = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
        last_try = true;
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (last_try) {
          return;
        }
        if (++get_sv_count == 2) {
          ++retries;
          get_sv_count = 0;
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(
                i, "cf" + std::to_string(i) + "_key",
                "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_TRUE(last_try);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j],
              "cf" + std::to_string(j) + "_val" + std::to_string(retries));
  }
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}

TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                          "cf" + std::to_string(i) + "_val2"));
          }
        }
        if (get_sv_count == 8) {
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_TRUE(
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, GetParam());
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}

INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                        testing::Bool());

TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
    ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);
    ASSERT_OK(s[5]);

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}

TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), true);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}

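// Build an LSM with overlapping data in L2, L1, L0 and the memtable, then
// verify that batched MultiGet returns the newest value for each key.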
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);
  int num_keys = 0;

  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  MoveFilesToLevel(2);

  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  MoveFilesToLevel(1);

  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  ASSERT_EQ(0, num_keys);

  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 64; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), 16);
  for (unsigned int j = 0; j < values.size(); ++j) {
    int key = j + 64;
    if (key % 9 == 0) {
      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
    } else if (key % 5 == 0) {
      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
    } else if (key % 3 == 0) {
      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
    } else {
      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
    }
  }
}

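// Same layout as above, but the upper levels and the memtable hold merge
// operands; MultiGet should return the base value with the operands appended
// in level order.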
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);
  int num_keys = 0;

  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  MoveFilesToLevel(2);

  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  MoveFilesToLevel(1);

  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  ASSERT_EQ(0, num_keys);

  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 32; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), keys.size());
  for (unsigned int j = 0; j < 48; ++j) {
    int key = j + 32;
    std::string value;
    value.append("val_l2_" + std::to_string(key));
    if (key % 3 == 0) {
      value.append(",");
      value.append("val_l1_" + std::to_string(key));
    }
    if (key % 5 == 0) {
      value.append(",");
      value.append("val_l0_" + std::to_string(key));
    }
    if (key % 9 == 0) {
      value.append(",");
      value.append("val_mem_" + std::to_string(key));
    }
    ASSERT_EQ(values[j], value);
  }
}

// Test class for batched MultiGet with prefix extractor
// Param bool - If true, use partitioned filters
//              If false, use full filter block
class MultiGetPrefixExtractorTest : public DBBasicTest,
                                    public ::testing::WithParamInterface<bool> {
};

TEST_P(MultiGetPrefixExtractorTest, Batched) {
  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  options.memtable_prefix_bloom_size_ratio = 10;
  BlockBasedTableOptions bbto;
  if (GetParam()) {
    bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
    bbto.partition_filters = true;
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  bbto.cache_index_and_filter_blocks = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  SetPerfLevel(kEnableCount);
  get_perf_context()->Reset();

  // First key is not in the prefix_extractor domain
  ASSERT_OK(Put("k", "v0"));
  ASSERT_OK(Put("kk1", "v1"));
  ASSERT_OK(Put("kk2", "v2"));
  ASSERT_OK(Put("kk3", "v3"));
  ASSERT_OK(Put("kk4", "v4"));
  std::vector<std::string> mem_keys(
      {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
  std::vector<std::string> inmem_values;
  inmem_values = MultiGet(mem_keys, nullptr);
  ASSERT_EQ(inmem_values[0], "v0");
  ASSERT_EQ(inmem_values[1], "v1");
  ASSERT_EQ(inmem_values[2], "v2");
  ASSERT_EQ(inmem_values[3], "v3");
  ASSERT_EQ(inmem_values[4], "v4");
  ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
  ASSERT_OK(Flush());

  std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
  std::vector<std::string> values;
  get_perf_context()->Reset();
  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values[0], "v0");
  ASSERT_EQ(values[1], "v1");
  ASSERT_EQ(values[2], "v2");
  ASSERT_EQ(values[3], "v3");
  ASSERT_EQ(values[4], "v4");
  // Filter hits for 4 in-domain keys
  ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
}

INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
                        ::testing::Bool());

#ifndef ROCKSDB_LITE
1554 class DBMultiGetRowCacheTest : public DBBasicTest,
1555                                public ::testing::WithParamInterface<bool> {};
1556 
TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
1558   do {
1559     option_config_ = kRowCache;
1560     Options options = CurrentOptions();
1561     options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1562     CreateAndReopenWithCF({"pikachu"}, options);
1563     SetPerfLevel(kEnableCount);
1564     ASSERT_OK(Put(1, "k1", "v1"));
1565     ASSERT_OK(Put(1, "k2", "v2"));
1566     ASSERT_OK(Put(1, "k3", "v3"));
1567     ASSERT_OK(Put(1, "k4", "v4"));
1568     Flush(1);
1569     ASSERT_OK(Put(1, "k5", "v5"));
1570     const Snapshot* snap1 = dbfull()->GetSnapshot();
1571     ASSERT_OK(Delete(1, "k4"));
1572     Flush(1);
1573     const Snapshot* snap2 = dbfull()->GetSnapshot();
1574 
1575     get_perf_context()->Reset();
1576 
1577     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
1578     std::vector<PinnableSlice> values(keys.size());
1579     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1580     std::vector<Status> s(keys.size());
1581 
1582     ReadOptions ro;
1583     bool use_snapshots = GetParam();
1584     if (use_snapshots) {
1585       ro.snapshot = snap2;
1586     }
1587     db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
1588                   s.data(), false);
1589 
1590     ASSERT_EQ(values.size(), keys.size());
1591     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
1592     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
1593     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // Three of the keys are found, each with a 2-byte value, so 6 bytes read
1595     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
1596 
1597     ASSERT_TRUE(s[0].IsNotFound());
1598     ASSERT_OK(s[1]);
1599     ASSERT_TRUE(s[2].IsNotFound());
1600     ASSERT_OK(s[3]);
1601     ASSERT_OK(s[4]);
1602 
1603     // Call MultiGet() again with some intersection with the previous set of
1604     // keys. Those should already be in the row cache.
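    // Note: option_config_ = kRowCache above makes CurrentOptions() attach a
    // row cache to the DB; a standalone setup would look roughly like
    //   options.row_cache = NewLRUCache(1 * 1024 * 1024);
    // (size illustrative; see DBTestBase for the exact configuration).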
1605     keys.assign({"no_key", "k5", "k3", "k2"});
1606     for (size_t i = 0; i < keys.size(); ++i) {
1607       values[i].Reset();
1608       s[i] = Status::OK();
1609     }
1610     get_perf_context()->Reset();
1611 
1612     if (use_snapshots) {
1613       ro.snapshot = snap1;
1614     }
1615     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1616                   values.data(), s.data(), false);
1617 
1618     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
1619     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
1620     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // Three of the keys are found, each with a 2-byte value, so 6 bytes read
1622     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
1623 
1624     ASSERT_TRUE(s[0].IsNotFound());
1625     ASSERT_OK(s[1]);
1626     ASSERT_OK(s[2]);
1627     ASSERT_OK(s[3]);
1628     if (use_snapshots) {
1629       // Only reads from the first SST file would have been cached, since
1630       // snapshot seq no is > fd.largest_seqno
1631       ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
1632     } else {
1633       ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
1634     }
1635 
1636     SetPerfLevel(kDisable);
1637     dbfull()->ReleaseSnapshot(snap1);
1638     dbfull()->ReleaseSnapshot(snap2);
1639   } while (ChangeCompactOptions());
1640 }
1641 
1642 INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
1643                         testing::Values(true, false));
1644 
TEST_F(DBBasicTest, GetAllKeyVersions) {
1646   Options options = CurrentOptions();
1647   options.env = env_;
1648   options.create_if_missing = true;
1649   options.disable_auto_compactions = true;
1650   CreateAndReopenWithCF({"pikachu"}, options);
1651   ASSERT_EQ(2, handles_.size());
1652   const size_t kNumInserts = 4;
1653   const size_t kNumDeletes = 4;
1654   const size_t kNumUpdates = 4;
1655 
1656   // Check default column family
1657   for (size_t i = 0; i != kNumInserts; ++i) {
1658     ASSERT_OK(Put(std::to_string(i), "value"));
1659   }
1660   for (size_t i = 0; i != kNumUpdates; ++i) {
1661     ASSERT_OK(Put(std::to_string(i), "value1"));
1662   }
1663   for (size_t i = 0; i != kNumDeletes; ++i) {
1664     ASSERT_OK(Delete(std::to_string(i)));
1665   }
1666   std::vector<KeyVersion> key_versions;
1667   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1668       db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
1669       &key_versions));
1670   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
1671   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1672       db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
1673       &key_versions));
1674   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
1675 
1676   // Check non-default column family
1677   for (size_t i = 0; i != kNumInserts - 1; ++i) {
1678     ASSERT_OK(Put(1, std::to_string(i), "value"));
1679   }
1680   for (size_t i = 0; i != kNumUpdates - 1; ++i) {
1681     ASSERT_OK(Put(1, std::to_string(i), "value1"));
1682   }
1683   for (size_t i = 0; i != kNumDeletes - 1; ++i) {
1684     ASSERT_OK(Delete(1, std::to_string(i)));
1685   }
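  // The non-default column family received (kNumInserts - 1) +
  // (kNumUpdates - 1) + (kNumDeletes - 1) entries, i.e. 3 fewer than the
  // default column family.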
1686   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1687       db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
1688       &key_versions));
1689   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
1690 }
1691 #endif  // !ROCKSDB_LITE
1692 
TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
1694   Options options = CurrentOptions();
1695   Random rnd(301);
1696   BlockBasedTableOptions table_options;
1697   table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1698   table_options.block_size = 16 * 1024;
1699   assert(table_options.block_size >
1700           BlockBasedTable::kMultiGetReadStackBufSize);
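  // With blocks larger than MultiGet's stack read buffer, the read path has
  // to fall back to heap-allocated IO buffers, which is the code path this
  // test is meant to exercise.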
1701   options.table_factory.reset(new BlockBasedTableFactory(table_options));
1702   Reopen(options);
1703 
1704   std::string zero_str(128, '\0');
1705   for (int i = 0; i < 100; ++i) {
1706     // Make the value compressible. A purely random string doesn't compress
1707     // and the resultant data block will not be compressed
1708     std::string value(RandomString(&rnd, 128) + zero_str);
1709     assert(Put(Key(i), value) == Status::OK());
1710   }
1711   Flush();
1712 
1713   std::vector<std::string> key_data(10);
1714   std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set the initial size
  // to the largest we think we will need
1717   std::vector<PinnableSlice> values(10);
1718   std::vector<Status> statuses;
1719   ReadOptions ro;
1720 
1721   // Warm up the cache first
1722   key_data.emplace_back(Key(0));
1723   keys.emplace_back(Slice(key_data.back()));
1724   key_data.emplace_back(Key(50));
1725   keys.emplace_back(Slice(key_data.back()));
1726   statuses.resize(keys.size());
1727 
1728   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
1729                      keys.data(), values.data(), statuses.data(), true);
1730 }
1731 
1732 class DBBasicTestWithParallelIO
1733     : public DBTestBase,
1734       public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
1735  public:
  DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
1737     bool compressed_cache = std::get<0>(GetParam());
1738     bool uncompressed_cache = std::get<1>(GetParam());
1739     compression_enabled_ = std::get<2>(GetParam());
1740     fill_cache_ = std::get<3>(GetParam());
1741 
1742     if (compressed_cache) {
1743       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
1744       compressed_cache_ = std::make_shared<MyBlockCache>(cache);
1745     }
1746     if (uncompressed_cache) {
1747       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
1748       uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
1749     }
1750 
1751     env_->count_random_reads_ = true;
1752 
1753     Options options = CurrentOptions();
1754     Random rnd(301);
1755     BlockBasedTableOptions table_options;
1756 
1757 #ifndef ROCKSDB_LITE
1758     if (compression_enabled_) {
1759       std::vector<CompressionType> compression_types;
1760       compression_types = GetSupportedCompressions();
1761       // Not every platform may have compression libraries available, so
1762       // dynamically pick based on what's available
1763       if (compression_types.size() == 0) {
1764         compression_enabled_ = false;
1765       } else {
1766         options.compression = compression_types[0];
1767       }
1768     }
1769 #else
1770     // GetSupportedCompressions() is not available in LITE build
1771     if (!Snappy_Supported()) {
1772       compression_enabled_ = false;
1773     }
1774 #endif //ROCKSDB_LITE
1775 
1776     table_options.block_cache = uncompressed_cache_;
1777     if (table_options.block_cache == nullptr) {
1778       table_options.no_block_cache = true;
1779     } else {
1780       table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1781     }
1782     table_options.block_cache_compressed = compressed_cache_;
1783     table_options.flush_block_policy_factory.reset(
1784         new MyFlushBlockPolicyFactory());
1785     options.table_factory.reset(new BlockBasedTableFactory(table_options));
1786     if (!compression_enabled_) {
1787       options.compression = kNoCompression;
1788     }
1789     Reopen(options);
1790 
1791     std::string zero_str(128, '\0');
1792     for (int i = 0; i < 100; ++i) {
1793       // Make the value compressible. A purely random string doesn't compress
1794       // and the resultant data block will not be compressed
1795       values_.emplace_back(RandomString(&rnd, 128) + zero_str);
1796       assert(Put(Key(i), values_[i]) == Status::OK());
1797     }
1798     Flush();
1799 
1800     for (int i = 0; i < 100; ++i) {
1801       // block cannot gain space by compression
1802       uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
1803       std::string tmp_key = "a" + Key(i);
1804       assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
1805     }
1806     Flush();
1807   }
1808 
  bool CheckValue(int i, const std::string& value) {
    if (values_[i].compare(value) == 0) {
      return true;
    }
    return false;
  }

  bool CheckUncompressableValue(int i, const std::string& value) {
    if (uncompressable_values_[i].compare(value) == 0) {
      return true;
    }
    return false;
  }

  int num_lookups() { return uncompressed_cache_->num_lookups(); }
  int num_found() { return uncompressed_cache_->num_found(); }
  int num_inserts() { return uncompressed_cache_->num_inserts(); }

  int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
  int num_found_compressed() { return compressed_cache_->num_found(); }
  int num_inserts_compressed() { return compressed_cache_->num_inserts(); }

  bool fill_cache() { return fill_cache_; }
  bool compression_enabled() { return compression_enabled_; }
  bool has_compressed_cache() { return compressed_cache_ != nullptr; }
  bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }

  static void SetUpTestCase() {}
  static void TearDownTestCase() {}
1838 
1839  private:
  class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
   public:
    MyFlushBlockPolicyFactory() {}

    virtual const char* Name() const override {
      return "MyFlushBlockPolicyFactory";
    }

    virtual FlushBlockPolicy* NewFlushBlockPolicy(
        const BlockBasedTableOptions& /*table_options*/,
        const BlockBuilder& data_block_builder) const override {
      return new MyFlushBlockPolicy(data_block_builder);
    }
  };

  class MyFlushBlockPolicy : public FlushBlockPolicy {
   public:
    explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
        : num_keys_(0), data_block_builder_(data_block_builder) {}

    bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
      if (data_block_builder_.empty()) {
        // First key in this block
        num_keys_ = 1;
        return false;
      }
      // Flush every 10 keys
      if (num_keys_ == 10) {
        num_keys_ = 1;
        return true;
      }
      num_keys_++;
      return false;
    }

   private:
    int num_keys_;
    const BlockBuilder& data_block_builder_;
  };
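  // With the flush policy above, each data block holds roughly 10 keys, so
  // the MultiGet tests below can predict how many distinct blocks (and hence
  // how many block reads) a given batch of keys touches.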
1879 
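  // A pass-through Cache wrapper that forwards every call to a target cache
  // while counting lookups, hits and inserts, so the tests can assert on how
  // MultiGet interacts with the block cache.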
  class MyBlockCache : public Cache {
   public:
    explicit MyBlockCache(std::shared_ptr<Cache>& target)
        : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}

    virtual const char* Name() const override { return "MyBlockCache"; }

    virtual Status Insert(const Slice& key, void* value, size_t charge,
                          void (*deleter)(const Slice& key, void* value),
                          Handle** handle = nullptr,
                          Priority priority = Priority::LOW) override {
      num_inserts_++;
      return target_->Insert(key, value, charge, deleter, handle, priority);
    }

    virtual Handle* Lookup(const Slice& key,
                           Statistics* stats = nullptr) override {
      num_lookups_++;
      Handle* handle = target_->Lookup(key, stats);
      if (handle != nullptr) {
        num_found_++;
      }
      return handle;
    }

    virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }

    virtual bool Release(Handle* handle, bool force_erase = false) override {
      return target_->Release(handle, force_erase);
    }

    virtual void* Value(Handle* handle) override {
      return target_->Value(handle);
    }

    virtual void Erase(const Slice& key) override { target_->Erase(key); }
    virtual uint64_t NewId() override { return target_->NewId(); }

    virtual void SetCapacity(size_t capacity) override {
      target_->SetCapacity(capacity);
    }

    virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
      target_->SetStrictCapacityLimit(strict_capacity_limit);
    }

    virtual bool HasStrictCapacityLimit() const override {
      return target_->HasStrictCapacityLimit();
    }

    virtual size_t GetCapacity() const override {
      return target_->GetCapacity();
    }

    virtual size_t GetUsage() const override { return target_->GetUsage(); }

    virtual size_t GetUsage(Handle* handle) const override {
      return target_->GetUsage(handle);
    }

    virtual size_t GetPinnedUsage() const override {
      return target_->GetPinnedUsage();
    }

    virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }

    virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
                                        bool thread_safe) override {
      return target_->ApplyToAllCacheEntries(callback, thread_safe);
    }

    virtual void EraseUnRefEntries() override {
      return target_->EraseUnRefEntries();
    }

    int num_lookups() { return num_lookups_; }

    int num_found() { return num_found_; }

    int num_inserts() { return num_inserts_; }

   private:
    std::shared_ptr<Cache> target_;
    int num_lookups_;
    int num_found_;
    int num_inserts_;
  };
1967 
1968   std::shared_ptr<MyBlockCache> compressed_cache_;
1969   std::shared_ptr<MyBlockCache> uncompressed_cache_;
1970   bool compression_enabled_;
1971   std::vector<std::string> values_;
1972   std::vector<std::string> uncompressable_values_;
1973   bool fill_cache_;
1974 };
1975 
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
1977   std::vector<std::string> key_data(10);
1978   std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set the initial size
  // to the largest we think we will need
1981   std::vector<PinnableSlice> values(10);
1982   std::vector<Status> statuses;
1983   ReadOptions ro;
1984   ro.fill_cache = fill_cache();
1985 
1986   // Warm up the cache first
1987   key_data.emplace_back(Key(0));
1988   keys.emplace_back(Slice(key_data.back()));
1989   key_data.emplace_back(Key(50));
1990   keys.emplace_back(Slice(key_data.back()));
1991   statuses.resize(keys.size());
1992 
1993   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
1994                      keys.data(), values.data(), statuses.data(), true);
1995   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
1996   ASSERT_TRUE(CheckValue(50, values[1].ToString()));
1997 
1998   int random_reads = env_->random_read_counter_.Read();
1999   key_data[0] = Key(1);
2000   key_data[1] = Key(51);
2001   keys[0] = Slice(key_data[0]);
2002   keys[1] = Slice(key_data[1]);
2003   values[0].Reset();
2004   values[1].Reset();
2005   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2006                      keys.data(), values.data(), statuses.data(), true);
2007   ASSERT_TRUE(CheckValue(1, values[0].ToString()));
2008   ASSERT_TRUE(CheckValue(51, values[1].ToString()));
2009 
2010   bool read_from_cache = false;
2011   if (fill_cache()) {
2012     if (has_uncompressed_cache()) {
2013       read_from_cache = true;
2014     } else if (has_compressed_cache() && compression_enabled()) {
2015       read_from_cache = true;
2016     }
2017   }
2018 
2019   int expected_reads = random_reads + (read_from_cache ? 0 : 2);
2020   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2021 
2022   keys.resize(10);
2023   statuses.resize(10);
2024   std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2025   for (size_t i = 0; i < key_ints.size(); ++i) {
2026     key_data[i] = Key(key_ints[i]);
2027     keys[i] = Slice(key_data[i]);
2028     statuses[i] = Status::OK();
2029     values[i].Reset();
2030   }
2031   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2032                      keys.data(), values.data(), statuses.data(), true);
2033   for (size_t i = 0; i < key_ints.size(); ++i) {
2034     ASSERT_OK(statuses[i]);
2035     ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
2036   }
2037   if (compression_enabled() && !has_compressed_cache()) {
2038     expected_reads += (read_from_cache ? 2 : 3);
2039   } else {
2040     expected_reads += (read_from_cache ? 2 : 4);
2041   }
2042   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2043 
2044   keys.resize(10);
2045   statuses.resize(10);
2046   std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2047   for (size_t i = 0; i < key_uncmp.size(); ++i) {
2048     key_data[i] = "a" + Key(key_uncmp[i]);
2049     keys[i] = Slice(key_data[i]);
2050     statuses[i] = Status::OK();
2051     values[i].Reset();
2052   }
2053   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2054                      keys.data(), values.data(), statuses.data(), true);
2055   for (size_t i = 0; i < key_uncmp.size(); ++i) {
2056     ASSERT_OK(statuses[i]);
2057     ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
2058   }
2059   if (compression_enabled() && !has_compressed_cache()) {
2060     expected_reads += (read_from_cache ? 3 : 3);
2061   } else {
2062     expected_reads += (read_from_cache ? 4 : 4);
2063   }
2064   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2065 
2066   keys.resize(5);
2067   statuses.resize(5);
2068   std::vector<int> key_tr{1, 2, 15, 16, 55};
2069   for (size_t i = 0; i < key_tr.size(); ++i) {
2070     key_data[i] = "a" + Key(key_tr[i]);
2071     keys[i] = Slice(key_data[i]);
2072     statuses[i] = Status::OK();
2073     values[i].Reset();
2074   }
2075   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2076                      keys.data(), values.data(), statuses.data(), true);
2077   for (size_t i = 0; i < key_tr.size(); ++i) {
2078     ASSERT_OK(statuses[i]);
2079     ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
2080   }
2081   if (compression_enabled() && !has_compressed_cache()) {
2082     expected_reads += (read_from_cache ? 0 : 2);
2083     ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2084   } else {
2085     if (has_uncompressed_cache()) {
2086       expected_reads += (read_from_cache ? 0 : 3);
2087       ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2088     } else {
      // A rare case: even with block compression enabled, some data blocks
      // may not be compressed because of their contents. If the user only
      // enables the compressed cache, those uncompressed blocks will not be
      // cached, and block reads will be triggered. The number of reads
      // depends on the compression algorithm.
2094       ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
2095     }
2096   }
2097 }
2098 
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
2100   std::vector<std::string> key_data(10);
2101   std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set the initial size
  // to the largest we think we will need
2104   std::vector<PinnableSlice> values(10);
2105   std::vector<Status> statuses;
2106   int read_count = 0;
2107   ReadOptions ro;
2108   ro.fill_cache = fill_cache();
2109 
2110   SyncPoint::GetInstance()->SetCallBack(
2111       "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
2112       Status* s = static_cast<Status*>(status);
2113       read_count++;
2114       if (read_count == 2) {
2115         *s = Status::Corruption();
2116       }
2117     });
2118   SyncPoint::GetInstance()->EnableProcessing();
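  // The callback above fires for each block read in RetrieveMultipleBlocks
  // when its checksum is verified; forcing the second invocation to return
  // Corruption simulates a checksum mismatch on the block backing the second
  // key, while the first key's read still succeeds.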
2119 
2120   // Warm up the cache first
2121   key_data.emplace_back(Key(0));
2122   keys.emplace_back(Slice(key_data.back()));
2123   key_data.emplace_back(Key(50));
2124   keys.emplace_back(Slice(key_data.back()));
2125   statuses.resize(keys.size());
2126 
2127   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2128                      keys.data(), values.data(), statuses.data(), true);
2129   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
2130   //ASSERT_TRUE(CheckValue(50, values[1].ToString()));
2131   ASSERT_EQ(statuses[0], Status::OK());
2132   ASSERT_EQ(statuses[1], Status::Corruption());
2133 
2134   SyncPoint::GetInstance()->DisableProcessing();
2135 }
2136 
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
2138   std::vector<std::string> key_data(10);
2139   std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set the initial size
  // to the largest we think we will need
2142   std::vector<PinnableSlice> values(10);
2143   std::vector<Status> statuses;
2144   ReadOptions ro;
2145   ro.fill_cache = fill_cache();
2146 
2147   SyncPoint::GetInstance()->SetCallBack(
2148       "TableCache::MultiGet:FindTable", [&](void *status) {
2149       Status* s = static_cast<Status*>(status);
2150       *s = Status::IOError();
2151     });
  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to a minimum of 20.
  // The table cache is allocated with max_open_files - 10 as its capacity,
  // so override max_open_files to 11 to make the table cache capacity 1.
  // This prevents the file from being opened during DB open and forces it
  // to be opened during MultiGet instead.
2159   SyncPoint::GetInstance()->SetCallBack(
2160       "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
2161       int* max_open_files = (int*)arg;
2162       *max_open_files = 11;
2163     });
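  // Setting options.max_open_files = 11 directly would not help here, since
  // (per the comment above) SanitizeOptions would raise it back to 20; the
  // sync point overrides the value after sanitization.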
2164   SyncPoint::GetInstance()->EnableProcessing();
2165 
2166   Reopen(CurrentOptions());
2167 
2168   // Warm up the cache first
2169   key_data.emplace_back(Key(0));
2170   keys.emplace_back(Slice(key_data.back()));
2171   key_data.emplace_back(Key(50));
2172   keys.emplace_back(Slice(key_data.back()));
2173   statuses.resize(keys.size());
2174 
2175   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2176                      keys.data(), values.data(), statuses.data(), true);
2177   ASSERT_EQ(statuses[0], Status::IOError());
2178   ASSERT_EQ(statuses[1], Status::IOError());
2179 
2180   SyncPoint::GetInstance()->DisableProcessing();
2181 }
2182 
2183 INSTANTIATE_TEST_CASE_P(
2184     ParallelIO, DBBasicTestWithParallelIO,
2185     // Params are as follows -
2186     // Param 0 - Compressed cache enabled
2187     // Param 1 - Uncompressed cache enabled
2188     // Param 2 - Data compression enabled
2189     // Param 3 - ReadOptions::fill_cache
2190     ::testing::Combine(::testing::Bool(), ::testing::Bool(),
2191                        ::testing::Bool(), ::testing::Bool()));
2192 
2193 class DBBasicTestWithTimestampBase : public DBTestBase {
2194  public:
  explicit DBBasicTestWithTimestampBase(const std::string& dbname)
2196       : DBTestBase(dbname) {}
2197 
2198  protected:
2199   class TestComparatorBase : public Comparator {
2200    public:
    explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {}

    const char* Name() const override { return "TestComparator"; }

    void FindShortSuccessor(std::string*) const override {}

    void FindShortestSeparator(std::string*, const Slice&) const override {}

    int Compare(const Slice& a, const Slice& b) const override {
      int r = CompareWithoutTimestamp(a, b);
      if (r != 0 || 0 == timestamp_size()) {
        return r;
      }
      return CompareTimestamp(
          Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
          Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
    }

    virtual int CompareImpl(const Slice& a, const Slice& b) const = 0;

    int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
      assert(a.size() >= timestamp_size());
      assert(b.size() >= timestamp_size());
      Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
      Slice k2 = StripTimestampFromUserKey(b, timestamp_size());

      return CompareImpl(k1, k2);
    }

    int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
2231       if (!ts1.data() && !ts2.data()) {
2232         return 0;
2233       } else if (ts1.data() && !ts2.data()) {
2234         return 1;
2235       } else if (!ts1.data() && ts2.data()) {
2236         return -1;
2237       }
2238       assert(ts1.size() == ts2.size());
2239       uint64_t low1 = 0;
2240       uint64_t low2 = 0;
2241       uint64_t high1 = 0;
2242       uint64_t high2 = 0;
2243       auto* ptr1 = const_cast<Slice*>(&ts1);
2244       auto* ptr2 = const_cast<Slice*>(&ts2);
2245       if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
2246           !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
2247         assert(false);
2248       }
2249       if (high1 < high2) {
2250         return 1;
2251       } else if (high1 > high2) {
2252         return -1;
2253       }
2254       if (low1 < low2) {
2255         return 1;
2256       } else if (low1 > low2) {
2257         return -1;
2258       }
2259       return 0;
2260     }
2261   };
2262 
  Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) {
2264     assert(nullptr != ts);
2265     ts->clear();
2266     PutFixed64(ts, low);
2267     PutFixed64(ts, high);
2268     assert(ts->size() == sizeof(low) + sizeof(high));
2269     return Slice(*ts);
2270   }
2271 };
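// For reference, timestamps in these tests are 16 bytes: two fixed64 values
// (low, then high) written by EncodeTimestamp(). CompareTimestamp() above
// inverts the numeric order, so a larger (newer) timestamp compares as
// smaller and newer versions of a key sort ahead of older ones. Illustrative
// usage (mirroring the tests below):
//
//   std::string ts;
//   Slice write_ts = EncodeTimestamp(/*low=*/2, /*high=*/0, &ts);
//   WriteOptions wopts;
//   wopts.timestamp = &write_ts;  // attach to Put(), as the tests do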
2272 
2273 class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
2274  public:
  DBBasicTestWithTimestamp()
2276       : DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
2277 
2278  protected:
2279   class TestComparator : public TestComparatorBase {
2280    public:
2281     const int kKeyPrefixLength =
2282         3;  // 3: length of "key" in generated keys ("key" + std::to_string(j))
    explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {}

    int CompareImpl(const Slice& a, const Slice& b) const override {
2286       int n1 = atoi(
2287           std::string(a.data() + kKeyPrefixLength, a.size() - kKeyPrefixLength)
2288               .c_str());
2289       int n2 = atoi(
2290           std::string(b.data() + kKeyPrefixLength, b.size() - kKeyPrefixLength)
2291               .c_str());
2292       return (n1 < n2) ? -1 : (n1 > n2) ? 1 : 0;
2293     }
2294   };
2295 };
2296 
2297 #ifndef ROCKSDB_LITE
2298 // A class which remembers the name of each flushed file.
2299 class FlushedFileCollector : public EventListener {
2300  public:
  FlushedFileCollector() {}
  ~FlushedFileCollector() override {}

  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
2305     InstrumentedMutexLock lock(&mutex_);
2306     flushed_files_.push_back(info.file_path);
2307   }
2308 
  std::vector<std::string> GetFlushedFiles() {
2310     std::vector<std::string> result;
2311     {
2312       InstrumentedMutexLock lock(&mutex_);
2313       result = flushed_files_;
2314     }
2315     return result;
2316   }
2317 
  void ClearFlushedFiles() {
2319     InstrumentedMutexLock lock(&mutex_);
2320     flushed_files_.clear();
2321   }
2322 
2323  private:
2324   std::vector<std::string> flushed_files_;
2325   InstrumentedMutex mutex_;
2326 };
2327 
TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
2329   const int kNumKeysPerFile = 8192;
2330   const size_t kNumTimestamps = 2;
2331   const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
2332   const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
2333   Options options = CurrentOptions();
2334   options.create_if_missing = true;
2335   options.env = env_;
2336   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
2337 
2338   FlushedFileCollector* collector = new FlushedFileCollector();
2339   options.listeners.emplace_back(collector);
2340 
2341   std::string tmp;
2342   size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
2343   TestComparator test_cmp(ts_sz);
2344   options.comparator = &test_cmp;
2345   BlockBasedTableOptions bbto;
2346   bbto.filter_policy.reset(NewBloomFilterPolicy(
2347       10 /*bits_per_key*/, false /*use_block_based_builder*/));
2348   bbto.whole_key_filtering = true;
2349   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2350   DestroyAndReopen(options);
2351   CreateAndReopenWithCF({"pikachu"}, options);
2352   size_t num_cfs = handles_.size();
2353   ASSERT_EQ(2, num_cfs);
2354   std::vector<std::string> write_ts_strs(kNumTimestamps);
2355   std::vector<std::string> read_ts_strs(kNumTimestamps);
2356   std::vector<Slice> write_ts_list;
2357   std::vector<Slice> read_ts_list;
2358 
2359   for (size_t i = 0; i != kNumTimestamps; ++i) {
2360     write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
2361     read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
2362     const Slice& write_ts = write_ts_list.back();
2363     WriteOptions wopts;
2364     wopts.timestamp = &write_ts;
2365     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2366       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2367         ASSERT_OK(Put(cf, "key" + std::to_string(j),
2368                       "value_" + std::to_string(j) + "_" + std::to_string(i),
2369                       wopts));
2370         if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
2371           // flush all keys with the same timestamp to two sst files, split at
2372           // incremental positions such that lowerlevel[1].smallest.userkey ==
2373           // higherlevel[0].largest.userkey
2374           ASSERT_OK(Flush(cf));
2375 
          // compact files (2 at each level) to a lower level such that all
          // keys with the same timestamp are at one level, with newer
          // versions at higher levels
2379           CompactionOptions compact_opt;
2380           compact_opt.compression = kNoCompression;
2381           db_->CompactFiles(compact_opt, handles_[cf],
2382                             collector->GetFlushedFiles(),
2383                             static_cast<int>(kNumTimestamps - i));
2384           collector->ClearFlushedFiles();
2385         }
2386       }
2387     }
2388   }
2389   const auto& verify_db_func = [&]() {
2390     for (size_t i = 0; i != kNumTimestamps; ++i) {
2391       ReadOptions ropts;
2392       ropts.timestamp = &read_ts_list[i];
2393       for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2394         ColumnFamilyHandle* cfh = handles_[cf];
2395         for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
2396           std::string value;
2397           ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
2398           ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
2399                     value);
2400         }
2401       }
2402     }
2403   };
2404   verify_db_func();
2405 }
2406 #endif  // !ROCKSDB_LITE
2407 
2408 class DBBasicTestWithTimestampWithParam
2409     : public DBBasicTestWithTimestampBase,
2410       public testing::WithParamInterface<bool> {
2411  public:
  DBBasicTestWithTimestampWithParam()
2413       : DBBasicTestWithTimestampBase(
2414             "/db_basic_test_with_timestamp_with_param") {}
2415 
2416  protected:
2417   class TestComparator : public TestComparatorBase {
2418    private:
2419     const Comparator* cmp_without_ts_;
2420 
2421    public:
    explicit TestComparator(size_t ts_sz)
2423         : TestComparatorBase(ts_sz), cmp_without_ts_(nullptr) {
2424       cmp_without_ts_ = BytewiseComparator();
2425     }
2426 
    int CompareImpl(const Slice& a, const Slice& b) const override {
2428       return cmp_without_ts_->Compare(a, b);
2429     }
2430   };
2431 };
2432 
TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
2434   const int kNumKeysPerFile = 8192;
2435   const size_t kNumTimestamps = 6;
2436   bool memtable_only = GetParam();
2437   Options options = CurrentOptions();
2438   options.create_if_missing = true;
2439   options.env = env_;
2440   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
2441   std::string tmp;
2442   size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
2443   TestComparator test_cmp(ts_sz);
2444   options.comparator = &test_cmp;
2445   BlockBasedTableOptions bbto;
2446   bbto.filter_policy.reset(NewBloomFilterPolicy(
2447       10 /*bits_per_key*/, false /*use_block_based_builder*/));
2448   bbto.whole_key_filtering = true;
2449   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2450 
2451   std::vector<CompressionType> compression_types;
2452   compression_types.push_back(kNoCompression);
2453   if (Zlib_Supported()) {
2454     compression_types.push_back(kZlibCompression);
2455   }
2456 #if LZ4_VERSION_NUMBER >= 10400  // r124+
2457   compression_types.push_back(kLZ4Compression);
2458   compression_types.push_back(kLZ4HCCompression);
2459 #endif  // LZ4_VERSION_NUMBER >= 10400
2460   if (ZSTD_Supported()) {
2461     compression_types.push_back(kZSTD);
2462   }
2463 
2464   // Switch compression dictionary on/off to check key extraction
2465   // correctness in kBuffered state
2466   std::vector<uint32_t> max_dict_bytes_list = {0, 1 << 14};  // 0 or 16KB
2467 
2468   for (auto compression_type : compression_types) {
2469     for (uint32_t max_dict_bytes : max_dict_bytes_list) {
2470       options.compression = compression_type;
2471       options.compression_opts.max_dict_bytes = max_dict_bytes;
2472       if (compression_type == kZSTD) {
2473         options.compression_opts.zstd_max_train_bytes = max_dict_bytes;
2474       }
2475       options.target_file_size_base = 1 << 26;  // 64MB
2476 
2477       DestroyAndReopen(options);
2478       CreateAndReopenWithCF({"pikachu"}, options);
2479       size_t num_cfs = handles_.size();
2480       ASSERT_EQ(2, num_cfs);
2481       std::vector<std::string> write_ts_strs(kNumTimestamps);
2482       std::vector<std::string> read_ts_strs(kNumTimestamps);
2483       std::vector<Slice> write_ts_list;
2484       std::vector<Slice> read_ts_list;
2485 
2486       for (size_t i = 0; i != kNumTimestamps; ++i) {
2487         write_ts_list.emplace_back(
2488             EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
2489         read_ts_list.emplace_back(
2490             EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
2491         const Slice& write_ts = write_ts_list.back();
2492         WriteOptions wopts;
2493         wopts.timestamp = &write_ts;
2494         for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2495           for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
2496             ASSERT_OK(Put(
2497                 cf, "key" + std::to_string(j),
2498                 "value_" + std::to_string(j) + "_" + std::to_string(i), wopts));
2499           }
2500           if (!memtable_only) {
2501             ASSERT_OK(Flush(cf));
2502           }
2503         }
2504       }
2505       const auto& verify_db_func = [&]() {
2506         for (size_t i = 0; i != kNumTimestamps; ++i) {
2507           ReadOptions ropts;
2508           ropts.timestamp = &read_ts_list[i];
2509           for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
2510             ColumnFamilyHandle* cfh = handles_[cf];
2511             for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps;
2512                  ++j) {
2513               std::string value;
2514               ASSERT_OK(
2515                   db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
2516               ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
2517                         value);
2518             }
2519           }
2520         }
2521       };
2522       verify_db_func();
2523     }
2524   }
2525 }
2526 
2527 INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
2528                         ::testing::Bool());
2529 
2530 }  // namespace ROCKSDB_NAMESPACE
2531 
2532 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2533 extern "C" {
2534 void RegisterCustomObjects(int argc, char** argv);
2535 }
2536 #else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
2538 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2539 
int main(int argc, char** argv) {
2541   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2542   ::testing::InitGoogleTest(&argc, argv);
2543   RegisterCustomObjects(argc, argv);
2544   return RUN_ALL_TESTS();
2545 }
2546