1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include <cstring>
11 
12 #include "db/db_test_util.h"
13 #include "port/stack_trace.h"
14 #include "rocksdb/flush_block_policy.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/perf_context.h"
17 #include "rocksdb/utilities/debug.h"
18 #include "table/block_based/block_based_table_reader.h"
19 #include "table/block_based/block_builder.h"
20 #if !defined(ROCKSDB_LITE)
21 #include "test_util/sync_point.h"
22 #endif
23 #include "util/file_checksum_helper.h"
24 #include "util/random.h"
25 #include "utilities/fault_injection_env.h"
26 #include "utilities/merge_operators.h"
27 #include "utilities/merge_operators/string_append/stringappend.h"
28 
29 namespace ROCKSDB_NAMESPACE {
30 
31 class DBBasicTest : public DBTestBase {
32  public:
33   DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {}
34 };
35 
36 TEST_F(DBBasicTest, OpenWhenOpen) {
37   Options options = CurrentOptions();
38   options.env = env_;
39   DB* db2 = nullptr;
40   Status s = DB::Open(options, dbname_, &db2);
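  // The streamed lambda runs only if the assertion fails: it cleans up db2 and
  // supplies a message for the failure output. On the expected failure path
  // DB::Open leaves db2 as nullptr, so the delete below is a no-op.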
41   ASSERT_NOK(s) << [db2]() {
42     delete db2;
43     return "db2 open: ok";
44   }();
45   ASSERT_EQ(Status::Code::kIOError, s.code());
46   ASSERT_EQ(Status::SubCode::kNone, s.subcode());
47   ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
48 
49   delete db2;
50 }
51 
52 TEST_F(DBBasicTest, UniqueSession) {
53   Options options = CurrentOptions();
54   std::string sid1, sid2, sid3, sid4;
55 
56   ASSERT_OK(db_->GetDbSessionId(sid1));
57   Reopen(options);
58   ASSERT_OK(db_->GetDbSessionId(sid2));
59   ASSERT_OK(Put("foo", "v1"));
60   ASSERT_OK(db_->GetDbSessionId(sid4));
61   Reopen(options);
62   ASSERT_OK(db_->GetDbSessionId(sid3));
63 
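  // A fresh session id is expected on every DB open: ids taken across Reopen()
  // calls differ, while ids taken within the same open (sid2/sid4) match.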
64   ASSERT_NE(sid1, sid2);
65   ASSERT_NE(sid1, sid3);
66   ASSERT_NE(sid2, sid3);
67 
68   ASSERT_EQ(sid2, sid4);
69 
70   // Expected compact format for session ids (see notes in implementation)
71   TestRegex expected("[0-9A-Z]{20}");
72   EXPECT_MATCHES_REGEX(sid1, expected);
73   EXPECT_MATCHES_REGEX(sid2, expected);
74   EXPECT_MATCHES_REGEX(sid3, expected);
75 
76 #ifndef ROCKSDB_LITE
77   Close();
78   ASSERT_OK(ReadOnlyReopen(options));
79   ASSERT_OK(db_->GetDbSessionId(sid1));
80   // Test uniqueness between readonly open (sid1) and regular open (sid3)
81   ASSERT_NE(sid1, sid3);
82   Close();
83   ASSERT_OK(ReadOnlyReopen(options));
84   ASSERT_OK(db_->GetDbSessionId(sid2));
85   ASSERT_EQ("v1", Get("foo"));
86   ASSERT_OK(db_->GetDbSessionId(sid3));
87 
88   ASSERT_NE(sid1, sid2);
89 
90   ASSERT_EQ(sid2, sid3);
91 #endif  // ROCKSDB_LITE
92 
93   CreateAndReopenWithCF({"goku"}, options);
94   ASSERT_OK(db_->GetDbSessionId(sid1));
95   ASSERT_OK(Put("bar", "e1"));
96   ASSERT_OK(db_->GetDbSessionId(sid2));
97   ASSERT_EQ("e1", Get("bar"));
98   ASSERT_OK(db_->GetDbSessionId(sid3));
99   ReopenWithColumnFamilies({"default", "goku"}, options);
100   ASSERT_OK(db_->GetDbSessionId(sid4));
101 
102   ASSERT_EQ(sid1, sid2);
103   ASSERT_EQ(sid2, sid3);
104 
105   ASSERT_NE(sid1, sid4);
106 }
107 
108 #ifndef ROCKSDB_LITE
109 TEST_F(DBBasicTest, ReadOnlyDB) {
110   ASSERT_OK(Put("foo", "v1"));
111   ASSERT_OK(Put("bar", "v2"));
112   ASSERT_OK(Put("foo", "v3"));
113   Close();
114 
115   auto verify_one_iter = [&](Iterator* iter) {
116     int count = 0;
117     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
118       ASSERT_OK(iter->status());
119       ++count;
120     }
121     // Always expect two keys: "foo" and "bar"
122     ASSERT_EQ(count, 2);
123   };
124 
125   auto verify_all_iters = [&]() {
126     Iterator* iter = db_->NewIterator(ReadOptions());
127     verify_one_iter(iter);
128     delete iter;
129 
130     std::vector<Iterator*> iters;
131     ASSERT_OK(db_->NewIterators(ReadOptions(),
132                                 {dbfull()->DefaultColumnFamily()}, &iters));
133     ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
134     verify_one_iter(iters[0]);
135     delete iters[0];
136   };
137 
138   auto options = CurrentOptions();
139   assert(options.env == env_);
140   ASSERT_OK(ReadOnlyReopen(options));
141   ASSERT_EQ("v3", Get("foo"));
142   ASSERT_EQ("v2", Get("bar"));
143   verify_all_iters();
144   Close();
145 
146   // Reopen and flush memtable.
147   Reopen(options);
148   ASSERT_OK(Flush());
149   Close();
150   // Now check keys in read only mode.
151   ASSERT_OK(ReadOnlyReopen(options));
152   ASSERT_EQ("v3", Get("foo"));
153   ASSERT_EQ("v2", Get("bar"));
154   verify_all_iters();
155   ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
156 }
157 
158 TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
159   ASSERT_OK(Put("foo", "v1"));
160   ASSERT_OK(Put("bar", "v2"));
161   ASSERT_OK(Put("foo", "v3"));
162   Close();
163 
164   auto options = CurrentOptions();
165   options.write_dbid_to_manifest = true;
166   assert(options.env == env_);
167   ASSERT_OK(ReadOnlyReopen(options));
168   std::string db_id1;
169   ASSERT_OK(db_->GetDbIdentity(db_id1));
170   ASSERT_EQ("v3", Get("foo"));
171   ASSERT_EQ("v2", Get("bar"));
172   Iterator* iter = db_->NewIterator(ReadOptions());
173   int count = 0;
174   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
175     ASSERT_OK(iter->status());
176     ++count;
177   }
178   ASSERT_EQ(count, 2);
179   delete iter;
180   Close();
181 
182   // Reopen and flush memtable.
183   Reopen(options);
184   ASSERT_OK(Flush());
185   Close();
186   // Now check keys in read only mode.
187   ASSERT_OK(ReadOnlyReopen(options));
188   ASSERT_EQ("v3", Get("foo"));
189   ASSERT_EQ("v2", Get("bar"));
190   ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
191   std::string db_id2;
192   ASSERT_OK(db_->GetDbIdentity(db_id2));
193   ASSERT_EQ(db_id1, db_id2);
194 }
195 
196 TEST_F(DBBasicTest, CompactedDB) {
197   const uint64_t kFileSize = 1 << 20;
198   Options options = CurrentOptions();
199   options.disable_auto_compactions = true;
200   options.write_buffer_size = kFileSize;
201   options.target_file_size_base = kFileSize;
202   options.max_bytes_for_level_base = 1 << 30;
203   options.compression = kNoCompression;
204   Reopen(options);
205   // 1 L0 file, use CompactedDB if max_open_files = -1
206   ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
207   ASSERT_OK(Flush());
208   Close();
209   ASSERT_OK(ReadOnlyReopen(options));
210   Status s = Put("new", "value");
211   ASSERT_EQ(s.ToString(),
212             "Not implemented: Not supported operation in read only mode.");
213   ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
214   Close();
215   options.max_open_files = -1;
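  // With max_open_files == -1 and all live data in a single file, the
  // read-only open is expected to come up as a CompactedDB, hence the
  // different "not supported" message below.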
216   ASSERT_OK(ReadOnlyReopen(options));
217   s = Put("new", "value");
218   ASSERT_EQ(s.ToString(),
219             "Not implemented: Not supported in compacted db mode.");
220   ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
221   Close();
222   Reopen(options);
223   // Add more L0 files
224   ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
225   ASSERT_OK(Flush());
226   ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
227   ASSERT_OK(Flush());
228   ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
229   ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
230   ASSERT_OK(Flush());
231   Close();
232 
233   ASSERT_OK(ReadOnlyReopen(options));
234   // Fallback to read-only DB
235   s = Put("new", "value");
236   ASSERT_EQ(s.ToString(),
237             "Not implemented: Not supported operation in read only mode.");
238   Close();
239 
240   // Full compaction
241   Reopen(options);
242   // Add more keys
243   ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
244   ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
245   ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
246   ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
247   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
248   ASSERT_EQ(3, NumTableFilesAtLevel(1));
249   Close();
250 
251   // CompactedDB
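  // (after the full compaction all live data sits in L1, so this read-only
  // open with max_open_files == -1 should again use the CompactedDB path)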
252   ASSERT_OK(ReadOnlyReopen(options));
253   s = Put("new", "value");
254   ASSERT_EQ(s.ToString(),
255             "Not implemented: Not supported in compacted db mode.");
256   ASSERT_EQ("NOT_FOUND", Get("abc"));
257   ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
258   ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
259   ASSERT_EQ("NOT_FOUND", Get("ccc"));
260   ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
261   ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
262   ASSERT_EQ("NOT_FOUND", Get("ggg"));
263   ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
264   ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
265   ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
266   ASSERT_EQ("NOT_FOUND", Get("kkk"));
267 
268   // MultiGet
269   std::vector<std::string> values;
270   std::vector<Status> status_list = dbfull()->MultiGet(
271       ReadOptions(),
272       std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
273                           Slice("ggg"), Slice("iii"), Slice("kkk")}),
274       &values);
275   ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
276   ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
277   ASSERT_OK(status_list[0]);
278   ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
279   ASSERT_TRUE(status_list[1].IsNotFound());
280   ASSERT_OK(status_list[2]);
281   ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
282   ASSERT_TRUE(status_list[3].IsNotFound());
283   ASSERT_OK(status_list[4]);
284   ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
285   ASSERT_TRUE(status_list[5].IsNotFound());
286 
287   Reopen(options);
288   // Add a key
289   ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
290   Close();
291   ASSERT_OK(ReadOnlyReopen(options));
292   s = Put("new", "value");
293   ASSERT_EQ(s.ToString(),
294             "Not implemented: Not supported operation in read only mode.");
295 }
296 
297 TEST_F(DBBasicTest, LevelLimitReopen) {
298   Options options = CurrentOptions();
299   CreateAndReopenWithCF({"pikachu"}, options);
300 
301   const std::string value(1024 * 1024, ' ');
302   int i = 0;
303   while (NumTableFilesAtLevel(2, 1) == 0) {
304     ASSERT_OK(Put(1, Key(i++), value));
305     ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
306     ASSERT_OK(dbfull()->TEST_WaitForCompact());
307   }
308 
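  // Data has reached L2, so the DB needs more than one level and reopening
  // with num_levels == 1 below is expected to fail.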
309   options.num_levels = 1;
310   options.max_bytes_for_level_multiplier_additional.resize(1, 1);
311   Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
312   ASSERT_EQ(s.IsInvalidArgument(), true);
313   ASSERT_EQ(s.ToString(),
314             "Invalid argument: db has more levels than options.num_levels");
315 
316   options.num_levels = 10;
317   options.max_bytes_for_level_multiplier_additional.resize(10, 1);
318   ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
319 }
320 #endif  // ROCKSDB_LITE
321 
322 TEST_F(DBBasicTest, PutDeleteGet) {
323   do {
324     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
325     ASSERT_OK(Put(1, "foo", "v1"));
326     ASSERT_EQ("v1", Get(1, "foo"));
327     ASSERT_OK(Put(1, "foo", "v2"));
328     ASSERT_EQ("v2", Get(1, "foo"));
329     ASSERT_OK(Delete(1, "foo"));
330     ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
331   } while (ChangeOptions());
332 }
333 
334 TEST_F(DBBasicTest, PutSingleDeleteGet) {
335   do {
336     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
337     ASSERT_OK(Put(1, "foo", "v1"));
338     ASSERT_EQ("v1", Get(1, "foo"));
339     ASSERT_OK(Put(1, "foo2", "v2"));
340     ASSERT_EQ("v2", Get(1, "foo2"));
341     ASSERT_OK(SingleDelete(1, "foo"));
342     ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
343     // Skip FIFO and universal compaction because they do not apply to the test
344     // case. Skip MergePut because single delete does not get removed when it
345     // encounters a merge.
346   } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
347                          kSkipMergePut));
348 }
349 
350 TEST_F(DBBasicTest, EmptyFlush) {
351   // It is possible to produce empty flushes when using single deletes. Tests
352   // whether empty flushes cause issues.
353   do {
354     Random rnd(301);
355 
356     Options options = CurrentOptions();
357     options.disable_auto_compactions = true;
358     CreateAndReopenWithCF({"pikachu"}, options);
359 
360     ASSERT_OK(Put(1, "a", Slice()));
361     ASSERT_OK(SingleDelete(1, "a"));
362     ASSERT_OK(Flush(1));
363 
364     ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
365     // Skip FIFO and universal compaction as they do not apply to the test
366     // case. Skip MergePut because merges cannot be combined with single
367     // deletions.
368   } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
369                          kSkipMergePut));
370 }
371 
372 TEST_F(DBBasicTest, GetFromVersions) {
373   do {
374     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
375     ASSERT_OK(Put(1, "foo", "v1"));
376     ASSERT_OK(Flush(1));
377     ASSERT_EQ("v1", Get(1, "foo"));
378     ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
379   } while (ChangeOptions());
380 }
381 
382 #ifndef ROCKSDB_LITE
383 TEST_F(DBBasicTest, GetSnapshot) {
384   anon::OptionsOverride options_override;
385   options_override.skip_policy = kSkipNoSnapshot;
386   do {
387     CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
388     // Try with both a short key and a long key
389     for (int i = 0; i < 2; i++) {
390       std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
391       ASSERT_OK(Put(1, key, "v1"));
392       const Snapshot* s1 = db_->GetSnapshot();
393       ASSERT_OK(Put(1, key, "v2"));
394       ASSERT_EQ("v2", Get(1, key));
395       ASSERT_EQ("v1", Get(1, key, s1));
396       ASSERT_OK(Flush(1));
397       ASSERT_EQ("v2", Get(1, key));
398       ASSERT_EQ("v1", Get(1, key, s1));
399       db_->ReleaseSnapshot(s1);
400     }
401   } while (ChangeOptions());
402 }
403 #endif  // ROCKSDB_LITE
404 
405 TEST_F(DBBasicTest, CheckLock) {
406   do {
407     DB* localdb = nullptr;
408     Options options = CurrentOptions();
409     ASSERT_OK(TryReopen(options));
410 
411     // second open should fail
412     Status s = DB::Open(options, dbname_, &localdb);
413     ASSERT_NOK(s) << [localdb]() {
414       delete localdb;
415       return "localdb open: ok";
416     }();
417 #ifdef OS_LINUX
418     ASSERT_TRUE(s.ToString().find("lock ") != std::string::npos);
419 #endif  // OS_LINUX
420   } while (ChangeCompactOptions());
421 }
422 
423 TEST_F(DBBasicTest, FlushMultipleMemtable) {
424   do {
425     Options options = CurrentOptions();
426     WriteOptions writeOpt = WriteOptions();
427     writeOpt.disableWAL = true;
428     options.max_write_buffer_number = 4;
429     options.min_write_buffer_number_to_merge = 3;
430     options.max_write_buffer_size_to_maintain = -1;
431     CreateAndReopenWithCF({"pikachu"}, options);
432     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
433     ASSERT_OK(Flush(1));
434     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
435 
436     ASSERT_EQ("v1", Get(1, "foo"));
437     ASSERT_EQ("v1", Get(1, "bar"));
438     ASSERT_OK(Flush(1));
439   } while (ChangeCompactOptions());
440 }
441 
442 TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
443   // Block both the flush (HIGH) and compaction (LOW) background threads
444   env_->SetBackgroundThreads(1, Env::HIGH);
445   env_->SetBackgroundThreads(1, Env::LOW);
446   test::SleepingBackgroundTask sleeping_task_low;
447   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
448                  Env::Priority::LOW);
449   test::SleepingBackgroundTask sleeping_task_high;
450   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
451                  &sleeping_task_high, Env::Priority::HIGH);
452 
453   Options options = CurrentOptions();
454   // disable compaction
455   options.disable_auto_compactions = true;
456   WriteOptions writeOpt = WriteOptions();
457   writeOpt.disableWAL = true;
458   options.max_write_buffer_number = 2;
459   options.min_write_buffer_number_to_merge = 1;
460   options.max_write_buffer_size_to_maintain =
461       static_cast<int64_t>(options.write_buffer_size);
462   CreateAndReopenWithCF({"pikachu"}, options);
463 
464   // Flushing the empty column families is effectively a no-op, so it can
465   // still go through even though no background thread is free to run a flush.
466   ASSERT_OK(Flush(0));
467   ASSERT_OK(Flush(1));
468 
469   // Insert can go through
470   ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
471   ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
472 
473   ASSERT_EQ("v1", Get(0, "foo"));
474   ASSERT_EQ("v1", Get(1, "bar"));
475 
476   sleeping_task_high.WakeUp();
477   sleeping_task_high.WaitUntilDone();
478 
479   // Flush can still go through.
480   ASSERT_OK(Flush(0));
481   ASSERT_OK(Flush(1));
482 
483   sleeping_task_low.WakeUp();
484   sleeping_task_low.WaitUntilDone();
485 }
486 
487 TEST_F(DBBasicTest, Flush) {
488   do {
489     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
490     WriteOptions writeOpt = WriteOptions();
491     writeOpt.disableWAL = true;
492     SetPerfLevel(kEnableTime);
493     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
494     // this flush persists the write above, which bypassed the WAL
495     ASSERT_OK(Flush(1));
496     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
497 
498     get_perf_context()->Reset();
499     Get(1, "foo");
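    // "foo" was flushed above, so this read is served from an SST file
    // (get_from_output_files_time > 0) and reads the two-byte value "v1".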
500     ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
501     ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
502 
503     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
504     ASSERT_EQ("v1", Get(1, "foo"));
505     ASSERT_EQ("v1", Get(1, "bar"));
506 
507     writeOpt.disableWAL = true;
508     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
509     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
510     ASSERT_OK(Flush(1));
511 
512     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
513     ASSERT_EQ("v2", Get(1, "bar"));
514     get_perf_context()->Reset();
515     ASSERT_EQ("v2", Get(1, "foo"));
516     ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
517 
518     writeOpt.disableWAL = false;
519     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
520     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
521     ASSERT_OK(Flush(1));
522 
523     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
524     // 'foo' should be there because its put
525     // has WAL enabled.
526     ASSERT_EQ("v3", Get(1, "foo"));
527     ASSERT_EQ("v3", Get(1, "bar"));
528 
529     SetPerfLevel(kDisable);
530   } while (ChangeCompactOptions());
531 }
532 
533 TEST_F(DBBasicTest, ManifestRollOver) {
534   do {
535     Options options;
536     options.max_manifest_file_size = 10;  // 10 bytes
537     options = CurrentOptions(options);
538     CreateAndReopenWithCF({"pikachu"}, options);
539     {
540       ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
541       ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
542       ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
543       uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
544       ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
545       uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
546       ASSERT_GT(manifest_after_flush, manifest_before_flush);
547       ReopenWithColumnFamilies({"default", "pikachu"}, options);
548       ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
549       // data written before the manifest roll-over must still be readable.
550       ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
551       ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
552       ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
553     }
554   } while (ChangeCompactOptions());
555 }
556 
557 TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
558   do {
559     std::string id1;
560     ASSERT_OK(db_->GetDbIdentity(id1));
561 
562     Options options = CurrentOptions();
563     Reopen(options);
564     std::string id2;
565     ASSERT_OK(db_->GetDbIdentity(id2));
566     // id1 should match id2 because identity was not regenerated
567     ASSERT_EQ(id1.compare(id2), 0);
568 
569     std::string idfilename = IdentityFileName(dbname_);
570     ASSERT_OK(env_->DeleteFile(idfilename));
571     Reopen(options);
572     std::string id3;
573     ASSERT_OK(db_->GetDbIdentity(id3));
574     if (options.write_dbid_to_manifest) {
575       ASSERT_EQ(id1.compare(id3), 0);
576     } else {
577       // id1 should NOT match id3 because identity was regenerated
578       ASSERT_NE(id1.compare(id3), 0);
579     }
580   } while (ChangeCompactOptions());
581 }
582 
583 TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
584   do {
585     std::string id1;
586     ASSERT_OK(db_->GetDbIdentity(id1));
587 
588     Options options = CurrentOptions();
589     options.write_dbid_to_manifest = true;
590     Reopen(options);
591     std::string id2;
592     ASSERT_OK(db_->GetDbIdentity(id2));
593     // id1 should match id2 because identity was not regenerated
594     ASSERT_EQ(id1.compare(id2), 0);
595 
596     std::string idfilename = IdentityFileName(dbname_);
597     ASSERT_OK(env_->DeleteFile(idfilename));
598     Reopen(options);
599     std::string id3;
600     ASSERT_OK(db_->GetDbIdentity(id3));
601     // id1 should match id3 because the identity is also stored in the MANIFEST
602     ASSERT_EQ(id1, id3);
603   } while (ChangeCompactOptions());
604 }
605 
606 #ifndef ROCKSDB_LITE
607 TEST_F(DBBasicTest, Snapshot) {
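  // Use the mock clock so GetTimeOldestSnapshots() only changes when the test
  // advances time explicitly via MockSleepForSeconds().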
608   env_->SetMockSleep();
609   anon::OptionsOverride options_override;
610   options_override.skip_policy = kSkipNoSnapshot;
611   do {
612     CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
613     ASSERT_OK(Put(0, "foo", "0v1"));
614     ASSERT_OK(Put(1, "foo", "1v1"));
615 
616     const Snapshot* s1 = db_->GetSnapshot();
617     ASSERT_EQ(1U, GetNumSnapshots());
618     uint64_t time_snap1 = GetTimeOldestSnapshots();
619     ASSERT_GT(time_snap1, 0U);
620     ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
621     ASSERT_OK(Put(0, "foo", "0v2"));
622     ASSERT_OK(Put(1, "foo", "1v2"));
623 
624     env_->MockSleepForSeconds(1);
625 
626     const Snapshot* s2 = db_->GetSnapshot();
627     ASSERT_EQ(2U, GetNumSnapshots());
628     ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
629     ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
630     ASSERT_OK(Put(0, "foo", "0v3"));
631     ASSERT_OK(Put(1, "foo", "1v3"));
632 
633     {
634       ManagedSnapshot s3(db_);
635       ASSERT_EQ(3U, GetNumSnapshots());
636       ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
637       ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
638 
639       ASSERT_OK(Put(0, "foo", "0v4"));
640       ASSERT_OK(Put(1, "foo", "1v4"));
641       ASSERT_EQ("0v1", Get(0, "foo", s1));
642       ASSERT_EQ("1v1", Get(1, "foo", s1));
643       ASSERT_EQ("0v2", Get(0, "foo", s2));
644       ASSERT_EQ("1v2", Get(1, "foo", s2));
645       ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
646       ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
647       ASSERT_EQ("0v4", Get(0, "foo"));
648       ASSERT_EQ("1v4", Get(1, "foo"));
649     }
650 
651     ASSERT_EQ(2U, GetNumSnapshots());
652     ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
653     ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
654     ASSERT_EQ("0v1", Get(0, "foo", s1));
655     ASSERT_EQ("1v1", Get(1, "foo", s1));
656     ASSERT_EQ("0v2", Get(0, "foo", s2));
657     ASSERT_EQ("1v2", Get(1, "foo", s2));
658     ASSERT_EQ("0v4", Get(0, "foo"));
659     ASSERT_EQ("1v4", Get(1, "foo"));
660 
661     db_->ReleaseSnapshot(s1);
662     ASSERT_EQ("0v2", Get(0, "foo", s2));
663     ASSERT_EQ("1v2", Get(1, "foo", s2));
664     ASSERT_EQ("0v4", Get(0, "foo"));
665     ASSERT_EQ("1v4", Get(1, "foo"));
666     ASSERT_EQ(1U, GetNumSnapshots());
667     ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
668     ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());
669 
670     db_->ReleaseSnapshot(s2);
671     ASSERT_EQ(0U, GetNumSnapshots());
672     ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
673     ASSERT_EQ("0v4", Get(0, "foo"));
674     ASSERT_EQ("1v4", Get(1, "foo"));
675   } while (ChangeOptions());
676 }
677 
678 #endif  // ROCKSDB_LITE
679 
680 class DBBasicMultiConfigs : public DBBasicTest,
681                             public ::testing::WithParamInterface<int> {
682  public:
683   DBBasicMultiConfigs() { option_config_ = GetParam(); }
684 
685   static std::vector<int> GenerateOptionConfigs() {
686     std::vector<int> option_configs;
687     for (int option_config = kDefault; option_config < kEnd; ++option_config) {
688       if (!ShouldSkipOptions(option_config, kSkipFIFOCompaction)) {
689         option_configs.push_back(option_config);
690       }
691     }
692     return option_configs;
693   }
694 };
695 
696 TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
697   anon::OptionsOverride options_override;
698   options_override.skip_policy = kSkipNoSnapshot;
699   Options options = CurrentOptions(options_override);
700   options.disable_auto_compactions = true;
701   DestroyAndReopen(options);
702   CreateAndReopenWithCF({"pikachu"}, options);
703   Random rnd(301);
704   FillLevels("a", "z", 1);
705 
706   ASSERT_OK(Put(1, "foo", "first"));
707   const Snapshot* snapshot1 = db_->GetSnapshot();
708   ASSERT_OK(Put(1, "foo", "second"));
709   ASSERT_OK(Put(1, "foo", "third"));
710   ASSERT_OK(Put(1, "foo", "fourth"));
711   const Snapshot* snapshot2 = db_->GetSnapshot();
712   ASSERT_OK(Put(1, "foo", "fifth"));
713   ASSERT_OK(Put(1, "foo", "sixth"));
714 
715   // All entries (including duplicates) exist
716   // before any compaction or flush is triggered.
717   ASSERT_EQ(AllEntriesFor("foo", 1),
718             "[ sixth, fifth, fourth, third, second, first ]");
719   ASSERT_EQ("sixth", Get(1, "foo"));
720   ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
721   ASSERT_EQ("first", Get(1, "foo", snapshot1));
722 
723   // After a flush, "second", "third" and "fifth" should
724   // be removed
725   ASSERT_OK(Flush(1));
726   ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
727 
728   // after we release the snapshot1, only two values left
729   db_->ReleaseSnapshot(snapshot1);
730   FillLevels("a", "z", 1);
731   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
732                                    nullptr));
733 
734   // We have only one valid snapshot snapshot2. Since snapshot1 is
735   // not valid anymore, "first" should be removed by a compaction.
736   ASSERT_EQ("sixth", Get(1, "foo"));
737   ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
738   ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
739 
740   // after we release the snapshot2, only one value should be left
741   db_->ReleaseSnapshot(snapshot2);
742   FillLevels("a", "z", 1);
743   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
744                                    nullptr));
745   ASSERT_EQ("sixth", Get(1, "foo"));
746   ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
747 }
748 
749 INSTANTIATE_TEST_CASE_P(
750     DBBasicMultiConfigs, DBBasicMultiConfigs,
751     ::testing::ValuesIn(DBBasicMultiConfigs::GenerateOptionConfigs()));
752 
753 TEST_F(DBBasicTest, DBOpen_Options) {
754   Options options = CurrentOptions();
755   Close();
756   Destroy(options);
757 
758   // Does not exist, and create_if_missing == false: error
759   DB* db = nullptr;
760   options.create_if_missing = false;
761   Status s = DB::Open(options, dbname_, &db);
762   ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
763   ASSERT_TRUE(db == nullptr);
764 
765   // Does not exist, and create_if_missing == true: OK
766   options.create_if_missing = true;
767   s = DB::Open(options, dbname_, &db);
768   ASSERT_OK(s);
769   ASSERT_TRUE(db != nullptr);
770 
771   delete db;
772   db = nullptr;
773 
774   // Does exist, and error_if_exists == true: error
775   options.create_if_missing = false;
776   options.error_if_exists = true;
777   s = DB::Open(options, dbname_, &db);
778   ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
779   ASSERT_TRUE(db == nullptr);
780 
781   // Does exist, and error_if_exists == false: OK
782   options.create_if_missing = true;
783   options.error_if_exists = false;
784   s = DB::Open(options, dbname_, &db);
785   ASSERT_OK(s);
786   ASSERT_TRUE(db != nullptr);
787 
788   delete db;
789   db = nullptr;
790 }
791 
792 TEST_F(DBBasicTest, CompactOnFlush) {
793   anon::OptionsOverride options_override;
794   options_override.skip_policy = kSkipNoSnapshot;
795   do {
796     Options options = CurrentOptions(options_override);
797     options.disable_auto_compactions = true;
798     CreateAndReopenWithCF({"pikachu"}, options);
799 
800     ASSERT_OK(Put(1, "foo", "v1"));
801     ASSERT_OK(Flush(1));
802     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
803 
804     // Write two new keys
805     ASSERT_OK(Put(1, "a", "begin"));
806     ASSERT_OK(Put(1, "z", "end"));
807     ASSERT_OK(Flush(1));
808 
809     // Case 1: Delete followed by a put
810     ASSERT_OK(Delete(1, "foo"));
811     ASSERT_OK(Put(1, "foo", "v2"));
812     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
813 
814     // After the current memtable is flushed, the DEL should
815     // have been removed
816     ASSERT_OK(Flush(1));
817     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
818 
819     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
820                                      nullptr, nullptr));
821     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
822 
823     // Case 2: Delete followed by another delete
824     ASSERT_OK(Delete(1, "foo"));
825     ASSERT_OK(Delete(1, "foo"));
826     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
827     ASSERT_OK(Flush(1));
828     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
829     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
830                                      nullptr, nullptr));
831     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
832 
833     // Case 3: Put followed by a delete
834     ASSERT_OK(Put(1, "foo", "v3"));
835     ASSERT_OK(Delete(1, "foo"));
836     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
837     ASSERT_OK(Flush(1));
838     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
839     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
840                                      nullptr, nullptr));
841     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
842 
843     // Case 4: Put followed by another Put
844     ASSERT_OK(Put(1, "foo", "v4"));
845     ASSERT_OK(Put(1, "foo", "v5"));
846     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
847     ASSERT_OK(Flush(1));
848     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
849     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
850                                      nullptr, nullptr));
851     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
852 
853     // clear database
854     ASSERT_OK(Delete(1, "foo"));
855     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
856                                      nullptr, nullptr));
857     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
858 
859     // Case 5: Put followed by snapshot followed by another Put
860     // Both puts should remain.
861     ASSERT_OK(Put(1, "foo", "v6"));
862     const Snapshot* snapshot = db_->GetSnapshot();
863     ASSERT_OK(Put(1, "foo", "v7"));
864     ASSERT_OK(Flush(1));
865     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
866     db_->ReleaseSnapshot(snapshot);
867 
868     // clear database
869     ASSERT_OK(Delete(1, "foo"));
870     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
871                                      nullptr, nullptr));
872     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
873 
874     // Case 6: snapshot followed by a put followed by another Put
875     // Only the last put should remain.
876     const Snapshot* snapshot1 = db_->GetSnapshot();
877     ASSERT_OK(Put(1, "foo", "v8"));
878     ASSERT_OK(Put(1, "foo", "v9"));
879     ASSERT_OK(Flush(1));
880     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
881     db_->ReleaseSnapshot(snapshot1);
882   } while (ChangeCompactOptions());
883 }
884 
885 TEST_F(DBBasicTest, FlushOneColumnFamily) {
886   Options options = CurrentOptions();
887   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
888                          "alyosha", "popovich"},
889                         options);
890 
891   ASSERT_OK(Put(0, "Default", "Default"));
892   ASSERT_OK(Put(1, "pikachu", "pikachu"));
893   ASSERT_OK(Put(2, "ilya", "ilya"));
894   ASSERT_OK(Put(3, "muromec", "muromec"));
895   ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
896   ASSERT_OK(Put(5, "nikitich", "nikitich"));
897   ASSERT_OK(Put(6, "alyosha", "alyosha"));
898   ASSERT_OK(Put(7, "popovich", "popovich"));
899 
900   for (int i = 0; i < 8; ++i) {
901     ASSERT_OK(Flush(i));
902     auto tables = ListTableFiles(env_, dbname_);
903     ASSERT_EQ(tables.size(), i + 1U);
904   }
905 }
906 
907 TEST_F(DBBasicTest, MultiGetSimple) {
908   do {
909     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
910     SetPerfLevel(kEnableCount);
911     ASSERT_OK(Put(1, "k1", "v1"));
912     ASSERT_OK(Put(1, "k2", "v2"));
913     ASSERT_OK(Put(1, "k3", "v3"));
914     ASSERT_OK(Put(1, "k4", "v4"));
915     ASSERT_OK(Delete(1, "k4"));
916     ASSERT_OK(Put(1, "k5", "v5"));
917     ASSERT_OK(Delete(1, "no_key"));
918 
919     std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
920 
921     std::vector<std::string> values(20, "Temporary data to be overwritten");
922     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
923 
924     get_perf_context()->Reset();
925     std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
926     ASSERT_EQ(values.size(), keys.size());
927     ASSERT_EQ(values[0], "v1");
928     ASSERT_EQ(values[1], "v2");
929     ASSERT_EQ(values[2], "v3");
930     ASSERT_EQ(values[4], "v5");
931     // four kv pairs * two bytes per value
932     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
933 
934     ASSERT_OK(s[0]);
935     ASSERT_OK(s[1]);
936     ASSERT_OK(s[2]);
937     ASSERT_TRUE(s[3].IsNotFound());
938     ASSERT_OK(s[4]);
939     ASSERT_TRUE(s[5].IsNotFound());
940     SetPerfLevel(kDisable);
941   } while (ChangeCompactOptions());
942 }
943 
944 TEST_F(DBBasicTest, MultiGetEmpty) {
945   do {
946     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
947     // Empty Key Set
948     std::vector<Slice> keys;
949     std::vector<std::string> values;
950     std::vector<ColumnFamilyHandle*> cfs;
951     std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
952     ASSERT_EQ(s.size(), 0U);
953 
954     // Empty Database, Empty Key Set
955     Options options = CurrentOptions();
956     options.create_if_missing = true;
957     DestroyAndReopen(options);
958     CreateAndReopenWithCF({"pikachu"}, options);
959     s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
960     ASSERT_EQ(s.size(), 0U);
961 
962     // Empty Database, Search for Keys
963     keys.resize(2);
964     keys[0] = "a";
965     keys[1] = "b";
966     cfs.push_back(handles_[0]);
967     cfs.push_back(handles_[1]);
968     s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
969     ASSERT_EQ(static_cast<int>(s.size()), 2);
970     ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
971   } while (ChangeCompactOptions());
972 }
973 
974 TEST_F(DBBasicTest, ChecksumTest) {
975   BlockBasedTableOptions table_options;
976   Options options = CurrentOptions();
977   // change when new checksum type added
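  // (kxxHash64 is currently the last ChecksumType, so looping i over
  // [0, max_checksum] below exercises every defined type)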
978   int max_checksum = static_cast<int>(kxxHash64);
979   const int kNumPerFile = 2;
980 
981   // generate one table with each type of checksum
982   for (int i = 0; i <= max_checksum; ++i) {
983     table_options.checksum = static_cast<ChecksumType>(i);
984     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
985     Reopen(options);
986     for (int j = 0; j < kNumPerFile; ++j) {
987       ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
988     }
989     ASSERT_OK(Flush());
990   }
991 
992   // with each valid checksum type setting...
993   for (int i = 0; i <= max_checksum; ++i) {
994     table_options.checksum = static_cast<ChecksumType>(i);
995     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
996     Reopen(options);
997     // verify every block reads back regardless of the current checksum setting
998     for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
999       ASSERT_EQ(Key(j), Get(Key(j)));
1000     }
1001   }
1002 }
1003 
1004 // On Windows a file can be opened either memory-mapped or with unbuffered
1005 // (direct) access, but not both, so this combination asserts and the test
1006 // does not make sense to run.
1007 #ifndef OS_WIN
1008 TEST_F(DBBasicTest, MmapAndBufferOptions) {
1009   if (!IsMemoryMappedAccessSupported()) {
1010     return;
1011   }
1012   Options options = CurrentOptions();
1013 
1014   options.use_direct_reads = true;
1015   options.allow_mmap_reads = true;
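  // mmap reads and direct (unbuffered) reads are mutually exclusive, so
  // opening with both enabled must fail.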
1016   ASSERT_NOK(TryReopen(options));
1017 
1018   // All other combinations are acceptable
1019   options.use_direct_reads = false;
1020   ASSERT_OK(TryReopen(options));
1021 
1022   if (IsDirectIOSupported()) {
1023     options.use_direct_reads = true;
1024     options.allow_mmap_reads = false;
1025     ASSERT_OK(TryReopen(options));
1026   }
1027 
1028   options.use_direct_reads = false;
1029   ASSERT_OK(TryReopen(options));
1030 }
1031 #endif
1032 
1033 class TestEnv : public EnvWrapper {
1034  public:
1035   explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
1036 
1037   class TestLogger : public Logger {
1038    public:
1039     using Logger::Logv;
1040     explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
1041     ~TestLogger() override {
1042       if (!closed_) {
1043         CloseHelper().PermitUncheckedError();
1044       }
1045     }
1046     void Logv(const char* /*format*/, va_list /*ap*/) override {}
1047 
1048    protected:
1049     Status CloseImpl() override { return CloseHelper(); }
1050 
1051    private:
1052     Status CloseHelper() {
1053       env->CloseCountInc();
1055       return Status::IOError();
1056     }
1057     TestEnv* env;
1058   };
1059 
1060   void CloseCountInc() { close_count++; }
1061 
1062   int GetCloseCount() { return close_count; }
1063 
1064   Status NewLogger(const std::string& /*fname*/,
1065                    std::shared_ptr<Logger>* result) override {
1066     result->reset(new TestLogger(this));
1067     return Status::OK();
1068   }
1069 
1070  private:
1071   int close_count;
1072 };
1073 
1074 TEST_F(DBBasicTest, DBClose) {
1075   Options options = GetDefaultOptions();
1076   std::string dbname = test::PerThreadDBPath("db_close_test");
1077   ASSERT_OK(DestroyDB(dbname, options));
1078 
1079   DB* db = nullptr;
1080   TestEnv* env = new TestEnv(env_);
1081   std::unique_ptr<TestEnv> local_env_guard(env);
1082   options.create_if_missing = true;
1083   options.env = env;
1084   Status s = DB::Open(options, dbname, &db);
1085   ASSERT_OK(s);
1086   ASSERT_TRUE(db != nullptr);
1087 
1088   s = db->Close();
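  // TestLogger::CloseHelper() returns IOError, so closing the DB both bumps the
  // close count and surfaces that error from DB::Close().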
1089   ASSERT_EQ(env->GetCloseCount(), 1);
1090   ASSERT_EQ(s, Status::IOError());
1091 
1092   delete db;
1093   ASSERT_EQ(env->GetCloseCount(), 1);
1094 
1095   // Do not call DB::Close() and ensure our logger Close() still gets called
1096   s = DB::Open(options, dbname, &db);
1097   ASSERT_OK(s);
1098   ASSERT_TRUE(db != nullptr);
1099   delete db;
1100   ASSERT_EQ(env->GetCloseCount(), 2);
1101 
1102   // Provide our own logger and ensure DB::Close() does not close it
1103   options.info_log.reset(new TestEnv::TestLogger(env));
1104   options.create_if_missing = false;
1105   s = DB::Open(options, dbname, &db);
1106   ASSERT_OK(s);
1107   ASSERT_TRUE(db != nullptr);
1108 
1109   s = db->Close();
1110   ASSERT_EQ(s, Status::OK());
1111   delete db;
1112   ASSERT_EQ(env->GetCloseCount(), 2);
1113   options.info_log.reset();
1114   ASSERT_EQ(env->GetCloseCount(), 3);
1115 }
1116 
1117 TEST_F(DBBasicTest, DBCloseFlushError) {
1118   std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
1119       new FaultInjectionTestEnv(env_));
1120   Options options = GetDefaultOptions();
1121   options.create_if_missing = true;
1122   options.manual_wal_flush = true;
1123   options.write_buffer_size = 100;
1124   options.env = fault_injection_env.get();
1125 
1126   Reopen(options);
1127   ASSERT_OK(Put("key1", "value1"));
1128   ASSERT_OK(Put("key2", "value2"));
1129   ASSERT_OK(dbfull()->TEST_SwitchMemtable());
1130   ASSERT_OK(Put("key3", "value3"));
1131   fault_injection_env->SetFilesystemActive(false);
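  // Close() still has to flush the memtable data switched above; with the
  // filesystem deactivated that flush fails, so Close() returns an error.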
1132   Status s = dbfull()->Close();
1133   ASSERT_NE(s, Status::OK());
1134   // retry should return the same error
1135   s = dbfull()->Close();
1136   ASSERT_NE(s, Status::OK());
1137   fault_injection_env->SetFilesystemActive(true);
1138   // Retrying Close() is a no-op even after the file system is back. Could be
1139   // improved if Close() were retry-able: #9029
1140   s = dbfull()->Close();
1141   ASSERT_NE(s, Status::OK());
1142   Destroy(options);
1143 }
1144 
1145 class DBMultiGetTestWithParam : public DBBasicTest,
1146                                 public testing::WithParamInterface<bool> {};
1147 
1148 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
1149   Options options = CurrentOptions();
1150   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1151                          "alyosha", "popovich"},
1152                         options);
1153   // <CF, key, value> tuples
1154   std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
1155   static const int num_keys = 24;
1156   cf_kv_vec.reserve(num_keys);
1157 
1158   for (int i = 0; i < num_keys; ++i) {
1159     int cf = i / 3;
1160     int cf_key = i % 3;
1161     cf_kv_vec.emplace_back(std::make_tuple(
1162         cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
1163         "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
1164     ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
1165                   std::get<2>(cf_kv_vec[i])));
1166   }
1167 
1168   int get_sv_count = 0;
1169   ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
1170   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1171       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1172         if (++get_sv_count == 2) {
1173           // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
1174           // is forced to repeat the process
1175           for (int i = 0; i < num_keys; ++i) {
1176             int cf = i / 3;
1177             int cf_key = i % 8;
1178             if (cf_key == 0) {
1179               ASSERT_OK(Flush(cf));
1180             }
1181             ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
1182                           std::get<2>(cf_kv_vec[i]) + "_2"));
1183           }
1184         }
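        // While MultiGet still holds the SuperVersion references it acquired,
        // each CF's thread-local SV slot is expected to read kSVInUse.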
1185         if (get_sv_count == 11) {
1186           for (int i = 0; i < 8; ++i) {
1187             auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
1188                             db->GetColumnFamilyHandle(i))
1189                             ->cfd();
1190             ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1191           }
1192         }
1193       });
1194   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1195 
1196   std::vector<int> cfs;
1197   std::vector<std::string> keys;
1198   std::vector<std::string> values;
1199 
1200   for (int i = 0; i < num_keys; ++i) {
1201     cfs.push_back(std::get<0>(cf_kv_vec[i]));
1202     keys.push_back(std::get<1>(cf_kv_vec[i]));
1203   }
1204 
1205   values = MultiGet(cfs, keys, nullptr, GetParam());
1206   ASSERT_EQ(values.size(), num_keys);
1207   for (unsigned int j = 0; j < values.size(); ++j) {
1208     ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
1209   }
1210 
1211   keys.clear();
1212   cfs.clear();
1213   cfs.push_back(std::get<0>(cf_kv_vec[0]));
1214   keys.push_back(std::get<1>(cf_kv_vec[0]));
1215   cfs.push_back(std::get<0>(cf_kv_vec[3]));
1216   keys.push_back(std::get<1>(cf_kv_vec[3]));
1217   cfs.push_back(std::get<0>(cf_kv_vec[4]));
1218   keys.push_back(std::get<1>(cf_kv_vec[4]));
1219   values = MultiGet(cfs, keys, nullptr, GetParam());
1220   ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
1221   ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
1222   ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");
1223 
1224   keys.clear();
1225   cfs.clear();
1226   cfs.push_back(std::get<0>(cf_kv_vec[7]));
1227   keys.push_back(std::get<1>(cf_kv_vec[7]));
1228   cfs.push_back(std::get<0>(cf_kv_vec[6]));
1229   keys.push_back(std::get<1>(cf_kv_vec[6]));
1230   cfs.push_back(std::get<0>(cf_kv_vec[1]));
1231   keys.push_back(std::get<1>(cf_kv_vec[1]));
1232   values = MultiGet(cfs, keys, nullptr, GetParam());
1233   ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
1234   ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
1235   ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");
1236 
1237   for (int cf = 0; cf < 8; ++cf) {
1238     auto* cfd =
1239         static_cast_with_check<ColumnFamilyHandleImpl>(
1240             static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(cf))
1241             ->cfd();
1242     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1243     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
1244   }
1245 }
1246 
1247 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
1248   Options options = CurrentOptions();
1249   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1250                          "alyosha", "popovich"},
1251                         options);
1252 
1253   for (int i = 0; i < 8; ++i) {
1254     ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1255                   "cf" + std::to_string(i) + "_val"));
1256   }
1257 
1258   int get_sv_count = 0;
1259   int retries = 0;
1260   bool last_try = false;
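  // The AfterRefSV callback below keeps flushing and rewriting the CFs so the
  // SuperVersions MultiGet acquires keep going stale, until MultiGet falls back
  // to its last-try path (hitting the LastTry sync point), after which the
  // callbacks stop interfering.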
1261   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1262       "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
1263         last_try = true;
1264         ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1265       });
1266   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1267       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1268         if (last_try) {
1269           return;
1270         }
1271         if (++get_sv_count == 2) {
1272           ++retries;
1273           get_sv_count = 0;
1274           for (int i = 0; i < 8; ++i) {
1275             ASSERT_OK(Flush(i));
1276             ASSERT_OK(Put(
1277                 i, "cf" + std::to_string(i) + "_key",
1278                 "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
1279           }
1280         }
1281       });
1282   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1283 
1284   std::vector<int> cfs;
1285   std::vector<std::string> keys;
1286   std::vector<std::string> values;
1287 
1288   for (int i = 0; i < 8; ++i) {
1289     cfs.push_back(i);
1290     keys.push_back("cf" + std::to_string(i) + "_key");
1291   }
1292 
1293   values = MultiGet(cfs, keys, nullptr, GetParam());
1294   ASSERT_TRUE(last_try);
1295   ASSERT_EQ(values.size(), 8);
1296   for (unsigned int j = 0; j < values.size(); ++j) {
1297     ASSERT_EQ(values[j],
1298               "cf" + std::to_string(j) + "_val" + std::to_string(retries));
1299   }
1300   for (int i = 0; i < 8; ++i) {
1301     auto* cfd =
1302         static_cast_with_check<ColumnFamilyHandleImpl>(
1303             static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(i))
1304             ->cfd();
1305     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1306   }
1307 }
1308 
1309 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
1310   Options options = CurrentOptions();
1311   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1312                          "alyosha", "popovich"},
1313                         options);
1314 
1315   for (int i = 0; i < 8; ++i) {
1316     ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1317                   "cf" + std::to_string(i) + "_val"));
1318   }
1319 
1320   int get_sv_count = 0;
1321   ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
1322   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1323       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1324         if (++get_sv_count == 2) {
1325           for (int i = 0; i < 8; ++i) {
1326             ASSERT_OK(Flush(i));
1327             ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1328                           "cf" + std::to_string(i) + "_val2"));
1329           }
1330         }
1331         if (get_sv_count == 8) {
1332           for (int i = 0; i < 8; ++i) {
1333             auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
1334                             db->GetColumnFamilyHandle(i))
1335                             ->cfd();
1336             ASSERT_TRUE(
1337                 (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
1338                 (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
1339           }
1340         }
1341       });
1342   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1343 
1344   std::vector<int> cfs;
1345   std::vector<std::string> keys;
1346   std::vector<std::string> values;
1347 
1348   for (int i = 0; i < 8; ++i) {
1349     cfs.push_back(i);
1350     keys.push_back("cf" + std::to_string(i) + "_key");
1351   }
1352 
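  // Read at an explicit snapshot: even though the callback flushes and
  // rewrites every CF while MultiGet runs, the results below should still be
  // the pre-snapshot values.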
1353   const Snapshot* snapshot = db_->GetSnapshot();
1354   values = MultiGet(cfs, keys, snapshot, GetParam());
1355   db_->ReleaseSnapshot(snapshot);
1356   ASSERT_EQ(values.size(), 8);
1357   for (unsigned int j = 0; j < values.size(); ++j) {
1358     ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
1359   }
1360   for (int i = 0; i < 8; ++i) {
1361     auto* cfd =
1362         static_cast_with_check<ColumnFamilyHandleImpl>(
1363             static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(i))
1364             ->cfd();
1365     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1366   }
1367 }
1368 
1369 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) {
1370   Options options = CurrentOptions();
1371   CreateAndReopenWithCF({"one", "two"}, options);
1372 
1373   ASSERT_OK(Put(1, "foo", "bar"));
1374   ASSERT_OK(Put(2, "baz", "xyz"));
1375   ASSERT_OK(Put(1, "abc", "def"));
1376 
1377   // Note: keys for the same CF do not form a consecutive range
1378   std::vector<int> cfs{1, 2, 1};
1379   std::vector<std::string> keys{"foo", "baz", "abc"};
1380   std::vector<std::string> values;
1381 
1382   values =
1383       MultiGet(cfs, keys, /* snapshot */ nullptr, /* batched */ GetParam());
1384 
1385   ASSERT_EQ(values.size(), 3);
1386   ASSERT_EQ(values[0], "bar");
1387   ASSERT_EQ(values[1], "xyz");
1388   ASSERT_EQ(values[2], "def");
1389 }
1390 
1391 INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
1392                         testing::Bool());
1393 
1394 TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
1395   do {
1396     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1397     SetPerfLevel(kEnableCount);
1398     ASSERT_OK(Put(1, "k1", "v1"));
1399     ASSERT_OK(Put(1, "k2", "v2"));
1400     ASSERT_OK(Put(1, "k3", "v3"));
1401     ASSERT_OK(Put(1, "k4", "v4"));
1402     ASSERT_OK(Delete(1, "k4"));
1403     ASSERT_OK(Put(1, "k5", "v5"));
1404     ASSERT_OK(Delete(1, "no_key"));
1405 
1406     get_perf_context()->Reset();
1407 
1408     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
1409     std::vector<PinnableSlice> values(keys.size());
1410     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1411     std::vector<Status> s(keys.size());
1412 
1413     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1414                   values.data(), s.data(), false);
1415 
1416     ASSERT_EQ(values.size(), keys.size());
1417     ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
1418     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
1419     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
1420     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1421     // four kv pairs * two bytes per value
1422     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
1423 
1424     ASSERT_TRUE(s[0].IsNotFound());
1425     ASSERT_OK(s[1]);
1426     ASSERT_TRUE(s[2].IsNotFound());
1427     ASSERT_OK(s[3]);
1428     ASSERT_OK(s[4]);
1429     ASSERT_OK(s[5]);
1430 
1431     SetPerfLevel(kDisable);
1432   } while (ChangeCompactOptions());
1433 }
1434 
1435 TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) {
1436   do {
1437     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1438     SetPerfLevel(kEnableCount);
1439     // To broaden coverage, generate more than one table file and mix table
1440     // files with the memtable
1441     ASSERT_OK(Put(1, "k1", "v1"));
1442     ASSERT_OK(Put(1, "k2", "v2"));
1443     ASSERT_OK(Flush(1));
1444     ASSERT_OK(Put(1, "k3", "v3"));
1445     ASSERT_OK(Put(1, "k4", "v4"));
1446     ASSERT_OK(Flush(1));
1447     ASSERT_OK(Delete(1, "k4"));
1448     ASSERT_OK(Put(1, "k5", "v5"));
1449     ASSERT_OK(Delete(1, "no_key"));
1450 
1451     get_perf_context()->Reset();
1452 
1453     std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
1454     std::vector<PinnableSlice> values(keys.size());
1455     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1456     std::vector<Status> s(keys.size());
1457 
1458     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1459                   values.data(), s.data(), true);
1460 
1461     ASSERT_EQ(values.size(), keys.size());
1462     ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
1463     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
1464     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
1465     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
1466     // four kv pairs * two bytes per value
1467     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
1468 
1469     ASSERT_OK(s[0]);
1470     ASSERT_OK(s[1]);
1471     ASSERT_OK(s[2]);
1472     ASSERT_TRUE(s[3].IsNotFound());
1473     ASSERT_OK(s[4]);
1474     ASSERT_TRUE(s[5].IsNotFound());
1475 
1476     SetPerfLevel(kDisable);
1477   } while (ChangeOptions());
1478 }
1479 
1480 TEST_F(DBBasicTest, MultiGetBatchedDuplicateKeys) {
1481   Options opts = CurrentOptions();
1482   opts.merge_operator = MergeOperators::CreateStringAppendOperator();
1483   CreateAndReopenWithCF({"pikachu"}, opts);
1484   SetPerfLevel(kEnableCount);
1485   // To expand the power of this test, generate > 1 table file and
1486   // mix with memtable
1487   ASSERT_OK(Merge(1, "k1", "v1"));
1488   ASSERT_OK(Merge(1, "k2", "v2"));
1489   ASSERT_OK(Flush(1));
1490   MoveFilesToLevel(2, 1);
1491   ASSERT_OK(Merge(1, "k3", "v3"));
1492   ASSERT_OK(Merge(1, "k4", "v4"));
1493   ASSERT_OK(Flush(1));
1494   MoveFilesToLevel(2, 1);
1495   ASSERT_OK(Merge(1, "k4", "v4_2"));
1496   ASSERT_OK(Merge(1, "k6", "v6"));
1497   ASSERT_OK(Flush(1));
1498   MoveFilesToLevel(2, 1);
1499   ASSERT_OK(Merge(1, "k7", "v7"));
1500   ASSERT_OK(Merge(1, "k8", "v8"));
1501   ASSERT_OK(Flush(1));
1502   MoveFilesToLevel(2, 1);
1503 
1504   get_perf_context()->Reset();
1505 
1506   std::vector<Slice> keys({"k8", "k8", "k8", "k4", "k4", "k1", "k3"});
1507   std::vector<PinnableSlice> values(keys.size());
1508   std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1509   std::vector<Status> s(keys.size());
1510 
1511   db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1512                 values.data(), s.data(), false);
1513 
1514   ASSERT_EQ(values.size(), keys.size());
1515   ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v8");
1516   ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v8");
1517   ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v8");
1518   ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v4,v4_2");
1519   ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v4,v4_2");
1520   ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
1521   ASSERT_EQ(std::string(values[6].data(), values[6].size()), "v3");
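  // Expected multiget_read_bytes: each returned value is counted once per
  // requested key, duplicates included:
  // 3 * 2 ("v8") + 2 * 7 ("v4,v4_2") + 2 ("v1") + 2 ("v3") = 24.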
1522   ASSERT_EQ(24, (int)get_perf_context()->multiget_read_bytes);
1523 
1524   for (Status& status : s) {
1525     ASSERT_OK(status);
1526   }
1527 
1528   SetPerfLevel(kDisable);
1529 }
1530 
1531 TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
1532   Options options = CurrentOptions();
1533   options.disable_auto_compactions = true;
1534   Reopen(options);
1535   int num_keys = 0;
1536 
1537   for (int i = 0; i < 128; ++i) {
1538     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1539     num_keys++;
1540     if (num_keys == 8) {
1541       ASSERT_OK(Flush());
1542       num_keys = 0;
1543     }
1544   }
1545   if (num_keys > 0) {
1546     ASSERT_OK(Flush());
1547     num_keys = 0;
1548   }
1549   MoveFilesToLevel(2);
1550 
1551   for (int i = 0; i < 128; i += 3) {
1552     ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1553     num_keys++;
1554     if (num_keys == 8) {
1555       ASSERT_OK(Flush());
1556       num_keys = 0;
1557     }
1558   }
1559   if (num_keys > 0) {
1560     ASSERT_OK(Flush());
1561     num_keys = 0;
1562   }
1563   MoveFilesToLevel(1);
1564 
1565   for (int i = 0; i < 128; i += 5) {
1566     ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1567     num_keys++;
1568     if (num_keys == 8) {
1569       ASSERT_OK(Flush());
1570       num_keys = 0;
1571     }
1572   }
1573   if (num_keys > 0) {
1574     ASSERT_OK(Flush());
1575     num_keys = 0;
1576   }
1577   ASSERT_EQ(0, num_keys);
1578 
1579   for (int i = 0; i < 128; i += 9) {
1580     ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1581   }
1582 
1583   std::vector<std::string> keys;
1584   std::vector<std::string> values;
1585 
1586   for (int i = 64; i < 80; ++i) {
1587     keys.push_back("key_" + std::to_string(i));
1588   }
1589 
1590   values = MultiGet(keys, nullptr);
1591   ASSERT_EQ(values.size(), 16);
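  // Each key should be served from the newest layer it was written to: the
  // memtable if divisible by 9, else L0 if divisible by 5, else L1 if
  // divisible by 3, otherwise the L2 base value.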
1592   for (unsigned int j = 0; j < values.size(); ++j) {
1593     int key = j + 64;
1594     if (key % 9 == 0) {
1595       ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
1596     } else if (key % 5 == 0) {
1597       ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
1598     } else if (key % 3 == 0) {
1599       ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
1600     } else {
1601       ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
1602     }
1603   }
1604 }
1605 
1606 TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
1607   Options options = CurrentOptions();
1608   options.disable_auto_compactions = true;
1609   options.merge_operator = MergeOperators::CreateStringAppendOperator();
1610   BlockBasedTableOptions bbto;
1611   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1612   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1613   Reopen(options);
1614   int num_keys = 0;
1615 
1616   for (int i = 0; i < 128; ++i) {
1617     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1618     num_keys++;
1619     if (num_keys == 8) {
1620       ASSERT_OK(Flush());
1621       num_keys = 0;
1622     }
1623   }
1624   if (num_keys > 0) {
1625     ASSERT_OK(Flush());
1626     num_keys = 0;
1627   }
1628   MoveFilesToLevel(2);
1629 
1630   for (int i = 0; i < 128; i += 3) {
1631     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1632     num_keys++;
1633     if (num_keys == 8) {
1634       ASSERT_OK(Flush());
1635       num_keys = 0;
1636     }
1637   }
1638   if (num_keys > 0) {
1639     ASSERT_OK(Flush());
1640     num_keys = 0;
1641   }
1642   MoveFilesToLevel(1);
1643 
1644   for (int i = 0; i < 128; i += 5) {
1645     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1646     num_keys++;
1647     if (num_keys == 8) {
1648       ASSERT_OK(Flush());
1649       num_keys = 0;
1650     }
1651   }
1652   if (num_keys > 0) {
1653     ASSERT_OK(Flush());
1654     num_keys = 0;
1655   }
1656   ASSERT_EQ(0, num_keys);
1657 
1658   for (int i = 0; i < 128; i += 9) {
1659     ASSERT_OK(
1660         Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1661   }
1662 
1663   std::vector<std::string> keys;
1664   std::vector<std::string> values;
1665 
1666   for (int i = 32; i < 80; ++i) {
1667     keys.push_back("key_" + std::to_string(i));
1668   }
1669 
1670   values = MultiGet(keys, nullptr);
1671   ASSERT_EQ(values.size(), keys.size());
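  // With the string-append merge operator, each key's expected value is its
  // L2 base put followed by every merge operand layered above it (L1, then
  // L0, then memtable), joined by commas in write order.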
1672   for (unsigned int j = 0; j < 48; ++j) {
1673     int key = j + 32;
1674     std::string value;
1675     value.append("val_l2_" + std::to_string(key));
1676     if (key % 3 == 0) {
1677       value.append(",");
1678       value.append("val_l1_" + std::to_string(key));
1679     }
1680     if (key % 5 == 0) {
1681       value.append(",");
1682       value.append("val_l0_" + std::to_string(key));
1683     }
1684     if (key % 9 == 0) {
1685       value.append(",");
1686       value.append("val_mem_" + std::to_string(key));
1687     }
1688     ASSERT_EQ(values[j], value);
1689   }
1690 }
1691 
1692 TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) {
1693   CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1694   SetPerfLevel(kEnableCount);
1695   ASSERT_OK(Put(1, "k1", "v_1"));
1696   ASSERT_OK(Put(1, "k2", "v_2"));
1697   ASSERT_OK(Put(1, "k3", "v_3"));
1698   ASSERT_OK(Put(1, "k4", "v_4"));
1699   ASSERT_OK(Put(1, "k5", "v_5"));
1700   ASSERT_OK(Put(1, "k6", "v_6"));
1701   std::vector<Slice> keys = {"k1", "k2", "k3", "k4", "k5", "k6"};
1702   std::vector<PinnableSlice> values(keys.size());
1703   std::vector<Status> s(keys.size());
1704   std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1705 
1706   get_perf_context()->Reset();
1707   ReadOptions ro;
1708   ro.value_size_soft_limit = 11;
1709   db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
1710                 s.data(), false);
1711 
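  // The 11-byte value_size_soft_limit is crossed after the first four 3-byte
  // values (12 bytes total), so the test expects k5 and k6 to come back
  // aborted while the earlier keys succeed.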
1712   ASSERT_EQ(values.size(), keys.size());
1713   for (unsigned int i = 0; i < 4; i++) {
1714     ASSERT_EQ(std::string(values[i].data(), values[i].size()),
1715               "v_" + std::to_string(i + 1));
1716   }
1717 
1718   for (unsigned int i = 4; i < 6; i++) {
1719     ASSERT_TRUE(s[i].IsAborted());
1720   }
1721 
1722   ASSERT_EQ(12, (int)get_perf_context()->multiget_read_bytes);
1723   SetPerfLevel(kDisable);
1724 }
1725 
1726 TEST_F(DBBasicTest, MultiGetBatchedValueSize) {
1727   do {
1728     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1729     SetPerfLevel(kEnableCount);
1730 
1731     ASSERT_OK(Put(1, "k6", "v6"));
1732     ASSERT_OK(Put(1, "k7", "v7_"));
1733     ASSERT_OK(Put(1, "k3", "v3_"));
1734     ASSERT_OK(Put(1, "k4", "v4"));
1735     ASSERT_OK(Flush(1));
1736     ASSERT_OK(Delete(1, "k4"));
1737     ASSERT_OK(Put(1, "k11", "v11"));
1738     ASSERT_OK(Delete(1, "no_key"));
1739     ASSERT_OK(Put(1, "k8", "v8_"));
1740     ASSERT_OK(Put(1, "k13", "v13"));
1741     ASSERT_OK(Put(1, "k14", "v14"));
1742     ASSERT_OK(Put(1, "k15", "v15"));
1743     ASSERT_OK(Put(1, "k16", "v16"));
1744     ASSERT_OK(Put(1, "k17", "v17"));
1745     ASSERT_OK(Flush(1));
1746 
1747     ASSERT_OK(Put(1, "k1", "v1_"));
1748     ASSERT_OK(Put(1, "k2", "v2_"));
1749     ASSERT_OK(Put(1, "k5", "v5_"));
1750     ASSERT_OK(Put(1, "k9", "v9_"));
1751     ASSERT_OK(Put(1, "k10", "v10"));
1752     ASSERT_OK(Delete(1, "k2"));
1753     ASSERT_OK(Delete(1, "k6"));
1754 
1755     get_perf_context()->Reset();
1756 
1757     std::vector<Slice> keys({"k1", "k10", "k11", "k12", "k13", "k14", "k15",
1758                              "k16", "k17", "k2", "k3", "k4", "k5", "k6", "k7",
1759                              "k8", "k9", "no_key"});
1760     std::vector<PinnableSlice> values(keys.size());
1761     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1762     std::vector<Status> s(keys.size());
1763 
1764     ReadOptions ro;
1765     ro.value_size_soft_limit = 20;
1766     db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
1767                   s.data(), false);
1768 
1769     ASSERT_EQ(values.size(), keys.size());
1770 
1771     // In memory keys
1772     ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1_");
1773     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v10");
1774     ASSERT_TRUE(s[9].IsNotFound());  // k2
1775     ASSERT_EQ(std::string(values[12].data(), values[12].size()), "v5_");
1776     ASSERT_TRUE(s[13].IsNotFound());  // k6
1777     ASSERT_EQ(std::string(values[16].data(), values[16].size()), "v9_");
1778 
1779     // In sst files
1780     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v11");
1781     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v13");
1782     ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v14");
1783 
1784     // Remaining keys are aborted once value_size_soft_limit is exceeded.
1785     ASSERT_TRUE(s[3].IsAborted());
1786     ASSERT_TRUE(s[6].IsAborted());
1787     ASSERT_TRUE(s[7].IsAborted());
1788     ASSERT_TRUE(s[8].IsAborted());
1789     ASSERT_TRUE(s[10].IsAborted());
1790     ASSERT_TRUE(s[11].IsAborted());
1791     ASSERT_TRUE(s[14].IsAborted());
1792     ASSERT_TRUE(s[15].IsAborted());
1793     ASSERT_TRUE(s[17].IsAborted());
1794 
1795     // 7 kv pairs * 3 bytes per value (i.e. 21)
1796     ASSERT_EQ(21, (int)get_perf_context()->multiget_read_bytes);
1797     SetPerfLevel(kDisable);
1798   } while (ChangeCompactOptions());
1799 }
1800 
1801 TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) {
1802   Options options = CurrentOptions();
1803   options.disable_auto_compactions = true;
1804   options.merge_operator = MergeOperators::CreateStringAppendOperator();
1805   BlockBasedTableOptions bbto;
1806   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1807   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1808   Reopen(options);
1809   int num_keys = 0;
1810 
1811   for (int i = 0; i < 64; ++i) {
1812     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1813     num_keys++;
1814     if (num_keys == 8) {
1815       ASSERT_OK(Flush());
1816       num_keys = 0;
1817     }
1818   }
1819   if (num_keys > 0) {
1820     ASSERT_OK(Flush());
1821     num_keys = 0;
1822   }
1823   MoveFilesToLevel(2);
1824 
1825   for (int i = 0; i < 64; i += 3) {
1826     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1827     num_keys++;
1828     if (num_keys == 8) {
1829       ASSERT_OK(Flush());
1830       num_keys = 0;
1831     }
1832   }
1833   if (num_keys > 0) {
1834     ASSERT_OK(Flush());
1835     num_keys = 0;
1836   }
1837   MoveFilesToLevel(1);
1838 
1839   for (int i = 0; i < 64; i += 5) {
1840     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1841     num_keys++;
1842     if (num_keys == 8) {
1843       ASSERT_OK(Flush());
1844       num_keys = 0;
1845     }
1846   }
1847   if (num_keys > 0) {
1848     ASSERT_OK(Flush());
1849     num_keys = 0;
1850   }
1851   ASSERT_EQ(0, num_keys);
1852 
1853   for (int i = 0; i < 64; i += 9) {
1854     ASSERT_OK(
1855         Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1856   }
1857 
1858   std::vector<std::string> keys_str;
1859   for (int i = 10; i < 50; ++i) {
1860     keys_str.push_back("key_" + std::to_string(i));
1861   }
1862 
1863   std::vector<Slice> keys(keys_str.size());
1864   for (int i = 0; i < 40; i++) {
1865     keys[i] = Slice(keys_str[i]);
1866   }
1867 
1868   std::vector<PinnableSlice> values(keys_str.size());
1869   std::vector<Status> statuses(keys_str.size());
1870   ReadOptions read_options;
1871   read_options.verify_checksums = true;
1872   read_options.value_size_soft_limit = 380;
1873   db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(),
1874                 keys.data(), values.data(), statuses.data());
1875 
1876   ASSERT_EQ(values.size(), keys.size());
1877 
1878   uint64_t curr_value_size = 0;
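  // Accumulate the size of each merged value actually returned; the first 26
  // keys are expected to fit within the 380-byte soft limit, after which the
  // remaining statuses are aborted.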
1879   for (unsigned int j = 0; j < 26; ++j) {
1880     int key = j + 10;
1881     std::string value;
1882     value.append("val_l2_" + std::to_string(key));
1883     if (key % 3 == 0) {
1884       value.append(",");
1885       value.append("val_l1_" + std::to_string(key));
1886     }
1887     if (key % 5 == 0) {
1888       value.append(",");
1889       value.append("val_l0_" + std::to_string(key));
1890     }
1891     if (key % 9 == 0) {
1892       value.append(",");
1893       value.append("val_mem_" + std::to_string(key));
1894     }
1895     curr_value_size += value.size();
1896     ASSERT_EQ(values[j], value);
1897     ASSERT_OK(statuses[j]);
1898   }
1899   // ASSERT_TRUE(curr_value_size <= read_options.value_size_hard_limit);
1900 
1901   // All remaining keys' statuses are set to Status::Aborted().
1902   for (unsigned int j = 26; j < 40; j++) {
1903     ASSERT_TRUE(statuses[j].IsAborted());
1904   }
1905 }
1906 
1907 TEST_F(DBBasicTest, MultiGetStats) {
1908   Options options;
1909   options.create_if_missing = true;
1910   options.disable_auto_compactions = true;
1911   options.env = env_;
1912   options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1913   BlockBasedTableOptions table_options;
1914   table_options.block_size = 1;
1915   table_options.index_type =
1916       BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
1917   table_options.partition_filters = true;
1918   table_options.no_block_cache = true;
1919   table_options.cache_index_and_filter_blocks = false;
1920   table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
1921   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1922   CreateAndReopenWithCF({"pikachu"}, options);
1923 
1924   int total_keys = 2000;
1925   std::vector<std::string> keys_str(total_keys);
1926   std::vector<Slice> keys(total_keys);
1927   std::vector<PinnableSlice> values(total_keys);
1928   std::vector<Status> s(total_keys);
1929   ReadOptions read_opts;
1930 
1931   Random rnd(309);
1932   // Create multiple SST files at multiple levels.
1933   for (int i = 0; i < 500; ++i) {
1934     keys_str[i] = "k" + std::to_string(i);
1935     keys[i] = Slice(keys_str[i]);
1936     ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
1937     if (i % 100 == 0) {
1938       ASSERT_OK(Flush(1));
1939     }
1940   }
1941   ASSERT_OK(Flush(1));
1942   MoveFilesToLevel(2, 1);
1943 
1944   for (int i = 501; i < 1000; ++i) {
1945     keys_str[i] = "k" + std::to_string(i);
1946     keys[i] = Slice(keys_str[i]);
1947     ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
1948     if (i % 100 == 0) {
1949       ASSERT_OK(Flush(1));
1950     }
1951   }
1952 
1953   ASSERT_OK(Flush(1));
1954   MoveFilesToLevel(2, 1);
1955 
1956   for (int i = 1001; i < total_keys; ++i) {
1957     keys_str[i] = "k" + std::to_string(i);
1958     keys[i] = Slice(keys_str[i]);
1959     ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
1960     if (i % 100 == 0) {
1961       ASSERT_OK(Flush(1));
1962     }
1963   }
1964   ASSERT_OK(Flush(1));
1965   Close();
1966 
1967   ReopenWithColumnFamilies({"default", "pikachu"}, options);
1968   ASSERT_OK(options.statistics->Reset());
1969 
1970   db_->MultiGet(read_opts, handles_[1], total_keys, keys.data(), values.data(),
1971                 s.data(), false);
1972 
1973   ASSERT_EQ(values.size(), total_keys);
1974   HistogramData hist_data_blocks;
1975   HistogramData hist_index_and_filter_blocks;
1976   HistogramData hist_sst;
1977 
1978   options.statistics->histogramData(NUM_DATA_BLOCKS_READ_PER_LEVEL,
1979                                     &hist_data_blocks);
1980   options.statistics->histogramData(NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
1981                                     &hist_index_and_filter_blocks);
1982   options.statistics->histogramData(NUM_SST_READ_PER_LEVEL, &hist_sst);
1983 
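  // These histograms are populated per level: roughly, how many data blocks,
  // index/filter blocks and SST files a MultiGet batch read at each LSM level.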
1984   // Maximum number of blocks read from a file system in a level.
1985   ASSERT_GT(hist_data_blocks.max, 0);
1986   ASSERT_GT(hist_index_and_filter_blocks.max, 0);
1987   // Maximum number of sst files read from file system in a level.
1988   ASSERT_GT(hist_sst.max, 0);
1989 
1990   // Minimum number of blocks read in a level.
1991   ASSERT_EQ(hist_data_blocks.min, 3);
1992   ASSERT_GT(hist_index_and_filter_blocks.min, 0);
1993   // Minimum number of sst files read in a level.
1994   ASSERT_GT(hist_sst.min, 0);
1995 }
1996 
1997 // Test class for batched MultiGet with prefix extractor
1998 // Param bool - If true, use partitioned filters
1999 //              If false, use full filter block
2000 class MultiGetPrefixExtractorTest : public DBBasicTest,
2001                                     public ::testing::WithParamInterface<bool> {
2002 };
2003 
2004 TEST_P(MultiGetPrefixExtractorTest, Batched) {
2005   Options options = CurrentOptions();
2006   options.prefix_extractor.reset(NewFixedPrefixTransform(2));
2007   options.memtable_prefix_bloom_size_ratio = 10;
2008   BlockBasedTableOptions bbto;
2009   if (GetParam()) {
2010     bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
2011     bbto.partition_filters = true;
2012   }
2013   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
2014   bbto.whole_key_filtering = false;
2015   bbto.cache_index_and_filter_blocks = false;
2016   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2017   Reopen(options);
2018 
2019   SetPerfLevel(kEnableCount);
2020   get_perf_context()->Reset();
2021 
2022   // First key is not in the prefix_extractor domain
2023   ASSERT_OK(Put("k", "v0"));
2024   ASSERT_OK(Put("kk1", "v1"));
2025   ASSERT_OK(Put("kk2", "v2"));
2026   ASSERT_OK(Put("kk3", "v3"));
2027   ASSERT_OK(Put("kk4", "v4"));
2028   std::vector<std::string> mem_keys(
2029       {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
2030   std::vector<std::string> inmem_values;
2031   inmem_values = MultiGet(mem_keys, nullptr);
2032   ASSERT_EQ(inmem_values[0], "v0");
2033   ASSERT_EQ(inmem_values[1], "v1");
2034   ASSERT_EQ(inmem_values[2], "v2");
2035   ASSERT_EQ(inmem_values[3], "v3");
2036   ASSERT_EQ(inmem_values[4], "v4");
2037   ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
2038   ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
2039   ASSERT_OK(Flush());
2040 
2041   std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
2042   std::vector<std::string> values;
2043   get_perf_context()->Reset();
2044   values = MultiGet(keys, nullptr);
2045   ASSERT_EQ(values[0], "v0");
2046   ASSERT_EQ(values[1], "v1");
2047   ASSERT_EQ(values[2], "v2");
2048   ASSERT_EQ(values[3], "v3");
2049   ASSERT_EQ(values[4], "v4");
2050   // Filter hits for 4 in-domain keys
2051   ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
2052 }
2053 
2054 INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
2055                         ::testing::Bool());
2056 
2057 #ifndef ROCKSDB_LITE
2058 class DBMultiGetRowCacheTest : public DBBasicTest,
2059                                public ::testing::WithParamInterface<bool> {};
2060 
2061 TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
2062   do {
2063     option_config_ = kRowCache;
2064     Options options = CurrentOptions();
2065     options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
2066     CreateAndReopenWithCF({"pikachu"}, options);
2067     SetPerfLevel(kEnableCount);
2068     ASSERT_OK(Put(1, "k1", "v1"));
2069     ASSERT_OK(Put(1, "k2", "v2"));
2070     ASSERT_OK(Put(1, "k3", "v3"));
2071     ASSERT_OK(Put(1, "k4", "v4"));
2072     ASSERT_OK(Flush(1));
2073     ASSERT_OK(Put(1, "k5", "v5"));
2074     const Snapshot* snap1 = dbfull()->GetSnapshot();
2075     ASSERT_OK(Delete(1, "k4"));
2076     ASSERT_OK(Flush(1));
2077     const Snapshot* snap2 = dbfull()->GetSnapshot();
2078 
2079     get_perf_context()->Reset();
2080 
2081     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
2082     std::vector<PinnableSlice> values(keys.size());
2083     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
2084     std::vector<Status> s(keys.size());
2085 
2086     ReadOptions ro;
2087     bool use_snapshots = GetParam();
2088     if (use_snapshots) {
2089       ro.snapshot = snap2;
2090     }
2091     db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
2092                   s.data(), false);
2093 
2094     ASSERT_EQ(values.size(), keys.size());
2095     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
2096     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
2097     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
2098     // three kv pairs * two bytes per value
2099     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
2100 
2101     ASSERT_TRUE(s[0].IsNotFound());
2102     ASSERT_OK(s[1]);
2103     ASSERT_TRUE(s[2].IsNotFound());
2104     ASSERT_OK(s[3]);
2105     ASSERT_OK(s[4]);
2106 
2107     // Call MultiGet() again with some intersection with the previous set of
2108     // keys. Those should already be in the row cache.
2109     keys.assign({"no_key", "k5", "k3", "k2"});
2110     for (size_t i = 0; i < keys.size(); ++i) {
2111       values[i].Reset();
2112       s[i] = Status::OK();
2113     }
2114     get_perf_context()->Reset();
2115 
2116     if (use_snapshots) {
2117       ro.snapshot = snap1;
2118     }
2119     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
2120                   values.data(), s.data(), false);
2121 
2122     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
2123     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
2124     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
2125     // three kv pairs * two bytes per value
2126     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
2127 
2128     ASSERT_TRUE(s[0].IsNotFound());
2129     ASSERT_OK(s[1]);
2130     ASSERT_OK(s[2]);
2131     ASSERT_OK(s[3]);
2132     if (use_snapshots) {
2133       // Only reads from the first SST file would have been cached, since
2134       // snapshot seq no is > fd.largest_seqno
2135       ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
2136     } else {
2137       ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
2138     }
2139 
2140     SetPerfLevel(kDisable);
2141     dbfull()->ReleaseSnapshot(snap1);
2142     dbfull()->ReleaseSnapshot(snap2);
2143   } while (ChangeCompactOptions());
2144 }
2145 
2146 INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
2147                         testing::Values(true, false));
2148 
2149 TEST_F(DBBasicTest, GetAllKeyVersions) {
2150   Options options = CurrentOptions();
2151   options.env = env_;
2152   options.create_if_missing = true;
2153   options.disable_auto_compactions = true;
2154   CreateAndReopenWithCF({"pikachu"}, options);
2155   ASSERT_EQ(2, handles_.size());
2156   const size_t kNumInserts = 4;
2157   const size_t kNumDeletes = 4;
2158   const size_t kNumUpdates = 4;
2159 
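  // With auto compactions disabled, every put, update and delete marker stays
  // visible to GetAllKeyVersions, so the default CF should report
  // kNumInserts + kNumUpdates + kNumDeletes entries (and the second CF three
  // fewer, since one fewer of each was written there).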
2160   // Check default column family
2161   for (size_t i = 0; i != kNumInserts; ++i) {
2162     ASSERT_OK(Put(std::to_string(i), "value"));
2163   }
2164   for (size_t i = 0; i != kNumUpdates; ++i) {
2165     ASSERT_OK(Put(std::to_string(i), "value1"));
2166   }
2167   for (size_t i = 0; i != kNumDeletes; ++i) {
2168     ASSERT_OK(Delete(std::to_string(i)));
2169   }
2170   std::vector<KeyVersion> key_versions;
2171   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
2172       db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
2173       &key_versions));
2174   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
2175   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
2176       db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
2177       &key_versions));
2178   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
2179 
2180   // Check non-default column family
2181   for (size_t i = 0; i + 1 != kNumInserts; ++i) {
2182     ASSERT_OK(Put(1, std::to_string(i), "value"));
2183   }
2184   for (size_t i = 0; i + 1 != kNumUpdates; ++i) {
2185     ASSERT_OK(Put(1, std::to_string(i), "value1"));
2186   }
2187   for (size_t i = 0; i + 1 != kNumDeletes; ++i) {
2188     ASSERT_OK(Delete(1, std::to_string(i)));
2189   }
2190   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
2191       db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
2192       &key_versions));
2193   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
2194 }
2195 #endif  // !ROCKSDB_LITE
2196 
2197 TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
2198   Options options = CurrentOptions();
2199   Random rnd(301);
2200   BlockBasedTableOptions table_options;
2201   table_options.pin_l0_filter_and_index_blocks_in_cache = true;
2202   table_options.block_size = 16 * 1024;
2203   ASSERT_TRUE(table_options.block_size >
2204             BlockBasedTable::kMultiGetReadStackBufSize);
2205   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
2206   Reopen(options);
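  // With the block size above kMultiGetReadStackBufSize, MultiGet presumably
  // cannot use its fixed stack scratch buffer and must fall back to a heap
  // allocation; this test only exercises that path and does not check the
  // returned values.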
2207 
2208   std::string zero_str(128, '\0');
2209   for (int i = 0; i < 100; ++i) {
2210     // Make the value compressible. A purely random string doesn't compress
2211     // and the resultant data block will not be compressed
2212     std::string value(rnd.RandomString(128) + zero_str);
2213     assert(Put(Key(i), value) == Status::OK());
2214   }
2215   ASSERT_OK(Flush());
2216 
2217   std::vector<std::string> key_data(10);
2218   std::vector<Slice> keys;
2219   // We cannot resize a PinnableSlice vector, so just set initial size to
2220   // largest we think we will need
2221   std::vector<PinnableSlice> values(10);
2222   std::vector<Status> statuses;
2223   ReadOptions ro;
2224 
2225   // Warm up the cache first
2226   key_data.emplace_back(Key(0));
2227   keys.emplace_back(Slice(key_data.back()));
2228   key_data.emplace_back(Key(50));
2229   keys.emplace_back(Slice(key_data.back()));
2230   statuses.resize(keys.size());
2231 
2232   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2233                      keys.data(), values.data(), statuses.data(), true);
2234 }
2235 
2236 TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
2237   Options options = CurrentOptions();
2238   DestroyAndReopen(options);
2239   CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2240   size_t num_cfs = handles_.size();
2241   ASSERT_EQ(3, num_cfs);
2242   WriteOptions write_opts;
2243   write_opts.disableWAL = true;
2244   for (size_t cf = 0; cf != num_cfs; ++cf) {
2245     for (size_t i = 0; i != 10000; ++i) {
2246       std::string key_str = Key(static_cast<int>(i));
2247       std::string value_str = std::to_string(cf) + "_" + std::to_string(i);
2248 
2249       ASSERT_OK(Put(static_cast<int>(cf), key_str, value_str));
2250       if (0 == (i % 1000)) {
2251         ASSERT_OK(Flush(static_cast<int>(cf)));
2252       }
2253     }
2254   }
2255   for (size_t cf = 0; cf != num_cfs; ++cf) {
2256     ASSERT_OK(Flush(static_cast<int>(cf)));
2257   }
2258   Close();
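  // best_efforts_recovery recovers to the most recent state consistent with
  // the table files actually on disk instead of failing; nothing is missing
  // here, so every key written above must still be readable after reopen.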
2259   options.best_efforts_recovery = true;
2260   ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2261                            options);
2262   num_cfs = handles_.size();
2263   ASSERT_EQ(3, num_cfs);
2264   for (size_t cf = 0; cf != num_cfs; ++cf) {
2265     for (int i = 0; i != 10000; ++i) {
2266       std::string key_str = Key(static_cast<int>(i));
2267       std::string expected_value_str =
2268           std::to_string(cf) + "_" + std::to_string(i);
2269       ASSERT_EQ(expected_value_str, Get(static_cast<int>(cf), key_str));
2270     }
2271   }
2272 }
2273 
2274 TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
2275   Options options = CurrentOptions();
2276   DestroyAndReopen(options);
2277   ASSERT_OK(Put("foo", "value"));
2278   ASSERT_OK(Flush());
2279   SyncPoint::GetInstance()->DisableProcessing();
2280   SyncPoint::GetInstance()->ClearAllCallBacks();
2281   SyncPoint::GetInstance()->SetCallBack(
2282       "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
2283         ASSERT_NE(nullptr, arg);
2284         *(reinterpret_cast<Status*>(arg)) =
2285             Status::Corruption("Inject corruption");
2286       });
2287   SyncPoint::GetInstance()->EnableProcessing();
2288 
2289   options.best_efforts_recovery = true;
2290   Status s = TryReopen(options);
2291   ASSERT_TRUE(s.IsCorruption());
2292   SyncPoint::GetInstance()->DisableProcessing();
2293   SyncPoint::GetInstance()->ClearAllCallBacks();
2294 }
2295 
2296 #ifndef ROCKSDB_LITE
2297 namespace {
2298 class TableFileListener : public EventListener {
2299  public:
2300   void OnTableFileCreated(const TableFileCreationInfo& info) override {
2301     InstrumentedMutexLock lock(&mutex_);
2302     cf_to_paths_[info.cf_name].push_back(info.file_path);
2303   }
2304   std::vector<std::string>& GetFiles(const std::string& cf_name) {
2305     InstrumentedMutexLock lock(&mutex_);
2306     return cf_to_paths_[cf_name];
2307   }
2308 
2309  private:
2310   InstrumentedMutex mutex_;
2311   std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
2312 };
2313 }  // namespace
2314 
2315 TEST_F(DBBasicTest, LastSstFileNotInManifest) {
2316   // If the last sst file is not tracked in MANIFEST,
2317   // or the VersionEdit for the last sst file is not synced,
2318   // on recovery, the last sst file should be deleted,
2319   // and new sst files shouldn't reuse its file number.
2320   Options options = CurrentOptions();
2321   DestroyAndReopen(options);
2322   Close();
2323 
2324   // Manually add a sst file.
2325   constexpr uint64_t kSstFileNumber = 100;
2326   const std::string kSstFile = MakeTableFileName(dbname_, kSstFileNumber);
2327   ASSERT_OK(WriteStringToFile(env_, /* data = */ "bad sst file content",
2328                               /* fname = */ kSstFile,
2329                               /* should_sync = */ true));
2330   ASSERT_OK(env_->FileExists(kSstFile));
2331 
2332   TableFileListener* listener = new TableFileListener();
2333   options.listeners.emplace_back(listener);
2334   Reopen(options);
2335   // kSstFile should already be deleted.
2336   ASSERT_TRUE(env_->FileExists(kSstFile).IsNotFound());
2337 
2338   ASSERT_OK(Put("k", "v"));
2339   ASSERT_OK(Flush());
2340   // New sst file should have file number > kSstFileNumber.
2341   std::vector<std::string>& files =
2342       listener->GetFiles(kDefaultColumnFamilyName);
2343   ASSERT_EQ(files.size(), 1);
2344   const std::string fname = files[0].erase(0, (dbname_ + "/").size());
2345   uint64_t number = 0;
2346   FileType type = kTableFile;
2347   ASSERT_TRUE(ParseFileName(fname, &number, &type));
2348   ASSERT_EQ(type, kTableFile);
2349   ASSERT_GT(number, kSstFileNumber);
2350 }
2351 
2352 TEST_F(DBBasicTest, RecoverWithMissingFiles) {
2353   Options options = CurrentOptions();
2354   DestroyAndReopen(options);
2355   TableFileListener* listener = new TableFileListener();
2356   // Disable auto compaction to simplify SST file name tracking.
2357   options.disable_auto_compactions = true;
2358   options.listeners.emplace_back(listener);
2359   CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2360   std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
2361                                            "eevee"};
2362   size_t num_cfs = handles_.size();
2363   ASSERT_EQ(3, num_cfs);
2364   for (size_t cf = 0; cf != num_cfs; ++cf) {
2365     ASSERT_OK(Put(static_cast<int>(cf), "a", "0_value"));
2366     ASSERT_OK(Flush(static_cast<int>(cf)));
2367     ASSERT_OK(Put(static_cast<int>(cf), "b", "0_value"));
2368     ASSERT_OK(Flush(static_cast<int>(cf)));
2369     ASSERT_OK(Put(static_cast<int>(cf), "c", "0_value"));
2370     ASSERT_OK(Flush(static_cast<int>(cf)));
2371   }
2372 
2373   // Delete and corrupt files
2374   for (size_t i = 0; i < all_cf_names.size(); ++i) {
2375     std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
2376     ASSERT_EQ(3, files.size());
2377     std::string corrupted_data;
2378     ASSERT_OK(ReadFileToString(env_, files[files.size() - 1], &corrupted_data));
2379     ASSERT_OK(WriteStringToFile(
2380         env_, corrupted_data.substr(0, corrupted_data.size() - 2),
2381         files[files.size() - 1], /*should_sync=*/true));
2382     for (int j = static_cast<int>(files.size() - 2); j >= static_cast<int>(i);
2383          --j) {
2384       ASSERT_OK(env_->DeleteFile(files[j]));
2385     }
2386   }
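  // After truncating the newest SST and deleting progressively more files per
  // column family, best-efforts recovery should roll each CF back to a
  // different point: default becomes empty, "pikachu" keeps only "a", and
  // "eevee" keeps "a" and "b", which the iterators below verify.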
2387   options.best_efforts_recovery = true;
2388   ReopenWithColumnFamilies(all_cf_names, options);
2389   // Verify data
2390   ReadOptions read_opts;
2391   read_opts.total_order_seek = true;
2392   {
2393     std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
2394     iter->SeekToFirst();
2395     ASSERT_FALSE(iter->Valid());
2396     ASSERT_OK(iter->status());
2397     iter.reset(db_->NewIterator(read_opts, handles_[1]));
2398     iter->SeekToFirst();
2399     ASSERT_TRUE(iter->Valid());
2400     ASSERT_EQ("a", iter->key());
2401     iter->Next();
2402     ASSERT_FALSE(iter->Valid());
2403     ASSERT_OK(iter->status());
2404     iter.reset(db_->NewIterator(read_opts, handles_[2]));
2405     iter->SeekToFirst();
2406     ASSERT_TRUE(iter->Valid());
2407     ASSERT_EQ("a", iter->key());
2408     iter->Next();
2409     ASSERT_TRUE(iter->Valid());
2410     ASSERT_EQ("b", iter->key());
2411     iter->Next();
2412     ASSERT_FALSE(iter->Valid());
2413     ASSERT_OK(iter->status());
2414   }
2415 }
2416 
2417 TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
2418   Options options = CurrentOptions();
2419   options.env = env_;
2420   DestroyAndReopen(options);
2421   ASSERT_OK(Put("foo", "value0"));
2422   ASSERT_OK(Flush());
2423   Close();
2424   {
2425     // Hack by adding a new MANIFEST with high file number
2426     std::string garbage(10, '\0');
2427     ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/MANIFEST-001000",
2428                                 /*should_sync=*/true));
2429   }
2430   {
2431     // Hack by adding a corrupted SST not referenced by any MANIFEST
2432     std::string garbage(10, '\0');
2433     ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/001001.sst",
2434                                 /*should_sync=*/true));
2435   }
2436 
2437   options.best_efforts_recovery = true;
2438 
2439   Reopen(options);
2440   ASSERT_OK(Put("bar", "value"));
2441 }
2442 
2443 TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
2444   Options options = CurrentOptions();
2445   options.env = env_;
2446   DestroyAndReopen(options);
2447   CreateAndReopenWithCF({"pikachu"}, options);
2448   options.best_efforts_recovery = true;
2449   ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
2450   ASSERT_EQ(2, handles_.size());
2451   ASSERT_OK(Put("foo", "value"));
2452   ASSERT_OK(Put(1, "bar", "value"));
2453   ASSERT_OK(Flush());
2454   ASSERT_OK(Flush(1));
2455   Close();
2456   ASSERT_OK(env_->DeleteFile(CurrentFileName(dbname_)));
2457   ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
2458   std::vector<std::string> cf_names;
2459   ASSERT_OK(DB::ListColumnFamilies(DBOptions(options), dbname_, &cf_names));
2460   ASSERT_EQ(2, cf_names.size());
2461   for (const auto& name : cf_names) {
2462     ASSERT_TRUE(name == kDefaultColumnFamilyName || name == "pikachu");
2463   }
2464 }
2465 
2466 TEST_F(DBBasicTest, RecoverWithNoManifest) {
2467   Options options = CurrentOptions();
2468   options.env = env_;
2469   DestroyAndReopen(options);
2470   ASSERT_OK(Put("foo", "value"));
2471   ASSERT_OK(Flush());
2472   Close();
2473   {
2474     // Delete all MANIFEST.
2475     std::vector<std::string> files;
2476     ASSERT_OK(env_->GetChildren(dbname_, &files));
2477     for (const auto& file : files) {
2478       uint64_t number = 0;
2479       FileType type = kWalFile;
2480       if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
2481         ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
2482       }
2483     }
2484   }
2485   options.best_efforts_recovery = true;
2486   options.create_if_missing = false;
2487   Status s = TryReopen(options);
2488   ASSERT_TRUE(s.IsInvalidArgument());
2489   options.create_if_missing = true;
2490   Reopen(options);
2491   // Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
2492   ASSERT_EQ("NOT_FOUND", Get("foo"));
2493 }
2494 
2495 TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
2496   Options options = CurrentOptions();
2497   DestroyAndReopen(options);
2498   TableFileListener* listener = new TableFileListener();
2499   options.listeners.emplace_back(listener);
2500   CreateAndReopenWithCF({"pikachu"}, options);
2501   std::vector<std::string> kAllCfNames = {kDefaultColumnFamilyName, "pikachu"};
2502   size_t num_cfs = handles_.size();
2503   ASSERT_EQ(2, num_cfs);
2504   for (int cf = 0; cf < static_cast<int>(kAllCfNames.size()); ++cf) {
2505     ASSERT_OK(Put(cf, "a", "0_value"));
2506     ASSERT_OK(Flush(cf));
2507     ASSERT_OK(Put(cf, "b", "0_value"));
2508   }
2509   // Delete files
2510   for (size_t i = 0; i < kAllCfNames.size(); ++i) {
2511     std::vector<std::string>& files = listener->GetFiles(kAllCfNames[i]);
2512     ASSERT_EQ(1, files.size());
2513     for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
2514          --j) {
2515       ASSERT_OK(env_->DeleteFile(files[j]));
2516     }
2517   }
2518   options.best_efforts_recovery = true;
2519   ReopenWithColumnFamilies(kAllCfNames, options);
2520   // Verify WAL is not applied
2521   ReadOptions read_opts;
2522   read_opts.total_order_seek = true;
2523   std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
2524   iter->SeekToFirst();
2525   ASSERT_FALSE(iter->Valid());
2526   ASSERT_OK(iter->status());
2527   iter.reset(db_->NewIterator(read_opts, handles_[1]));
2528   iter->SeekToFirst();
2529   ASSERT_TRUE(iter->Valid());
2530   ASSERT_EQ("a", iter->key());
2531   iter->Next();
2532   ASSERT_FALSE(iter->Valid());
2533   ASSERT_OK(iter->status());
2534 }
2535 
2536 TEST_F(DBBasicTest, DisableTrackWal) {
2537   // If WAL tracking was enabled, and then disabled during reopen,
2538   // the previously tracked WALs should be removed from MANIFEST.
2539 
2540   Options options = CurrentOptions();
2541   options.track_and_verify_wals_in_manifest = true;
2542   // extremely small write buffer size,
2543   // so that new WALs are created more frequently.
2544   options.write_buffer_size = 100;
2545   options.env = env_;
2546   DestroyAndReopen(options);
2547   for (int i = 0; i < 100; i++) {
2548     ASSERT_OK(Put("foo" + std::to_string(i), "value" + std::to_string(i)));
2549   }
2550   ASSERT_OK(dbfull()->TEST_SwitchMemtable());
2551   ASSERT_OK(db_->SyncWAL());
2552   // Some WALs are tracked.
2553   ASSERT_FALSE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty());
2554   Close();
2555 
2556   // Disable WAL tracking.
2557   options.track_and_verify_wals_in_manifest = false;
2558   options.create_if_missing = false;
2559   ASSERT_OK(TryReopen(options));
2560   // Previously tracked WALs are cleared.
2561   ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty());
2562   Close();
2563 
2564   // Re-enable WAL tracking again.
2565   options.track_and_verify_wals_in_manifest = true;
2566   options.create_if_missing = false;
2567   ASSERT_OK(TryReopen(options));
2568   ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty());
2569   Close();
2570 }
2571 #endif  // !ROCKSDB_LITE
2572 
2573 TEST_F(DBBasicTest, ManifestChecksumMismatch) {
2574   Options options = CurrentOptions();
2575   DestroyAndReopen(options);
2576   ASSERT_OK(Put("bar", "value"));
2577   ASSERT_OK(Flush());
2578   SyncPoint::GetInstance()->DisableProcessing();
2579   SyncPoint::GetInstance()->ClearAllCallBacks();
2580   SyncPoint::GetInstance()->SetCallBack(
2581       "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
2582         auto* crc = reinterpret_cast<uint32_t*>(arg);
2583         *crc = *crc + 1;
2584       });
2585   SyncPoint::GetInstance()->EnableProcessing();
2586 
2587   WriteOptions write_opts;
2588   write_opts.disableWAL = true;
2589   Status s = db_->Put(write_opts, "foo", "value");
2590   ASSERT_OK(s);
2591   ASSERT_OK(Flush());
2592   SyncPoint::GetInstance()->DisableProcessing();
2593   SyncPoint::GetInstance()->ClearAllCallBacks();
2594   ASSERT_OK(Put("foo", "value1"));
2595   ASSERT_OK(Flush());
2596   s = TryReopen(options);
2597   ASSERT_TRUE(s.IsCorruption());
2598 }
2599 
2600 TEST_F(DBBasicTest, ConcurrentlyCloseDB) {
2601   Options options = CurrentOptions();
2602   DestroyAndReopen(options);
2603   std::vector<std::thread> workers;
2604   for (int i = 0; i < 10; i++) {
2605     workers.push_back(std::thread([&]() {
2606       auto s = db_->Close();
2607       ASSERT_OK(s);
2608     }));
2609   }
2610   for (auto& w : workers) {
2611     w.join();
2612   }
2613 }
2614 
2615 #ifndef ROCKSDB_LITE
2616 class DBBasicTestTrackWal : public DBTestBase,
2617                             public testing::WithParamInterface<bool> {
2618  public:
2619   DBBasicTestTrackWal()
2620       : DBTestBase("db_basic_test_track_wal", /*env_do_fsync=*/false) {}
2621 
2622   int CountWalFiles() {
2623     VectorLogPtr log_files;
2624     EXPECT_OK(dbfull()->GetSortedWalFiles(log_files));
2625     return static_cast<int>(log_files.size());
2626   };
2627 };
2628 
2629 TEST_P(DBBasicTestTrackWal, DoNotTrackObsoleteWal) {
2630   // If a WAL becomes obsolete after flushing, but is not deleted from disk yet,
2631   // then if SyncWAL is called afterwards, the obsolete WAL should not be
2632   // tracked in MANIFEST.
2633 
2634   Options options = CurrentOptions();
2635   options.create_if_missing = true;
2636   options.track_and_verify_wals_in_manifest = true;
2637   options.atomic_flush = GetParam();
2638 
2639   DestroyAndReopen(options);
2640   CreateAndReopenWithCF({"cf"}, options);
2641   ASSERT_EQ(handles_.size(), 2);  // default, cf
2642   // Do not delete WALs.
2643   ASSERT_OK(db_->DisableFileDeletions());
2644   constexpr int n = 10;
2645   std::vector<std::unique_ptr<LogFile>> wals(n);
2646   for (size_t i = 0; i < n; i++) {
2647     // Generate a new WAL for each key-value.
2648     const int cf = i % 2;
2649     ASSERT_OK(db_->GetCurrentWalFile(&wals[i]));
2650     ASSERT_OK(Put(cf, "k" + std::to_string(i), "v" + std::to_string(i)));
2651     ASSERT_OK(Flush({0, 1}));
2652   }
2653   ASSERT_EQ(CountWalFiles(), n);
2654   // Since all WALs are obsolete, no WAL should be tracked in MANIFEST.
2655   ASSERT_OK(db_->SyncWAL());
2656 
2657   // Manually delete all WALs.
2658   Close();
2659   for (const auto& wal : wals) {
2660     ASSERT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
2661   }
2662 
2663   // If SyncWAL tracks the obsolete WALs in MANIFEST,
2664   // reopen will fail because the WALs are missing from disk.
2665   ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf"}, options));
2666   Destroy(options);
2667 }
2668 
2669 INSTANTIATE_TEST_CASE_P(DBBasicTestTrackWal, DBBasicTestTrackWal,
2670                         testing::Bool());
2671 #endif  // ROCKSDB_LITE
2672 
2673 class DBBasicTestMultiGet : public DBTestBase {
2674  public:
2675   DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
2676                       bool uncompressed_cache, bool _compression_enabled,
2677                       bool _fill_cache, uint32_t compression_parallel_threads)
2678       : DBTestBase(test_dir, /*env_do_fsync=*/false) {
2679     compression_enabled_ = _compression_enabled;
2680     fill_cache_ = _fill_cache;
2681 
2682     if (compressed_cache) {
2683       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
2684       compressed_cache_ = std::make_shared<MyBlockCache>(cache);
2685     }
2686     if (uncompressed_cache) {
2687       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
2688       uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
2689     }
2690 
2691     env_->count_random_reads_ = true;
2692 
2693     Options options = CurrentOptions();
2694     Random rnd(301);
2695     BlockBasedTableOptions table_options;
2696 
2697 #ifndef ROCKSDB_LITE
2698     if (compression_enabled_) {
2699       std::vector<CompressionType> compression_types;
2700       compression_types = GetSupportedCompressions();
2701       // Not every platform may have compression libraries available, so
2702       // dynamically pick based on what's available
2703       CompressionType tmp_type = kNoCompression;
2704       for (auto c_type : compression_types) {
2705         if (c_type != kNoCompression) {
2706           tmp_type = c_type;
2707           break;
2708         }
2709       }
2710       if (tmp_type != kNoCompression) {
2711         options.compression = tmp_type;
2712       } else {
2713         compression_enabled_ = false;
2714       }
2715     }
2716 #else
2717     // GetSupportedCompressions() is not available in LITE build
2718     if (!Snappy_Supported()) {
2719       compression_enabled_ = false;
2720     }
2721 #endif  // ROCKSDB_LITE
2722 
2723     table_options.block_cache = uncompressed_cache_;
2724     if (table_options.block_cache == nullptr) {
2725       table_options.no_block_cache = true;
2726     } else {
2727       table_options.pin_l0_filter_and_index_blocks_in_cache = true;
2728     }
2729     table_options.block_cache_compressed = compressed_cache_;
2730     table_options.flush_block_policy_factory.reset(
2731         new MyFlushBlockPolicyFactory());
2732     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
2733     if (!compression_enabled_) {
2734       options.compression = kNoCompression;
2735     } else {
2736       options.compression_opts.parallel_threads = compression_parallel_threads;
2737     }
2738     options_ = options;
2739     Reopen(options);
2740 
2741     if (num_cfs > 1) {
2742       for (int cf = 0; cf < num_cfs; ++cf) {
2743         cf_names_.emplace_back("cf" + std::to_string(cf));
2744       }
2745       CreateColumnFamilies(cf_names_, options);
2746       cf_names_.emplace_back("default");
2747     }
2748 
2749     std::string zero_str(128, '\0');
2750     for (int cf = 0; cf < num_cfs; ++cf) {
2751       for (int i = 0; i < 100; ++i) {
2752         // Make the value compressible. A purely random string doesn't compress
2753         // and the resultant data block will not be compressed
2754         values_.emplace_back(rnd.RandomString(128) + zero_str);
2755         assert(((num_cfs == 1) ? Put(Key(i), values_[i])
2756                                : Put(cf, Key(i), values_[i])) == Status::OK());
2757       }
2758       if (num_cfs == 1) {
2759         EXPECT_OK(Flush());
2760       } else {
2761         EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
2762       }
2763 
2764       for (int i = 0; i < 100; ++i) {
2765         // block cannot gain space by compression
2766         uncompressable_values_.emplace_back(rnd.RandomString(256) + '\0');
2767         std::string tmp_key = "a" + Key(i);
2768         assert(((num_cfs == 1) ? Put(tmp_key, uncompressable_values_[i])
2769                                : Put(cf, tmp_key, uncompressable_values_[i])) ==
2770                Status::OK());
2771       }
2772       if (num_cfs == 1) {
2773         EXPECT_OK(Flush());
2774       } else {
2775         EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
2776       }
2777     }
2778     // Clear compressed cache, which is always pre-populated
2779     if (compressed_cache_) {
2780       compressed_cache_->SetCapacity(0);
2781       compressed_cache_->SetCapacity(1048576);
2782     }
2783   }
2784 
2785   bool CheckValue(int i, const std::string& value) {
2786     if (values_[i].compare(value) == 0) {
2787       return true;
2788     }
2789     return false;
2790   }
2791 
2792   bool CheckUncompressableValue(int i, const std::string& value) {
2793     if (uncompressable_values_[i].compare(value) == 0) {
2794       return true;
2795     }
2796     return false;
2797   }
2798 
2799   const std::vector<std::string>& GetCFNames() const { return cf_names_; }
2800 
2801   int num_lookups() { return uncompressed_cache_->num_lookups(); }
2802   int num_found() { return uncompressed_cache_->num_found(); }
2803   int num_inserts() { return uncompressed_cache_->num_inserts(); }
2804 
2805   int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
2806   int num_found_compressed() { return compressed_cache_->num_found(); }
2807   int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
2808 
2809   bool fill_cache() { return fill_cache_; }
2810   bool compression_enabled() { return compression_enabled_; }
2811   bool has_compressed_cache() { return compressed_cache_ != nullptr; }
2812   bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
2813   Options get_options() { return options_; }
2814 
2815   static void SetUpTestCase() {}
2816   static void TearDownTestCase() {}
2817 
2818  protected:
2819   class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
2820    public:
2821     MyFlushBlockPolicyFactory() {}
2822 
2823     virtual const char* Name() const override {
2824       return "MyFlushBlockPolicyFactory";
2825     }
2826 
2827     virtual FlushBlockPolicy* NewFlushBlockPolicy(
2828         const BlockBasedTableOptions& /*table_options*/,
2829         const BlockBuilder& data_block_builder) const override {
2830       return new MyFlushBlockPolicy(data_block_builder);
2831     }
2832   };
2833 
2834   class MyFlushBlockPolicy : public FlushBlockPolicy {
2835    public:
2836     explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
2837         : num_keys_(0), data_block_builder_(data_block_builder) {}
2838 
2839     bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
2840       if (data_block_builder_.empty()) {
2841         // First key in this block
2842         num_keys_ = 1;
2843         return false;
2844       }
2845       // Flush every 10 keys
2846       if (num_keys_ == 10) {
2847         num_keys_ = 1;
2848         return true;
2849       }
2850       num_keys_++;
2851       return false;
2852     }
2853 
2854    private:
2855     int num_keys_;
2856     const BlockBuilder& data_block_builder_;
2857   };
2858 
2859   class MyBlockCache : public CacheWrapper {
2860    public:
2861     explicit MyBlockCache(std::shared_ptr<Cache> target)
2862         : CacheWrapper(target),
2863           num_lookups_(0),
2864           num_found_(0),
2865           num_inserts_(0) {}
2866 
2867     const char* Name() const override { return "MyBlockCache"; }
2868 
2869     using Cache::Insert;
2870     Status Insert(const Slice& key, void* value, size_t charge,
2871                   void (*deleter)(const Slice& key, void* value),
2872                   Handle** handle = nullptr,
2873                   Priority priority = Priority::LOW) override {
2874       num_inserts_++;
2875       return target_->Insert(key, value, charge, deleter, handle, priority);
2876     }
2877 
2878     using Cache::Lookup;
2879     Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
2880       num_lookups_++;
2881       Handle* handle = target_->Lookup(key, stats);
2882       if (handle != nullptr) {
2883         num_found_++;
2884       }
2885       return handle;
2886     }
2887     int num_lookups() { return num_lookups_; }
2888 
2889     int num_found() { return num_found_; }
2890 
2891     int num_inserts() { return num_inserts_; }
2892 
2893    private:
2894     int num_lookups_;
2895     int num_found_;
2896     int num_inserts_;
2897   };
2898 
2899   std::shared_ptr<MyBlockCache> compressed_cache_;
2900   std::shared_ptr<MyBlockCache> uncompressed_cache_;
2901   Options options_;
2902   bool compression_enabled_;
2903   std::vector<std::string> values_;
2904   std::vector<std::string> uncompressable_values_;
2905   bool fill_cache_;
2906   std::vector<std::string> cf_names_;
2907 };
2908 
2909 class DBBasicTestWithParallelIO
2910     : public DBBasicTestMultiGet,
2911       public testing::WithParamInterface<
2912           std::tuple<bool, bool, bool, bool, uint32_t>> {
2913  public:
2914   DBBasicTestWithParallelIO()
2915       : DBBasicTestMultiGet("/db_basic_test_with_parallel_io", 1,
2916                             std::get<0>(GetParam()), std::get<1>(GetParam()),
2917                             std::get<2>(GetParam()), std::get<3>(GetParam()),
2918                             std::get<4>(GetParam())) {}
2919 };
2920 
2921 TEST_P(DBBasicTestWithParallelIO, MultiGet) {
2922   std::vector<std::string> key_data(10);
2923   std::vector<Slice> keys;
2924   // We cannot resize a PinnableSlice vector, so just set the initial size
2925   // to the largest we think we will need
2926   std::vector<PinnableSlice> values(10);
2927   std::vector<Status> statuses;
2928   ReadOptions ro;
2929   ro.fill_cache = fill_cache();
2930 
2931   // Warm up the cache first
2932   key_data.emplace_back(Key(0));
2933   keys.emplace_back(Slice(key_data.back()));
2934   key_data.emplace_back(Key(50));
2935   keys.emplace_back(Slice(key_data.back()));
2936   statuses.resize(keys.size());
2937 
2938   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2939                      keys.data(), values.data(), statuses.data(), true);
2940   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
2941   ASSERT_TRUE(CheckValue(50, values[1].ToString()));
2942 
2943   int random_reads = env_->random_read_counter_.Read();
2944   key_data[0] = Key(1);
2945   key_data[1] = Key(51);
2946   keys[0] = Slice(key_data[0]);
2947   keys[1] = Slice(key_data[1]);
2948   values[0].Reset();
2949   values[1].Reset();
2950   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2951                      keys.data(), values.data(), statuses.data(), true);
2952   ASSERT_TRUE(CheckValue(1, values[0].ToString()));
2953   ASSERT_TRUE(CheckValue(51, values[1].ToString()));
2954 
2955   bool read_from_cache = false;
2956   if (fill_cache()) {
2957     if (has_uncompressed_cache()) {
2958       read_from_cache = true;
2959     } else if (has_compressed_cache() && compression_enabled()) {
2960       read_from_cache = true;
2961     }
2962   }
2963 
2964   int expected_reads = random_reads + (read_from_cache ? 0 : 2);
2965   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2966 
2967   keys.resize(10);
2968   statuses.resize(10);
2969   std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2970   for (size_t i = 0; i < key_ints.size(); ++i) {
2971     key_data[i] = Key(key_ints[i]);
2972     keys[i] = Slice(key_data[i]);
2973     statuses[i] = Status::OK();
2974     values[i].Reset();
2975   }
2976   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2977                      keys.data(), values.data(), statuses.data(), true);
2978   for (size_t i = 0; i < key_ints.size(); ++i) {
2979     ASSERT_OK(statuses[i]);
2980     ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
2981   }
2982   if (compression_enabled() && !has_compressed_cache()) {
2983     expected_reads += (read_from_cache ? 2 : 3);
2984   } else {
2985     expected_reads += (read_from_cache ? 2 : 4);
2986   }
2987   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2988 
2989   keys.resize(10);
2990   statuses.resize(10);
2991   std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2992   for (size_t i = 0; i < key_uncmp.size(); ++i) {
2993     key_data[i] = "a" + Key(key_uncmp[i]);
2994     keys[i] = Slice(key_data[i]);
2995     statuses[i] = Status::OK();
2996     values[i].Reset();
2997   }
2998   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2999                      keys.data(), values.data(), statuses.data(), true);
3000   for (size_t i = 0; i < key_uncmp.size(); ++i) {
3001     ASSERT_OK(statuses[i]);
3002     ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
3003   }
3004   if (compression_enabled() && !has_compressed_cache()) {
3005     expected_reads += (read_from_cache ? 3 : 3);
3006   } else {
3007     expected_reads += (read_from_cache ? 4 : 4);
3008   }
3009   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
3010 
3011   keys.resize(5);
3012   statuses.resize(5);
3013   std::vector<int> key_tr{1, 2, 15, 16, 55};
3014   for (size_t i = 0; i < key_tr.size(); ++i) {
3015     key_data[i] = "a" + Key(key_tr[i]);
3016     keys[i] = Slice(key_data[i]);
3017     statuses[i] = Status::OK();
3018     values[i].Reset();
3019   }
3020   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
3021                      keys.data(), values.data(), statuses.data(), true);
3022   for (size_t i = 0; i < key_tr.size(); ++i) {
3023     ASSERT_OK(statuses[i]);
3024     ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
3025   }
3026   if (compression_enabled() && !has_compressed_cache()) {
3027     expected_reads += (read_from_cache ? 0 : 2);
3028     ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
3029   } else {
3030     if (has_uncompressed_cache()) {
3031       expected_reads += (read_from_cache ? 0 : 3);
3032       ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
3033     } else {
3034       // A rare case: even with block compression enabled, some data
3035       // blocks are not compressed due to their content. If only the
3036       // compressed cache is enabled, those uncompressed blocks will not be
3037       // cached, so block reads will be triggered. The number of reads
3038       // depends on the compression algorithm.
3039       ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
3040     }
3041   }
3042 }
3043 
3044 #ifndef ROCKSDB_LITE
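// MultiGetDirectIO exercises the direct IO MultiRead path without requiring
// real O_DIRECT support: FakeDirectIOEnv below opens files with buffered
// reads but reports use_direct_io() == true and an alignment of 1, so the
// direct IO read path in the table reader is still exercised.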
3045 TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
3046   class FakeDirectIOEnv : public EnvWrapper {
3047     class FakeDirectIOSequentialFile;
3048     class FakeDirectIORandomAccessFile;
3049 
3050    public:
3051     FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
3052 
3053     Status NewRandomAccessFile(const std::string& fname,
3054                                std::unique_ptr<RandomAccessFile>* result,
3055                                const EnvOptions& options) override {
3056       std::unique_ptr<RandomAccessFile> file;
3057       assert(options.use_direct_reads);
3058       EnvOptions opts = options;
3059       opts.use_direct_reads = false;
3060       Status s = target()->NewRandomAccessFile(fname, &file, opts);
3061       if (!s.ok()) {
3062         return s;
3063       }
3064       result->reset(new FakeDirectIORandomAccessFile(std::move(file)));
3065       return s;
3066     }
3067 
3068    private:
3069     class FakeDirectIOSequentialFile : public SequentialFileWrapper {
3070      public:
3071       FakeDirectIOSequentialFile(std::unique_ptr<SequentialFile>&& file)
3072           : SequentialFileWrapper(file.get()), file_(std::move(file)) {}
3073       ~FakeDirectIOSequentialFile() {}
3074 
3075       bool use_direct_io() const override { return true; }
3076       size_t GetRequiredBufferAlignment() const override { return 1; }
3077 
3078      private:
3079       std::unique_ptr<SequentialFile> file_;
3080     };
3081 
3082     class FakeDirectIORandomAccessFile : public RandomAccessFileWrapper {
3083      public:
3084       FakeDirectIORandomAccessFile(std::unique_ptr<RandomAccessFile>&& file)
3085           : RandomAccessFileWrapper(file.get()), file_(std::move(file)) {}
3086       ~FakeDirectIORandomAccessFile() {}
3087 
3088       bool use_direct_io() const override { return true; }
3089       size_t GetRequiredBufferAlignment() const override { return 1; }
3090 
3091      private:
3092       std::unique_ptr<RandomAccessFile> file_;
3093     };
3094   };
3095 
3096   std::unique_ptr<FakeDirectIOEnv> env(new FakeDirectIOEnv(env_));
3097   Options opts = get_options();
3098   opts.env = env.get();
3099   opts.use_direct_reads = true;
3100   Reopen(opts);
3101 
3102   std::vector<std::string> key_data(10);
3103   std::vector<Slice> keys;
3104   // We cannot resize a PinnableSlice vector, so just set the initial size
3105   // to the largest we think we will need
3106   std::vector<PinnableSlice> values(10);
3107   std::vector<Status> statuses;
3108   ReadOptions ro;
3109   ro.fill_cache = fill_cache();
3110 
3111   // Warm up the cache first
3112   key_data.emplace_back(Key(0));
3113   keys.emplace_back(Slice(key_data.back()));
3114   key_data.emplace_back(Key(50));
3115   keys.emplace_back(Slice(key_data.back()));
3116   statuses.resize(keys.size());
3117 
3118   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
3119                      keys.data(), values.data(), statuses.data(), true);
3120   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
3121   ASSERT_TRUE(CheckValue(50, values[1].ToString()));
3122 
3123   int random_reads = env_->random_read_counter_.Read();
3124   key_data[0] = Key(1);
3125   key_data[1] = Key(51);
3126   keys[0] = Slice(key_data[0]);
3127   keys[1] = Slice(key_data[1]);
3128   values[0].Reset();
3129   values[1].Reset();
3130   if (uncompressed_cache_) {
3131     uncompressed_cache_->SetCapacity(0);
3132     uncompressed_cache_->SetCapacity(1048576);
3133   }
3134   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
3135                      keys.data(), values.data(), statuses.data(), true);
3136   ASSERT_TRUE(CheckValue(1, values[0].ToString()));
3137   ASSERT_TRUE(CheckValue(51, values[1].ToString()));
3138 
3139   bool read_from_cache = false;
3140   if (fill_cache()) {
3141     if (has_uncompressed_cache()) {
3142       read_from_cache = true;
3143     } else if (has_compressed_cache() && compression_enabled()) {
3144       read_from_cache = true;
3145     }
3146   }
3147 
3148   int expected_reads = random_reads;
3149   if (!compression_enabled() || !has_compressed_cache()) {
3150     expected_reads += 2;
3151   } else {
3152     expected_reads += (read_from_cache ? 0 : 2);
3153   }
3154   if (env_->random_read_counter_.Read() != expected_reads) {
3155     ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
3156   }
3157   Close();
3158 }
3159 #endif  // ROCKSDB_LITE
3160 
3161 TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
3162   std::vector<std::string> key_data(10);
3163   std::vector<Slice> keys;
3164   // We cannot resize a PinnableSlice vector, so just set the initial size
3165   // to the largest we think we will need
3166   std::vector<PinnableSlice> values(10);
3167   std::vector<Status> statuses;
3168   int read_count = 0;
3169   ReadOptions ro;
3170   ro.fill_cache = fill_cache();
3171 
3172   SyncPoint::GetInstance()->SetCallBack(
3173       "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
3174         Status* s = static_cast<Status*>(status);
3175         read_count++;
3176         if (read_count == 2) {
3177           *s = Status::Corruption();
3178         }
3179       });
3180   SyncPoint::GetInstance()->EnableProcessing();
3181 
3182   // Warm up the cache first
3183   key_data.emplace_back(Key(0));
3184   keys.emplace_back(Slice(key_data.back()));
3185   key_data.emplace_back(Key(50));
3186   keys.emplace_back(Slice(key_data.back()));
3187   statuses.resize(keys.size());
3188 
3189   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
3190                      keys.data(), values.data(), statuses.data(), true);
3191   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
3192   // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
3193   ASSERT_EQ(statuses[0], Status::OK());
3194   ASSERT_EQ(statuses[1], Status::Corruption());
3195 
3196   SyncPoint::GetInstance()->DisableProcessing();
3197 }
3198 
3199 TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
3200   std::vector<std::string> key_data(10);
3201   std::vector<Slice> keys;
3202   // We cannot resize a PinnableSlice vector, so just set the initial size
3203   // to the largest we think we will need
3204   std::vector<PinnableSlice> values(10);
3205   std::vector<Status> statuses;
3206   ReadOptions ro;
3207   ro.fill_cache = fill_cache();
3208 
3209   SyncPoint::GetInstance()->SetCallBack(
3210       "TableCache::MultiGet:FindTable", [&](void* status) {
3211         Status* s = static_cast<Status*>(status);
3212         *s = Status::IOError();
3213       });
3214   // DB open will create table readers unless we reduce the table cache
3215   // capacity.
3216   // SanitizeOptions will set max_open_files to a minimum of 20. The table
3217   // cache is allocated with max_open_files - 10 as its capacity, so
3218   // override max_open_files to 11 so that the table cache capacity becomes
3219   // 1. This prevents file opens during DB open and forces the file to be
3220   // opened during MultiGet.
3221   SyncPoint::GetInstance()->SetCallBack(
3222       "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3223         int* max_open_files = (int*)arg;
3224         *max_open_files = 11;
3225       });
3226   SyncPoint::GetInstance()->EnableProcessing();
3227 
3228   Reopen(CurrentOptions());
3229 
3230   // Warm up the cache first
3231   key_data.emplace_back(Key(0));
3232   keys.emplace_back(Slice(key_data.back()));
3233   key_data.emplace_back(Key(50));
3234   keys.emplace_back(Slice(key_data.back()));
3235   statuses.resize(keys.size());
3236 
3237   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
3238                      keys.data(), values.data(), statuses.data(), true);
3239   ASSERT_EQ(statuses[0], Status::IOError());
3240   ASSERT_EQ(statuses[1], Status::IOError());
3241 
3242   SyncPoint::GetInstance()->DisableProcessing();
3243 }
3244 
3245 INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
3246                         // Params are as follows -
3247                         // Param 0 - Compressed cache enabled
3248                         // Param 1 - Uncompressed cache enabled
3249                         // Param 2 - Data compression enabled
3250                         // Param 3 - ReadOptions::fill_cache
3251                         // Param 4 - CompressionOptions::parallel_threads
3252                         ::testing::Combine(::testing::Bool(), ::testing::Bool(),
3253                                            ::testing::Bool(), ::testing::Bool(),
3254                                            ::testing::Values(1, 4)));
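// As an example, the parameter tuple (false, true, true, true, 4) runs the
// tests above with no compressed block cache, an uncompressed block cache
// enabled, data compression enabled, ReadOptions::fill_cache = true, and
// CompressionOptions::parallel_threads = 4.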
3255 
3256 // Forward declaration
3257 class DeadlineFS;
3258 
3259 class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
3260  public:
3261   DeadlineRandomAccessFile(DeadlineFS& fs,
3262                            std::unique_ptr<FSRandomAccessFile>& file)
3263       : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}
3264 
3265   IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
3266                 Slice* result, char* scratch,
3267                 IODebugContext* dbg) const override;
3268 
3269   IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
3270                      const IOOptions& options, IODebugContext* dbg) override;
3271 
3272  private:
3273   DeadlineFS& fs_;
3274   std::unique_ptr<FSRandomAccessFile> file_;
3275 };
3276 
3277 class DeadlineFS : public FileSystemWrapper {
3278  public:
3279   // The error_on_delay parameter specifies whether an IOStatus::TimedOut()
3280   // status should be returned after delaying the IO to exceed the timeout,
3281   // or to simply delay but return success anyway. The latter mimics the
3282   // behavior of PosixFileSystem, which does not enforce any timeout.
3283   explicit DeadlineFS(SpecialEnv* env, bool error_on_delay)
3284       : FileSystemWrapper(env->GetFileSystem()),
3285         deadline_(std::chrono::microseconds::zero()),
3286         io_timeout_(std::chrono::microseconds::zero()),
3287         env_(env),
3288         timedout_(false),
3289         ignore_deadline_(false),
3290         error_on_delay_(error_on_delay) {}
3291 
3292   IOStatus NewRandomAccessFile(const std::string& fname,
3293                                const FileOptions& opts,
3294                                std::unique_ptr<FSRandomAccessFile>* result,
3295                                IODebugContext* dbg) override {
3296     std::unique_ptr<FSRandomAccessFile> file;
3297     IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
3298     EXPECT_OK(s);
3299     result->reset(new DeadlineRandomAccessFile(*this, file));
3300 
3301     const std::chrono::microseconds deadline = GetDeadline();
3302     const std::chrono::microseconds io_timeout = GetIOTimeout();
3303     if (deadline.count() || io_timeout.count()) {
3304       AssertDeadline(deadline, io_timeout, opts.io_options);
3305     }
3306     return ShouldDelay(opts.io_options);
3307   }
3308 
3309   // Set the deadline and IO timeout to enforce, and the IO count at which
3310   // a delay should be injected
3311   void SetDelayTrigger(const std::chrono::microseconds deadline,
3312                        const std::chrono::microseconds io_timeout,
3313                        const int trigger) {
3314     delay_trigger_ = trigger;
3315     io_count_ = 0;
3316     deadline_ = deadline;
3317     io_timeout_ = io_timeout;
3318     timedout_ = false;
3319   }
3320 
3321   // Increment the IO counter; at the trigger count, delay past the timeout
3322   IOStatus ShouldDelay(const IOOptions& opts) {
3323     if (timedout_) {
3324       return IOStatus::TimedOut();
3325     } else if (!deadline_.count() && !io_timeout_.count()) {
3326       return IOStatus::OK();
3327     }
3328     if (!ignore_deadline_ && delay_trigger_ == io_count_++) {
3329       env_->SleepForMicroseconds(static_cast<int>(opts.timeout.count() + 1));
3330       timedout_ = true;
3331       if (error_on_delay_) {
3332         return IOStatus::TimedOut();
3333       }
3334     }
3335     return IOStatus::OK();
3336   }
3337 
3338   const std::chrono::microseconds GetDeadline() {
3339     return ignore_deadline_ ? std::chrono::microseconds::zero() : deadline_;
3340   }
3341 
3342   const std::chrono::microseconds GetIOTimeout() {
3343     return ignore_deadline_ ? std::chrono::microseconds::zero() : io_timeout_;
3344   }
3345 
3346   bool TimedOut() { return timedout_; }
3347 
3348   void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; }
3349 
3350   void AssertDeadline(const std::chrono::microseconds deadline,
3351                       const std::chrono::microseconds io_timeout,
3352                       const IOOptions& opts) const {
3353     // Give a leeway of +- 10us as it can take some time for the Get/
3354     // MultiGet call to reach here, in order to avoid false alarms
3355     std::chrono::microseconds now =
3356         std::chrono::microseconds(env_->NowMicros());
3357     std::chrono::microseconds timeout;
3358     if (deadline.count()) {
3359       timeout = deadline - now;
3360       if (io_timeout.count()) {
3361         timeout = std::min(timeout, io_timeout);
3362       }
3363     } else {
3364       timeout = io_timeout;
3365     }
3366     if (opts.timeout != timeout) {
3367       ASSERT_EQ(timeout, opts.timeout);
3368     }
3369   }
3370 
3371  private:
3372   // The number of IOs to trigger the delay after
3373   int delay_trigger_;
3374   // Current IO count
3375   int io_count_;
3376   // ReadOptions deadline for the Get/MultiGet/Iterator
3377   std::chrono::microseconds deadline_;
3378   // ReadOptions io_timeout for the Get/MultiGet/Iterator
3379   std::chrono::microseconds io_timeout_;
3380   SpecialEnv* env_;
3381   // Flag to indicate whether we injected a delay
3382   bool timedout_;
3383   // Temporarily ignore deadlines/timeouts
3384   bool ignore_deadline_;
3385   // Return IOStatus::TimedOut() or IOStatus::OK()
3386   bool error_on_delay_;
3387 };
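// Typical wiring used by the deadline tests below (a sketch of what they do,
// not additional setup): wrap the default env's filesystem in a DeadlineFS,
// build a CompositeEnvWrapper around it, and set a deadline or io_timeout on
// each ReadOptions, e.g.
//
//   auto fs = std::make_shared<DeadlineFS>(env_, /*error_on_delay=*/true);
//   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
//   options.env = env.get();
//   ReadOptions ro;
//   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
//   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, /*trigger=*/0);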
3388 
3389 IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
3390                                         const IOOptions& opts, Slice* result,
3391                                         char* scratch,
3392                                         IODebugContext* dbg) const {
3393   const std::chrono::microseconds deadline = fs_.GetDeadline();
3394   const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
3395   IOStatus s;
3396   if (deadline.count() || io_timeout.count()) {
3397     fs_.AssertDeadline(deadline, io_timeout, opts);
3398   }
3399   if (s.ok()) {
3400     s = FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
3401                                         dbg);
3402   }
3403   if (s.ok()) {
3404     s = fs_.ShouldDelay(opts);
3405   }
3406   return s;
3407 }
3408 
3409 IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
3410                                              size_t num_reqs,
3411                                              const IOOptions& options,
3412                                              IODebugContext* dbg) {
3413   const std::chrono::microseconds deadline = fs_.GetDeadline();
3414   const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
3415   IOStatus s;
3416   if (deadline.count() || io_timeout.count()) {
3417     fs_.AssertDeadline(deadline, io_timeout, options);
3418   }
3419   if (s.ok()) {
3420     s = FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
3421   }
3422   if (s.ok()) {
3423     s = fs_.ShouldDelay(options);
3424   }
3425   return s;
3426 }
3427 
3428 // A test class for intercepting random reads and injecting artificial
3429 // delays. Used for testing the MultiGet deadline feature
3430 class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
3431  public:
3432   DBBasicTestMultiGetDeadline()
3433       : DBBasicTestMultiGet(
3434             "db_basic_test_multiget_deadline" /*Test dir*/,
3435             10 /*# of column families*/, false /*compressed cache enabled*/,
3436             true /*uncompressed cache enabled*/, true /*compression enabled*/,
3437             true /*ReadOptions.fill_cache*/,
3438             1 /*# of parallel compression threads*/) {}
3439 
3440   inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
3441     for (size_t i = 0; i < statuses.size(); ++i) {
3442       if (i < num_ok) {
3443         EXPECT_OK(statuses[i]);
3444       } else {
3445         if (statuses[i] != Status::TimedOut()) {
3446           EXPECT_EQ(statuses[i], Status::TimedOut());
3447         }
3448       }
3449     }
3450   }
3451 };
3452 
3453 TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
3454   std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
3455   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
3456   Options options = CurrentOptions();
3457 
3458   std::shared_ptr<Cache> cache = NewLRUCache(1048576);
3459   BlockBasedTableOptions table_options;
3460   table_options.block_cache = cache;
3461   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
3462   options.env = env.get();
3463   SetTimeElapseOnlySleepOnReopen(&options);
3464   ReopenWithColumnFamilies(GetCFNames(), options);
3465 
3466   // Test the non-batched version of MultiGet with multiple column
3467   // families
3468   std::vector<std::string> key_str;
3469   size_t i;
3470   for (i = 0; i < 5; ++i) {
3471     key_str.emplace_back(Key(static_cast<int>(i)));
3472   }
3473   std::vector<ColumnFamilyHandle*> cfs(key_str.size());
3475   std::vector<Slice> keys(key_str.size());
3476   std::vector<std::string> values(key_str.size());
3477   for (i = 0; i < key_str.size(); ++i) {
3478     cfs[i] = handles_[i];
3479     keys[i] = Slice(key_str[i].data(), key_str[i].size());
3480   }
3481 
3482   ReadOptions ro;
3483   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3484   // Delay the first IO
3485   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
3486 
3487   std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
3488   // The first key is successful because we check after the lookup, but
3489   // subsequent keys fail due to deadline exceeded
3490   CheckStatus(statuses, 1);
3491 
3492   // Clear the cache
3493   cache->SetCapacity(0);
3494   cache->SetCapacity(1048576);
3495   // Test non-batched MultiGet with multiple column families, introducing
3496   // an IO delay in one of the middle CFs
3497   key_str.clear();
3498   for (i = 0; i < 10; ++i) {
3499     key_str.emplace_back(Key(static_cast<int>(i)));
3500   }
3501   cfs.resize(key_str.size());
3502   keys.resize(key_str.size());
3503   values.resize(key_str.size());
3504   for (i = 0; i < key_str.size(); ++i) {
3505     // 2 keys per CF
3506     cfs[i] = handles_[i / 2];
3507     keys[i] = Slice(key_str[i].data(), key_str[i].size());
3508   }
3509   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3510   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
3511   statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
3512   CheckStatus(statuses, 3);
3513 
3514   // Test batched MultiGet with an IO delay in the first data block read.
3515   // Both keys in the first CF should succeed as they're in the same data
3516   // block and would form one batch, and we check for deadline between
3517   // batches.
3518   std::vector<PinnableSlice> pin_values(keys.size());
3519   cache->SetCapacity(0);
3520   cache->SetCapacity(1048576);
3521   statuses.clear();
3522   statuses.resize(keys.size());
3523   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3524   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
3525   dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
3526                      pin_values.data(), statuses.data());
3527   CheckStatus(statuses, 2);
3528 
3529   // Similar to the previous one, but an IO delay in the third CF data block
3530   // read
3531   for (PinnableSlice& value : pin_values) {
3532     value.Reset();
3533   }
3534   cache->SetCapacity(0);
3535   cache->SetCapacity(1048576);
3536   statuses.clear();
3537   statuses.resize(keys.size());
3538   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3539   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 2);
3540   dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
3541                      pin_values.data(), statuses.data());
3542   CheckStatus(statuses, 6);
3543 
3544   // Similar to the previous one, but an IO delay in the last but one CF
3545   for (PinnableSlice& value : pin_values) {
3546     value.Reset();
3547   }
3548   cache->SetCapacity(0);
3549   cache->SetCapacity(1048576);
3550   statuses.clear();
3551   statuses.resize(keys.size());
3552   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3553   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 3);
3554   dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
3555                      pin_values.data(), statuses.data());
3556   CheckStatus(statuses, 8);
3557 
3558   // Test batched MultiGet with a single CF and lots of keys. Inject delay
3559   // into the second batch of keys. As each batch is 32, the first 64 keys,
3560   // i.e. the first two batches, should succeed and the rest should time out
3561   for (PinnableSlice& value : pin_values) {
3562     value.Reset();
3563   }
3564   cache->SetCapacity(0);
3565   cache->SetCapacity(1048576);
3566   key_str.clear();
3567   for (i = 0; i < 100; ++i) {
3568     key_str.emplace_back(Key(static_cast<int>(i)));
3569   }
3570   keys.resize(key_str.size());
3571   pin_values.clear();
3572   pin_values.resize(key_str.size());
3573   for (i = 0; i < key_str.size(); ++i) {
3574     keys[i] = Slice(key_str[i].data(), key_str[i].size());
3575   }
3576   statuses.clear();
3577   statuses.resize(keys.size());
3578   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3579   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
3580   dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
3581                      pin_values.data(), statuses.data());
3582   CheckStatus(statuses, 64);
3583   Close();
3584 }
3585 
3586 TEST_F(DBBasicTest, ManifestWriteFailure) {
3587   Options options = GetDefaultOptions();
3588   options.create_if_missing = true;
3589   options.disable_auto_compactions = true;
3590   options.env = env_;
3591   DestroyAndReopen(options);
3592   ASSERT_OK(Put("foo", "bar"));
3593   ASSERT_OK(Flush());
3594   SyncPoint::GetInstance()->DisableProcessing();
3595   SyncPoint::GetInstance()->ClearAllCallBacks();
3596   SyncPoint::GetInstance()->SetCallBack(
3597       "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
3598         ASSERT_NE(nullptr, arg);
3599         auto* s = reinterpret_cast<Status*>(arg);
3600         ASSERT_OK(*s);
3601         // Manually overwrite return status
3602         *s = Status::IOError();
3603       });
3604   SyncPoint::GetInstance()->EnableProcessing();
3605   ASSERT_OK(Put("key", "value"));
3606   ASSERT_NOK(Flush());
3607   SyncPoint::GetInstance()->DisableProcessing();
3608   SyncPoint::GetInstance()->ClearAllCallBacks();
3609   SyncPoint::GetInstance()->EnableProcessing();
3610   Reopen(options);
3611 }
3612 
3613 #ifndef ROCKSDB_LITE
3614 TEST_F(DBBasicTest, VerifyFileChecksums) {
3615   Options options = GetDefaultOptions();
3616   options.create_if_missing = true;
3617   options.env = env_;
3618   DestroyAndReopen(options);
3619   ASSERT_OK(Put("a", "value"));
3620   ASSERT_OK(Flush());
3621   ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
3622 
3623   options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
3624   Reopen(options);
3625   ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
3626 
3627   // Write an L0 with checksum computed.
3628   ASSERT_OK(Put("b", "value"));
3629   ASSERT_OK(Flush());
3630 
3631   ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
3632 
3633   // Does the right thing but with the wrong name -- using it should lead to an
3634   // error.
3635   class MisnamedFileChecksumGenerator : public FileChecksumGenCrc32c {
3636    public:
3637     MisnamedFileChecksumGenerator(const FileChecksumGenContext& context)
3638         : FileChecksumGenCrc32c(context) {}
3639 
3640     const char* Name() const override { return "sha1"; }
3641   };
3642 
3643   class MisnamedFileChecksumGenFactory : public FileChecksumGenCrc32cFactory {
3644    public:
3645     std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
3646         const FileChecksumGenContext& context) override {
3647       return std::unique_ptr<FileChecksumGenerator>(
3648           new MisnamedFileChecksumGenerator(context));
3649     }
3650   };
3651 
3652   options.file_checksum_gen_factory.reset(new MisnamedFileChecksumGenFactory());
3653   Reopen(options);
3654   ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
3655 }
3656 #endif  // !ROCKSDB_LITE
3657 
3658 // A test class for intercepting random reads and injecting artificial
3659 // delays. Used for testing the deadline/timeout feature
3660 class DBBasicTestDeadline
3661     : public DBBasicTest,
3662       public testing::WithParamInterface<std::tuple<bool, bool>> {};
3663 
3664 TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
3665   std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
3666   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
3667   bool set_deadline = std::get<0>(GetParam());
3668   bool set_timeout = std::get<1>(GetParam());
3669 
3670   for (int option_config = kDefault; option_config < kEnd; ++option_config) {
3671     if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
3672       continue;
3673     }
3674     option_config_ = option_config;
3675     Options options = CurrentOptions();
3676     if (options.use_direct_reads) {
3677       continue;
3678     }
3679     options.env = env.get();
3680     options.disable_auto_compactions = true;
3681     Cache* block_cache = nullptr;
3682     // Filter block reads currently don't cause the request to get
3683     // aborted on a read timeout, so it's possible those block reads
3684     // may get issued even if the deadline has passed.
3685     SyncPoint::GetInstance()->SetCallBack(
3686         "BlockBasedTable::Get:BeforeFilterMatch",
3687         [&](void* /*arg*/) { fs->IgnoreDeadline(true); });
3688     SyncPoint::GetInstance()->SetCallBack(
3689         "BlockBasedTable::Get:AfterFilterMatch",
3690         [&](void* /*arg*/) { fs->IgnoreDeadline(false); });
3691     // DB open will create table readers unless we reduce the table cache
3692     // capacity.
3693     // SanitizeOptions will set max_open_files to a minimum of 20. The table
3694     // cache is allocated with max_open_files - 10 as its capacity, so
3695     // override max_open_files to 11 so that the table cache capacity becomes
3696     // 1. This prevents file opens during DB open and forces the file to be
3697     // opened during the point lookups below.
3698     SyncPoint::GetInstance()->SetCallBack(
3699         "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3700           int* max_open_files = (int*)arg;
3701           *max_open_files = 11;
3702         });
3703     SyncPoint::GetInstance()->EnableProcessing();
3704 
3705     SetTimeElapseOnlySleepOnReopen(&options);
3706     Reopen(options);
3707 
3708     if (options.table_factory) {
3709       block_cache = options.table_factory->GetOptions<Cache>(
3710           TableFactory::kBlockCacheOpts());
3711     }
3712 
3713     Random rnd(301);
3714     for (int i = 0; i < 400; ++i) {
3715       std::string key = "k" + ToString(i);
3716       ASSERT_OK(Put(key, rnd.RandomString(100)));
3717     }
3718     ASSERT_OK(Flush());
3719 
3720     bool timedout = true;
3721     // A timeout will be forced when the IO counter reaches this value
3722     int io_deadline_trigger = 0;
3723     // Keep incrementing io_deadline_trigger and calling Get() until there
3724     // is an iteration that doesn't cause a timeout. This ensures that we
3725     // cover all file reads in the point lookup path that can potentially
3726     // time out and cause the Get() to fail.
3727     while (timedout) {
3728       ReadOptions ro;
3729       if (set_deadline) {
3730         ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3731       }
3732       if (set_timeout) {
3733         ro.io_timeout = std::chrono::microseconds{5000};
3734       }
3735       fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);
3736 
3737       block_cache->SetCapacity(0);
3738       block_cache->SetCapacity(1048576);
3739 
3740       std::string value;
3741       Status s = dbfull()->Get(ro, "k50", &value);
3742       if (fs->TimedOut()) {
3743         ASSERT_EQ(s, Status::TimedOut());
3744       } else {
3745         timedout = false;
3746         ASSERT_OK(s);
3747       }
3748       io_deadline_trigger++;
3749     }
3750     // Reset the delay sequence in order to avoid false alarms during Reopen
3751     fs->SetDelayTrigger(std::chrono::microseconds::zero(),
3752                         std::chrono::microseconds::zero(), 0);
3753   }
3754   Close();
3755 }
3756 
3757 TEST_P(DBBasicTestDeadline, IteratorDeadline) {
3758   std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
3759   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
3760   bool set_deadline = std::get<0>(GetParam());
3761   bool set_timeout = std::get<1>(GetParam());
3762 
3763   for (int option_config = kDefault; option_config < kEnd; ++option_config) {
3764     if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
3765       continue;
3766     }
3767     Options options = CurrentOptions();
3768     if (options.use_direct_reads) {
3769       continue;
3770     }
3771     options.env = env.get();
3772     options.disable_auto_compactions = true;
3773     Cache* block_cache = nullptr;
3774     // DB open will create table readers unless we reduce the table cache
3775     // capacity.
3776     // SanitizeOptions will set max_open_files to a minimum of 20. The table
3777     // cache is allocated with max_open_files - 10 as its capacity, so
3778     // override max_open_files to 11 so that the table cache capacity becomes
3779     // 1. This prevents file opens during DB open and forces the file to be
3780     // opened during iteration.
3781     SyncPoint::GetInstance()->SetCallBack(
3782         "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3783           int* max_open_files = (int*)arg;
3784           *max_open_files = 11;
3785         });
3786     SyncPoint::GetInstance()->EnableProcessing();
3787 
3788     SetTimeElapseOnlySleepOnReopen(&options);
3789     Reopen(options);
3790 
3791     if (options.table_factory) {
3792       block_cache = options.table_factory->GetOptions<Cache>(
3793           TableFactory::kBlockCacheOpts());
3794     }
3795 
3796     Random rnd(301);
3797     for (int i = 0; i < 400; ++i) {
3798       std::string key = "k" + ToString(i);
3799       ASSERT_OK(Put(key, rnd.RandomString(100)));
3800     }
3801     ASSERT_OK(Flush());
3802 
3803     bool timedout = true;
3804     // A timeout will be forced when the IO counter reaches this value
3805     int io_deadline_trigger = 0;
3806     // Keep incrementing io_deadline_trigger and iterating until there is a
3807     // pass that doesn't cause a timeout. This ensures that we cover all
3808     // file reads in the iterator path that can potentially time out
3809     while (timedout) {
3810       ReadOptions ro;
3811       if (set_deadline) {
3812         ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3813       }
3814       if (set_timeout) {
3815         ro.io_timeout = std::chrono::microseconds{5000};
3816       }
3817       fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);
3818 
3819       block_cache->SetCapacity(0);
3820       block_cache->SetCapacity(1048576);
3821 
3822       Iterator* iter = dbfull()->NewIterator(ro);
3823       int count = 0;
3824       iter->Seek("k50");
3825       while (iter->Valid() && count++ < 100) {
3826         iter->Next();
3827       }
3828       if (fs->TimedOut()) {
3829         ASSERT_FALSE(iter->Valid());
3830         ASSERT_EQ(iter->status(), Status::TimedOut());
3831       } else {
3832         timedout = false;
3833         ASSERT_OK(iter->status());
3834       }
3835       delete iter;
3836       io_deadline_trigger++;
3837     }
3838     // Reset the delay sequence in order to avoid false alarms during Reopen
3839     fs->SetDelayTrigger(std::chrono::microseconds::zero(),
3840                         std::chrono::microseconds::zero(), 0);
3841   }
3842   Close();
3843 }
3844 
3845 // Param 0: If true, set read_options.deadline
3846 // Param 1: If true, set read_options.io_timeout
3847 INSTANTIATE_TEST_CASE_P(DBBasicTestDeadline, DBBasicTestDeadline,
3848                         ::testing::Values(std::make_tuple(true, false),
3849                                           std::make_tuple(false, true),
3850                                           std::make_tuple(true, true)));
3851 }  // namespace ROCKSDB_NAMESPACE
3852 
3853 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
3854 extern "C" {
3855 void RegisterCustomObjects(int argc, char** argv);
3856 }
3857 #else
3858 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
3859 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
3860 
3861 int main(int argc, char** argv) {
3862   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
3863   ::testing::InitGoogleTest(&argc, argv);
3864   RegisterCustomObjects(argc, argv);
3865   return RUN_ALL_TESTS();
3866 }
3867