1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include <cstring>
11
12 #include "db/db_test_util.h"
13 #include "port/stack_trace.h"
14 #include "rocksdb/flush_block_policy.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/perf_context.h"
17 #include "rocksdb/utilities/debug.h"
18 #include "table/block_based/block_based_table_reader.h"
19 #include "table/block_based/block_builder.h"
20 #if !defined(ROCKSDB_LITE)
21 #include "test_util/sync_point.h"
22 #endif
23 #include "util/file_checksum_helper.h"
24 #include "util/random.h"
25 #include "utilities/fault_injection_env.h"
26 #include "utilities/merge_operators.h"
27 #include "utilities/merge_operators/string_append/stringappend.h"
28
29 namespace ROCKSDB_NAMESPACE {
30
// Test fixture for basic DB operations. Uses a per-test directory named
// "db_basic_test" and disables fsync in the test Env to speed up runs.
class DBBasicTest : public DBTestBase {
 public:
  DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {}
};
35
TEST_F(DBBasicTest, OpenWhenOpen) {
  // A second DB::Open() on a directory that is already open must fail with
  // an IOError whose state message mentions the LOCK file.
  Options opts = CurrentOptions();
  opts.env = env_;
  DB* second_db = nullptr;
  const Status open_status = DB::Open(opts, dbname_, &second_db);
  // If the open unexpectedly succeeded, release the handle before failing.
  ASSERT_NOK(open_status) << [second_db]() {
    delete second_db;
    return "db2 open: ok";
  }();
  ASSERT_EQ(Status::Code::kIOError, open_status.code());
  ASSERT_EQ(Status::SubCode::kNone, open_status.subcode());
  ASSERT_TRUE(strstr(open_status.getState(), "lock ") != nullptr);

  delete second_db;
}
51
TEST_F(DBBasicTest, UniqueSession) {
  // Each DB::Open() must generate a fresh session id, while the id stays
  // stable for the lifetime of a single open (including across reads and
  // writes), in regular, read-only, and multi-CF modes.
  Options options = CurrentOptions();
  std::string sid1, sid2, sid3, sid4;

  ASSERT_OK(db_->GetDbSessionId(sid1));
  Reopen(options);
  ASSERT_OK(db_->GetDbSessionId(sid2));
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(db_->GetDbSessionId(sid4));
  Reopen(options);
  ASSERT_OK(db_->GetDbSessionId(sid3));

  // Every open gets a distinct session id.
  ASSERT_NE(sid1, sid2);
  ASSERT_NE(sid1, sid3);
  ASSERT_NE(sid2, sid3);

  // Writing does not change the session id within one open.
  ASSERT_EQ(sid2, sid4);

  // Expected compact format for session ids (see notes in implementation)
  TestRegex expected("[0-9A-Z]{20}");
  EXPECT_MATCHES_REGEX(sid1, expected);
  EXPECT_MATCHES_REGEX(sid2, expected);
  EXPECT_MATCHES_REGEX(sid3, expected);

#ifndef ROCKSDB_LITE
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_OK(db_->GetDbSessionId(sid1));
  // Test uniqueness between readonly open (sid1) and regular open (sid3)
  ASSERT_NE(sid1, sid3);
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_OK(db_->GetDbSessionId(sid2));
  ASSERT_EQ("v1", Get("foo"));
  ASSERT_OK(db_->GetDbSessionId(sid3));

  // Two separate read-only opens also get distinct ids...
  ASSERT_NE(sid1, sid2);

  // ...but a read does not change the id within one read-only open.
  ASSERT_EQ(sid2, sid3);
#endif  // ROCKSDB_LITE

  // Same invariants hold when the DB is opened with multiple column families.
  CreateAndReopenWithCF({"goku"}, options);
  ASSERT_OK(db_->GetDbSessionId(sid1));
  ASSERT_OK(Put("bar", "e1"));
  ASSERT_OK(db_->GetDbSessionId(sid2));
  ASSERT_EQ("e1", Get("bar"));
  ASSERT_OK(db_->GetDbSessionId(sid3));
  ReopenWithColumnFamilies({"default", "goku"}, options);
  ASSERT_OK(db_->GetDbSessionId(sid4));

  ASSERT_EQ(sid1, sid2);
  ASSERT_EQ(sid2, sid3);

  ASSERT_NE(sid1, sid4);
}
107
108 #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, ReadOnlyDB) {
  // A read-only open must serve Gets and iterators both before a flush
  // (data recovered from the WAL) and after one (data in SST files), and
  // must report SyncWAL() as not supported.
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  // Counts the entries returned by a full forward scan.
  auto verify_one_iter = [&](Iterator* iter) {
    int count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      ++count;
    }
    // Always expect two keys: "foo" and "bar"
    ASSERT_EQ(count, 2);
  };

  // Exercises both the NewIterator() and NewIterators() code paths.
  auto verify_all_iters = [&]() {
    Iterator* iter = db_->NewIterator(ReadOptions());
    verify_one_iter(iter);
    delete iter;

    std::vector<Iterator*> iters;
    ASSERT_OK(db_->NewIterators(ReadOptions(),
                                {dbfull()->DefaultColumnFamily()}, &iters));
    ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
    verify_one_iter(iters[0]);
    delete iters[0];
  };

  auto options = CurrentOptions();
  assert(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  // "foo" was overwritten: the latest value must win.
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  verify_all_iters();
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  ASSERT_OK(Flush());
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  verify_all_iters();
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
}
157
TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
  // Same read-only coverage as ReadOnlyDB, but with write_dbid_to_manifest
  // enabled: the DB identity must stay the same across read-only reopens.
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  options.write_dbid_to_manifest = true;
  assert(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  std::string db_id1;
  ASSERT_OK(db_->GetDbIdentity(db_id1));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  // A full scan must see exactly the two distinct keys.
  Iterator* iter = db_->NewIterator(ReadOptions());
  int count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    ++count;
  }
  ASSERT_EQ(count, 2);
  delete iter;
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  ASSERT_OK(Flush());
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  // The identity read after the flush/reopen cycle must match the first one.
  std::string db_id2;
  ASSERT_OK(db_->GetDbIdentity(db_id2));
  ASSERT_EQ(db_id1, db_id2);
}
195
TEST_F(DBBasicTest, CompactedDB) {
  // Exercises the "compacted DB" read-only mode: with max_open_files == -1
  // and a compacted file layout (a single L0 file, or files fully compacted
  // to L1), Put() fails with the compacted-db message; otherwise the open
  // falls back to the regular read-only DB and Put() fails with the
  // read-only message. Gets/MultiGets must work in both modes.
  const uint64_t kFileSize = 1 << 20;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = kFileSize;
  options.target_file_size_base = kFileSize;
  options.max_bytes_for_level_base = 1 << 30;
  options.compression = kNoCompression;
  Reopen(options);
  // 1 L0 file, use CompactedDB if max_open_files = -1
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
  ASSERT_OK(Flush());
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  // Default max_open_files: regular read-only mode.
  Status s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  // max_open_files == -1 with a single file: compacted-db mode.
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  Reopen(options);
  // Add more L0 files
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
  ASSERT_OK(Flush());
  Close();

  ASSERT_OK(ReadOnlyReopen(options));
  // Fallback to read-only DB: multiple L0 files are not a compacted layout.
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  Close();

  // Full compaction
  Reopen(options);
  // Add more keys
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
  ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
  ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(3, NumTableFilesAtLevel(1));
  Close();

  // CompactedDB
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  // Point lookups across the three L1 files, hits and misses interleaved.
  ASSERT_EQ("NOT_FOUND", Get("abc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
  ASSERT_EQ("NOT_FOUND", Get("ccc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
  ASSERT_EQ("NOT_FOUND", Get("ggg"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
  ASSERT_EQ("NOT_FOUND", Get("kkk"));

  // MultiGet
  std::vector<std::string> values;
  std::vector<Status> status_list = dbfull()->MultiGet(
      ReadOptions(),
      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
      &values);
  ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
  ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
  ASSERT_OK(status_list[0]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
  ASSERT_TRUE(status_list[1].IsNotFound());
  ASSERT_OK(status_list[2]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
  ASSERT_TRUE(status_list[3].IsNotFound());
  ASSERT_OK(status_list[4]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
  ASSERT_TRUE(status_list[5].IsNotFound());

  Reopen(options);
  // Add a key
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  Close();
  // The unflushed write breaks the compacted layout: fallback to read-only.
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
}
296
TEST_F(DBBasicTest, LevelLimitReopen) {
  // Reopening with fewer levels than the DB already uses must fail with
  // InvalidArgument; raising num_levels again must succeed.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write 1MB values until compaction pushes a file down to L2 of CF 1.
  const std::string big_value(1024 * 1024, ' ');
  for (int key_idx = 0; NumTableFilesAtLevel(2, 1) == 0; ++key_idx) {
    ASSERT_OK(Put(1, Key(key_idx), big_value));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  // Shrinking num_levels below the deepest populated level is rejected.
  options.num_levels = 1;
  options.max_bytes_for_level_multiplier_additional.resize(1, 1);
  Status reopen_status =
      TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ(reopen_status.IsInvalidArgument(), true);
  ASSERT_EQ(reopen_status.ToString(),
            "Invalid argument: db has more levels than options.num_levels");

  // A sufficiently large num_levels lets the reopen go through.
  options.num_levels = 10;
  options.max_bytes_for_level_multiplier_additional.resize(10, 1);
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
320 #endif // ROCKSDB_LITE
321
TEST_F(DBBasicTest, PutDeleteGet) {
  // Basic Put / overwrite / Delete visibility on a non-default column
  // family, repeated across all option configurations.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    // Overwrite: the newer value must win.
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
  } while (ChangeOptions());
}
333
TEST_F(DBBasicTest, PutSingleDeleteGet) {
  // SingleDelete must hide exactly the key it targets while leaving other
  // keys untouched.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo2", "v2"));
    ASSERT_EQ("v2", Get(1, "foo2"));
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip FIFO and universal compaction because they do not apply to the
    // test case. Skip MergePut because single delete does not get removed
    // when it encounters a merge.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
349
TEST_F(DBBasicTest, EmptyFlush) {
  // It is possible to produce empty flushes when using single deletes. Tests
  // whether empty flushes cause issues.
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // A put immediately cancelled by a single delete leaves nothing for the
    // flush to persist.
    ASSERT_OK(Put(1, "a", Slice()));
    ASSERT_OK(SingleDelete(1, "a"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
    // Skip FIFO and universal compaction as they do not apply to the test
    // case. Skip MergePut because merges cannot be combined with single
    // deletions.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
371
TEST_F(DBBasicTest, GetFromVersions) {
  // After a flush, Get must still find the key (now served from SST files),
  // and the key must not leak into a different column family.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ("v1", Get(1, "foo"));
    // Written only to CF 1: the default CF must not see it.
    ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
  } while (ChangeOptions());
}
381
382 #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, GetSnapshot) {
  // A snapshot must pin the value a key had when the snapshot was taken,
  // both while the newer value is only in the memtable and after a flush.
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    // Try with both a short key and a long key
    for (int i = 0; i < 2; i++) {
      std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
      ASSERT_OK(Put(1, key, "v1"));
      const Snapshot* s1 = db_->GetSnapshot();
      ASSERT_OK(Put(1, key, "v2"));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, s1));
      ASSERT_OK(Flush(1));
      // The snapshot must still see "v1" after the newer data is flushed.
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, s1));
      db_->ReleaseSnapshot(s1);
    }
  } while (ChangeOptions());
}
403 #endif // ROCKSDB_LITE
404
TEST_F(DBBasicTest, CheckLock) {
  // The file lock taken by an open DB must make a concurrent DB::Open() on
  // the same directory fail.
  do {
    Options opts = CurrentOptions();
    DB* second_handle = nullptr;
    ASSERT_OK(TryReopen(opts));

    // second open should fail
    const Status open_status = DB::Open(opts, dbname_, &second_handle);
    // On unexpected success, release the handle before reporting failure.
    ASSERT_NOK(open_status) << [second_handle]() {
      delete second_handle;
      return "localdb open: ok";
    }();
#ifdef OS_LINUX
    ASSERT_TRUE(open_status.ToString().find("lock ") != std::string::npos);
#endif  // OS_LINUX
  } while (ChangeCompactOptions());
}
422
TEST_F(DBBasicTest, FlushMultipleMemtable) {
  // With min_write_buffer_number_to_merge == 3, flushes can involve more
  // than one memtable; reads must stay correct before and after each flush.
  do {
    Options options = CurrentOptions();
    options.max_write_buffer_number = 4;
    options.min_write_buffer_number_to_merge = 3;
    options.max_write_buffer_size_to_maintain = -1;
    WriteOptions wal_off = WriteOptions();
    wal_off.disableWAL = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_OK(dbfull()->Put(wal_off, handles_[1], "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(wal_off, handles_[1], "bar", "v1"));

    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));
    ASSERT_OK(Flush(1));
  } while (ChangeCompactOptions());
}
441
TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
  // Flushing empty column families must succeed even while both background
  // thread pools are fully occupied, and writes must remain possible.
  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask sleeping_task_high;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &sleeping_task_high, Env::Priority::HIGH);

  Options options = CurrentOptions();
  // disable compaction
  options.disable_auto_compactions = true;
  WriteOptions writeOpt = WriteOptions();
  writeOpt.disableWAL = true;
  options.max_write_buffer_number = 2;
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain =
      static_cast<int64_t>(options.write_buffer_size);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Flushing an empty memtable can still go through even if no background
  // thread is available to do the work.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Insert can go through
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

  ASSERT_EQ("v1", Get(0, "foo"));
  ASSERT_EQ("v1", Get(1, "bar"));

  // Unblock the high-priority (flush) pool.
  sleeping_task_high.WakeUp();
  sleeping_task_high.WaitUntilDone();

  // Flush can still go through.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Unblock the low-priority (compaction) pool before tearing down.
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
486
TEST_F(DBBasicTest, Flush) {
  // Verifies that flushed data survives reopens regardless of WAL settings,
  // and that reads of flushed keys are served from output files (checked via
  // the perf context counters).
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    SetPerfLevel(kEnableTime);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    // Flush "foo" into an SST file; the "bar" put below stays in the
    // memtable for now.
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    get_perf_context()->Reset();
    Get(1, "foo");
    // "foo" must be read from the flushed output file, not the memtable.
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
    // "v1" is two bytes.
    ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v2", Get(1, "bar"));
    get_perf_context()->Reset();
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);

    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // 'foo' should be there because its put
    // has WAL enabled.
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "bar"));

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
532
TEST_F(DBBasicTest, ManifestRollOver) {
  // With a tiny max_manifest_file_size, each LogAndApply should roll over to
  // a new MANIFEST file; data must remain readable across the rollovers.
  do {
    Options options;
    options.max_manifest_file_size = 10;  // 10 bytes
    options = CurrentOptions(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    {
      ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
      ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
      ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
      uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
      uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_GT(manifest_after_flush, manifest_before_flush);
      // Reopen triggers another manifest rollover.
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
      ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
      // check if a new manifest file got inserted or not.
      ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
      ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
      ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
    }
  } while (ChangeCompactOptions());
}
556
TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
  // The DB identity is stable across reopens; after deleting the IDENTITY
  // file it is regenerated, unless it was also persisted in the MANIFEST
  // (write_dbid_to_manifest), in which case it is recovered from there.
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    if (options.write_dbid_to_manifest) {
      // Recovered from the MANIFEST: same identity.
      ASSERT_EQ(id1.compare(id3), 0);
    } else {
      // id1 should NOT match id3 because identity was regenerated
      ASSERT_NE(id1.compare(id3), 0);
    }
  } while (ChangeCompactOptions());
}
582
TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
  // With write_dbid_to_manifest == true the DB id is persisted in the
  // MANIFEST, so it must survive even when the IDENTITY file is deleted.
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    options.write_dbid_to_manifest = true;
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    // id1 should still match id3: although the IDENTITY file was deleted,
    // the id was recovered from the MANIFEST rather than regenerated.
    ASSERT_EQ(id1, id3);
  } while (ChangeCompactOptions());
}
605
606 #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, Snapshot) {
  // Tracks snapshot bookkeeping (count, oldest-snapshot time and sequence
  // number) while snapshots are taken and released, and checks that every
  // live snapshot keeps reading the values current at its creation.
  env_->SetMockSleep();
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    ASSERT_OK(Put(0, "foo", "0v1"));
    ASSERT_OK(Put(1, "foo", "1v1"));

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_OK(Put(0, "foo", "0v2"));
    ASSERT_OK(Put(1, "foo", "1v2"));

    // Advance mock time so s2 gets a later timestamp than s1.
    env_->MockSleepForSeconds(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    // s1 is still the oldest snapshot.
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_OK(Put(0, "foo", "0v3"));
    ASSERT_OK(Put(1, "foo", "1v3"));

    {
      // s3 is scoped: ManagedSnapshot releases it at the end of this block.
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());

      ASSERT_OK(Put(0, "foo", "0v4"));
      ASSERT_OK(Put(1, "foo", "1v4"));
      // Each snapshot sees the value current when it was taken.
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    // After releasing s1, s2 becomes the oldest snapshot.
    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions());
}
677
678 #endif // ROCKSDB_LITE
679
// Parameterized variant of DBBasicTest: the test parameter is an
// option_config_ index, so each TEST_P body runs once per generated option
// configuration.
class DBBasicMultiConfigs : public DBBasicTest,
                            public ::testing::WithParamInterface<int> {
 public:
  DBBasicMultiConfigs() { option_config_ = GetParam(); }

  // Returns every option config except those filtered out by
  // kSkipFIFOCompaction.
  static std::vector<int> GenerateOptionConfigs() {
    std::vector<int> option_configs;
    for (int option_config = kDefault; option_config < kEnd; ++option_config) {
      if (!ShouldSkipOptions(option_config, kSkipFIFOCompaction)) {
        option_configs.push_back(option_config);
      }
    }
    return option_configs;
  }
};
695
TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
  // Flush and compaction may only drop versions of a key that are invisible
  // to every live snapshot; releasing snapshots progressively lets more
  // versions be garbage-collected.
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  FillLevels("a", "z", 1);

  ASSERT_OK(Put(1, "foo", "first"));
  const Snapshot* snapshot1 = db_->GetSnapshot();
  ASSERT_OK(Put(1, "foo", "second"));
  ASSERT_OK(Put(1, "foo", "third"));
  ASSERT_OK(Put(1, "foo", "fourth"));
  const Snapshot* snapshot2 = db_->GetSnapshot();
  ASSERT_OK(Put(1, "foo", "fifth"));
  ASSERT_OK(Put(1, "foo", "sixth"));

  // All entries (including duplicates) exist
  // before any compaction or flush is triggered.
  ASSERT_EQ(AllEntriesFor("foo", 1),
            "[ sixth, fifth, fourth, third, second, first ]");
  ASSERT_EQ("sixth", Get(1, "foo"));
  ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
  ASSERT_EQ("first", Get(1, "foo", snapshot1));

  // After a flush, "second", "third" and "fifth" should
  // be removed
  ASSERT_OK(Flush(1));
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");

  // after we release the snapshot1, only two values left
  db_->ReleaseSnapshot(snapshot1);
  FillLevels("a", "z", 1);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));

  // We have only one valid snapshot snapshot2. Since snapshot1 is
  // not valid anymore, "first" should be removed by a compaction.
  ASSERT_EQ("sixth", Get(1, "foo"));
  ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");

  // after we release the snapshot2, only one value should be left
  db_->ReleaseSnapshot(snapshot2);
  FillLevels("a", "z", 1);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));
  ASSERT_EQ("sixth", Get(1, "foo"));
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
}
748
// Instantiate DBBasicMultiConfigs once per option config produced by
// GenerateOptionConfigs().
INSTANTIATE_TEST_CASE_P(
    DBBasicMultiConfigs, DBBasicMultiConfigs,
    ::testing::ValuesIn(DBBasicMultiConfigs::GenerateOptionConfigs()));
752
TEST_F(DBBasicTest, DBOpen_Options) {
  // Exercises the four create_if_missing / error_if_exists combinations
  // against a freshly destroyed DB directory.
  Options options = CurrentOptions();
  Close();
  Destroy(options);

  DB* raw_db = nullptr;

  // Does not exist, and create_if_missing == false: error
  options.create_if_missing = false;
  Status open_status = DB::Open(options, dbname_, &raw_db);
  ASSERT_TRUE(strstr(open_status.ToString().c_str(), "does not exist") !=
              nullptr);
  ASSERT_TRUE(raw_db == nullptr);

  // Does not exist, and create_if_missing == true: OK
  options.create_if_missing = true;
  open_status = DB::Open(options, dbname_, &raw_db);
  ASSERT_OK(open_status);
  ASSERT_TRUE(raw_db != nullptr);
  delete raw_db;
  raw_db = nullptr;

  // Does exist, and error_if_exists == true: error
  options.create_if_missing = false;
  options.error_if_exists = true;
  open_status = DB::Open(options, dbname_, &raw_db);
  ASSERT_TRUE(strstr(open_status.ToString().c_str(), "exists") != nullptr);
  ASSERT_TRUE(raw_db == nullptr);

  // Does exist, and error_if_exists == false: OK
  options.create_if_missing = true;
  options.error_if_exists = false;
  open_status = DB::Open(options, dbname_, &raw_db);
  ASSERT_OK(open_status);
  ASSERT_TRUE(raw_db != nullptr);
  delete raw_db;
  raw_db = nullptr;
}
791
TEST_F(DBBasicTest, CompactOnFlush) {
  // Checks how overlapping puts/deletes for one key collapse during flush
  // and full compaction, with and without live snapshots pinning versions.
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");

    // Write two new keys
    ASSERT_OK(Put(1, "a", "begin"));
    ASSERT_OK(Put(1, "z", "end"));
    ASSERT_OK(Flush(1));

    // Case1: Delete followed by a put
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");

    // After the current memtable is flushed, the DEL should
    // have been removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");

    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");

    // Case 2: Delete followed by another delete
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 3: Put followed by a delete
    ASSERT_OK(Put(1, "foo", "v3"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 4: Put followed by another Put
    ASSERT_OK(Put(1, "foo", "v4"));
    ASSERT_OK(Put(1, "foo", "v5"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");

    // clear database
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 5: Put followed by snapshot followed by another Put
    // Both puts should remain.
    ASSERT_OK(Put(1, "foo", "v6"));
    const Snapshot* snapshot = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "v7"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
    db_->ReleaseSnapshot(snapshot);

    // clear database
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 5: snapshot followed by a put followed by another Put
    // Only the last put should remain.
    const Snapshot* snapshot1 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "v8"));
    ASSERT_OK(Put(1, "foo", "v9"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
    db_->ReleaseSnapshot(snapshot1);
  } while (ChangeCompactOptions());
}
884
TEST_F(DBBasicTest, FlushOneColumnFamily) {
  // Flushing one column family at a time must produce exactly one new table
  // file per flush, with eight CFs present (default + seven extras).
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  // One key per column family; key and value both equal the CF's name.
  const std::vector<std::string> cf_names = {
      "Default",  "pikachu",  "ilya",    "muromec",
      "dobrynia", "nikitich", "alyosha", "popovich"};
  for (int cf = 0; cf < 8; ++cf) {
    ASSERT_OK(Put(cf, cf_names[cf], cf_names[cf]));
  }

  for (int cf = 0; cf < 8; ++cf) {
    ASSERT_OK(Flush(cf));
    auto tables = ListTableFiles(env_, dbname_);
    ASSERT_EQ(tables.size(), cf + 1U);
  }
}
906
TEST_F(DBBasicTest, MultiGetSimple) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    // Populate CF 1 with a mix of live keys and deletions.
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});

    std::vector<std::string> values(20, "Temporary data to be overwritten");
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);

    get_perf_context()->Reset();
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(values.size(), keys.size());

    // nullptr marks keys that must come back NotFound (deleted / absent).
    const char* expected[] = {"v1", "v2", "v3", nullptr, "v5", nullptr};
    for (size_t i = 0; i < keys.size(); ++i) {
      if (expected[i] != nullptr) {
        ASSERT_OK(statuses[i]);
        ASSERT_EQ(values[i], expected[i]);
      } else {
        ASSERT_TRUE(statuses[i].IsNotFound());
      }
    }
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
943
TEST_F(DBBasicTest, MultiGetEmpty) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // An empty key set must yield an empty status vector.
    std::vector<Slice> keys;
    std::vector<std::string> values;
    std::vector<ColumnFamilyHandle*> cfs;
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(statuses.size(), 0U);

    // Same result against a freshly created, empty database.
    Options options = CurrentOptions();
    options.create_if_missing = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    statuses = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(statuses.size(), 0U);

    // Searching the empty database reports NotFound for every key.
    keys.resize(2);
    keys[0] = "a";
    keys[1] = "b";
    cfs.push_back(handles_[0]);
    cfs.push_back(handles_[1]);
    statuses = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(static_cast<int>(statuses.size()), 2);
    ASSERT_TRUE(statuses[0].IsNotFound() && statuses[1].IsNotFound());
  } while (ChangeCompactOptions());
}
973
TEST_F(DBBasicTest, ChecksumTest) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  // change when new checksum type added
  const int max_checksum = static_cast<int>(kxxHash64);
  const int kNumPerFile = 2;

  // Generate one table file per checksum type, each with kNumPerFile keys.
  for (int type = 0; type <= max_checksum; ++type) {
    table_options.checksum = static_cast<ChecksumType>(type);
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    for (int j = 0; j < kNumPerFile; ++j) {
      const int k = type * kNumPerFile + j;
      ASSERT_OK(Put(Key(k), Key(k)));
    }
    ASSERT_OK(Flush());
  }

  // Reads must succeed no matter which checksum type the reader is
  // configured with, since verification follows each file's own setting.
  const int total_keys = (max_checksum + 1) * kNumPerFile;
  for (int type = 0; type <= max_checksum; ++type) {
    table_options.checksum = static_cast<ChecksumType>(type);
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    for (int j = 0; j < total_keys; ++j) {
      ASSERT_EQ(Key(j), Get(Key(j)));
    }
  }
}
1003
1004 // On Windows you can have either memory mapped file or a file
1005 // with unbuffered access. So this asserts and does not make
1006 // sense to run
1007 #ifndef OS_WIN
TEST_F(DBBasicTest, MmapAndBufferOptions) {
  if (!IsMemoryMappedAccessSupported()) {
    return;
  }
  Options options = CurrentOptions();

  // Helper: reopen with the given direct-read / mmap-read combination.
  auto try_combo = [&](bool direct_reads, bool mmap_reads) {
    options.use_direct_reads = direct_reads;
    options.allow_mmap_reads = mmap_reads;
    return TryReopen(options);
  };

  // Direct reads together with mmap reads must be rejected.
  ASSERT_NOK(try_combo(true, true));

  // All other combinations are acceptable
  ASSERT_OK(try_combo(false, true));

  if (IsDirectIOSupported()) {
    ASSERT_OK(try_combo(true, false));
    ASSERT_OK(try_combo(false, false));
  } else {
    ASSERT_OK(try_combo(false, true));
  }
}
1031 #endif
1032
1033 class TestEnv : public EnvWrapper {
1034 public:
TestEnv(Env * base_env)1035 explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
1036
1037 class TestLogger : public Logger {
1038 public:
1039 using Logger::Logv;
TestLogger(TestEnv * env_ptr)1040 explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger()1041 ~TestLogger() override {
1042 if (!closed_) {
1043 CloseHelper().PermitUncheckedError();
1044 }
1045 }
Logv(const char *,va_list)1046 void Logv(const char* /*format*/, va_list /*ap*/) override {}
1047
1048 protected:
CloseImpl()1049 Status CloseImpl() override { return CloseHelper(); }
1050
1051 private:
CloseHelper()1052 Status CloseHelper() {
1053 env->CloseCountInc();
1054 ;
1055 return Status::IOError();
1056 }
1057 TestEnv* env;
1058 };
1059
CloseCountInc()1060 void CloseCountInc() { close_count++; }
1061
GetCloseCount()1062 int GetCloseCount() { return close_count; }
1063
NewLogger(const std::string &,std::shared_ptr<Logger> * result)1064 Status NewLogger(const std::string& /*fname*/,
1065 std::shared_ptr<Logger>* result) override {
1066 result->reset(new TestLogger(this));
1067 return Status::OK();
1068 }
1069
1070 private:
1071 int close_count;
1072 };
1073
// Verifies that DB::Close() closes the env-created info log (whose Close()
// intentionally returns IOError), that deletion after Close() does not
// double-close it, and that a user-supplied info_log is NOT closed by the DB.
TEST_F(DBBasicTest, DBClose) {
  Options options = GetDefaultOptions();
  std::string dbname = test::PerThreadDBPath("db_close_test");
  ASSERT_OK(DestroyDB(dbname, options));

  DB* db = nullptr;
  TestEnv* env = new TestEnv(env_);
  std::unique_ptr<TestEnv> local_env_guard(env);
  options.create_if_missing = true;
  options.env = env;
  Status s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  // Close() closes the logger the env created; the logger's IOError must be
  // surfaced by DB::Close().
  s = db->Close();
  ASSERT_EQ(env->GetCloseCount(), 1);
  ASSERT_EQ(s, Status::IOError());

  // Deleting an already-closed DB must not close the logger a second time.
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 1);

  // Do not call DB::Close() and ensure our logger Close() still gets called
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);

  // Provide our own logger and ensure DB::Close() does not close it
  options.info_log.reset(new TestEnv::TestLogger(env));
  options.create_if_missing = false;
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Close();
  ASSERT_EQ(s, Status::OK());
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);
  // Only dropping the last reference to the user-supplied logger closes it.
  options.info_log.reset();
  ASSERT_EQ(env->GetCloseCount(), 3);
}
1116
TEST_F(DBBasicTest, DBCloseFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.manual_wal_flush = true;
  options.write_buffer_size = 100;
  options.env = fault_env.get();

  Reopen(options);
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(Put("key3", "value3"));

  // With the filesystem deactivated, Close() cannot persist pending data
  // and must fail.
  fault_env->SetFilesystemActive(false);
  Status close_status = dbfull()->Close();
  ASSERT_NE(close_status, Status::OK());
  // retry should return the same error
  close_status = dbfull()->Close();
  ASSERT_NE(close_status, Status::OK());
  fault_env->SetFilesystemActive(true);
  // retry close() is no-op even the system is back. Could be improved if
  // Close() is retry-able: #9029
  close_status = dbfull()->Close();
  ASSERT_NE(close_status, Status::OK());
  Destroy(options);
}
1144
// Parameterized on the bool passed to DBTestBase::MultiGet(), which selects
// between the batched and non-batched MultiGet code paths.
class DBMultiGetTestWithParam : public DBBasicTest,
                                public testing::WithParamInterface<bool> {};
1147
// Exercises MultiGet across 8 column families while a sync-point callback
// flushes every CF and overwrites all keys mid-call, forcing MultiGet to
// re-acquire SuperVersions; the call must observe the newer "_2" values.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);
  // <CF, key, value> tuples
  std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  static const int num_keys = 24;
  cf_kv_vec.reserve(num_keys);

  for (int i = 0; i < num_keys; ++i) {
    int cf = i / 3;
    // NOTE(review): "1 % 3" is a constant (== 1), so all three keys within
    // a CF are identical and cf_kv_vec holds duplicates -- presumably
    // "i % 3" was intended. The test is self-consistent either way; confirm
    // intent before changing.
    int cf_key = 1 % 3;
    cf_kv_vec.emplace_back(std::make_tuple(
        cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
        "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
    ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                  std::get<2>(cf_kv_vec[i])));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
          // is forced to repeat the process
          for (int i = 0; i < num_keys; ++i) {
            int cf = i / 3;
            int cf_key = i % 8;
            if (cf_key == 0) {
              ASSERT_OK(Flush(cf));
            }
            ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                          std::get<2>(cf_kv_vec[i]) + "_2"));
          }
        }
        if (get_sv_count == 11) {
          // On a later acquisition pass, every CF's thread-local
          // SuperVersion is expected to be checked out (kSVInUse).
          for (int i = 0; i < 8; ++i) {
            auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < num_keys; ++i) {
    cfs.push_back(std::get<0>(cf_kv_vec[i]));
    keys.push_back(std::get<1>(cf_kv_vec[i]));
  }

  // The interrupted-and-retried MultiGet must return the "_2" values
  // written by the callback.
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values.size(), num_keys);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  }

  // Smaller follow-up batches touching a subset of CFs, in mixed order.
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[0]));
  keys.push_back(std::get<1>(cf_kv_vec[0]));
  cfs.push_back(std::get<0>(cf_kv_vec[3]));
  keys.push_back(std::get<1>(cf_kv_vec[3]));
  cfs.push_back(std::get<0>(cf_kv_vec[4]));
  keys.push_back(std::get<1>(cf_kv_vec[4]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");

  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[7]));
  keys.push_back(std::get<1>(cf_kv_vec[7]));
  cfs.push_back(std::get<0>(cf_kv_vec[6]));
  keys.push_back(std::get<1>(cf_kv_vec[6]));
  cfs.push_back(std::get<0>(cf_kv_vec[1]));
  keys.push_back(std::get<1>(cf_kv_vec[1]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");

  // After all calls complete, no CF's thread-local SuperVersion may remain
  // checked out or obsolete.
  for (int cf = 0; cf < 8; ++cf) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(
            static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(cf))
            ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  }
}
1246
// Drives MultiGet into its fallback path: the AfterRefSV callback keeps
// flushing and rewriting every CF so SuperVersion acquisition never
// stabilizes, until MultiGet reaches the "LastTry" sync point (the path
// that holds the DB mutex), at which point processing is disabled and the
// call completes against the final set of values.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  int retries = 0;
  bool last_try = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
        // MultiGet has fallen back to the mutex-protected path; stop
        // interfering so this attempt succeeds.
        last_try = true;
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (last_try) {
          return;
        }
        // Every second SuperVersion acquisition, invalidate all CFs by
        // flushing and writing fresh values tagged with the retry count.
        if (++get_sv_count == 2) {
          ++retries;
          get_sv_count = 0;
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(
                i, "cf" + std::to_string(i) + "_key",
                "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  // The fallback path must have been reached, and the results must reflect
  // the values written in the last retry round.
  ASSERT_TRUE(last_try);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j],
              "cf" + std::to_string(j) + "_val" + std::to_string(retries));
  }
  // No thread-local SuperVersion may remain checked out afterwards.
  for (int i = 0; i < 8; ++i) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(
            static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(i))
            ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
1308
// Takes a snapshot before issuing MultiGet; a sync-point callback then
// flushes and overwrites every key mid-call. The snapshot must shield the
// reads from the newer "_val2" writes.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // Invalidate the SuperVersions MultiGet has referenced so far and
          // write newer values that the snapshot must hide.
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                          "cf" + std::to_string(i) + "_val2"));
          }
        }
        if (get_sv_count == 8) {
          // By this point each CF's thread-local SuperVersion is expected
          // to be either checked out by MultiGet or already stale.
          for (int i = 0; i < 8; ++i) {
            auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_TRUE(
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, GetParam());
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  // The snapshot predates the "_val2" overwrites, so the original values
  // must be returned.
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }
  for (int i = 0; i < 8; ++i) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(
            static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(i))
            ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
1368
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"one", "two"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(2, "baz", "xyz"));
  ASSERT_OK(Put(1, "abc", "def"));

  // Note: keys for the same CF do not form a consecutive range
  const std::vector<int> cfs{1, 2, 1};
  const std::vector<std::string> keys{"foo", "baz", "abc"};

  std::vector<std::string> values =
      MultiGet(cfs, keys, /* snapshot */ nullptr, /* batched */ GetParam());

  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(values[0], "bar");
  ASSERT_EQ(values[1], "xyz");
  ASSERT_EQ(values[2], "def");
}
1390
// Run every DBMultiGetTestWithParam test twice: non-batched (false) and
// batched (true) MultiGet.
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                        testing::Bool());
1393
TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    // Populate CF 1 with a mix of live keys and deletions.
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    // Keys are deliberately unsorted; sorted_input is passed as false.
    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
    std::vector<Status> statuses(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), statuses.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_TRUE(statuses[0].IsNotFound());  // no_key never existed
    ASSERT_TRUE(statuses[2].IsNotFound());  // k4 was deleted
    ASSERT_OK(statuses[1]);
    ASSERT_OK(statuses[3]);
    ASSERT_OK(statuses[4]);
    ASSERT_OK(statuses[5]);
    ASSERT_EQ(values[1].ToString(), "v5");
    ASSERT_EQ(values[3].ToString(), "v3");
    ASSERT_EQ(values[4].ToString(), "v2");
    ASSERT_EQ(values[5].ToString(), "v1");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
1434
TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    // To expand the power of this test, generate > 1 table file and
    // mix with memtable
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    // Keys are pre-sorted, so sorted_input is passed as true.
    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
    std::vector<Status> statuses(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), statuses.data(), true);

    ASSERT_EQ(values.size(), keys.size());
    // nullptr marks keys that must come back NotFound (deleted / absent).
    const char* expected[] = {"v1", "v2", "v3", nullptr, "v5", nullptr};
    for (size_t i = 0; i < keys.size(); ++i) {
      if (expected[i] != nullptr) {
        ASSERT_OK(statuses[i]);
        ASSERT_EQ(values[i].ToString(), expected[i]);
      } else {
        ASSERT_TRUE(statuses[i].IsNotFound());
      }
    }
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    SetPerfLevel(kDisable);
  } while (ChangeOptions());
}
1479
TEST_F(DBBasicTest, MultiGetBatchedDuplicateKeys) {
  Options opts = CurrentOptions();
  opts.merge_operator = MergeOperators::CreateStringAppendOperator();
  CreateAndReopenWithCF({"pikachu"}, opts);
  SetPerfLevel(kEnableCount);
  // To expand the power of this test, generate > 1 table file and
  // mix with memtable
  ASSERT_OK(Merge(1, "k1", "v1"));
  ASSERT_OK(Merge(1, "k2", "v2"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);
  ASSERT_OK(Merge(1, "k3", "v3"));
  ASSERT_OK(Merge(1, "k4", "v4"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);
  ASSERT_OK(Merge(1, "k4", "v4_2"));
  ASSERT_OK(Merge(1, "k6", "v6"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);
  ASSERT_OK(Merge(1, "k7", "v7"));
  ASSERT_OK(Merge(1, "k8", "v8"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);

  get_perf_context()->Reset();

  // Unsorted batch that repeats several keys.
  std::vector<Slice> keys({"k8", "k8", "k8", "k4", "k4", "k1", "k3"});
  std::vector<PinnableSlice> values(keys.size());
  std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
  std::vector<Status> statuses(keys.size());

  db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                values.data(), statuses.data(), false);

  ASSERT_EQ(values.size(), keys.size());
  // Duplicate keys return identical results; k4 merges both operands.
  const char* expected[] = {"v8",      "v8", "v8", "v4,v4_2",
                            "v4,v4_2", "v1", "v3"};
  for (size_t i = 0; i < keys.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_EQ(values[i].ToString(), expected[i]);
  }
  ASSERT_EQ(24, (int)get_perf_context()->multiget_read_bytes);

  SetPerfLevel(kDisable);
}
1530
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  // Batch writes into SST files of 8 keys each.
  int pending = 0;
  auto batched_put = [&](const std::string& k, const std::string& v) {
    ASSERT_OK(Put(k, v));
    if (++pending == 8) {
      ASSERT_OK(Flush());
      pending = 0;
    }
  };
  auto finish_batch = [&]() {
    if (pending > 0) {
      ASSERT_OK(Flush());
      pending = 0;
    }
  };

  // Base values for all keys, pushed down to L2.
  for (int i = 0; i < 128; ++i) {
    batched_put("key_" + std::to_string(i), "val_l2_" + std::to_string(i));
  }
  finish_batch();
  MoveFilesToLevel(2);

  // Every 3rd key gets a newer value on L1.
  for (int i = 0; i < 128; i += 3) {
    batched_put("key_" + std::to_string(i), "val_l1_" + std::to_string(i));
  }
  finish_batch();
  MoveFilesToLevel(1);

  // Every 5th key gets a newer value on L0.
  for (int i = 0; i < 128; i += 5) {
    batched_put("key_" + std::to_string(i), "val_l0_" + std::to_string(i));
  }
  finish_batch();

  // Every 9th key gets its freshest value in the memtable.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  for (int i = 64; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  std::vector<std::string> values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), 16);
  for (unsigned int j = 0; j < values.size(); ++j) {
    int key = j + 64;
    // The newest layer containing the key must win: mem > L0 > L1 > L2.
    if (key % 9 == 0) {
      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
    } else if (key % 5 == 0) {
      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
    } else if (key % 3 == 0) {
      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
    } else {
      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
    }
  }
}
1605
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  // Flush every 8 writes so each level consists of multiple files.
  int pending = 0;
  auto maybe_flush = [&]() {
    if (++pending == 8) {
      ASSERT_OK(Flush());
      pending = 0;
    }
  };
  auto finish_batch = [&]() {
    if (pending > 0) {
      ASSERT_OK(Flush());
      pending = 0;
    }
  };

  // Base values on L2.
  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    maybe_flush();
  }
  finish_batch();
  MoveFilesToLevel(2);

  // Merge operands on L1 for every 3rd key.
  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    maybe_flush();
  }
  finish_batch();
  MoveFilesToLevel(1);

  // Merge operands on L0 for every 5th key.
  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    maybe_flush();
  }
  finish_batch();

  // Merge operands in the memtable for every 9th key.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(
        Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  for (int i = 32; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  std::vector<std::string> values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), keys.size());
  for (unsigned int j = 0; j < 48; ++j) {
    int key = j + 32;
    // StringAppend concatenates operands oldest-first: L2, L1, L0, mem.
    std::string expected = "val_l2_" + std::to_string(key);
    if (key % 3 == 0) {
      expected += ",val_l1_" + std::to_string(key);
    }
    if (key % 5 == 0) {
      expected += ",val_l0_" + std::to_string(key);
    }
    if (key % 9 == 0) {
      expected += ",val_mem_" + std::to_string(key);
    }
    ASSERT_EQ(values[j], expected);
  }
}
1691
TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  SetPerfLevel(kEnableCount);
  // Six 3-byte values, all resident in the memtable.
  for (int i = 1; i <= 6; ++i) {
    ASSERT_OK(Put(1, "k" + std::to_string(i), "v_" + std::to_string(i)));
  }
  std::vector<Slice> keys = {"k1", "k2", "k3", "k4", "k5", "k6"};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());
  std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);

  get_perf_context()->Reset();
  ReadOptions ro;
  // Soft limit is crossed after the 4th value (4 * 3 bytes = 12 > 11).
  ro.value_size_soft_limit = 11;
  db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                statuses.data(), false);

  ASSERT_EQ(values.size(), keys.size());
  for (unsigned int i = 0; i < 4; i++) {
    ASSERT_EQ(values[i].ToString(), "v_" + std::to_string(i + 1));
  }

  // Lookups past the soft limit must be aborted.
  for (unsigned int i = 4; i < 6; i++) {
    ASSERT_TRUE(statuses[i].IsAborted());
  }

  ASSERT_EQ(12, (int)get_perf_context()->multiget_read_bytes);
  SetPerfLevel(kDisable);
}
1725
TEST_F(DBBasicTest, MultiGetBatchedValueSize) {
  // Verifies ReadOptions::value_size_soft_limit in batched MultiGet: once
  // the accumulated size of returned values exceeds the limit, all
  // remaining lookups are aborted while earlier ones complete normally.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);

    // Spread keys over two SST files and the memtable.
    ASSERT_OK(Put(1, "k6", "v6"));
    ASSERT_OK(Put(1, "k7", "v7_"));
    ASSERT_OK(Put(1, "k3", "v3_"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k11", "v11"));
    ASSERT_OK(Delete(1, "no_key"));
    ASSERT_OK(Put(1, "k8", "v8_"));
    ASSERT_OK(Put(1, "k13", "v13"));
    ASSERT_OK(Put(1, "k14", "v14"));
    ASSERT_OK(Put(1, "k15", "v15"));
    ASSERT_OK(Put(1, "k16", "v16"));
    ASSERT_OK(Put(1, "k17", "v17"));
    ASSERT_OK(Flush(1));

    ASSERT_OK(Put(1, "k1", "v1_"));
    ASSERT_OK(Put(1, "k2", "v2_"));
    ASSERT_OK(Put(1, "k5", "v5_"));
    ASSERT_OK(Put(1, "k9", "v9_"));
    ASSERT_OK(Put(1, "k10", "v10"));
    ASSERT_OK(Delete(1, "k2"));
    ASSERT_OK(Delete(1, "k6"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k10", "k11", "k12", "k13", "k14", "k15",
                             "k16", "k17", "k2", "k3", "k4", "k5", "k6", "k7",
                             "k8", "k9", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
    std::vector<Status> s(keys.size());

    ReadOptions ro;
    ro.value_size_soft_limit = 20;
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), false);

    ASSERT_EQ(values.size(), keys.size());

    // In memory keys
    ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1_");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v10");
    ASSERT_TRUE(s[9].IsNotFound());  // k2
    ASSERT_EQ(std::string(values[12].data(), values[12].size()), "v5_");
    ASSERT_TRUE(s[13].IsNotFound());  // k6
    ASSERT_EQ(std::string(values[16].data(), values[16].size()), "v9_");

    // In sst files
    // Fixed copy-paste bug: previously used values[1].size() here, which
    // only passed because both values happen to be 3 bytes long.
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v11");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v13");
    ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v14");

    // Remaining aborted after value_size exceeds.
    ASSERT_TRUE(s[3].IsAborted());
    ASSERT_TRUE(s[6].IsAborted());
    ASSERT_TRUE(s[7].IsAborted());
    ASSERT_TRUE(s[8].IsAborted());
    ASSERT_TRUE(s[10].IsAborted());
    ASSERT_TRUE(s[11].IsAborted());
    ASSERT_TRUE(s[14].IsAborted());
    ASSERT_TRUE(s[15].IsAborted());
    ASSERT_TRUE(s[17].IsAborted());

    // 7 kv pairs found * 3 bytes per value (i.e. 21); the old comment
    // claiming 6 pairs / 18 bytes disagreed with the assertion below.
    ASSERT_EQ(21, (int)get_perf_context()->multiget_read_bytes);
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
1800
TEST_F(DBBasicTest,MultiGetBatchedValueSizeMultiLevelMerge)1801 TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) {
1802 Options options = CurrentOptions();
1803 options.disable_auto_compactions = true;
1804 options.merge_operator = MergeOperators::CreateStringAppendOperator();
1805 BlockBasedTableOptions bbto;
1806 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1807 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1808 Reopen(options);
1809 int num_keys = 0;
1810
1811 for (int i = 0; i < 64; ++i) {
1812 ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1813 num_keys++;
1814 if (num_keys == 8) {
1815 ASSERT_OK(Flush());
1816 num_keys = 0;
1817 }
1818 }
1819 if (num_keys > 0) {
1820 ASSERT_OK(Flush());
1821 num_keys = 0;
1822 }
1823 MoveFilesToLevel(2);
1824
1825 for (int i = 0; i < 64; i += 3) {
1826 ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1827 num_keys++;
1828 if (num_keys == 8) {
1829 ASSERT_OK(Flush());
1830 num_keys = 0;
1831 }
1832 }
1833 if (num_keys > 0) {
1834 ASSERT_OK(Flush());
1835 num_keys = 0;
1836 }
1837 MoveFilesToLevel(1);
1838
1839 for (int i = 0; i < 64; i += 5) {
1840 ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1841 num_keys++;
1842 if (num_keys == 8) {
1843 ASSERT_OK(Flush());
1844 num_keys = 0;
1845 }
1846 }
1847 if (num_keys > 0) {
1848 ASSERT_OK(Flush());
1849 num_keys = 0;
1850 }
1851 ASSERT_EQ(0, num_keys);
1852
1853 for (int i = 0; i < 64; i += 9) {
1854 ASSERT_OK(
1855 Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1856 }
1857
1858 std::vector<std::string> keys_str;
1859 for (int i = 10; i < 50; ++i) {
1860 keys_str.push_back("key_" + std::to_string(i));
1861 }
1862
1863 std::vector<Slice> keys(keys_str.size());
1864 for (int i = 0; i < 40; i++) {
1865 keys[i] = Slice(keys_str[i]);
1866 }
1867
1868 std::vector<PinnableSlice> values(keys_str.size());
1869 std::vector<Status> statuses(keys_str.size());
1870 ReadOptions read_options;
1871 read_options.verify_checksums = true;
1872 read_options.value_size_soft_limit = 380;
1873 db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(),
1874 keys.data(), values.data(), statuses.data());
1875
1876 ASSERT_EQ(values.size(), keys.size());
1877
1878 uint64_t curr_value_size = 0;
1879 for (unsigned int j = 0; j < 26; ++j) {
1880 int key = j + 10;
1881 std::string value;
1882 value.append("val_l2_" + std::to_string(key));
1883 if (key % 3 == 0) {
1884 value.append(",");
1885 value.append("val_l1_" + std::to_string(key));
1886 }
1887 if (key % 5 == 0) {
1888 value.append(",");
1889 value.append("val_l0_" + std::to_string(key));
1890 }
1891 if (key % 9 == 0) {
1892 value.append(",");
1893 value.append("val_mem_" + std::to_string(key));
1894 }
1895 curr_value_size += value.size();
1896 ASSERT_EQ(values[j], value);
1897 ASSERT_OK(statuses[j]);
1898 }
1899 // ASSERT_TRUE(curr_value_size <= read_options.value_size_hard_limit);
1900
1901 // All remaning keys status is set Status::Abort
1902 for (unsigned int j = 26; j < 40; j++) {
1903 ASSERT_TRUE(statuses[j].IsAborted());
1904 }
1905 }
1906
TEST_F(DBBasicTest,MultiGetStats)1907 TEST_F(DBBasicTest, MultiGetStats) {
1908 Options options;
1909 options.create_if_missing = true;
1910 options.disable_auto_compactions = true;
1911 options.env = env_;
1912 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1913 BlockBasedTableOptions table_options;
1914 table_options.block_size = 1;
1915 table_options.index_type =
1916 BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
1917 table_options.partition_filters = true;
1918 table_options.no_block_cache = true;
1919 table_options.cache_index_and_filter_blocks = false;
1920 table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
1921 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1922 CreateAndReopenWithCF({"pikachu"}, options);
1923
1924 int total_keys = 2000;
1925 std::vector<std::string> keys_str(total_keys);
1926 std::vector<Slice> keys(total_keys);
1927 std::vector<PinnableSlice> values(total_keys);
1928 std::vector<Status> s(total_keys);
1929 ReadOptions read_opts;
1930
1931 Random rnd(309);
1932 // Create Multiple SST files at multiple levels.
1933 for (int i = 0; i < 500; ++i) {
1934 keys_str[i] = "k" + std::to_string(i);
1935 keys[i] = Slice(keys_str[i]);
1936 ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
1937 if (i % 100 == 0) {
1938 ASSERT_OK(Flush(1));
1939 }
1940 }
1941 ASSERT_OK(Flush(1));
1942 MoveFilesToLevel(2, 1);
1943
1944 for (int i = 501; i < 1000; ++i) {
1945 keys_str[i] = "k" + std::to_string(i);
1946 keys[i] = Slice(keys_str[i]);
1947 ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
1948 if (i % 100 == 0) {
1949 ASSERT_OK(Flush(1));
1950 }
1951 }
1952
1953 ASSERT_OK(Flush(1));
1954 MoveFilesToLevel(2, 1);
1955
1956 for (int i = 1001; i < total_keys; ++i) {
1957 keys_str[i] = "k" + std::to_string(i);
1958 keys[i] = Slice(keys_str[i]);
1959 ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
1960 if (i % 100 == 0) {
1961 ASSERT_OK(Flush(1));
1962 }
1963 }
1964 ASSERT_OK(Flush(1));
1965 Close();
1966
1967 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1968 ASSERT_OK(options.statistics->Reset());
1969
1970 db_->MultiGet(read_opts, handles_[1], total_keys, keys.data(), values.data(),
1971 s.data(), false);
1972
1973 ASSERT_EQ(values.size(), total_keys);
1974 HistogramData hist_data_blocks;
1975 HistogramData hist_index_and_filter_blocks;
1976 HistogramData hist_sst;
1977
1978 options.statistics->histogramData(NUM_DATA_BLOCKS_READ_PER_LEVEL,
1979 &hist_data_blocks);
1980 options.statistics->histogramData(NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
1981 &hist_index_and_filter_blocks);
1982 options.statistics->histogramData(NUM_SST_READ_PER_LEVEL, &hist_sst);
1983
1984 // Maximum number of blocks read from a file system in a level.
1985 ASSERT_GT(hist_data_blocks.max, 0);
1986 ASSERT_GT(hist_index_and_filter_blocks.max, 0);
1987 // Maximum number of sst files read from file system in a level.
1988 ASSERT_GT(hist_sst.max, 0);
1989
1990 // Minimun number of blocks read in a level.
1991 ASSERT_EQ(hist_data_blocks.min, 3);
1992 ASSERT_GT(hist_index_and_filter_blocks.min, 0);
1993 // Minimun number of sst files read in a level.
1994 ASSERT_GT(hist_sst.max, 0);
1995 }
1996
1997 // Test class for batched MultiGet with prefix extractor
1998 // Param bool - If true, use partitioned filters
1999 // If false, use full filter block
2000 class MultiGetPrefixExtractorTest : public DBBasicTest,
2001 public ::testing::WithParamInterface<bool> {
2002 };
2003
TEST_P(MultiGetPrefixExtractorTest,Batched)2004 TEST_P(MultiGetPrefixExtractorTest, Batched) {
2005 Options options = CurrentOptions();
2006 options.prefix_extractor.reset(NewFixedPrefixTransform(2));
2007 options.memtable_prefix_bloom_size_ratio = 10;
2008 BlockBasedTableOptions bbto;
2009 if (GetParam()) {
2010 bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
2011 bbto.partition_filters = true;
2012 }
2013 bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
2014 bbto.whole_key_filtering = false;
2015 bbto.cache_index_and_filter_blocks = false;
2016 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
2017 Reopen(options);
2018
2019 SetPerfLevel(kEnableCount);
2020 get_perf_context()->Reset();
2021
2022 // First key is not in the prefix_extractor domain
2023 ASSERT_OK(Put("k", "v0"));
2024 ASSERT_OK(Put("kk1", "v1"));
2025 ASSERT_OK(Put("kk2", "v2"));
2026 ASSERT_OK(Put("kk3", "v3"));
2027 ASSERT_OK(Put("kk4", "v4"));
2028 std::vector<std::string> mem_keys(
2029 {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
2030 std::vector<std::string> inmem_values;
2031 inmem_values = MultiGet(mem_keys, nullptr);
2032 ASSERT_EQ(inmem_values[0], "v0");
2033 ASSERT_EQ(inmem_values[1], "v1");
2034 ASSERT_EQ(inmem_values[2], "v2");
2035 ASSERT_EQ(inmem_values[3], "v3");
2036 ASSERT_EQ(inmem_values[4], "v4");
2037 ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
2038 ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
2039 ASSERT_OK(Flush());
2040
2041 std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
2042 std::vector<std::string> values;
2043 get_perf_context()->Reset();
2044 values = MultiGet(keys, nullptr);
2045 ASSERT_EQ(values[0], "v0");
2046 ASSERT_EQ(values[1], "v1");
2047 ASSERT_EQ(values[2], "v2");
2048 ASSERT_EQ(values[3], "v3");
2049 ASSERT_EQ(values[4], "v4");
2050 // Filter hits for 4 in-domain keys
2051 ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
2052 }
2053
2054 INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
2055 ::testing::Bool());
2056
2057 #ifndef ROCKSDB_LITE
2058 class DBMultiGetRowCacheTest : public DBBasicTest,
2059 public ::testing::WithParamInterface<bool> {};
2060
TEST_P(DBMultiGetRowCacheTest,MultiGetBatched)2061 TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
2062 do {
2063 option_config_ = kRowCache;
2064 Options options = CurrentOptions();
2065 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
2066 CreateAndReopenWithCF({"pikachu"}, options);
2067 SetPerfLevel(kEnableCount);
2068 ASSERT_OK(Put(1, "k1", "v1"));
2069 ASSERT_OK(Put(1, "k2", "v2"));
2070 ASSERT_OK(Put(1, "k3", "v3"));
2071 ASSERT_OK(Put(1, "k4", "v4"));
2072 ASSERT_OK(Flush(1));
2073 ASSERT_OK(Put(1, "k5", "v5"));
2074 const Snapshot* snap1 = dbfull()->GetSnapshot();
2075 ASSERT_OK(Delete(1, "k4"));
2076 ASSERT_OK(Flush(1));
2077 const Snapshot* snap2 = dbfull()->GetSnapshot();
2078
2079 get_perf_context()->Reset();
2080
2081 std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
2082 std::vector<PinnableSlice> values(keys.size());
2083 std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
2084 std::vector<Status> s(keys.size());
2085
2086 ReadOptions ro;
2087 bool use_snapshots = GetParam();
2088 if (use_snapshots) {
2089 ro.snapshot = snap2;
2090 }
2091 db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
2092 s.data(), false);
2093
2094 ASSERT_EQ(values.size(), keys.size());
2095 ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
2096 ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
2097 ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
2098 // four kv pairs * two bytes per value
2099 ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
2100
2101 ASSERT_TRUE(s[0].IsNotFound());
2102 ASSERT_OK(s[1]);
2103 ASSERT_TRUE(s[2].IsNotFound());
2104 ASSERT_OK(s[3]);
2105 ASSERT_OK(s[4]);
2106
2107 // Call MultiGet() again with some intersection with the previous set of
2108 // keys. Those should already be in the row cache.
2109 keys.assign({"no_key", "k5", "k3", "k2"});
2110 for (size_t i = 0; i < keys.size(); ++i) {
2111 values[i].Reset();
2112 s[i] = Status::OK();
2113 }
2114 get_perf_context()->Reset();
2115
2116 if (use_snapshots) {
2117 ro.snapshot = snap1;
2118 }
2119 db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
2120 values.data(), s.data(), false);
2121
2122 ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
2123 ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
2124 ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
2125 // four kv pairs * two bytes per value
2126 ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
2127
2128 ASSERT_TRUE(s[0].IsNotFound());
2129 ASSERT_OK(s[1]);
2130 ASSERT_OK(s[2]);
2131 ASSERT_OK(s[3]);
2132 if (use_snapshots) {
2133 // Only reads from the first SST file would have been cached, since
2134 // snapshot seq no is > fd.largest_seqno
2135 ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
2136 } else {
2137 ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
2138 }
2139
2140 SetPerfLevel(kDisable);
2141 dbfull()->ReleaseSnapshot(snap1);
2142 dbfull()->ReleaseSnapshot(snap2);
2143 } while (ChangeCompactOptions());
2144 }
2145
2146 INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
2147 testing::Values(true, false));
2148
TEST_F(DBBasicTest,GetAllKeyVersions)2149 TEST_F(DBBasicTest, GetAllKeyVersions) {
2150 Options options = CurrentOptions();
2151 options.env = env_;
2152 options.create_if_missing = true;
2153 options.disable_auto_compactions = true;
2154 CreateAndReopenWithCF({"pikachu"}, options);
2155 ASSERT_EQ(2, handles_.size());
2156 const size_t kNumInserts = 4;
2157 const size_t kNumDeletes = 4;
2158 const size_t kNumUpdates = 4;
2159
2160 // Check default column family
2161 for (size_t i = 0; i != kNumInserts; ++i) {
2162 ASSERT_OK(Put(std::to_string(i), "value"));
2163 }
2164 for (size_t i = 0; i != kNumUpdates; ++i) {
2165 ASSERT_OK(Put(std::to_string(i), "value1"));
2166 }
2167 for (size_t i = 0; i != kNumDeletes; ++i) {
2168 ASSERT_OK(Delete(std::to_string(i)));
2169 }
2170 std::vector<KeyVersion> key_versions;
2171 ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
2172 db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
2173 &key_versions));
2174 ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
2175 ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
2176 db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
2177 &key_versions));
2178 ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
2179
2180 // Check non-default column family
2181 for (size_t i = 0; i + 1 != kNumInserts; ++i) {
2182 ASSERT_OK(Put(1, std::to_string(i), "value"));
2183 }
2184 for (size_t i = 0; i + 1 != kNumUpdates; ++i) {
2185 ASSERT_OK(Put(1, std::to_string(i), "value1"));
2186 }
2187 for (size_t i = 0; i + 1 != kNumDeletes; ++i) {
2188 ASSERT_OK(Delete(1, std::to_string(i)));
2189 }
2190 ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
2191 db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
2192 &key_versions));
2193 ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
2194 }
2195 #endif // !ROCKSDB_LITE
2196
TEST_F(DBBasicTest,MultiGetIOBufferOverrun)2197 TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
2198 Options options = CurrentOptions();
2199 Random rnd(301);
2200 BlockBasedTableOptions table_options;
2201 table_options.pin_l0_filter_and_index_blocks_in_cache = true;
2202 table_options.block_size = 16 * 1024;
2203 ASSERT_TRUE(table_options.block_size >
2204 BlockBasedTable::kMultiGetReadStackBufSize);
2205 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
2206 Reopen(options);
2207
2208 std::string zero_str(128, '\0');
2209 for (int i = 0; i < 100; ++i) {
2210 // Make the value compressible. A purely random string doesn't compress
2211 // and the resultant data block will not be compressed
2212 std::string value(rnd.RandomString(128) + zero_str);
2213 assert(Put(Key(i), value) == Status::OK());
2214 }
2215 ASSERT_OK(Flush());
2216
2217 std::vector<std::string> key_data(10);
2218 std::vector<Slice> keys;
2219 // We cannot resize a PinnableSlice vector, so just set initial size to
2220 // largest we think we will need
2221 std::vector<PinnableSlice> values(10);
2222 std::vector<Status> statuses;
2223 ReadOptions ro;
2224
2225 // Warm up the cache first
2226 key_data.emplace_back(Key(0));
2227 keys.emplace_back(Slice(key_data.back()));
2228 key_data.emplace_back(Key(50));
2229 keys.emplace_back(Slice(key_data.back()));
2230 statuses.resize(keys.size());
2231
2232 dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2233 keys.data(), values.data(), statuses.data(), true);
2234 }
2235
TEST_F(DBBasicTest,IncrementalRecoveryNoCorrupt)2236 TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
2237 Options options = CurrentOptions();
2238 DestroyAndReopen(options);
2239 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2240 size_t num_cfs = handles_.size();
2241 ASSERT_EQ(3, num_cfs);
2242 WriteOptions write_opts;
2243 write_opts.disableWAL = true;
2244 for (size_t cf = 0; cf != num_cfs; ++cf) {
2245 for (size_t i = 0; i != 10000; ++i) {
2246 std::string key_str = Key(static_cast<int>(i));
2247 std::string value_str = std::to_string(cf) + "_" + std::to_string(i);
2248
2249 ASSERT_OK(Put(static_cast<int>(cf), key_str, value_str));
2250 if (0 == (i % 1000)) {
2251 ASSERT_OK(Flush(static_cast<int>(cf)));
2252 }
2253 }
2254 }
2255 for (size_t cf = 0; cf != num_cfs; ++cf) {
2256 ASSERT_OK(Flush(static_cast<int>(cf)));
2257 }
2258 Close();
2259 options.best_efforts_recovery = true;
2260 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2261 options);
2262 num_cfs = handles_.size();
2263 ASSERT_EQ(3, num_cfs);
2264 for (size_t cf = 0; cf != num_cfs; ++cf) {
2265 for (int i = 0; i != 10000; ++i) {
2266 std::string key_str = Key(static_cast<int>(i));
2267 std::string expected_value_str =
2268 std::to_string(cf) + "_" + std::to_string(i);
2269 ASSERT_EQ(expected_value_str, Get(static_cast<int>(cf), key_str));
2270 }
2271 }
2272 }
2273
TEST_F(DBBasicTest,BestEffortsRecoveryWithVersionBuildingFailure)2274 TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
2275 Options options = CurrentOptions();
2276 DestroyAndReopen(options);
2277 ASSERT_OK(Put("foo", "value"));
2278 ASSERT_OK(Flush());
2279 SyncPoint::GetInstance()->DisableProcessing();
2280 SyncPoint::GetInstance()->ClearAllCallBacks();
2281 SyncPoint::GetInstance()->SetCallBack(
2282 "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
2283 ASSERT_NE(nullptr, arg);
2284 *(reinterpret_cast<Status*>(arg)) =
2285 Status::Corruption("Inject corruption");
2286 });
2287 SyncPoint::GetInstance()->EnableProcessing();
2288
2289 options.best_efforts_recovery = true;
2290 Status s = TryReopen(options);
2291 ASSERT_TRUE(s.IsCorruption());
2292 SyncPoint::GetInstance()->DisableProcessing();
2293 SyncPoint::GetInstance()->ClearAllCallBacks();
2294 }
2295
2296 #ifndef ROCKSDB_LITE
2297 namespace {
2298 class TableFileListener : public EventListener {
2299 public:
OnTableFileCreated(const TableFileCreationInfo & info)2300 void OnTableFileCreated(const TableFileCreationInfo& info) override {
2301 InstrumentedMutexLock lock(&mutex_);
2302 cf_to_paths_[info.cf_name].push_back(info.file_path);
2303 }
GetFiles(const std::string & cf_name)2304 std::vector<std::string>& GetFiles(const std::string& cf_name) {
2305 InstrumentedMutexLock lock(&mutex_);
2306 return cf_to_paths_[cf_name];
2307 }
2308
2309 private:
2310 InstrumentedMutex mutex_;
2311 std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
2312 };
2313 } // namespace
2314
TEST_F(DBBasicTest,LastSstFileNotInManifest)2315 TEST_F(DBBasicTest, LastSstFileNotInManifest) {
2316 // If the last sst file is not tracked in MANIFEST,
2317 // or the VersionEdit for the last sst file is not synced,
2318 // on recovery, the last sst file should be deleted,
2319 // and new sst files shouldn't reuse its file number.
2320 Options options = CurrentOptions();
2321 DestroyAndReopen(options);
2322 Close();
2323
2324 // Manually add a sst file.
2325 constexpr uint64_t kSstFileNumber = 100;
2326 const std::string kSstFile = MakeTableFileName(dbname_, kSstFileNumber);
2327 ASSERT_OK(WriteStringToFile(env_, /* data = */ "bad sst file content",
2328 /* fname = */ kSstFile,
2329 /* should_sync = */ true));
2330 ASSERT_OK(env_->FileExists(kSstFile));
2331
2332 TableFileListener* listener = new TableFileListener();
2333 options.listeners.emplace_back(listener);
2334 Reopen(options);
2335 // kSstFile should already be deleted.
2336 ASSERT_TRUE(env_->FileExists(kSstFile).IsNotFound());
2337
2338 ASSERT_OK(Put("k", "v"));
2339 ASSERT_OK(Flush());
2340 // New sst file should have file number > kSstFileNumber.
2341 std::vector<std::string>& files =
2342 listener->GetFiles(kDefaultColumnFamilyName);
2343 ASSERT_EQ(files.size(), 1);
2344 const std::string fname = files[0].erase(0, (dbname_ + "/").size());
2345 uint64_t number = 0;
2346 FileType type = kTableFile;
2347 ASSERT_TRUE(ParseFileName(fname, &number, &type));
2348 ASSERT_EQ(type, kTableFile);
2349 ASSERT_GT(number, kSstFileNumber);
2350 }
2351
TEST_F(DBBasicTest,RecoverWithMissingFiles)2352 TEST_F(DBBasicTest, RecoverWithMissingFiles) {
2353 Options options = CurrentOptions();
2354 DestroyAndReopen(options);
2355 TableFileListener* listener = new TableFileListener();
2356 // Disable auto compaction to simplify SST file name tracking.
2357 options.disable_auto_compactions = true;
2358 options.listeners.emplace_back(listener);
2359 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2360 std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
2361 "eevee"};
2362 size_t num_cfs = handles_.size();
2363 ASSERT_EQ(3, num_cfs);
2364 for (size_t cf = 0; cf != num_cfs; ++cf) {
2365 ASSERT_OK(Put(static_cast<int>(cf), "a", "0_value"));
2366 ASSERT_OK(Flush(static_cast<int>(cf)));
2367 ASSERT_OK(Put(static_cast<int>(cf), "b", "0_value"));
2368 ASSERT_OK(Flush(static_cast<int>(cf)));
2369 ASSERT_OK(Put(static_cast<int>(cf), "c", "0_value"));
2370 ASSERT_OK(Flush(static_cast<int>(cf)));
2371 }
2372
2373 // Delete and corrupt files
2374 for (size_t i = 0; i < all_cf_names.size(); ++i) {
2375 std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
2376 ASSERT_EQ(3, files.size());
2377 std::string corrupted_data;
2378 ASSERT_OK(ReadFileToString(env_, files[files.size() - 1], &corrupted_data));
2379 ASSERT_OK(WriteStringToFile(
2380 env_, corrupted_data.substr(0, corrupted_data.size() - 2),
2381 files[files.size() - 1], /*should_sync=*/true));
2382 for (int j = static_cast<int>(files.size() - 2); j >= static_cast<int>(i);
2383 --j) {
2384 ASSERT_OK(env_->DeleteFile(files[j]));
2385 }
2386 }
2387 options.best_efforts_recovery = true;
2388 ReopenWithColumnFamilies(all_cf_names, options);
2389 // Verify data
2390 ReadOptions read_opts;
2391 read_opts.total_order_seek = true;
2392 {
2393 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
2394 iter->SeekToFirst();
2395 ASSERT_FALSE(iter->Valid());
2396 ASSERT_OK(iter->status());
2397 iter.reset(db_->NewIterator(read_opts, handles_[1]));
2398 iter->SeekToFirst();
2399 ASSERT_TRUE(iter->Valid());
2400 ASSERT_EQ("a", iter->key());
2401 iter->Next();
2402 ASSERT_FALSE(iter->Valid());
2403 ASSERT_OK(iter->status());
2404 iter.reset(db_->NewIterator(read_opts, handles_[2]));
2405 iter->SeekToFirst();
2406 ASSERT_TRUE(iter->Valid());
2407 ASSERT_EQ("a", iter->key());
2408 iter->Next();
2409 ASSERT_TRUE(iter->Valid());
2410 ASSERT_EQ("b", iter->key());
2411 iter->Next();
2412 ASSERT_FALSE(iter->Valid());
2413 ASSERT_OK(iter->status());
2414 }
2415 }
2416
TEST_F(DBBasicTest,BestEffortsRecoveryTryMultipleManifests)2417 TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
2418 Options options = CurrentOptions();
2419 options.env = env_;
2420 DestroyAndReopen(options);
2421 ASSERT_OK(Put("foo", "value0"));
2422 ASSERT_OK(Flush());
2423 Close();
2424 {
2425 // Hack by adding a new MANIFEST with high file number
2426 std::string garbage(10, '\0');
2427 ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/MANIFEST-001000",
2428 /*should_sync=*/true));
2429 }
2430 {
2431 // Hack by adding a corrupted SST not referenced by any MANIFEST
2432 std::string garbage(10, '\0');
2433 ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/001001.sst",
2434 /*should_sync=*/true));
2435 }
2436
2437 options.best_efforts_recovery = true;
2438
2439 Reopen(options);
2440 ASSERT_OK(Put("bar", "value"));
2441 }
2442
TEST_F(DBBasicTest,RecoverWithNoCurrentFile)2443 TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
2444 Options options = CurrentOptions();
2445 options.env = env_;
2446 DestroyAndReopen(options);
2447 CreateAndReopenWithCF({"pikachu"}, options);
2448 options.best_efforts_recovery = true;
2449 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
2450 ASSERT_EQ(2, handles_.size());
2451 ASSERT_OK(Put("foo", "value"));
2452 ASSERT_OK(Put(1, "bar", "value"));
2453 ASSERT_OK(Flush());
2454 ASSERT_OK(Flush(1));
2455 Close();
2456 ASSERT_OK(env_->DeleteFile(CurrentFileName(dbname_)));
2457 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
2458 std::vector<std::string> cf_names;
2459 ASSERT_OK(DB::ListColumnFamilies(DBOptions(options), dbname_, &cf_names));
2460 ASSERT_EQ(2, cf_names.size());
2461 for (const auto& name : cf_names) {
2462 ASSERT_TRUE(name == kDefaultColumnFamilyName || name == "pikachu");
2463 }
2464 }
2465
TEST_F(DBBasicTest,RecoverWithNoManifest)2466 TEST_F(DBBasicTest, RecoverWithNoManifest) {
2467 Options options = CurrentOptions();
2468 options.env = env_;
2469 DestroyAndReopen(options);
2470 ASSERT_OK(Put("foo", "value"));
2471 ASSERT_OK(Flush());
2472 Close();
2473 {
2474 // Delete all MANIFEST.
2475 std::vector<std::string> files;
2476 ASSERT_OK(env_->GetChildren(dbname_, &files));
2477 for (const auto& file : files) {
2478 uint64_t number = 0;
2479 FileType type = kWalFile;
2480 if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
2481 ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
2482 }
2483 }
2484 }
2485 options.best_efforts_recovery = true;
2486 options.create_if_missing = false;
2487 Status s = TryReopen(options);
2488 ASSERT_TRUE(s.IsInvalidArgument());
2489 options.create_if_missing = true;
2490 Reopen(options);
2491 // Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
2492 ASSERT_EQ("NOT_FOUND", Get("foo"));
2493 }
2494
TEST_F(DBBasicTest,SkipWALIfMissingTableFiles)2495 TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
2496 Options options = CurrentOptions();
2497 DestroyAndReopen(options);
2498 TableFileListener* listener = new TableFileListener();
2499 options.listeners.emplace_back(listener);
2500 CreateAndReopenWithCF({"pikachu"}, options);
2501 std::vector<std::string> kAllCfNames = {kDefaultColumnFamilyName, "pikachu"};
2502 size_t num_cfs = handles_.size();
2503 ASSERT_EQ(2, num_cfs);
2504 for (int cf = 0; cf < static_cast<int>(kAllCfNames.size()); ++cf) {
2505 ASSERT_OK(Put(cf, "a", "0_value"));
2506 ASSERT_OK(Flush(cf));
2507 ASSERT_OK(Put(cf, "b", "0_value"));
2508 }
2509 // Delete files
2510 for (size_t i = 0; i < kAllCfNames.size(); ++i) {
2511 std::vector<std::string>& files = listener->GetFiles(kAllCfNames[i]);
2512 ASSERT_EQ(1, files.size());
2513 for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
2514 --j) {
2515 ASSERT_OK(env_->DeleteFile(files[j]));
2516 }
2517 }
2518 options.best_efforts_recovery = true;
2519 ReopenWithColumnFamilies(kAllCfNames, options);
2520 // Verify WAL is not applied
2521 ReadOptions read_opts;
2522 read_opts.total_order_seek = true;
2523 std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
2524 iter->SeekToFirst();
2525 ASSERT_FALSE(iter->Valid());
2526 ASSERT_OK(iter->status());
2527 iter.reset(db_->NewIterator(read_opts, handles_[1]));
2528 iter->SeekToFirst();
2529 ASSERT_TRUE(iter->Valid());
2530 ASSERT_EQ("a", iter->key());
2531 iter->Next();
2532 ASSERT_FALSE(iter->Valid());
2533 ASSERT_OK(iter->status());
2534 }
2535
TEST_F(DBBasicTest,DisableTrackWal)2536 TEST_F(DBBasicTest, DisableTrackWal) {
2537 // If WAL tracking was enabled, and then disabled during reopen,
2538 // the previously tracked WALs should be removed from MANIFEST.
2539
2540 Options options = CurrentOptions();
2541 options.track_and_verify_wals_in_manifest = true;
2542 // extremely small write buffer size,
2543 // so that new WALs are created more frequently.
2544 options.write_buffer_size = 100;
2545 options.env = env_;
2546 DestroyAndReopen(options);
2547 for (int i = 0; i < 100; i++) {
2548 ASSERT_OK(Put("foo" + std::to_string(i), "value" + std::to_string(i)));
2549 }
2550 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
2551 ASSERT_OK(db_->SyncWAL());
2552 // Some WALs are tracked.
2553 ASSERT_FALSE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty());
2554 Close();
2555
2556 // Disable WAL tracking.
2557 options.track_and_verify_wals_in_manifest = false;
2558 options.create_if_missing = false;
2559 ASSERT_OK(TryReopen(options));
2560 // Previously tracked WALs are cleared.
2561 ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty());
2562 Close();
2563
2564 // Re-enable WAL tracking again.
2565 options.track_and_verify_wals_in_manifest = true;
2566 options.create_if_missing = false;
2567 ASSERT_OK(TryReopen(options));
2568 ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty());
2569 Close();
2570 }
2571 #endif // !ROCKSDB_LITE
2572
TEST_F(DBBasicTest,ManifestChecksumMismatch)2573 TEST_F(DBBasicTest, ManifestChecksumMismatch) {
2574 Options options = CurrentOptions();
2575 DestroyAndReopen(options);
2576 ASSERT_OK(Put("bar", "value"));
2577 ASSERT_OK(Flush());
2578 SyncPoint::GetInstance()->DisableProcessing();
2579 SyncPoint::GetInstance()->ClearAllCallBacks();
2580 SyncPoint::GetInstance()->SetCallBack(
2581 "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
2582 auto* crc = reinterpret_cast<uint32_t*>(arg);
2583 *crc = *crc + 1;
2584 });
2585 SyncPoint::GetInstance()->EnableProcessing();
2586
2587 WriteOptions write_opts;
2588 write_opts.disableWAL = true;
2589 Status s = db_->Put(write_opts, "foo", "value");
2590 ASSERT_OK(s);
2591 ASSERT_OK(Flush());
2592 SyncPoint::GetInstance()->DisableProcessing();
2593 SyncPoint::GetInstance()->ClearAllCallBacks();
2594 ASSERT_OK(Put("foo", "value1"));
2595 ASSERT_OK(Flush());
2596 s = TryReopen(options);
2597 ASSERT_TRUE(s.IsCorruption());
2598 }
2599
TEST_F(DBBasicTest,ConcurrentlyCloseDB)2600 TEST_F(DBBasicTest, ConcurrentlyCloseDB) {
2601 Options options = CurrentOptions();
2602 DestroyAndReopen(options);
2603 std::vector<std::thread> workers;
2604 for (int i = 0; i < 10; i++) {
2605 workers.push_back(std::thread([&]() {
2606 auto s = db_->Close();
2607 ASSERT_OK(s);
2608 }));
2609 }
2610 for (auto& w : workers) {
2611 w.join();
2612 }
2613 }
2614
2615 #ifndef ROCKSDB_LITE
2616 class DBBasicTestTrackWal : public DBTestBase,
2617 public testing::WithParamInterface<bool> {
2618 public:
DBBasicTestTrackWal()2619 DBBasicTestTrackWal()
2620 : DBTestBase("db_basic_test_track_wal", /*env_do_fsync=*/false) {}
2621
CountWalFiles()2622 int CountWalFiles() {
2623 VectorLogPtr log_files;
2624 EXPECT_OK(dbfull()->GetSortedWalFiles(log_files));
2625 return static_cast<int>(log_files.size());
2626 };
2627 };
2628
TEST_P(DBBasicTestTrackWal,DoNotTrackObsoleteWal)2629 TEST_P(DBBasicTestTrackWal, DoNotTrackObsoleteWal) {
2630 // If a WAL becomes obsolete after flushing, but is not deleted from disk yet,
2631 // then if SyncWAL is called afterwards, the obsolete WAL should not be
2632 // tracked in MANIFEST.
2633
2634 Options options = CurrentOptions();
2635 options.create_if_missing = true;
2636 options.track_and_verify_wals_in_manifest = true;
2637 options.atomic_flush = GetParam();
2638
2639 DestroyAndReopen(options);
2640 CreateAndReopenWithCF({"cf"}, options);
2641 ASSERT_EQ(handles_.size(), 2); // default, cf
2642 // Do not delete WALs.
2643 ASSERT_OK(db_->DisableFileDeletions());
2644 constexpr int n = 10;
2645 std::vector<std::unique_ptr<LogFile>> wals(n);
2646 for (size_t i = 0; i < n; i++) {
2647 // Generate a new WAL for each key-value.
2648 const int cf = i % 2;
2649 ASSERT_OK(db_->GetCurrentWalFile(&wals[i]));
2650 ASSERT_OK(Put(cf, "k" + std::to_string(i), "v" + std::to_string(i)));
2651 ASSERT_OK(Flush({0, 1}));
2652 }
2653 ASSERT_EQ(CountWalFiles(), n);
2654 // Since all WALs are obsolete, no WAL should be tracked in MANIFEST.
2655 ASSERT_OK(db_->SyncWAL());
2656
2657 // Manually delete all WALs.
2658 Close();
2659 for (const auto& wal : wals) {
2660 ASSERT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
2661 }
2662
2663 // If SyncWAL tracks the obsolete WALs in MANIFEST,
2664 // reopen will fail because the WALs are missing from disk.
2665 ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf"}, options));
2666 Destroy(options);
2667 }
2668
2669 INSTANTIATE_TEST_CASE_P(DBBasicTestTrackWal, DBBasicTestTrackWal,
2670 testing::Bool());
2671 #endif // ROCKSDB_LITE
2672
// Fixture for MultiGet tests. Pre-loads `num_cfs` column families, each with
// two SST files: one holding compressible values under Key(0)..Key(99), and
// one holding incompressible values under "a"+Key(0).."a"+Key(99). Cache
// traffic is observable through the MyBlockCache wrappers, and data blocks
// are cut every 10 keys by MyFlushBlockPolicy so tests can reason about
// exactly which blocks a read touches.
class DBBasicTestMultiGet : public DBTestBase {
 public:
  DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
                      bool uncompressed_cache, bool _compression_enabled,
                      bool _fill_cache, uint32_t compression_parallel_threads)
      : DBTestBase(test_dir, /*env_do_fsync=*/false) {
    compression_enabled_ = _compression_enabled;
    fill_cache_ = _fill_cache;

    // Wrap plain LRU caches so lookups/hits/inserts can be counted.
    if (compressed_cache) {
      std::shared_ptr<Cache> cache = NewLRUCache(1048576);
      compressed_cache_ = std::make_shared<MyBlockCache>(cache);
    }
    if (uncompressed_cache) {
      std::shared_ptr<Cache> cache = NewLRUCache(1048576);
      uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
    }

    // Tests below assert on exactly how many file reads MultiGet issues.
    env_->count_random_reads_ = true;

    Options options = CurrentOptions();
    Random rnd(301);
    BlockBasedTableOptions table_options;

#ifndef ROCKSDB_LITE
    if (compression_enabled_) {
      std::vector<CompressionType> compression_types;
      compression_types = GetSupportedCompressions();
      // Not every platform may have compression libraries available, so
      // dynamically pick based on what's available
      CompressionType tmp_type = kNoCompression;
      for (auto c_type : compression_types) {
        if (c_type != kNoCompression) {
          tmp_type = c_type;
          break;
        }
      }
      if (tmp_type != kNoCompression) {
        options.compression = tmp_type;
      } else {
        // No usable compression library; downgrade to uncompressed data.
        compression_enabled_ = false;
      }
    }
#else
    // GetSupportedCompressions() is not available in LITE build
    if (!Snappy_Supported()) {
      compression_enabled_ = false;
    }
#endif  // ROCKSDB_LITE

    table_options.block_cache = uncompressed_cache_;
    if (table_options.block_cache == nullptr) {
      table_options.no_block_cache = true;
    } else {
      table_options.pin_l0_filter_and_index_blocks_in_cache = true;
    }
    table_options.block_cache_compressed = compressed_cache_;
    // Cut a data block after every 10 keys so block boundaries are
    // predictable for the tests.
    table_options.flush_block_policy_factory.reset(
        new MyFlushBlockPolicyFactory());
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    if (!compression_enabled_) {
      options.compression = kNoCompression;
    } else {
      options.compression_opts.parallel_threads = compression_parallel_threads;
    }
    options_ = options;
    Reopen(options);

    if (num_cfs > 1) {
      for (int cf = 0; cf < num_cfs; ++cf) {
        cf_names_.emplace_back("cf" + std::to_string(cf));
      }
      CreateColumnFamilies(cf_names_, options);
      cf_names_.emplace_back("default");
    }

    std::string zero_str(128, '\0');
    for (int cf = 0; cf < num_cfs; ++cf) {
      for (int i = 0; i < 100; ++i) {
        // Make the value compressible. A purely random string doesn't compress
        // and the resultant data block will not be compressed
        values_.emplace_back(rnd.RandomString(128) + zero_str);
        // Put always indexes values_[i] with i < 100, so every column family
        // stores the same first 100 values; entries appended while loading
        // cf > 0 are never read back.
        assert(((num_cfs == 1) ? Put(Key(i), values_[i])
                               : Put(cf, Key(i), values_[i])) == Status::OK());
      }
      if (num_cfs == 1) {
        EXPECT_OK(Flush());
      } else {
        EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
      }

      for (int i = 0; i < 100; ++i) {
        // block cannot gain space by compression
        uncompressable_values_.emplace_back(rnd.RandomString(256) + '\0');
        std::string tmp_key = "a" + Key(i);
        assert(((num_cfs == 1) ? Put(tmp_key, uncompressable_values_[i])
                               : Put(cf, tmp_key, uncompressable_values_[i])) ==
               Status::OK());
      }
      if (num_cfs == 1) {
        EXPECT_OK(Flush());
      } else {
        EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
      }
    }
    // Clear compressed cache, which is always pre-populated
    if (compressed_cache_) {
      compressed_cache_->SetCapacity(0);
      compressed_cache_->SetCapacity(1048576);
    }
  }

  // Returns true iff `value` matches the compressible value stored for Key(i).
  bool CheckValue(int i, const std::string& value) {
    if (values_[i].compare(value) == 0) {
      return true;
    }
    return false;
  }

  // Returns true iff `value` matches the incompressible value stored for
  // "a" + Key(i).
  bool CheckUncompressableValue(int i, const std::string& value) {
    if (uncompressable_values_[i].compare(value) == 0) {
      return true;
    }
    return false;
  }

  const std::vector<std::string>& GetCFNames() const { return cf_names_; }

  // Uncompressed block cache counters (valid only when the cache exists).
  int num_lookups() { return uncompressed_cache_->num_lookups(); }
  int num_found() { return uncompressed_cache_->num_found(); }
  int num_inserts() { return uncompressed_cache_->num_inserts(); }

  // Compressed block cache counters (valid only when the cache exists).
  int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
  int num_found_compressed() { return compressed_cache_->num_found(); }
  int num_inserts_compressed() { return compressed_cache_->num_inserts(); }

  bool fill_cache() { return fill_cache_; }
  bool compression_enabled() { return compression_enabled_; }
  bool has_compressed_cache() { return compressed_cache_ != nullptr; }
  bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
  Options get_options() { return options_; }

  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

 protected:
  // Factory handing out MyFlushBlockPolicy instances to the table builder.
  class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
   public:
    MyFlushBlockPolicyFactory() {}

    virtual const char* Name() const override {
      return "MyFlushBlockPolicyFactory";
    }

    virtual FlushBlockPolicy* NewFlushBlockPolicy(
        const BlockBasedTableOptions& /*table_options*/,
        const BlockBuilder& data_block_builder) const override {
      return new MyFlushBlockPolicy(data_block_builder);
    }
  };

  // Flush policy that cuts a data block after every 10 keys regardless of
  // block size, so tests know exactly which keys share a block.
  class MyFlushBlockPolicy : public FlushBlockPolicy {
   public:
    explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
        : num_keys_(0), data_block_builder_(data_block_builder) {}

    bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
      if (data_block_builder_.empty()) {
        // First key in this block
        num_keys_ = 1;
        return false;
      }
      // Flush every 10 keys
      if (num_keys_ == 10) {
        num_keys_ = 1;
        return true;
      }
      num_keys_++;
      return false;
    }

   private:
    int num_keys_;  // keys counted into the current data block
    const BlockBuilder& data_block_builder_;
  };

  // Cache wrapper that counts lookups, hits and inserts while delegating all
  // real work to the wrapped cache.
  class MyBlockCache : public CacheWrapper {
   public:
    explicit MyBlockCache(std::shared_ptr<Cache> target)
        : CacheWrapper(target),
          num_lookups_(0),
          num_found_(0),
          num_inserts_(0) {}

    const char* Name() const override { return "MyBlockCache"; }

    using Cache::Insert;
    Status Insert(const Slice& key, void* value, size_t charge,
                  void (*deleter)(const Slice& key, void* value),
                  Handle** handle = nullptr,
                  Priority priority = Priority::LOW) override {
      num_inserts_++;
      return target_->Insert(key, value, charge, deleter, handle, priority);
    }

    using Cache::Lookup;
    Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
      num_lookups_++;
      Handle* handle = target_->Lookup(key, stats);
      if (handle != nullptr) {
        num_found_++;
      }
      return handle;
    }
    int num_lookups() { return num_lookups_; }

    int num_found() { return num_found_; }

    int num_inserts() { return num_inserts_; }

   private:
    int num_lookups_;
    int num_found_;
    int num_inserts_;
  };

  std::shared_ptr<MyBlockCache> compressed_cache_;
  std::shared_ptr<MyBlockCache> uncompressed_cache_;
  Options options_;
  bool compression_enabled_;
  // Compressible values stored under Key(i); only the first 100 entries are
  // ever read back (see the loading loop above).
  std::vector<std::string> values_;
  // Incompressible values stored under "a" + Key(i).
  std::vector<std::string> uncompressable_values_;
  bool fill_cache_;
  std::vector<std::string> cf_names_;
};
2908
// Single-column-family variant of DBBasicTestMultiGet, parameterized over
// (see the INSTANTIATE_TEST_CASE_P below for the canonical list):
//   <0> compressed block cache enabled
//   <1> uncompressed block cache enabled
//   <2> data compression enabled
//   <3> ReadOptions::fill_cache
//   <4> CompressionOptions::parallel_threads
class DBBasicTestWithParallelIO
    : public DBBasicTestMultiGet,
      public testing::WithParamInterface<
          std::tuple<bool, bool, bool, bool, uint32_t>> {
 public:
  DBBasicTestWithParallelIO()
      : DBBasicTestMultiGet("/db_basic_test_with_parallel_io", 1,
                            std::get<0>(GetParam()), std::get<1>(GetParam()),
                            std::get<2>(GetParam()), std::get<3>(GetParam()),
                            std::get<4>(GetParam())) {}
};
2920
// Exercises batched MultiGet and verifies, per cache/compression
// configuration, exactly how many file reads are issued (observed through
// SpecialEnv::random_read_counter_).
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  // Key(1)/Key(51) live in the same 10-key data blocks as Key(0)/Key(50), so
  // if blocks were cached by the warm-up this lookup needs no new file reads.
  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  // Blocks can be served from cache only if fill_cache was on during warm-up
  // and either an uncompressed cache exists, or a compressed cache exists
  // and the data blocks were actually compressed.
  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  int expected_reads = random_reads + (read_from_cache ? 0 : 2);
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Batched lookup of 10 compressible keys spanning several data blocks;
  // two of those blocks were already touched by the reads above.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_ints.size(); ++i) {
    key_data[i] = Key(key_ints[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_ints.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 2 : 3);
  } else {
    expected_reads += (read_from_cache ? 2 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Same key positions, but in the incompressible ("a"-prefixed) file.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    key_data[i] = "a" + Key(key_uncmp[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
  }
  // Both ternary arms are intentionally equal: for incompressible blocks the
  // read count does not depend on whether earlier blocks came from cache.
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 3 : 3);
  } else {
    expected_reads += (read_from_cache ? 4 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Subset of the incompressible keys just read, so blocks may now be cached.
  keys.resize(5);
  statuses.resize(5);
  std::vector<int> key_tr{1, 2, 15, 16, 55};
  for (size_t i = 0; i < key_tr.size(); ++i) {
    key_data[i] = "a" + Key(key_tr[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_tr.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 0 : 2);
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  } else {
    if (has_uncompressed_cache()) {
      expected_reads += (read_from_cache ? 0 : 3);
      ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
    } else {
      // A rare case, even we enable the block compression but some of data
      // blocks are not compressed due to content. If user only enable the
      // compressed cache, the uncompressed blocks will not tbe cached, and
      // block reads will be triggered. The number of reads is related to
      // the compression algorithm.
      ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
    }
  }
}
3043
#ifndef ROCKSDB_LITE
// Same cache/read accounting idea as the MultiGet test above, but with
// use_direct_reads enabled. Direct I/O is faked: files are opened in
// buffered mode while claiming use_direct_io() == true, so the test runs on
// filesystems without real O_DIRECT support.
TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
  class FakeDirectIOEnv : public EnvWrapper {
    class FakeDirectIOSequentialFile;
    class FakeDirectIORandomAccessFile;

   public:
    FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}

    Status NewRandomAccessFile(const std::string& fname,
                               std::unique_ptr<RandomAccessFile>* result,
                               const EnvOptions& options) override {
      std::unique_ptr<RandomAccessFile> file;
      assert(options.use_direct_reads);
      // Open the file without direct I/O, then wrap it in a shim that
      // reports use_direct_io() == true with alignment 1.
      EnvOptions opts = options;
      opts.use_direct_reads = false;
      Status s = target()->NewRandomAccessFile(fname, &file, opts);
      if (!s.ok()) {
        return s;
      }
      result->reset(new FakeDirectIORandomAccessFile(std::move(file)));
      return s;
    }

   private:
    // NOTE(review): only NewRandomAccessFile is overridden above, so this
    // sequential-file shim appears unused here — confirm before removing.
    class FakeDirectIOSequentialFile : public SequentialFileWrapper {
     public:
      FakeDirectIOSequentialFile(std::unique_ptr<SequentialFile>&& file)
          : SequentialFileWrapper(file.get()), file_(std::move(file)) {}
      ~FakeDirectIOSequentialFile() {}

      bool use_direct_io() const override { return true; }
      size_t GetRequiredBufferAlignment() const override { return 1; }

     private:
      std::unique_ptr<SequentialFile> file_;
    };

    class FakeDirectIORandomAccessFile : public RandomAccessFileWrapper {
     public:
      FakeDirectIORandomAccessFile(std::unique_ptr<RandomAccessFile>&& file)
          : RandomAccessFileWrapper(file.get()), file_(std::move(file)) {}
      ~FakeDirectIORandomAccessFile() {}

      bool use_direct_io() const override { return true; }
      size_t GetRequiredBufferAlignment() const override { return 1; }

     private:
      std::unique_ptr<RandomAccessFile> file_;
    };
  };

  std::unique_ptr<FakeDirectIOEnv> env(new FakeDirectIOEnv(env_));
  Options opts = get_options();
  opts.env = env.get();
  opts.use_direct_reads = true;
  Reopen(opts);

  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  // Drop the uncompressed cache contents so the next lookup must hit the
  // file (or the compressed cache, when present).
  if (uncompressed_cache_) {
    uncompressed_cache_->SetCapacity(0);
    uncompressed_cache_->SetCapacity(1048576);
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  int expected_reads = random_reads;
  if (!compression_enabled() || !has_compressed_cache()) {
    expected_reads += 2;
  } else {
    expected_reads += (read_from_cache ? 0 : 2);
  }
  // The guard mirrors the ASSERT so the failure is reported only once.
  if (env_->random_read_counter_.Read() != expected_reads) {
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  }
  // Close before the local env goes out of scope.
  Close();
}
#endif  // ROCKSDB_LITE
3160
// Injects a checksum failure into the second block read of a batched
// MultiGet and verifies the corruption is reported only for the affected
// key while the first key still succeeds.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  int read_count = 0;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Overwrite the verification status of the second block read with
  // Corruption; the first block read is left untouched.
  SyncPoint::GetInstance()->SetCallBack(
      "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
        Status* s = static_cast<Status*>(status);
        read_count++;
        if (read_count == 2) {
          *s = Status::Corruption();
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  // Key(50)'s value is not checked: its block read was failed with the
  // injected Corruption status below.
  // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::Corruption());

  SyncPoint::GetInstance()->DisableProcessing();
}
3198
// Forces every table-file open attempted during MultiGet to fail with
// IOError and verifies the error is propagated to each key's status.
// Fix: replace the C-style cast `(int*)arg` with static_cast to follow the
// named-cast convention used by the other sync point callbacks in this file.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Fail every table reader open issued on the MultiGet path.
  SyncPoint::GetInstance()->SetCallBack(
      "TableCache::MultiGet:FindTable", [&](void* status) {
        Status* s = static_cast<Status*>(status);
        *s = Status::IOError();
      });
  // DB open will create table readers unless we reduce the table cache
  // capacity.
  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
  // is allocated with max_open_files - 10 as capacity. So override
  // max_open_files to 11 so table cache capacity will become 1. This will
  // prevent file open during DB open and force the file to be opened
  // during MultiGet
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(CurrentOptions());

  // Both keys require opening their table file on demand, so both lookups
  // should surface the injected IOError.
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_EQ(statuses[0], Status::IOError());
  ASSERT_EQ(statuses[1], Status::IOError());

  SyncPoint::GetInstance()->DisableProcessing();
}
3244
// NOTE(review): INSTANTIATE_TEST_CASE_P is the legacy gtest spelling —
// presumably kept for the bundled gtest version; confirm before renaming to
// INSTANTIATE_TEST_SUITE_P.
INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
                        // Params are as follows -
                        // Param 0 - Compressed cache enabled
                        // Param 1 - Uncompressed cache enabled
                        // Param 2 - Data compression enabled
                        // Param 3 - ReadOptions::fill_cache
                        // Param 4 - CompressionOptions::parallel_threads
                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                                           ::testing::Bool(), ::testing::Bool(),
                                           ::testing::Values(1, 4)));
3255
3256 // Forward declaration
3257 class DeadlineFS;
3258
3259 class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
3260 public:
DeadlineRandomAccessFile(DeadlineFS & fs,std::unique_ptr<FSRandomAccessFile> & file)3261 DeadlineRandomAccessFile(DeadlineFS& fs,
3262 std::unique_ptr<FSRandomAccessFile>& file)
3263 : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}
3264
3265 IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
3266 Slice* result, char* scratch,
3267 IODebugContext* dbg) const override;
3268
3269 IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
3270 const IOOptions& options, IODebugContext* dbg) override;
3271
3272 private:
3273 DeadlineFS& fs_;
3274 std::unique_ptr<FSRandomAccessFile> file_;
3275 };
3276
// FileSystem wrapper that checks the deadline/timeout propagated into each
// IO and can inject a single delay (optionally surfaced as TimedOut) after
// a configured number of IOs.
class DeadlineFS : public FileSystemWrapper {
 public:
  // The error_on_delay parameter specifies whether a IOStatus::TimedOut()
  // status should be returned after delaying the IO to exceed the timeout,
  // or to simply delay but return success anyway. The latter mimics the
  // behavior of PosixFileSystem, which does not enforce any timeout
  explicit DeadlineFS(SpecialEnv* env, bool error_on_delay)
      : FileSystemWrapper(env->GetFileSystem()),
        deadline_(std::chrono::microseconds::zero()),
        io_timeout_(std::chrono::microseconds::zero()),
        env_(env),
        timedout_(false),
        ignore_deadline_(false),
        error_on_delay_(error_on_delay) {}
  // NOTE(review): delay_trigger_ and io_count_ are not initialized here;
  // they are only meaningful after SetDelayTrigger() has been called.

  IOStatus NewRandomAccessFile(const std::string& fname,
                               const FileOptions& opts,
                               std::unique_ptr<FSRandomAccessFile>* result,
                               IODebugContext* dbg) override {
    std::unique_ptr<FSRandomAccessFile> file;
    IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
    EXPECT_OK(s);
    result->reset(new DeadlineRandomAccessFile(*this, file));

    // File opens are also subject to the deadline check and delay injection.
    const std::chrono::microseconds deadline = GetDeadline();
    const std::chrono::microseconds io_timeout = GetIOTimeout();
    if (deadline.count() || io_timeout.count()) {
      AssertDeadline(deadline, io_timeout, opts.io_options);
    }
    return ShouldDelay(opts.io_options);
  }

  // Set a vector of {IO counter, delay in microseconds, return status} tuples
  // that control when to inject a delay and duration of the delay
  void SetDelayTrigger(const std::chrono::microseconds deadline,
                       const std::chrono::microseconds io_timeout,
                       const int trigger) {
    delay_trigger_ = trigger;
    io_count_ = 0;
    deadline_ = deadline;
    io_timeout_ = io_timeout;
    timedout_ = false;
  }

  // Increment the IO counter and return a delay in microseconds
  IOStatus ShouldDelay(const IOOptions& opts) {
    // Once a delay has been injected, all subsequent IOs time out.
    if (timedout_) {
      return IOStatus::TimedOut();
    } else if (!deadline_.count() && !io_timeout_.count()) {
      return IOStatus::OK();
    }
    if (!ignore_deadline_ && delay_trigger_ == io_count_++) {
      // Sleep just past the propagated timeout so the deadline is exceeded.
      env_->SleepForMicroseconds(static_cast<int>(opts.timeout.count() + 1));
      timedout_ = true;
      if (error_on_delay_) {
        return IOStatus::TimedOut();
      }
    }
    return IOStatus::OK();
  }

  const std::chrono::microseconds GetDeadline() {
    return ignore_deadline_ ? std::chrono::microseconds::zero() : deadline_;
  }

  const std::chrono::microseconds GetIOTimeout() {
    return ignore_deadline_ ? std::chrono::microseconds::zero() : io_timeout_;
  }

  bool TimedOut() { return timedout_; }

  void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; }

  // Verifies that opts.timeout equals the remaining time budget computed
  // from the configured deadline and/or io_timeout.
  void AssertDeadline(const std::chrono::microseconds deadline,
                      const std::chrono::microseconds io_timeout,
                      const IOOptions& opts) const {
    // Give a leeway of +- 10us as it can take some time for the Get/
    // MultiGet call to reach here, in order to avoid false alarms
    std::chrono::microseconds now =
        std::chrono::microseconds(env_->NowMicros());
    std::chrono::microseconds timeout;
    if (deadline.count()) {
      timeout = deadline - now;
      if (io_timeout.count()) {
        timeout = std::min(timeout, io_timeout);
      }
    } else {
      timeout = io_timeout;
    }
    // Guard mirrors the ASSERT so a matching timeout stays silent.
    if (opts.timeout != timeout) {
      ASSERT_EQ(timeout, opts.timeout);
    }
  }

 private:
  // The number of IOs to trigger the delay after
  int delay_trigger_;
  // Current IO count
  int io_count_;
  // ReadOptions deadline for the Get/MultiGet/Iterator
  std::chrono::microseconds deadline_;
  // ReadOptions io_timeout for the Get/MultiGet/Iterator
  std::chrono::microseconds io_timeout_;
  SpecialEnv* env_;
  // Flag to indicate whether we injected a delay
  bool timedout_;
  // Temporarily ignore deadlines/timeouts
  bool ignore_deadline_;
  // Return IOStatus::TimedOut() or IOStatus::OK()
  bool error_on_delay_;
};
3388
Read(uint64_t offset,size_t len,const IOOptions & opts,Slice * result,char * scratch,IODebugContext * dbg) const3389 IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
3390 const IOOptions& opts, Slice* result,
3391 char* scratch,
3392 IODebugContext* dbg) const {
3393 const std::chrono::microseconds deadline = fs_.GetDeadline();
3394 const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
3395 IOStatus s;
3396 if (deadline.count() || io_timeout.count()) {
3397 fs_.AssertDeadline(deadline, io_timeout, opts);
3398 }
3399 if (s.ok()) {
3400 s = FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
3401 dbg);
3402 }
3403 if (s.ok()) {
3404 s = fs_.ShouldDelay(opts);
3405 }
3406 return s;
3407 }
3408
MultiRead(FSReadRequest * reqs,size_t num_reqs,const IOOptions & options,IODebugContext * dbg)3409 IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
3410 size_t num_reqs,
3411 const IOOptions& options,
3412 IODebugContext* dbg) {
3413 const std::chrono::microseconds deadline = fs_.GetDeadline();
3414 const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
3415 IOStatus s;
3416 if (deadline.count() || io_timeout.count()) {
3417 fs_.AssertDeadline(deadline, io_timeout, options);
3418 }
3419 if (s.ok()) {
3420 s = FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
3421 }
3422 if (s.ok()) {
3423 s = fs_.ShouldDelay(options);
3424 }
3425 return s;
3426 }
3427
3428 // A test class for intercepting random reads and injecting artificial
3429 // delays. Used for testing the MultiGet deadline feature
// Fixture for the MultiGet deadline tests: 10 column families, uncompressed
// block cache on, compressed cache off, compression on, fill_cache on.
class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
 public:
  DBBasicTestMultiGetDeadline()
      : DBBasicTestMultiGet(
            "db_basic_test_multiget_deadline" /*Test dir*/,
            10 /*# of column families*/, false /*compressed cache enabled*/,
            true /*uncompressed cache enabled*/, true /*compression enabled*/,
            true /*ReadOptions.fill_cache*/,
            1 /*# of parallel compression threads*/) {}

  // Expects the first `num_ok` statuses to be OK and every later entry to
  // be TimedOut. NOTE(review): the inequality guard is redundant —
  // EXPECT_EQ reports only on mismatch — but it is harmless.
  inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
    for (size_t i = 0; i < statuses.size(); ++i) {
      if (i < num_ok) {
        EXPECT_OK(statuses[i]);
      } else {
        if (statuses[i] != Status::TimedOut()) {
          EXPECT_EQ(statuses[i], Status::TimedOut());
        }
      }
    }
  }
};
3452
TEST_F(DBBasicTestMultiGetDeadline,MultiGetDeadlineExceeded)3453 TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
3454 std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
3455 std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
3456 Options options = CurrentOptions();
3457
3458 std::shared_ptr<Cache> cache = NewLRUCache(1048576);
3459 BlockBasedTableOptions table_options;
3460 table_options.block_cache = cache;
3461 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
3462 options.env = env.get();
3463 SetTimeElapseOnlySleepOnReopen(&options);
3464 ReopenWithColumnFamilies(GetCFNames(), options);
3465
3466 // Test the non-batched version of MultiGet with multiple column
3467 // families
3468 std::vector<std::string> key_str;
3469 size_t i;
3470 for (i = 0; i < 5; ++i) {
3471 key_str.emplace_back(Key(static_cast<int>(i)));
3472 }
3473 std::vector<ColumnFamilyHandle*> cfs(key_str.size());
3474 ;
3475 std::vector<Slice> keys(key_str.size());
3476 std::vector<std::string> values(key_str.size());
3477 for (i = 0; i < key_str.size(); ++i) {
3478 cfs[i] = handles_[i];
3479 keys[i] = Slice(key_str[i].data(), key_str[i].size());
3480 }
3481
3482 ReadOptions ro;
3483 ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3484 // Delay the first IO
3485 fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
3486
3487 std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
3488 // The first key is successful because we check after the lookup, but
3489 // subsequent keys fail due to deadline exceeded
3490 CheckStatus(statuses, 1);
3491
3492 // Clear the cache
3493 cache->SetCapacity(0);
3494 cache->SetCapacity(1048576);
3495 // Test non-batched Multiget with multiple column families and
3496 // introducing an IO delay in one of the middle CFs
3497 key_str.clear();
3498 for (i = 0; i < 10; ++i) {
3499 key_str.emplace_back(Key(static_cast<int>(i)));
3500 }
3501 cfs.resize(key_str.size());
3502 keys.resize(key_str.size());
3503 values.resize(key_str.size());
3504 for (i = 0; i < key_str.size(); ++i) {
3505 // 2 keys per CF
3506 cfs[i] = handles_[i / 2];
3507 keys[i] = Slice(key_str[i].data(), key_str[i].size());
3508 }
3509 ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3510 fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
3511 statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
3512 CheckStatus(statuses, 3);
3513
3514 // Test batched MultiGet with an IO delay in the first data block read.
3515 // Both keys in the first CF should succeed as they're in the same data
3516 // block and would form one batch, and we check for deadline between
3517 // batches.
3518 std::vector<PinnableSlice> pin_values(keys.size());
3519 cache->SetCapacity(0);
3520 cache->SetCapacity(1048576);
3521 statuses.clear();
3522 statuses.resize(keys.size());
3523 ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3524 fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
3525 dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
3526 pin_values.data(), statuses.data());
3527 CheckStatus(statuses, 2);
3528
3529 // Similar to the previous one, but an IO delay in the third CF data block
3530 // read
3531 for (PinnableSlice& value : pin_values) {
3532 value.Reset();
3533 }
3534 cache->SetCapacity(0);
3535 cache->SetCapacity(1048576);
3536 statuses.clear();
3537 statuses.resize(keys.size());
3538 ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3539 fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 2);
3540 dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
3541 pin_values.data(), statuses.data());
3542 CheckStatus(statuses, 6);
3543
3544 // Similar to the previous one, but an IO delay in the last but one CF
3545 for (PinnableSlice& value : pin_values) {
3546 value.Reset();
3547 }
3548 cache->SetCapacity(0);
3549 cache->SetCapacity(1048576);
3550 statuses.clear();
3551 statuses.resize(keys.size());
3552 ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3553 fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 3);
3554 dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
3555 pin_values.data(), statuses.data());
3556 CheckStatus(statuses, 8);
3557
3558 // Test batched MultiGet with single CF and lots of keys. Inject delay
3559 // into the second batch of keys. As each batch is 32, the first 64 keys,
3560 // i.e first two batches, should succeed and the rest should time out
3561 for (PinnableSlice& value : pin_values) {
3562 value.Reset();
3563 }
3564 cache->SetCapacity(0);
3565 cache->SetCapacity(1048576);
3566 key_str.clear();
3567 for (i = 0; i < 100; ++i) {
3568 key_str.emplace_back(Key(static_cast<int>(i)));
3569 }
3570 keys.resize(key_str.size());
3571 pin_values.clear();
3572 pin_values.resize(key_str.size());
3573 for (i = 0; i < key_str.size(); ++i) {
3574 keys[i] = Slice(key_str[i].data(), key_str[i].size());
3575 }
3576 statuses.clear();
3577 statuses.resize(keys.size());
3578 ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
3579 fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
3580 dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
3581 pin_values.data(), statuses.data());
3582 CheckStatus(statuses, 64);
3583 Close();
3584 }
3585
// Verifies that a MANIFEST sync failure during flush causes the flush to
// fail, and that the DB can subsequently be reopened cleanly.
TEST_F(DBBasicTest, ManifestWriteFailure) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        // The sync point passes the manifest-sync Status via a void*;
        // static_cast is the correct named cast for void* -> T*.
        auto* s = static_cast<Status*>(arg);
        ASSERT_OK(*s);
        // Manually overwrite return status
        *s = Status::IOError();
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("key", "value"));
  // The injected IOError on the manifest write must fail the flush.
  ASSERT_NOK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();
  // The DB should still be reopenable after the failed manifest write.
  Reopen(options);
}
3612
3613 #ifndef ROCKSDB_LITE
// Exercises DB::VerifyFileChecksums():
// - rejected (InvalidArgument) when no checksum factory is configured,
// - succeeds when files were written with a matching CRC32c generator,
// - rejected when the configured generator's name doesn't match the name
//   recorded in the file metadata.
TEST_F(DBBasicTest, VerifyFileChecksums) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = env_;
  DestroyAndReopen(options);
  ASSERT_OK(Put("a", "value"));
  ASSERT_OK(Flush());
  // No file_checksum_gen_factory configured yet, so verification is
  // rejected rather than silently passing.
  ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());

  options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  Reopen(options);
  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));

  // Write an L0 with checksum computed.
  ASSERT_OK(Put("b", "value"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));

  // Does the right thing but with the wrong name -- using it should lead to
  // an error.
  class MisnamedFileChecksumGenerator : public FileChecksumGenCrc32c {
   public:
    explicit MisnamedFileChecksumGenerator(
        const FileChecksumGenContext& context)
        : FileChecksumGenCrc32c(context) {}

    // Reports a name that does not match the CRC32c algorithm actually used.
    const char* Name() const override { return "sha1"; }
  };

  class MisnamedFileChecksumGenFactory : public FileChecksumGenCrc32cFactory {
   public:
    std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
        const FileChecksumGenContext& context) override {
      return std::make_unique<MisnamedFileChecksumGenerator>(context);
    }
  };

  options.file_checksum_gen_factory =
      std::make_shared<MisnamedFileChecksumGenFactory>();
  Reopen(options);
  ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
}
3656 #endif // !ROCKSDB_LITE
3657
// A test class for intercepting random reads and injecting artificial
// delays. Used for testing the deadline/timeout feature.
// The tuple parameter controls which limits are set on ReadOptions:
//   get<0>: if true, set ReadOptions::deadline
//   get<1>: if true, set ReadOptions::io_timeout
class DBBasicTestDeadline
    : public DBBasicTest,
      public testing::WithParamInterface<std::tuple<bool, bool>> {};
3663
// Verifies that a point lookup returns Status::TimedOut() when the
// configured deadline/io_timeout expires during any file read on the Get()
// path. The IO on which the delay is injected is advanced one read at a
// time until an iteration completes without timing out, covering every
// read in the lookup path.
TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
  std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  bool set_deadline = std::get<0>(GetParam());
  bool set_timeout = std::get<1>(GetParam());

  for (int option_config = kDefault; option_config < kEnd; ++option_config) {
    if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
      continue;
    }
    option_config_ = option_config;
    Options options = CurrentOptions();
    if (options.use_direct_reads) {
      continue;
    }
    options.env = env.get();
    options.disable_auto_compactions = true;
    Cache* block_cache = nullptr;
    // Filter block reads currently don't cause the request to get
    // aborted on a read timeout, so it's possible those block reads
    // may get issued even if the deadline is past
    SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTable::Get:BeforeFilterMatch",
        [&](void* /*arg*/) { fs->IgnoreDeadline(true); });
    SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTable::Get:AfterFilterMatch",
        [&](void* /*arg*/) { fs->IgnoreDeadline(false); });
    // DB open will create table readers unless we reduce the table cache
    // capacity.
    // SanitizeOptions will set max_open_files to minimum of 20. Table cache
    // is allocated with max_open_files - 10 as capacity. So override
    // max_open_files to 11 so table cache capacity will become 1. This will
    // prevent file open during DB open and force the file to be opened
    // during the Get() below
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
          // arg is a void* to the sanitized max_open_files value
          int* max_open_files = static_cast<int*>(arg);
          *max_open_files = 11;
        });
    SyncPoint::GetInstance()->EnableProcessing();

    SetTimeElapseOnlySleepOnReopen(&options);
    Reopen(options);

    if (options.table_factory) {
      block_cache = options.table_factory->GetOptions<Cache>(
          TableFactory::kBlockCacheOpts());
    }

    Random rnd(301);
    for (int i = 0; i < 400; ++i) {
      std::string key = "k" + ToString(i);
      ASSERT_OK(Put(key, rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    bool timedout = true;
    // A timeout will be forced when the IO counter reaches this value
    int io_deadline_trigger = 0;
    // Keep incrementing io_deadline_trigger and call Get() until there is an
    // iteration that doesn't cause a timeout. This ensures that we cover
    // all file reads in the point lookup path that can potentially timeout
    // and cause the Get() to fail.
    while (timedout) {
      ReadOptions ro;
      if (set_deadline) {
        ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
      }
      if (set_timeout) {
        ro.io_timeout = std::chrono::microseconds{5000};
      }
      fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);

      // Drop all cached blocks so the lookup has to do real IO
      block_cache->SetCapacity(0);
      block_cache->SetCapacity(1048576);

      std::string value;
      Status s = dbfull()->Get(ro, "k50", &value);
      if (fs->TimedOut()) {
        ASSERT_EQ(s, Status::TimedOut());
      } else {
        timedout = false;
        ASSERT_OK(s);
      }
      io_deadline_trigger++;
    }
    // Reset the delay sequence in order to avoid false alarms during Reopen
    fs->SetDelayTrigger(std::chrono::microseconds::zero(),
                        std::chrono::microseconds::zero(), 0);
  }
  Close();
}
3756
// Verifies that iterator reads return Status::TimedOut() (with the iterator
// invalidated) when the configured deadline/io_timeout expires during any
// file read on the Seek()/Next() path. Mirrors PointLookupDeadline above.
TEST_P(DBBasicTestDeadline, IteratorDeadline) {
  std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  bool set_deadline = std::get<0>(GetParam());
  bool set_timeout = std::get<1>(GetParam());

  for (int option_config = kDefault; option_config < kEnd; ++option_config) {
    if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
      continue;
    }
    // Select the option config for CurrentOptions(); without this the loop
    // re-tests the default configuration every iteration (matches
    // PointLookupDeadline above).
    option_config_ = option_config;
    Options options = CurrentOptions();
    if (options.use_direct_reads) {
      continue;
    }
    options.env = env.get();
    options.disable_auto_compactions = true;
    Cache* block_cache = nullptr;
    // DB open will create table readers unless we reduce the table cache
    // capacity.
    // SanitizeOptions will set max_open_files to minimum of 20. Table cache
    // is allocated with max_open_files - 10 as capacity. So override
    // max_open_files to 11 so table cache capacity will become 1. This will
    // prevent file open during DB open and force the file to be opened
    // during the iterator Seek/Next below
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
          // arg is a void* to the sanitized max_open_files value
          int* max_open_files = static_cast<int*>(arg);
          *max_open_files = 11;
        });
    SyncPoint::GetInstance()->EnableProcessing();

    SetTimeElapseOnlySleepOnReopen(&options);
    Reopen(options);

    if (options.table_factory) {
      block_cache = options.table_factory->GetOptions<Cache>(
          TableFactory::kBlockCacheOpts());
    }

    Random rnd(301);
    for (int i = 0; i < 400; ++i) {
      std::string key = "k" + ToString(i);
      ASSERT_OK(Put(key, rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    bool timedout = true;
    // A timeout will be forced when the IO counter reaches this value
    int io_deadline_trigger = 0;
    // Keep incrementing io_deadline_trigger and iterate until there is an
    // iteration that doesn't cause a timeout. This ensures that we cover
    // all file reads in the iterator path that can potentially timeout
    while (timedout) {
      ReadOptions ro;
      if (set_deadline) {
        ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
      }
      if (set_timeout) {
        ro.io_timeout = std::chrono::microseconds{5000};
      }
      fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);

      // Drop all cached blocks so the iterator has to do real IO
      block_cache->SetCapacity(0);
      block_cache->SetCapacity(1048576);

      Iterator* iter = dbfull()->NewIterator(ro);
      int count = 0;
      iter->Seek("k50");
      while (iter->Valid() && count++ < 100) {
        iter->Next();
      }
      if (fs->TimedOut()) {
        ASSERT_FALSE(iter->Valid());
        ASSERT_EQ(iter->status(), Status::TimedOut());
      } else {
        timedout = false;
        ASSERT_OK(iter->status());
      }
      delete iter;
      io_deadline_trigger++;
    }
    // Reset the delay sequence in order to avoid false alarms during Reopen
    fs->SetDelayTrigger(std::chrono::microseconds::zero(),
                        std::chrono::microseconds::zero(), 0);
  }
  Close();
}
3844
// Instantiate DBBasicTestDeadline for every supported combination of the
// two read limits (at least one of them must be set for the test to be
// meaningful):
// Param 0: If true, set read_options.deadline
// Param 1: If true, set read_options.io_timeout
INSTANTIATE_TEST_CASE_P(DBBasicTestDeadline, DBBasicTestDeadline,
                        ::testing::Values(std::make_tuple(true, false),
                                          std::make_tuple(false, true),
                                          std::make_tuple(true, true)));
3851 } // namespace ROCKSDB_NAMESPACE
3852
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
// Implementation is provided by a static library linked into the test
// binary.
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
// Default no-op definition when no custom-object static lib is linked in.
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
3860
// Test entry point: installs a crash stack-trace handler, initializes
// GoogleTest, registers optional custom objects, and runs all tests.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}
3867