//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/memtable_list.h"
#include <algorithm>
#include <string>
#include <vector>
#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/write_buffer_manager.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

class MemTableListTest : public testing::Test {
 public:
  std::string dbname;
  DB* db;
  Options options;
  std::vector<ColumnFamilyHandle*> handles;
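  // Monotonically increasing fake file number handed to the mocked flush
  // installs below.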
  std::atomic<uint64_t> file_number;

  MemTableListTest() : db(nullptr), file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    DestroyDB(dbname, options);
  }

  // Create a test db if not yet created
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      DestroyDB(dbname, options);
      // Open DB only with default column family
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);

      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);

      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }

  ~MemTableListTest() override {
    if (db) {
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        handles[i]->GetDescriptor(&cf_descs[i]);
      }
      for (auto h : handles) {
        if (h) {
          db->DestroyColumnFamilyHandle(h);
        }
      }
      handles.clear();
      delete db;
      db = nullptr;
      DestroyDB(dbname, options, cf_descs);
    }
  }

  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    db_options.file_system = FileSystem::Default();
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());

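    // Recover the column families ("default", "one", "two") that CreateDB()
    // added, so this mock VersionSet reflects the same MANIFEST state.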
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    uint64_t file_num = file_number.fetch_add(1);
    // Create dummy mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
    Status s = list->TryInstallMemtableFlushResults(
        cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
    return s;
  }

  // Calls InstallMemtableAtomicFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
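    // Wrap the Env in a LegacyFileSystemWrapper so the VersionSet goes
    // through the FileSystem interface.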
    db_options.file_system.reset(new LegacyFileSystemWrapper(db_options.env));

    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData

    auto column_family_set = versions.GetColumnFamilySet();

    LogsWithPrepTracker dummy_prep_tracker;
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
        file_meta_ptrs, to_delete, nullptr, &log_buffer);
  }
};

TEST_F(MemTableListTest, Empty) {
  // Create an empty MemTableList and validate basic functions.
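  // Constructor args are min_write_buffer_number_to_merge,
  // max_write_buffer_number_to_maintain and
  // max_write_buffer_size_to_maintain (see the named variables in the tests
  // below).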
  MemTableList list(1, 0, 0);

  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());

  autovector<MemTable*> mems;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
  ASSERT_EQ(0, mems.size());

  autovector<MemTable*> to_delete;
  list.current()->Unref(&to_delete);
  ASSERT_EQ(0, to_delete.size());
}

TEST_F(MemTableListTest, GetTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 0;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key1", "value1");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");
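  // Later entries shadow earlier ones at the latest sequence number:
  // "key1" now resolves to "value1" and "key2" to "value2.2".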

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  merge_context.Clear();
  found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  ASSERT_EQ(4, mem->num_entries());
  ASSERT_EQ(1, mem->num_deletes());

  // Add memtable to list
  list.Add(mem, &to_delete);

  SequenceNumber saved_seq = seq;

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key2", "value2.3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);

  // Fetch keys via MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  ASSERT_EQ(2, list.NumNotFlushed());

  list.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 2;
  int64_t max_write_buffer_size_to_maintain = 2000;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);
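  // With a 2000-byte history budget, flushed memtables stay in the history
  // list until newer memtables push their combined size past it.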

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // Add memtable to list
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  // Fetch keys via MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);

  // Flush this memtable from the list.
  // (It will then be a part of the memtable history).
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  MutableCFOptions mutable_cf_options(options);
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify keys are present in history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key3", "value3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  to_flush.clear();
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  // Flush second memtable
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Add a third memtable to push the first memtable out of the history
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  list.Add(mem3, &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(1, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify that the second memtable's keys are in the history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);

  // Verify that key2 from the first memtable is no longer in the history
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Cleanup
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (MemTable* m : to_delete) {
    delete m;
  }
}

TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<MemTable*> to_delete;

  // Create MemTableList
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int>(options.write_buffer_size);
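  // The history budget of 7 write buffers keeps flushed memtables around
  // instead of deleting them; the to_delete size checks below rely on this.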
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  // Create some MemTables
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    mem->Add(++seq, kTypeValue, "key1", ToString(i));
    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

    tables.push_back(mem);
  }

  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert flush
  list.RollbackMemtableFlush(to_flush, 0);
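  // Rollback returns the picked memtables to the list: imm_flush_needed is
  // re-armed, but no flush is pending until the minimum to merge is reached
  // or FlushRequested() is called again.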
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush2;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  // Should pick 4 of 5 since 1 table has been picked in to_flush2
  ASSERT_EQ(4, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush3;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
  ASSERT_EQ(0, to_flush3.size());  // all remaining are already being flushed
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 4 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);

  // Note:  now to_flush contains tables[0,1,2,4].  to_flush2 contains
  // tables[3].
  // Current implementation will only commit memtables in the order they were
  // created. So TryInstallMemtableFlushResults will install the first 3 tables
  // in to_flush and stop when it encounters a table not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 1 memtable that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(
      &list, mutable_cf_options, to_flush2, &to_delete);
  ASSERT_OK(s);

  // This will actually install 2 tables.  The 1 we told it to flush, and also
  // tables[4] which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID());
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<MemTable*> to_flush4;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_TRUE(to_flush4.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();

  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
                               static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}

TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
  autovector<MemTableList*> lists;
  autovector<uint32_t> cf_ids;
  autovector<const MutableCFOptions*> options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  autovector<MemTable*> to_delete;
  Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
                                                    to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_TRUE(to_delete.empty());
}

TEST_F(MemTableListTest, AtomicFlusTest) {
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableLists
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_number_to_maintain,
                                        max_write_buffer_size_to_maintain));
  }

  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      mem->Add(++seq, kTypeValue, "key1", ToString(i));
      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);

  // Nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }
  // Request flush even though there is nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  autovector<MemTable*> to_delete;
  // Add tables to the immutable memtable lists associated with column families
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  //          +----+
  // list[0]: |0  1|
  // list[1]: |0  1|
  //          | +--+
  // list[2]: |0| 1
  //          +-+
  // Pick memtables to flush
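  // Memtable IDs start at 0 within each list, so picking up to ID k yields
  // k + 1 candidates.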
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
                                   &flush_candidates[i]);
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const MutableCFOptions*> tmp_options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
      tmp_options_list.push_back(mutable_cf_options_list[i]);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(
      tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
  ASSERT_OK(s);

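  // Memtables that were picked (ID <= flush_memtable_ids[i]) should now have
  // an SST file number assigned; the rest remain in the not-flushed list.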
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  // All memtables in tables array must have been flushed, thus ready to be
  // deleted.
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling InstallMemtableFlushResults.
    // Verify this by Ref'ing and then Unref'ing.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}