1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/transaction_test.h"
9 
10 #include <algorithm>
11 #include <atomic>
12 #include <cinttypes>
13 #include <functional>
14 #include <string>
15 #include <thread>
16 
17 #include "db/db_impl/db_impl.h"
18 #include "db/dbformat.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/types.h"
22 #include "rocksdb/utilities/debug.h"
23 #include "rocksdb/utilities/transaction.h"
24 #include "rocksdb/utilities/transaction_db.h"
25 #include "table/mock_table.h"
26 #include "test_util/fault_injection_test_env.h"
27 #include "test_util/sync_point.h"
28 #include "test_util/testharness.h"
29 #include "test_util/testutil.h"
30 #include "test_util/transaction_test_util.h"
31 #include "util/mutexlock.h"
32 #include "util/random.h"
33 #include "util/string_util.h"
34 #include "utilities/merge_operators.h"
35 #include "utilities/merge_operators/string_append/stringappend.h"
36 #include "utilities/transactions/pessimistic_transaction_db.h"
37 #include "utilities/transactions/write_prepared_txn_db.h"
38 
39 #include "port/port.h"
40 
41 using std::string;
42 
43 namespace ROCKSDB_NAMESPACE {
44 
// Short local aliases for the commit-entry types nested in
// WritePreparedTxnDB; the tests below use them directly.
using CommitEntry = WritePreparedTxnDB::CommitEntry;
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
48 
// Single-threaded sanity test of PreparedHeap. It checks that top() reports
// the smallest sequence number that has been pushed and not yet popped or
// erased, across interleavings of erase() and pop(). Pushes are done while
// holding push_pop_mutex(); erase()/pop()/top()/empty() are called without it.
TEST(PreparedHeap, BasicsTest) {
  WritePreparedTxnDB::PreparedHeap heap;
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(14l);
    // Test with one element
    ASSERT_EQ(14l, heap.top());
    heap.push(24l);
    heap.push(34l);
    // Test that old min is still on top
    ASSERT_EQ(14l, heap.top());
    heap.push(44l);
    heap.push(54l);
    heap.push(64l);
    heap.push(74l);
    heap.push(84l);
  }
  // Test that old min is still on top
  ASSERT_EQ(14l, heap.top());
  // Erase an entry that is below the top: the top must not change.
  heap.erase(24l);
  // Test that old min is still on top
  ASSERT_EQ(14l, heap.top());
  heap.erase(14l);
  // Test that the new min comes to the top after multiple erases (24 was
  // already erased, so 34 is the next live entry)
  ASSERT_EQ(34l, heap.top());
  heap.erase(34l);
  // Test that the new min comes to the top after a single erase
  ASSERT_EQ(44l, heap.top());
  heap.erase(54l);
  ASSERT_EQ(44l, heap.top());
  heap.pop();  // pop 44l
  // Test that the erased items are ignored after pop (54 was erased earlier)
  ASSERT_EQ(64l, heap.top());
  heap.erase(44l);
  // Test that erasing an already popped item would work
  ASSERT_EQ(64l, heap.top());
  heap.erase(84l);
  ASSERT_EQ(64l, heap.top());
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(85l);
    heap.push(86l);
    heap.push(87l);
    heap.push(88l);
    heap.push(89l);
  }
  heap.erase(87l);
  heap.erase(85l);
  heap.erase(89l);
  heap.erase(86l);
  heap.erase(88l);
  // Test top remains the same after a random order of many erases
  ASSERT_EQ(64l, heap.top());
  heap.pop();
  // Test that pop works with a series of random pending erases
  ASSERT_EQ(74l, heap.top());
  ASSERT_FALSE(heap.empty());
  heap.pop();
  // Test that empty works: every remaining entry (84..89) was erased, so the
  // last pop leaves nothing live.
  ASSERT_TRUE(heap.empty());
}
110 
111 // This is a scenario reconstructed from a buggy trace. Test that the bug does
112 // not resurface again.
// Interleaves pushes with erases of a sequence number that was never pushed
// (50) and of one that is not at the top (60). Checks that erasing the top
// neither drops entries that are still live nor leaves erased ones behind.
TEST(PreparedHeap, EmptyAtTheEnd) {
  WritePreparedTxnDB::PreparedHeap heap;
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(40l);
  }
  ASSERT_EQ(40l, heap.top());
  // Although not a recommended scenario, we must be resilient against erase
  // without a prior push.
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(60l);
  }
  ASSERT_EQ(40l, heap.top());

  // First round: erase the non-top entry before the top; erasing the top
  // must then leave the heap empty.
  heap.erase(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  ASSERT_TRUE(heap.empty());

  // Second round: same pushes, but erase the top while 60 is still live.
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(40l);
  }
  ASSERT_EQ(40l, heap.top());
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(60l);
  }
  ASSERT_EQ(40l, heap.top());

  heap.erase(40l);
  // Test that the erase has not emptied the heap (we had a bug doing that)
  ASSERT_FALSE(heap.empty());
  ASSERT_EQ(60l, heap.top());
  heap.erase(60l);
  ASSERT_TRUE(heap.empty());
}
155 
156 // Generate random order of PreparedHeap access and test that the heap will be
157 // successfully emptied at the end.
TEST(PreparedHeap, Concurrent) {
  const size_t t_cnt = 10;
  // t[0] pushes sequence numbers 1..t_cnt; t[i] (i >= 1) erases sequence i.
  ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1];
  WritePreparedTxnDB::PreparedHeap heap;
  port::RWMutex prepared_mutex;
  // Highest sequence number published by the pusher in the current round.
  // Erasers wait until it reaches their own sequence number.
  std::atomic<size_t> last{0};

  for (size_t n = 0; n < 100; n++) {
    last = 0;
    t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
      Random rnd(1103);
      for (size_t seq = 1; seq <= t_cnt; seq++) {
        // This is not recommended usage but we should be resilient against it.
        bool skip_push = rnd.OneIn(5);
        if (!skip_push) {
          MutexLock ml(heap.push_pop_mutex());
          std::this_thread::yield();
          heap.push(seq);
          last.store(seq);
        }
      }
      // If the trailing sequence number(s) were skipped, no larger value is
      // ever published and their erasers would spin forever. Release them.
      // This is a no-op when t_cnt itself was pushed.
      last.store(t_cnt);
    });
    for (size_t i = 1; i <= t_cnt; i++) {
      t[i] =
          ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() {
            auto seq = i;
            do {
              std::this_thread::yield();
            } while (last.load() < seq);
            WriteLock wl(&prepared_mutex);
            heap.erase(seq);
          });
    }
    for (size_t i = 0; i <= t_cnt; i++) {
      t[i].join();
    }
    ASSERT_TRUE(heap.empty());
  }
}
197 
198 // Test that WriteBatchWithIndex correctly counts the number of sub-batches
TEST(WriteBatchWithIndex,SubBatchCnt)199 TEST(WriteBatchWithIndex, SubBatchCnt) {
200   ColumnFamilyOptions cf_options;
201   std::string cf_name = "two";
202   DB* db;
203   Options options;
204   options.create_if_missing = true;
205   const std::string dbname = test::PerThreadDBPath("transaction_testdb");
206   DestroyDB(dbname, options);
207   ASSERT_OK(DB::Open(options, dbname, &db));
208   ColumnFamilyHandle* cf_handle = nullptr;
209   ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
210   WriteOptions write_options;
211   size_t batch_cnt = 1;
212   size_t save_points = 0;
213   std::vector<size_t> batch_cnt_at;
214   WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
215                             0);
216   ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
217   batch_cnt_at.push_back(batch_cnt);
218   batch.SetSavePoint();
219   save_points++;
220   batch.Put(Slice("key"), Slice("value"));
221   ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
222   batch_cnt_at.push_back(batch_cnt);
223   batch.SetSavePoint();
224   save_points++;
225   batch.Put(Slice("key2"), Slice("value2"));
226   ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
227   // duplicate the keys
228   batch_cnt_at.push_back(batch_cnt);
229   batch.SetSavePoint();
230   save_points++;
231   batch.Put(Slice("key"), Slice("value3"));
232   batch_cnt++;
233   ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
234   // duplicate the 2nd key. It should not be counted duplicate since a
235   // sub-patch is cut after the last duplicate.
236   batch_cnt_at.push_back(batch_cnt);
237   batch.SetSavePoint();
238   save_points++;
239   batch.Put(Slice("key2"), Slice("value4"));
240   ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
241   // duplicate the keys but in a different cf. It should not be counted as
242   // duplicate keys
243   batch_cnt_at.push_back(batch_cnt);
244   batch.SetSavePoint();
245   save_points++;
246   batch.Put(cf_handle, Slice("key"), Slice("value5"));
247   ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
248 
249   // Test that the number of sub-batches matches what we count with
250   // SubBatchCounter
251   std::map<uint32_t, const Comparator*> comparators;
252   comparators[0] = db->DefaultColumnFamily()->GetComparator();
253   comparators[cf_handle->GetID()] = cf_handle->GetComparator();
254   SubBatchCounter counter(comparators);
255   ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
256   ASSERT_EQ(batch_cnt, counter.BatchCount());
257 
258   // Test that RollbackToSavePoint will properly resets the number of
259   // sub-batches
260   for (size_t i = save_points; i > 0; i--) {
261     batch.RollbackToSavePoint();
262     ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
263   }
264 
265   // Test the count is right with random batches
266   {
267     const size_t TOTAL_KEYS = 20;  // 20 ~= 10 to cause a few randoms
268     Random rnd(1131);
269     std::string keys[TOTAL_KEYS];
270     for (size_t k = 0; k < TOTAL_KEYS; k++) {
271       int len = static_cast<int>(rnd.Uniform(50));
272       keys[k] = test::RandomKey(&rnd, len);
273     }
274     for (size_t i = 0; i < 1000; i++) {  // 1000 random batches
275       WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
276                                    0, true, 0);
277       for (size_t k = 0; k < 10; k++) {  // 10 key per batch
278         size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
279         Slice key = Slice(keys[ki]);
280         std::string buffer;
281         Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
282         rndbatch.Put(key, value);
283       }
284       SubBatchCounter batch_counter(comparators);
285       ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
286       ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
287     }
288   }
289 
290   delete cf_handle;
291   delete db;
292 }
293 
// Round-trips CommitEntry values through the packed 64-bit CommitEntry64b
// encoding for a spread of prepare/commit sequence numbers. Checks that
// Parse() recovers exactly the original entry given the slot index.
TEST(CommitEntry64b, BasicTest) {
  const size_t INDEX_BITS = static_cast<size_t>(21);
  const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
  const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));

  // zero-initialized CommitEntry64b should indicate an empty entry
  CommitEntry64b empty_entry64b;
  uint64_t empty_index = 11ul;
  CommitEntry empty_entry;
  bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
  ASSERT_FALSE(ok);

  // the zero entry is reserved for un-initialized entries
  // Shift a 64-bit one so the expression stays well-defined even if
  // COMMIT_BITS is 31 or more; shifting a plain int that far is undefined
  // behavior.
  const uint64_t MAX_COMMIT = (1ull << FORMAT.COMMIT_BITS) - 1 - 1;
  // Samples over the numbers that are covered by that many index bits
  std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
  // Samples over the numbers that are covered by that many commit bits
  std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
  // Iterate over prepare numbers that have i) cover all bits of a sequence
  // number, and ii) include some bits that fall into the range of index or
  // commit bits
  for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
    for (uint64_t i : is) {
      for (uint64_t d : ds) {
        uint64_t p = base + i + d;
        for (uint64_t c : {p, p + d / 2, p + d}) {
          uint64_t index = p % INDEX_SIZE;
          CommitEntry before(p, c), after;
          CommitEntry64b entry64b(before, FORMAT);
          ok = entry64b.Parse(index, &after, FORMAT);
          ASSERT_TRUE(ok);
          if (!(before == after)) {
            printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
                   " c %" PRIu64 " index %" PRIu64 "\n",
                   base, i, d, p, c, index);
          }
          ASSERT_EQ(before, after);
        }
      }
    }
  }
}
336 
337 class WritePreparedTxnDBMock : public WritePreparedTxnDB {
338  public:
WritePreparedTxnDBMock(DBImpl * db_impl,TransactionDBOptions & opt)339   WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
340       : WritePreparedTxnDB(db_impl, opt) {}
SetDBSnapshots(const std::vector<SequenceNumber> & snapshots)341   void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
342     snapshots_ = snapshots;
343   }
TakeSnapshot(SequenceNumber seq)344   void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
345 
346  protected:
GetSnapshotListFromDB(SequenceNumber)347   const std::vector<SequenceNumber> GetSnapshotListFromDB(
348       SequenceNumber /* unused */) override {
349     return snapshots_;
350   }
351 
352  private:
353   std::vector<SequenceNumber> snapshots_;
354 };
355 
356 class WritePreparedTransactionTestBase : public TransactionTestBase {
357  public:
WritePreparedTransactionTestBase(bool use_stackable_db,bool two_write_queue,TxnDBWritePolicy write_policy,WriteOrdering write_ordering)358   WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
359                                    TxnDBWritePolicy write_policy,
360                                    WriteOrdering write_ordering)
361       : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
362                             write_ordering){};
363 
364  protected:
UpdateTransactionDBOptions(size_t snapshot_cache_bits,size_t commit_cache_bits)365   void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
366                                   size_t commit_cache_bits) {
367     txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
368     txn_db_options.wp_commit_cache_bits = commit_cache_bits;
369   }
UpdateTransactionDBOptions(size_t snapshot_cache_bits)370   void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
371     txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
372   }
373   // If expect_update is set, check if it actually updated old_commit_map_. If
374   // it did not and yet suggested not to check the next snapshot, do the
375   // opposite to check if it was not a bad suggestion.
MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare,uint64_t commit,uint64_t snapshot,uint64_t next_snapshot,bool expect_update)376   void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
377                                            uint64_t snapshot,
378                                            uint64_t next_snapshot,
379                                            bool expect_update) {
380     WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
381     // reset old_commit_map_empty_ so that its value indicate whether
382     // old_commit_map_ was updated
383     wp_db->old_commit_map_empty_ = true;
384     bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
385                                                      snapshot < next_snapshot);
386     if (expect_update == wp_db->old_commit_map_empty_) {
387       printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
388              " next: %" PRIu64 "\n",
389              prepare, commit, snapshot, next_snapshot);
390     }
391     EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
392     if (!check_next && wp_db->old_commit_map_empty_) {
393       // do the opposite to make sure it was not a bad suggestion
394       const bool dont_care_bool = true;
395       wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
396                                      dont_care_bool);
397       if (!wp_db->old_commit_map_empty_) {
398         printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
399                " next: %" PRIu64 "\n",
400                prepare, commit, snapshot, next_snapshot);
401       }
402       EXPECT_TRUE(wp_db->old_commit_map_empty_);
403     }
404   }
405 
406   // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
407   // miss a snapshot because of a concurrent update by UpdateSnapshots that is
408   // writing new_snapshots. Both threads are broken at two points. The sync
409   // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
410   // entry is expected to be vital for one of the snapshots that is common
411   // between the old and new list of snapshots.
SnapshotConcurrentAccessTestInternal(WritePreparedTxnDB * wp_db,const std::vector<SequenceNumber> & old_snapshots,const std::vector<SequenceNumber> & new_snapshots,CommitEntry & entry,SequenceNumber & version,size_t a1,size_t a2,size_t b1,size_t b2)412   void SnapshotConcurrentAccessTestInternal(
413       WritePreparedTxnDB* wp_db,
414       const std::vector<SequenceNumber>& old_snapshots,
415       const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
416       SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
417     // First reset the snapshot list
418     const std::vector<SequenceNumber> empty_snapshots;
419     wp_db->old_commit_map_empty_ = true;
420     wp_db->UpdateSnapshots(empty_snapshots, ++version);
421     // Then initialize it with the old_snapshots
422     wp_db->UpdateSnapshots(old_snapshots, ++version);
423 
424     // Starting from the first thread, cut each thread at two points
425     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
426         {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
427          "WritePreparedTxnDB::UpdateSnapshots:s:start"},
428         {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
429          "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
430         {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
431          "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
432         {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
433          "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
434         {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
435          "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
436     });
437     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
438     {
439       ASSERT_TRUE(wp_db->old_commit_map_empty_);
440       ROCKSDB_NAMESPACE::port::Thread t1(
441           [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
442       ROCKSDB_NAMESPACE::port::Thread t2(
443           [&]() { wp_db->CheckAgainstSnapshots(entry); });
444       t1.join();
445       t2.join();
446       ASSERT_FALSE(wp_db->old_commit_map_empty_);
447     }
448     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
449 
450     wp_db->old_commit_map_empty_ = true;
451     wp_db->UpdateSnapshots(empty_snapshots, ++version);
452     wp_db->UpdateSnapshots(old_snapshots, ++version);
453     // Starting from the second thread, cut each thread at two points
454     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
455         {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
456          "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
457         {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
458          "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
459         {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
460          "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
461         {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
462          "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
463         {"WritePreparedTxnDB::UpdateSnapshots:p:end",
464          "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
465     });
466     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
467     {
468       ASSERT_TRUE(wp_db->old_commit_map_empty_);
469       ROCKSDB_NAMESPACE::port::Thread t1(
470           [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
471       ROCKSDB_NAMESPACE::port::Thread t2(
472           [&]() { wp_db->CheckAgainstSnapshots(entry); });
473       t1.join();
474       t2.join();
475       ASSERT_FALSE(wp_db->old_commit_map_empty_);
476     }
477     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
478   }
479 
480   // Verify value of keys.
VerifyKeys(const std::unordered_map<std::string,std::string> & data,const Snapshot * snapshot=nullptr)481   void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
482                   const Snapshot* snapshot = nullptr) {
483     std::string value;
484     ReadOptions read_options;
485     read_options.snapshot = snapshot;
486     for (auto& kv : data) {
487       auto s = db->Get(read_options, kv.first, &value);
488       ASSERT_TRUE(s.ok() || s.IsNotFound());
489       if (s.ok()) {
490         if (kv.second != value) {
491           printf("key = %s\n", kv.first.c_str());
492         }
493         ASSERT_EQ(kv.second, value);
494       } else {
495         ASSERT_EQ(kv.second, "NOT_FOUND");
496       }
497 
498       // Try with MultiGet API too
499       std::vector<std::string> values;
500       auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
501                                 {kv.first}, &values);
502       ASSERT_EQ(1, values.size());
503       ASSERT_EQ(1, s_vec.size());
504       s = s_vec[0];
505       ASSERT_TRUE(s.ok() || s.IsNotFound());
506       if (s.ok()) {
507         ASSERT_TRUE(kv.second == values[0]);
508       } else {
509         ASSERT_EQ(kv.second, "NOT_FOUND");
510       }
511     }
512   }
513 
514   // Verify all versions of keys.
VerifyInternalKeys(const std::vector<KeyVersion> & expected_versions)515   void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
516     std::vector<KeyVersion> versions;
517     const size_t kMaxKeys = 100000;
518     ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
519                                 expected_versions.back().user_key, kMaxKeys,
520                                 &versions));
521     ASSERT_EQ(expected_versions.size(), versions.size());
522     for (size_t i = 0; i < versions.size(); i++) {
523       ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
524       ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
525       ASSERT_EQ(expected_versions[i].type, versions[i].type);
526       if (versions[i].type != kTypeDeletion &&
527           versions[i].type != kTypeSingleDeletion) {
528         ASSERT_EQ(expected_versions[i].value, versions[i].value);
529       }
530       // Range delete not supported.
531       assert(expected_versions[i].type != kTypeRangeDeletion);
532     }
533   }
534 };
535 
536 class WritePreparedTransactionTest
537     : public WritePreparedTransactionTestBase,
538       virtual public ::testing::WithParamInterface<
539           std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> {
540  public:
WritePreparedTransactionTest()541   WritePreparedTransactionTest()
542       : WritePreparedTransactionTestBase(
543             std::get<0>(GetParam()), std::get<1>(GetParam()),
544             std::get<2>(GetParam()), std::get<3>(GetParam())){};
545 };
546 
547 #ifndef ROCKSDB_VALGRIND_RUN
548 class SnapshotConcurrentAccessTest
549     : public WritePreparedTransactionTestBase,
550       virtual public ::testing::WithParamInterface<std::tuple<
551           bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
552  public:
SnapshotConcurrentAccessTest()553   SnapshotConcurrentAccessTest()
554       : WritePreparedTransactionTestBase(
555             std::get<0>(GetParam()), std::get<1>(GetParam()),
556             std::get<2>(GetParam()), std::get<3>(GetParam())),
557         split_id_(std::get<4>(GetParam())),
558         split_cnt_(std::get<5>(GetParam())){};
559 
560  protected:
561   // A test is split into split_cnt_ tests, each identified with split_id_ where
562   // 0 <= split_id_ < split_cnt_
563   size_t split_id_;
564   size_t split_cnt_;
565 };
566 #endif  // ROCKSDB_VALGRIND_RUN
567 
568 class SeqAdvanceConcurrentTest
569     : public WritePreparedTransactionTestBase,
570       virtual public ::testing::WithParamInterface<std::tuple<
571           bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
572  public:
SeqAdvanceConcurrentTest()573   SeqAdvanceConcurrentTest()
574       : WritePreparedTransactionTestBase(
575             std::get<0>(GetParam()), std::get<1>(GetParam()),
576             std::get<2>(GetParam()), std::get<3>(GetParam())),
577         split_id_(std::get<4>(GetParam())),
578         split_cnt_(std::get<5>(GetParam())){};
579 
580  protected:
581   // A test is split into split_cnt_ tests, each identified with split_id_ where
582   // 0 <= split_id_ < split_cnt_
583   size_t split_id_;
584   size_t split_cnt_;
585 };
586 
// Parameters: (use_stackable_db, two_write_queue, write_policy,
// write_ordering). Unordered writes are only exercised together with two
// write queues.
INSTANTIATE_TEST_CASE_P(
    WritePreparedTransaction, WritePreparedTransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)));
593 
594 #ifndef ROCKSDB_VALGRIND_RUN
595 INSTANTIATE_TEST_CASE_P(
596     TwoWriteQueues, SnapshotConcurrentAccessTest,
597     ::testing::Values(
598         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
599         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
600         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
601         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
602         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
603         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
604         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
605         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
606         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
607         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
608         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
609         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
610         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
611         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
612         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
613         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
614         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
615         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
616         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
617         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),
618 
619         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
620         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
621         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
622         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
623         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
624         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
625         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
626         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
627         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
628         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
629         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
630         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
631         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
632         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
633         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
634         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
635         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
636         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
637         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
638         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)));
639 
640 INSTANTIATE_TEST_CASE_P(
641     OneWriteQueue, SnapshotConcurrentAccessTest,
642     ::testing::Values(
643         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
644         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
645         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
646         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
647         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
648         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
649         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
650         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
651         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
652         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
653         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
654         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
655         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
656         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
657         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
658         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
659         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
660         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
661         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
662         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20)));
663 
// Instantiates SeqAdvanceConcurrentTest for the two-write-queue configuration
// (second tuple element = true, going by the instantiation name) with
// WRITE_PREPARED policy, under both kOrderedWrite and kUnorderedWrite.
// NOTE(review): the last two tuple values look like (split_id, split_cnt):
// ids 0..9 out of 10, so together these instances cover every shard of the
// workload. Confirm against the fixture's parameter layout.
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::Values(
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)));
687 
// Instantiates SeqAdvanceConcurrentTest for the single-write-queue
// configuration (second tuple element = false, going by the instantiation
// name) with WRITE_PREPARED policy and kOrderedWrite only.
// NOTE(review): the last two tuple values look like (split_id, split_cnt):
// ids 0..9 out of 10, so together these instances cover every shard of the
// workload. Confirm against the fixture's parameter layout.
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)));
701 #endif  // ROCKSDB_VALGRIND_RUN
702 
// Exercises the commit cache primitives directly: AddCommitEntry,
// GetCommitEntry and ExchangeCommitEntry on slots indexed by
// prep_seq % COMMIT_CACHE_SIZE.
TEST_P(WritePreparedTransactionTest, CommitMap) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Use gtest assertions instead of assert(): assert() is compiled out under
  // NDEBUG, which would turn a failed cast into a null dereference rather
  // than a reported test failure.
  ASSERT_TRUE(wp_db != nullptr);
  ASSERT_TRUE(wp_db->db_impl_ != nullptr);
  size_t size = wp_db->COMMIT_CACHE_SIZE;
  CommitEntry c = {5, 12}, e;
  bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
  ASSERT_FALSE(evicted);

  // Should be able to read the same value
  CommitEntry64b dont_care;
  bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);
  // Should be able to distinguish between overlapping entries
  found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_NE(c.prep_seq + size, e.prep_seq);
  // Should be able to detect non-existent entry
  found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
  ASSERT_FALSE(found);

  // Reject an invalid exchange: the expected old value (e2) does not match
  // what is stored in the slot (c).
  CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
  CommitEntry64b e2_64b(e2, wp_db->FORMAT);
  bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
  ASSERT_FALSE(exchanged);
  // Check that it did actually reject it
  found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);

  // Accept a valid exchange
  CommitEntry64b c_64b(c, wp_db->FORMAT);
  CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
  exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
  ASSERT_TRUE(exchanged);
  // Check that it did actually accept it
  found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e3, e);

  // Rewrite an entry; the previous occupant is reported as evicted
  CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
  evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
  ASSERT_TRUE(evicted);
  ASSERT_EQ(e3, e);
  found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e4, e);
}
754 
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
  // If prepare <= snapshot < commit we should keep the entry around since its
  // nonexistence could be interpreted as committed in the snapshot while it is
  // not true. We keep such entries around by adding them to the
  // old_commit_map_.
  //
  // Cases where we do not expect the old commit map to be updated are also
  // tried with a second next_snapshot value. This tests that
  // MaybeUpdateOldCommitMap would not prevent us from checking the next
  // snapshot that must be checked.
  //
  // The cases run in the listed order. Each one is labeled with SCOPED_TRACE
  // so that a failure reported from inside the shared helper can be traced
  // back to the case that triggered it.
  struct Case {
    uint64_t prepare;
    uint64_t commit;
    uint64_t snapshot;
    uint64_t next_snapshot;
    bool expect_update;
  };
  const Case kCases[] = {
      {10, 15, 20, 21, false}, {10, 15, 20, 11, false},
      {10, 20, 20, 19, false}, {10, 20, 20, 21, false},
      {20, 20, 20, 21, false}, {20, 20, 20, 19, false},
      {10, 25, 20, 21, true},  {20, 25, 20, 21, true},
      {21, 25, 20, 22, false}, {21, 25, 20, 19, false},
  };
  for (const Case& tc : kCases) {
    SCOPED_TRACE("prepare=" + std::to_string(tc.prepare) +
                 " commit=" + std::to_string(tc.commit) +
                 " snapshot=" + std::to_string(tc.snapshot) +
                 " next_snapshot=" + std::to_string(tc.next_snapshot));
    MaybeUpdateOldCommitMapTestWithNext(tc.prepare, tc.commit, tc.snapshot,
                                        tc.next_snapshot, tc.expect_update);
  }
}
791 
792 // Trigger the condition where some old memtables are skipped when doing
793 // TransactionUtil::CheckKey(), and make sure the result is still correct.
TEST_P(WritePreparedTransactionTest,CheckKeySkipOldMemtable)794 TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
795   const int kAttemptHistoryMemtable = 0;
796   const int kAttemptImmMemTable = 1;
797   for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
798        attempt++) {
799     options.max_write_buffer_number_to_maintain = 3;
800     ReOpen();
801 
802     WriteOptions write_options;
803     ReadOptions read_options;
804     TransactionOptions txn_options;
805     txn_options.set_snapshot = true;
806     string value;
807     Status s;
808 
809     ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
810     ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
811 
812     Transaction* txn = db->BeginTransaction(write_options, txn_options);
813     ASSERT_TRUE(txn != nullptr);
814     ASSERT_OK(txn->SetName("txn"));
815 
816     Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
817     ASSERT_TRUE(txn2 != nullptr);
818     ASSERT_OK(txn2->SetName("txn2"));
819 
820     // This transaction is created to cause potential conflict.
821     Transaction* txn_x = db->BeginTransaction(write_options);
822     ASSERT_OK(txn_x->SetName("txn_x"));
823     ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
824     ASSERT_OK(txn_x->Prepare());
825 
826     // Create snapshots after the prepare, but there should still
827     // be a conflict when trying to read "foo".
828 
829     if (attempt == kAttemptImmMemTable) {
830       // For the second attempt, hold flush from beginning. The memtable
831       // will be switched to immutable after calling TEST_SwitchMemtable()
832       // while CheckKey() is called.
833       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
834           {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
835             "FlushJob::Start"}});
836       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
837     }
838 
839     // force a memtable flush. The memtable should still be kept
840     FlushOptions flush_ops;
841     if (attempt == kAttemptHistoryMemtable) {
842       ASSERT_OK(db->Flush(flush_ops));
843     } else {
844       assert(attempt == kAttemptImmMemTable);
845       DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
846       db_impl->TEST_SwitchMemtable();
847     }
848     uint64_t num_imm_mems;
849     ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
850                                    &num_imm_mems));
851     if (attempt == kAttemptHistoryMemtable) {
852       ASSERT_EQ(0, num_imm_mems);
853     } else {
854       assert(attempt == kAttemptImmMemTable);
855       ASSERT_EQ(1, num_imm_mems);
856     }
857 
858     // Put something in active memtable
859     ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
860 
861     // Create txn3 after flushing, but this transaction also needs to
862     // check all memtables because of they contains uncommitted data.
863     Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
864     ASSERT_TRUE(txn3 != nullptr);
865     ASSERT_OK(txn3->SetName("txn3"));
866 
867     // Commit the pending write
868     ASSERT_OK(txn_x->Commit());
869 
870     // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will
871     // pass. In all cases, both memtables are queried.
872     SetPerfLevel(PerfLevel::kEnableCount);
873     get_perf_context()->Reset();
874     ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
875     // We should have checked two memtables, active and either immutable
876     // or history memtable, depending on the test case.
877     ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
878 
879     get_perf_context()->Reset();
880     ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
881     // We should have checked two memtables, active and either immutable
882     // or history memtable, depending on the test case.
883     ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
884 
885     get_perf_context()->Reset();
886     ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
887     ASSERT_EQ(value, "bar");
888     // We should have checked two memtables, and since there is no
889     // conflict, another Get() will be made and fetch the data from
890     // DB. If it is in immutable memtable, two extra memtable reads
891     // will be issued. If it is not (in history), only one will
892     // be made, which is to the active memtable.
893     if (attempt == kAttemptHistoryMemtable) {
894       ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
895     } else {
896       assert(attempt == kAttemptImmMemTable);
897       ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
898     }
899 
900     Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
901     ASSERT_TRUE(txn4 != nullptr);
902     ASSERT_OK(txn4->SetName("txn4"));
903     get_perf_context()->Reset();
904     ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
905     if (attempt == kAttemptHistoryMemtable) {
906       // Active memtable will be checked in snapshot validation and when
907       // getting the value.
908       ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
909     } else {
910       // Only active memtable will be checked in snapshot validation but
911       // both of active and immutable snapshot will be queried when
912       // getting the value.
913       assert(attempt == kAttemptImmMemTable);
914       ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
915     }
916 
917     ASSERT_OK(txn2->Commit());
918     ASSERT_OK(txn4->Commit());
919 
920     TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
921     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
922 
923     SetPerfLevel(PerfLevel::kDisable);
924 
925     delete txn;
926     delete txn2;
927     delete txn3;
928     delete txn4;
929     delete txn_x;
930   }
931 }
932 
// Reproduce the bug with two snapshots with the same sequence number and test
// that the release of the first snapshot will not affect the reads by the other
// snapshot
TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
  TransactionOptions txn_options;
  Status s;

  // Insert initial value
  ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));

  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  Transaction* txn =
      wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key", "value2"));
  ASSERT_OK(txn->Prepare());
  // Three snapshots with the same seq number
  const Snapshot* snapshot0 = wp_db->GetSnapshot();
  const Snapshot* snapshot1 = wp_db->GetSnapshot();
  const Snapshot* snapshot2 = wp_db->GetSnapshot();
  ASSERT_OK(txn->Commit());
  SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
  // A seq exactly COMMIT_CACHE_SIZE past txn's id; committing it is expected
  // to evict an entry from the commit cache (see the eviction comments below).
  SequenceNumber overlap_seq = txn->GetId() + cache_size;
  delete txn;

  // 4th snapshot with a larger seq
  const Snapshot* snapshot3 = wp_db->GetSnapshot();
  // Cause an eviction to advance max evicted seq number
  // This also fetches the 4 snapshots from db since their seq is lower than the
  // new max
  wp_db->AddCommitted(overlap_seq, overlap_seq);

  ReadOptions ropt;
  // It should see the value before commit
  ropt.snapshot = snapshot2;
  PinnableSlice pinnable_val;
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();

  // Release a snapshot that shares snapshot2's sequence number; reads through
  // snapshot2 must be unaffected.
  wp_db->ReleaseSnapshot(snapshot1);

  // It should still see the value before commit
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();

  // Cause an eviction to advance max evicted seq number and trigger updating
  // the snapshot list
  overlap_seq += cache_size;
  wp_db->AddCommitted(overlap_seq, overlap_seq);

  // It should still see the value before commit
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();

  wp_db->ReleaseSnapshot(snapshot0);
  wp_db->ReleaseSnapshot(snapshot2);
  wp_db->ReleaseSnapshot(snapshot3);
}
997 
UniqueCnt(std::vector<SequenceNumber> vec)998 size_t UniqueCnt(std::vector<SequenceNumber> vec) {
999   std::set<SequenceNumber> aset;
1000   for (auto i : vec) {
1001     aset.insert(i);
1002   }
1003   return aset.size();
1004 }
// Test that the entries in old_commit_map_ get garbage collected properly
//
// Both caches are configured with 0 bits. Per the comment before the last
// commit below, the commit cache then holds a single item, so a new commit
// evicts the previous entry. Evicted entries that overlap a live snapshot
// end up in old_commit_map_. Releasing each snapshot must drop that
// snapshot's entries while leaving the others intact.
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
  const size_t snapshot_cache_bits = 0;
  const size_t commit_cache_bits = 0;
  // NOTE(review): mock_db is never deleted here; ownership presumably passes
  // to WritePreparedTxnDBMock (held by wp_db). Confirm.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  SequenceNumber seq = 0;
  // Take the first snapshot that overlaps with two txn
  auto prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto prep_seq2 = ++seq;
  wp_db->AddPrepared(prep_seq2);
  auto snap_seq1 = seq;
  wp_db->TakeSnapshot(snap_seq1);
  auto commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  auto commit_seq2 = ++seq;
  wp_db->AddCommitted(prep_seq2, commit_seq2);
  wp_db->RemovePrepared(prep_seq2);
  // Take the 2nd and 3rd snapshot that overlap with the same txn
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto snap_seq2 = seq;
  wp_db->TakeSnapshot(snap_seq2);
  seq++;
  auto snap_seq3 = seq;
  wp_db->TakeSnapshot(snap_seq3);
  seq++;
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
  // only item in the commit_cache_ via another commit.
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);

  // Verify that the evicted commit entries for all snapshots are in the
  // old_commit_map_
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(3, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 2nd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq2);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(2, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 1st snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq1);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(1, wp_db->old_commit_map_.size());
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 3rd snapshot is cleaned up after the release; the map is
  // then empty and the empty flag is set again.
  wp_db->ReleaseSnapshotInternal(snap_seq3);
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(0, wp_db->old_commit_map_.size());
  }
}
1086 
// Checks CheckAgainstSnapshots() against a fixed list of 9 snapshots with a
// snapshot cache of 1 << 2 = 4 entries. Judging by the cases at the end of
// this test, the first cache_size snapshots are held in the snapshot cache and
// the rest in the "old snapshots" list. A commit entry must be recorded in
// old_commit_map_ once for every snapshot that lies between its prepare and
// commit seq, regardless of which list holds that snapshot.
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
  std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                           600l, 700l, 800l, 900l};
  const size_t snapshot_cache_bits = 2;
  const uint64_t cache_size = 1ul << snapshot_cache_bits;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
  // NOTE(review): mock_db is never deleted here; ownership presumably passes
  // to WritePreparedTxnDBMock (held by wp_db). Confirm.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  SequenceNumber version = 1000l;
  ASSERT_EQ(0, wp_db->snapshots_total_);
  wp_db->UpdateSnapshots(snapshots, version);
  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
  // seq numbers are chosen so that we have two of them between each two
  // snapshots. If the diff of two consecutive seq is more than 5, there is a
  // snapshot between them.
  std::vector<SequenceNumber> seqs = {50l,  55l,  150l, 155l, 250l, 255l, 350l,
                                      355l, 450l, 455l, 550l, 555l, 650l, 655l,
                                      750l, 755l, 850l, 855l, 950l, 955l};
  assert(seqs.size() > 1);
  // Treat each consecutive pair (seqs[i], seqs[i + 1]) as a (prepare, commit)
  // entry and check whether old_commit_map_ was touched.
  for (size_t i = 0; i < seqs.size() - 1; i++) {
    wp_db->old_commit_map_empty_ = true;  // reset
    CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
    wp_db->CheckAgainstSnapshots(commit_entry);
    // Expect update if there is snapshot in between the prepare and commit
    bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
                         commit_entry.commit_seq >= snapshots.front() &&
                         commit_entry.prep_seq <= snapshots.back();
    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
  }

  // Test that search will include multiple snapshot from snapshot cache
  {
    // exclude first and last item in the cache
    CommitEntry commit_entry = {snapshots.front() + 1,
                                snapshots[cache_size - 1] - 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
  }

  // Test that search will include multiple snapshot from old snapshots
  {
    // include two in the middle
    CommitEntry commit_entry = {snapshots[cache_size] + 1,
                                snapshots[cache_size + 2] + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
  }

  // Test that search will include both snapshot cache and old snapshots
  // Case 1: includes all in snapshot cache
  {
    CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
  }

  // Case 2: includes all snapshot caches except the smallest
  {
    CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
  }

  // Case 3: includes only the largest of snapshot cache
  {
    CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
                                snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
  }
}
1172 
1173 // This test is too slow for travis
1174 #ifndef TRAVIS
1175 #ifndef ROCKSDB_VALGRIND_RUN
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
//
// The search space is enumerated as follows:
//   - every prefix of `snapshots` used as the old list, with sizes 1 through
//     SNAPSHOT_CACHE_SIZE + extra;
//   - every subset of that prefix that survives into the new list;
//   - every number of newly appended snapshots.
// For each resulting case, a commit entry is built around each surviving
// snapshot and SnapshotConcurrentAccessTestInternal is run for every pair of
// break points (a1 < a2, b1 < b2). Iterations are sharded across test
// instances: only those with loop_id % split_cnt_ == split_id_ run here.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
  // We have a sync point in the method under test after checking each snapshot.
  // If you increase the max number of snapshots in this test, more sync points
  // in the methods must also be added.
  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
                                                 60l, 70l, 80l, 90l, 100l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
  SequenceNumber version = 1000l;
  // Choose the cache size so that the new snapshot list could replace all the
  // existing items in the cache and also have some overflow.
  // NOTE(review): mock_db is never deleted here; ownership presumably passes
  // to WritePreparedTxnDBMock (held by wp_db). Confirm.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  const size_t extra = 2;
  size_t loop_id = 0;
  // Add up to extra items that do not fit into the cache
  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
       old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);

    // Each member of old snapshot might or might not appear in the new list. We
    // create a common_snapshots for each combination.
    // NOTE(review): new_comb is presumably a bitmask over old_snapshots
    // indices, decoded by IsInCombination. Confirm.
    size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      if (loop_id % split_cnt_ != split_id_) continue;
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t i = 0; i < old_snapshots.size(); i++) {
        if (IsInCombination(i, new_comb)) {
          common_snapshots.push_back(old_snapshots[i]);
        }
      }
      // And add some new snapshots to the common list
      for (size_t added_snapshots = 0;
           added_snapshots <= snapshots.size() - old_snapshots.size();
           added_snapshots++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        for (size_t i = 0; i < added_snapshots; i++) {
          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
        }
        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
             ++it) {
          auto snapshot = *it;
          // Create a commit entry that is around the snapshot and thus should
          // not be discarded
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // The critical part is when iterating the snapshot cache. Afterwards,
          // we are operating under the lock
          size_t a_range =
              std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          size_t b_range =
              std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          // Break each thread at two points
          // NOTE(review): a1/a2 are presumably the break points for the thread
          // iterating the old list and b1/b2 for the thread installing the new
          // list. Confirm in SnapshotConcurrentAccessTestInternal.
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
1255 #endif  // ROCKSDB_VALGRIND_RUN
1256 #endif  // TRAVIS
1257 
1258 // This test clarifies the contract of AdvanceMaxEvictedSeq method
TEST_P(WritePreparedTransactionTest,AdvanceMaxEvictedSeqBasic)1259 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
1260   DBImpl* mock_db = new DBImpl(options, dbname);
1261   std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1262       new WritePreparedTxnDBMock(mock_db, txn_db_options));
1263 
1264   // 1. Set the initial values for max, prepared, and snapshots
1265   SequenceNumber zero_max = 0l;
1266   // Set the initial list of prepared txns
1267   const std::vector<SequenceNumber> initial_prepared = {10,  30,  50, 100,
1268                                                         150, 200, 250};
1269   for (auto p : initial_prepared) {
1270     wp_db->AddPrepared(p);
1271   }
1272   // This updates the max value and also set old prepared
1273   SequenceNumber init_max = 100;
1274   wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
1275   const std::vector<SequenceNumber> initial_snapshots = {20, 40};
1276   wp_db->SetDBSnapshots(initial_snapshots);
1277   // This will update the internal cache of snapshots from the DB
1278   wp_db->UpdateSnapshots(initial_snapshots, init_max);
1279 
1280   // 2. Invoke AdvanceMaxEvictedSeq
1281   const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
1282   wp_db->SetDBSnapshots(latest_snapshots);
1283   SequenceNumber new_max = 200;
1284   wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
1285 
1286   // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
1287   // a. max should be updated to new_max
1288   ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
1289   // b. delayed prepared should contain every txn <= max and prepared should
1290   // only contain txns > max
1291   auto it = initial_prepared.begin();
1292   for (; it != initial_prepared.end() && *it <= new_max; ++it) {
1293     ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
1294   }
1295   ASSERT_TRUE(wp_db->delayed_prepared_.empty());
1296   for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
1297        ++it, wp_db->prepared_txns_.pop()) {
1298     ASSERT_EQ(*it, wp_db->prepared_txns_.top());
1299   }
1300   ASSERT_TRUE(it == initial_prepared.end());
1301   ASSERT_TRUE(wp_db->prepared_txns_.empty());
1302   // c. snapshots should contain everything below new_max
1303   auto sit = latest_snapshots.begin();
1304   for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
1305                      i < wp_db->snapshots_total_;
1306        sit++, i++) {
1307     ASSERT_TRUE(i < wp_db->snapshots_total_);
1308     // This test is in small scale and the list of snapshots are assumed to be
1309     // within the cache size limit. This is just a safety check to double check
1310     // that assumption.
1311     ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
1312     ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
1313   }
1314 }
1315 
1316 // A new snapshot should always be always larger than max_evicted_seq_
1317 // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
TEST_P(WritePreparedTransactionTest,NewSnapshotLargerThanMax)1318 TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
1319   WriteOptions woptions;
1320   TransactionOptions txn_options;
1321   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1322   Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
1323   ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
1324   ASSERT_OK(txn0->Commit());
1325   const SequenceNumber seq = txn0->GetId();  // is also prepare seq
1326   delete txn0;
1327   std::vector<Transaction*> txns;
1328   // Inc seq without committing anything
1329   for (int i = 0; i < 10; i++) {
1330     Transaction* txn = db->BeginTransaction(woptions, txn_options);
1331     ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
1332     ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
1333     ASSERT_OK(txn->Prepare());
1334     txns.push_back(txn);
1335   }
1336 
1337   // The new commit is seq + 10
1338   ASSERT_OK(db->Put(woptions, "key", "value"));
1339   auto snap = wp_db->GetSnapshot();
1340   const SequenceNumber last_seq = snap->GetSequenceNumber();
1341   wp_db->ReleaseSnapshot(snap);
1342   ASSERT_LT(seq, last_seq);
1343   // Otherwise our test is not effective
1344   ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
1345 
1346   // Evict seq out of commit cache
1347   const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
1348   // Check that the next write could make max go beyond last
1349   auto last_max = wp_db->max_evicted_seq_.load();
1350   wp_db->AddCommitted(overwrite_seq, overwrite_seq);
1351   // Check that eviction has advanced the max
1352   ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
1353   // Check that the new max has not advanced the last seq
1354   ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
1355   for (auto txn : txns) {
1356     txn->Rollback();
1357     delete txn;
1358   }
1359 }
1360 
// A new snapshot should always be larger than max_evicted_seq_
1362 // In very rare cases max could be below last published seq. Test that
1363 // taking snapshot will wait for max to catch up.
TEST_P(WritePreparedTransactionTest,MaxCatchupWithNewSnapshot)1364 TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
1365   const size_t snapshot_cache_bits = 7;  // same as default
1366   const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
1367   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1368   ReOpen();
1369   WriteOptions woptions;
1370   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1371 
1372   const int writes = 50;
1373   const int batch_cnt = 4;
1374   ROCKSDB_NAMESPACE::port::Thread t1([&]() {
1375     for (int i = 0; i < writes; i++) {
1376       WriteBatch batch;
1377       // For duplicate keys cause 4 commit entries, each evicting an entry that
1378       // is not published yet, thus causing max evicted seq go higher than last
1379       // published.
1380       for (int b = 0; b < batch_cnt; b++) {
1381         batch.Put("foo", "foo");
1382       }
1383       db->Write(woptions, &batch);
1384     }
1385   });
1386 
1387   ROCKSDB_NAMESPACE::port::Thread t2([&]() {
1388     while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
1389       std::this_thread::yield();
1390     }
1391     for (int i = 0; i < 10; i++) {
1392       SequenceNumber max_lower_bound = wp_db->max_evicted_seq_;
1393       auto snap = db->GetSnapshot();
1394       if (snap->GetSequenceNumber() != 0) {
1395         // Value of max_evicted_seq_ when snapshot was taken in unknown. We thus
1396         // compare with the lower bound instead as an approximation.
1397         ASSERT_LT(max_lower_bound, snap->GetSequenceNumber());
1398       }  // seq 0 is ok to be less than max since nothing is visible to it
1399       db->ReleaseSnapshot(snap);
1400     }
1401   });
1402 
1403   t1.join();
1404   t2.join();
1405 
1406   // Make sure that the test has worked and seq number has advanced as we
1407   // thought
1408   auto snap = db->GetSnapshot();
1409   ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
1410   db->ReleaseSnapshot(snap);
1411 }
1412 
1413 // Test that reads without snapshots would not hit an undefined state
TEST_P(WritePreparedTransactionTest,MaxCatchupWithUnbackedSnapshot)1414 TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
1415   const size_t snapshot_cache_bits = 7;  // same as default
1416   const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
1417   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1418   ReOpen();
1419   WriteOptions woptions;
1420   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1421 
1422   const int writes = 50;
1423   ROCKSDB_NAMESPACE::port::Thread t1([&]() {
1424     for (int i = 0; i < writes; i++) {
1425       WriteBatch batch;
1426       batch.Put("key", "foo");
1427       db->Write(woptions, &batch);
1428     }
1429   });
1430 
1431   ROCKSDB_NAMESPACE::port::Thread t2([&]() {
1432     while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
1433       std::this_thread::yield();
1434     }
1435     ReadOptions ropt;
1436     PinnableSlice pinnable_val;
1437     TransactionOptions txn_options;
1438     for (int i = 0; i < 10; i++) {
1439       auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1440       ASSERT_TRUE(s.ok() || s.IsTryAgain());
1441       pinnable_val.Reset();
1442       Transaction* txn = db->BeginTransaction(woptions, txn_options);
1443       s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1444       ASSERT_TRUE(s.ok() || s.IsTryAgain());
1445       pinnable_val.Reset();
1446       std::vector<std::string> values;
1447       auto s_vec =
1448           txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
1449       ASSERT_EQ(1, values.size());
1450       ASSERT_EQ(1, s_vec.size());
1451       s = s_vec[0];
1452       ASSERT_TRUE(s.ok() || s.IsTryAgain());
1453       Slice key("key");
1454       txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s,
1455                     true);
1456       ASSERT_TRUE(s.ok() || s.IsTryAgain());
1457       delete txn;
1458     }
1459   });
1460 
1461   t1.join();
1462   t2.join();
1463 
1464   // Make sure that the test has worked and seq number has advanced as we
1465   // thought
1466   auto snap = db->GetSnapshot();
1467   ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
1468   db->ReleaseSnapshot(snap);
1469 }
1470 
1471 // Check that old_commit_map_ cleanup works correctly if the snapshot equals
1472 // max_evicted_seq_.
TEST_P(WritePreparedTransactionTest,CleanupSnapshotEqualToMax)1473 TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
1474   const size_t snapshot_cache_bits = 7;  // same as default
1475   const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
1476   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1477   ReOpen();
1478   WriteOptions woptions;
1479   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1480   // Insert something to increase seq
1481   ASSERT_OK(db->Put(woptions, "key", "value"));
1482   auto snap = db->GetSnapshot();
1483   auto snap_seq = snap->GetSequenceNumber();
1484   // Another insert should trigger eviction + load snapshot from db
1485   ASSERT_OK(db->Put(woptions, "key", "value"));
1486   // This is the scenario that we check agaisnt
1487   ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
1488   // old_commit_map_ now has some data that needs gc
1489   ASSERT_EQ(1, wp_db->snapshots_total_);
1490   ASSERT_EQ(1, wp_db->old_commit_map_.size());
1491 
1492   db->ReleaseSnapshot(snap);
1493 
1494   // Another insert should trigger eviction + load snapshot from db
1495   ASSERT_OK(db->Put(woptions, "key", "value"));
1496 
1497   // the snapshot and related metadata must be properly garbage collected
1498   ASSERT_EQ(0, wp_db->snapshots_total_);
1499   ASSERT_TRUE(wp_db->snapshots_all_.empty());
1500   ASSERT_EQ(0, wp_db->old_commit_map_.size());
1501 }
1502 
// Check that AdvanceSeqByOne makes newly taken snapshots observe a larger
// sequence number.
TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
  // Takes a short-lived snapshot and returns the sequence number it saw.
  auto current_snapshot_seq = [this]() {
    const Snapshot* snapshot = db->GetSnapshot();
    const SequenceNumber seen = snapshot->GetSequenceNumber();
    db->ReleaseSnapshot(snapshot);
    return seen;
  };

  const SequenceNumber before = current_snapshot_seq();
  dynamic_cast<WritePreparedTxnDB*>(db)->AdvanceSeqByOne();
  const SequenceNumber after = current_snapshot_seq();

  ASSERT_LT(before, after);
}
1517 
1518 // Test that the txn Initilize calls the overridden functions
TEST_P(WritePreparedTransactionTest,TxnInitialize)1519 TEST_P(WritePreparedTransactionTest, TxnInitialize) {
1520   TransactionOptions txn_options;
1521   WriteOptions write_options;
1522   ASSERT_OK(db->Put(write_options, "key", "value"));
1523   Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1524   ASSERT_OK(txn0->SetName("xid"));
1525   ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
1526   ASSERT_OK(txn0->Prepare());
1527 
1528   // SetSnapshot is overridden to update min_uncommitted_
1529   txn_options.set_snapshot = true;
1530   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1531   auto snap = txn1->GetSnapshot();
1532   auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
1533   // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
1534   // udpated
1535   ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
1536 
1537   txn0->Rollback();
1538   txn1->Rollback();
1539   delete txn0;
1540   delete txn1;
1541 }
1542 
1543 // This tests that transactions with duplicate keys perform correctly after max
1544 // is advancing their prepared sequence numbers. This will not be the case if
1545 // for example the txn does not add the prepared seq for the second sub-batch to
1546 // the PreparedHeap structure.
TEST_P(WritePreparedTransactionTest,AdvanceMaxEvictedSeqWithDuplicates)1547 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
1548   const size_t snapshot_cache_bits = 7;  // same as default
1549   const size_t commit_cache_bits = 1;    // disable commit cache
1550   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1551   ReOpen();
1552 
1553   ReadOptions ropt;
1554   PinnableSlice pinnable_val;
1555   WriteOptions write_options;
1556   TransactionOptions txn_options;
1557   Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1558   ASSERT_OK(txn0->SetName("xid"));
1559   ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
1560   ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
1561   ASSERT_OK(txn0->Prepare());
1562 
1563   ASSERT_OK(db->Put(write_options, "key2", "value"));
1564   // Will cause max advance due to disabled commit cache
1565   ASSERT_OK(db->Put(write_options, "key3", "value"));
1566 
1567   auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1568   ASSERT_TRUE(s.IsNotFound());
1569   delete txn0;
1570 
1571   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1572   wp_db->db_impl_->FlushWAL(true);
1573   wp_db->TEST_Crash();
1574   ReOpenNoDelete();
1575   assert(db != nullptr);
1576   s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1577   ASSERT_TRUE(s.IsNotFound());
1578 
1579   txn0 = db->GetTransactionByName("xid");
1580   ASSERT_OK(txn0->Rollback());
1581   delete txn0;
1582 }
1583 
1584 #ifndef ROCKSDB_VALGRIND_RUN
1585 // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
1586 // delayed_prepared_, when is run concurrently with advancing max_evicted_seq,
1587 // which moves prepared txns from prepared_txns_ to delayed_prepared_.
TEST_P(WritePreparedTransactionTest,SmallestUnCommittedSeq)1588 TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
1589   const size_t snapshot_cache_bits = 7;  // same as default
1590   const size_t commit_cache_bits = 1;    // disable commit cache
1591   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1592   ReOpen();
1593   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1594   ReadOptions ropt;
1595   PinnableSlice pinnable_val;
1596   WriteOptions write_options;
1597   TransactionOptions txn_options;
1598   std::vector<Transaction*> txns, committed_txns;
1599 
1600   const int cnt = 100;
1601   for (int i = 0; i < cnt; i++) {
1602     Transaction* txn = db->BeginTransaction(write_options, txn_options);
1603     ASSERT_OK(txn->SetName("xid" + ToString(i)));
1604     auto key = "key1" + ToString(i);
1605     auto value = "value1" + ToString(i);
1606     ASSERT_OK(txn->Put(Slice(key), Slice(value)));
1607     ASSERT_OK(txn->Prepare());
1608     txns.push_back(txn);
1609   }
1610 
1611   port::Mutex mutex;
1612   Random rnd(1103);
1613   ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
1614     for (int i = 0; i < cnt; i++) {
1615       uint32_t index = rnd.Uniform(cnt - i);
1616       Transaction* txn;
1617       {
1618         MutexLock l(&mutex);
1619         txn = txns[index];
1620         txns.erase(txns.begin() + index);
1621       }
1622       // Since commit cache is practically disabled, commit results in immediate
1623       // advance in max_evicted_seq_ and subsequently moving some prepared txns
1624       // to delayed_prepared_.
1625       txn->Commit();
1626       committed_txns.push_back(txn);
1627     }
1628   });
1629   ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
1630     while (1) {
1631       MutexLock l(&mutex);
1632       if (txns.empty()) {
1633         break;
1634       }
1635       auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
1636       ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
1637     }
1638   });
1639 
1640   commit_thread.join();
1641   read_thread.join();
1642   for (auto txn : committed_txns) {
1643     delete txn;
1644   }
1645 }
1646 #endif  // ROCKSDB_VALGRIND_RUN
1647 
TEST_P(SeqAdvanceConcurrentTest,SeqAdvanceConcurrent)1648 TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
1649   // Given the sequential run of txns, with this timeout we should never see a
1650   // deadlock nor a timeout unless we have a key conflict, which should be
1651   // almost infeasible.
1652   txn_db_options.transaction_lock_timeout = 1000;
1653   txn_db_options.default_lock_timeout = 1000;
1654   ReOpen();
1655   FlushOptions fopt;
1656 
1657   // Number of different txn types we use in this test
1658   const size_t type_cnt = 5;
1659   // The size of the first write group
1660   // TODO(myabandeh): This should be increase for pre-release tests
1661   const size_t first_group_size = 2;
1662   // Total number of txns we run in each test
1663   // TODO(myabandeh): This should be increase for pre-release tests
1664   const size_t txn_cnt = first_group_size + 1;
1665 
1666   size_t base[txn_cnt + 1] = {
1667       1,
1668   };
1669   for (size_t bi = 1; bi <= txn_cnt; bi++) {
1670     base[bi] = base[bi - 1] * type_cnt;
1671   }
1672   const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
1673   printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
1674   for (size_t n = 0; n < max_n; n++, ReOpen()) {
1675     if (n % split_cnt_ != split_id_) continue;
1676     if (n % 1000 == 0) {
1677       printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
1678     }
1679     DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1680     auto seq = db_impl->TEST_GetLastVisibleSequence();
1681     with_empty_commits = 0;
1682     exp_seq = seq;
1683     // This is increased before writing the batch for commit
1684     commit_writes = 0;
1685     // This is increased before txn starts linking if it expects to do a commit
1686     // eventually
1687     expected_commits = 0;
1688     std::vector<port::Thread> threads;
1689 
1690     linked = 0;
1691     std::atomic<bool> batch_formed(false);
1692     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1693         "WriteThread::EnterAsBatchGroupLeader:End",
1694         [&](void* /*arg*/) { batch_formed = true; });
1695     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1696         "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
1697           linked++;
1698           if (linked == 1) {
1699             // Wait until the others are linked too.
1700             while (linked < first_group_size) {
1701             }
1702           } else if (linked == 1 + first_group_size) {
1703             // Make the 2nd batch of the rest of writes plus any followup
1704             // commits from the first batch
1705             while (linked < txn_cnt + commit_writes) {
1706             }
1707           }
1708           // Then we will have one or more batches consisting of follow-up
1709           // commits from the 2nd batch. There is a bit of non-determinism here
1710           // but it should be tolerable.
1711         });
1712 
1713     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1714     for (size_t bi = 0; bi < txn_cnt; bi++) {
1715       // get the bi-th digit in number system based on type_cnt
1716       size_t d = (n % base[bi + 1]) / base[bi];
1717       switch (d) {
1718         case 0:
1719           threads.emplace_back(txn_t0, bi);
1720           break;
1721         case 1:
1722           threads.emplace_back(txn_t1, bi);
1723           break;
1724         case 2:
1725           threads.emplace_back(txn_t2, bi);
1726           break;
1727         case 3:
1728           threads.emplace_back(txn_t3, bi);
1729           break;
1730         case 4:
1731           threads.emplace_back(txn_t3, bi);
1732           break;
1733         default:
1734           assert(false);
1735       }
1736       // wait to be linked
1737       while (linked.load() <= bi) {
1738       }
1739       // after a queue of size first_group_size
1740       if (bi + 1 == first_group_size) {
1741         while (!batch_formed) {
1742         }
1743         // to make it more deterministic, wait until the commits are linked
1744         while (linked.load() <= bi + expected_commits) {
1745         }
1746       }
1747     }
1748     for (auto& t : threads) {
1749       t.join();
1750     }
1751     if (options.two_write_queues) {
1752       // In this case none of the above scheduling tricks to deterministically
1753       // form merged batches works because the writes go to separate queues.
1754       // This would result in different write groups in each run of the test. We
1755       // still keep the test since although non-deterministic and hard to debug,
1756       // it is still useful to have.
1757       // TODO(myabandeh): Add a deterministic unit test for two_write_queues
1758     }
1759 
1760     // Check if memtable inserts advanced seq number as expected
1761     seq = db_impl->TEST_GetLastVisibleSequence();
1762     ASSERT_EQ(exp_seq, seq);
1763 
1764     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1765     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1766 
1767     // Check if recovery preserves the last sequence number
1768     db_impl->FlushWAL(true);
1769     ReOpenNoDelete();
1770     assert(db != nullptr);
1771     db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1772     seq = db_impl->TEST_GetLastVisibleSequence();
1773     ASSERT_LE(exp_seq, seq + with_empty_commits);
1774 
1775     // Check if flush preserves the last sequence number
1776     db_impl->Flush(fopt);
1777     seq = db_impl->GetLatestSequenceNumber();
1778     ASSERT_LE(exp_seq, seq + with_empty_commits);
1779 
1780     // Check if recovery after flush preserves the last sequence number
1781     db_impl->FlushWAL(true);
1782     ReOpenNoDelete();
1783     assert(db != nullptr);
1784     db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1785     seq = db_impl->GetLatestSequenceNumber();
1786     ASSERT_LE(exp_seq, seq + with_empty_commits);
1787   }
1788 }
1789 
1790 // Run a couple of different txns among them some uncommitted. Restart the db at
1791 // a couple points to check whether the list of uncommitted txns are recovered
1792 // properly.
TEST_P(WritePreparedTransactionTest,BasicRecovery)1793 TEST_P(WritePreparedTransactionTest, BasicRecovery) {
1794   options.disable_auto_compactions = true;
1795   ReOpen();
1796   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1797 
1798   txn_t0(0);
1799 
1800   TransactionOptions txn_options;
1801   WriteOptions write_options;
1802   size_t index = 1000;
1803   Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1804   auto istr0 = std::to_string(index);
1805   auto s = txn0->SetName("xid" + istr0);
1806   ASSERT_OK(s);
1807   s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
1808   ASSERT_OK(s);
1809   s = txn0->Prepare();
1810   auto prep_seq_0 = txn0->GetId();
1811 
1812   txn_t1(0);
1813 
1814   index++;
1815   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1816   auto istr1 = std::to_string(index);
1817   s = txn1->SetName("xid" + istr1);
1818   ASSERT_OK(s);
1819   s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
1820   ASSERT_OK(s);
1821   s = txn1->Prepare();
1822   auto prep_seq_1 = txn1->GetId();
1823 
1824   txn_t2(0);
1825 
1826   ReadOptions ropt;
1827   PinnableSlice pinnable_val;
1828   // Check the value is not committed before restart
1829   s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1830   ASSERT_TRUE(s.IsNotFound());
1831   pinnable_val.Reset();
1832 
1833   delete txn0;
1834   delete txn1;
1835   wp_db->db_impl_->FlushWAL(true);
1836   wp_db->TEST_Crash();
1837   ReOpenNoDelete();
1838   assert(db != nullptr);
1839   wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1840   // After recovery, all the uncommitted txns (0 and 1) should be inserted into
1841   // delayed_prepared_
1842   ASSERT_TRUE(wp_db->prepared_txns_.empty());
1843   ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1844   ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
1845   ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
1846   {
1847     ReadLock rl(&wp_db->prepared_mutex_);
1848     ASSERT_EQ(2, wp_db->delayed_prepared_.size());
1849     ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
1850                 wp_db->delayed_prepared_.end());
1851     ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
1852                 wp_db->delayed_prepared_.end());
1853   }
1854 
1855   // Check the value is still not committed after restart
1856   s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1857   ASSERT_TRUE(s.IsNotFound());
1858   pinnable_val.Reset();
1859 
1860   txn_t3(0);
1861 
1862   // Test that a recovered txns will be properly marked committed for the next
1863   // recovery
1864   txn1 = db->GetTransactionByName("xid" + istr1);
1865   ASSERT_NE(txn1, nullptr);
1866   txn1->Commit();
1867   delete txn1;
1868 
1869   index++;
1870   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1871   auto istr2 = std::to_string(index);
1872   s = txn2->SetName("xid" + istr2);
1873   ASSERT_OK(s);
1874   s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
1875   ASSERT_OK(s);
1876   s = txn2->Prepare();
1877   auto prep_seq_2 = txn2->GetId();
1878 
1879   delete txn2;
1880   wp_db->db_impl_->FlushWAL(true);
1881   wp_db->TEST_Crash();
1882   ReOpenNoDelete();
1883   assert(db != nullptr);
1884   wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1885   ASSERT_TRUE(wp_db->prepared_txns_.empty());
1886   ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1887 
1888   // 0 and 2 are prepared and 1 is committed
1889   {
1890     ReadLock rl(&wp_db->prepared_mutex_);
1891     ASSERT_EQ(2, wp_db->delayed_prepared_.size());
1892     const auto& end = wp_db->delayed_prepared_.end();
1893     ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
1894     ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
1895     ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
1896   }
1897   ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
1898   ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);
1899 
1900   // Commit all the remaining txns
1901   txn0 = db->GetTransactionByName("xid" + istr0);
1902   ASSERT_NE(txn0, nullptr);
1903   txn0->Commit();
1904   txn2 = db->GetTransactionByName("xid" + istr2);
1905   ASSERT_NE(txn2, nullptr);
1906   txn2->Commit();
1907 
1908   // Check the value is committed after commit
1909   s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1910   ASSERT_TRUE(s.ok());
1911   ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
1912   pinnable_val.Reset();
1913 
1914   delete txn0;
1915   delete txn2;
1916   wp_db->db_impl_->FlushWAL(true);
1917   ReOpenNoDelete();
1918   assert(db != nullptr);
1919   wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1920   ASSERT_TRUE(wp_db->prepared_txns_.empty());
1921   ASSERT_TRUE(wp_db->delayed_prepared_empty_);
1922 
1923   // Check the value is still committed after recovery
1924   s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1925   ASSERT_TRUE(s.ok());
1926   ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
1927   pinnable_val.Reset();
1928 }
1929 
1930 // After recovery the commit map is empty while the max is set. The code would
1931 // go through a different path which requires a separate test. Test that the
1932 // committed data before the restart is visible to all snapshots.
TEST_P(WritePreparedTransactionTest,IsInSnapshotEmptyMap)1933 TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
1934   for (bool end_with_prepare : {false, true}) {
1935     ReOpen();
1936     WriteOptions woptions;
1937     ASSERT_OK(db->Put(woptions, "key", "value"));
1938     ASSERT_OK(db->Put(woptions, "key", "value"));
1939     ASSERT_OK(db->Put(woptions, "key", "value"));
1940     SequenceNumber prepare_seq = kMaxSequenceNumber;
1941     if (end_with_prepare) {
1942       TransactionOptions txn_options;
1943       Transaction* txn = db->BeginTransaction(woptions, txn_options);
1944       ASSERT_OK(txn->SetName("xid0"));
1945       ASSERT_OK(txn->Prepare());
1946       prepare_seq = txn->GetId();
1947       delete txn;
1948     }
1949     dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
1950     auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1951     db_impl->FlushWAL(true);
1952     ReOpenNoDelete();
1953     WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1954     assert(wp_db != nullptr);
1955     ASSERT_GT(wp_db->max_evicted_seq_, 0);  // max after recovery
1956     // Take a snapshot right after recovery
1957     const Snapshot* snap = db->GetSnapshot();
1958     auto snap_seq = snap->GetSequenceNumber();
1959     ASSERT_GT(snap_seq, 0);
1960 
1961     for (SequenceNumber seq = 0;
1962          seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
1963       ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
1964     }
1965     if (end_with_prepare) {
1966       ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
1967     }
1968     // trivial check
1969     ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
1970 
1971     db->ReleaseSnapshot(snap);
1972 
1973     ASSERT_OK(db->Put(woptions, "key", "value"));
1974     // Take a snapshot after some writes
1975     snap = db->GetSnapshot();
1976     snap_seq = snap->GetSequenceNumber();
1977     for (SequenceNumber seq = 0;
1978          seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
1979       ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
1980     }
1981     if (end_with_prepare) {
1982       ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
1983     }
1984     // trivial check
1985     ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
1986 
1987     db->ReleaseSnapshot(snap);
1988   }
1989 }
1990 
1991 // Shows the contract of IsInSnapshot when called on invalid/released snapshots
TEST_P(WritePreparedTransactionTest,IsInSnapshotReleased)1992 TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
1993   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1994   WriteOptions woptions;
1995   ASSERT_OK(db->Put(woptions, "key", "value"));
1996   // snap seq = 1
1997   const Snapshot* snap1 = db->GetSnapshot();
1998   ASSERT_OK(db->Put(woptions, "key", "value"));
1999   ASSERT_OK(db->Put(woptions, "key", "value"));
2000   // snap seq = 3
2001   const Snapshot* snap2 = db->GetSnapshot();
2002   const SequenceNumber seq = 1;
2003   // Evict seq out of commit cache
2004   size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
2005   wp_db->AddCommitted(overwrite_seq, overwrite_seq);
2006   SequenceNumber snap_seq;
2007   uint64_t min_uncommitted = kMinUnCommittedSeq;
2008   bool released;
2009 
2010   released = false;
2011   snap_seq = snap1->GetSequenceNumber();
2012   ASSERT_LE(seq, snap_seq);
2013   // Valid snapshot lower than max
2014   ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
2015   ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2016   ASSERT_FALSE(released);
2017 
2018   released = false;
2019   snap_seq = snap1->GetSequenceNumber();
2020   // Invaid snapshot lower than max
2021   ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
2022   ASSERT_TRUE(
2023       wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2024   ASSERT_TRUE(released);
2025 
2026   db->ReleaseSnapshot(snap1);
2027 
2028   released = false;
2029   // Released snapshot lower than max
2030   ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2031   // The release does not take affect until the next max advance
2032   ASSERT_FALSE(released);
2033 
2034   released = false;
2035   // Invaid snapshot lower than max
2036   ASSERT_TRUE(
2037       wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2038   ASSERT_TRUE(released);
2039 
2040   // This make the snapshot release to reflect in txn db structures
2041   wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
2042                               wp_db->max_evicted_seq_ + 1);
2043 
2044   released = false;
2045   // Released snapshot lower than max
2046   ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2047   ASSERT_TRUE(released);
2048 
2049   released = false;
2050   // Invaid snapshot lower than max
2051   ASSERT_TRUE(
2052       wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2053   ASSERT_TRUE(released);
2054 
2055   snap_seq = snap2->GetSequenceNumber();
2056 
2057   released = false;
2058   // Unreleased snapshot lower than max
2059   ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2060   ASSERT_FALSE(released);
2061 
2062   db->ReleaseSnapshot(snap2);
2063 }
2064 
2065 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
2066 // snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest,IsInSnapshot)2067 TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
2068   WriteOptions wo;
2069   // Use small commit cache to trigger lots of eviction and fast advance of
2070   // max_evicted_seq_
2071   const size_t commit_cache_bits = 3;
2072   // Same for snapshot cache size
2073   const size_t snapshot_cache_bits = 2;
2074 
2075   // Take some preliminary snapshots first. This is to stress the data structure
2076   // that holds the old snapshots as it will be designed to be efficient when
2077   // only a few snapshots are below the max_evicted_seq_.
2078   for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
2079     // Leave some gap between the preliminary snapshots and the final snapshot
2080     // that we check. This should test for also different overlapping scenarios
2081     // between the last snapshot and the commits.
2082     for (int max_gap = 1; max_gap < 10; max_gap++) {
2083       // Since we do not actually write to db, we mock the seq as it would be
2084       // increased by the db. The only exception is that we need db seq to
2085       // advance for our snapshots. for which we apply a dummy put each time we
2086       // increase our mock of seq.
2087       uint64_t seq = 0;
2088       // At each step we prepare a txn and then we commit it in the next txn.
2089       // This emulates the consecutive transactions that write to the same key
2090       uint64_t cur_txn = 0;
2091       // Number of snapshots taken so far
2092       int num_snapshots = 0;
2093       // Number of gaps applied so far
2094       int gap_cnt = 0;
2095       // The final snapshot that we will inspect
2096       uint64_t snapshot = 0;
2097       bool found_committed = false;
2098       // To stress the data structure that maintain prepared txns, at each cycle
2099       // we add a new prepare txn. These do not mean to be committed for
2100       // snapshot inspection.
2101       std::set<uint64_t> prepared;
2102       // We keep the list of txns committed before we take the last snapshot.
2103       // These should be the only seq numbers that will be found in the snapshot
2104       std::set<uint64_t> committed_before;
2105       // The set of commit seq numbers to be excluded from IsInSnapshot queries
2106       std::set<uint64_t> commit_seqs;
2107       DBImpl* mock_db = new DBImpl(options, dbname);
2108       UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2109       std::unique_ptr<WritePreparedTxnDBMock> wp_db(
2110           new WritePreparedTxnDBMock(mock_db, txn_db_options));
2111       // We continue until max advances a bit beyond the snapshot.
2112       while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
2113         // do prepare for a transaction
2114         seq++;
2115         wp_db->AddPrepared(seq);
2116         prepared.insert(seq);
2117 
2118         // If cur_txn is not started, do prepare for it.
2119         if (!cur_txn) {
2120           seq++;
2121           cur_txn = seq;
2122           wp_db->AddPrepared(cur_txn);
2123         } else {                                     // else commit it
2124           seq++;
2125           wp_db->AddCommitted(cur_txn, seq);
2126           wp_db->RemovePrepared(cur_txn);
2127           commit_seqs.insert(seq);
2128           if (!snapshot) {
2129             committed_before.insert(cur_txn);
2130           }
2131           cur_txn = 0;
2132         }
2133 
2134         if (num_snapshots < max_snapshots - 1) {
2135           // Take preliminary snapshots
2136           wp_db->TakeSnapshot(seq);
2137           num_snapshots++;
2138         } else if (gap_cnt < max_gap) {
2139           // Wait for some gap before taking the final snapshot
2140           gap_cnt++;
2141         } else if (!snapshot) {
2142           // Take the final snapshot if it is not already taken
2143           snapshot = seq;
2144           wp_db->TakeSnapshot(snapshot);
2145           num_snapshots++;
2146         }
2147 
2148         // If the snapshot is taken, verify seq numbers visible to it. We redo
2149         // it at each cycle to test that the system is still sound when
2150         // max_evicted_seq_ advances.
2151         if (snapshot) {
2152           for (uint64_t s = 1;
2153                s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
2154             bool was_committed =
2155                 (committed_before.find(s) != committed_before.end());
2156             bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
2157             if (was_committed != is_in_snapshot) {
2158               printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
2159                      " snapshot %" PRIu64
2160                      " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
2161                      max_snapshots, max_gap, seq,
2162                      wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
2163                      num_snapshots, s);
2164             }
2165             ASSERT_EQ(was_committed, is_in_snapshot);
2166             found_committed = found_committed || is_in_snapshot;
2167           }
2168         }
2169       }
2170       // Safety check to make sure the test actually ran
2171       ASSERT_TRUE(found_committed);
2172       // As an extra check, check if prepared set will be properly empty after
2173       // they are committed.
2174       if (cur_txn) {
2175         wp_db->AddCommitted(cur_txn, seq);
2176         wp_db->RemovePrepared(cur_txn);
2177       }
2178       for (auto p : prepared) {
2179         wp_db->AddCommitted(p, seq);
2180         wp_db->RemovePrepared(p);
2181       }
2182       ASSERT_TRUE(wp_db->delayed_prepared_.empty());
2183       ASSERT_TRUE(wp_db->prepared_txns_.empty());
2184     }
2185   }
2186 }
2187 
ASSERT_SAME(ReadOptions roptions,TransactionDB * db,Status exp_s,PinnableSlice & exp_v,Slice key)2188 void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
2189                  PinnableSlice& exp_v, Slice key) {
2190   Status s;
2191   PinnableSlice v;
2192   s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
2193   ASSERT_TRUE(exp_s == s);
2194   ASSERT_TRUE(s.ok() || s.IsNotFound());
2195   if (s.ok()) {
2196     ASSERT_TRUE(exp_v == v);
2197   }
2198 
2199   // Try with MultiGet API too
2200   std::vector<std::string> values;
2201   auto s_vec =
2202       db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
2203   ASSERT_EQ(1, values.size());
2204   ASSERT_EQ(1, s_vec.size());
2205   s = s_vec[0];
2206   ASSERT_TRUE(exp_s == s);
2207   ASSERT_TRUE(s.ok() || s.IsNotFound());
2208   if (s.ok()) {
2209     ASSERT_TRUE(exp_v == values[0]);
2210   }
2211 }
2212 
ASSERT_SAME(TransactionDB * db,Status exp_s,PinnableSlice & exp_v,Slice key)2213 void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
2214                  Slice key) {
2215   ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
2216 }
2217 
TEST_P(WritePreparedTransactionTest,Rollback)2218 TEST_P(WritePreparedTransactionTest, Rollback) {
2219   ReadOptions roptions;
2220   WriteOptions woptions;
2221   TransactionOptions txn_options;
2222   const size_t num_keys = 4;
2223   const size_t num_values = 5;
2224   for (size_t ikey = 1; ikey <= num_keys; ikey++) {
2225     for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
2226       for (bool crash : {false, true}) {
2227         ReOpen();
2228         WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
2229         std::string key_str = "key" + ToString(ikey);
2230         switch (ivalue) {
2231           case 0:
2232             break;
2233           case 1:
2234             ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
2235             break;
2236           case 2:
2237             ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
2238             break;
2239           case 3:
2240             ASSERT_OK(db->Delete(woptions, key_str));
2241             break;
2242           case 4:
2243             ASSERT_OK(db->SingleDelete(woptions, key_str));
2244             break;
2245           default:
2246             assert(0);
2247         }
2248 
2249         PinnableSlice v1;
2250         auto s1 =
2251             db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
2252         PinnableSlice v2;
2253         auto s2 =
2254             db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
2255         PinnableSlice v3;
2256         auto s3 =
2257             db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
2258         PinnableSlice v4;
2259         auto s4 =
2260             db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
2261         Transaction* txn = db->BeginTransaction(woptions, txn_options);
2262         auto s = txn->SetName("xid0");
2263         ASSERT_OK(s);
2264         s = txn->Put(Slice("key1"), Slice("value1"));
2265         ASSERT_OK(s);
2266         s = txn->Merge(Slice("key2"), Slice("value2"));
2267         ASSERT_OK(s);
2268         s = txn->Delete(Slice("key3"));
2269         ASSERT_OK(s);
2270         s = txn->SingleDelete(Slice("key4"));
2271         ASSERT_OK(s);
2272         s = txn->Prepare();
2273         ASSERT_OK(s);
2274 
2275         {
2276           ReadLock rl(&wp_db->prepared_mutex_);
2277           ASSERT_FALSE(wp_db->prepared_txns_.empty());
2278           ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
2279         }
2280 
2281         ASSERT_SAME(db, s1, v1, "key1");
2282         ASSERT_SAME(db, s2, v2, "key2");
2283         ASSERT_SAME(db, s3, v3, "key3");
2284         ASSERT_SAME(db, s4, v4, "key4");
2285 
2286         if (crash) {
2287           delete txn;
2288           auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
2289           db_impl->FlushWAL(true);
2290           dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
2291           ReOpenNoDelete();
2292           assert(db != nullptr);
2293           wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
2294           txn = db->GetTransactionByName("xid0");
2295           ASSERT_FALSE(wp_db->delayed_prepared_empty_);
2296           ReadLock rl(&wp_db->prepared_mutex_);
2297           ASSERT_TRUE(wp_db->prepared_txns_.empty());
2298           ASSERT_FALSE(wp_db->delayed_prepared_.empty());
2299           ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
2300                       wp_db->delayed_prepared_.end());
2301         }
2302 
2303         ASSERT_SAME(db, s1, v1, "key1");
2304         ASSERT_SAME(db, s2, v2, "key2");
2305         ASSERT_SAME(db, s3, v3, "key3");
2306         ASSERT_SAME(db, s4, v4, "key4");
2307 
2308         s = txn->Rollback();
2309         ASSERT_OK(s);
2310 
2311         {
2312           ASSERT_TRUE(wp_db->delayed_prepared_empty_);
2313           ReadLock rl(&wp_db->prepared_mutex_);
2314           ASSERT_TRUE(wp_db->prepared_txns_.empty());
2315           ASSERT_TRUE(wp_db->delayed_prepared_.empty());
2316         }
2317 
2318         ASSERT_SAME(db, s1, v1, "key1");
2319         ASSERT_SAME(db, s2, v2, "key2");
2320         ASSERT_SAME(db, s3, v3, "key3");
2321         ASSERT_SAME(db, s4, v4, "key4");
2322         delete txn;
2323       }
2324     }
2325   }
2326 }
2327 
// Writes many versions of a single key into a large memtable, then reopens the
// DB with a tiny write buffer so that memtables are flushed while recovering.
// Every version must still be present afterwards.
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
  constexpr uint64_t kNumVersions = 1024;
  // Use large buffer to avoid memtable flush after kNumVersions insertions
  options.write_buffer_size = 1024 * 1024;
  ReOpen();
  std::vector<KeyVersion> versions;
  uint64_t seq = 0;
  for (uint64_t i = 1; i <= kNumVersions; i++) {
    std::string v = "bar" + ToString(i);
    ASSERT_OK(db->Put(WriteOptions(), "foo", v));
    VerifyKeys({{"foo", v}});
    seq++;  // one for the key/value
    KeyVersion kv = {"foo", v, seq, kTypeValue};
    if (options.two_write_queues) {
      seq++;  // one for the commit
    }
    versions.emplace_back(kv);
  }
  // Expected internal keys are listed newest first.
  std::reverse(std::begin(versions), std::end(versions));
  VerifyInternalKeys(versions);
  DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
  // Sync the WAL so the writes above are recovered from it on reopen; a
  // failure here would make the rest of the test meaningless.
  ASSERT_OK(db_impl->FlushWAL(true));
  // Use small buffer to ensure memtable flush during recovery
  options.write_buffer_size = 1024;
  ReOpenNoDelete();
  VerifyInternalKeys(versions);
}
2354 
// After compaction, a key visible to the earliest snapshot may be written out
// with sequence number 0. IsInSnapshot() must then report sequence number 0
// as visible to every snapshot, so the key stays readable through the
// snapshot taken before the compaction.
TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
  const WriteOptions write_opts;
  ASSERT_OK(db->Put(write_opts, "foo", "bar"));
  VerifyKeys({{"foo", "bar"}});
  const Snapshot* const earliest_snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));
  // Write keys before and after "foo" so the manual compaction cannot simply
  // move files and instead runs the actual compaction logic.
  for (const char* boundary_key : {"a", "z"}) {
    ASSERT_OK(db->Put(write_opts, boundary_key, "dummy"));
  }
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // "foo" must be readable at head and through the snapshot, and must now be
  // stored with sequence number 0.
  VerifyKeys({{"foo", "bar"}});
  VerifyKeys({{"foo", "bar"}}, earliest_snapshot);
  VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
  db->ReleaseSnapshot(earliest_snapshot);
}
2373 
2374 // Compaction should not remove a key if it is not committed, and should
2375 // proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest,CompactionShouldKeepUncommittedKeys)2376 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
2377   options.disable_auto_compactions = true;
2378   ReOpen();
2379   DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
2380   // Snapshots to avoid keys get evicted.
2381   std::vector<const Snapshot*> snapshots;
2382   // Keep track of expected sequence number.
2383   SequenceNumber expected_seq = 0;
2384 
2385   auto add_key = [&](std::function<Status()> func) {
2386     ASSERT_OK(func());
2387     expected_seq++;
2388     if (options.two_write_queues) {
2389       expected_seq++;  // 1 for commit
2390     }
2391     ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2392     snapshots.push_back(db->GetSnapshot());
2393   };
2394 
2395   // Each key here represent a standalone test case.
2396   add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
2397   add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
2398   add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
2399   add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
2400   add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
2401   add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
2402   add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
2403   add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
2404   ASSERT_OK(db->Flush(FlushOptions()));
2405   add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
2406   add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
2407 
2408   auto* transaction = db->BeginTransaction(WriteOptions());
2409   ASSERT_OK(transaction->SetName("txn"));
2410   ASSERT_OK(transaction->Put("key1", "value1_2"));
2411   ASSERT_OK(transaction->Delete("key2"));
2412   ASSERT_OK(transaction->SingleDelete("key3"));
2413   ASSERT_OK(transaction->Merge("key4", "value4_2"));
2414   ASSERT_OK(transaction->Merge("key5", "value5_3"));
2415   ASSERT_OK(transaction->Put("key6", "value6_2"));
2416   ASSERT_OK(transaction->Put("key7", "value7_2"));
2417   // Prepare but not commit.
2418   ASSERT_OK(transaction->Prepare());
2419   ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2420   ASSERT_OK(db->Flush(FlushOptions()));
2421   for (auto* s : snapshots) {
2422     db->ReleaseSnapshot(s);
2423   }
2424   // Dummy keys to avoid compaction trivially move files and get around actual
2425   // compaction logic.
2426   ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2427   ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2428   ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2429   VerifyKeys({
2430       {"key1", "value1_1"},
2431       {"key2", "value2_1"},
2432       {"key3", "value3_1"},
2433       {"key4", "value4_1"},
2434       {"key5", "value5_1,value5_2"},
2435       {"key6", "NOT_FOUND"},
2436       {"key7", "NOT_FOUND"},
2437   });
2438   VerifyInternalKeys({
2439       {"key1", "value1_2", expected_seq, kTypeValue},
2440       {"key1", "value1_1", 0, kTypeValue},
2441       {"key2", "", expected_seq, kTypeDeletion},
2442       {"key2", "value2_1", 0, kTypeValue},
2443       {"key3", "", expected_seq, kTypeSingleDeletion},
2444       {"key3", "value3_1", 0, kTypeValue},
2445       {"key4", "value4_2", expected_seq, kTypeMerge},
2446       {"key4", "value4_1", 0, kTypeValue},
2447       {"key5", "value5_3", expected_seq, kTypeMerge},
2448       {"key5", "value5_1,value5_2", 0, kTypeValue},
2449       {"key6", "value6_2", expected_seq, kTypeValue},
2450       {"key7", "value7_2", expected_seq, kTypeValue},
2451   });
2452   ASSERT_OK(transaction->Commit());
2453   VerifyKeys({
2454       {"key1", "value1_2"},
2455       {"key2", "NOT_FOUND"},
2456       {"key3", "NOT_FOUND"},
2457       {"key4", "value4_1,value4_2"},
2458       {"key5", "value5_1,value5_2,value5_3"},
2459       {"key6", "value6_2"},
2460       {"key7", "value7_2"},
2461   });
2462   delete transaction;
2463 }
2464 
2465 // Compaction should keep keys visible to a snapshot based on commit sequence,
2466 // not just prepare sequence.
TEST_P(WritePreparedTransactionTest,CompactionShouldKeepSnapshotVisibleKeys)2467 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
2468   options.disable_auto_compactions = true;
2469   ReOpen();
2470   // Keep track of expected sequence number.
2471   SequenceNumber expected_seq = 0;
2472   auto* txn1 = db->BeginTransaction(WriteOptions());
2473   ASSERT_OK(txn1->SetName("txn1"));
2474   ASSERT_OK(txn1->Put("key1", "value1_1"));
2475   ASSERT_OK(txn1->Prepare());
2476   ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2477   ASSERT_OK(txn1->Commit());
2478   DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
2479   ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
2480   delete txn1;
2481   // Take a snapshots to avoid keys get evicted before compaction.
2482   const Snapshot* snapshot1 = db->GetSnapshot();
2483   auto* txn2 = db->BeginTransaction(WriteOptions());
2484   ASSERT_OK(txn2->SetName("txn2"));
2485   ASSERT_OK(txn2->Put("key2", "value2_1"));
2486   ASSERT_OK(txn2->Prepare());
2487   ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2488   // txn1 commit before snapshot2 and it is visible to snapshot2.
2489   // txn2 commit after snapshot2 and it is not visible.
2490   const Snapshot* snapshot2 = db->GetSnapshot();
2491   ASSERT_OK(txn2->Commit());
2492   ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
2493   delete txn2;
2494   // Take a snapshots to avoid keys get evicted before compaction.
2495   const Snapshot* snapshot3 = db->GetSnapshot();
2496   ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
2497   expected_seq++;  // 1 for write
2498   SequenceNumber seq1 = expected_seq;
2499   if (options.two_write_queues) {
2500     expected_seq++;  // 1 for commit
2501   }
2502   ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2503   ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
2504   expected_seq++;  // 1 for write
2505   SequenceNumber seq2 = expected_seq;
2506   if (options.two_write_queues) {
2507     expected_seq++;  // 1 for commit
2508   }
2509   ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2510   ASSERT_OK(db->Flush(FlushOptions()));
2511   db->ReleaseSnapshot(snapshot1);
2512   db->ReleaseSnapshot(snapshot3);
2513   // Dummy keys to avoid compaction trivially move files and get around actual
2514   // compaction logic.
2515   ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2516   ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2517   ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2518   VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
2519   VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
2520   VerifyInternalKeys({
2521       {"key1", "value1_2", seq1, kTypeValue},
2522       // "value1_1" is visible to snapshot2. Also keys at bottom level visible
2523       // to earliest snapshot will output with seq = 0.
2524       {"key1", "value1_1", 0, kTypeValue},
2525       {"key2", "value2_2", seq2, kTypeValue},
2526   });
2527   db->ReleaseSnapshot(snapshot2);
2528 }
2529 
// A snapshot's min_uncommitted_ must equal the smallest prepared-but-
// uncommitted sequence. This is checked both while that transaction is
// tracked in the prepared_txns_ heap and after it has been moved into
// delayed_prepared_, with and without a more recent prepared transaction.
TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache (size = 1)
  for (bool has_recent_prepare : {true, false}) {
    UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
    ReOpen();

    ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
    auto* transaction =
        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
    ASSERT_OK(transaction->SetName("txn"));
    ASSERT_OK(transaction->Delete("key1"));
    ASSERT_OK(transaction->Prepare());
    // snapshot1 should get min_uncommitted from prepared_txns_ heap.
    auto snapshot1 = db->GetSnapshot();
    ASSERT_EQ(transaction->GetId(),
              static_cast<const SnapshotImpl*>(snapshot1)->min_uncommitted_);
    // Add a commit to advance max_evicted_seq and move the prepared transaction
    // into delayed_prepared_ set.
    ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
    Transaction* txn2 = nullptr;
    if (has_recent_prepare) {
      txn2 =
          db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
      ASSERT_OK(txn2->SetName("txn2"));
      ASSERT_OK(txn2->Put("key3", "value3"));
      ASSERT_OK(txn2->Prepare());
    }
    // snapshot2 should get min_uncommitted from delayed_prepared_ set.
    // (Previously this re-checked snapshot1, leaving snapshot2 unverified.)
    auto snapshot2 = db->GetSnapshot();
    ASSERT_EQ(transaction->GetId(),
              static_cast<const SnapshotImpl*>(snapshot2)->min_uncommitted_);
    ASSERT_OK(transaction->Commit());
    delete transaction;
    if (has_recent_prepare) {
      ASSERT_OK(txn2->Commit());
      delete txn2;
    }
    VerifyKeys({{"key1", "NOT_FOUND"}});
    VerifyKeys({{"key1", "value1"}}, snapshot1);
    VerifyKeys({{"key1", "value1"}}, snapshot2);
    db->ReleaseSnapshot(snapshot1);
    db->ReleaseSnapshot(snapshot2);
  }
}
2575 
2576 // Insert two values, v1 and v2, for a key. Between prepare and commit of v2
2577 // take two snapshots, s1 and s2. Release s1 during compaction.
2578 // Test to make sure compaction doesn't get confused and think s1 can see both
2579 // values, and thus compact out the older value by mistake.
TEST_P(WritePreparedTransactionTest,ReleaseSnapshotDuringCompaction)2580 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
2581   const size_t snapshot_cache_bits = 7;  // same as default
2582   const size_t commit_cache_bits = 0;    // minimum commit cache
2583   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2584   ReOpen();
2585 
2586   ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
2587   auto* transaction =
2588       db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2589   ASSERT_OK(transaction->SetName("txn"));
2590   ASSERT_OK(transaction->Put("key1", "value1_2"));
2591   ASSERT_OK(transaction->Prepare());
2592   auto snapshot1 = db->GetSnapshot();
2593   // Increment sequence number.
2594   ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2595   auto snapshot2 = db->GetSnapshot();
2596   ASSERT_OK(transaction->Commit());
2597   delete transaction;
2598   VerifyKeys({{"key1", "value1_2"}});
2599   VerifyKeys({{"key1", "value1_1"}}, snapshot1);
2600   VerifyKeys({{"key1", "value1_1"}}, snapshot2);
2601   // Add a flush to avoid compaction to fallback to trivial move.
2602 
2603   auto callback = [&](void*) {
2604     // Release snapshot1 after CompactionIterator init.
2605     // CompactionIterator need to figure out the earliest snapshot
2606     // that can see key1:value1_2 is kMaxSequenceNumber, not
2607     // snapshot1 or snapshot2.
2608     db->ReleaseSnapshot(snapshot1);
2609     // Add some keys to advance max_evicted_seq.
2610     ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
2611     ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
2612   };
2613   SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2614                                         callback);
2615   SyncPoint::GetInstance()->EnableProcessing();
2616 
2617   ASSERT_OK(db->Flush(FlushOptions()));
2618   VerifyKeys({{"key1", "value1_2"}});
2619   VerifyKeys({{"key1", "value1_1"}}, snapshot2);
2620   db->ReleaseSnapshot(snapshot2);
2621   SyncPoint::GetInstance()->ClearAllCallBacks();
2622 }
2623 
2624 // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
2625 // after committing v2. Release s1 during compaction, right after compaction
2626 // processes v2 and before processes v1. Test to make sure compaction doesn't
2627 // get confused and believe v1 and v2 are visible to different snapshot
2628 // (v1 by s2, v2 by s1) and refuse to compact out v1.
TEST_P(WritePreparedTransactionTest,ReleaseSnapshotDuringCompaction2)2629 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
2630   const size_t snapshot_cache_bits = 7;  // same as default
2631   const size_t commit_cache_bits = 0;    // minimum commit cache
2632   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2633   ReOpen();
2634 
2635   ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2636   ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
2637   SequenceNumber v2_seq = db->GetLatestSequenceNumber();
2638   auto* s1 = db->GetSnapshot();
2639   // Advance sequence number.
2640   ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
2641   auto* s2 = db->GetSnapshot();
2642 
2643   int count_value = 0;
2644   auto callback = [&](void* arg) {
2645     auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
2646     if (ikey->user_key == "key1") {
2647       count_value++;
2648       if (count_value == 2) {
2649         // Processing v1.
2650         db->ReleaseSnapshot(s1);
2651         // Add some keys to advance max_evicted_seq and update
2652         // old_commit_map.
2653         ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
2654         ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
2655       }
2656     }
2657   };
2658   SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
2659                                         callback);
2660   SyncPoint::GetInstance()->EnableProcessing();
2661 
2662   ASSERT_OK(db->Flush(FlushOptions()));
2663   // value1 should be compact out.
2664   VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
2665 
2666   // cleanup
2667   db->ReleaseSnapshot(s2);
2668   SyncPoint::GetInstance()->ClearAllCallBacks();
2669 }
2670 
2671 // Insert two values, v1 and v2, for a key. Insert another dummy key
2672 // so to evict the commit cache for v2, while v1 is still in commit cache.
2673 // Take two snapshots, s1 and s2. Release s1 during compaction.
2674 // Since commit cache for v2 is evicted, and old_commit_map don't have
2675 // s1 (it is released),
2676 // TODO(myabandeh): how can we be sure that the v2's commit info is evicted
2677 // (and not v1's)? Instead of putting a dummy, we can directly call
2678 // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
TEST_P(WritePreparedTransactionTest,ReleaseSnapshotDuringCompaction3)2679 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
2680   const size_t snapshot_cache_bits = 7;  // same as default
2681   const size_t commit_cache_bits = 1;    // commit cache size = 2
2682   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2683   ReOpen();
2684 
2685   // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
2686   // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
2687   auto add_dummy = [&]() {
2688     auto* txn_dummy =
2689         db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2690     ASSERT_OK(txn_dummy->SetName("txn_dummy"));
2691     ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
2692     ASSERT_OK(txn_dummy->Prepare());
2693     ASSERT_OK(txn_dummy->Commit());
2694     delete txn_dummy;
2695   };
2696 
2697   ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2698   auto* txn =
2699       db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2700   ASSERT_OK(txn->SetName("txn"));
2701   ASSERT_OK(txn->Put("key1", "value2"));
2702   ASSERT_OK(txn->Prepare());
2703   // TODO(myabandeh): replace it with GetId()?
2704   auto v2_seq = db->GetLatestSequenceNumber();
2705   ASSERT_OK(txn->Commit());
2706   delete txn;
2707   auto* s1 = db->GetSnapshot();
2708   // Dummy key to advance sequence number.
2709   add_dummy();
2710   auto* s2 = db->GetSnapshot();
2711 
2712   auto callback = [&](void*) {
2713     db->ReleaseSnapshot(s1);
2714     // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
2715     add_dummy();
2716     add_dummy();
2717   };
2718   SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2719                                         callback);
2720   SyncPoint::GetInstance()->EnableProcessing();
2721 
2722   ASSERT_OK(db->Flush(FlushOptions()));
2723   // value1 should be compact out.
2724   VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
2725 
2726   db->ReleaseSnapshot(s2);
2727   SyncPoint::GetInstance()->ClearAllCallBacks();
2728 }
2729 
// A prepared Delete of key1 commits after two snapshots are taken. The earlier
// snapshot is released while the compaction runs. Compaction must notice that
// snapshot2 is now the earliest snapshot and keep both the tombstone and the
// older value, since the tombstone is not visible to snapshot2.
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* transaction =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Delete("key1"));
  ASSERT_OK(transaction->Prepare());
  SequenceNumber del_seq = db->GetLatestSequenceNumber();
  auto snapshot1 = db->GetSnapshot();
  // Increment sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  auto snapshot2 = db->GetSnapshot();
  ASSERT_OK(transaction->Commit());
  delete transaction;
  VerifyKeys({{"key1", "NOT_FOUND"}});
  VerifyKeys({{"key1", "value1"}}, snapshot1);
  VerifyKeys({{"key1", "value1"}}, snapshot2);
  ASSERT_OK(db->Flush(FlushOptions()));

  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));

  auto callback = [&](void* compaction) {
    // Release snapshot1 after CompactionIterator init.
    // CompactionIterator need to double check and find out snapshot2 is now
    // the earliest existing snapshot. A null argument is ignored.
    if (compaction != nullptr) {
      db->ReleaseSnapshot(snapshot1);
      // Add some keys to advance max_evicted_seq.
      ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
      ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
    }
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  const Status compact_status =
      db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // The callback captures locals by reference; unregister it before any
  // assertion can end the test early and leave it dangling.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(compact_status);
  // Only verify for key1. Both the put and delete for the key should be kept.
  // Since the delete tombstone is not visible to snapshot2, we need to keep
  // at least one version of the key, for write-conflict check.
  VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
                      {"key1", "value1", 0, kTypeValue}});
  db->ReleaseSnapshot(snapshot2);
}
2782 
2783 // A more complex test to verify compaction/flush should keep keys visible
2784 // to snapshots.
TEST_P(WritePreparedTransactionTest,CompactionKeepSnapshotVisibleKeysRandomized)2785 TEST_P(WritePreparedTransactionTest,
2786        CompactionKeepSnapshotVisibleKeysRandomized) {
2787   constexpr size_t kNumTransactions = 10;
2788   constexpr size_t kNumIterations = 1000;
2789 
2790   std::vector<Transaction*> transactions(kNumTransactions, nullptr);
2791   std::vector<size_t> versions(kNumTransactions, 0);
2792   std::unordered_map<std::string, std::string> current_data;
2793   std::vector<const Snapshot*> snapshots;
2794   std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
2795 
2796   Random rnd(1103);
2797   options.disable_auto_compactions = true;
2798   ReOpen();
2799 
2800   for (size_t i = 0; i < kNumTransactions; i++) {
2801     std::string key = "key" + ToString(i);
2802     std::string value = "value0";
2803     ASSERT_OK(db->Put(WriteOptions(), key, value));
2804     current_data[key] = value;
2805   }
2806   VerifyKeys(current_data);
2807 
2808   for (size_t iter = 0; iter < kNumIterations; iter++) {
2809     auto r = rnd.Next() % (kNumTransactions + 1);
2810     if (r < kNumTransactions) {
2811       std::string key = "key" + ToString(r);
2812       if (transactions[r] == nullptr) {
2813         std::string value = "value" + ToString(versions[r] + 1);
2814         auto* txn = db->BeginTransaction(WriteOptions());
2815         ASSERT_OK(txn->SetName("txn" + ToString(r)));
2816         ASSERT_OK(txn->Put(key, value));
2817         ASSERT_OK(txn->Prepare());
2818         transactions[r] = txn;
2819       } else {
2820         std::string value = "value" + ToString(++versions[r]);
2821         ASSERT_OK(transactions[r]->Commit());
2822         delete transactions[r];
2823         transactions[r] = nullptr;
2824         current_data[key] = value;
2825       }
2826     } else {
2827       auto* snapshot = db->GetSnapshot();
2828       VerifyKeys(current_data, snapshot);
2829       snapshots.push_back(snapshot);
2830       snapshot_data.push_back(current_data);
2831     }
2832     VerifyKeys(current_data);
2833   }
2834   // Take a last snapshot to test compaction with uncommitted prepared
2835   // transaction.
2836   snapshots.push_back(db->GetSnapshot());
2837   snapshot_data.push_back(current_data);
2838 
2839   assert(snapshots.size() == snapshot_data.size());
2840   for (size_t i = 0; i < snapshots.size(); i++) {
2841     VerifyKeys(snapshot_data[i], snapshots[i]);
2842   }
2843   ASSERT_OK(db->Flush(FlushOptions()));
2844   for (size_t i = 0; i < snapshots.size(); i++) {
2845     VerifyKeys(snapshot_data[i], snapshots[i]);
2846   }
2847   // Dummy keys to avoid compaction trivially move files and get around actual
2848   // compaction logic.
2849   ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2850   ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2851   ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2852   for (size_t i = 0; i < snapshots.size(); i++) {
2853     VerifyKeys(snapshot_data[i], snapshots[i]);
2854   }
2855   // cleanup
2856   for (size_t i = 0; i < kNumTransactions; i++) {
2857     if (transactions[i] == nullptr) {
2858       continue;
2859     }
2860     ASSERT_OK(transactions[i]->Commit());
2861     delete transactions[i];
2862   }
2863   for (size_t i = 0; i < snapshots.size(); i++) {
2864     db->ReleaseSnapshot(snapshots[i]);
2865   }
2866 }
2867 
2868 // Compaction should not apply the optimization to output key with sequence
2869 // number equal to 0 if the key is not visible to earliest snapshot, based on
2870 // commit sequence number.
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSequenceForUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();

  // Prepare (without committing) a write to "key1"; on the freshly reopened
  // DB the test expects this prepare to occupy sequence number 1.
  Transaction* txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value1"));
  ASSERT_OK(txn->Prepare());
  const SequenceNumber seq1 = 1;
  ASSERT_EQ(seq1, db->GetLatestSequenceNumber());

  // The plain Put is expected to use one sequence number for its data, plus
  // one for the commit when two write queues are enabled.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  DBImpl* root_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  const SequenceNumber expected_visible =
      seq1 + (options.two_write_queues ? 2 : 1);
  ASSERT_EQ(expected_visible, root_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));

  // Bracket the data with dummy keys so compaction cannot simply move the
  // file and must run its real logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyKeys({
      {"key1", "NOT_FOUND"},
      {"key2", "value2"},
  });
  VerifyInternalKeys({
      // "key1" is still uncommitted, so its sequence number is preserved.
      {"key1", "value1", seq1, kTypeValue},
      // "key2" is committed, so compaction outputs it with sequence 0.
      {"key2", "value2", 0, kTypeValue},
  });

  // Once committed, the prepared value becomes visible.
  ASSERT_OK(txn->Commit());
  VerifyKeys({
      {"key1", "value1"},
      {"key2", "value2"},
  });
  delete txn;
}
2913 
TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
  // From inside the flush's CompactionIterator:AfterInit sync point, take a
  // snapshot and commit a prepared overwrite of "key1". The snapshot must
  // still see "value1" afterwards, while the latest view sees "value2".
  options.disable_auto_compactions = true;
  ReOpen();

  const Snapshot* snapshot = nullptr;
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());

  auto callback = [&](void*) {
    // Act only on the first invocation: txn is deleted below and must not be
    // committed or deleted twice.
    if (snapshot != nullptr) {
      return;
    }
    // Snapshot is taken after compaction start. It should be taken into
    // consideration for whether to compact out value1.
    snapshot = db->GetSnapshot();
    ASSERT_OK(txn->Commit());
    delete txn;
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();
  auto flush_status = db->Flush(FlushOptions());
  // The callback captures this test's locals by reference. Unregister it
  // before any assertion can return early, so it can never run against a
  // dead stack frame (e.g. from a flush or compaction in a later test).
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(flush_status);
  ASSERT_NE(nullptr, snapshot);
  VerifyKeys({{"key1", "value2"}});
  VerifyKeys({{"key1", "value1"}}, snapshot);
  db->ReleaseSnapshot(snapshot);
}
2941 
TEST_P(WritePreparedTransactionTest, Iterate) {
  // Asserts that `it` is positioned on a valid entry equal to (key, value).
  auto expect_entry = [](Iterator* it, const std::string& key,
                         const std::string& value) {
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ(key, it->key().ToString());
    ASSERT_EQ(value, it->value().ToString());
  };

  // Runs one seek/step script first on a DB iterator, then on an iterator
  // from a concurrent transaction. Both must present the same view:
  // "a" -> "va", "foo" -> foo_value, "z" -> "vz".
  auto expect_same_view = [&](const std::string& foo_value) {
    Transaction* reader = db->BeginTransaction(WriteOptions());
    for (bool use_txn_iter : {false, true}) {
      Iterator* it = use_txn_iter ? reader->GetIterator(ReadOptions())
                                  : db->NewIterator(ReadOptions());
      // Direct seek.
      it->Seek("foo");
      expect_entry(it, "foo", foo_value);
      // Forward step from "a".
      it->Seek("a");
      expect_entry(it, "a", "va");
      it->Next();
      expect_entry(it, "foo", foo_value);
      // Reverse seek landing on "foo".
      it->SeekForPrev("y");
      expect_entry(it, "foo", foo_value);
      // Backward step from "z".
      it->SeekForPrev("z");
      expect_entry(it, "z", "vz");
      it->Prev();
      expect_entry(it, "foo", foo_value);
      delete it;
    }
    delete reader;
  };

  ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
  Transaction* writer = db->BeginTransaction(WriteOptions());
  ASSERT_OK(writer->SetName("txn"));
  ASSERT_OK(writer->Put("foo", "v2"));
  ASSERT_OK(writer->Prepare());
  VerifyKeys({{"foo", "v1"}});
  // Neighbour keys on either side of "foo" for the Next/Prev checks.
  ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
  // Prepared but not committed: every view still sees v1.
  expect_same_view("v1");
  ASSERT_OK(writer->Commit());
  VerifyKeys({{"foo", "v2"}});
  expect_same_view("v2");
  delete writer;
}
2996 
TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
  // Refresh() on an iterator of this DB is expected to report NotSupported.
  Iterator* it = db->NewIterator(ReadOptions());
  auto refresh_status = it->Refresh();
  ASSERT_TRUE(refresh_status.IsNotSupported());
  delete it;
}
3002 
// Committing a delayed prepared has two non-atomic steps: update commit cache,
// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
// non-atomic steps of checking these two data structures. This test breaks each
// in the middle to ensure correctness in spite of non-atomic execution.
// Note: This test is limited to the case where snapshot is larger than the
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest,NonAtomicCommitOfDelayedPrepared)3009 TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
3010   const size_t snapshot_cache_bits = 7;  // same as default
3011   const size_t commit_cache_bits = 3;    // 8 entries
3012   for (auto split_read : {true, false}) {
3013     std::vector<bool> split_options = {false};
3014     if (split_read) {
3015       // Also test for break before mutex
3016       split_options.push_back(true);
3017     }
3018     for (auto split_before_mutex : split_options) {
3019       UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3020       ReOpen();
3021       WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3022       DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
3023       // Fill up the commit cache
3024       std::string init_value("value1");
3025       for (int i = 0; i < 10; i++) {
3026         db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
3027       }
3028       // Prepare a transaction but do not commit it
3029       Transaction* txn =
3030           db->BeginTransaction(WriteOptions(), TransactionOptions());
3031       ASSERT_OK(txn->SetName("xid"));
3032       ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
3033       ASSERT_OK(txn->Prepare());
3034       // Commit a bunch of entries to advance max evicted seq and make the
3035       // prepared a delayed prepared
3036       for (int i = 0; i < 10; i++) {
3037         db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3038       }
3039       // The snapshot should not see the delayed prepared entry
3040       auto snap = db->GetSnapshot();
3041 
3042       if (split_read) {
3043         if (split_before_mutex) {
3044           // split before acquiring prepare_mutex_
3045           ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3046               {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
3047                 "AtomicCommitOfDelayedPrepared:Commit:before"},
3048                {"AtomicCommitOfDelayedPrepared:Commit:after",
3049                 "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
3050         } else {
3051           // split right after reading from the commit cache
3052           ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3053               {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
3054                 "AtomicCommitOfDelayedPrepared:Commit:before"},
3055                {"AtomicCommitOfDelayedPrepared:Commit:after",
3056                 "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
3057         }
3058       } else {  // split commit
3059         // split right before removing from delayed_prepared_
3060         ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3061             {{"WritePreparedTxnDB::RemovePrepared:pause",
3062               "AtomicCommitOfDelayedPrepared:Read:before"},
3063              {"AtomicCommitOfDelayedPrepared:Read:after",
3064               "WritePreparedTxnDB::RemovePrepared:resume"}});
3065       }
3066       SyncPoint::GetInstance()->EnableProcessing();
3067 
3068       ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3069         TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
3070         ASSERT_OK(txn->Commit());
3071         if (split_before_mutex) {
3072           // Do bunch of inserts to evict the commit entry from the cache. This
3073           // would prevent the 2nd look into commit cache under prepare_mutex_
3074           // to see the commit entry.
3075           auto seq = db_impl->TEST_GetLastVisibleSequence();
3076           size_t tries = 0;
3077           while (wp_db->max_evicted_seq_ < seq && tries < 50) {
3078             db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3079             tries++;
3080           };
3081           ASSERT_LT(tries, 50);
3082         }
3083         TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
3084         delete txn;
3085       });
3086 
3087       ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3088         TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
3089         ReadOptions roptions;
3090         roptions.snapshot = snap;
3091         PinnableSlice value;
3092         auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3093         ASSERT_OK(s);
3094         // It should not see the commit of delayed prepared
3095         ASSERT_TRUE(value == init_value);
3096         TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
3097         db->ReleaseSnapshot(snap);
3098       });
3099 
3100       read_thread.join();
3101       commit_thread.join();
3102       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3103       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3104     }  // for split_before_mutex
3105   }    // for split_read
3106 }
3107 
3108 // When max evicted seq advances a prepared seq, it involves two updates: i)
3109 // adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
3110 // ::IsInSnapshot also reads these two values in a non-atomic way. This test
3111 // ensures correctness if the update occurs after ::IsInSnapshot reads
3112 // delayed_prepared_empty_ and before it reads max_evicted_seq_.
3113 // Note: this test focuses on read snapshot larger than max_evicted_seq_.
TEST_P(WritePreparedTransactionTest,NonAtomicUpdateOfDelayedPrepared)3114 TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
3115   const size_t snapshot_cache_bits = 7;  // same as default
3116   const size_t commit_cache_bits = 3;    // 8 entries
3117   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3118   ReOpen();
3119   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3120   // Fill up the commit cache
3121   std::string init_value("value1");
3122   for (int i = 0; i < 10; i++) {
3123     db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
3124   }
3125   // Prepare a transaction but do not commit it
3126   Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3127   ASSERT_OK(txn->SetName("xid"));
3128   ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
3129   ASSERT_OK(txn->Prepare());
3130   // Create a gap between prepare seq and snapshot seq
3131   db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3132   db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3133   // The snapshot should not see the delayed prepared entry
3134   auto snap = db->GetSnapshot();
3135   ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
3136 
3137   // split right after reading delayed_prepared_empty_
3138   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3139       {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
3140         "AtomicUpdateOfDelayedPrepared:before"},
3141        {"AtomicUpdateOfDelayedPrepared:after",
3142         "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
3143   SyncPoint::GetInstance()->EnableProcessing();
3144 
3145   ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3146     TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
3147     // Commit a bunch of entries to advance max evicted seq and make the
3148     // prepared a delayed prepared
3149     size_t tries = 0;
3150     while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
3151       db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3152       tries++;
3153     };
3154     ASSERT_LT(tries, 50);
3155     // This is the case on which the test focuses
3156     ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3157     TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
3158   });
3159 
3160   ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3161     ReadOptions roptions;
3162     roptions.snapshot = snap;
3163     PinnableSlice value;
3164     auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3165     ASSERT_OK(s);
3166     // It should not see the uncommitted value of delayed prepared
3167     ASSERT_TRUE(value == init_value);
3168     db->ReleaseSnapshot(snap);
3169   });
3170 
3171   read_thread.join();
3172   commit_thread.join();
3173   ASSERT_OK(txn->Commit());
3174   delete txn;
3175   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3176   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3177 }
3178 
3179 // Eviction from commit cache and update of max evicted seq are two non-atomic
3180 // steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
3181 // from commit cache are two non-atomic steps. This tests if the update occurs
3182 // after reading max_evicted_seq_ and before reading the commit cache.
3183 // Note: the test focuses on snapshot larger than max_evicted_seq_
TEST_P(WritePreparedTransactionTest,NonAtomicUpdateOfMaxEvictedSeq)3184 TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
3185   const size_t snapshot_cache_bits = 7;  // same as default
3186   const size_t commit_cache_bits = 3;    // 8 entries
3187   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3188   ReOpen();
3189   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3190   // Fill up the commit cache
3191   std::string init_value("value1");
3192   std::string last_value("value_final");
3193   for (int i = 0; i < 10; i++) {
3194     db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
3195   }
3196   // Do an uncommitted write to prevent min_uncommitted optimization
3197   Transaction* txn1 =
3198       db->BeginTransaction(WriteOptions(), TransactionOptions());
3199   ASSERT_OK(txn1->SetName("xid1"));
3200   ASSERT_OK(txn1->Put(Slice("key0"), last_value));
3201   ASSERT_OK(txn1->Prepare());
3202   // Do a write with prepare to get the prepare seq
3203   Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3204   ASSERT_OK(txn->SetName("xid"));
3205   ASSERT_OK(txn->Put(Slice("key1"), last_value));
3206   ASSERT_OK(txn->Prepare());
3207   ASSERT_OK(txn->Commit());
3208   // Create a gap between commit entry and snapshot seq
3209   db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3210   db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3211   // The snapshot should see the last commit
3212   auto snap = db->GetSnapshot();
3213   ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
3214 
3215   // split right after reading max_evicted_seq_
3216   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3217       {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
3218         "NonAtomicUpdateOfMaxEvictedSeq:before"},
3219        {"NonAtomicUpdateOfMaxEvictedSeq:after",
3220         "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
3221   SyncPoint::GetInstance()->EnableProcessing();
3222 
3223   ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3224     TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
3225     // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
3226     size_t tries = 0;
3227     while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
3228       db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3229       tries++;
3230     };
3231     ASSERT_LT(tries, 50);
3232     // This is the case on which the test focuses
3233     ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3234     TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
3235   });
3236 
3237   ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3238     ReadOptions roptions;
3239     roptions.snapshot = snap;
3240     PinnableSlice value;
3241     auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3242     ASSERT_OK(s);
3243     // It should see the committed value of the evicted entry
3244     ASSERT_TRUE(value == last_value);
3245     db->ReleaseSnapshot(snap);
3246   });
3247 
3248   read_thread.join();
3249   commit_thread.join();
3250   delete txn;
3251   txn1->Commit();
3252   delete txn1;
3253   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3254   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3255 }
3256 
3257 // Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
3258 // that. The test focuses on a race condition between AddPrepared and
3259 // AdvanceMaxEvictedSeq functions.
TEST_P(WritePreparedTransactionTest,AddPreparedBeforeMax)3260 TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
3261   if (!options.two_write_queues) {
3262     // This test is only for two write queues
3263     return;
3264   }
3265   const size_t snapshot_cache_bits = 7;  // same as default
3266   // 1 entry to advance max after the 2nd commit
3267   const size_t commit_cache_bits = 0;
3268   UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3269   ReOpen();
3270   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3271   std::string some_value("value_some");
3272   std::string uncommitted_value("value_uncommitted");
3273   // Prepare two uncommitted transactions
3274   Transaction* txn1 =
3275       db->BeginTransaction(WriteOptions(), TransactionOptions());
3276   ASSERT_OK(txn1->SetName("xid1"));
3277   ASSERT_OK(txn1->Put(Slice("key1"), some_value));
3278   ASSERT_OK(txn1->Prepare());
3279   Transaction* txn2 =
3280       db->BeginTransaction(WriteOptions(), TransactionOptions());
3281   ASSERT_OK(txn2->SetName("xid2"));
3282   ASSERT_OK(txn2->Put(Slice("key2"), some_value));
3283   ASSERT_OK(txn2->Prepare());
3284   // Start the txn here so the other thread could get its id
3285   Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3286   ASSERT_OK(txn->SetName("xid"));
3287   ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
3288   port::Mutex txn_mutex_;
3289 
3290   // t1) Insert prepared entry, t2) commit other entries to advance max
3291   // evicted sec and finish checking the existing prepared entries, t1)
3292   // AddPrepared, t2) update max_evicted_seq_
3293   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3294       {"AddPreparedCallback::AddPrepared::begin:pause",
3295        "AddPreparedBeforeMax::read_thread:start"},
3296       {"AdvanceMaxEvictedSeq::update_max:pause",
3297        "AddPreparedCallback::AddPrepared::begin:resume"},
3298       {"AddPreparedCallback::AddPrepared::end",
3299        "AdvanceMaxEvictedSeq::update_max:resume"},
3300   });
3301   SyncPoint::GetInstance()->EnableProcessing();
3302 
3303   ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3304     txn_mutex_.Lock();
3305     ASSERT_OK(txn->Prepare());
3306     txn_mutex_.Unlock();
3307   });
3308 
3309   ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3310     TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
3311     // Publish seq number with a commit
3312     ASSERT_OK(txn1->Commit());
3313     // Since the commit cache size is one the 2nd commit evict the 1st one and
3314     // invokes AdcanceMaxEvictedSeq
3315     ASSERT_OK(txn2->Commit());
3316 
3317     ReadOptions roptions;
3318     PinnableSlice value;
3319     // The snapshot should not see the uncommitted value from write_thread
3320     auto snap = db->GetSnapshot();
3321     ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3322     // This is the scenario that we test for
3323     txn_mutex_.Lock();
3324     ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
3325     txn_mutex_.Unlock();
3326     roptions.snapshot = snap;
3327     auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
3328     ASSERT_TRUE(s.IsNotFound());
3329     db->ReleaseSnapshot(snap);
3330   });
3331 
3332   read_thread.join();
3333   write_thread.join();
3334   delete txn1;
3335   delete txn2;
3336   ASSERT_OK(txn->Commit());
3337   delete txn;
3338   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3339   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3340 }
3341 
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from delayed_prepared_. This
// test stresses such cases.
TEST_P(WritePreparedTransactionTest,CommitOfDelayedPrepared)3345 TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
3346   const size_t snapshot_cache_bits = 7;  // same as default
3347   for (const size_t commit_cache_bits : {0, 2, 3}) {
3348     for (const size_t sub_batch_cnt : {1, 2, 3}) {
3349       UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3350       ReOpen();
3351       std::atomic<const Snapshot*> snap = {nullptr};
3352       std::atomic<SequenceNumber> exp_prepare = {0};
3353       ROCKSDB_NAMESPACE::port::Thread callback_thread;
3354       // Value is synchronized via snap
3355       PinnableSlice value;
3356       // Take a snapshot after publish and before RemovePrepared:Start
3357       auto snap_callback = [&]() {
3358         ASSERT_EQ(nullptr, snap.load());
3359         snap.store(db->GetSnapshot());
3360         ReadOptions roptions;
3361         roptions.snapshot = snap.load();
3362         auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
3363         ASSERT_OK(s);
3364       };
3365       auto callback = [&](void* param) {
3366         SequenceNumber prep_seq = *((SequenceNumber*)param);
3367         if (prep_seq == exp_prepare.load()) {  // only for write_thread
3368           // We need to spawn a thread to avoid deadlock since getting a
3369           // snpashot might end up calling AdvanceSeqByOne which needs joining
3370           // the write queue.
3371           callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
3372           TEST_SYNC_POINT("callback:end");
3373         }
3374       };
3375       // Wait for the first snapshot be taken in GetSnapshotInternal. Although
3376       // it might be updated before GetSnapshotInternal finishes but this should
3377       // cover most of the cases.
3378       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3379           {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
3380       });
3381       SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
3382       SyncPoint::GetInstance()->EnableProcessing();
3383       // Thread to cause frequent evictions
3384       ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
3385         // Too many txns might cause commit_seq - prepare_seq in another thread
3386         // to go beyond DELTA_UPPERBOUND
3387         for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
3388           db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
3389         }
3390       });
3391       ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3392         for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
3393           Transaction* txn =
3394               db->BeginTransaction(WriteOptions(), TransactionOptions());
3395           ASSERT_OK(txn->SetName("xid"));
3396           std::string val_str = "value" + ToString(i);
3397           for (size_t b = 0; b < sub_batch_cnt; b++) {
3398             ASSERT_OK(txn->Put(Slice("key2"), val_str));
3399           }
3400           ASSERT_OK(txn->Prepare());
3401           // Let an eviction to kick in
3402           std::this_thread::yield();
3403 
3404           exp_prepare.store(txn->GetId());
3405           ASSERT_OK(txn->Commit());
3406           delete txn;
3407           // Wait for the snapshot taking that is triggered by
3408           // RemovePrepared:Start callback
3409           callback_thread.join();
3410 
3411           // Read with the snapshot taken before delayed_prepared_ cleanup
3412           ReadOptions roptions;
3413           roptions.snapshot = snap.load();
3414           ASSERT_NE(nullptr, roptions.snapshot);
3415           PinnableSlice value2;
3416           auto s =
3417               db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
3418           ASSERT_OK(s);
3419           // It should see its own write
3420           ASSERT_TRUE(val_str == value2);
3421           // The value read by snapshot should not change
3422           ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
3423 
3424           db->ReleaseSnapshot(roptions.snapshot);
3425           snap.store(nullptr);
3426         }
3427       });
3428       write_thread.join();
3429       eviction_thread.join();
3430       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3431       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3432     }
3433   }
3434 }
3435 
3436 // Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
  for (bool skip_prepare : {true, false}) {
    // Intended interleaving:
    //  - reader takes its snapshot and reads once after AddCommitted:start,
    //    while the writer is held at AddCommitted:start:pause;
    //  - reader reads again with the same snapshot after AddCommitted:end,
    //    while the writer is held at AddCommitted:end:pause.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::AddCommitted:start",
         "AtomicCommit::GetSnapshot:start"},
        {"AtomicCommit::Get:end",
         "WritePreparedTxnDB::AddCommitted:start:pause"},
        {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
        // Previously spelled with a trailing ':', which matches no sync
        // point, so the writer was never held here. NOTE(review): assumes
        // AddCommitted defines "...:end:pause" alongside "...:start:pause";
        // confirm in write_prepared_txn_db.cc.
        {"AtomicCommit::Get2:end",
         "WritePreparedTxnDB::AddCommitted:end:pause"},
    });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
      if (skip_prepare) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
      } else {
        Transaction* txn =
            db->BeginTransaction(WriteOptions(), TransactionOptions());
        ASSERT_OK(txn->SetName("xid"));
        ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    });
    ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
      ReadOptions roptions;
      TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
      roptions.snapshot = db->GetSnapshot();
      PinnableSlice val;
      auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
      TEST_SYNC_POINT("AtomicCommit::Get:end");
      TEST_SYNC_POINT("AtomicCommit::Get2:start");
      ASSERT_SAME(roptions, db, s, val, "key");
      TEST_SYNC_POINT("AtomicCommit::Get2:end");
      db->ReleaseSnapshot(roptions.snapshot);
    });
    read_thread.join();
    write_thread.join();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
3479 
3480 // Test that we can change write policy from WriteCommitted to WritePrepared
3481 // after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
  // WriteCommitted -> WritePrepared, with the WAL emptied in between.
  const bool kWalEmptied = true;
  CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, kWalEmptied);
}
3486 
3487 // Test that we fail fast if WAL is not emptied between changing the write
3488 // policy from WriteCommitted to WritePrepared
TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
  // WriteCommitted -> WritePrepared, with a non-empty WAL left behind.
  const bool kWalEmptied = false;
  CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, kWalEmptied);
}
3493 
// Test that we can change write policy from WritePrepared back to
// WriteCommitted after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
  // WritePrepared -> WriteCommitted, with the WAL emptied in between.
  const bool kWalEmptied = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, kWalEmptied);
}
3500 
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WritePrepared back to WriteCommitted
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
  // WritePrepared -> WriteCommitted, with a non-empty WAL left behind.
  const bool kWalEmptied = false;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, kWalEmptied);
}
3507 
3508 }  // namespace ROCKSDB_NAMESPACE
3509 
main(int argc,char ** argv)3510 int main(int argc, char** argv) {
3511   ::testing::InitGoogleTest(&argc, argv);
3512   return RUN_ALL_TESTS();
3513 }
3514 
3515 #else
3516 #include <stdio.h>
3517 
int main(int /*argc*/, char** /*argv*/) {
  // Transaction tests are not built for ROCKSDB_LITE: report the skip on
  // stderr and exit successfully.
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
3523 
3524 #endif  // ROCKSDB_LITE
3525