1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "leveldb/db.h"
6 #include "leveldb/filter_policy.h"
7 #include "db/db_impl.h"
8 #include "db/filename.h"
9 #include "db/version_set.h"
10 #include "db/write_batch_internal.h"
11 #include "leveldb/cache.h"
12 #include "leveldb/env.h"
13 #include "leveldb/table.h"
14 #include "util/hash.h"
15 #include "util/logging.h"
16 #include "util/mutexlock.h"
17 #include "util/testharness.h"
18 #include "util/testutil.h"
19 
20 namespace leveldb {
21 
RandomString(Random * rnd,int len)22 static std::string RandomString(Random* rnd, int len) {
23   std::string r;
24   test::RandomString(rnd, len, &r);
25   return r;
26 }
27 
28 namespace {
29 class AtomicCounter {
30  private:
31   port::Mutex mu_;
32   int count_;
33  public:
AtomicCounter()34   AtomicCounter() : count_(0) { }
Increment()35   void Increment() {
36     IncrementBy(1);
37   }
IncrementBy(int count)38   void IncrementBy(int count) {
39     MutexLock l(&mu_);
40     count_ += count;
41   }
Read()42   int Read() {
43     MutexLock l(&mu_);
44     return count_;
45   }
Reset()46   void Reset() {
47     MutexLock l(&mu_);
48     count_ = 0;
49   }
50 };
51 
DelayMilliseconds(int millis)52 void DelayMilliseconds(int millis) {
53   Env::Default()->SleepForMicroseconds(millis * 1000);
54 }
55 }
56 
57 // Special Env used to delay background operations
58 class SpecialEnv : public EnvWrapper {
59  public:
60   // sstable/log Sync() calls are blocked while this pointer is non-NULL.
61   port::AtomicPointer delay_data_sync_;
62 
63   // sstable/log Sync() calls return an error.
64   port::AtomicPointer data_sync_error_;
65 
66   // Simulate no-space errors while this pointer is non-NULL.
67   port::AtomicPointer no_space_;
68 
69   // Simulate non-writable file system while this pointer is non-NULL
70   port::AtomicPointer non_writable_;
71 
72   // Force sync of manifest files to fail while this pointer is non-NULL
73   port::AtomicPointer manifest_sync_error_;
74 
75   // Force write to manifest files to fail while this pointer is non-NULL
76   port::AtomicPointer manifest_write_error_;
77 
78   bool count_random_reads_;
79   AtomicCounter random_read_counter_;
80 
SpecialEnv(Env * base)81   explicit SpecialEnv(Env* base) : EnvWrapper(base) {
82     delay_data_sync_.Release_Store(NULL);
83     data_sync_error_.Release_Store(NULL);
84     no_space_.Release_Store(NULL);
85     non_writable_.Release_Store(NULL);
86     count_random_reads_ = false;
87     manifest_sync_error_.Release_Store(NULL);
88     manifest_write_error_.Release_Store(NULL);
89   }
90 
NewWritableFile(const std::string & f,WritableFile ** r)91   Status NewWritableFile(const std::string& f, WritableFile** r) {
92     class DataFile : public WritableFile {
93      private:
94       SpecialEnv* env_;
95       WritableFile* base_;
96 
97      public:
98       DataFile(SpecialEnv* env, WritableFile* base)
99           : env_(env),
100             base_(base) {
101       }
102       ~DataFile() { delete base_; }
103       Status Append(const Slice& data) {
104         if (env_->no_space_.Acquire_Load() != NULL) {
105           // Drop writes on the floor
106           return Status::OK();
107         } else {
108           return base_->Append(data);
109         }
110       }
111       Status Close() { return base_->Close(); }
112       Status Flush() { return base_->Flush(); }
113       Status Sync() {
114         if (env_->data_sync_error_.Acquire_Load() != NULL) {
115           return Status::IOError("simulated data sync error");
116         }
117         while (env_->delay_data_sync_.Acquire_Load() != NULL) {
118           DelayMilliseconds(100);
119         }
120         return base_->Sync();
121       }
122     };
123     class ManifestFile : public WritableFile {
124      private:
125       SpecialEnv* env_;
126       WritableFile* base_;
127      public:
128       ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
129       ~ManifestFile() { delete base_; }
130       Status Append(const Slice& data) {
131         if (env_->manifest_write_error_.Acquire_Load() != NULL) {
132           return Status::IOError("simulated writer error");
133         } else {
134           return base_->Append(data);
135         }
136       }
137       Status Close() { return base_->Close(); }
138       Status Flush() { return base_->Flush(); }
139       Status Sync() {
140         if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
141           return Status::IOError("simulated sync error");
142         } else {
143           return base_->Sync();
144         }
145       }
146     };
147 
148     if (non_writable_.Acquire_Load() != NULL) {
149       return Status::IOError("simulated write error");
150     }
151 
152     Status s = target()->NewWritableFile(f, r);
153     if (s.ok()) {
154       if (strstr(f.c_str(), ".ldb") != NULL ||
155           strstr(f.c_str(), ".log") != NULL) {
156         *r = new DataFile(this, *r);
157       } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
158         *r = new ManifestFile(this, *r);
159       }
160     }
161     return s;
162   }
163 
NewRandomAccessFile(const std::string & f,RandomAccessFile ** r)164   Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) {
165     class CountingFile : public RandomAccessFile {
166      private:
167       RandomAccessFile* target_;
168       AtomicCounter* counter_;
169      public:
170       CountingFile(RandomAccessFile* target, AtomicCounter* counter)
171           : target_(target), counter_(counter) {
172       }
173       virtual ~CountingFile() { delete target_; }
174       virtual Status Read(uint64_t offset, size_t n, Slice* result,
175                           char* scratch) const {
176         counter_->Increment();
177         return target_->Read(offset, n, result, scratch);
178       }
179     };
180 
181     Status s = target()->NewRandomAccessFile(f, r);
182     if (s.ok() && count_random_reads_) {
183       *r = new CountingFile(*r, &random_read_counter_);
184     }
185     return s;
186   }
187 };
188 
189 class DBTest {
190  private:
191   const FilterPolicy* filter_policy_;
192 
193   // Sequence of option configurations to try
194   enum OptionConfig {
195     kDefault,
196     kReuse,
197     kFilter,
198     kUncompressed,
199     kEnd
200   };
201   int option_config_;
202 
203  public:
204   std::string dbname_;
205   SpecialEnv* env_;
206   DB* db_;
207 
208   Options last_options_;
209 
DBTest()210   DBTest() : option_config_(kDefault),
211              env_(new SpecialEnv(Env::Default())) {
212     filter_policy_ = NewBloomFilterPolicy(10);
213     dbname_ = test::TmpDir() + "/db_test";
214     DestroyDB(dbname_, Options());
215     db_ = NULL;
216     Reopen();
217   }
218 
~DBTest()219   ~DBTest() {
220     delete db_;
221     DestroyDB(dbname_, Options());
222     delete env_;
223     delete filter_policy_;
224   }
225 
226   // Switch to a fresh database with the next option configuration to
227   // test.  Return false if there are no more configurations to test.
ChangeOptions()228   bool ChangeOptions() {
229     option_config_++;
230     if (option_config_ >= kEnd) {
231       return false;
232     } else {
233       DestroyAndReopen();
234       return true;
235     }
236   }
237 
238   // Return the current option configuration.
CurrentOptions()239   Options CurrentOptions() {
240     Options options;
241     options.reuse_logs = false;
242     switch (option_config_) {
243       case kReuse:
244         options.reuse_logs = true;
245         break;
246       case kFilter:
247         options.filter_policy = filter_policy_;
248         break;
249       case kUncompressed:
250         options.compression = kNoCompression;
251         break;
252       default:
253         break;
254     }
255     return options;
256   }
257 
dbfull()258   DBImpl* dbfull() {
259     return reinterpret_cast<DBImpl*>(db_);
260   }
261 
Reopen(Options * options=NULL)262   void Reopen(Options* options = NULL) {
263     ASSERT_OK(TryReopen(options));
264   }
265 
Close()266   void Close() {
267     delete db_;
268     db_ = NULL;
269   }
270 
DestroyAndReopen(Options * options=NULL)271   void DestroyAndReopen(Options* options = NULL) {
272     delete db_;
273     db_ = NULL;
274     DestroyDB(dbname_, Options());
275     ASSERT_OK(TryReopen(options));
276   }
277 
TryReopen(Options * options)278   Status TryReopen(Options* options) {
279     delete db_;
280     db_ = NULL;
281     Options opts;
282     if (options != NULL) {
283       opts = *options;
284     } else {
285       opts = CurrentOptions();
286       opts.create_if_missing = true;
287     }
288     last_options_ = opts;
289 
290     return DB::Open(opts, dbname_, &db_);
291   }
292 
Put(const std::string & k,const std::string & v)293   Status Put(const std::string& k, const std::string& v) {
294     return db_->Put(WriteOptions(), k, v);
295   }
296 
Delete(const std::string & k)297   Status Delete(const std::string& k) {
298     return db_->Delete(WriteOptions(), k);
299   }
300 
Get(const std::string & k,const Snapshot * snapshot=NULL)301   std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
302     ReadOptions options;
303     options.snapshot = snapshot;
304     std::string result;
305     Status s = db_->Get(options, k, &result);
306     if (s.IsNotFound()) {
307       result = "NOT_FOUND";
308     } else if (!s.ok()) {
309       result = s.ToString();
310     }
311     return result;
312   }
313 
314   // Return a string that contains all key,value pairs in order,
315   // formatted like "(k1->v1)(k2->v2)".
Contents()316   std::string Contents() {
317     std::vector<std::string> forward;
318     std::string result;
319     Iterator* iter = db_->NewIterator(ReadOptions());
320     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
321       std::string s = IterStatus(iter);
322       result.push_back('(');
323       result.append(s);
324       result.push_back(')');
325       forward.push_back(s);
326     }
327 
328     // Check reverse iteration results are the reverse of forward results
329     size_t matched = 0;
330     for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
331       ASSERT_LT(matched, forward.size());
332       ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
333       matched++;
334     }
335     ASSERT_EQ(matched, forward.size());
336 
337     delete iter;
338     return result;
339   }
340 
AllEntriesFor(const Slice & user_key)341   std::string AllEntriesFor(const Slice& user_key) {
342     Iterator* iter = dbfull()->TEST_NewInternalIterator();
343     InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
344     iter->Seek(target.Encode());
345     std::string result;
346     if (!iter->status().ok()) {
347       result = iter->status().ToString();
348     } else {
349       result = "[ ";
350       bool first = true;
351       while (iter->Valid()) {
352         ParsedInternalKey ikey;
353         if (!ParseInternalKey(iter->key(), &ikey)) {
354           result += "CORRUPTED";
355         } else {
356           if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) {
357             break;
358           }
359           if (!first) {
360             result += ", ";
361           }
362           first = false;
363           switch (ikey.type) {
364             case kTypeValue:
365               result += iter->value().ToString();
366               break;
367             case kTypeDeletion:
368               result += "DEL";
369               break;
370           }
371         }
372         iter->Next();
373       }
374       if (!first) {
375         result += " ";
376       }
377       result += "]";
378     }
379     delete iter;
380     return result;
381   }
382 
NumTableFilesAtLevel(int level)383   int NumTableFilesAtLevel(int level) {
384     std::string property;
385     ASSERT_TRUE(
386         db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
387                          &property));
388     return atoi(property.c_str());
389   }
390 
TotalTableFiles()391   int TotalTableFiles() {
392     int result = 0;
393     for (int level = 0; level < config::kNumLevels; level++) {
394       result += NumTableFilesAtLevel(level);
395     }
396     return result;
397   }
398 
399   // Return spread of files per level
FilesPerLevel()400   std::string FilesPerLevel() {
401     std::string result;
402     int last_non_zero_offset = 0;
403     for (int level = 0; level < config::kNumLevels; level++) {
404       int f = NumTableFilesAtLevel(level);
405       char buf[100];
406       snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
407       result += buf;
408       if (f > 0) {
409         last_non_zero_offset = result.size();
410       }
411     }
412     result.resize(last_non_zero_offset);
413     return result;
414   }
415 
CountFiles()416   int CountFiles() {
417     std::vector<std::string> files;
418     env_->GetChildren(dbname_, &files);
419     return static_cast<int>(files.size());
420   }
421 
Size(const Slice & start,const Slice & limit)422   uint64_t Size(const Slice& start, const Slice& limit) {
423     Range r(start, limit);
424     uint64_t size;
425     db_->GetApproximateSizes(&r, 1, &size);
426     return size;
427   }
428 
Compact(const Slice & start,const Slice & limit)429   void Compact(const Slice& start, const Slice& limit) {
430     db_->CompactRange(&start, &limit);
431   }
432 
433   // Do n memtable compactions, each of which produces an sstable
434   // covering the range [small,large].
MakeTables(int n,const std::string & small,const std::string & large)435   void MakeTables(int n, const std::string& small, const std::string& large) {
436     for (int i = 0; i < n; i++) {
437       Put(small, "begin");
438       Put(large, "end");
439       dbfull()->TEST_CompactMemTable();
440     }
441   }
442 
443   // Prevent pushing of new sstables into deeper levels by adding
444   // tables that cover a specified range to all levels.
FillLevels(const std::string & smallest,const std::string & largest)445   void FillLevels(const std::string& smallest, const std::string& largest) {
446     MakeTables(config::kNumLevels, smallest, largest);
447   }
448 
DumpFileCounts(const char * label)449   void DumpFileCounts(const char* label) {
450     fprintf(stderr, "---\n%s:\n", label);
451     fprintf(stderr, "maxoverlap: %lld\n",
452             static_cast<long long>(
453                 dbfull()->TEST_MaxNextLevelOverlappingBytes()));
454     for (int level = 0; level < config::kNumLevels; level++) {
455       int num = NumTableFilesAtLevel(level);
456       if (num > 0) {
457         fprintf(stderr, "  level %3d : %d files\n", level, num);
458       }
459     }
460   }
461 
DumpSSTableList()462   std::string DumpSSTableList() {
463     std::string property;
464     db_->GetProperty("leveldb.sstables", &property);
465     return property;
466   }
467 
IterStatus(Iterator * iter)468   std::string IterStatus(Iterator* iter) {
469     std::string result;
470     if (iter->Valid()) {
471       result = iter->key().ToString() + "->" + iter->value().ToString();
472     } else {
473       result = "(invalid)";
474     }
475     return result;
476   }
477 
DeleteAnSSTFile()478   bool DeleteAnSSTFile() {
479     std::vector<std::string> filenames;
480     ASSERT_OK(env_->GetChildren(dbname_, &filenames));
481     uint64_t number;
482     FileType type;
483     for (size_t i = 0; i < filenames.size(); i++) {
484       if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
485         ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
486         return true;
487       }
488     }
489     return false;
490   }
491 
492   // Returns number of files renamed.
RenameLDBToSST()493   int RenameLDBToSST() {
494     std::vector<std::string> filenames;
495     ASSERT_OK(env_->GetChildren(dbname_, &filenames));
496     uint64_t number;
497     FileType type;
498     int files_renamed = 0;
499     for (size_t i = 0; i < filenames.size(); i++) {
500       if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
501         const std::string from = TableFileName(dbname_, number);
502         const std::string to = SSTTableFileName(dbname_, number);
503         ASSERT_OK(env_->RenameFile(from, to));
504         files_renamed++;
505       }
506     }
507     return files_renamed;
508   }
509 };
510 
TEST(DBTest,Empty)511 TEST(DBTest, Empty) {
512   do {
513     ASSERT_TRUE(db_ != NULL);
514     ASSERT_EQ("NOT_FOUND", Get("foo"));
515   } while (ChangeOptions());
516 }
517 
TEST(DBTest,ReadWrite)518 TEST(DBTest, ReadWrite) {
519   do {
520     ASSERT_OK(Put("foo", "v1"));
521     ASSERT_EQ("v1", Get("foo"));
522     ASSERT_OK(Put("bar", "v2"));
523     ASSERT_OK(Put("foo", "v3"));
524     ASSERT_EQ("v3", Get("foo"));
525     ASSERT_EQ("v2", Get("bar"));
526   } while (ChangeOptions());
527 }
528 
TEST(DBTest,PutDeleteGet)529 TEST(DBTest, PutDeleteGet) {
530   do {
531     ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
532     ASSERT_EQ("v1", Get("foo"));
533     ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
534     ASSERT_EQ("v2", Get("foo"));
535     ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
536     ASSERT_EQ("NOT_FOUND", Get("foo"));
537   } while (ChangeOptions());
538 }
539 
TEST(DBTest,GetFromImmutableLayer)540 TEST(DBTest, GetFromImmutableLayer) {
541   do {
542     Options options = CurrentOptions();
543     options.env = env_;
544     options.write_buffer_size = 100000;  // Small write buffer
545     Reopen(&options);
546 
547     ASSERT_OK(Put("foo", "v1"));
548     ASSERT_EQ("v1", Get("foo"));
549 
550     env_->delay_data_sync_.Release_Store(env_);      // Block sync calls
551     Put("k1", std::string(100000, 'x'));             // Fill memtable
552     Put("k2", std::string(100000, 'y'));             // Trigger compaction
553     ASSERT_EQ("v1", Get("foo"));
554     env_->delay_data_sync_.Release_Store(NULL);      // Release sync calls
555   } while (ChangeOptions());
556 }
557 
TEST(DBTest,GetFromVersions)558 TEST(DBTest, GetFromVersions) {
559   do {
560     ASSERT_OK(Put("foo", "v1"));
561     dbfull()->TEST_CompactMemTable();
562     ASSERT_EQ("v1", Get("foo"));
563   } while (ChangeOptions());
564 }
565 
TEST(DBTest,GetMemUsage)566 TEST(DBTest, GetMemUsage) {
567   do {
568     ASSERT_OK(Put("foo", "v1"));
569     std::string val;
570     ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val));
571     int mem_usage = atoi(val.c_str());
572     ASSERT_GT(mem_usage, 0);
573     ASSERT_LT(mem_usage, 5*1024*1024);
574   } while (ChangeOptions());
575 }
576 
TEST(DBTest,GetSnapshot)577 TEST(DBTest, GetSnapshot) {
578   do {
579     // Try with both a short key and a long key
580     for (int i = 0; i < 2; i++) {
581       std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
582       ASSERT_OK(Put(key, "v1"));
583       const Snapshot* s1 = db_->GetSnapshot();
584       ASSERT_OK(Put(key, "v2"));
585       ASSERT_EQ("v2", Get(key));
586       ASSERT_EQ("v1", Get(key, s1));
587       dbfull()->TEST_CompactMemTable();
588       ASSERT_EQ("v2", Get(key));
589       ASSERT_EQ("v1", Get(key, s1));
590       db_->ReleaseSnapshot(s1);
591     }
592   } while (ChangeOptions());
593 }
594 
TEST(DBTest,GetLevel0Ordering)595 TEST(DBTest, GetLevel0Ordering) {
596   do {
597     // Check that we process level-0 files in correct order.  The code
598     // below generates two level-0 files where the earlier one comes
599     // before the later one in the level-0 file list since the earlier
600     // one has a smaller "smallest" key.
601     ASSERT_OK(Put("bar", "b"));
602     ASSERT_OK(Put("foo", "v1"));
603     dbfull()->TEST_CompactMemTable();
604     ASSERT_OK(Put("foo", "v2"));
605     dbfull()->TEST_CompactMemTable();
606     ASSERT_EQ("v2", Get("foo"));
607   } while (ChangeOptions());
608 }
609 
TEST(DBTest,GetOrderedByLevels)610 TEST(DBTest, GetOrderedByLevels) {
611   do {
612     ASSERT_OK(Put("foo", "v1"));
613     Compact("a", "z");
614     ASSERT_EQ("v1", Get("foo"));
615     ASSERT_OK(Put("foo", "v2"));
616     ASSERT_EQ("v2", Get("foo"));
617     dbfull()->TEST_CompactMemTable();
618     ASSERT_EQ("v2", Get("foo"));
619   } while (ChangeOptions());
620 }
621 
TEST(DBTest,GetPicksCorrectFile)622 TEST(DBTest, GetPicksCorrectFile) {
623   do {
624     // Arrange to have multiple files in a non-level-0 level.
625     ASSERT_OK(Put("a", "va"));
626     Compact("a", "b");
627     ASSERT_OK(Put("x", "vx"));
628     Compact("x", "y");
629     ASSERT_OK(Put("f", "vf"));
630     Compact("f", "g");
631     ASSERT_EQ("va", Get("a"));
632     ASSERT_EQ("vf", Get("f"));
633     ASSERT_EQ("vx", Get("x"));
634   } while (ChangeOptions());
635 }
636 
TEST(DBTest,GetEncountersEmptyLevel)637 TEST(DBTest, GetEncountersEmptyLevel) {
638   do {
639     // Arrange for the following to happen:
640     //   * sstable A in level 0
641     //   * nothing in level 1
642     //   * sstable B in level 2
643     // Then do enough Get() calls to arrange for an automatic compaction
644     // of sstable A.  A bug would cause the compaction to be marked as
645     // occurring at level 1 (instead of the correct level 0).
646 
647     // Step 1: First place sstables in levels 0 and 2
648     int compaction_count = 0;
649     while (NumTableFilesAtLevel(0) == 0 ||
650            NumTableFilesAtLevel(2) == 0) {
651       ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
652       compaction_count++;
653       Put("a", "begin");
654       Put("z", "end");
655       dbfull()->TEST_CompactMemTable();
656     }
657 
658     // Step 2: clear level 1 if necessary.
659     dbfull()->TEST_CompactRange(1, NULL, NULL);
660     ASSERT_EQ(NumTableFilesAtLevel(0), 1);
661     ASSERT_EQ(NumTableFilesAtLevel(1), 0);
662     ASSERT_EQ(NumTableFilesAtLevel(2), 1);
663 
664     // Step 3: read a bunch of times
665     for (int i = 0; i < 1000; i++) {
666       ASSERT_EQ("NOT_FOUND", Get("missing"));
667     }
668 
669     // Step 4: Wait for compaction to finish
670     DelayMilliseconds(1000);
671 
672     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
673   } while (ChangeOptions());
674 }
675 
TEST(DBTest,IterEmpty)676 TEST(DBTest, IterEmpty) {
677   Iterator* iter = db_->NewIterator(ReadOptions());
678 
679   iter->SeekToFirst();
680   ASSERT_EQ(IterStatus(iter), "(invalid)");
681 
682   iter->SeekToLast();
683   ASSERT_EQ(IterStatus(iter), "(invalid)");
684 
685   iter->Seek("foo");
686   ASSERT_EQ(IterStatus(iter), "(invalid)");
687 
688   delete iter;
689 }
690 
TEST(DBTest,IterSingle)691 TEST(DBTest, IterSingle) {
692   ASSERT_OK(Put("a", "va"));
693   Iterator* iter = db_->NewIterator(ReadOptions());
694 
695   iter->SeekToFirst();
696   ASSERT_EQ(IterStatus(iter), "a->va");
697   iter->Next();
698   ASSERT_EQ(IterStatus(iter), "(invalid)");
699   iter->SeekToFirst();
700   ASSERT_EQ(IterStatus(iter), "a->va");
701   iter->Prev();
702   ASSERT_EQ(IterStatus(iter), "(invalid)");
703 
704   iter->SeekToLast();
705   ASSERT_EQ(IterStatus(iter), "a->va");
706   iter->Next();
707   ASSERT_EQ(IterStatus(iter), "(invalid)");
708   iter->SeekToLast();
709   ASSERT_EQ(IterStatus(iter), "a->va");
710   iter->Prev();
711   ASSERT_EQ(IterStatus(iter), "(invalid)");
712 
713   iter->Seek("");
714   ASSERT_EQ(IterStatus(iter), "a->va");
715   iter->Next();
716   ASSERT_EQ(IterStatus(iter), "(invalid)");
717 
718   iter->Seek("a");
719   ASSERT_EQ(IterStatus(iter), "a->va");
720   iter->Next();
721   ASSERT_EQ(IterStatus(iter), "(invalid)");
722 
723   iter->Seek("b");
724   ASSERT_EQ(IterStatus(iter), "(invalid)");
725 
726   delete iter;
727 }
728 
TEST(DBTest,IterMulti)729 TEST(DBTest, IterMulti) {
730   ASSERT_OK(Put("a", "va"));
731   ASSERT_OK(Put("b", "vb"));
732   ASSERT_OK(Put("c", "vc"));
733   Iterator* iter = db_->NewIterator(ReadOptions());
734 
735   iter->SeekToFirst();
736   ASSERT_EQ(IterStatus(iter), "a->va");
737   iter->Next();
738   ASSERT_EQ(IterStatus(iter), "b->vb");
739   iter->Next();
740   ASSERT_EQ(IterStatus(iter), "c->vc");
741   iter->Next();
742   ASSERT_EQ(IterStatus(iter), "(invalid)");
743   iter->SeekToFirst();
744   ASSERT_EQ(IterStatus(iter), "a->va");
745   iter->Prev();
746   ASSERT_EQ(IterStatus(iter), "(invalid)");
747 
748   iter->SeekToLast();
749   ASSERT_EQ(IterStatus(iter), "c->vc");
750   iter->Prev();
751   ASSERT_EQ(IterStatus(iter), "b->vb");
752   iter->Prev();
753   ASSERT_EQ(IterStatus(iter), "a->va");
754   iter->Prev();
755   ASSERT_EQ(IterStatus(iter), "(invalid)");
756   iter->SeekToLast();
757   ASSERT_EQ(IterStatus(iter), "c->vc");
758   iter->Next();
759   ASSERT_EQ(IterStatus(iter), "(invalid)");
760 
761   iter->Seek("");
762   ASSERT_EQ(IterStatus(iter), "a->va");
763   iter->Seek("a");
764   ASSERT_EQ(IterStatus(iter), "a->va");
765   iter->Seek("ax");
766   ASSERT_EQ(IterStatus(iter), "b->vb");
767   iter->Seek("b");
768   ASSERT_EQ(IterStatus(iter), "b->vb");
769   iter->Seek("z");
770   ASSERT_EQ(IterStatus(iter), "(invalid)");
771 
772   // Switch from reverse to forward
773   iter->SeekToLast();
774   iter->Prev();
775   iter->Prev();
776   iter->Next();
777   ASSERT_EQ(IterStatus(iter), "b->vb");
778 
779   // Switch from forward to reverse
780   iter->SeekToFirst();
781   iter->Next();
782   iter->Next();
783   iter->Prev();
784   ASSERT_EQ(IterStatus(iter), "b->vb");
785 
786   // Make sure iter stays at snapshot
787   ASSERT_OK(Put("a",  "va2"));
788   ASSERT_OK(Put("a2", "va3"));
789   ASSERT_OK(Put("b",  "vb2"));
790   ASSERT_OK(Put("c",  "vc2"));
791   ASSERT_OK(Delete("b"));
792   iter->SeekToFirst();
793   ASSERT_EQ(IterStatus(iter), "a->va");
794   iter->Next();
795   ASSERT_EQ(IterStatus(iter), "b->vb");
796   iter->Next();
797   ASSERT_EQ(IterStatus(iter), "c->vc");
798   iter->Next();
799   ASSERT_EQ(IterStatus(iter), "(invalid)");
800   iter->SeekToLast();
801   ASSERT_EQ(IterStatus(iter), "c->vc");
802   iter->Prev();
803   ASSERT_EQ(IterStatus(iter), "b->vb");
804   iter->Prev();
805   ASSERT_EQ(IterStatus(iter), "a->va");
806   iter->Prev();
807   ASSERT_EQ(IterStatus(iter), "(invalid)");
808 
809   delete iter;
810 }
811 
TEST(DBTest,IterSmallAndLargeMix)812 TEST(DBTest, IterSmallAndLargeMix) {
813   ASSERT_OK(Put("a", "va"));
814   ASSERT_OK(Put("b", std::string(100000, 'b')));
815   ASSERT_OK(Put("c", "vc"));
816   ASSERT_OK(Put("d", std::string(100000, 'd')));
817   ASSERT_OK(Put("e", std::string(100000, 'e')));
818 
819   Iterator* iter = db_->NewIterator(ReadOptions());
820 
821   iter->SeekToFirst();
822   ASSERT_EQ(IterStatus(iter), "a->va");
823   iter->Next();
824   ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
825   iter->Next();
826   ASSERT_EQ(IterStatus(iter), "c->vc");
827   iter->Next();
828   ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
829   iter->Next();
830   ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
831   iter->Next();
832   ASSERT_EQ(IterStatus(iter), "(invalid)");
833 
834   iter->SeekToLast();
835   ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
836   iter->Prev();
837   ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
838   iter->Prev();
839   ASSERT_EQ(IterStatus(iter), "c->vc");
840   iter->Prev();
841   ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
842   iter->Prev();
843   ASSERT_EQ(IterStatus(iter), "a->va");
844   iter->Prev();
845   ASSERT_EQ(IterStatus(iter), "(invalid)");
846 
847   delete iter;
848 }
849 
TEST(DBTest,IterMultiWithDelete)850 TEST(DBTest, IterMultiWithDelete) {
851   do {
852     ASSERT_OK(Put("a", "va"));
853     ASSERT_OK(Put("b", "vb"));
854     ASSERT_OK(Put("c", "vc"));
855     ASSERT_OK(Delete("b"));
856     ASSERT_EQ("NOT_FOUND", Get("b"));
857 
858     Iterator* iter = db_->NewIterator(ReadOptions());
859     iter->Seek("c");
860     ASSERT_EQ(IterStatus(iter), "c->vc");
861     iter->Prev();
862     ASSERT_EQ(IterStatus(iter), "a->va");
863     delete iter;
864   } while (ChangeOptions());
865 }
866 
TEST(DBTest,Recover)867 TEST(DBTest, Recover) {
868   do {
869     ASSERT_OK(Put("foo", "v1"));
870     ASSERT_OK(Put("baz", "v5"));
871 
872     Reopen();
873     ASSERT_EQ("v1", Get("foo"));
874 
875     ASSERT_EQ("v1", Get("foo"));
876     ASSERT_EQ("v5", Get("baz"));
877     ASSERT_OK(Put("bar", "v2"));
878     ASSERT_OK(Put("foo", "v3"));
879 
880     Reopen();
881     ASSERT_EQ("v3", Get("foo"));
882     ASSERT_OK(Put("foo", "v4"));
883     ASSERT_EQ("v4", Get("foo"));
884     ASSERT_EQ("v2", Get("bar"));
885     ASSERT_EQ("v5", Get("baz"));
886   } while (ChangeOptions());
887 }
888 
TEST(DBTest,RecoveryWithEmptyLog)889 TEST(DBTest, RecoveryWithEmptyLog) {
890   do {
891     ASSERT_OK(Put("foo", "v1"));
892     ASSERT_OK(Put("foo", "v2"));
893     Reopen();
894     Reopen();
895     ASSERT_OK(Put("foo", "v3"));
896     Reopen();
897     ASSERT_EQ("v3", Get("foo"));
898   } while (ChangeOptions());
899 }
900 
901 // Check that writes done during a memtable compaction are recovered
902 // if the database is shutdown during the memtable compaction.
TEST(DBTest,RecoverDuringMemtableCompaction)903 TEST(DBTest, RecoverDuringMemtableCompaction) {
904   do {
905     Options options = CurrentOptions();
906     options.env = env_;
907     options.write_buffer_size = 1000000;
908     Reopen(&options);
909 
910     // Trigger a long memtable compaction and reopen the database during it
911     ASSERT_OK(Put("foo", "v1"));                         // Goes to 1st log file
912     ASSERT_OK(Put("big1", std::string(10000000, 'x')));  // Fills memtable
913     ASSERT_OK(Put("big2", std::string(1000, 'y')));      // Triggers compaction
914     ASSERT_OK(Put("bar", "v2"));                         // Goes to new log file
915 
916     Reopen(&options);
917     ASSERT_EQ("v1", Get("foo"));
918     ASSERT_EQ("v2", Get("bar"));
919     ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
920     ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
921   } while (ChangeOptions());
922 }
923 
Key(int i)924 static std::string Key(int i) {
925   char buf[100];
926   snprintf(buf, sizeof(buf), "key%06d", i);
927   return std::string(buf);
928 }
929 
TEST(DBTest,MinorCompactionsHappen)930 TEST(DBTest, MinorCompactionsHappen) {
931   Options options = CurrentOptions();
932   options.write_buffer_size = 10000;
933   Reopen(&options);
934 
935   const int N = 500;
936 
937   int starting_num_tables = TotalTableFiles();
938   for (int i = 0; i < N; i++) {
939     ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
940   }
941   int ending_num_tables = TotalTableFiles();
942   ASSERT_GT(ending_num_tables, starting_num_tables);
943 
944   for (int i = 0; i < N; i++) {
945     ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
946   }
947 
948   Reopen();
949 
950   for (int i = 0; i < N; i++) {
951     ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
952   }
953 }
954 
TEST(DBTest,RecoverWithLargeLog)955 TEST(DBTest, RecoverWithLargeLog) {
956   {
957     Options options = CurrentOptions();
958     Reopen(&options);
959     ASSERT_OK(Put("big1", std::string(200000, '1')));
960     ASSERT_OK(Put("big2", std::string(200000, '2')));
961     ASSERT_OK(Put("small3", std::string(10, '3')));
962     ASSERT_OK(Put("small4", std::string(10, '4')));
963     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
964   }
965 
966   // Make sure that if we re-open with a small write buffer size that
967   // we flush table files in the middle of a large log file.
968   Options options = CurrentOptions();
969   options.write_buffer_size = 100000;
970   Reopen(&options);
971   ASSERT_EQ(NumTableFilesAtLevel(0), 3);
972   ASSERT_EQ(std::string(200000, '1'), Get("big1"));
973   ASSERT_EQ(std::string(200000, '2'), Get("big2"));
974   ASSERT_EQ(std::string(10, '3'), Get("small3"));
975   ASSERT_EQ(std::string(10, '4'), Get("small4"));
976   ASSERT_GT(NumTableFilesAtLevel(0), 1);
977 }
978 
TEST(DBTest,CompactionsGenerateMultipleFiles)979 TEST(DBTest, CompactionsGenerateMultipleFiles) {
980   Options options = CurrentOptions();
981   options.write_buffer_size = 100000000;        // Large write buffer
982   Reopen(&options);
983 
984   Random rnd(301);
985 
986   // Write 8MB (80 values, each 100K)
987   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
988   std::vector<std::string> values;
989   for (int i = 0; i < 80; i++) {
990     values.push_back(RandomString(&rnd, 100000));
991     ASSERT_OK(Put(Key(i), values[i]));
992   }
993 
994   // Reopening moves updates to level-0
995   Reopen(&options);
996   dbfull()->TEST_CompactRange(0, NULL, NULL);
997 
998   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
999   ASSERT_GT(NumTableFilesAtLevel(1), 1);
1000   for (int i = 0; i < 80; i++) {
1001     ASSERT_EQ(Get(Key(i)), values[i]);
1002   }
1003 }
1004 
TEST(DBTest,RepeatedWritesToSameKey)1005 TEST(DBTest, RepeatedWritesToSameKey) {
1006   Options options = CurrentOptions();
1007   options.env = env_;
1008   options.write_buffer_size = 100000;  // Small write buffer
1009   Reopen(&options);
1010 
1011   // We must have at most one file per level except for level-0,
1012   // which may have up to kL0_StopWritesTrigger files.
1013   const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
1014 
1015   Random rnd(301);
1016   std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
1017   for (int i = 0; i < 5 * kMaxFiles; i++) {
1018     Put("key", value);
1019     ASSERT_LE(TotalTableFiles(), kMaxFiles);
1020     fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
1021   }
1022 }
1023 
TEST(DBTest,SparseMerge)1024 TEST(DBTest, SparseMerge) {
1025   Options options = CurrentOptions();
1026   options.compression = kNoCompression;
1027   Reopen(&options);
1028 
1029   FillLevels("A", "Z");
1030 
1031   // Suppose there is:
1032   //    small amount of data with prefix A
1033   //    large amount of data with prefix B
1034   //    small amount of data with prefix C
1035   // and that recent updates have made small changes to all three prefixes.
1036   // Check that we do not do a compaction that merges all of B in one shot.
1037   const std::string value(1000, 'x');
1038   Put("A", "va");
1039   // Write approximately 100MB of "B" values
1040   for (int i = 0; i < 100000; i++) {
1041     char key[100];
1042     snprintf(key, sizeof(key), "B%010d", i);
1043     Put(key, value);
1044   }
1045   Put("C", "vc");
1046   dbfull()->TEST_CompactMemTable();
1047   dbfull()->TEST_CompactRange(0, NULL, NULL);
1048 
1049   // Make sparse update
1050   Put("A",    "va2");
1051   Put("B100", "bvalue2");
1052   Put("C",    "vc2");
1053   dbfull()->TEST_CompactMemTable();
1054 
1055   // Compactions should not cause us to create a situation where
1056   // a file overlaps too much data at the next level.
1057   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
1058   dbfull()->TEST_CompactRange(0, NULL, NULL);
1059   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
1060   dbfull()->TEST_CompactRange(1, NULL, NULL);
1061   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
1062 }
1063 
Between(uint64_t val,uint64_t low,uint64_t high)1064 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
1065   bool result = (val >= low) && (val <= high);
1066   if (!result) {
1067     fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
1068             (unsigned long long)(val),
1069             (unsigned long long)(low),
1070             (unsigned long long)(high));
1071   }
1072   return result;
1073 }
1074 
TEST(DBTest,ApproximateSizes)1075 TEST(DBTest, ApproximateSizes) {
1076   do {
1077     Options options = CurrentOptions();
1078     options.write_buffer_size = 100000000;        // Large write buffer
1079     options.compression = kNoCompression;
1080     DestroyAndReopen();
1081 
1082     ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
1083     Reopen(&options);
1084     ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
1085 
1086     // Write 8MB (80 values, each 100K)
1087     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
1088     const int N = 80;
1089     static const int S1 = 100000;
1090     static const int S2 = 105000;  // Allow some expansion from metadata
1091     Random rnd(301);
1092     for (int i = 0; i < N; i++) {
1093       ASSERT_OK(Put(Key(i), RandomString(&rnd, S1)));
1094     }
1095 
1096     // 0 because GetApproximateSizes() does not account for memtable space
1097     ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
1098 
1099     if (options.reuse_logs) {
1100       // Recovery will reuse memtable, and GetApproximateSizes() does not
1101       // account for memtable usage;
1102       Reopen(&options);
1103       ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
1104       continue;
1105     }
1106 
1107     // Check sizes across recovery by reopening a few times
1108     for (int run = 0; run < 3; run++) {
1109       Reopen(&options);
1110 
1111       for (int compact_start = 0; compact_start < N; compact_start += 10) {
1112         for (int i = 0; i < N; i += 10) {
1113           ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i));
1114           ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1)));
1115           ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), S1*10, S2*10));
1116         }
1117         ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50));
1118         ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50));
1119 
1120         std::string cstart_str = Key(compact_start);
1121         std::string cend_str = Key(compact_start + 9);
1122         Slice cstart = cstart_str;
1123         Slice cend = cend_str;
1124         dbfull()->TEST_CompactRange(0, &cstart, &cend);
1125       }
1126 
1127       ASSERT_EQ(NumTableFilesAtLevel(0), 0);
1128       ASSERT_GT(NumTableFilesAtLevel(1), 0);
1129     }
1130   } while (ChangeOptions());
1131 }
1132 
TEST(DBTest,ApproximateSizes_MixOfSmallAndLarge)1133 TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
1134   do {
1135     Options options = CurrentOptions();
1136     options.compression = kNoCompression;
1137     Reopen();
1138 
1139     Random rnd(301);
1140     std::string big1 = RandomString(&rnd, 100000);
1141     ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000)));
1142     ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000)));
1143     ASSERT_OK(Put(Key(2), big1));
1144     ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000)));
1145     ASSERT_OK(Put(Key(4), big1));
1146     ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000)));
1147     ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
1148     ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
1149 
1150     if (options.reuse_logs) {
1151       // Need to force a memtable compaction since recovery does not do so.
1152       ASSERT_OK(dbfull()->TEST_CompactMemTable());
1153     }
1154 
1155     // Check sizes across recovery by reopening a few times
1156     for (int run = 0; run < 3; run++) {
1157       Reopen(&options);
1158 
1159       ASSERT_TRUE(Between(Size("", Key(0)), 0, 0));
1160       ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000));
1161       ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000));
1162       ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000));
1163       ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000));
1164       ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000));
1165       ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000));
1166       ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000));
1167       ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000));
1168 
1169       ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
1170 
1171       dbfull()->TEST_CompactRange(0, NULL, NULL);
1172     }
1173   } while (ChangeOptions());
1174 }
1175 
TEST(DBTest,IteratorPinsRef)1176 TEST(DBTest, IteratorPinsRef) {
1177   Put("foo", "hello");
1178 
1179   // Get iterator that will yield the current contents of the DB.
1180   Iterator* iter = db_->NewIterator(ReadOptions());
1181 
1182   // Write to force compactions
1183   Put("foo", "newvalue1");
1184   for (int i = 0; i < 100; i++) {
1185     ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
1186   }
1187   Put("foo", "newvalue2");
1188 
1189   iter->SeekToFirst();
1190   ASSERT_TRUE(iter->Valid());
1191   ASSERT_EQ("foo", iter->key().ToString());
1192   ASSERT_EQ("hello", iter->value().ToString());
1193   iter->Next();
1194   ASSERT_TRUE(!iter->Valid());
1195   delete iter;
1196 }
1197 
TEST(DBTest,Snapshot)1198 TEST(DBTest, Snapshot) {
1199   do {
1200     Put("foo", "v1");
1201     const Snapshot* s1 = db_->GetSnapshot();
1202     Put("foo", "v2");
1203     const Snapshot* s2 = db_->GetSnapshot();
1204     Put("foo", "v3");
1205     const Snapshot* s3 = db_->GetSnapshot();
1206 
1207     Put("foo", "v4");
1208     ASSERT_EQ("v1", Get("foo", s1));
1209     ASSERT_EQ("v2", Get("foo", s2));
1210     ASSERT_EQ("v3", Get("foo", s3));
1211     ASSERT_EQ("v4", Get("foo"));
1212 
1213     db_->ReleaseSnapshot(s3);
1214     ASSERT_EQ("v1", Get("foo", s1));
1215     ASSERT_EQ("v2", Get("foo", s2));
1216     ASSERT_EQ("v4", Get("foo"));
1217 
1218     db_->ReleaseSnapshot(s1);
1219     ASSERT_EQ("v2", Get("foo", s2));
1220     ASSERT_EQ("v4", Get("foo"));
1221 
1222     db_->ReleaseSnapshot(s2);
1223     ASSERT_EQ("v4", Get("foo"));
1224   } while (ChangeOptions());
1225 }
1226 
TEST(DBTest,HiddenValuesAreRemoved)1227 TEST(DBTest, HiddenValuesAreRemoved) {
1228   do {
1229     Random rnd(301);
1230     FillLevels("a", "z");
1231 
1232     std::string big = RandomString(&rnd, 50000);
1233     Put("foo", big);
1234     Put("pastfoo", "v");
1235     const Snapshot* snapshot = db_->GetSnapshot();
1236     Put("foo", "tiny");
1237     Put("pastfoo2", "v2");        // Advance sequence number one more
1238 
1239     ASSERT_OK(dbfull()->TEST_CompactMemTable());
1240     ASSERT_GT(NumTableFilesAtLevel(0), 0);
1241 
1242     ASSERT_EQ(big, Get("foo", snapshot));
1243     ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000));
1244     db_->ReleaseSnapshot(snapshot);
1245     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
1246     Slice x("x");
1247     dbfull()->TEST_CompactRange(0, NULL, &x);
1248     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
1249     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
1250     ASSERT_GE(NumTableFilesAtLevel(1), 1);
1251     dbfull()->TEST_CompactRange(1, NULL, &x);
1252     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
1253 
1254     ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
1255   } while (ChangeOptions());
1256 }
1257 
TEST(DBTest,DeletionMarkers1)1258 TEST(DBTest, DeletionMarkers1) {
1259   Put("foo", "v1");
1260   ASSERT_OK(dbfull()->TEST_CompactMemTable());
1261   const int last = config::kMaxMemCompactLevel;
1262   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
1263 
1264   // Place a table at level last-1 to prevent merging with preceding mutation
1265   Put("a", "begin");
1266   Put("z", "end");
1267   dbfull()->TEST_CompactMemTable();
1268   ASSERT_EQ(NumTableFilesAtLevel(last), 1);
1269   ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
1270 
1271   Delete("foo");
1272   Put("foo", "v2");
1273   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
1274   ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
1275   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
1276   Slice z("z");
1277   dbfull()->TEST_CompactRange(last-2, NULL, &z);
1278   // DEL eliminated, but v1 remains because we aren't compacting that level
1279   // (DEL can be eliminated because v2 hides v1).
1280   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
1281   dbfull()->TEST_CompactRange(last-1, NULL, NULL);
1282   // Merging last-1 w/ last, so we are the base level for "foo", so
1283   // DEL is removed.  (as is v1).
1284   ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
1285 }
1286 
TEST(DBTest,DeletionMarkers2)1287 TEST(DBTest, DeletionMarkers2) {
1288   Put("foo", "v1");
1289   ASSERT_OK(dbfull()->TEST_CompactMemTable());
1290   const int last = config::kMaxMemCompactLevel;
1291   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
1292 
1293   // Place a table at level last-1 to prevent merging with preceding mutation
1294   Put("a", "begin");
1295   Put("z", "end");
1296   dbfull()->TEST_CompactMemTable();
1297   ASSERT_EQ(NumTableFilesAtLevel(last), 1);
1298   ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
1299 
1300   Delete("foo");
1301   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
1302   ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
1303   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
1304   dbfull()->TEST_CompactRange(last-2, NULL, NULL);
1305   // DEL kept: "last" file overlaps
1306   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
1307   dbfull()->TEST_CompactRange(last-1, NULL, NULL);
1308   // Merging last-1 w/ last, so we are the base level for "foo", so
1309   // DEL is removed.  (as is v1).
1310   ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
1311 }
1312 
TEST(DBTest,OverlapInLevel0)1313 TEST(DBTest, OverlapInLevel0) {
1314   do {
1315     ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config";
1316 
1317     // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
1318     ASSERT_OK(Put("100", "v100"));
1319     ASSERT_OK(Put("999", "v999"));
1320     dbfull()->TEST_CompactMemTable();
1321     ASSERT_OK(Delete("100"));
1322     ASSERT_OK(Delete("999"));
1323     dbfull()->TEST_CompactMemTable();
1324     ASSERT_EQ("0,1,1", FilesPerLevel());
1325 
1326     // Make files spanning the following ranges in level-0:
1327     //  files[0]  200 .. 900
1328     //  files[1]  300 .. 500
1329     // Note that files are sorted by smallest key.
1330     ASSERT_OK(Put("300", "v300"));
1331     ASSERT_OK(Put("500", "v500"));
1332     dbfull()->TEST_CompactMemTable();
1333     ASSERT_OK(Put("200", "v200"));
1334     ASSERT_OK(Put("600", "v600"));
1335     ASSERT_OK(Put("900", "v900"));
1336     dbfull()->TEST_CompactMemTable();
1337     ASSERT_EQ("2,1,1", FilesPerLevel());
1338 
1339     // Compact away the placeholder files we created initially
1340     dbfull()->TEST_CompactRange(1, NULL, NULL);
1341     dbfull()->TEST_CompactRange(2, NULL, NULL);
1342     ASSERT_EQ("2", FilesPerLevel());
1343 
1344     // Do a memtable compaction.  Before bug-fix, the compaction would
1345     // not detect the overlap with level-0 files and would incorrectly place
1346     // the deletion in a deeper level.
1347     ASSERT_OK(Delete("600"));
1348     dbfull()->TEST_CompactMemTable();
1349     ASSERT_EQ("3", FilesPerLevel());
1350     ASSERT_EQ("NOT_FOUND", Get("600"));
1351   } while (ChangeOptions());
1352 }
1353 
TEST(DBTest,L0_CompactionBug_Issue44_a)1354 TEST(DBTest, L0_CompactionBug_Issue44_a) {
1355   Reopen();
1356   ASSERT_OK(Put("b", "v"));
1357   Reopen();
1358   ASSERT_OK(Delete("b"));
1359   ASSERT_OK(Delete("a"));
1360   Reopen();
1361   ASSERT_OK(Delete("a"));
1362   Reopen();
1363   ASSERT_OK(Put("a", "v"));
1364   Reopen();
1365   Reopen();
1366   ASSERT_EQ("(a->v)", Contents());
1367   DelayMilliseconds(1000);  // Wait for compaction to finish
1368   ASSERT_EQ("(a->v)", Contents());
1369 }
1370 
TEST(DBTest,L0_CompactionBug_Issue44_b)1371 TEST(DBTest, L0_CompactionBug_Issue44_b) {
1372   Reopen();
1373   Put("","");
1374   Reopen();
1375   Delete("e");
1376   Put("","");
1377   Reopen();
1378   Put("c", "cv");
1379   Reopen();
1380   Put("","");
1381   Reopen();
1382   Put("","");
1383   DelayMilliseconds(1000);  // Wait for compaction to finish
1384   Reopen();
1385   Put("d","dv");
1386   Reopen();
1387   Put("","");
1388   Reopen();
1389   Delete("d");
1390   Delete("b");
1391   Reopen();
1392   ASSERT_EQ("(->)(c->cv)", Contents());
1393   DelayMilliseconds(1000);  // Wait for compaction to finish
1394   ASSERT_EQ("(->)(c->cv)", Contents());
1395 }
1396 
TEST(DBTest,ComparatorCheck)1397 TEST(DBTest, ComparatorCheck) {
1398   class NewComparator : public Comparator {
1399    public:
1400     virtual const char* Name() const { return "leveldb.NewComparator"; }
1401     virtual int Compare(const Slice& a, const Slice& b) const {
1402       return BytewiseComparator()->Compare(a, b);
1403     }
1404     virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
1405       BytewiseComparator()->FindShortestSeparator(s, l);
1406     }
1407     virtual void FindShortSuccessor(std::string* key) const {
1408       BytewiseComparator()->FindShortSuccessor(key);
1409     }
1410   };
1411   NewComparator cmp;
1412   Options new_options = CurrentOptions();
1413   new_options.comparator = &cmp;
1414   Status s = TryReopen(&new_options);
1415   ASSERT_TRUE(!s.ok());
1416   ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
1417       << s.ToString();
1418 }
1419 
TEST(DBTest,CustomComparator)1420 TEST(DBTest, CustomComparator) {
1421   class NumberComparator : public Comparator {
1422    public:
1423     virtual const char* Name() const { return "test.NumberComparator"; }
1424     virtual int Compare(const Slice& a, const Slice& b) const {
1425       return ToNumber(a) - ToNumber(b);
1426     }
1427     virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
1428       ToNumber(*s);     // Check format
1429       ToNumber(l);      // Check format
1430     }
1431     virtual void FindShortSuccessor(std::string* key) const {
1432       ToNumber(*key);   // Check format
1433     }
1434    private:
1435     static int ToNumber(const Slice& x) {
1436       // Check that there are no extra characters.
1437       ASSERT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size()-1] == ']')
1438           << EscapeString(x);
1439       int val;
1440       char ignored;
1441       ASSERT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
1442           << EscapeString(x);
1443       return val;
1444     }
1445   };
1446   NumberComparator cmp;
1447   Options new_options = CurrentOptions();
1448   new_options.create_if_missing = true;
1449   new_options.comparator = &cmp;
1450   new_options.filter_policy = NULL;     // Cannot use bloom filters
1451   new_options.write_buffer_size = 1000;  // Compact more often
1452   DestroyAndReopen(&new_options);
1453   ASSERT_OK(Put("[10]", "ten"));
1454   ASSERT_OK(Put("[0x14]", "twenty"));
1455   for (int i = 0; i < 2; i++) {
1456     ASSERT_EQ("ten", Get("[10]"));
1457     ASSERT_EQ("ten", Get("[0xa]"));
1458     ASSERT_EQ("twenty", Get("[20]"));
1459     ASSERT_EQ("twenty", Get("[0x14]"));
1460     ASSERT_EQ("NOT_FOUND", Get("[15]"));
1461     ASSERT_EQ("NOT_FOUND", Get("[0xf]"));
1462     Compact("[0]", "[9999]");
1463   }
1464 
1465   for (int run = 0; run < 2; run++) {
1466     for (int i = 0; i < 1000; i++) {
1467       char buf[100];
1468       snprintf(buf, sizeof(buf), "[%d]", i*10);
1469       ASSERT_OK(Put(buf, buf));
1470     }
1471     Compact("[0]", "[1000000]");
1472   }
1473 }
1474 
TEST(DBTest,ManualCompaction)1475 TEST(DBTest, ManualCompaction) {
1476   ASSERT_EQ(config::kMaxMemCompactLevel, 2)
1477       << "Need to update this test to match kMaxMemCompactLevel";
1478 
1479   MakeTables(3, "p", "q");
1480   ASSERT_EQ("1,1,1", FilesPerLevel());
1481 
1482   // Compaction range falls before files
1483   Compact("", "c");
1484   ASSERT_EQ("1,1,1", FilesPerLevel());
1485 
1486   // Compaction range falls after files
1487   Compact("r", "z");
1488   ASSERT_EQ("1,1,1", FilesPerLevel());
1489 
1490   // Compaction range overlaps files
1491   Compact("p1", "p9");
1492   ASSERT_EQ("0,0,1", FilesPerLevel());
1493 
1494   // Populate a different range
1495   MakeTables(3, "c", "e");
1496   ASSERT_EQ("1,1,2", FilesPerLevel());
1497 
1498   // Compact just the new range
1499   Compact("b", "f");
1500   ASSERT_EQ("0,0,2", FilesPerLevel());
1501 
1502   // Compact all
1503   MakeTables(1, "a", "z");
1504   ASSERT_EQ("0,1,2", FilesPerLevel());
1505   db_->CompactRange(NULL, NULL);
1506   ASSERT_EQ("0,0,1", FilesPerLevel());
1507 }
1508 
TEST(DBTest,DBOpen_Options)1509 TEST(DBTest, DBOpen_Options) {
1510   std::string dbname = test::TmpDir() + "/db_options_test";
1511   DestroyDB(dbname, Options());
1512 
1513   // Does not exist, and create_if_missing == false: error
1514   DB* db = NULL;
1515   Options opts;
1516   opts.create_if_missing = false;
1517   Status s = DB::Open(opts, dbname, &db);
1518   ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != NULL);
1519   ASSERT_TRUE(db == NULL);
1520 
1521   // Does not exist, and create_if_missing == true: OK
1522   opts.create_if_missing = true;
1523   s = DB::Open(opts, dbname, &db);
1524   ASSERT_OK(s);
1525   ASSERT_TRUE(db != NULL);
1526 
1527   delete db;
1528   db = NULL;
1529 
1530   // Does exist, and error_if_exists == true: error
1531   opts.create_if_missing = false;
1532   opts.error_if_exists = true;
1533   s = DB::Open(opts, dbname, &db);
1534   ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != NULL);
1535   ASSERT_TRUE(db == NULL);
1536 
1537   // Does exist, and error_if_exists == false: OK
1538   opts.create_if_missing = true;
1539   opts.error_if_exists = false;
1540   s = DB::Open(opts, dbname, &db);
1541   ASSERT_OK(s);
1542   ASSERT_TRUE(db != NULL);
1543 
1544   delete db;
1545   db = NULL;
1546 }
1547 
TEST(DBTest,Locking)1548 TEST(DBTest, Locking) {
1549   DB* db2 = NULL;
1550   Status s = DB::Open(CurrentOptions(), dbname_, &db2);
1551   ASSERT_TRUE(!s.ok()) << "Locking did not prevent re-opening db";
1552 }
1553 
1554 // Check that number of files does not grow when we are out of space
TEST(DBTest,NoSpace)1555 TEST(DBTest, NoSpace) {
1556   Options options = CurrentOptions();
1557   options.env = env_;
1558   Reopen(&options);
1559 
1560   ASSERT_OK(Put("foo", "v1"));
1561   ASSERT_EQ("v1", Get("foo"));
1562   Compact("a", "z");
1563   const int num_files = CountFiles();
1564   env_->no_space_.Release_Store(env_);   // Force out-of-space errors
1565   for (int i = 0; i < 10; i++) {
1566     for (int level = 0; level < config::kNumLevels-1; level++) {
1567       dbfull()->TEST_CompactRange(level, NULL, NULL);
1568     }
1569   }
1570   env_->no_space_.Release_Store(NULL);
1571   ASSERT_LT(CountFiles(), num_files + 3);
1572 }
1573 
TEST(DBTest,NonWritableFileSystem)1574 TEST(DBTest, NonWritableFileSystem) {
1575   Options options = CurrentOptions();
1576   options.write_buffer_size = 1000;
1577   options.env = env_;
1578   Reopen(&options);
1579   ASSERT_OK(Put("foo", "v1"));
1580   env_->non_writable_.Release_Store(env_);  // Force errors for new files
1581   std::string big(100000, 'x');
1582   int errors = 0;
1583   for (int i = 0; i < 20; i++) {
1584     fprintf(stderr, "iter %d; errors %d\n", i, errors);
1585     if (!Put("foo", big).ok()) {
1586       errors++;
1587       DelayMilliseconds(100);
1588     }
1589   }
1590   ASSERT_GT(errors, 0);
1591   env_->non_writable_.Release_Store(NULL);
1592 }
1593 
TEST(DBTest,WriteSyncError)1594 TEST(DBTest, WriteSyncError) {
1595   // Check that log sync errors cause the DB to disallow future writes.
1596 
1597   // (a) Cause log sync calls to fail
1598   Options options = CurrentOptions();
1599   options.env = env_;
1600   Reopen(&options);
1601   env_->data_sync_error_.Release_Store(env_);
1602 
1603   // (b) Normal write should succeed
1604   WriteOptions w;
1605   ASSERT_OK(db_->Put(w, "k1", "v1"));
1606   ASSERT_EQ("v1", Get("k1"));
1607 
1608   // (c) Do a sync write; should fail
1609   w.sync = true;
1610   ASSERT_TRUE(!db_->Put(w, "k2", "v2").ok());
1611   ASSERT_EQ("v1", Get("k1"));
1612   ASSERT_EQ("NOT_FOUND", Get("k2"));
1613 
1614   // (d) make sync behave normally
1615   env_->data_sync_error_.Release_Store(NULL);
1616 
1617   // (e) Do a non-sync write; should fail
1618   w.sync = false;
1619   ASSERT_TRUE(!db_->Put(w, "k3", "v3").ok());
1620   ASSERT_EQ("v1", Get("k1"));
1621   ASSERT_EQ("NOT_FOUND", Get("k2"));
1622   ASSERT_EQ("NOT_FOUND", Get("k3"));
1623 }
1624 
TEST(DBTest,ManifestWriteError)1625 TEST(DBTest, ManifestWriteError) {
1626   // Test for the following problem:
1627   // (a) Compaction produces file F
1628   // (b) Log record containing F is written to MANIFEST file, but Sync() fails
1629   // (c) GC deletes F
1630   // (d) After reopening DB, reads fail since deleted F is named in log record
1631 
1632   // We iterate twice.  In the second iteration, everything is the
1633   // same except the log record never makes it to the MANIFEST file.
1634   for (int iter = 0; iter < 2; iter++) {
1635     port::AtomicPointer* error_type = (iter == 0)
1636         ? &env_->manifest_sync_error_
1637         : &env_->manifest_write_error_;
1638 
1639     // Insert foo=>bar mapping
1640     Options options = CurrentOptions();
1641     options.env = env_;
1642     options.create_if_missing = true;
1643     options.error_if_exists = false;
1644     DestroyAndReopen(&options);
1645     ASSERT_OK(Put("foo", "bar"));
1646     ASSERT_EQ("bar", Get("foo"));
1647 
1648     // Memtable compaction (will succeed)
1649     dbfull()->TEST_CompactMemTable();
1650     ASSERT_EQ("bar", Get("foo"));
1651     const int last = config::kMaxMemCompactLevel;
1652     ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo=>bar is now in last level
1653 
1654     // Merging compaction (will fail)
1655     error_type->Release_Store(env_);
1656     dbfull()->TEST_CompactRange(last, NULL, NULL);  // Should fail
1657     ASSERT_EQ("bar", Get("foo"));
1658 
1659     // Recovery: should not lose data
1660     error_type->Release_Store(NULL);
1661     Reopen(&options);
1662     ASSERT_EQ("bar", Get("foo"));
1663   }
1664 }
1665 
TEST(DBTest,MissingSSTFile)1666 TEST(DBTest, MissingSSTFile) {
1667   ASSERT_OK(Put("foo", "bar"));
1668   ASSERT_EQ("bar", Get("foo"));
1669 
1670   // Dump the memtable to disk.
1671   dbfull()->TEST_CompactMemTable();
1672   ASSERT_EQ("bar", Get("foo"));
1673 
1674   Close();
1675   ASSERT_TRUE(DeleteAnSSTFile());
1676   Options options = CurrentOptions();
1677   options.paranoid_checks = true;
1678   Status s = TryReopen(&options);
1679   ASSERT_TRUE(!s.ok());
1680   ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
1681       << s.ToString();
1682 }
1683 
TEST(DBTest,StillReadSST)1684 TEST(DBTest, StillReadSST) {
1685   ASSERT_OK(Put("foo", "bar"));
1686   ASSERT_EQ("bar", Get("foo"));
1687 
1688   // Dump the memtable to disk.
1689   dbfull()->TEST_CompactMemTable();
1690   ASSERT_EQ("bar", Get("foo"));
1691   Close();
1692   ASSERT_GT(RenameLDBToSST(), 0);
1693   Options options = CurrentOptions();
1694   options.paranoid_checks = true;
1695   Status s = TryReopen(&options);
1696   ASSERT_TRUE(s.ok());
1697   ASSERT_EQ("bar", Get("foo"));
1698 }
1699 
TEST(DBTest,FilesDeletedAfterCompaction)1700 TEST(DBTest, FilesDeletedAfterCompaction) {
1701   ASSERT_OK(Put("foo", "v2"));
1702   Compact("a", "z");
1703   const int num_files = CountFiles();
1704   for (int i = 0; i < 10; i++) {
1705     ASSERT_OK(Put("foo", "v2"));
1706     Compact("a", "z");
1707   }
1708   ASSERT_EQ(CountFiles(), num_files);
1709 }
1710 
TEST(DBTest,BloomFilter)1711 TEST(DBTest, BloomFilter) {
1712   env_->count_random_reads_ = true;
1713   Options options = CurrentOptions();
1714   options.env = env_;
1715   options.block_cache = NewLRUCache(0);  // Prevent cache hits
1716   options.filter_policy = NewBloomFilterPolicy(10);
1717   Reopen(&options);
1718 
1719   // Populate multiple layers
1720   const int N = 10000;
1721   for (int i = 0; i < N; i++) {
1722     ASSERT_OK(Put(Key(i), Key(i)));
1723   }
1724   Compact("a", "z");
1725   for (int i = 0; i < N; i += 100) {
1726     ASSERT_OK(Put(Key(i), Key(i)));
1727   }
1728   dbfull()->TEST_CompactMemTable();
1729 
1730   // Prevent auto compactions triggered by seeks
1731   env_->delay_data_sync_.Release_Store(env_);
1732 
1733   // Lookup present keys.  Should rarely read from small sstable.
1734   env_->random_read_counter_.Reset();
1735   for (int i = 0; i < N; i++) {
1736     ASSERT_EQ(Key(i), Get(Key(i)));
1737   }
1738   int reads = env_->random_read_counter_.Read();
1739   fprintf(stderr, "%d present => %d reads\n", N, reads);
1740   ASSERT_GE(reads, N);
1741   ASSERT_LE(reads, N + 2*N/100);
1742 
1743   // Lookup present keys.  Should rarely read from either sstable.
1744   env_->random_read_counter_.Reset();
1745   for (int i = 0; i < N; i++) {
1746     ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing"));
1747   }
1748   reads = env_->random_read_counter_.Read();
1749   fprintf(stderr, "%d missing => %d reads\n", N, reads);
1750   ASSERT_LE(reads, 3*N/100);
1751 
1752   env_->delay_data_sync_.Release_Store(NULL);
1753   Close();
1754   delete options.block_cache;
1755   delete options.filter_policy;
1756 }
1757 
1758 // Multi-threaded test:
1759 namespace {
1760 
1761 static const int kNumThreads = 4;
1762 static const int kTestSeconds = 10;
1763 static const int kNumKeys = 1000;
1764 
1765 struct MTState {
1766   DBTest* test;
1767   port::AtomicPointer stop;
1768   port::AtomicPointer counter[kNumThreads];
1769   port::AtomicPointer thread_done[kNumThreads];
1770 };
1771 
1772 struct MTThread {
1773   MTState* state;
1774   int id;
1775 };
1776 
MTThreadBody(void * arg)1777 static void MTThreadBody(void* arg) {
1778   MTThread* t = reinterpret_cast<MTThread*>(arg);
1779   int id = t->id;
1780   DB* db = t->state->test->db_;
1781   uintptr_t counter = 0;
1782   fprintf(stderr, "... starting thread %d\n", id);
1783   Random rnd(1000 + id);
1784   std::string value;
1785   char valbuf[1500];
1786   while (t->state->stop.Acquire_Load() == NULL) {
1787     t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
1788 
1789     int key = rnd.Uniform(kNumKeys);
1790     char keybuf[20];
1791     snprintf(keybuf, sizeof(keybuf), "%016d", key);
1792 
1793     if (rnd.OneIn(2)) {
1794       // Write values of the form <key, my id, counter>.
1795       // We add some padding for force compactions.
1796       snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
1797                key, id, static_cast<int>(counter));
1798       ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
1799     } else {
1800       // Read a value and verify that it matches the pattern written above.
1801       Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
1802       if (s.IsNotFound()) {
1803         // Key has not yet been written
1804       } else {
1805         // Check that the writer thread counter is >= the counter in the value
1806         ASSERT_OK(s);
1807         int k, w, c;
1808         ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
1809         ASSERT_EQ(k, key);
1810         ASSERT_GE(w, 0);
1811         ASSERT_LT(w, kNumThreads);
1812         ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>(
1813             t->state->counter[w].Acquire_Load()));
1814       }
1815     }
1816     counter++;
1817   }
1818   t->state->thread_done[id].Release_Store(t);
1819   fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
1820 }
1821 
1822 }  // namespace
1823 
TEST(DBTest,MultiThreaded)1824 TEST(DBTest, MultiThreaded) {
1825   do {
1826     // Initialize state
1827     MTState mt;
1828     mt.test = this;
1829     mt.stop.Release_Store(0);
1830     for (int id = 0; id < kNumThreads; id++) {
1831       mt.counter[id].Release_Store(0);
1832       mt.thread_done[id].Release_Store(0);
1833     }
1834 
1835     // Start threads
1836     MTThread thread[kNumThreads];
1837     for (int id = 0; id < kNumThreads; id++) {
1838       thread[id].state = &mt;
1839       thread[id].id = id;
1840       env_->StartThread(MTThreadBody, &thread[id]);
1841     }
1842 
1843     // Let them run for a while
1844     DelayMilliseconds(kTestSeconds * 1000);
1845 
1846     // Stop the threads and wait for them to finish
1847     mt.stop.Release_Store(&mt);
1848     for (int id = 0; id < kNumThreads; id++) {
1849       while (mt.thread_done[id].Acquire_Load() == NULL) {
1850         DelayMilliseconds(100);
1851       }
1852     }
1853   } while (ChangeOptions());
1854 }
1855 
1856 namespace {
1857 typedef std::map<std::string, std::string> KVMap;
1858 }
1859 
1860 class ModelDB: public DB {
1861  public:
1862   class ModelSnapshot : public Snapshot {
1863    public:
1864     KVMap map_;
1865   };
1866 
ModelDB(const Options & options)1867   explicit ModelDB(const Options& options): options_(options) { }
~ModelDB()1868   ~ModelDB() { }
Put(const WriteOptions & o,const Slice & k,const Slice & v)1869   virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
1870     return DB::Put(o, k, v);
1871   }
Delete(const WriteOptions & o,const Slice & key)1872   virtual Status Delete(const WriteOptions& o, const Slice& key) {
1873     return DB::Delete(o, key);
1874   }
Get(const ReadOptions & options,const Slice & key,std::string * value)1875   virtual Status Get(const ReadOptions& options,
1876                      const Slice& key, std::string* value) {
1877     assert(false);      // Not implemented
1878     return Status::NotFound(key);
1879   }
NewIterator(const ReadOptions & options)1880   virtual Iterator* NewIterator(const ReadOptions& options) {
1881     if (options.snapshot == NULL) {
1882       KVMap* saved = new KVMap;
1883       *saved = map_;
1884       return new ModelIter(saved, true);
1885     } else {
1886       const KVMap* snapshot_state =
1887           &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
1888       return new ModelIter(snapshot_state, false);
1889     }
1890   }
GetSnapshot()1891   virtual const Snapshot* GetSnapshot() {
1892     ModelSnapshot* snapshot = new ModelSnapshot;
1893     snapshot->map_ = map_;
1894     return snapshot;
1895   }
1896 
ReleaseSnapshot(const Snapshot * snapshot)1897   virtual void ReleaseSnapshot(const Snapshot* snapshot) {
1898     delete reinterpret_cast<const ModelSnapshot*>(snapshot);
1899   }
Write(const WriteOptions & options,WriteBatch * batch)1900   virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
1901     class Handler : public WriteBatch::Handler {
1902      public:
1903       KVMap* map_;
1904       virtual void Put(const Slice& key, const Slice& value) {
1905         (*map_)[key.ToString()] = value.ToString();
1906       }
1907       virtual void Delete(const Slice& key) {
1908         map_->erase(key.ToString());
1909       }
1910     };
1911     Handler handler;
1912     handler.map_ = &map_;
1913     return batch->Iterate(&handler);
1914   }
1915 
GetProperty(const Slice & property,std::string * value)1916   virtual bool GetProperty(const Slice& property, std::string* value) {
1917     return false;
1918   }
GetApproximateSizes(const Range * r,int n,uint64_t * sizes)1919   virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
1920     for (int i = 0; i < n; i++) {
1921       sizes[i] = 0;
1922     }
1923   }
CompactRange(const Slice * start,const Slice * end)1924   virtual void CompactRange(const Slice* start, const Slice* end) {
1925   }
1926 
1927  private:
1928   class ModelIter: public Iterator {
1929    public:
ModelIter(const KVMap * map,bool owned)1930     ModelIter(const KVMap* map, bool owned)
1931         : map_(map), owned_(owned), iter_(map_->end()) {
1932     }
~ModelIter()1933     ~ModelIter() {
1934       if (owned_) delete map_;
1935     }
Valid() const1936     virtual bool Valid() const { return iter_ != map_->end(); }
SeekToFirst()1937     virtual void SeekToFirst() { iter_ = map_->begin(); }
SeekToLast()1938     virtual void SeekToLast() {
1939       if (map_->empty()) {
1940         iter_ = map_->end();
1941       } else {
1942         iter_ = map_->find(map_->rbegin()->first);
1943       }
1944     }
Seek(const Slice & k)1945     virtual void Seek(const Slice& k) {
1946       iter_ = map_->lower_bound(k.ToString());
1947     }
Next()1948     virtual void Next() { ++iter_; }
Prev()1949     virtual void Prev() { --iter_; }
key() const1950     virtual Slice key() const { return iter_->first; }
value() const1951     virtual Slice value() const { return iter_->second; }
status() const1952     virtual Status status() const { return Status::OK(); }
1953    private:
1954     const KVMap* const map_;
1955     const bool owned_;  // Do we own map_
1956     KVMap::const_iterator iter_;
1957   };
1958   const Options options_;
1959   KVMap map_;
1960 };
1961 
RandomKey(Random * rnd)1962 static std::string RandomKey(Random* rnd) {
1963   int len = (rnd->OneIn(3)
1964              ? 1                // Short sometimes to encourage collisions
1965              : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
1966   return test::RandomKey(rnd, len);
1967 }
1968 
CompareIterators(int step,DB * model,DB * db,const Snapshot * model_snap,const Snapshot * db_snap)1969 static bool CompareIterators(int step,
1970                              DB* model,
1971                              DB* db,
1972                              const Snapshot* model_snap,
1973                              const Snapshot* db_snap) {
1974   ReadOptions options;
1975   options.snapshot = model_snap;
1976   Iterator* miter = model->NewIterator(options);
1977   options.snapshot = db_snap;
1978   Iterator* dbiter = db->NewIterator(options);
1979   bool ok = true;
1980   int count = 0;
1981   for (miter->SeekToFirst(), dbiter->SeekToFirst();
1982        ok && miter->Valid() && dbiter->Valid();
1983        miter->Next(), dbiter->Next()) {
1984     count++;
1985     if (miter->key().compare(dbiter->key()) != 0) {
1986       fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
1987               step,
1988               EscapeString(miter->key()).c_str(),
1989               EscapeString(dbiter->key()).c_str());
1990       ok = false;
1991       break;
1992     }
1993 
1994     if (miter->value().compare(dbiter->value()) != 0) {
1995       fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
1996               step,
1997               EscapeString(miter->key()).c_str(),
1998               EscapeString(miter->value()).c_str(),
1999               EscapeString(miter->value()).c_str());
2000       ok = false;
2001     }
2002   }
2003 
2004   if (ok) {
2005     if (miter->Valid() != dbiter->Valid()) {
2006       fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
2007               step, miter->Valid(), dbiter->Valid());
2008       ok = false;
2009     }
2010   }
2011   fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
2012   delete miter;
2013   delete dbiter;
2014   return ok;
2015 }
2016 
TEST(DBTest,Randomized)2017 TEST(DBTest, Randomized) {
2018   Random rnd(test::RandomSeed());
2019   do {
2020     ModelDB model(CurrentOptions());
2021     const int N = 10000;
2022     const Snapshot* model_snap = NULL;
2023     const Snapshot* db_snap = NULL;
2024     std::string k, v;
2025     for (int step = 0; step < N; step++) {
2026       if (step % 100 == 0) {
2027         fprintf(stderr, "Step %d of %d\n", step, N);
2028       }
2029       // TODO(sanjay): Test Get() works
2030       int p = rnd.Uniform(100);
2031       if (p < 45) {                               // Put
2032         k = RandomKey(&rnd);
2033         v = RandomString(&rnd,
2034                          rnd.OneIn(20)
2035                          ? 100 + rnd.Uniform(100)
2036                          : rnd.Uniform(8));
2037         ASSERT_OK(model.Put(WriteOptions(), k, v));
2038         ASSERT_OK(db_->Put(WriteOptions(), k, v));
2039 
2040       } else if (p < 90) {                        // Delete
2041         k = RandomKey(&rnd);
2042         ASSERT_OK(model.Delete(WriteOptions(), k));
2043         ASSERT_OK(db_->Delete(WriteOptions(), k));
2044 
2045 
2046       } else {                                    // Multi-element batch
2047         WriteBatch b;
2048         const int num = rnd.Uniform(8);
2049         for (int i = 0; i < num; i++) {
2050           if (i == 0 || !rnd.OneIn(10)) {
2051             k = RandomKey(&rnd);
2052           } else {
2053             // Periodically re-use the same key from the previous iter, so
2054             // we have multiple entries in the write batch for the same key
2055           }
2056           if (rnd.OneIn(2)) {
2057             v = RandomString(&rnd, rnd.Uniform(10));
2058             b.Put(k, v);
2059           } else {
2060             b.Delete(k);
2061           }
2062         }
2063         ASSERT_OK(model.Write(WriteOptions(), &b));
2064         ASSERT_OK(db_->Write(WriteOptions(), &b));
2065       }
2066 
2067       if ((step % 100) == 0) {
2068         ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
2069         ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
2070         // Save a snapshot from each DB this time that we'll use next
2071         // time we compare things, to make sure the current state is
2072         // preserved with the snapshot
2073         if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
2074         if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
2075 
2076         Reopen();
2077         ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
2078 
2079         model_snap = model.GetSnapshot();
2080         db_snap = db_->GetSnapshot();
2081       }
2082     }
2083     if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
2084     if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
2085   } while (ChangeOptions());
2086 }
2087 
MakeKey(unsigned int num)2088 std::string MakeKey(unsigned int num) {
2089   char buf[30];
2090   snprintf(buf, sizeof(buf), "%016u", num);
2091   return std::string(buf);
2092 }
2093 
BM_LogAndApply(int iters,int num_base_files)2094 void BM_LogAndApply(int iters, int num_base_files) {
2095   std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
2096   DestroyDB(dbname, Options());
2097 
2098   DB* db = NULL;
2099   Options opts;
2100   opts.create_if_missing = true;
2101   Status s = DB::Open(opts, dbname, &db);
2102   ASSERT_OK(s);
2103   ASSERT_TRUE(db != NULL);
2104 
2105   delete db;
2106   db = NULL;
2107 
2108   Env* env = Env::Default();
2109 
2110   port::Mutex mu;
2111   MutexLock l(&mu);
2112 
2113   InternalKeyComparator cmp(BytewiseComparator());
2114   Options options;
2115   VersionSet vset(dbname, &options, NULL, &cmp);
2116   bool save_manifest;
2117   ASSERT_OK(vset.Recover(&save_manifest));
2118   VersionEdit vbase;
2119   uint64_t fnum = 1;
2120   for (int i = 0; i < num_base_files; i++) {
2121     InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
2122     InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
2123     vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
2124   }
2125   ASSERT_OK(vset.LogAndApply(&vbase, &mu));
2126 
2127   uint64_t start_micros = env->NowMicros();
2128 
2129   for (int i = 0; i < iters; i++) {
2130     VersionEdit vedit;
2131     vedit.DeleteFile(2, fnum);
2132     InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
2133     InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
2134     vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
2135     vset.LogAndApply(&vedit, &mu);
2136   }
2137   uint64_t stop_micros = env->NowMicros();
2138   unsigned int us = stop_micros - start_micros;
2139   char buf[16];
2140   snprintf(buf, sizeof(buf), "%d", num_base_files);
2141   fprintf(stderr,
2142           "BM_LogAndApply/%-6s   %8d iters : %9u us (%7.0f us / iter)\n",
2143           buf, iters, us, ((float)us) / iters);
2144 }
2145 
2146 }  // namespace leveldb
2147 
main(int argc,char ** argv)2148 int main(int argc, char** argv) {
2149   if (argc > 1 && std::string(argv[1]) == "--benchmark") {
2150     leveldb::BM_LogAndApply(1000, 1);
2151     leveldb::BM_LogAndApply(1000, 100);
2152     leveldb::BM_LogAndApply(1000, 10000);
2153     leveldb::BM_LogAndApply(100, 100000);
2154     return 0;
2155   }
2156 
2157   return leveldb::test::RunAllTests();
2158 }
2159