1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/transaction_test.h"
9 
10 #include <algorithm>
11 #include <functional>
12 #include <string>
13 #include <thread>
14 
15 #include "db/db_impl/db_impl.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/options.h"
18 #include "rocksdb/perf_context.h"
19 #include "rocksdb/utilities/transaction.h"
20 #include "rocksdb/utilities/transaction_db.h"
21 #include "table/mock_table.h"
22 #include "test_util/fault_injection_test_env.h"
23 #include "test_util/sync_point.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "test_util/transaction_test_util.h"
27 #include "util/random.h"
28 #include "util/string_util.h"
29 #include "utilities/merge_operators.h"
30 #include "utilities/merge_operators/string_append/stringappend.h"
31 #include "utilities/transactions/pessimistic_transaction_db.h"
32 
33 #include "port/port.h"
34 
35 using std::string;
36 
37 namespace ROCKSDB_NAMESPACE {
38 
// Parameter sets for TransactionTest registered under the "DBAsBaseDB"
// prefix. All three write policies (WRITE_COMMITTED, WRITE_PREPARED,
// WRITE_UNPREPARED) appear; kUnorderedWrite is only paired with
// WRITE_PREPARED. The order of the tuples determines the numeric suffix of
// the generated test names, so do not reorder them casually.
// NOTE(review): the two leading bools are presumably (use_stackable_db,
// two_write_queue) as consumed by the fixture in transaction_test.h -- confirm.
39 INSTANTIATE_TEST_CASE_P(
40     DBAsBaseDB, TransactionTest,
41     ::testing::Values(
42         std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
43         std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
44         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
45         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
46         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
47         std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
48         std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
// Same seven parameter tuples as above, applied to TransactionStressTest.
49 INSTANTIATE_TEST_CASE_P(
50     DBAsBaseDB, TransactionStressTest,
51     ::testing::Values(
52         std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
53         std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
54         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
55         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
56         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
57         std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
58         std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
// TransactionTest again, under the "StackableDBAsBaseDB" prefix: one tuple
// per write policy, with both leading bools set to true.
59 INSTANTIATE_TEST_CASE_P(
60     StackableDBAsBaseDB, TransactionTest,
61     ::testing::Values(
62         std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite),
63         std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite),
64         std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite)));
65 
66 // MySQLStyleTransactionTest takes far too long for valgrind to run.
// The whole instantiation is therefore compiled out when
// ROCKSDB_VALGRIND_RUN is defined. Unlike the instantiations above, these
// tuples carry a fifth (bool) element.
// NOTE(review): the meaning of the fifth element is defined by the
// MySQLStyleTransactionTest fixture, which is not visible here -- confirm
// before adding or changing combinations.
67 #ifndef ROCKSDB_VALGRIND_RUN
68 INSTANTIATE_TEST_CASE_P(
69     MySQLStyleTransactionTest, MySQLStyleTransactionTest,
70     ::testing::Values(
71         std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false),
72         std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false),
73         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false),
74         std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true),
75         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false),
76         std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true),
77         std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false),
78         std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true),
79         std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false),
80         std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true),
81         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false),
82         std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
83 #endif  // ROCKSDB_VALGRIND_RUN
84 
// Checks that empty writes are accepted in three situations:
//  1. the same empty WriteBatch written twice with sync enabled,
//  2. an empty named transaction going through Prepare() + Commit(),
//  3. a prepared transaction that is looked up by name and committed after
//     TEST_Crash() and a reopen of the DB.
TEST_P(TransactionTest, DoubleEmptyWrite) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;

  WriteBatch batch;

  ASSERT_OK(db->Write(write_options, &batch));
  ASSERT_OK(db->Write(write_options, &batch));

  // Also test committing empty transactions in 2PC
  TransactionOptions txn_options;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Prepare());
  ASSERT_OK(txn0->Commit());
  delete txn0;

  // Also test that it works during recovery
  txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->SetName("xid2"));
  // The write must succeed, otherwise there is nothing to recover below.
  ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
  ASSERT_OK(txn0->Prepare());
  delete txn0;
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  // Use gtest assertions rather than assert() so the checks also run in
  // NDEBUG builds and fail the test instead of dereferencing a null pointer.
  ASSERT_TRUE(db != nullptr);
  txn0 = db->GetTransactionByName("xid2");
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->Commit());
  delete txn0;
}
116 
// Happy path for a single transaction: lock and read an existing key,
// overwrite it, read back the transaction's own write, commit, and confirm
// the committed value is visible through a plain DB read.
TEST_P(TransactionTest, SuccessTest) {
  ASSERT_OK(db->ResetStats());

  WriteOptions wopts;
  ReadOptions ropts;
  std::string fetched;

  // Seed two committed keys outside of any transaction.
  for (const char* seed_key : {"foo", "foo2"}) {
    ASSERT_OK(db->Put(wopts, Slice(seed_key), Slice("bar")));
  }

  Transaction* txn = db->BeginTransaction(wopts, TransactionOptions());
  ASSERT_TRUE(txn);

  // A fresh transaction has recorded no puts yet.
  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  Status s = txn->GetForUpdate(ropts, "foo", &fetched);
  ASSERT_OK(s);
  ASSERT_EQ(fetched, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  ASSERT_EQ(1, txn->GetNumPuts());

  // The transaction must observe its own uncommitted write.
  s = txn->GetForUpdate(ropts, "foo", &fetched);
  ASSERT_OK(s);
  ASSERT_EQ(fetched, "bar2");

  s = txn->Commit();
  ASSERT_OK(s);

  // After commit the new value is visible to ordinary reads.
  s = db->Get(ropts, "foo", &fetched);
  ASSERT_OK(s);
  ASSERT_EQ(fetched, "bar2");

  delete txn;
}
150 
151 // The test clarifies the contract of do_validate and assume_tracked
152 // in GetForUpdate and Put/Merge/Delete
TEST_P(TransactionTest,AssumeExclusiveTracked)153 TEST_P(TransactionTest, AssumeExclusiveTracked) {
154   WriteOptions write_options;
155   ReadOptions read_options;
156   std::string value;
157   Status s;
158   TransactionOptions txn_options;
159   txn_options.lock_timeout = 1;
160   const bool EXCLUSIVE = true;
161   const bool DO_VALIDATE = true;
162   const bool ASSUME_LOCKED = true;
163 
164   Transaction* txn = db->BeginTransaction(write_options, txn_options);
165   ASSERT_TRUE(txn);
166   txn->SetSnapshot();
167 
168   // commit a value after the snapshot is taken
169   ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
170 
171   // By default write should fail to the commit after our snapshot
172   s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE);
173   ASSERT_TRUE(s.IsBusy());
174   // But the user could direct the db to skip validating the snapshot. The read
175   // value then should be the most recently committed
176   ASSERT_OK(
177       txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE));
178   ASSERT_EQ(value, "bar");
179 
180   // Although ValidateSnapshot is skipped the key must have still got locked
181   s = db->Put(write_options, Slice("foo"), Slice("bar"));
182   ASSERT_TRUE(s.IsTimedOut());
183 
184   // By default the write operations should fail due to the commit after the
185   // snapshot
186   s = txn->Put(Slice("foo"), Slice("bar1"));
187   ASSERT_TRUE(s.IsBusy());
188   s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
189                !ASSUME_LOCKED);
190   ASSERT_TRUE(s.IsBusy());
191   // But the user could direct the db that it already assumes exclusive lock on
192   // the key due to the previous GetForUpdate call.
193   ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
194                      ASSUME_LOCKED));
195   ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"),
196                        ASSUME_LOCKED));
197   ASSERT_OK(
198       txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED));
199   ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"),
200                               ASSUME_LOCKED));
201 
202   txn->Rollback();
203   delete txn;
204 }
205 
206 // This test clarifies the contract of ValidateSnapshot
TEST_P(TransactionTest,ValidateSnapshotTest)207 TEST_P(TransactionTest, ValidateSnapshotTest) {
208   for (bool with_flush : {true}) {
209     for (bool with_2pc : {true}) {
210       ASSERT_OK(ReOpen());
211       WriteOptions write_options;
212       ReadOptions read_options;
213       std::string value;
214 
215       assert(db != nullptr);
216       Transaction* txn1 =
217           db->BeginTransaction(write_options, TransactionOptions());
218       ASSERT_TRUE(txn1);
219       ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
220       if (with_2pc) {
221         ASSERT_OK(txn1->SetName("xid1"));
222         ASSERT_OK(txn1->Prepare());
223       }
224 
225       if (with_flush) {
226         auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
227         db_impl->TEST_FlushMemTable(true);
228         // Make sure the flushed memtable is not kept in memory
229         int max_memtable_in_history =
230             std::max(
231                 options.max_write_buffer_number,
232                 static_cast<int>(options.max_write_buffer_size_to_maintain) /
233                     static_cast<int>(options.write_buffer_size)) +
234             1;
235         for (int i = 0; i < max_memtable_in_history; i++) {
236           db->Put(write_options, Slice("key"), Slice("value"));
237           db_impl->TEST_FlushMemTable(true);
238         }
239       }
240 
241       Transaction* txn2 =
242           db->BeginTransaction(write_options, TransactionOptions());
243       ASSERT_TRUE(txn2);
244       txn2->SetSnapshot();
245 
246       ASSERT_OK(txn1->Commit());
247       delete txn1;
248 
249       auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
250       // Test the simple case where the key is not tracked yet
251       auto trakced_seq = kMaxSequenceNumber;
252       auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
253                                           &trakced_seq);
254       ASSERT_TRUE(s.IsBusy());
255       delete txn2;
256     }
257   }
258 }
259 
TEST_P(TransactionTest,WaitingTxn)260 TEST_P(TransactionTest, WaitingTxn) {
261   WriteOptions write_options;
262   ReadOptions read_options;
263   TransactionOptions txn_options;
264   string value;
265   Status s;
266 
267   txn_options.lock_timeout = 1;
268   s = db->Put(write_options, Slice("foo"), Slice("bar"));
269   ASSERT_OK(s);
270 
271   /* create second cf */
272   ColumnFamilyHandle* cfa;
273   ColumnFamilyOptions cf_options;
274   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
275   ASSERT_OK(s);
276   s = db->Put(write_options, cfa, Slice("foo"), Slice("bar"));
277   ASSERT_OK(s);
278 
279   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
280   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
281   TransactionID id1 = txn1->GetID();
282   ASSERT_TRUE(txn1);
283   ASSERT_TRUE(txn2);
284 
285   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
286       "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
287         std::string key;
288         uint32_t cf_id;
289         std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
290         ASSERT_EQ(key, "foo");
291         ASSERT_EQ(wait.size(), 1);
292         ASSERT_EQ(wait[0], id1);
293         ASSERT_EQ(cf_id, 0U);
294       });
295 
296   get_perf_context()->Reset();
297   // lock key in default cf
298   s = txn1->GetForUpdate(read_options, "foo", &value);
299   ASSERT_OK(s);
300   ASSERT_EQ(value, "bar");
301   ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
302 
303   // lock key in cfa
304   s = txn1->GetForUpdate(read_options, cfa, "foo", &value);
305   ASSERT_OK(s);
306   ASSERT_EQ(value, "bar");
307   ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
308 
309   auto lock_data = db->GetLockStatusData();
310   // Locked keys exist in both column family.
311   ASSERT_EQ(lock_data.size(), 2);
312 
313   auto cf_iterator = lock_data.begin();
314 
315   // The iterator points to an unordered_multimap
316   // thus the test can not assume any particular order.
317 
318   // Column family is 1 or 0 (cfa).
319   if (cf_iterator->first != 1 && cf_iterator->first != 0) {
320     FAIL();
321   }
322   // The locked key is "foo" and is locked by txn1
323   ASSERT_EQ(cf_iterator->second.key, "foo");
324   ASSERT_EQ(cf_iterator->second.ids.size(), 1);
325   ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
326 
327   cf_iterator++;
328 
329   // Column family is 0 (default) or 1.
330   if (cf_iterator->first != 1 && cf_iterator->first != 0) {
331     FAIL();
332   }
333   // The locked key is "foo" and is locked by txn1
334   ASSERT_EQ(cf_iterator->second.key, "foo");
335   ASSERT_EQ(cf_iterator->second.ids.size(), 1);
336   ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
337 
338   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
339 
340   s = txn2->GetForUpdate(read_options, "foo", &value);
341   ASSERT_TRUE(s.IsTimedOut());
342   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
343   ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1);
344   ASSERT_GE(get_perf_context()->key_lock_wait_time, 0);
345 
346   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
347   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
348 
349   delete cfa;
350   delete txn1;
351   delete txn2;
352 }
353 
TEST_P(TransactionTest,SharedLocks)354 TEST_P(TransactionTest, SharedLocks) {
355   WriteOptions write_options;
356   ReadOptions read_options;
357   TransactionOptions txn_options;
358   Status s;
359 
360   txn_options.lock_timeout = 1;
361   s = db->Put(write_options, Slice("foo"), Slice("bar"));
362   ASSERT_OK(s);
363 
364   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
365   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
366   Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
367   ASSERT_TRUE(txn1);
368   ASSERT_TRUE(txn2);
369   ASSERT_TRUE(txn3);
370 
371   // Test shared access between txns
372   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
373   ASSERT_OK(s);
374 
375   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
376   ASSERT_OK(s);
377 
378   s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
379   ASSERT_OK(s);
380 
381   auto lock_data = db->GetLockStatusData();
382   ASSERT_EQ(lock_data.size(), 1);
383 
384   auto cf_iterator = lock_data.begin();
385   ASSERT_EQ(cf_iterator->second.key, "foo");
386 
387   // We compare whether the set of txns locking this key is the same. To do
388   // this, we need to sort both vectors so that the comparison is done
389   // correctly.
390   std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
391                                               txn3->GetID()};
392   std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
393   ASSERT_EQ(expected_txns, lock_txns);
394   ASSERT_FALSE(cf_iterator->second.exclusive);
395 
396   txn1->Rollback();
397   txn2->Rollback();
398   txn3->Rollback();
399 
400   // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
401   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
402   ASSERT_OK(s);
403 
404   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
405   ASSERT_OK(s);
406 
407   s = txn3->GetForUpdate(read_options, "foo", nullptr);
408   ASSERT_TRUE(s.IsTimedOut());
409   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
410 
411   txn1->UndoGetForUpdate("foo");
412   s = txn3->GetForUpdate(read_options, "foo", nullptr);
413   ASSERT_TRUE(s.IsTimedOut());
414   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
415 
416   txn2->UndoGetForUpdate("foo");
417   s = txn3->GetForUpdate(read_options, "foo", nullptr);
418   ASSERT_OK(s);
419 
420   txn1->Rollback();
421   txn2->Rollback();
422   txn3->Rollback();
423 
424   // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
425   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
426   ASSERT_OK(s);
427 
428   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
429   ASSERT_OK(s);
430 
431   s = txn2->GetForUpdate(read_options, "foo", nullptr);
432   ASSERT_TRUE(s.IsTimedOut());
433   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
434 
435   txn1->UndoGetForUpdate("foo");
436   s = txn2->GetForUpdate(read_options, "foo", nullptr);
437   ASSERT_OK(s);
438 
439   ASSERT_OK(txn1->Rollback());
440   ASSERT_OK(txn2->Rollback());
441 
442   // Test txn1 trying to downgrade its lock.
443   s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
444   ASSERT_OK(s);
445 
446   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
447   ASSERT_TRUE(s.IsTimedOut());
448   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
449 
450   // Should still fail after "downgrading".
451   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
452   ASSERT_OK(s);
453 
454   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
455   ASSERT_TRUE(s.IsTimedOut());
456   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
457 
458   txn1->Rollback();
459   txn2->Rollback();
460 
461   // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
462   // access.
463   s = txn1->GetForUpdate(read_options, "foo", nullptr);
464   ASSERT_OK(s);
465 
466   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
467   ASSERT_TRUE(s.IsTimedOut());
468   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
469 
470   txn1->UndoGetForUpdate("foo");
471   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
472   ASSERT_OK(s);
473 
474   delete txn1;
475   delete txn2;
476   delete txn3;
477 }
478 
TEST_P(TransactionTest,DeadlockCycleShared)479 TEST_P(TransactionTest, DeadlockCycleShared) {
480   WriteOptions write_options;
481   ReadOptions read_options;
482   TransactionOptions txn_options;
483 
484   txn_options.lock_timeout = 1000000;
485   txn_options.deadlock_detect = true;
486 
487   // Set up a wait for chain like this:
488   //
489   // Tn -> T(n*2)
490   // Tn -> T(n*2 + 1)
491   //
492   // So we have:
493   // T1 -> T2 -> T4 ...
494   //    |     |> T5 ...
495   //    |> T3 -> T6 ...
496   //          |> T7 ...
497   // up to T31, then T[16 - 31] -> T1.
498   // Note that Tn holds lock on floor(n / 2).
499 
500   std::vector<Transaction*> txns(31);
501 
502   for (uint32_t i = 0; i < 31; i++) {
503     txns[i] = db->BeginTransaction(write_options, txn_options);
504     ASSERT_TRUE(txns[i]);
505     auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
506                                    false /* exclusive */);
507     ASSERT_OK(s);
508   }
509 
510   std::atomic<uint32_t> checkpoints(0);
511   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
512       "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
513       [&](void* /*arg*/) { checkpoints.fetch_add(1); });
514   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
515 
516   // We want the leaf transactions to block and hold everyone back.
517   std::vector<port::Thread> threads;
518   for (uint32_t i = 0; i < 15; i++) {
519     std::function<void()> blocking_thread = [&, i] {
520       auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
521                                      true /* exclusive */);
522       ASSERT_OK(s);
523       txns[i]->Rollback();
524       delete txns[i];
525     };
526     threads.emplace_back(blocking_thread);
527   }
528 
529   // Wait until all threads are waiting on each other.
530   while (checkpoints.load() != 15) {
531     /* sleep override */
532     std::this_thread::sleep_for(std::chrono::milliseconds(100));
533   }
534   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
535   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
536 
537   // Complete the cycle T[16 - 31] -> T1
538   for (uint32_t i = 15; i < 31; i++) {
539     auto s =
540         txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
541     ASSERT_TRUE(s.IsDeadlock());
542 
543     // Calculate next buffer len, plateau at 5 when 5 records are inserted.
544     const uint32_t curr_dlock_buffer_len_ =
545         (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);
546 
547     auto dlock_buffer = db->GetDeadlockInfoBuffer();
548     ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
549     auto dlock_entry = dlock_buffer[0].path;
550     ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
551     int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
552     int64_t cur_deadlock_time = 0;
553     for (auto const& dl_path_rec : dlock_buffer) {
554       cur_deadlock_time = dl_path_rec.deadlock_time;
555       ASSERT_NE(cur_deadlock_time, 0);
556       ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
557       pre_deadlock_time = cur_deadlock_time;
558     }
559 
560     int64_t curr_waiting_key = 0;
561 
562     // Offset of each txn id from the root of the shared dlock tree's txn id.
563     int64_t offset_root = dlock_entry[0].m_txn_id - 1;
564     // Offset of the final entry in the dlock path from the root's txn id.
565     TransactionID leaf_id =
566         dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;
567 
568     for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
569       auto dl_node = *it;
570       ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
571       ASSERT_EQ(dl_node.m_cf_id, 0U);
572       ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
573       ASSERT_EQ(dl_node.m_exclusive, true);
574 
575       if (curr_waiting_key == 0) {
576         curr_waiting_key = leaf_id;
577       }
578       curr_waiting_key /= 2;
579       leaf_id /= 2;
580     }
581   }
582 
583   // Rollback the leaf transaction.
584   for (uint32_t i = 15; i < 31; i++) {
585     txns[i]->Rollback();
586     delete txns[i];
587   }
588 
589   for (auto& t : threads) {
590     t.join();
591   }
592 
593   // Downsize the buffer and verify the 3 latest deadlocks are preserved.
594   auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
595   db->SetDeadlockInfoBufferSize(3);
596   auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
597   ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
598 
599   for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
600     for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
601       ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
602                 dlock_buffer_before_resize[i].path[j].m_txn_id);
603     }
604   }
605 
606   // Upsize the buffer and verify the 3 latest dealocks are preserved.
607   dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
608   db->SetDeadlockInfoBufferSize(5);
609   dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
610   ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
611 
612   for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
613     for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
614       ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
615                 dlock_buffer_before_resize[i].path[j].m_txn_id);
616     }
617   }
618 
619   // Downsize to 0 and verify the size is consistent.
620   dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
621   db->SetDeadlockInfoBufferSize(0);
622   dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
623   ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
624 
625   // Upsize from 0 to verify the size is persistent.
626   dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
627   db->SetDeadlockInfoBufferSize(3);
628   dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
629   ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
630 
631   // Contrived case of shared lock of cycle size 2 to verify that a shared
632   // lock causing a deadlock is correctly reported as "shared" in the buffer.
633   std::vector<Transaction*> txns_shared(2);
634 
635   // Create a cycle of size 2.
636   for (uint32_t i = 0; i < 2; i++) {
637     txns_shared[i] = db->BeginTransaction(write_options, txn_options);
638     ASSERT_TRUE(txns_shared[i]);
639     auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
640     ASSERT_OK(s);
641   }
642 
643   std::atomic<uint32_t> checkpoints_shared(0);
644   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
645       "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
646       [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
647   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
648 
649   std::vector<port::Thread> threads_shared;
650   for (uint32_t i = 0; i < 1; i++) {
651     std::function<void()> blocking_thread = [&, i] {
652       auto s =
653           txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
654       ASSERT_OK(s);
655       txns_shared[i]->Rollback();
656       delete txns_shared[i];
657     };
658     threads_shared.emplace_back(blocking_thread);
659   }
660 
661   // Wait until all threads are waiting on each other.
662   while (checkpoints_shared.load() != 1) {
663     /* sleep override */
664     std::this_thread::sleep_for(std::chrono::milliseconds(100));
665   }
666   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
667   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
668 
669   // Complete the cycle T2 -> T1 with a shared lock.
670   auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
671   ASSERT_TRUE(s.IsDeadlock());
672 
673   auto dlock_buffer = db->GetDeadlockInfoBuffer();
674 
675   // Verify the size of the buffer and the single path.
676   ASSERT_EQ(dlock_buffer.size(), 1);
677   ASSERT_EQ(dlock_buffer[0].path.size(), 2);
678 
679   // Verify the exclusivity field of the transactions in the deadlock path.
680   ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
681   ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
682   txns_shared[1]->Rollback();
683   delete txns_shared[1];
684 
685   for (auto& t : threads_shared) {
686     t.join();
687   }
688 }
689 
690 #ifndef ROCKSDB_VALGRIND_RUN
TEST_P(TransactionStressTest,DeadlockCycle)691 TEST_P(TransactionStressTest, DeadlockCycle) {
692   WriteOptions write_options;
693   ReadOptions read_options;
694   TransactionOptions txn_options;
695 
696   // offset by 2 from the max depth to test edge case
697   const uint32_t kMaxCycleLength = 52;
698 
699   txn_options.lock_timeout = 1000000;
700   txn_options.deadlock_detect = true;
701 
702   for (uint32_t len = 2; len < kMaxCycleLength; len++) {
703     // Set up a long wait for chain like this:
704     //
705     // T1 -> T2 -> T3 -> ... -> Tlen
706 
707     std::vector<Transaction*> txns(len);
708 
709     for (uint32_t i = 0; i < len; i++) {
710       txns[i] = db->BeginTransaction(write_options, txn_options);
711       ASSERT_TRUE(txns[i]);
712       auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
713       ASSERT_OK(s);
714     }
715 
716     std::atomic<uint32_t> checkpoints(0);
717     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
718         "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
719         [&](void* /*arg*/) { checkpoints.fetch_add(1); });
720     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
721 
722     // We want the last transaction in the chain to block and hold everyone
723     // back.
724     std::vector<port::Thread> threads;
725     for (uint32_t i = 0; i < len - 1; i++) {
726       std::function<void()> blocking_thread = [&, i] {
727         auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
728         ASSERT_OK(s);
729         txns[i]->Rollback();
730         delete txns[i];
731       };
732       threads.emplace_back(blocking_thread);
733     }
734 
735     // Wait until all threads are waiting on each other.
736     while (checkpoints.load() != len - 1) {
737       /* sleep override */
738       std::this_thread::sleep_for(std::chrono::milliseconds(100));
739     }
740     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
741     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
742 
743     // Complete the cycle Tlen -> T1
744     auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
745     ASSERT_TRUE(s.IsDeadlock());
746 
747     const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
748     uint32_t curr_waiting_key = 0;
749     TransactionID curr_txn_id = txns[0]->GetID();
750 
751     auto dlock_buffer = db->GetDeadlockInfoBuffer();
752     ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
753     uint32_t check_len = len;
754     bool check_limit_flag = false;
755 
756     // Special case for a deadlock path that exceeds the maximum depth.
757     if (len > 50) {
758       check_len = 0;
759       check_limit_flag = true;
760     }
761     auto dlock_entry = dlock_buffer[0].path;
762     ASSERT_EQ(dlock_entry.size(), check_len);
763     ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);
764 
765     int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
766     int64_t cur_deadlock_time = 0;
767     for (auto const& dl_path_rec : dlock_buffer) {
768       cur_deadlock_time = dl_path_rec.deadlock_time;
769       ASSERT_NE(cur_deadlock_time, 0);
770       ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
771       pre_deadlock_time = cur_deadlock_time;
772     }
773 
774     // Iterates backwards over path verifying decreasing txn_ids.
775     for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
776       auto dl_node = *it;
777       ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
778       ASSERT_EQ(dl_node.m_cf_id, 0u);
779       ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
780       ASSERT_EQ(dl_node.m_exclusive, true);
781 
782       curr_txn_id--;
783       if (curr_waiting_key == 0) {
784         curr_waiting_key = len;
785       }
786       curr_waiting_key--;
787     }
788 
789     // Rollback the last transaction.
790     txns[len - 1]->Rollback();
791     delete txns[len - 1];
792 
793     for (auto& t : threads) {
794       t.join();
795     }
796   }
797 }
798 
// Stress test for deadlock detection: NUM_TXN_THREADS threads each run
// NUM_ITERS transactions that try to lock all NUM_KEYS keys in a random
// order, with a mix of shared and exclusive requests. The only failure a
// lock request is allowed to report is a deadlock, in which case the
// transaction is rolled back and abandoned.
TEST_P(TransactionStressTest, DeadlockStress) {
  const uint32_t NUM_TXN_THREADS = 10;
  const uint32_t NUM_KEYS = 100;
  const uint32_t NUM_ITERS = 10000;

  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;
  std::vector<std::string> keys;
  keys.reserve(NUM_KEYS);

  for (uint32_t i = 0; i < NUM_KEYS; i++) {
    // A failed seed write would invalidate the rest of the test.
    ASSERT_OK(db->Put(write_options, Slice(ToString(i)), Slice("")));
    keys.push_back(ToString(i));
  }

  // Seed source for the per-thread shuffling engines.
  size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
  Random rnd(static_cast<uint32_t>(tid));
  std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) {
    std::default_random_engine g(seed);

    for (uint32_t i = 0; i < NUM_ITERS; i++) {
      Transaction* txn = db->BeginTransaction(write_options, txn_options);
      auto random_keys = keys;
      std::shuffle(random_keys.begin(), random_keys.end(), g);

      // Lock keys in random order.
      for (const auto& k : random_keys) {
        // Lock mostly for shared access, but exclusive 1/4 of the time.
        auto s =
            txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
        if (!s.ok()) {
          ASSERT_TRUE(s.IsDeadlock());
          ASSERT_OK(txn->Rollback());
          break;
        }
      }

      delete txn;
    }
  };

  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
    threads.emplace_back(stress_thread, rnd.Next());
  }

  for (auto& t : threads) {
    t.join();
  }
}
853 #endif  // ROCKSDB_VALGRIND_RUN
854 
// Committing a transaction whose commit-time write batch is non-empty is
// expected to be rejected with Status::InvalidArgument.
TEST_P(TransactionTest, CommitTimeBatchFailTest) {
  WriteOptions wopts;
  TransactionOptions topts;

  Transaction* const txn = db->BeginTransaction(wopts, topts);
  ASSERT_TRUE(txn);

  // Stage one entry in the commit-time batch and one regular write.
  ASSERT_OK(txn->GetCommitTimeWriteBatch()->Put("cat", "dog"));
  ASSERT_OK(txn->Put("foo", "bar"));

  // The commit fails because the commit-time batch is not empty.
  const Status commit_status = txn->Commit();
  ASSERT_EQ(commit_status, Status::InvalidArgument());

  delete txn;
}
876 
// Checks that the tracking of WAL files containing prepared sections does not
// leak. 100 transactions are prepared; most are committed right away and some
// are kept prepared until the end, with a memtable flush after each one. Once
// everything is committed, no log may be reported as holding prepared data and
// the related bookkeeping structures must be empty.
TEST_P(TransactionTest, LogMarkLeakTest) {
  TransactionOptions txn_options;
  WriteOptions write_options;
  // Reopen with a tiny (1KB) write buffer.
  options.write_buffer_size = 1024;
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  Random rnd(47);
  // Transactions left in the prepared state until the end of the test.
  std::vector<Transaction*> txns;
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  // At the beginning there should be no log containing prepare data
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  for (size_t i = 0; i < 100; i++) {
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_OK(txn->SetName("xid" + ToString(i)));
    ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar")));
    ASSERT_OK(txn->Prepare());
    ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
    if (rnd.OneIn(5)) {
      // Keep this one prepared; it is committed after the loop.
      txns.push_back(txn);
    } else {
      ASSERT_OK(txn->Commit());
      delete txn;
    }
    // A failed flush would silently change what the final checks cover.
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));
  }
  for (auto txn : txns) {
    ASSERT_OK(txn->Commit());
    delete txn;
  }
  // At the end there should be no log left containing prepare data
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  // Make sure that the underlying data structures are properly truncated and
  // cause no leak
  ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0);
  ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0);
}
913 
TEST_P(TransactionTest,SimpleTwoPhaseTransactionTest)914 TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
915   for (bool cwb4recovery : {true, false}) {
916     ASSERT_OK(ReOpen());
917     WriteOptions write_options;
918     ReadOptions read_options;
919 
920     TransactionOptions txn_options;
921     txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;
922 
923     string value;
924     Status s;
925 
926     DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
927 
928     Transaction* txn = db->BeginTransaction(write_options, txn_options);
929     s = txn->SetName("xid");
930     ASSERT_OK(s);
931 
932     ASSERT_EQ(db->GetTransactionByName("xid"), txn);
933 
934     // transaction put
935     s = txn->Put(Slice("foo"), Slice("bar"));
936     ASSERT_OK(s);
937     ASSERT_EQ(1, txn->GetNumPuts());
938 
939     // regular db put
940     s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
941     ASSERT_OK(s);
942     ASSERT_EQ(1, txn->GetNumPuts());
943 
944     // regular db read
945     db->Get(read_options, "foo2", &value);
946     ASSERT_EQ(value, "bar2");
947 
948     // commit time put
949     txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
950     txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
951 
952     // nothing has been prepped yet
953     ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
954 
955     s = txn->Prepare();
956     ASSERT_OK(s);
957 
958     // data not im mem yet
959     s = db->Get(read_options, Slice("foo"), &value);
960     ASSERT_TRUE(s.IsNotFound());
961     s = db->Get(read_options, Slice("gtid"), &value);
962     ASSERT_TRUE(s.IsNotFound());
963 
964     // find trans in list of prepared transactions
965     std::vector<Transaction*> prepared_trans;
966     db->GetAllPreparedTransactions(&prepared_trans);
967     ASSERT_EQ(prepared_trans.size(), 1);
968     ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
969 
970     auto log_containing_prep =
971         db_impl->TEST_FindMinLogContainingOutstandingPrep();
972     ASSERT_GT(log_containing_prep, 0);
973 
974     // make commit
975     s = txn->Commit();
976     ASSERT_OK(s);
977 
978     // value is now available
979     s = db->Get(read_options, "foo", &value);
980     ASSERT_OK(s);
981     ASSERT_EQ(value, "bar");
982 
983     if (!cwb4recovery) {
984       s = db->Get(read_options, "gtid", &value);
985       ASSERT_OK(s);
986       ASSERT_EQ(value, "dogs");
987 
988       s = db->Get(read_options, "gtid2", &value);
989       ASSERT_OK(s);
990       ASSERT_EQ(value, "cats");
991     }
992 
993     // we already committed
994     s = txn->Commit();
995     ASSERT_EQ(s, Status::InvalidArgument());
996 
997     // no longer is prepared results
998     db->GetAllPreparedTransactions(&prepared_trans);
999     ASSERT_EQ(prepared_trans.size(), 0);
1000     ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1001 
1002     // heap should not care about prepared section anymore
1003     ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1004 
1005     switch (txn_db_options.write_policy) {
1006       case WRITE_COMMITTED:
1007         // but now our memtable should be referencing the prep section
1008         ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1009         ASSERT_EQ(log_containing_prep,
1010                   db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1011         break;
1012       case WRITE_PREPARED:
1013       case WRITE_UNPREPARED:
1014         // In these modes memtable do not ref the prep sections
1015         ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1016         break;
1017       default:
1018         assert(false);
1019     }
1020 
1021     db_impl->TEST_FlushMemTable(true);
1022     // After flush the recoverable state must be visible
1023     if (cwb4recovery) {
1024       s = db->Get(read_options, "gtid", &value);
1025       ASSERT_OK(s);
1026       ASSERT_EQ(value, "dogs");
1027 
1028       s = db->Get(read_options, "gtid2", &value);
1029       ASSERT_OK(s);
1030       ASSERT_EQ(value, "cats");
1031     }
1032 
1033     // after memtable flush we can now relese the log
1034     ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1035     ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1036 
1037     delete txn;
1038 
1039     if (cwb4recovery) {
1040       // kill and reopen to trigger recovery
1041       s = ReOpenNoDelete();
1042       ASSERT_OK(s);
1043       assert(db != nullptr);
1044       s = db->Get(read_options, "gtid", &value);
1045       ASSERT_OK(s);
1046       ASSERT_EQ(value, "dogs");
1047 
1048       s = db->Get(read_options, "gtid2", &value);
1049       ASSERT_OK(s);
1050       ASSERT_EQ(value, "cats");
1051     }
1052   }
1053 }
1054 
// Covers the naming rules for two-phase-commit transactions: a name is
// required before Prepare, must be non-empty, not too long and unique, and
// cannot be changed once set or after the transaction has been prepared.
TEST_P(TransactionTest, TwoPhaseNameTest) {
  Status s;

  WriteOptions write_options;
  TransactionOptions txn_options;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);
  ASSERT_TRUE(txn3);
  delete txn3;

  // cant prepare txn without name
  s = txn1->Prepare();
  ASSERT_EQ(s, Status::InvalidArgument());

  // name too short
  s = txn1->SetName("");
  ASSERT_EQ(s, Status::InvalidArgument());

  // name too long
  s = txn1->SetName(std::string(513, 'x'));
  ASSERT_EQ(s, Status::InvalidArgument());

  // valid set name
  s = txn1->SetName("name1");
  ASSERT_OK(s);

  // cant have duplicate name
  s = txn2->SetName("name1");
  ASSERT_EQ(s, Status::InvalidArgument());

  // shouldn't be able to prepare (the rejected SetName left txn2 unnamed)
  s = txn2->Prepare();
  ASSERT_EQ(s, Status::InvalidArgument());

  // valid name set
  s = txn2->SetName("name2");
  ASSERT_OK(s);

  // cant reset name
  s = txn2->SetName("name3");
  ASSERT_EQ(s, Status::InvalidArgument());

  ASSERT_EQ(txn1->GetName(), "name1");
  ASSERT_EQ(txn2->GetName(), "name2");

  s = txn1->Prepare();
  ASSERT_OK(s);

  // can't rename after prepare
  s = txn1->SetName("name4");
  ASSERT_EQ(s, Status::InvalidArgument());

  // Clean up: roll back the prepared txn1 and the merely named txn2.
  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());
  delete txn1;
  delete txn2;
}
1113 
TEST_P(TransactionTest,TwoPhaseEmptyWriteTest)1114 TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
1115   for (bool cwb4recovery : {true, false}) {
1116     for (bool test_with_empty_wal : {true, false}) {
1117       if (!cwb4recovery && test_with_empty_wal) {
1118         continue;
1119       }
1120       ASSERT_OK(ReOpen());
1121       Status s;
1122       std::string value;
1123 
1124       WriteOptions write_options;
1125       ReadOptions read_options;
1126       TransactionOptions txn_options;
1127       txn_options.use_only_the_last_commit_time_batch_for_recovery =
1128           cwb4recovery;
1129       Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1130       ASSERT_TRUE(txn1);
1131       Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1132       ASSERT_TRUE(txn2);
1133 
1134       s = txn1->SetName("joe");
1135       ASSERT_OK(s);
1136 
1137       s = txn2->SetName("bob");
1138       ASSERT_OK(s);
1139 
1140       s = txn1->Prepare();
1141       ASSERT_OK(s);
1142 
1143       s = txn1->Commit();
1144       ASSERT_OK(s);
1145 
1146       delete txn1;
1147 
1148       txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
1149 
1150       s = txn2->Prepare();
1151       ASSERT_OK(s);
1152 
1153       s = txn2->Commit();
1154       ASSERT_OK(s);
1155 
1156       delete txn2;
1157       if (!cwb4recovery) {
1158         s = db->Get(read_options, "foo", &value);
1159         ASSERT_OK(s);
1160         ASSERT_EQ(value, "bar");
1161       } else {
1162         if (test_with_empty_wal) {
1163           DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1164           db_impl->TEST_FlushMemTable(true);
1165           // After flush the state must be visible
1166           s = db->Get(read_options, "foo", &value);
1167           ASSERT_OK(s);
1168           ASSERT_EQ(value, "bar");
1169         }
1170         db->FlushWAL(true);
1171         // kill and reopen to trigger recovery
1172         s = ReOpenNoDelete();
1173         ASSERT_OK(s);
1174         assert(db != nullptr);
1175         s = db->Get(read_options, "foo", &value);
1176         ASSERT_OK(s);
1177         ASSERT_EQ(value, "bar");
1178       }
1179     }
1180   }
1181 }
1182 
1183 #ifndef ROCKSDB_VALGRIND_RUN
// Exercises transaction expiration together with two-phase commit: a
// transaction prepared before its expiration time can still be committed
// afterwards, while preparing a not-yet-prepared transaction after the
// expiration time fails with Status::Expired.
TEST_P(TransactionStressTest, TwoPhaseExpirationTest) {
  Status s;

  WriteOptions write_options;
  TransactionOptions txn_options;
  txn_options.expiration = 500;  // 500ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);

  s = txn1->SetName("joe");
  ASSERT_OK(s);
  s = txn2->SetName("bob");
  ASSERT_OK(s);

  // Only txn1 is prepared before the expiration time passes.
  s = txn1->Prepare();
  ASSERT_OK(s);

  // Sleep well past the 500ms expiration.
  /* sleep override */
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));

  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Prepare();
  ASSERT_EQ(s, Status::Expired());

  delete txn1;
  delete txn2;
}
1215 
TEST_P(TransactionTest,TwoPhaseRollbackTest)1216 TEST_P(TransactionTest, TwoPhaseRollbackTest) {
1217   WriteOptions write_options;
1218   ReadOptions read_options;
1219 
1220   TransactionOptions txn_options;
1221 
1222   std::string value;
1223   Status s;
1224 
1225   DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1226   Transaction* txn = db->BeginTransaction(write_options, txn_options);
1227   s = txn->SetName("xid");
1228   ASSERT_OK(s);
1229 
1230   // transaction put
1231   s = txn->Put(Slice("tfoo"), Slice("tbar"));
1232   ASSERT_OK(s);
1233 
1234   // value is readable form txn
1235   s = txn->Get(read_options, Slice("tfoo"), &value);
1236   ASSERT_OK(s);
1237   ASSERT_EQ(value, "tbar");
1238 
1239   // issue rollback
1240   s = txn->Rollback();
1241   ASSERT_OK(s);
1242 
1243   // value is nolonger readable
1244   s = txn->Get(read_options, Slice("tfoo"), &value);
1245   ASSERT_TRUE(s.IsNotFound());
1246   ASSERT_EQ(txn->GetNumPuts(), 0);
1247 
1248   // put new txn values
1249   s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
1250   ASSERT_OK(s);
1251 
1252   // new value is readable from txn
1253   s = txn->Get(read_options, Slice("tfoo2"), &value);
1254   ASSERT_OK(s);
1255   ASSERT_EQ(value, "tbar2");
1256 
1257   s = txn->Prepare();
1258   ASSERT_OK(s);
1259 
1260   // flush to next wal
1261   s = db->Put(write_options, Slice("foo"), Slice("bar"));
1262   ASSERT_OK(s);
1263   db_impl->TEST_FlushMemTable(true);
1264 
1265   // issue rollback (marker written to WAL)
1266   s = txn->Rollback();
1267   ASSERT_OK(s);
1268 
1269   // value is nolonger readable
1270   s = txn->Get(read_options, Slice("tfoo2"), &value);
1271   ASSERT_TRUE(s.IsNotFound());
1272   ASSERT_EQ(txn->GetNumPuts(), 0);
1273 
1274   // make commit
1275   s = txn->Commit();
1276   ASSERT_EQ(s, Status::InvalidArgument());
1277 
1278   // try rollback again
1279   s = txn->Rollback();
1280   ASSERT_EQ(s, Status::InvalidArgument());
1281 
1282   delete txn;
1283 }
1284 
TEST_P(TransactionTest,PersistentTwoPhaseTransactionTest)1285 TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
1286   WriteOptions write_options;
1287   write_options.sync = true;
1288   write_options.disableWAL = false;
1289   ReadOptions read_options;
1290 
1291   TransactionOptions txn_options;
1292 
1293   std::string value;
1294   Status s;
1295 
1296   DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1297 
1298   Transaction* txn = db->BeginTransaction(write_options, txn_options);
1299   s = txn->SetName("xid");
1300   ASSERT_OK(s);
1301 
1302   ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1303 
1304   // transaction put
1305   s = txn->Put(Slice("foo"), Slice("bar"));
1306   ASSERT_OK(s);
1307   ASSERT_EQ(1, txn->GetNumPuts());
1308 
1309   // txn read
1310   s = txn->Get(read_options, "foo", &value);
1311   ASSERT_OK(s);
1312   ASSERT_EQ(value, "bar");
1313 
1314   // regular db put
1315   s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
1316   ASSERT_OK(s);
1317   ASSERT_EQ(1, txn->GetNumPuts());
1318 
1319   db_impl->TEST_FlushMemTable(true);
1320 
1321   // regular db read
1322   db->Get(read_options, "foo2", &value);
1323   ASSERT_EQ(value, "bar2");
1324 
1325   // nothing has been prepped yet
1326   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1327 
1328   // prepare
1329   s = txn->Prepare();
1330   ASSERT_OK(s);
1331 
1332   // still not available to db
1333   s = db->Get(read_options, Slice("foo"), &value);
1334   ASSERT_TRUE(s.IsNotFound());
1335 
1336   db->FlushWAL(false);
1337   delete txn;
1338   // kill and reopen
1339   reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1340   s = ReOpenNoDelete();
1341   ASSERT_OK(s);
1342   assert(db != nullptr);
1343   db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1344 
1345   // find trans in list of prepared transactions
1346   std::vector<Transaction*> prepared_trans;
1347   db->GetAllPreparedTransactions(&prepared_trans);
1348   ASSERT_EQ(prepared_trans.size(), 1);
1349 
1350   txn = prepared_trans.front();
1351   ASSERT_TRUE(txn);
1352   ASSERT_EQ(txn->GetName(), "xid");
1353   ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1354 
1355   // log has been marked
1356   auto log_containing_prep =
1357       db_impl->TEST_FindMinLogContainingOutstandingPrep();
1358   ASSERT_GT(log_containing_prep, 0);
1359 
1360   // value is readable from txn
1361   s = txn->Get(read_options, "foo", &value);
1362   ASSERT_OK(s);
1363   ASSERT_EQ(value, "bar");
1364 
1365   // make commit
1366   s = txn->Commit();
1367   ASSERT_OK(s);
1368 
1369   // value is now available
1370   db->Get(read_options, "foo", &value);
1371   ASSERT_EQ(value, "bar");
1372 
1373   // we already committed
1374   s = txn->Commit();
1375   ASSERT_EQ(s, Status::InvalidArgument());
1376 
1377   // no longer is prepared results
1378   prepared_trans.clear();
1379   db->GetAllPreparedTransactions(&prepared_trans);
1380   ASSERT_EQ(prepared_trans.size(), 0);
1381 
1382   // transaction should no longer be visible
1383   ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1384 
1385   // heap should not care about prepared section anymore
1386   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1387 
1388   switch (txn_db_options.write_policy) {
1389     case WRITE_COMMITTED:
1390       // but now our memtable should be referencing the prep section
1391       ASSERT_EQ(log_containing_prep,
1392                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1393       ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1394 
1395       break;
1396     case WRITE_PREPARED:
1397     case WRITE_UNPREPARED:
1398       // In these modes memtable do not ref the prep sections
1399       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1400       break;
1401     default:
1402       assert(false);
1403   }
1404 
1405   // Add a dummy record to memtable before a flush. Otherwise, the
1406   // memtable will be empty and flush will be skipped.
1407   s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
1408   ASSERT_OK(s);
1409 
1410   db_impl->TEST_FlushMemTable(true);
1411 
1412   // after memtable flush we can now release the log
1413   ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1414   ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1415 
1416   delete txn;
1417 
1418   // deleting transaction should unregister transaction
1419   ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1420 }
1421 #endif  // ROCKSDB_VALGRIND_RUN
1422 
1423 // TODO this test needs to be updated with serial commits
TEST_P(TransactionTest,DISABLED_TwoPhaseMultiThreadTest)1424 TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
1425   // mix transaction writes and regular writes
1426   const uint32_t NUM_TXN_THREADS = 50;
1427   std::atomic<uint32_t> txn_thread_num(0);
1428 
1429   std::function<void()> txn_write_thread = [&]() {
1430     uint32_t id = txn_thread_num.fetch_add(1);
1431 
1432     WriteOptions write_options;
1433     write_options.sync = true;
1434     write_options.disableWAL = false;
1435     TransactionOptions txn_options;
1436     txn_options.lock_timeout = 1000000;
1437     if (id % 2 == 0) {
1438       txn_options.expiration = 1000000;
1439     }
1440     TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(id)));
1441     Transaction* txn = db->BeginTransaction(write_options, txn_options);
1442     ASSERT_OK(txn->SetName(name));
1443     for (int i = 0; i < 10; i++) {
1444       std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
1445       ASSERT_OK(txn->Put(key, "val"));
1446     }
1447     ASSERT_OK(txn->Prepare());
1448     ASSERT_OK(txn->Commit());
1449     delete txn;
1450   };
1451 
1452   // assure that all thread are in the same write group
1453   std::atomic<uint32_t> t_wait_on_prepare(0);
1454   std::atomic<uint32_t> t_wait_on_commit(0);
1455 
1456   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1457       "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
1458         auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
1459 
1460         if (writer->ShouldWriteToWAL()) {
1461           t_wait_on_prepare.fetch_add(1);
1462           // wait for friends
1463           while (t_wait_on_prepare.load() < NUM_TXN_THREADS) {
1464             env->SleepForMicroseconds(10);
1465           }
1466         } else if (writer->ShouldWriteToMemtable()) {
1467           t_wait_on_commit.fetch_add(1);
1468           // wait for friends
1469           while (t_wait_on_commit.load() < NUM_TXN_THREADS) {
1470             env->SleepForMicroseconds(10);
1471           }
1472         } else {
1473           FAIL();
1474         }
1475       });
1476 
1477   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1478 
1479   // do all the writes
1480   std::vector<port::Thread> threads;
1481   for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
1482     threads.emplace_back(txn_write_thread);
1483   }
1484   for (auto& t : threads) {
1485     t.join();
1486   }
1487 
1488   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1489   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1490 
1491   ReadOptions read_options;
1492   std::string value;
1493   Status s;
1494   for (uint32_t t = 0; t < NUM_TXN_THREADS; t++) {
1495     TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(t)));
1496     for (int i = 0; i < 10; i++) {
1497       std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
1498       s = db->Get(read_options, key, &value);
1499       ASSERT_OK(s);
1500       ASSERT_EQ(value, "val");
1501     }
1502   }
1503 }
1504 
// Keeps a transaction in the prepared state across many regular writes that
// are interleaved with simulated crashes and clean reopens. At the end the
// prepared transaction must still be found by name and committable, and both
// its data and all the non-transactional data must be readable.
TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;
  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("bob");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  delete txn;

  // Value shared by all non-transactional writes; key i is i repetitions of
  // 'k'.
  const std::string val(1000, 'v');
  for (int i = 0; i < 1000; i++) {
    std::string key(i, 'k');
    assert(db != nullptr);
    s = db->Put(write_options, key, val);
    ASSERT_OK(s);

    if (i % 29 == 0) {
      // crash
      env->SetFilesystemActive(false);
      reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
      // A failed reopen must stop the test instead of leaving an unusable db.
      ASSERT_OK(ReOpenNoDelete());
    } else if (i % 37 == 0) {
      // close
      ASSERT_OK(ReOpenNoDelete());
    }
  }

  // commit old txn
  txn = db->GetTransactionByName("bob");
  ASSERT_TRUE(txn);
  s = txn->Commit();
  ASSERT_OK(s);

  // verify txn data
  s = db->Get(read_options, "foo", &value);
  ASSERT_EQ(s, Status::OK());
  ASSERT_EQ(value, "bar");

  // verify non txn data
  for (int i = 0; i < 1000; i++) {
    std::string key(i, 'k');
    s = db->Get(read_options, key, &value);
    ASSERT_EQ(s, Status::OK());
    ASSERT_EQ(value, val);
  }

  delete txn;
}
1569 
// Commits a prepared transaction containing several puts, then deactivates
// the filesystem and reopens the DB, and checks that the last key written by
// the transaction is readable afterwards.
TEST_P(TransactionTest, TwoPhaseSequenceTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;

  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("xid");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo2"), Slice("bar2"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo3"), Slice("bar3"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo4"), Slice("bar4"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  // make commit
  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // kill and reopen; a failed reopen must stop the test before db is used
  env->SetFilesystemActive(false);
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);

  // value is now available
  s = db->Get(read_options, "foo4", &value);
  ASSERT_EQ(s, Status::OK());
  ASSERT_EQ(value, "bar4");
}
1615 
TEST_P(TransactionTest,TwoPhaseDoubleRecoveryTest)1616 TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
1617   WriteOptions write_options;
1618   write_options.sync = true;
1619   write_options.disableWAL = false;
1620   ReadOptions read_options;
1621 
1622   TransactionOptions txn_options;
1623 
1624   std::string value;
1625   Status s;
1626 
1627   Transaction* txn = db->BeginTransaction(write_options, txn_options);
1628   s = txn->SetName("a");
1629   ASSERT_OK(s);
1630 
1631   // transaction put
1632   s = txn->Put(Slice("foo"), Slice("bar"));
1633   ASSERT_OK(s);
1634 
1635   // prepare
1636   s = txn->Prepare();
1637   ASSERT_OK(s);
1638 
1639   delete txn;
1640 
1641   // kill and reopen
1642   env->SetFilesystemActive(false);
1643   reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1644   ReOpenNoDelete();
1645 
1646   // commit old txn
1647   txn = db->GetTransactionByName("a");
1648   s = txn->Commit();
1649   ASSERT_OK(s);
1650 
1651   s = db->Get(read_options, "foo", &value);
1652   ASSERT_EQ(s, Status::OK());
1653   ASSERT_EQ(value, "bar");
1654 
1655   delete txn;
1656 
1657   txn = db->BeginTransaction(write_options, txn_options);
1658   s = txn->SetName("b");
1659   ASSERT_OK(s);
1660 
1661   s = txn->Put(Slice("foo2"), Slice("bar2"));
1662   ASSERT_OK(s);
1663 
1664   s = txn->Prepare();
1665   ASSERT_OK(s);
1666 
1667   s = txn->Commit();
1668   ASSERT_OK(s);
1669 
1670   delete txn;
1671 
1672   // kill and reopen
1673   env->SetFilesystemActive(false);
1674   ReOpenNoDelete();
1675   assert(db != nullptr);
1676 
1677   // value is now available
1678   s = db->Get(read_options, "foo", &value);
1679   ASSERT_EQ(s, Status::OK());
1680   ASSERT_EQ(value, "bar");
1681 
1682   s = db->Get(read_options, "foo2", &value);
1683   ASSERT_EQ(s, Status::OK());
1684   ASSERT_EQ(value, "bar2");
1685 }
1686 
TEST_P(TransactionTest,TwoPhaseLogRollingTest)1687 TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
1688   DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1689 
1690   Status s;
1691   std::string v;
1692   ColumnFamilyHandle *cfa, *cfb;
1693 
1694   // Create 2 new column families
1695   ColumnFamilyOptions cf_options;
1696   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1697   ASSERT_OK(s);
1698   s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1699   ASSERT_OK(s);
1700 
1701   WriteOptions wopts;
1702   wopts.disableWAL = false;
1703   wopts.sync = true;
1704 
1705   TransactionOptions topts1;
1706   Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1707   s = txn1->SetName("xid1");
1708   ASSERT_OK(s);
1709 
1710   TransactionOptions topts2;
1711   Transaction* txn2 = db->BeginTransaction(wopts, topts2);
1712   s = txn2->SetName("xid2");
1713   ASSERT_OK(s);
1714 
1715   // transaction put in two column families
1716   s = txn1->Put(cfa, "ka1", "va1");
1717   ASSERT_OK(s);
1718 
1719   // transaction put in two column families
1720   s = txn2->Put(cfa, "ka2", "va2");
1721   ASSERT_OK(s);
1722   s = txn2->Put(cfb, "kb2", "vb2");
1723   ASSERT_OK(s);
1724 
1725   // write prep section to wal
1726   s = txn1->Prepare();
1727   ASSERT_OK(s);
1728 
1729   // our log should be in the heap
1730   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1731             txn1->GetLogNumber());
1732   ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
1733 
1734   // flush default cf to crate new log
1735   s = db->Put(wopts, "foo", "bar");
1736   ASSERT_OK(s);
1737   s = db_impl->TEST_FlushMemTable(true);
1738   ASSERT_OK(s);
1739 
1740   // make sure we are on a new log
1741   ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
1742 
1743   // put txn2 prep section in this log
1744   s = txn2->Prepare();
1745   ASSERT_OK(s);
1746   ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
1747 
1748   // heap should still see first log
1749   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1750             txn1->GetLogNumber());
1751 
1752   // commit txn1
1753   s = txn1->Commit();
1754   ASSERT_OK(s);
1755 
1756   // heap should now show txn2s log
1757   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1758             txn2->GetLogNumber());
1759 
1760   switch (txn_db_options.write_policy) {
1761     case WRITE_COMMITTED:
1762       // we should see txn1s log refernced by the memtables
1763       ASSERT_EQ(txn1->GetLogNumber(),
1764                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1765       break;
1766     case WRITE_PREPARED:
1767     case WRITE_UNPREPARED:
1768       // In these modes memtable do not ref the prep sections
1769       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1770       break;
1771     default:
1772       assert(false);
1773   }
1774 
1775   // flush default cf to crate new log
1776   s = db->Put(wopts, "foo", "bar2");
1777   ASSERT_OK(s);
1778   s = db_impl->TEST_FlushMemTable(true);
1779   ASSERT_OK(s);
1780 
1781   // make sure we are on a new log
1782   ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
1783 
1784   // commit txn2
1785   s = txn2->Commit();
1786   ASSERT_OK(s);
1787 
1788   // heap should not show any logs
1789   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1790 
1791   switch (txn_db_options.write_policy) {
1792     case WRITE_COMMITTED:
1793       // should show the first txn log
1794       ASSERT_EQ(txn1->GetLogNumber(),
1795                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1796       break;
1797     case WRITE_PREPARED:
1798     case WRITE_UNPREPARED:
1799       // In these modes memtable do not ref the prep sections
1800       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1801       break;
1802     default:
1803       assert(false);
1804   }
1805 
1806   // flush only cfa memtable
1807   s = db_impl->TEST_FlushMemTable(true, false, cfa);
1808   ASSERT_OK(s);
1809 
1810   switch (txn_db_options.write_policy) {
1811     case WRITE_COMMITTED:
1812       // should show the first txn log
1813       ASSERT_EQ(txn2->GetLogNumber(),
1814                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1815       break;
1816     case WRITE_PREPARED:
1817     case WRITE_UNPREPARED:
1818       // In these modes memtable do not ref the prep sections
1819       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1820       break;
1821     default:
1822       assert(false);
1823   }
1824 
1825   // flush only cfb memtable
1826   s = db_impl->TEST_FlushMemTable(true, false, cfb);
1827   ASSERT_OK(s);
1828 
1829   // should show not dependency on logs
1830   ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1831   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1832 
1833   delete txn1;
1834   delete txn2;
1835   delete cfa;
1836   delete cfb;
1837 }
1838 
TEST_P(TransactionTest,TwoPhaseLogRollingTest2)1839 TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
1840   DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1841 
1842   Status s;
1843   ColumnFamilyHandle *cfa, *cfb;
1844 
1845   ColumnFamilyOptions cf_options;
1846   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1847   ASSERT_OK(s);
1848   s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1849   ASSERT_OK(s);
1850 
1851   WriteOptions wopts;
1852   wopts.disableWAL = false;
1853   wopts.sync = true;
1854 
1855   auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
1856   auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
1857 
1858   TransactionOptions topts1;
1859   Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1860   s = txn1->SetName("xid1");
1861   ASSERT_OK(s);
1862   s = txn1->Put(cfa, "boys", "girls1");
1863   ASSERT_OK(s);
1864 
1865   Transaction* txn2 = db->BeginTransaction(wopts, topts1);
1866   s = txn2->SetName("xid2");
1867   ASSERT_OK(s);
1868   s = txn2->Put(cfb, "up", "down1");
1869   ASSERT_OK(s);
1870 
1871   // prepre transaction in LOG A
1872   s = txn1->Prepare();
1873   ASSERT_OK(s);
1874 
1875   // prepre transaction in LOG A
1876   s = txn2->Prepare();
1877   ASSERT_OK(s);
1878 
1879   // regular put so that mem table can actually be flushed for log rolling
1880   s = db->Put(wopts, "cats", "dogs1");
1881   ASSERT_OK(s);
1882 
1883   auto prepare_log_no = txn1->GetLastLogNumber();
1884 
1885   // roll to LOG B
1886   s = db_impl->TEST_FlushMemTable(true);
1887   ASSERT_OK(s);
1888 
1889   // now we pause background work so that
1890   // imm()s are not flushed before we can check their status
1891   s = db_impl->PauseBackgroundWork();
1892   ASSERT_OK(s);
1893 
1894   ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
1895   switch (txn_db_options.write_policy) {
1896     case WRITE_COMMITTED:
1897       // This cf is empty and should ref the latest log
1898       ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1899       ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
1900       break;
1901     case WRITE_PREPARED:
1902     case WRITE_UNPREPARED:
1903       // This cf is not flushed yet and should ref the log that has its data
1904       ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1905       break;
1906     default:
1907       assert(false);
1908   }
1909   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1910             txn1->GetLogNumber());
1911   ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1912 
1913   // commit in LOG B
1914   s = txn1->Commit();
1915   ASSERT_OK(s);
1916 
1917   switch (txn_db_options.write_policy) {
1918     case WRITE_COMMITTED:
1919       ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
1920                 prepare_log_no);
1921       break;
1922     case WRITE_PREPARED:
1923     case WRITE_UNPREPARED:
1924       // In these modes memtable do not ref the prep sections
1925       ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1926       break;
1927     default:
1928       assert(false);
1929   }
1930 
1931   ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
1932 
1933   // request a flush for all column families such that the earliest
1934   // alive log file can be killed
1935   db_impl->TEST_SwitchWAL();
1936   // log cannot be flushed because txn2 has not been commited
1937   ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
1938   ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
1939 
1940   // assert that cfa has a flush requested
1941   ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
1942 
1943   switch (txn_db_options.write_policy) {
1944     case WRITE_COMMITTED:
1945       // cfb should not be flushed becuse it has no data from LOG A
1946       ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
1947       break;
1948     case WRITE_PREPARED:
1949     case WRITE_UNPREPARED:
1950       // cfb should be flushed becuse it has prepared data from LOG A
1951       ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1952       break;
1953     default:
1954       assert(false);
1955   }
1956 
1957   // cfb now has data from LOG A
1958   s = txn2->Commit();
1959   ASSERT_OK(s);
1960 
1961   db_impl->TEST_SwitchWAL();
1962   ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
1963 
1964   // we should see that cfb now has a flush requested
1965   ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1966 
1967   // all data in LOG A resides in a memtable that has been
1968   // requested for a flush
1969   ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
1970 
1971   delete txn1;
1972   delete txn2;
1973   delete cfa;
1974   delete cfb;
1975 }
1976 /*
1977  * 1) use prepare to keep first log around to determine starting sequence
1978  * during recovery.
1979  * 2) insert many values, skipping wal, to increase seqid.
1980  * 3) insert final value into wal
1981  * 4) recover and see that final value was properly recovered - not
1982  * hidden behind improperly summed sequence ids
1983  */
TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  WriteOptions wal_on, wal_off;
  wal_on.sync = true;
  wal_on.disableWAL = false;
  wal_off.disableWAL = true;
  ReadOptions read_options;
  TransactionOptions txn_options;

  std::string value;
  Status s;

  // NOTE(review): txn1 is never deleted in this test; presumably named
  // transactions are reclaimed when the DB is destroyed during the reopen
  // below -- confirm before adding a delete.
  Transaction* txn1 = db->BeginTransaction(wal_on, txn_options);

  s = txn1->SetName("1");
  ASSERT_OK(s);

  s = db->Put(wal_on, "first", "first");
  ASSERT_OK(s);

  // Step 1: a prepared (uncommitted) transaction written through the WAL.
  s = txn1->Put(Slice("dummy"), Slice("dummy"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  ASSERT_OK(s);

  // Step 2: several writes that bypass the WAL.
  s = db->Put(wal_off, "cats", "dogs1");
  ASSERT_OK(s);
  s = db->Put(wal_off, "cats", "dogs2");
  ASSERT_OK(s);
  s = db->Put(wal_off, "cats", "dogs3");
  ASSERT_OK(s);

  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // Step 3: the final value goes through the WAL.
  s = db->Put(wal_on, "cats", "dogs4");
  ASSERT_OK(s);

  ASSERT_OK(db->FlushWAL(false));

  // kill and reopen
  env->SetFilesystemActive(false);
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  // Checked with a gtest assertion so the guard also holds in NDEBUG builds.
  ASSERT_TRUE(db != nullptr);

  // Step 4: both WAL-backed values must be visible after recovery.
  s = db->Get(read_options, "first", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "first");

  s = db->Get(read_options, "cats", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "dogs4");
}
2038 
// A transaction whose snapshot is taken after the very first write to the DB
// must be able to overwrite that key without a conflict.
TEST_P(TransactionTest, FirstWriteTest) {
  WriteOptions write_options;

  // Test conflict checking against the very first write to a db.
  // The transaction's snapshot will have seq 1 and the following write
  // will have sequence 1.
  // The status is checked immediately so that a failed Put is reported before
  // any transaction is created.
  ASSERT_OK(db->Put(write_options, "A", "a"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  txn->SetSnapshot();

  Status s = txn->Put("A", "b");
  ASSERT_OK(s);

  delete txn;
}
2057 
// Counterpart of FirstWriteTest: here the snapshot is taken before the very
// first write to the DB, so the transaction's write to the same key is
// expected to be rejected as Busy.
TEST_P(TransactionTest, FirstWriteTest2) {
  WriteOptions wopts;

  Transaction* txn = db->BeginTransaction(wopts);
  txn->SetSnapshot();

  // Test conflict checking against the very first write to a db.
  // The transaction's snapshot is a seq 0 while the following write
  // will have sequence 1.
  ASSERT_OK(db->Put(wopts, "A", "a"));

  Status conflict = txn->Put("A", "b");
  ASSERT_TRUE(conflict.IsBusy());

  delete txn;
}
2075 
// Checks that a transaction reports the WriteOptions it was created with and
// that SetWriteOptions() replaces them.
TEST_P(TransactionTest, WriteOptionsTest) {
  WriteOptions initial_opts;
  initial_opts.sync = true;
  initial_opts.disableWAL = true;

  Transaction* txn = db->BeginTransaction(initial_opts);
  ASSERT_TRUE(txn);

  // The options passed to BeginTransaction are visible through the txn.
  ASSERT_TRUE(txn->GetWriteOptions()->sync);

  // Flip only `sync`; `disableWAL` stays as originally configured.
  WriteOptions updated_opts = initial_opts;
  updated_opts.sync = false;
  txn->SetWriteOptions(updated_opts);

  ASSERT_FALSE(txn->GetWriteOptions()->sync);
  ASSERT_TRUE(txn->GetWriteOptions()->disableWAL);

  delete txn;
}
2093 
// A non-transactional write to a key already written by an open transaction
// must time out, leave the committed value untouched, and not prevent the
// transaction from committing afterwards.
TEST_P(TransactionTest, WriteConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  // Seed data; a failure here would invalidate everything that follows.
  ASSERT_OK(db->Put(write_options, "foo", "A"));
  ASSERT_OK(db->Put(write_options, "foo2", "B"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("foo", "A2");
  ASSERT_OK(s);

  s = txn->Put("foo2", "B2");
  ASSERT_OK(s);

  // This Put outside of a transaction will conflict with the previous write
  s = db->Put(write_options, "foo", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The uncommitted transactional write must not be visible yet.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "A");

  s = txn->Commit();
  ASSERT_OK(s);

  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "A2");
  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "B2");

  delete txn;
}
2129 
// With a snapshot taken at transaction start, a transactional write to a key
// modified after the snapshot must fail with Busy, while writes to other keys
// succeed and are the only ones applied on commit.
TEST_P(TransactionTest, WriteConflictTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  // This Put outside of a transaction will conflict with a later write
  s = db->Put(write_options, "foo", "barz");
  ASSERT_OK(s);

  s = txn->Put("foo2", "X");
  ASSERT_OK(s);

  s = txn->Put("foo",
               "bar2");  // Conflicts with write done after snapshot taken
  ASSERT_TRUE(s.IsBusy());

  s = txn->Put("foo3", "Y");
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barz");

  // Only the two successful Puts are counted.
  ASSERT_EQ(2, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);  // Txn should commit, but only write foo2 and foo3

  // Verify that transaction wrote foo2 and foo3 but not foo
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "barz");

  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "X");

  ASSERT_OK(db->Get(read_options, "foo3", &value));
  ASSERT_EQ(value, "Y");

  delete txn;
}
2177 
// A key read through GetForUpdate() inside a transaction must reject a
// concurrent non-transactional write with TimedOut until the transaction
// commits.
TEST_P(TransactionTest, ReadConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));
  ASSERT_OK(db->Put(write_options, "foo2", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  // This Put outside of a transaction will conflict with the previous read
  s = db->Put(write_options, "foo", "barz");
  ASSERT_TRUE(s.IsTimedOut());

  // The rejected write must not have changed the stored value.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
2213 
TEST_P(TransactionTest, TxnOnlyTest) {
  // Test to make sure transactions work when there are no other writes in an
  // empty db.

  WriteOptions write_options;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("x", "y");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
2234 
// A transaction that read and wrote a key must still be able to commit after
// the memtable has been flushed in between.
TEST_P(TransactionTest, FlushTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
  ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  // The transaction must observe its own pending write.
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  // Put a random key so we have a memtable to flush
  s = db->Put(write_options, "dummy", "dummy");
  ASSERT_OK(s);

  // force a memtable flush
  FlushOptions flush_ops;
  ASSERT_OK(db->Flush(flush_ops));

  s = txn->Commit();
  // txn should commit since the flushed table is still in MemtableList History
  ASSERT_OK(s);

  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}
2275 
TEST_P(TransactionTest,FlushTest2)2276 TEST_P(TransactionTest, FlushTest2) {
2277   const size_t num_tests = 3;
2278 
2279   for (size_t n = 0; n < num_tests; n++) {
2280     // Test different table factories
2281     switch (n) {
2282       case 0:
2283         break;
2284       case 1:
2285         options.table_factory.reset(new mock::MockTableFactory());
2286         break;
2287       case 2: {
2288         PlainTableOptions pt_opts;
2289         pt_opts.hash_table_ratio = 0;
2290         options.table_factory.reset(NewPlainTableFactory(pt_opts));
2291         break;
2292       }
2293     }
2294 
2295     Status s = ReOpen();
2296     ASSERT_OK(s);
2297     assert(db != nullptr);
2298 
2299     WriteOptions write_options;
2300     ReadOptions read_options, snapshot_read_options;
2301     TransactionOptions txn_options;
2302     string value;
2303 
2304     DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
2305 
2306     db->Put(write_options, Slice("foo"), Slice("bar"));
2307     db->Put(write_options, Slice("foo2"), Slice("bar2"));
2308     db->Put(write_options, Slice("foo3"), Slice("bar3"));
2309 
2310     txn_options.set_snapshot = true;
2311     Transaction* txn = db->BeginTransaction(write_options, txn_options);
2312     ASSERT_TRUE(txn);
2313 
2314     snapshot_read_options.snapshot = txn->GetSnapshot();
2315 
2316     txn->GetForUpdate(snapshot_read_options, "foo", &value);
2317     ASSERT_EQ(value, "bar");
2318 
2319     s = txn->Put(Slice("foo"), Slice("bar2"));
2320     ASSERT_OK(s);
2321 
2322     txn->GetForUpdate(snapshot_read_options, "foo", &value);
2323     ASSERT_EQ(value, "bar2");
2324     // verify foo is locked by txn
2325     s = db->Delete(write_options, "foo");
2326     ASSERT_TRUE(s.IsTimedOut());
2327 
2328     s = db->Put(write_options, "Z", "z");
2329     ASSERT_OK(s);
2330     s = db->Put(write_options, "dummy", "dummy");
2331     ASSERT_OK(s);
2332 
2333     s = db->Put(write_options, "S", "s");
2334     ASSERT_OK(s);
2335     s = db->SingleDelete(write_options, "S");
2336     ASSERT_OK(s);
2337 
2338     s = txn->Delete("S");
2339     // Should fail after encountering a write to S in memtable
2340     ASSERT_TRUE(s.IsBusy());
2341 
2342     // force a memtable flush
2343     s = db_impl->TEST_FlushMemTable(true);
2344     ASSERT_OK(s);
2345 
2346     // Put a random key so we have a MemTable to flush
2347     s = db->Put(write_options, "dummy", "dummy2");
2348     ASSERT_OK(s);
2349 
2350     // force a memtable flush
2351     ASSERT_OK(db_impl->TEST_FlushMemTable(true));
2352 
2353     s = db->Put(write_options, "dummy", "dummy3");
2354     ASSERT_OK(s);
2355 
2356     // force a memtable flush
2357     // Since our test db has max_write_buffer_number=2, this flush will cause
2358     // the first memtable to get purged from the MemtableList history.
2359     ASSERT_OK(db_impl->TEST_FlushMemTable(true));
2360 
2361     s = txn->Put("X", "Y");
2362     // Should succeed after verifying there is no write to X in SST file
2363     ASSERT_OK(s);
2364 
2365     s = txn->Put("Z", "zz");
2366     // Should fail after encountering a write to Z in SST file
2367     ASSERT_TRUE(s.IsBusy());
2368 
2369     s = txn->GetForUpdate(read_options, "foo2", &value);
2370     // should succeed since key was written before txn started
2371     ASSERT_OK(s);
2372     // verify foo2 is locked by txn
2373     s = db->Delete(write_options, "foo2");
2374     ASSERT_TRUE(s.IsTimedOut());
2375 
2376     s = txn->Delete("S");
2377     // Should fail after encountering a write to S in SST file
2378     ASSERT_TRUE(s.IsBusy());
2379 
2380     // Write a bunch of keys to db to force a compaction
2381     Random rnd(47);
2382     for (int i = 0; i < 1000; i++) {
2383       s = db->Put(write_options, std::to_string(i),
2384                   test::CompressibleString(&rnd, 0.8, 100, &value));
2385       ASSERT_OK(s);
2386     }
2387 
2388     s = txn->Put("X", "yy");
2389     // Should succeed after verifying there is no write to X in SST file
2390     ASSERT_OK(s);
2391 
2392     s = txn->Put("Z", "zzz");
2393     // Should fail after encountering a write to Z in SST file
2394     ASSERT_TRUE(s.IsBusy());
2395 
2396     s = txn->Delete("S");
2397     // Should fail after encountering a write to S in SST file
2398     ASSERT_TRUE(s.IsBusy());
2399 
2400     s = txn->GetForUpdate(read_options, "foo3", &value);
2401     // should succeed since key was written before txn started
2402     ASSERT_OK(s);
2403     // verify foo3 is locked by txn
2404     s = db->Delete(write_options, "foo3");
2405     ASSERT_TRUE(s.IsTimedOut());
2406 
2407     db_impl->TEST_WaitForCompact();
2408 
2409     s = txn->Commit();
2410     ASSERT_OK(s);
2411 
2412     // Transaction should only write the keys that succeeded.
2413     s = db->Get(read_options, "foo", &value);
2414     ASSERT_EQ(value, "bar2");
2415 
2416     s = db->Get(read_options, "X", &value);
2417     ASSERT_OK(s);
2418     ASSERT_EQ("yy", value);
2419 
2420     s = db->Get(read_options, "Z", &value);
2421     ASSERT_OK(s);
2422     ASSERT_EQ("z", value);
2423 
2424   delete txn;
2425   }
2426 }
2427 
// Without a snapshot, a transaction that reads and writes a key only after it
// was modified outside the transaction must still be able to commit.
TEST_P(TransactionTest, NoSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "AAA", "bar"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Modify key after transaction start
  ASSERT_OK(db->Put(write_options, "AAA", "bar1"));

  // Read and write without a snap
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("AAA", "bar2");
  ASSERT_OK(s);

  // Should commit since read/write was done after data changed
  s = txn->Commit();
  ASSERT_OK(s);

  // Reading through the committed transaction must return the new value.
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}
2457 
TEST_P(TransactionTest,MultipleSnapshotTest)2458 TEST_P(TransactionTest, MultipleSnapshotTest) {
2459   WriteOptions write_options;
2460   ReadOptions read_options, snapshot_read_options;
2461   std::string value;
2462   Status s;
2463 
2464   ASSERT_OK(db->Put(write_options, "AAA", "bar"));
2465   ASSERT_OK(db->Put(write_options, "BBB", "bar"));
2466   ASSERT_OK(db->Put(write_options, "CCC", "bar"));
2467 
2468   Transaction* txn = db->BeginTransaction(write_options);
2469   ASSERT_TRUE(txn);
2470 
2471   db->Put(write_options, "AAA", "bar1");
2472 
2473   // Read and write without a snapshot
2474   ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
2475   ASSERT_EQ(value, "bar1");
2476   s = txn->Put("AAA", "bar2");
2477   ASSERT_OK(s);
2478 
2479   // Modify BBB before snapshot is taken
2480   ASSERT_OK(db->Put(write_options, "BBB", "bar1"));
2481 
2482   txn->SetSnapshot();
2483   snapshot_read_options.snapshot = txn->GetSnapshot();
2484 
2485   // Read and write with snapshot
2486   ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
2487   ASSERT_EQ(value, "bar1");
2488   s = txn->Put("BBB", "bar2");
2489   ASSERT_OK(s);
2490 
2491   ASSERT_OK(db->Put(write_options, "CCC", "bar1"));
2492 
2493   // Set a new snapshot
2494   txn->SetSnapshot();
2495   snapshot_read_options.snapshot = txn->GetSnapshot();
2496 
2497   // Read and write with snapshot
2498   txn->GetForUpdate(snapshot_read_options, "CCC", &value);
2499   ASSERT_EQ(value, "bar1");
2500   s = txn->Put("CCC", "bar2");
2501   ASSERT_OK(s);
2502 
2503   s = txn->GetForUpdate(read_options, "AAA", &value);
2504   ASSERT_OK(s);
2505   ASSERT_EQ(value, "bar2");
2506   s = txn->GetForUpdate(read_options, "BBB", &value);
2507   ASSERT_OK(s);
2508   ASSERT_EQ(value, "bar2");
2509   s = txn->GetForUpdate(read_options, "CCC", &value);
2510   ASSERT_OK(s);
2511   ASSERT_EQ(value, "bar2");
2512 
2513   s = db->Get(read_options, "AAA", &value);
2514   ASSERT_OK(s);
2515   ASSERT_EQ(value, "bar1");
2516   s = db->Get(read_options, "BBB", &value);
2517   ASSERT_OK(s);
2518   ASSERT_EQ(value, "bar1");
2519   s = db->Get(read_options, "CCC", &value);
2520   ASSERT_OK(s);
2521   ASSERT_EQ(value, "bar1");
2522 
2523   s = txn->Commit();
2524   ASSERT_OK(s);
2525 
2526   s = db->Get(read_options, "AAA", &value);
2527   ASSERT_OK(s);
2528   ASSERT_EQ(value, "bar2");
2529   s = db->Get(read_options, "BBB", &value);
2530   ASSERT_OK(s);
2531   ASSERT_EQ(value, "bar2");
2532   s = db->Get(read_options, "CCC", &value);
2533   ASSERT_OK(s);
2534   ASSERT_EQ(value, "bar2");
2535 
2536   // verify that we track multiple writes to the same key at different snapshots
2537   delete txn;
2538   txn = db->BeginTransaction(write_options);
2539 
2540   // Potentially conflicting writes
2541   db->Put(write_options, "ZZZ", "zzz");
2542   db->Put(write_options, "XXX", "xxx");
2543 
2544   txn->SetSnapshot();
2545 
2546   TransactionOptions txn_options;
2547   txn_options.set_snapshot = true;
2548   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
2549   txn2->SetSnapshot();
2550 
2551   // This should not conflict in txn since the snapshot is later than the
2552   // previous write (spoiler alert:  it will later conflict with txn2).
2553   s = txn->Put("ZZZ", "zzzz");
2554   ASSERT_OK(s);
2555 
2556   s = txn->Commit();
2557   ASSERT_OK(s);
2558 
2559   delete txn;
2560 
2561   // This will conflict since the snapshot is earlier than another write to ZZZ
2562   s = txn2->Put("ZZZ", "xxxxx");
2563   ASSERT_TRUE(s.IsBusy());
2564 
2565   s = txn2->Commit();
2566   ASSERT_OK(s);
2567 
2568   s = db->Get(read_options, "ZZZ", &value);
2569   ASSERT_OK(s);
2570   ASSERT_EQ(value, "zzzz");
2571 
2572   delete txn2;
2573 }
2574 
TEST_P(TransactionTest,ColumnFamiliesTest)2575 TEST_P(TransactionTest, ColumnFamiliesTest) {
2576   WriteOptions write_options;
2577   ReadOptions read_options, snapshot_read_options;
2578   TransactionOptions txn_options;
2579   string value;
2580   Status s;
2581 
2582   ColumnFamilyHandle *cfa, *cfb;
2583   ColumnFamilyOptions cf_options;
2584 
2585   // Create 2 new column families
2586   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
2587   ASSERT_OK(s);
2588   s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
2589   ASSERT_OK(s);
2590 
2591   delete cfa;
2592   delete cfb;
2593   delete db;
2594   db = nullptr;
2595 
2596   // open DB with three column families
2597   std::vector<ColumnFamilyDescriptor> column_families;
2598   // have to open default column family
2599   column_families.push_back(
2600       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2601   // open the new column families
2602   column_families.push_back(
2603       ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
2604   column_families.push_back(
2605       ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
2606 
2607   std::vector<ColumnFamilyHandle*> handles;
2608 
2609   ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2610   assert(db != nullptr);
2611 
2612   Transaction* txn = db->BeginTransaction(write_options);
2613   ASSERT_TRUE(txn);
2614 
2615   txn->SetSnapshot();
2616   snapshot_read_options.snapshot = txn->GetSnapshot();
2617 
2618   txn_options.set_snapshot = true;
2619   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
2620   ASSERT_TRUE(txn2);
2621 
2622   // Write some data to the db
2623   WriteBatch batch;
2624   batch.Put("foo", "foo");
2625   batch.Put(handles[1], "AAA", "bar");
2626   batch.Put(handles[1], "AAAZZZ", "bar");
2627   s = db->Write(write_options, &batch);
2628   ASSERT_OK(s);
2629   db->Delete(write_options, handles[1], "AAAZZZ");
2630 
2631   // These keys do not conflict with existing writes since they're in
2632   // different column families
2633   s = txn->Delete("AAA");
2634   ASSERT_OK(s);
2635   s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
2636   ASSERT_TRUE(s.IsNotFound());
2637   Slice key_slice("AAAZZZ");
2638   Slice value_slices[2] = {Slice("bar"), Slice("bar")};
2639   s = txn->Put(handles[2], SliceParts(&key_slice, 1),
2640                SliceParts(value_slices, 2));
2641   ASSERT_OK(s);
2642   ASSERT_EQ(3, txn->GetNumKeys());
2643 
2644   s = txn->Commit();
2645   ASSERT_OK(s);
2646   s = db->Get(read_options, "AAA", &value);
2647   ASSERT_TRUE(s.IsNotFound());
2648   s = db->Get(read_options, handles[2], "AAAZZZ", &value);
2649   ASSERT_EQ(value, "barbar");
2650 
2651   Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
2652   Slice value_slice("barbarbar");
2653 
2654   s = txn2->Delete(handles[2], "XXX");
2655   ASSERT_OK(s);
2656   s = txn2->Delete(handles[1], "XXX");
2657   ASSERT_OK(s);
2658 
2659   // This write will cause a conflict with the earlier batch write
2660   s = txn2->Put(handles[1], SliceParts(key_slices, 3),
2661                 SliceParts(&value_slice, 1));
2662   ASSERT_TRUE(s.IsBusy());
2663 
2664   s = txn2->Commit();
2665   ASSERT_OK(s);
2666   // In the above the latest change to AAAZZZ in handles[1] is delete.
2667   s = db->Get(read_options, handles[1], "AAAZZZ", &value);
2668   ASSERT_TRUE(s.IsNotFound());
2669 
2670   delete txn;
2671   delete txn2;
2672 
2673   txn = db->BeginTransaction(write_options, txn_options);
2674   snapshot_read_options.snapshot = txn->GetSnapshot();
2675 
2676   txn2 = db->BeginTransaction(write_options, txn_options);
2677   ASSERT_TRUE(txn);
2678 
2679   std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
2680                                                    handles[0], handles[2]};
2681   std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
2682   std::vector<std::string> values(4);
2683   std::vector<Status> results = txn->MultiGetForUpdate(
2684       snapshot_read_options, multiget_cfh, multiget_keys, &values);
2685   ASSERT_OK(results[0]);
2686   ASSERT_OK(results[1]);
2687   ASSERT_OK(results[2]);
2688   ASSERT_TRUE(results[3].IsNotFound());
2689   ASSERT_EQ(values[0], "bar");
2690   ASSERT_EQ(values[1], "barbar");
2691   ASSERT_EQ(values[2], "foo");
2692 
2693   s = txn->SingleDelete(handles[2], "ZZZ");
2694   ASSERT_OK(s);
2695   s = txn->Put(handles[2], "ZZZ", "YYY");
2696   ASSERT_OK(s);
2697   s = txn->Put(handles[2], "ZZZ", "YYYY");
2698   ASSERT_OK(s);
2699   s = txn->Delete(handles[2], "ZZZ");
2700   ASSERT_OK(s);
2701   s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
2702   ASSERT_OK(s);
2703 
2704   ASSERT_EQ(5, txn->GetNumKeys());
2705 
2706   // Txn should commit
2707   s = txn->Commit();
2708   ASSERT_OK(s);
2709   s = db->Get(read_options, handles[2], "ZZZ", &value);
2710   ASSERT_TRUE(s.IsNotFound());
2711 
2712   // Put a key which will conflict with the next txn using the previous snapshot
2713   db->Put(write_options, handles[2], "foo", "000");
2714 
2715   results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
2716                                     multiget_keys, &values);
2717   // All results should fail since there was a conflict
2718   ASSERT_TRUE(results[0].IsBusy());
2719   ASSERT_TRUE(results[1].IsBusy());
2720   ASSERT_TRUE(results[2].IsBusy());
2721   ASSERT_TRUE(results[3].IsBusy());
2722 
2723   s = db->Get(read_options, handles[2], "foo", &value);
2724   ASSERT_EQ(value, "000");
2725 
2726   s = txn2->Commit();
2727   ASSERT_OK(s);
2728 
2729   s = db->DropColumnFamily(handles[1]);
2730   ASSERT_OK(s);
2731   s = db->DropColumnFamily(handles[2]);
2732   ASSERT_OK(s);
2733 
2734   delete txn;
2735   delete txn2;
2736 
2737   for (auto handle : handles) {
2738     delete handle;
2739   }
2740 }
2741 
TEST_P(TransactionTest,MultiGetBatchedTest)2742 TEST_P(TransactionTest, MultiGetBatchedTest) {
2743   WriteOptions write_options;
2744   ReadOptions read_options, snapshot_read_options;
2745   TransactionOptions txn_options;
2746   string value;
2747   Status s;
2748 
2749   ColumnFamilyHandle* cf;
2750   ColumnFamilyOptions cf_options;
2751 
2752   // Create a new column families
2753   s = db->CreateColumnFamily(cf_options, "CF", &cf);
2754   ASSERT_OK(s);
2755 
2756   delete cf;
2757   delete db;
2758   db = nullptr;
2759 
2760   // open DB with three column families
2761   std::vector<ColumnFamilyDescriptor> column_families;
2762   // have to open default column family
2763   column_families.push_back(
2764       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2765   // open the new column families
2766   cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
2767   column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
2768 
2769   std::vector<ColumnFamilyHandle*> handles;
2770 
2771   options.merge_operator = MergeOperators::CreateStringAppendOperator();
2772   ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2773   assert(db != nullptr);
2774 
2775   // Write some data to the db
2776   WriteBatch batch;
2777   batch.Put(handles[1], "aaa", "val1");
2778   batch.Put(handles[1], "bbb", "val2");
2779   batch.Put(handles[1], "ccc", "val3");
2780   batch.Put(handles[1], "ddd", "foo");
2781   batch.Put(handles[1], "eee", "val5");
2782   batch.Put(handles[1], "fff", "val6");
2783   batch.Merge(handles[1], "ggg", "foo");
2784   s = db->Write(write_options, &batch);
2785   ASSERT_OK(s);
2786 
2787   Transaction* txn = db->BeginTransaction(write_options);
2788   ASSERT_TRUE(txn);
2789 
2790   txn->SetSnapshot();
2791   snapshot_read_options.snapshot = txn->GetSnapshot();
2792 
2793   txn_options.set_snapshot = true;
2794   // Write some data to the db
2795   s = txn->Delete(handles[1], "bbb");
2796   ASSERT_OK(s);
2797   s = txn->Put(handles[1], "ccc", "val3_new");
2798   ASSERT_OK(s);
2799   s = txn->Merge(handles[1], "ddd", "bar");
2800   ASSERT_OK(s);
2801 
2802   std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
2803   std::vector<PinnableSlice> values(keys.size());
2804   std::vector<Status> statuses(keys.size());
2805 
2806   txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
2807                 values.data(), statuses.data());
2808   ASSERT_TRUE(statuses[0].ok());
2809   ASSERT_EQ(values[0], "val1");
2810   ASSERT_TRUE(statuses[1].IsNotFound());
2811   ASSERT_TRUE(statuses[2].ok());
2812   ASSERT_EQ(values[2], "val3_new");
2813   ASSERT_TRUE(statuses[3].IsMergeInProgress());
2814   ASSERT_TRUE(statuses[4].ok());
2815   ASSERT_EQ(values[4], "val5");
2816   ASSERT_TRUE(statuses[5].ok());
2817   ASSERT_EQ(values[5], "val6");
2818   ASSERT_TRUE(statuses[6].ok());
2819   ASSERT_EQ(values[6], "foo");
2820   delete txn;
2821   for (auto handle : handles) {
2822     delete handle;
2823   }
2824 }
2825 
// This test calls WriteBatchWithIndex::MultiGetFromBatchAndDB with a large
// number of keys, i.e. greater than MultiGetContext::MAX_BATCH_SIZE, which
// is 32. This forces autovector allocations in the MultiGet code paths
// to use std::vector in addition to stack allocations. The MultiGet keys
// include Merges, which are handled specially in MultiGetFromBatchAndDB by
// allocating an autovector of MergeContexts.
TEST_P(TransactionTest,MultiGetLargeBatchedTest)2832 TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
2833   WriteOptions write_options;
2834   ReadOptions read_options, snapshot_read_options;
2835   string value;
2836   Status s;
2837 
2838   ColumnFamilyHandle* cf;
2839   ColumnFamilyOptions cf_options;
2840 
2841   std::vector<std::string> key_str;
2842   for (int i = 0; i < 100; ++i) {
2843     key_str.emplace_back(std::to_string(i));
2844   }
2845   // Create a new column families
2846   s = db->CreateColumnFamily(cf_options, "CF", &cf);
2847   ASSERT_OK(s);
2848 
2849   delete cf;
2850   delete db;
2851   db = nullptr;
2852 
2853   // open DB with three column families
2854   std::vector<ColumnFamilyDescriptor> column_families;
2855   // have to open default column family
2856   column_families.push_back(
2857       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2858   // open the new column families
2859   cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
2860   column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
2861 
2862   std::vector<ColumnFamilyHandle*> handles;
2863 
2864   options.merge_operator = MergeOperators::CreateStringAppendOperator();
2865   ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2866   assert(db != nullptr);
2867 
2868   // Write some data to the db
2869   WriteBatch batch;
2870   for (int i = 0; i < 3 * MultiGetContext::MAX_BATCH_SIZE; ++i) {
2871     std::string val = "val" + std::to_string(i);
2872     batch.Put(handles[1], key_str[i], val);
2873   }
2874   s = db->Write(write_options, &batch);
2875   ASSERT_OK(s);
2876 
2877   WriteBatchWithIndex wb;
2878   // Write some data to the db
2879   s = wb.Delete(handles[1], std::to_string(1));
2880   ASSERT_OK(s);
2881   s = wb.Put(handles[1], std::to_string(2), "new_val" + std::to_string(2));
2882   ASSERT_OK(s);
2883   // Write a lot of merges so when we call MultiGetFromBatchAndDB later on,
2884   // it is forced to use std::vector in ROCKSDB_NAMESPACE::autovector to
2885   // allocate MergeContexts. The number of merges needs to be >
2886   // MultiGetContext::MAX_BATCH_SIZE
2887   for (int i = 8; i < MultiGetContext::MAX_BATCH_SIZE + 24; ++i) {
2888     s = wb.Merge(handles[1], std::to_string(i), "merge");
2889     ASSERT_OK(s);
2890   }
2891 
2892   // MultiGet a lot of keys in order to force std::vector reallocations
2893   std::vector<Slice> keys;
2894   for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE + 32; ++i) {
2895     keys.emplace_back(key_str[i]);
2896   }
2897   std::vector<PinnableSlice> values(keys.size());
2898   std::vector<Status> statuses(keys.size());
2899 
2900   wb.MultiGetFromBatchAndDB(db, snapshot_read_options, handles[1], keys.size(), keys.data(),
2901                 values.data(), statuses.data(), false);
2902   for (size_t i =0; i < keys.size(); ++i) {
2903     if (i == 1) {
2904       ASSERT_TRUE(statuses[1].IsNotFound());
2905     } else if (i == 2) {
2906       ASSERT_TRUE(statuses[2].ok());
2907       ASSERT_EQ(values[2], "new_val" + std::to_string(2));
2908     } else if (i >= 8 && i < 56) {
2909       ASSERT_TRUE(statuses[i].ok());
2910       ASSERT_EQ(values[i], "val" + std::to_string(i) + ",merge");
2911     } else {
2912       ASSERT_TRUE(statuses[i].ok());
2913       if (values[i] != "val" + std::to_string(i)) {
2914         ASSERT_EQ(values[i], "val" + std::to_string(i));
2915       }
2916     }
2917   }
2918 
2919   for (auto handle : handles) {
2920     delete handle;
2921   }
2922 }
2923 
TEST_P(TransactionTest,ColumnFamiliesTest2)2924 TEST_P(TransactionTest, ColumnFamiliesTest2) {
2925   WriteOptions write_options;
2926   ReadOptions read_options, snapshot_read_options;
2927   string value;
2928   Status s;
2929 
2930   ColumnFamilyHandle *one, *two;
2931   ColumnFamilyOptions cf_options;
2932 
2933   // Create 2 new column families
2934   s = db->CreateColumnFamily(cf_options, "ONE", &one);
2935   ASSERT_OK(s);
2936   s = db->CreateColumnFamily(cf_options, "TWO", &two);
2937   ASSERT_OK(s);
2938 
2939   Transaction* txn1 = db->BeginTransaction(write_options);
2940   ASSERT_TRUE(txn1);
2941   Transaction* txn2 = db->BeginTransaction(write_options);
2942   ASSERT_TRUE(txn2);
2943 
2944   s = txn1->Put(one, "X", "1");
2945   ASSERT_OK(s);
2946   s = txn1->Put(two, "X", "2");
2947   ASSERT_OK(s);
2948   s = txn1->Put("X", "0");
2949   ASSERT_OK(s);
2950 
2951   s = txn2->Put(one, "X", "11");
2952   ASSERT_TRUE(s.IsTimedOut());
2953 
2954   s = txn1->Commit();
2955   ASSERT_OK(s);
2956 
2957   // Drop first column family
2958   s = db->DropColumnFamily(one);
2959   ASSERT_OK(s);
2960 
2961   // Should fail since column family was dropped.
2962   s = txn2->Commit();
2963   ASSERT_OK(s);
2964 
2965   delete txn1;
2966   txn1 = db->BeginTransaction(write_options);
2967   ASSERT_TRUE(txn1);
2968 
2969   // Should fail since column family was dropped
2970   s = txn1->Put(one, "X", "111");
2971   ASSERT_TRUE(s.IsInvalidArgument());
2972 
2973   s = txn1->Put(two, "X", "222");
2974   ASSERT_OK(s);
2975 
2976   s = txn1->Put("X", "000");
2977   ASSERT_OK(s);
2978 
2979   s = txn1->Commit();
2980   ASSERT_OK(s);
2981 
2982   s = db->Get(read_options, two, "X", &value);
2983   ASSERT_OK(s);
2984   ASSERT_EQ("222", value);
2985 
2986   s = db->Get(read_options, "X", &value);
2987   ASSERT_OK(s);
2988   ASSERT_EQ("000", value);
2989 
2990   s = db->DropColumnFamily(two);
2991   ASSERT_OK(s);
2992 
2993   delete txn1;
2994   delete txn2;
2995 
2996   delete one;
2997   delete two;
2998 }
2999 
// Covers transactions that perform no writes: committing and rolling back an
// empty transaction, and committing one that only did GetForUpdate.
TEST_P(TransactionTest, EmptyTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "aaa", "aaa"));

  // Commit a transaction without any operations
  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_OK(txn->Commit());
  delete txn;

  // Roll back a transaction without any operations
  txn = db->BeginTransaction(write_options);
  ASSERT_OK(txn->Rollback());
  delete txn;

  // Commit a transaction that only read a key for update
  txn = db->BeginTransaction(write_options);
  ASSERT_OK(txn->GetForUpdate(read_options, "aaa", &value));
  ASSERT_EQ(value, "aaa");

  ASSERT_OK(txn->Commit());
  delete txn;

  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  ASSERT_OK(txn->GetForUpdate(read_options, "aaa", &value));
  ASSERT_EQ(value, "aaa");

  // Conflicts with previous GetForUpdate
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The conflicting Put was rejected, so the transaction still commits
  ASSERT_OK(txn->Commit());
  delete txn;
}
3041 
// Checks that keys read through MultiGetForUpdate / written through Put by one
// transaction conflict with later writes and reads-for-update from another.
TEST_P(TransactionTest, PredicateManyPreceders) {
  WriteOptions write_options;
  ReadOptions read_options1, read_options2;
  TransactionOptions txn_options;
  string value;
  Status s;

  txn_options.set_snapshot = true;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  Transaction* txn2 = db->BeginTransaction(write_options);
  txn2->SetSnapshot();
  read_options2.snapshot = txn2->GetSnapshot();

  std::vector<Slice> multiget_keys = {"1", "2", "3"};
  std::vector<std::string> multiget_values;

  // None of the keys has been written yet
  std::vector<Status> results =
      txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  ASSERT_EQ(multiget_keys.size(), results.size());
  for (const Status& result : results) {
    ASSERT_TRUE(result.IsNotFound());
  }

  s = txn2->Put("2", "x");  // Conflicts with txn1's MultiGetForUpdate
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn2->Rollback());

  // Repeating the read in txn1 gives the same answer
  multiget_values.clear();
  results =
      txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  ASSERT_EQ(multiget_keys.size(), results.size());
  for (const Status& result : results) {
    ASSERT_TRUE(result.IsNotFound());
  }

  ASSERT_OK(txn1->Commit());

  delete txn1;
  delete txn2;

  // Second round: both transactions start with a snapshot
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("4", "x"));

  s = txn2->Delete("4");  // conflict
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->Commit());

  // "4" was committed by txn1 after txn2 took its snapshot
  s = txn2->GetForUpdate(read_options2, "4", &value);
  ASSERT_TRUE(s.IsBusy());

  ASSERT_OK(txn2->Rollback());

  delete txn1;
  delete txn2;
}
3103 
TEST_P(TransactionTest,LostUpdate)3104 TEST_P(TransactionTest, LostUpdate) {
3105   WriteOptions write_options;
3106   ReadOptions read_options, read_options1, read_options2;
3107   TransactionOptions txn_options;
3108   std::string value;
3109   Status s;
3110 
3111   // Test 2 transactions writing to the same key in multiple orders and
3112   // with/without snapshots
3113 
3114   Transaction* txn1 = db->BeginTransaction(write_options);
3115   Transaction* txn2 = db->BeginTransaction(write_options);
3116 
3117   s = txn1->Put("1", "1");
3118   ASSERT_OK(s);
3119 
3120   s = txn2->Put("1", "2");  // conflict
3121   ASSERT_TRUE(s.IsTimedOut());
3122 
3123   s = txn2->Commit();
3124   ASSERT_OK(s);
3125 
3126   s = txn1->Commit();
3127   ASSERT_OK(s);
3128 
3129   s = db->Get(read_options, "1", &value);
3130   ASSERT_OK(s);
3131   ASSERT_EQ("1", value);
3132 
3133   delete txn1;
3134   delete txn2;
3135 
3136   txn_options.set_snapshot = true;
3137   txn1 = db->BeginTransaction(write_options, txn_options);
3138   read_options1.snapshot = txn1->GetSnapshot();
3139 
3140   txn2 = db->BeginTransaction(write_options, txn_options);
3141   read_options2.snapshot = txn2->GetSnapshot();
3142 
3143   s = txn1->Put("1", "3");
3144   ASSERT_OK(s);
3145   s = txn2->Put("1", "4");  // conflict
3146   ASSERT_TRUE(s.IsTimedOut());
3147 
3148   s = txn1->Commit();
3149   ASSERT_OK(s);
3150 
3151   s = txn2->Commit();
3152   ASSERT_OK(s);
3153 
3154   s = db->Get(read_options, "1", &value);
3155   ASSERT_OK(s);
3156   ASSERT_EQ("3", value);
3157 
3158   delete txn1;
3159   delete txn2;
3160 
3161   txn1 = db->BeginTransaction(write_options, txn_options);
3162   read_options1.snapshot = txn1->GetSnapshot();
3163 
3164   txn2 = db->BeginTransaction(write_options, txn_options);
3165   read_options2.snapshot = txn2->GetSnapshot();
3166 
3167   s = txn1->Put("1", "5");
3168   ASSERT_OK(s);
3169 
3170   s = txn1->Commit();
3171   ASSERT_OK(s);
3172 
3173   s = txn2->Put("1", "6");
3174   ASSERT_TRUE(s.IsBusy());
3175   s = txn2->Commit();
3176   ASSERT_OK(s);
3177 
3178   s = db->Get(read_options, "1", &value);
3179   ASSERT_OK(s);
3180   ASSERT_EQ("5", value);
3181 
3182   delete txn1;
3183   delete txn2;
3184 
3185   txn1 = db->BeginTransaction(write_options, txn_options);
3186   read_options1.snapshot = txn1->GetSnapshot();
3187 
3188   txn2 = db->BeginTransaction(write_options, txn_options);
3189   read_options2.snapshot = txn2->GetSnapshot();
3190 
3191   s = txn1->Put("1", "7");
3192   ASSERT_OK(s);
3193   s = txn1->Commit();
3194   ASSERT_OK(s);
3195 
3196   txn2->SetSnapshot();
3197   s = txn2->Put("1", "8");
3198   ASSERT_OK(s);
3199   s = txn2->Commit();
3200   ASSERT_OK(s);
3201 
3202   s = db->Get(read_options, "1", &value);
3203   ASSERT_OK(s);
3204   ASSERT_EQ("8", value);
3205 
3206   delete txn1;
3207   delete txn2;
3208 
3209   txn1 = db->BeginTransaction(write_options);
3210   txn2 = db->BeginTransaction(write_options);
3211 
3212   s = txn1->Put("1", "9");
3213   ASSERT_OK(s);
3214   s = txn1->Commit();
3215   ASSERT_OK(s);
3216 
3217   s = txn2->Put("1", "10");
3218   ASSERT_OK(s);
3219   s = txn2->Commit();
3220   ASSERT_OK(s);
3221 
3222   delete txn1;
3223   delete txn2;
3224 
3225   s = db->Get(read_options, "1", &value);
3226   ASSERT_OK(s);
3227   ASSERT_EQ(value, "10");
3228 }
3229 
// Exercises PutUntracked/MergeUntracked/DeleteUntracked: they can be rolled
// back, and they succeed on a key that was modified after the transaction's
// snapshot, where a regular Put is rejected.
TEST_P(TransactionTest, UntrackedWrites) {
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    // TODO(lth): For WriteUnprepared, validate that untracked writes are
    // not supported.
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  // Verify transaction rollback works for untracked keys.
  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  ASSERT_OK(txn->PutUntracked("untracked", "0"));
  ASSERT_OK(txn->Rollback());
  s = db->Get(read_options, "untracked", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // Modify the key outside of the transaction, after its snapshot was taken
  ASSERT_OK(db->Put(write_options, "untracked", "x"));

  // Untracked writes should succeed even though key was written after snapshot
  ASSERT_OK(txn->PutUntracked("untracked", "1"));
  ASSERT_OK(txn->MergeUntracked("untracked", "2"));
  ASSERT_OK(txn->DeleteUntracked("untracked"));

  // Conflict
  s = txn->Put("untracked", "3");
  ASSERT_TRUE(s.IsBusy());

  ASSERT_OK(txn->Commit());

  // The last untracked operation was a delete
  s = db->Get(read_options, "untracked", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
3279 
// A transaction created with an expiration of 0 expires right away: another
// transaction can write to the keys it touched, and its own commit is
// rejected.
TEST_P(TransactionTest, ExpiredTransaction) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  // An expiration of 0 makes the transaction expire instantly
  txn_options.expiration = 0;
  Transaction* expired_txn = db->BeginTransaction(write_options, txn_options);

  ASSERT_OK(expired_txn->Put("X", "1"));
  ASSERT_OK(expired_txn->Put("Y", "1"));

  Transaction* live_txn = db->BeginTransaction(write_options);

  // live_txn should be able to write to X since expired_txn has expired
  ASSERT_OK(live_txn->Put("X", "2"));
  ASSERT_OK(live_txn->Commit());

  ASSERT_OK(db->Get(read_options, "X", &value));
  ASSERT_EQ("2", value);

  ASSERT_OK(expired_txn->Put("Z", "1"));

  // expired_txn should fail to commit since it is expired
  s = expired_txn->Commit();
  ASSERT_TRUE(s.IsExpired());

  // Its writes to Y and Z must not be visible in the DB
  s = db->Get(read_options, "Y", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "Z", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete expired_txn;
  delete live_txn;
}
3325 
TEST_P(TransactionTest,ReinitializeTest)3326 TEST_P(TransactionTest, ReinitializeTest) {
3327   WriteOptions write_options;
3328   ReadOptions read_options;
3329   TransactionOptions txn_options;
3330   std::string value;
3331   Status s;
3332 
3333   // Set txn expiration timeout to 0 microseconds (expires instantly)
3334   txn_options.expiration = 0;
3335   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3336 
3337   // Reinitialize transaction to no long expire
3338   txn_options.expiration = -1;
3339   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3340 
3341   s = txn1->Put("Z", "z");
3342   ASSERT_OK(s);
3343 
3344   // Should commit since not expired
3345   s = txn1->Commit();
3346   ASSERT_OK(s);
3347 
3348   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3349 
3350   s = txn1->Put("Z", "zz");
3351   ASSERT_OK(s);
3352 
3353   // Reinitilize txn1 and verify that Z gets unlocked
3354   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3355 
3356   Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
3357   s = txn2->Put("Z", "zzz");
3358   ASSERT_OK(s);
3359   s = txn2->Commit();
3360   ASSERT_OK(s);
3361   delete txn2;
3362 
3363   s = db->Get(read_options, "Z", &value);
3364   ASSERT_OK(s);
3365   ASSERT_EQ(value, "zzz");
3366 
3367   // Verify snapshots get reinitialized correctly
3368   txn1->SetSnapshot();
3369   s = txn1->Put("Z", "zzzz");
3370   ASSERT_OK(s);
3371 
3372   s = txn1->Commit();
3373   ASSERT_OK(s);
3374 
3375   s = db->Get(read_options, "Z", &value);
3376   ASSERT_OK(s);
3377   ASSERT_EQ(value, "zzzz");
3378 
3379   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3380   const Snapshot* snapshot = txn1->GetSnapshot();
3381   ASSERT_FALSE(snapshot);
3382 
3383   txn_options.set_snapshot = true;
3384   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3385   snapshot = txn1->GetSnapshot();
3386   ASSERT_TRUE(snapshot);
3387 
3388   s = txn1->Put("Z", "a");
3389   ASSERT_OK(s);
3390 
3391   txn1->Rollback();
3392 
3393   s = txn1->Put("Y", "y");
3394   ASSERT_OK(s);
3395 
3396   txn_options.set_snapshot = false;
3397   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3398   snapshot = txn1->GetSnapshot();
3399   ASSERT_FALSE(snapshot);
3400 
3401   s = txn1->Put("X", "x");
3402   ASSERT_OK(s);
3403 
3404   s = txn1->Commit();
3405   ASSERT_OK(s);
3406 
3407   s = db->Get(read_options, "Z", &value);
3408   ASSERT_OK(s);
3409   ASSERT_EQ(value, "zzzz");
3410 
3411   s = db->Get(read_options, "Y", &value);
3412   ASSERT_TRUE(s.IsNotFound());
3413 
3414   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3415 
3416   s = txn1->SetName("name");
3417   ASSERT_OK(s);
3418 
3419   s = txn1->Prepare();
3420   ASSERT_OK(s);
3421   s = txn1->Commit();
3422   ASSERT_OK(s);
3423 
3424   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3425 
3426   s = txn1->SetName("name");
3427   ASSERT_OK(s);
3428 
3429   delete txn1;
3430 }
3431 
// Verifies that rolling back a transaction lets another transaction write the
// key it had written.
TEST_P(TransactionTest, Rollback) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  ASSERT_OK(txn1->Put("X", "1"));

  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  // txn2 should not be able to write to X since txn1 has it locked
  s = txn2->Put("X", "2");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->Rollback());
  delete txn1;

  // txn2 should now be able to write to X
  ASSERT_OK(txn2->Put("X", "3"));
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "X", &value));
  ASSERT_EQ("3", value);

  delete txn2;
}
3468 
TEST_P(TransactionTest,LockLimitTest)3469 TEST_P(TransactionTest, LockLimitTest) {
3470   WriteOptions write_options;
3471   ReadOptions read_options, snapshot_read_options;
3472   TransactionOptions txn_options;
3473   string value;
3474   Status s;
3475 
3476   delete db;
3477   db = nullptr;
3478 
3479   // Open DB with a lock limit of 3
3480   txn_db_options.max_num_locks = 3;
3481   ASSERT_OK(ReOpen());
3482   assert(db != nullptr);
3483   ASSERT_OK(s);
3484 
3485   // Create a txn and verify we can only lock up to 3 keys
3486   Transaction* txn = db->BeginTransaction(write_options, txn_options);
3487   ASSERT_TRUE(txn);
3488 
3489   s = txn->Put("X", "x");
3490   ASSERT_OK(s);
3491 
3492   s = txn->Put("Y", "y");
3493   ASSERT_OK(s);
3494 
3495   s = txn->Put("Z", "z");
3496   ASSERT_OK(s);
3497 
3498   // lock limit reached
3499   s = txn->Put("W", "w");
3500   ASSERT_TRUE(s.IsBusy());
3501 
3502   // re-locking same key shouldn't put us over the limit
3503   s = txn->Put("X", "xx");
3504   ASSERT_OK(s);
3505 
3506   s = txn->GetForUpdate(read_options, "W", &value);
3507   ASSERT_TRUE(s.IsBusy());
3508   s = txn->GetForUpdate(read_options, "V", &value);
3509   ASSERT_TRUE(s.IsBusy());
3510 
3511   // re-locking same key shouldn't put us over the limit
3512   s = txn->GetForUpdate(read_options, "Y", &value);
3513   ASSERT_OK(s);
3514   ASSERT_EQ("y", value);
3515 
3516   s = txn->Get(read_options, "W", &value);
3517   ASSERT_TRUE(s.IsNotFound());
3518 
3519   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
3520   ASSERT_TRUE(txn2);
3521 
3522   // "X" currently locked
3523   s = txn2->Put("X", "x");
3524   ASSERT_TRUE(s.IsTimedOut());
3525 
3526   // lock limit reached
3527   s = txn2->Put("M", "m");
3528   ASSERT_TRUE(s.IsBusy());
3529 
3530   s = txn->Commit();
3531   ASSERT_OK(s);
3532 
3533   s = db->Get(read_options, "X", &value);
3534   ASSERT_OK(s);
3535   ASSERT_EQ("xx", value);
3536 
3537   s = db->Get(read_options, "W", &value);
3538   ASSERT_TRUE(s.IsNotFound());
3539 
3540   // Committing txn should release its locks and allow txn2 to proceed
3541   s = txn2->Put("X", "x2");
3542   ASSERT_OK(s);
3543 
3544   s = txn2->Delete("X");
3545   ASSERT_OK(s);
3546 
3547   s = txn2->Put("M", "m");
3548   ASSERT_OK(s);
3549 
3550   s = txn2->Put("Z", "z2");
3551   ASSERT_OK(s);
3552 
3553   // lock limit reached
3554   s = txn2->Delete("Y");
3555   ASSERT_TRUE(s.IsBusy());
3556 
3557   s = txn2->Commit();
3558   ASSERT_OK(s);
3559 
3560   s = db->Get(read_options, "Z", &value);
3561   ASSERT_OK(s);
3562   ASSERT_EQ("z2", value);
3563 
3564   s = db->Get(read_options, "Y", &value);
3565   ASSERT_OK(s);
3566   ASSERT_EQ("y", value);
3567 
3568   s = db->Get(read_options, "X", &value);
3569   ASSERT_TRUE(s.IsNotFound());
3570 
3571   delete txn;
3572   delete txn2;
3573 }
3574 
TEST_P(TransactionTest,IteratorTest)3575 TEST_P(TransactionTest, IteratorTest) {
3576   // This test does writes without snapshot validation, and then tries to create
3577   // iterator later, which is unsupported in write unprepared.
3578   if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3579     return;
3580   }
3581 
3582   WriteOptions write_options;
3583   ReadOptions read_options, snapshot_read_options;
3584   std::string value;
3585   Status s;
3586 
3587   // Write some keys to the db
3588   s = db->Put(write_options, "A", "a");
3589   ASSERT_OK(s);
3590 
3591   s = db->Put(write_options, "G", "g");
3592   ASSERT_OK(s);
3593 
3594   s = db->Put(write_options, "F", "f");
3595   ASSERT_OK(s);
3596 
3597   s = db->Put(write_options, "C", "c");
3598   ASSERT_OK(s);
3599 
3600   s = db->Put(write_options, "D", "d");
3601   ASSERT_OK(s);
3602 
3603   Transaction* txn = db->BeginTransaction(write_options);
3604   ASSERT_TRUE(txn);
3605 
3606   // Write some keys in a txn
3607   s = txn->Put("B", "b");
3608   ASSERT_OK(s);
3609 
3610   s = txn->Put("H", "h");
3611   ASSERT_OK(s);
3612 
3613   s = txn->Delete("D");
3614   ASSERT_OK(s);
3615 
3616   s = txn->Put("E", "e");
3617   ASSERT_OK(s);
3618 
3619   txn->SetSnapshot();
3620   const Snapshot* snapshot = txn->GetSnapshot();
3621 
3622   // Write some keys to the db after the snapshot
3623   s = db->Put(write_options, "BB", "xx");
3624   ASSERT_OK(s);
3625 
3626   s = db->Put(write_options, "C", "xx");
3627   ASSERT_OK(s);
3628 
3629   read_options.snapshot = snapshot;
3630   Iterator* iter = txn->GetIterator(read_options);
3631   ASSERT_OK(iter->status());
3632   iter->SeekToFirst();
3633 
3634   // Read all keys via iter and lock them all
3635   std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
3636   for (int i = 0; i < 7; i++) {
3637     ASSERT_OK(iter->status());
3638     ASSERT_TRUE(iter->Valid());
3639     ASSERT_EQ(results[i], iter->value().ToString());
3640 
3641     s = txn->GetForUpdate(read_options, iter->key(), nullptr);
3642     if (i == 2) {
3643       // "C" was modified after txn's snapshot
3644       ASSERT_TRUE(s.IsBusy());
3645     } else {
3646       ASSERT_OK(s);
3647     }
3648 
3649     iter->Next();
3650   }
3651   ASSERT_FALSE(iter->Valid());
3652 
3653   iter->Seek("G");
3654   ASSERT_OK(iter->status());
3655   ASSERT_TRUE(iter->Valid());
3656   ASSERT_EQ("g", iter->value().ToString());
3657 
3658   iter->Prev();
3659   ASSERT_OK(iter->status());
3660   ASSERT_TRUE(iter->Valid());
3661   ASSERT_EQ("f", iter->value().ToString());
3662 
3663   iter->Seek("D");
3664   ASSERT_OK(iter->status());
3665   ASSERT_TRUE(iter->Valid());
3666   ASSERT_EQ("e", iter->value().ToString());
3667 
3668   iter->Seek("C");
3669   ASSERT_OK(iter->status());
3670   ASSERT_TRUE(iter->Valid());
3671   ASSERT_EQ("c", iter->value().ToString());
3672 
3673   iter->Next();
3674   ASSERT_OK(iter->status());
3675   ASSERT_TRUE(iter->Valid());
3676   ASSERT_EQ("e", iter->value().ToString());
3677 
3678   iter->Seek("");
3679   ASSERT_OK(iter->status());
3680   ASSERT_TRUE(iter->Valid());
3681   ASSERT_EQ("a", iter->value().ToString());
3682 
3683   iter->Seek("X");
3684   ASSERT_OK(iter->status());
3685   ASSERT_FALSE(iter->Valid());
3686 
3687   iter->SeekToLast();
3688   ASSERT_OK(iter->status());
3689   ASSERT_TRUE(iter->Valid());
3690   ASSERT_EQ("h", iter->value().ToString());
3691 
3692   s = txn->Commit();
3693   ASSERT_OK(s);
3694 
3695   delete iter;
3696   delete txn;
3697 }
3698 
TEST_P(TransactionTest,DisableIndexingTest)3699 TEST_P(TransactionTest, DisableIndexingTest) {
3700   // Skip this test for write unprepared. It does not solely rely on WBWI for
3701   // read your own writes, so depending on whether batches are flushed or not,
3702   // only some writes will be visible.
3703   //
3704   // Also, write unprepared does not support creating iterators if there has
3705   // been txn->Put() without snapshot validation.
3706   if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3707     return;
3708   }
3709 
3710   WriteOptions write_options;
3711   ReadOptions read_options;
3712   std::string value;
3713   Status s;
3714 
3715   Transaction* txn = db->BeginTransaction(write_options);
3716   ASSERT_TRUE(txn);
3717 
3718   s = txn->Put("A", "a");
3719   ASSERT_OK(s);
3720 
3721   s = txn->Get(read_options, "A", &value);
3722   ASSERT_OK(s);
3723   ASSERT_EQ("a", value);
3724 
3725   txn->DisableIndexing();
3726 
3727   s = txn->Put("B", "b");
3728   ASSERT_OK(s);
3729 
3730   s = txn->Get(read_options, "B", &value);
3731   ASSERT_TRUE(s.IsNotFound());
3732 
3733   Iterator* iter = txn->GetIterator(read_options);
3734   ASSERT_OK(iter->status());
3735 
3736   iter->Seek("B");
3737   ASSERT_OK(iter->status());
3738   ASSERT_FALSE(iter->Valid());
3739 
3740   s = txn->Delete("A");
3741 
3742   s = txn->Get(read_options, "A", &value);
3743   ASSERT_OK(s);
3744   ASSERT_EQ("a", value);
3745 
3746   txn->EnableIndexing();
3747 
3748   s = txn->Put("B", "bb");
3749   ASSERT_OK(s);
3750 
3751   iter->Seek("B");
3752   ASSERT_OK(iter->status());
3753   ASSERT_TRUE(iter->Valid());
3754   ASSERT_EQ("bb", iter->value().ToString());
3755 
3756   s = txn->Get(read_options, "B", &value);
3757   ASSERT_OK(s);
3758   ASSERT_EQ("bb", value);
3759 
3760   s = txn->Put("A", "aa");
3761   ASSERT_OK(s);
3762 
3763   s = txn->Get(read_options, "A", &value);
3764   ASSERT_OK(s);
3765   ASSERT_EQ("aa", value);
3766 
3767   delete iter;
3768   delete txn;
3769 }
3770 
TEST_P(TransactionTest,SavepointTest)3771 TEST_P(TransactionTest, SavepointTest) {
3772   WriteOptions write_options;
3773   ReadOptions read_options, snapshot_read_options;
3774   std::string value;
3775   Status s;
3776 
3777   Transaction* txn = db->BeginTransaction(write_options);
3778   ASSERT_TRUE(txn);
3779 
3780   ASSERT_EQ(0, txn->GetNumPuts());
3781 
3782   s = txn->RollbackToSavePoint();
3783   ASSERT_TRUE(s.IsNotFound());
3784 
3785   txn->SetSavePoint();  // 1
3786 
3787   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to beginning of txn
3788   s = txn->RollbackToSavePoint();
3789   ASSERT_TRUE(s.IsNotFound());
3790 
3791   s = txn->Put("B", "b");
3792   ASSERT_OK(s);
3793 
3794   ASSERT_EQ(1, txn->GetNumPuts());
3795   ASSERT_EQ(0, txn->GetNumDeletes());
3796 
3797   s = txn->Commit();
3798   ASSERT_OK(s);
3799 
3800   s = db->Get(read_options, "B", &value);
3801   ASSERT_OK(s);
3802   ASSERT_EQ("b", value);
3803 
3804   delete txn;
3805   txn = db->BeginTransaction(write_options);
3806   ASSERT_TRUE(txn);
3807 
3808   s = txn->Put("A", "a");
3809   ASSERT_OK(s);
3810 
3811   s = txn->Put("B", "bb");
3812   ASSERT_OK(s);
3813 
3814   s = txn->Put("C", "c");
3815   ASSERT_OK(s);
3816 
3817   txn->SetSavePoint();  // 2
3818 
3819   s = txn->Delete("B");
3820   ASSERT_OK(s);
3821 
3822   s = txn->Put("C", "cc");
3823   ASSERT_OK(s);
3824 
3825   s = txn->Put("D", "d");
3826   ASSERT_OK(s);
3827 
3828   ASSERT_EQ(5, txn->GetNumPuts());
3829   ASSERT_EQ(1, txn->GetNumDeletes());
3830 
3831   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2
3832 
3833   ASSERT_EQ(3, txn->GetNumPuts());
3834   ASSERT_EQ(0, txn->GetNumDeletes());
3835 
3836   s = txn->Get(read_options, "A", &value);
3837   ASSERT_OK(s);
3838   ASSERT_EQ("a", value);
3839 
3840   s = txn->Get(read_options, "B", &value);
3841   ASSERT_OK(s);
3842   ASSERT_EQ("bb", value);
3843 
3844   s = txn->Get(read_options, "C", &value);
3845   ASSERT_OK(s);
3846   ASSERT_EQ("c", value);
3847 
3848   s = txn->Get(read_options, "D", &value);
3849   ASSERT_TRUE(s.IsNotFound());
3850 
3851   s = txn->Put("A", "a");
3852   ASSERT_OK(s);
3853 
3854   s = txn->Put("E", "e");
3855   ASSERT_OK(s);
3856 
3857   ASSERT_EQ(5, txn->GetNumPuts());
3858   ASSERT_EQ(0, txn->GetNumDeletes());
3859 
3860   // Rollback to beginning of txn
3861   s = txn->RollbackToSavePoint();
3862   ASSERT_TRUE(s.IsNotFound());
3863   txn->Rollback();
3864 
3865   ASSERT_EQ(0, txn->GetNumPuts());
3866   ASSERT_EQ(0, txn->GetNumDeletes());
3867 
3868   s = txn->Get(read_options, "A", &value);
3869   ASSERT_TRUE(s.IsNotFound());
3870 
3871   s = txn->Get(read_options, "B", &value);
3872   ASSERT_OK(s);
3873   ASSERT_EQ("b", value);
3874 
3875   s = txn->Get(read_options, "D", &value);
3876   ASSERT_TRUE(s.IsNotFound());
3877 
3878   s = txn->Get(read_options, "D", &value);
3879   ASSERT_TRUE(s.IsNotFound());
3880 
3881   s = txn->Get(read_options, "E", &value);
3882   ASSERT_TRUE(s.IsNotFound());
3883 
3884   s = txn->Put("A", "aa");
3885   ASSERT_OK(s);
3886 
3887   s = txn->Put("F", "f");
3888   ASSERT_OK(s);
3889 
3890   ASSERT_EQ(2, txn->GetNumPuts());
3891   ASSERT_EQ(0, txn->GetNumDeletes());
3892 
3893   txn->SetSavePoint();  // 3
3894   txn->SetSavePoint();  // 4
3895 
3896   s = txn->Put("G", "g");
3897   ASSERT_OK(s);
3898 
3899   s = txn->SingleDelete("F");
3900   ASSERT_OK(s);
3901 
3902   s = txn->Delete("B");
3903   ASSERT_OK(s);
3904 
3905   s = txn->Get(read_options, "A", &value);
3906   ASSERT_OK(s);
3907   ASSERT_EQ("aa", value);
3908 
3909   s = txn->Get(read_options, "F", &value);
3910   // According to db.h, doing a SingleDelete on a key that has been
3911   // overwritten will have undefinied behavior.  So it is unclear what the
3912   // result of fetching "F" should be. The current implementation will
3913   // return NotFound in this case.
3914   ASSERT_TRUE(s.IsNotFound());
3915 
3916   s = txn->Get(read_options, "B", &value);
3917   ASSERT_TRUE(s.IsNotFound());
3918 
3919   ASSERT_EQ(3, txn->GetNumPuts());
3920   ASSERT_EQ(2, txn->GetNumDeletes());
3921 
3922   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 3
3923 
3924   ASSERT_EQ(2, txn->GetNumPuts());
3925   ASSERT_EQ(0, txn->GetNumDeletes());
3926 
3927   s = txn->Get(read_options, "F", &value);
3928   ASSERT_OK(s);
3929   ASSERT_EQ("f", value);
3930 
3931   s = txn->Get(read_options, "G", &value);
3932   ASSERT_TRUE(s.IsNotFound());
3933 
3934   s = txn->Commit();
3935   ASSERT_OK(s);
3936 
3937   s = db->Get(read_options, "F", &value);
3938   ASSERT_OK(s);
3939   ASSERT_EQ("f", value);
3940 
3941   s = db->Get(read_options, "G", &value);
3942   ASSERT_TRUE(s.IsNotFound());
3943 
3944   s = db->Get(read_options, "A", &value);
3945   ASSERT_OK(s);
3946   ASSERT_EQ("aa", value);
3947 
3948   s = db->Get(read_options, "B", &value);
3949   ASSERT_OK(s);
3950   ASSERT_EQ("b", value);
3951 
3952   s = db->Get(read_options, "C", &value);
3953   ASSERT_TRUE(s.IsNotFound());
3954 
3955   s = db->Get(read_options, "D", &value);
3956   ASSERT_TRUE(s.IsNotFound());
3957 
3958   s = db->Get(read_options, "E", &value);
3959   ASSERT_TRUE(s.IsNotFound());
3960 
3961   delete txn;
3962 }
3963 
TEST_P(TransactionTest,SavepointTest2)3964 TEST_P(TransactionTest, SavepointTest2) {
3965   WriteOptions write_options;
3966   ReadOptions read_options;
3967   TransactionOptions txn_options;
3968   Status s;
3969 
3970   txn_options.lock_timeout = 1;  // 1 ms
3971   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3972   ASSERT_TRUE(txn1);
3973 
3974   s = txn1->Put("A", "");
3975   ASSERT_OK(s);
3976 
3977   txn1->SetSavePoint();  // 1
3978 
3979   s = txn1->Put("A", "a");
3980   ASSERT_OK(s);
3981 
3982   s = txn1->Put("C", "c");
3983   ASSERT_OK(s);
3984 
3985   txn1->SetSavePoint();  // 2
3986 
3987   s = txn1->Put("A", "a");
3988   ASSERT_OK(s);
3989   s = txn1->Put("B", "b");
3990   ASSERT_OK(s);
3991 
3992   ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 2
3993 
3994   // Verify that "A" and "C" is still locked while "B" is not
3995   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
3996   ASSERT_TRUE(txn2);
3997 
3998   s = txn2->Put("A", "a2");
3999   ASSERT_TRUE(s.IsTimedOut());
4000   s = txn2->Put("C", "c2");
4001   ASSERT_TRUE(s.IsTimedOut());
4002   s = txn2->Put("B", "b2");
4003   ASSERT_OK(s);
4004 
4005   s = txn1->Put("A", "aa");
4006   ASSERT_OK(s);
4007   s = txn1->Put("B", "bb");
4008   ASSERT_TRUE(s.IsTimedOut());
4009 
4010   s = txn2->Commit();
4011   ASSERT_OK(s);
4012   delete txn2;
4013 
4014   s = txn1->Put("A", "aaa");
4015   ASSERT_OK(s);
4016   s = txn1->Put("B", "bbb");
4017   ASSERT_OK(s);
4018   s = txn1->Put("C", "ccc");
4019   ASSERT_OK(s);
4020 
4021   txn1->SetSavePoint();                    // 3
4022   ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 3
4023 
4024   // Verify that "A", "B", "C" are still locked
4025   txn2 = db->BeginTransaction(write_options, txn_options);
4026   ASSERT_TRUE(txn2);
4027 
4028   s = txn2->Put("A", "a2");
4029   ASSERT_TRUE(s.IsTimedOut());
4030   s = txn2->Put("B", "b2");
4031   ASSERT_TRUE(s.IsTimedOut());
4032   s = txn2->Put("C", "c2");
4033   ASSERT_TRUE(s.IsTimedOut());
4034 
4035   ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1
4036 
4037   // Verify that only "A" is locked
4038   s = txn2->Put("A", "a3");
4039   ASSERT_TRUE(s.IsTimedOut());
4040   s = txn2->Put("B", "b3");
4041   ASSERT_OK(s);
4042   s = txn2->Put("C", "c3po");
4043   ASSERT_OK(s);
4044 
4045   s = txn1->Commit();
4046   ASSERT_OK(s);
4047   delete txn1;
4048 
4049   // Verify "A" "C" "B" are no longer locked
4050   s = txn2->Put("A", "a4");
4051   ASSERT_OK(s);
4052   s = txn2->Put("B", "b4");
4053   ASSERT_OK(s);
4054   s = txn2->Put("C", "c4");
4055   ASSERT_OK(s);
4056 
4057   s = txn2->Commit();
4058   ASSERT_OK(s);
4059   delete txn2;
4060 }
4061 
TEST_P(TransactionTest,SavepointTest3)4062 TEST_P(TransactionTest, SavepointTest3) {
4063   WriteOptions write_options;
4064   ReadOptions read_options;
4065   TransactionOptions txn_options;
4066   Status s;
4067 
4068   txn_options.lock_timeout = 1;  // 1 ms
4069   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4070   ASSERT_TRUE(txn1);
4071 
4072   s = txn1->PopSavePoint();  // No SavePoint present
4073   ASSERT_TRUE(s.IsNotFound());
4074 
4075   s = txn1->Put("A", "");
4076   ASSERT_OK(s);
4077 
4078   s = txn1->PopSavePoint();  // Still no SavePoint present
4079   ASSERT_TRUE(s.IsNotFound());
4080 
4081   txn1->SetSavePoint();  // 1
4082 
4083   s = txn1->Put("A", "a");
4084   ASSERT_OK(s);
4085 
4086   s = txn1->PopSavePoint();  // Remove 1
4087   ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
4088 
4089   // Verify that "A" is still locked
4090   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4091   ASSERT_TRUE(txn2);
4092 
4093   s = txn2->Put("A", "a2");
4094   ASSERT_TRUE(s.IsTimedOut());
4095   delete txn2;
4096 
4097   txn1->SetSavePoint();  // 2
4098 
4099   s = txn1->Put("B", "b");
4100   ASSERT_OK(s);
4101 
4102   txn1->SetSavePoint();  // 3
4103 
4104   s = txn1->Put("B", "b2");
4105   ASSERT_OK(s);
4106 
4107   ASSERT_OK(txn1->RollbackToSavePoint());  // Roll back to 2
4108 
4109   s = txn1->PopSavePoint();
4110   ASSERT_OK(s);
4111 
4112   s = txn1->PopSavePoint();
4113   ASSERT_TRUE(s.IsNotFound());
4114 
4115   s = txn1->Commit();
4116   ASSERT_OK(s);
4117   delete txn1;
4118 
4119   std::string value;
4120 
4121   // tnx1 should have modified "A" to "a"
4122   s = db->Get(read_options, "A", &value);
4123   ASSERT_OK(s);
4124   ASSERT_EQ("a", value);
4125 
4126   // tnx1 should have set "B" to just "b"
4127   s = db->Get(read_options, "B", &value);
4128   ASSERT_OK(s);
4129   ASSERT_EQ("b", value);
4130 
4131   s = db->Get(read_options, "C", &value);
4132   ASSERT_TRUE(s.IsNotFound());
4133 }
4134 
// Checks that popping an inner savepoint keeps the writes made after it, and
// that a subsequent rollback to the outer savepoint discards every write and
// leaves no key locked.
TEST_P(TransactionTest, SavepointTest4) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  txn_options.lock_timeout = 1;  // 1 ms

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  txn1->SetSavePoint();  // 1
  ASSERT_OK(txn1->Put("A", "a"));

  txn1->SetSavePoint();  // 2
  ASSERT_OK(txn1->Put("B", "b"));

  ASSERT_OK(txn1->PopSavePoint());  // Remove 2

  // Both writes must survive the pop.
  std::string value;
  ASSERT_OK(txn1->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn1->Get(read_options, "B", &value));
  ASSERT_EQ("b", value);

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1

  // Everything written since savepoint 1 must be gone.
  ASSERT_TRUE(txn1->Get(read_options, "A", &value).IsNotFound());
  ASSERT_TRUE(txn1->Get(read_options, "B", &value).IsNotFound());

  // A second transaction must be able to take both keys: nothing is locked.
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  ASSERT_OK(txn2->Put("A", ""));
  ASSERT_OK(txn2->Put("B", ""));

  delete txn2;
  delete txn1;
}
4186 
TEST_P(TransactionTest,UndoGetForUpdateTest)4187 TEST_P(TransactionTest, UndoGetForUpdateTest) {
4188   WriteOptions write_options;
4189   ReadOptions read_options;
4190   TransactionOptions txn_options;
4191   std::string value;
4192   Status s;
4193 
4194   txn_options.lock_timeout = 1;  // 1 ms
4195   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4196   ASSERT_TRUE(txn1);
4197 
4198   txn1->UndoGetForUpdate("A");
4199 
4200   s = txn1->Commit();
4201   ASSERT_OK(s);
4202   delete txn1;
4203 
4204   txn1 = db->BeginTransaction(write_options, txn_options);
4205 
4206   txn1->UndoGetForUpdate("A");
4207   s = txn1->GetForUpdate(read_options, "A", &value);
4208   ASSERT_TRUE(s.IsNotFound());
4209 
4210   // Verify that A is locked
4211   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4212   s = txn2->Put("A", "a");
4213   ASSERT_TRUE(s.IsTimedOut());
4214 
4215   txn1->UndoGetForUpdate("A");
4216 
4217   // Verify that A is now unlocked
4218   s = txn2->Put("A", "a2");
4219   ASSERT_OK(s);
4220   txn2->Commit();
4221   delete txn2;
4222   s = db->Get(read_options, "A", &value);
4223   ASSERT_OK(s);
4224   ASSERT_EQ("a2", value);
4225 
4226   s = txn1->Delete("A");
4227   ASSERT_OK(s);
4228   s = txn1->GetForUpdate(read_options, "A", &value);
4229   ASSERT_TRUE(s.IsNotFound());
4230 
4231   s = txn1->Put("B", "b3");
4232   ASSERT_OK(s);
4233   s = txn1->GetForUpdate(read_options, "B", &value);
4234   ASSERT_OK(s);
4235 
4236   txn1->UndoGetForUpdate("A");
4237   txn1->UndoGetForUpdate("B");
4238 
4239   // Verify that A and B are still locked
4240   txn2 = db->BeginTransaction(write_options, txn_options);
4241   s = txn2->Put("A", "a4");
4242   ASSERT_TRUE(s.IsTimedOut());
4243   s = txn2->Put("B", "b4");
4244   ASSERT_TRUE(s.IsTimedOut());
4245 
4246   txn1->Rollback();
4247   delete txn1;
4248 
4249   // Verify that A and B are no longer locked
4250   s = txn2->Put("A", "a5");
4251   ASSERT_OK(s);
4252   s = txn2->Put("B", "b5");
4253   ASSERT_OK(s);
4254   s = txn2->Commit();
4255   delete txn2;
4256   ASSERT_OK(s);
4257 
4258   txn1 = db->BeginTransaction(write_options, txn_options);
4259 
4260   s = txn1->GetForUpdate(read_options, "A", &value);
4261   ASSERT_OK(s);
4262   s = txn1->GetForUpdate(read_options, "A", &value);
4263   ASSERT_OK(s);
4264   s = txn1->GetForUpdate(read_options, "C", &value);
4265   ASSERT_TRUE(s.IsNotFound());
4266   s = txn1->GetForUpdate(read_options, "A", &value);
4267   ASSERT_OK(s);
4268   s = txn1->GetForUpdate(read_options, "C", &value);
4269   ASSERT_TRUE(s.IsNotFound());
4270   s = txn1->GetForUpdate(read_options, "B", &value);
4271   ASSERT_OK(s);
4272   s = txn1->Put("B", "b5");
4273   s = txn1->GetForUpdate(read_options, "B", &value);
4274   ASSERT_OK(s);
4275 
4276   txn1->UndoGetForUpdate("A");
4277   txn1->UndoGetForUpdate("B");
4278   txn1->UndoGetForUpdate("C");
4279   txn1->UndoGetForUpdate("X");
4280 
4281   // Verify A,B,C are locked
4282   txn2 = db->BeginTransaction(write_options, txn_options);
4283   s = txn2->Put("A", "a6");
4284   ASSERT_TRUE(s.IsTimedOut());
4285   s = txn2->Delete("B");
4286   ASSERT_TRUE(s.IsTimedOut());
4287   s = txn2->Put("C", "c6");
4288   ASSERT_TRUE(s.IsTimedOut());
4289   s = txn2->Put("X", "x6");
4290   ASSERT_OK(s);
4291 
4292   txn1->UndoGetForUpdate("A");
4293   txn1->UndoGetForUpdate("B");
4294   txn1->UndoGetForUpdate("C");
4295   txn1->UndoGetForUpdate("X");
4296 
4297   // Verify A,B are locked and C is not
4298   s = txn2->Put("A", "a6");
4299   ASSERT_TRUE(s.IsTimedOut());
4300   s = txn2->Delete("B");
4301   ASSERT_TRUE(s.IsTimedOut());
4302   s = txn2->Put("C", "c6");
4303   ASSERT_OK(s);
4304   s = txn2->Put("X", "x6");
4305   ASSERT_OK(s);
4306 
4307   txn1->UndoGetForUpdate("A");
4308   txn1->UndoGetForUpdate("B");
4309   txn1->UndoGetForUpdate("C");
4310   txn1->UndoGetForUpdate("X");
4311 
4312   // Verify B is locked and A and C are not
4313   s = txn2->Put("A", "a7");
4314   ASSERT_OK(s);
4315   s = txn2->Delete("B");
4316   ASSERT_TRUE(s.IsTimedOut());
4317   s = txn2->Put("C", "c7");
4318   ASSERT_OK(s);
4319   s = txn2->Put("X", "x7");
4320   ASSERT_OK(s);
4321 
4322   s = txn2->Commit();
4323   ASSERT_OK(s);
4324   delete txn2;
4325 
4326   s = txn1->Commit();
4327   ASSERT_OK(s);
4328   delete txn1;
4329 }
4330 
TEST_P(TransactionTest,UndoGetForUpdateTest2)4331 TEST_P(TransactionTest, UndoGetForUpdateTest2) {
4332   WriteOptions write_options;
4333   ReadOptions read_options;
4334   TransactionOptions txn_options;
4335   std::string value;
4336   Status s;
4337 
4338   s = db->Put(write_options, "A", "");
4339   ASSERT_OK(s);
4340 
4341   txn_options.lock_timeout = 1;  // 1 ms
4342   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4343   ASSERT_TRUE(txn1);
4344 
4345   s = txn1->GetForUpdate(read_options, "A", &value);
4346   ASSERT_OK(s);
4347   s = txn1->GetForUpdate(read_options, "B", &value);
4348   ASSERT_TRUE(s.IsNotFound());
4349 
4350   s = txn1->Put("F", "f");
4351   ASSERT_OK(s);
4352 
4353   txn1->SetSavePoint();  // 1
4354 
4355   txn1->UndoGetForUpdate("A");
4356 
4357   s = txn1->GetForUpdate(read_options, "C", &value);
4358   ASSERT_TRUE(s.IsNotFound());
4359   s = txn1->GetForUpdate(read_options, "D", &value);
4360   ASSERT_TRUE(s.IsNotFound());
4361 
4362   s = txn1->Put("E", "e");
4363   ASSERT_OK(s);
4364   s = txn1->GetForUpdate(read_options, "E", &value);
4365   ASSERT_OK(s);
4366 
4367   s = txn1->GetForUpdate(read_options, "F", &value);
4368   ASSERT_OK(s);
4369 
4370   // Verify A,B,C,D,E,F are still locked
4371   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4372   s = txn2->Put("A", "a1");
4373   ASSERT_TRUE(s.IsTimedOut());
4374   s = txn2->Put("B", "b1");
4375   ASSERT_TRUE(s.IsTimedOut());
4376   s = txn2->Put("C", "c1");
4377   ASSERT_TRUE(s.IsTimedOut());
4378   s = txn2->Put("D", "d1");
4379   ASSERT_TRUE(s.IsTimedOut());
4380   s = txn2->Put("E", "e1");
4381   ASSERT_TRUE(s.IsTimedOut());
4382   s = txn2->Put("F", "f1");
4383   ASSERT_TRUE(s.IsTimedOut());
4384 
4385   txn1->UndoGetForUpdate("C");
4386   txn1->UndoGetForUpdate("E");
4387 
4388   // Verify A,B,D,E,F are still locked and C is not.
4389   s = txn2->Put("A", "a2");
4390   ASSERT_TRUE(s.IsTimedOut());
4391   s = txn2->Put("B", "b2");
4392   ASSERT_TRUE(s.IsTimedOut());
4393   s = txn2->Put("D", "d2");
4394   ASSERT_TRUE(s.IsTimedOut());
4395   s = txn2->Put("E", "e2");
4396   ASSERT_TRUE(s.IsTimedOut());
4397   s = txn2->Put("F", "f2");
4398   ASSERT_TRUE(s.IsTimedOut());
4399   s = txn2->Put("C", "c2");
4400   ASSERT_OK(s);
4401 
4402   txn1->SetSavePoint();  // 2
4403 
4404   s = txn1->Put("H", "h");
4405   ASSERT_OK(s);
4406 
4407   txn1->UndoGetForUpdate("A");
4408   txn1->UndoGetForUpdate("B");
4409   txn1->UndoGetForUpdate("C");
4410   txn1->UndoGetForUpdate("D");
4411   txn1->UndoGetForUpdate("E");
4412   txn1->UndoGetForUpdate("F");
4413   txn1->UndoGetForUpdate("G");
4414   txn1->UndoGetForUpdate("H");
4415 
4416   // Verify A,B,D,E,F,H are still locked and C,G are not.
4417   s = txn2->Put("A", "a3");
4418   ASSERT_TRUE(s.IsTimedOut());
4419   s = txn2->Put("B", "b3");
4420   ASSERT_TRUE(s.IsTimedOut());
4421   s = txn2->Put("D", "d3");
4422   ASSERT_TRUE(s.IsTimedOut());
4423   s = txn2->Put("E", "e3");
4424   ASSERT_TRUE(s.IsTimedOut());
4425   s = txn2->Put("F", "f3");
4426   ASSERT_TRUE(s.IsTimedOut());
4427   s = txn2->Put("H", "h3");
4428   ASSERT_TRUE(s.IsTimedOut());
4429   s = txn2->Put("C", "c3");
4430   ASSERT_OK(s);
4431   s = txn2->Put("G", "g3");
4432   ASSERT_OK(s);
4433 
4434   txn1->RollbackToSavePoint();  // rollback to 2
4435 
4436   // Verify A,B,D,E,F are still locked and C,G,H are not.
4437   s = txn2->Put("A", "a3");
4438   ASSERT_TRUE(s.IsTimedOut());
4439   s = txn2->Put("B", "b3");
4440   ASSERT_TRUE(s.IsTimedOut());
4441   s = txn2->Put("D", "d3");
4442   ASSERT_TRUE(s.IsTimedOut());
4443   s = txn2->Put("E", "e3");
4444   ASSERT_TRUE(s.IsTimedOut());
4445   s = txn2->Put("F", "f3");
4446   ASSERT_TRUE(s.IsTimedOut());
4447   s = txn2->Put("C", "c3");
4448   ASSERT_OK(s);
4449   s = txn2->Put("G", "g3");
4450   ASSERT_OK(s);
4451   s = txn2->Put("H", "h3");
4452   ASSERT_OK(s);
4453 
4454   txn1->UndoGetForUpdate("A");
4455   txn1->UndoGetForUpdate("B");
4456   txn1->UndoGetForUpdate("C");
4457   txn1->UndoGetForUpdate("D");
4458   txn1->UndoGetForUpdate("E");
4459   txn1->UndoGetForUpdate("F");
4460   txn1->UndoGetForUpdate("G");
4461   txn1->UndoGetForUpdate("H");
4462 
4463   // Verify A,B,E,F are still locked and C,D,G,H are not.
4464   s = txn2->Put("A", "a3");
4465   ASSERT_TRUE(s.IsTimedOut());
4466   s = txn2->Put("B", "b3");
4467   ASSERT_TRUE(s.IsTimedOut());
4468   s = txn2->Put("E", "e3");
4469   ASSERT_TRUE(s.IsTimedOut());
4470   s = txn2->Put("F", "f3");
4471   ASSERT_TRUE(s.IsTimedOut());
4472   s = txn2->Put("C", "c3");
4473   ASSERT_OK(s);
4474   s = txn2->Put("D", "d3");
4475   ASSERT_OK(s);
4476   s = txn2->Put("G", "g3");
4477   ASSERT_OK(s);
4478   s = txn2->Put("H", "h3");
4479   ASSERT_OK(s);
4480 
4481   txn1->RollbackToSavePoint();  // rollback to 1
4482 
4483   // Verify A,B,F are still locked and C,D,E,G,H are not.
4484   s = txn2->Put("A", "a3");
4485   ASSERT_TRUE(s.IsTimedOut());
4486   s = txn2->Put("B", "b3");
4487   ASSERT_TRUE(s.IsTimedOut());
4488   s = txn2->Put("F", "f3");
4489   ASSERT_TRUE(s.IsTimedOut());
4490   s = txn2->Put("C", "c3");
4491   ASSERT_OK(s);
4492   s = txn2->Put("D", "d3");
4493   ASSERT_OK(s);
4494   s = txn2->Put("E", "e3");
4495   ASSERT_OK(s);
4496   s = txn2->Put("G", "g3");
4497   ASSERT_OK(s);
4498   s = txn2->Put("H", "h3");
4499   ASSERT_OK(s);
4500 
4501   txn1->UndoGetForUpdate("A");
4502   txn1->UndoGetForUpdate("B");
4503   txn1->UndoGetForUpdate("C");
4504   txn1->UndoGetForUpdate("D");
4505   txn1->UndoGetForUpdate("E");
4506   txn1->UndoGetForUpdate("F");
4507   txn1->UndoGetForUpdate("G");
4508   txn1->UndoGetForUpdate("H");
4509 
4510   // Verify F is still locked and A,B,C,D,E,G,H are not.
4511   s = txn2->Put("F", "f3");
4512   ASSERT_TRUE(s.IsTimedOut());
4513   s = txn2->Put("A", "a3");
4514   ASSERT_OK(s);
4515   s = txn2->Put("B", "b3");
4516   ASSERT_OK(s);
4517   s = txn2->Put("C", "c3");
4518   ASSERT_OK(s);
4519   s = txn2->Put("D", "d3");
4520   ASSERT_OK(s);
4521   s = txn2->Put("E", "e3");
4522   ASSERT_OK(s);
4523   s = txn2->Put("G", "g3");
4524   ASSERT_OK(s);
4525   s = txn2->Put("H", "h3");
4526   ASSERT_OK(s);
4527 
4528   s = txn1->Commit();
4529   ASSERT_OK(s);
4530   s = txn2->Commit();
4531   ASSERT_OK(s);
4532 
4533   delete txn1;
4534   delete txn2;
4535 }
4536 
TEST_P(TransactionTest,TimeoutTest)4537 TEST_P(TransactionTest, TimeoutTest) {
4538   WriteOptions write_options;
4539   ReadOptions read_options;
4540   std::string value;
4541   Status s;
4542 
4543   delete db;
4544   db = nullptr;
4545 
4546   // transaction writes have an infinite timeout,
4547   // but we will override this when we start a txn
4548   // db writes have infinite timeout
4549   txn_db_options.transaction_lock_timeout = -1;
4550   txn_db_options.default_lock_timeout = -1;
4551 
4552   s = TransactionDB::Open(options, txn_db_options, dbname, &db);
4553   assert(db != nullptr);
4554   ASSERT_OK(s);
4555 
4556   s = db->Put(write_options, "aaa", "aaa");
4557   ASSERT_OK(s);
4558 
4559   TransactionOptions txn_options0;
4560   txn_options0.expiration = 100;  // 100ms
4561   txn_options0.lock_timeout = 50;  // txn timeout no longer infinite
4562   Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
4563 
4564   s = txn1->GetForUpdate(read_options, "aaa", nullptr);
4565   ASSERT_OK(s);
4566 
4567   // Conflicts with previous GetForUpdate.
4568   // Since db writes do not have a timeout, this should eventually succeed when
4569   // the transaction expires.
4570   s = db->Put(write_options, "aaa", "xxx");
4571   ASSERT_OK(s);
4572 
4573   ASSERT_GE(txn1->GetElapsedTime(),
4574             static_cast<uint64_t>(txn_options0.expiration));
4575 
4576   s = txn1->Commit();
4577   ASSERT_TRUE(s.IsExpired());  // expired!
4578 
4579   s = db->Get(read_options, "aaa", &value);
4580   ASSERT_OK(s);
4581   ASSERT_EQ("xxx", value);
4582 
4583   delete txn1;
4584   delete db;
4585 
4586   // transaction writes have 10ms timeout,
4587   // db writes have infinite timeout
4588   txn_db_options.transaction_lock_timeout = 50;
4589   txn_db_options.default_lock_timeout = -1;
4590 
4591   s = TransactionDB::Open(options, txn_db_options, dbname, &db);
4592   ASSERT_OK(s);
4593 
4594   s = db->Put(write_options, "aaa", "aaa");
4595   ASSERT_OK(s);
4596 
4597   TransactionOptions txn_options;
4598   txn_options.expiration = 100;  // 100ms
4599   txn1 = db->BeginTransaction(write_options, txn_options);
4600 
4601   s = txn1->GetForUpdate(read_options, "aaa", nullptr);
4602   ASSERT_OK(s);
4603 
4604   // Conflicts with previous GetForUpdate.
4605   // Since db writes do not have a timeout, this should eventually succeed when
4606   // the transaction expires.
4607   s = db->Put(write_options, "aaa", "xxx");
4608   ASSERT_OK(s);
4609 
4610   s = txn1->Commit();
4611   ASSERT_NOK(s);  // expired!
4612 
4613   s = db->Get(read_options, "aaa", &value);
4614   ASSERT_OK(s);
4615   ASSERT_EQ("xxx", value);
4616 
4617   delete txn1;
4618   txn_options.expiration = 6000000;  // 100 minutes
4619   txn_options.lock_timeout = 1;      // 1ms
4620   txn1 = db->BeginTransaction(write_options, txn_options);
4621   txn1->SetLockTimeout(100);
4622 
4623   TransactionOptions txn_options2;
4624   txn_options2.expiration = 10;  // 10ms
4625   Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
4626   ASSERT_OK(s);
4627 
4628   s = txn2->Put("a", "2");
4629   ASSERT_OK(s);
4630 
4631   // txn1 has a lock timeout longer than txn2's expiration, so it will win
4632   s = txn1->Delete("a");
4633   ASSERT_OK(s);
4634 
4635   s = txn1->Commit();
4636   ASSERT_OK(s);
4637 
4638   // txn2 should be expired out since txn1 waiting until its timeout expired.
4639   s = txn2->Commit();
4640   ASSERT_TRUE(s.IsExpired());
4641 
4642   delete txn1;
4643   delete txn2;
4644   txn_options.expiration = 6000000;  // 100 minutes
4645   txn1 = db->BeginTransaction(write_options, txn_options);
4646   txn_options2.expiration = 100000000;
4647   txn2 = db->BeginTransaction(write_options, txn_options2);
4648 
4649   s = txn1->Delete("asdf");
4650   ASSERT_OK(s);
4651 
4652   // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
4653   s = txn2->Delete("asdf");
4654   ASSERT_TRUE(s.IsTimedOut());
4655   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
4656 
4657   s = txn1->Commit();
4658   ASSERT_OK(s);
4659 
4660   s = txn2->Put("asdf", "asdf");
4661   ASSERT_OK(s);
4662 
4663   s = txn2->Commit();
4664   ASSERT_OK(s);
4665 
4666   s = db->Get(read_options, "asdf", &value);
4667   ASSERT_OK(s);
4668   ASSERT_EQ("asdf", value);
4669 
4670   delete txn1;
4671   delete txn2;
4672 }
4673 
TEST_P(TransactionTest,SingleDeleteTest)4674 TEST_P(TransactionTest, SingleDeleteTest) {
4675   WriteOptions write_options;
4676   ReadOptions read_options;
4677   std::string value;
4678   Status s;
4679 
4680   Transaction* txn = db->BeginTransaction(write_options);
4681   ASSERT_TRUE(txn);
4682 
4683   s = txn->SingleDelete("A");
4684   ASSERT_OK(s);
4685 
4686   s = txn->Get(read_options, "A", &value);
4687   ASSERT_TRUE(s.IsNotFound());
4688 
4689   s = txn->Commit();
4690   ASSERT_OK(s);
4691   delete txn;
4692 
4693   txn = db->BeginTransaction(write_options);
4694 
4695   s = txn->SingleDelete("A");
4696   ASSERT_OK(s);
4697 
4698   s = txn->Put("A", "a");
4699   ASSERT_OK(s);
4700 
4701   s = txn->Get(read_options, "A", &value);
4702   ASSERT_OK(s);
4703   ASSERT_EQ("a", value);
4704 
4705   s = txn->Commit();
4706   ASSERT_OK(s);
4707   delete txn;
4708 
4709   s = db->Get(read_options, "A", &value);
4710   ASSERT_OK(s);
4711   ASSERT_EQ("a", value);
4712 
4713   txn = db->BeginTransaction(write_options);
4714 
4715   s = txn->SingleDelete("A");
4716   ASSERT_OK(s);
4717 
4718   s = txn->Get(read_options, "A", &value);
4719   ASSERT_TRUE(s.IsNotFound());
4720 
4721   s = txn->Commit();
4722   ASSERT_OK(s);
4723   delete txn;
4724 
4725   s = db->Get(read_options, "A", &value);
4726   ASSERT_TRUE(s.IsNotFound());
4727 
4728   txn = db->BeginTransaction(write_options);
4729   Transaction* txn2 = db->BeginTransaction(write_options);
4730   txn2->SetSnapshot();
4731 
4732   s = txn->Put("A", "a");
4733   ASSERT_OK(s);
4734 
4735   s = txn->Put("A", "a2");
4736   ASSERT_OK(s);
4737 
4738   s = txn->SingleDelete("A");
4739   ASSERT_OK(s);
4740 
4741   s = txn->SingleDelete("B");
4742   ASSERT_OK(s);
4743 
4744   // According to db.h, doing a SingleDelete on a key that has been
4745   // overwritten will have undefinied behavior.  So it is unclear what the
4746   // result of fetching "A" should be. The current implementation will
4747   // return NotFound in this case.
4748   s = txn->Get(read_options, "A", &value);
4749   ASSERT_TRUE(s.IsNotFound());
4750 
4751   s = txn2->Put("B", "b");
4752   ASSERT_TRUE(s.IsTimedOut());
4753   s = txn2->Commit();
4754   ASSERT_OK(s);
4755   delete txn2;
4756 
4757   s = txn->Commit();
4758   ASSERT_OK(s);
4759   delete txn;
4760 
4761   // According to db.h, doing a SingleDelete on a key that has been
4762   // overwritten will have undefinied behavior.  So it is unclear what the
4763   // result of fetching "A" should be. The current implementation will
4764   // return NotFound in this case.
4765   s = db->Get(read_options, "A", &value);
4766   ASSERT_TRUE(s.IsNotFound());
4767 
4768   s = db->Get(read_options, "B", &value);
4769   ASSERT_TRUE(s.IsNotFound());
4770 }
4771 
// Exercises Merge() inside a transaction: reads of a key with pending merge
// operands report MergeInProgress, a Put resets that, Merge() locks the key,
// and the committed value is the last Put combined with the later operand.
TEST_P(TransactionTest, MergeTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;

  Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
  ASSERT_TRUE(txn);

  ASSERT_OK(db->Put(write_options, "A", "a0"));

  ASSERT_OK(txn->Merge("A", "1"));
  ASSERT_OK(txn->Merge("A", "2"));

  // Pending merge operands cannot be resolved by a transactional Get.
  ASSERT_TRUE(txn->Get(read_options, "A", &value).IsMergeInProgress());

  ASSERT_OK(txn->Put("A", "a"));

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn->Merge("A", "3"));

  ASSERT_TRUE(txn->Get(read_options, "A", &value).IsMergeInProgress());

  TransactionOptions txn_options;
  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // txn must be holding the lock on "A", so txn2's merge times out.
  ASSERT_TRUE(txn2->Merge("A", "4").IsTimedOut());

  ASSERT_OK(txn2->Commit());
  delete txn2;

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_OK(db->Get(read_options, "A", &value));
  ASSERT_EQ("a,3", value);
}
4827 
// Checks that SetSnapshotOnNextOperation() does not take a snapshot right
// away: writes committed before txn1's first operation do not conflict with
// it, while writes made after that operation do.
TEST_P(TransactionTest, DeferSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;

  ASSERT_OK(db->Put(write_options, "A", "a0"));

  Transaction* txn1 = db->BeginTransaction(write_options);
  Transaction* txn2 = db->BeginTransaction(write_options);

  // Requesting a deferred snapshot must not create one yet.
  txn1->SetSnapshotOnNextOperation();
  ASSERT_FALSE(txn1->GetSnapshot());

  ASSERT_OK(txn2->Put("A", "a2"));
  ASSERT_OK(txn2->Commit());
  delete txn2;

  // Should not conflict with txn2 since snapshot wasn't set until
  // GetForUpdate was called.
  ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  ASSERT_EQ("a2", value);

  ASSERT_OK(txn1->Put("A", "a1"));

  ASSERT_OK(db->Put(write_options, "B", "b0"));

  // Cannot lock B since it was written after the snapshot was set
  ASSERT_TRUE(txn1->Put("B", "b1").IsBusy());

  ASSERT_OK(txn1->Commit());
  delete txn1;

  ASSERT_OK(db->Get(read_options, "A", &value));
  ASSERT_EQ("a1", value);

  ASSERT_OK(db->Get(read_options, "B", &value));
  ASSERT_EQ("b0", value);
}
4878 
// Checks that after SetSnapshotOnNextOperation() plain Get() calls keep
// reading through the snapshot taken earlier, and that the snapshot is only
// replaced once GetForUpdate() runs.
TEST_P(TransactionTest, DeferSnapshotTest2) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;

  Transaction* txn1 = db->BeginTransaction(write_options);

  txn1->SetSnapshot();

  ASSERT_OK(txn1->Put("A", "a1"));

  ASSERT_OK(db->Put(write_options, "C", "c0"));
  ASSERT_OK(db->Put(write_options, "D", "d0"));

  snapshot_read_options.snapshot = txn1->GetSnapshot();

  txn1->SetSnapshotOnNextOperation();

  // Snapshot was set before C and D were written
  ASSERT_TRUE(txn1->Get(snapshot_read_options, "C", &value).IsNotFound());
  ASSERT_TRUE(txn1->Get(snapshot_read_options, "D", &value).IsNotFound());

  // Snapshot should not have changed yet.
  snapshot_read_options.snapshot = txn1->GetSnapshot();

  // Still reading from before C and D were written
  ASSERT_TRUE(txn1->Get(snapshot_read_options, "C", &value).IsNotFound());
  ASSERT_TRUE(txn1->Get(snapshot_read_options, "D", &value).IsNotFound());

  ASSERT_OK(txn1->GetForUpdate(read_options, "C", &value));
  ASSERT_EQ("c0", value);

  ASSERT_OK(db->Put(write_options, "D", "d00"));

  // Snapshot is now set: it sees "d0" but not the later "d00".
  snapshot_read_options.snapshot = txn1->GetSnapshot();
  ASSERT_OK(txn1->Get(snapshot_read_options, "D", &value));
  ASSERT_EQ("d0", value);

  ASSERT_OK(txn1->Commit());
  delete txn1;
}
4935 
TEST_P(TransactionTest,DeferSnapshotSavePointTest)4936 TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
4937   WriteOptions write_options;
4938   ReadOptions read_options, snapshot_read_options;
4939   std::string value;
4940   Status s;
4941 
4942   Transaction* txn1 = db->BeginTransaction(write_options);
4943 
4944   txn1->SetSavePoint();  // 1
4945 
4946   s = db->Put(write_options, "T", "1");
4947   ASSERT_OK(s);
4948 
4949   txn1->SetSnapshotOnNextOperation();
4950 
4951   s = db->Put(write_options, "T", "2");
4952   ASSERT_OK(s);
4953 
4954   txn1->SetSavePoint();  // 2
4955 
4956   s = db->Put(write_options, "T", "3");
4957   ASSERT_OK(s);
4958 
4959   s = txn1->Put("A", "a");
4960   ASSERT_OK(s);
4961 
4962   txn1->SetSavePoint();  // 3
4963 
4964   s = db->Put(write_options, "T", "4");
4965   ASSERT_OK(s);
4966 
4967   txn1->SetSnapshot();
4968   txn1->SetSnapshotOnNextOperation();
4969 
4970   txn1->SetSavePoint();  // 4
4971 
4972   s = db->Put(write_options, "T", "5");
4973   ASSERT_OK(s);
4974 
4975   snapshot_read_options.snapshot = txn1->GetSnapshot();
4976   s = txn1->Get(snapshot_read_options, "T", &value);
4977   ASSERT_OK(s);
4978   ASSERT_EQ("4", value);
4979 
4980   s = txn1->Put("A", "a1");
4981   ASSERT_OK(s);
4982 
4983   snapshot_read_options.snapshot = txn1->GetSnapshot();
4984   s = txn1->Get(snapshot_read_options, "T", &value);
4985   ASSERT_OK(s);
4986   ASSERT_EQ("5", value);
4987 
4988   s = txn1->RollbackToSavePoint();  // Rollback to 4
4989   ASSERT_OK(s);
4990 
4991   snapshot_read_options.snapshot = txn1->GetSnapshot();
4992   s = txn1->Get(snapshot_read_options, "T", &value);
4993   ASSERT_OK(s);
4994   ASSERT_EQ("4", value);
4995 
4996   s = txn1->RollbackToSavePoint();  // Rollback to 3
4997   ASSERT_OK(s);
4998 
4999   snapshot_read_options.snapshot = txn1->GetSnapshot();
5000   s = txn1->Get(snapshot_read_options, "T", &value);
5001   ASSERT_OK(s);
5002   ASSERT_EQ("3", value);
5003 
5004   s = txn1->Get(read_options, "T", &value);
5005   ASSERT_OK(s);
5006   ASSERT_EQ("5", value);
5007 
5008   s = txn1->RollbackToSavePoint();  // Rollback to 2
5009   ASSERT_OK(s);
5010 
5011   snapshot_read_options.snapshot = txn1->GetSnapshot();
5012   ASSERT_FALSE(snapshot_read_options.snapshot);
5013   s = txn1->Get(snapshot_read_options, "T", &value);
5014   ASSERT_OK(s);
5015   ASSERT_EQ("5", value);
5016 
5017   s = txn1->Delete("A");
5018   ASSERT_OK(s);
5019 
5020   snapshot_read_options.snapshot = txn1->GetSnapshot();
5021   ASSERT_TRUE(snapshot_read_options.snapshot);
5022   s = txn1->Get(snapshot_read_options, "T", &value);
5023   ASSERT_OK(s);
5024   ASSERT_EQ("5", value);
5025 
5026   s = txn1->RollbackToSavePoint();  // Rollback to 1
5027   ASSERT_OK(s);
5028 
5029   s = txn1->Delete("A");
5030   ASSERT_OK(s);
5031 
5032   snapshot_read_options.snapshot = txn1->GetSnapshot();
5033   ASSERT_FALSE(snapshot_read_options.snapshot);
5034   s = txn1->Get(snapshot_read_options, "T", &value);
5035   ASSERT_OK(s);
5036   ASSERT_EQ("5", value);
5037 
5038   s = txn1->Commit();
5039   ASSERT_OK(s);
5040 
5041   delete txn1;
5042 }
5043 
// Checks that a notifier passed to SetSnapshotOnNextOperation() is told about
// the snapshot once it is created: a Get is expected not to create it, while
// the following Put is.
TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string result;

  // Stores every snapshot it is notified about into an external slot.
  class Notifier : public TransactionNotifier {
   public:
    explicit Notifier(const Snapshot** snapshot_slot)
        : snapshot_slot_(snapshot_slot) {}

    void SnapshotCreated(const Snapshot* newSnapshot) override {
      *snapshot_slot_ = newSnapshot;
    }

   private:
    const Snapshot** snapshot_slot_;
  };

  // The notifier writes straight into read_options.snapshot.
  auto notifier = std::make_shared<Notifier>(&read_options.snapshot);

  ASSERT_OK(db->Put(write_options, "B", "0"));

  Transaction* txn1 = db->BeginTransaction(write_options);

  txn1->SetSnapshotOnNextOperation(notifier);
  ASSERT_FALSE(read_options.snapshot);

  ASSERT_OK(db->Put(write_options, "B", "1"));

  // A Get does not generate the snapshot
  ASSERT_OK(txn1->Get(read_options, "B", &result));
  ASSERT_FALSE(read_options.snapshot);
  ASSERT_EQ(result, "1");

  // Any other operation does
  ASSERT_OK(txn1->Put("A", "0"));

  // Now change "B".
  ASSERT_OK(db->Put(write_options, "B", "2"));

  // The original value should still be read
  ASSERT_OK(txn1->Get(read_options, "B", &result));
  ASSERT_TRUE(read_options.snapshot);
  ASSERT_EQ(result, "1");

  ASSERT_OK(txn1->Commit());

  delete txn1;
}
5102 
// Checks that Transaction::ClearSnapshot() drops the transaction's snapshot:
// reads through GetSnapshot() are pinned to the snapshot while it exists and
// see the latest committed value before it is set and after it is cleared.
TEST_P(TransactionTest, ClearSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "foo", "0");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = db->Put(write_options, "foo", "1");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // No snapshot created yet
  s = txn->Get(snapshot_read_options, "foo", &value);
  // Check the status too: on a failed read `value` could be stale.
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_TRUE(snapshot_read_options.snapshot);

  s = db->Put(write_options, "foo", "2");
  ASSERT_OK(s);

  // Snapshot was created before change to '2'
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->ClearSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // Snapshot has now been cleared
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "2");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
5149 
// Checks that per-column-family disable_auto_compactions settings passed to
// TransactionDB::Open() show up in each column family's latest mutable
// options.
TEST_P(TransactionTest, ToggleAutoCompactionTest) {
  Status s;

  ColumnFamilyHandle* cfa = nullptr;
  ColumnFamilyHandle* cfb = nullptr;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  // Do not leave a dangling pointer behind: if the reopen below fails, nothing
  // must touch or delete the already-freed DB again (presumably the fixture
  // releases `db` on teardown).
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  // Only CFA gets auto compactions disabled.
  ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
  ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
  ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
  cf_opt_default->disable_auto_compactions = false;
  cf_opt_cfa->disable_auto_compactions = true;
  cf_opt_cfb->disable_auto_compactions = false;

  std::vector<ColumnFamilyHandle*> handles;

  s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
                          &handles, &db);
  ASSERT_OK(s);

  // Handles are returned in the same order as the descriptors above.
  auto cfh_default = static_cast<ColumnFamilyHandleImpl*>(handles[0]);
  auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();

  auto cfh_a = static_cast<ColumnFamilyHandleImpl*>(handles[1]);
  auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();

  auto cfh_b = static_cast<ColumnFamilyHandleImpl*>(handles[2]);
  auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();

  ASSERT_EQ(opt_default.disable_auto_compactions, false);
  ASSERT_EQ(opt_a.disable_auto_compactions, true);
  ASSERT_EQ(opt_b.disable_auto_compactions, false);

  for (auto handle : handles) {
    delete handle;
  }
}
5207 
TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
  // In this test, txn1 should succeed committing,
  // as the callback is called after txn1 starts committing.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"TransactionTest::ExpirableTransactionDataRace:1"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) {
        WriteOptions write_options;
        TransactionOptions txn_options;

        // Force txn1 to expire
        /* sleep override */
        std::this_thread::sleep_for(std::chrono::milliseconds(150));

        // txn2 competes for the same key; its Put is expected to time out.
        Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
        Status s;
        s = txn2->Put("X", "2");
        ASSERT_TRUE(s.IsTimedOut());
        s = txn2->Commit();
        ASSERT_OK(s);
        delete txn2;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions write_options;
  TransactionOptions txn_options;

  // Shorter than the 150ms sleep in the callback above.
  txn_options.expiration = 100;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);

  Status s;
  s = txn1->Put("X", "1");
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  ReadOptions read_options;
  std::string value;
  s = db->Get(read_options, "X", &value);
  // Make sure the read itself succeeded before looking at the value.
  ASSERT_OK(s);
  ASSERT_EQ("1", value);

  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
5253 
5254 #ifndef ROCKSDB_VALGRIND_RUN
5255 namespace {
5256 // cmt_delay_ms is the delay between prepare and commit
5257 // first_id is the id of the first transaction
TransactionStressTestInserter(TransactionDB * db,const size_t num_transactions,const size_t num_sets,const size_t num_keys_per_set,Random64 * rand,const uint64_t cmt_delay_ms=0,const uint64_t first_id=0)5258 Status TransactionStressTestInserter(
5259     TransactionDB* db, const size_t num_transactions, const size_t num_sets,
5260     const size_t num_keys_per_set, Random64* rand,
5261     const uint64_t cmt_delay_ms = 0, const uint64_t first_id = 0) {
5262   WriteOptions write_options;
5263   ReadOptions read_options;
5264   TransactionOptions txn_options;
5265   if (rand->OneIn(2)) {
5266     txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
5267   }
5268   // Inside the inserter we might also retake the snapshot. We do both since two
5269   // separte functions are engaged for each.
5270   txn_options.set_snapshot = rand->OneIn(2);
5271 
5272   RandomTransactionInserter inserter(
5273       rand, write_options, read_options, num_keys_per_set,
5274       static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
5275 
5276   for (size_t t = 0; t < num_transactions; t++) {
5277     bool success = inserter.TransactionDBInsert(db, txn_options);
5278     if (!success) {
5279       // unexpected failure
5280       return inserter.GetLastStatus();
5281     }
5282   }
5283 
5284   // Make sure at least some of the transactions succeeded.  It's ok if
5285   // some failed due to write-conflicts.
5286   if (num_transactions != 1 &&
5287       inserter.GetFailureCount() > num_transactions / 2) {
5288     return Status::TryAgain("Too many transactions failed! " +
5289                             std::to_string(inserter.GetFailureCount()) + " / " +
5290                             std::to_string(num_transactions));
5291   }
5292 
5293   return Status::OK();
5294 }
5295 }  // namespace
5296 
5297 // Worker threads add a number to a key from each set of keys. The checker
5298 // threads verify that the sum of all keys in each set are equal.
TEST_P(MySQLStyleTransactionTest,TransactionStressTest)5299 TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
5300   // Small write buffer to trigger more compactions
5301   options.write_buffer_size = 1024;
5302   ReOpenNoDelete();
5303   const size_t num_workers = 4;   // worker threads count
5304   const size_t num_checkers = 2;  // checker threads count
5305   const size_t num_slow_checkers = 2;  // checker threads emulating backups
5306   const size_t num_slow_workers = 1;   // slow worker threads count
5307   const size_t num_transactions_per_thread = 10000;
5308   const uint16_t num_sets = 3;
5309   const size_t num_keys_per_set = 100;
5310   // Setting the key-space to be 100 keys should cause enough write-conflicts
5311   // to make this test interesting.
5312 
5313   std::vector<port::Thread> threads;
5314   std::atomic<uint32_t> finished = {0};
5315   bool TAKE_SNAPSHOT = true;
5316   uint64_t time_seed = env->NowMicros();
5317   printf("time_seed is %" PRIu64 "\n", time_seed);  // would help to reproduce
5318 
5319   std::function<void()> call_inserter = [&] {
5320     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5321     Random64 rand(time_seed * thd_seed);
5322     ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,
5323                                             num_sets, num_keys_per_set, &rand));
5324     finished++;
5325   };
5326   std::function<void()> call_checker = [&] {
5327     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5328     Random64 rand(time_seed * thd_seed);
5329     // Verify that data is consistent
5330     while (finished < num_workers) {
5331       Status s = RandomTransactionInserter::Verify(
5332           db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand);
5333       ASSERT_OK(s);
5334     }
5335   };
5336   std::function<void()> call_slow_checker = [&] {
5337     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5338     Random64 rand(time_seed * thd_seed);
5339     // Verify that data is consistent
5340     while (finished < num_workers) {
5341       uint64_t delay_ms = rand.Uniform(100) + 1;
5342       Status s = RandomTransactionInserter::Verify(
5343           db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
5344       ASSERT_OK(s);
5345     }
5346   };
5347   std::function<void()> call_slow_inserter = [&] {
5348     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5349     Random64 rand(time_seed * thd_seed);
5350     uint64_t id = 0;
5351     // Verify that data is consistent
5352     while (finished < num_workers) {
5353       uint64_t delay_ms = rand.Uniform(500) + 1;
5354       ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
5355                                               &rand, delay_ms, id++));
5356     }
5357   };
5358 
5359   for (uint32_t i = 0; i < num_workers; i++) {
5360     threads.emplace_back(call_inserter);
5361   }
5362   for (uint32_t i = 0; i < num_checkers; i++) {
5363     threads.emplace_back(call_checker);
5364   }
5365   if (with_slow_threads_) {
5366     for (uint32_t i = 0; i < num_slow_checkers; i++) {
5367       threads.emplace_back(call_slow_checker);
5368     }
5369     for (uint32_t i = 0; i < num_slow_workers; i++) {
5370       threads.emplace_back(call_slow_inserter);
5371     }
5372   }
5373 
5374   // Wait for all threads to finish
5375   for (auto& t : threads) {
5376     t.join();
5377   }
5378 
5379   // Verify that data is consistent
5380   Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set,
5381                                                !TAKE_SNAPSHOT);
5382   ASSERT_OK(s);
5383 }
5384 #endif  // ROCKSDB_VALGRIND_RUN
5385 
// Checks that a transaction rejects a write with Status::MemoryLimit once its
// write batch would grow beyond TransactionOptions::max_write_batch_size, and
// that the rejected write is not counted.
TEST_P(TransactionTest, MemoryLimitTest) {
  TransactionOptions txn_options;
  // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
  txn_options.max_write_batch_size = 29;
  // Set threshold to unlimited so that the write batch does not get flushed,
  // and can hit the memory limit.
  txn_options.write_batch_flush_threshold = 0;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
  ASSERT_TRUE(txn);

  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  s = txn->Put(Slice("a"), Slice("...."));
  ASSERT_OK(s);
  ASSERT_EQ(1, txn->GetNumPuts());

  s = txn->Put(Slice("b"), Slice("...."));
  ASSERT_OK(s);
  ASSERT_EQ(2, txn->GetNumPuts());

  // The third Put does not fit into the batch anymore.
  s = txn->Put(Slice("b"), Slice("...."));
  ASSERT_TRUE(s.IsMemoryLimit());
  ASSERT_EQ(2, txn->GetNumPuts());

  // A failed rollback must not go unnoticed.
  ASSERT_OK(txn->Rollback());
  delete txn;
}
5417 
5418 // This test clarifies the existing expectation from the sequence number
5419 // algorithm. It could detect mistakes in updating the code but it is not
5420 // necessarily the one acceptable way. If the algorithm is legitimately changed,
5421 // this unit test should be updated as well.
TEST_P(TransactionStressTest,SeqAdvanceTest)5422 TEST_P(TransactionStressTest, SeqAdvanceTest) {
5423   // TODO(myabandeh): must be test with false before new releases
5424   const bool short_test = true;
5425   WriteOptions wopts;
5426   FlushOptions fopt;
5427 
5428   options.disable_auto_compactions = true;
5429   ASSERT_OK(ReOpen());
5430 
5431   // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
5432   // of the branches. This is the same as counting a binary number where i-th
5433   // bit represents whether we take branch i in the represented by the number.
5434   const size_t NUM_BRANCHES = short_test ? 6 : 10;
5435   // Helper function that shows if the branch is to be taken in the run
5436   // represented by the number n.
5437   auto branch_do = [&](size_t n, size_t* branch) {
5438     assert(*branch < NUM_BRANCHES);
5439     const size_t filter = static_cast<size_t>(1) << *branch;
5440     return n & filter;
5441   };
5442   const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
5443   for (size_t n = 0; n < max_n; n++) {
5444     DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5445     size_t branch = 0;
5446     auto seq = db_impl->GetLatestSequenceNumber();
5447     exp_seq = seq;
5448     txn_t0(0);
5449     seq = db_impl->TEST_GetLastVisibleSequence();
5450     ASSERT_EQ(exp_seq, seq);
5451 
5452     if (branch_do(n, &branch)) {
5453       ASSERT_OK(db_impl->Flush(fopt));
5454       seq = db_impl->TEST_GetLastVisibleSequence();
5455       ASSERT_EQ(exp_seq, seq);
5456     }
5457     if (!short_test && branch_do(n, &branch)) {
5458       ASSERT_OK(db_impl->FlushWAL(true));
5459       ASSERT_OK(ReOpenNoDelete());
5460       db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5461       seq = db_impl->GetLatestSequenceNumber();
5462       ASSERT_EQ(exp_seq, seq);
5463     }
5464 
5465     // Doing it twice might detect some bugs
5466     txn_t0(1);
5467     seq = db_impl->TEST_GetLastVisibleSequence();
5468     ASSERT_EQ(exp_seq, seq);
5469 
5470     txn_t1(0);
5471     seq = db_impl->TEST_GetLastVisibleSequence();
5472     ASSERT_EQ(exp_seq, seq);
5473 
5474     if (branch_do(n, &branch)) {
5475       ASSERT_OK(db_impl->Flush(fopt));
5476       seq = db_impl->TEST_GetLastVisibleSequence();
5477       ASSERT_EQ(exp_seq, seq);
5478     }
5479     if (!short_test && branch_do(n, &branch)) {
5480       ASSERT_OK(db_impl->FlushWAL(true));
5481       ASSERT_OK(ReOpenNoDelete());
5482       db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5483       seq = db_impl->GetLatestSequenceNumber();
5484       ASSERT_EQ(exp_seq, seq);
5485     }
5486 
5487     txn_t3(0);
5488     seq = db_impl->TEST_GetLastVisibleSequence();
5489     ASSERT_EQ(exp_seq, seq);
5490 
5491     if (branch_do(n, &branch)) {
5492       ASSERT_OK(db_impl->Flush(fopt));
5493       seq = db_impl->TEST_GetLastVisibleSequence();
5494       ASSERT_EQ(exp_seq, seq);
5495     }
5496     if (!short_test && branch_do(n, &branch)) {
5497       ASSERT_OK(db_impl->FlushWAL(true));
5498       ASSERT_OK(ReOpenNoDelete());
5499       db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5500       seq = db_impl->GetLatestSequenceNumber();
5501       ASSERT_EQ(exp_seq, seq);
5502     }
5503 
5504     txn_t4(0);
5505     seq = db_impl->TEST_GetLastVisibleSequence();
5506 
5507     ASSERT_EQ(exp_seq, seq);
5508 
5509     if (branch_do(n, &branch)) {
5510       ASSERT_OK(db_impl->Flush(fopt));
5511       seq = db_impl->TEST_GetLastVisibleSequence();
5512       ASSERT_EQ(exp_seq, seq);
5513     }
5514     if (!short_test && branch_do(n, &branch)) {
5515       ASSERT_OK(db_impl->FlushWAL(true));
5516       ASSERT_OK(ReOpenNoDelete());
5517       db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5518       seq = db_impl->GetLatestSequenceNumber();
5519       ASSERT_EQ(exp_seq, seq);
5520     }
5521 
5522     txn_t2(0);
5523     seq = db_impl->TEST_GetLastVisibleSequence();
5524     ASSERT_EQ(exp_seq, seq);
5525 
5526     if (branch_do(n, &branch)) {
5527       ASSERT_OK(db_impl->Flush(fopt));
5528       seq = db_impl->TEST_GetLastVisibleSequence();
5529       ASSERT_EQ(exp_seq, seq);
5530     }
5531     if (!short_test && branch_do(n, &branch)) {
5532       ASSERT_OK(db_impl->FlushWAL(true));
5533       ASSERT_OK(ReOpenNoDelete());
5534       db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5535       seq = db_impl->GetLatestSequenceNumber();
5536       ASSERT_EQ(exp_seq, seq);
5537     }
5538     ASSERT_OK(ReOpen());
5539   }
5540 }
5541 
5542 // Verify that the optimization would not compromize the correctness
TEST_P(TransactionTest,Optimizations)5543 TEST_P(TransactionTest, Optimizations) {
5544   size_t comb_cnt = size_t(1) << 2;  // 2 is number of optimization vars
5545   for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
5546     TransactionDBWriteOptimizations optimizations;
5547     optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
5548     optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
5549 
5550     ASSERT_OK(ReOpen());
5551     WriteOptions write_options;
5552     WriteBatch batch;
5553     batch.Put(Slice("k"), Slice("v1"));
5554     ASSERT_OK(db->Write(write_options, &batch));
5555 
5556     ReadOptions ropt;
5557     PinnableSlice pinnable_val;
5558     ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
5559     ASSERT_TRUE(pinnable_val == ("v1"));
5560   }
5561 }
5562 
5563 // A comparator that uses only the first three bytes
5564 class ThreeBytewiseComparator : public Comparator {
5565  public:
ThreeBytewiseComparator()5566   ThreeBytewiseComparator() {}
Name() const5567   const char* Name() const override { return "test.ThreeBytewiseComparator"; }
Compare(const Slice & a,const Slice & b) const5568   int Compare(const Slice& a, const Slice& b) const override {
5569     Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5570     Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5571     return na.compare(nb);
5572   }
Equal(const Slice & a,const Slice & b) const5573   bool Equal(const Slice& a, const Slice& b) const override {
5574     Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5575     Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5576     return na == nb;
5577   }
5578   // This methods below dont seem relevant to this test. Implement them if
5579   // proven othersize.
FindShortestSeparator(std::string * start,const Slice & limit) const5580   void FindShortestSeparator(std::string* start,
5581                              const Slice& limit) const override {
5582     const Comparator* bytewise_comp = BytewiseComparator();
5583     bytewise_comp->FindShortestSeparator(start, limit);
5584   }
FindShortSuccessor(std::string * key) const5585   void FindShortSuccessor(std::string* key) const override {
5586     const Comparator* bytewise_comp = BytewiseComparator();
5587     bytewise_comp->FindShortSuccessor(key);
5588   }
5589 };
5590 
5591 #ifndef ROCKSDB_VALGRIND_RUN
// Reads "key" without a snapshot while another thread keeps committing
// transactions that overwrite it twice (ending with the original value). The
// reader must always see "value", never the intermediate one.
TEST_P(TransactionTest, GetWithoutSnapshot) {
  WriteOptions write_options;
  std::atomic<bool> finish = {false};
  // The reader below relies on this initial value being present.
  ASSERT_OK(db->Put(write_options, "key", "value"));
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    for (int i = 0; i < 100; i++) {
      TransactionOptions txn_options;
      Transaction* txn = db->BeginTransaction(write_options, txn_options);
      ASSERT_OK(txn->SetName("xid"));
      ASSERT_OK(txn->Put("key", "overridedvalue"));
      ASSERT_OK(txn->Put("key", "value"));
      ASSERT_OK(txn->Prepare());
      ASSERT_OK(txn->Commit());
      delete txn;
    }
    finish = true;
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    // Keep reading until the committing thread reports completion.
    while (!finish) {
      ReadOptions ropt;
      PinnableSlice pinnable_val;
      ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val));
      ASSERT_TRUE(pinnable_val == ("value"));
    }
  });
  commit_thread.join();
  read_thread.join();
}
5620 #endif  // ROCKSDB_VALGRIND_RUN
5621 
5622 // Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest,DuplicateKeys)5623 TEST_P(TransactionTest, DuplicateKeys) {
5624   ColumnFamilyOptions cf_options;
5625   std::string cf_name = "two";
5626   ColumnFamilyHandle* cf_handle = nullptr;
5627   {
5628     ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5629     WriteOptions write_options;
5630     WriteBatch batch;
5631     batch.Put(Slice("key"), Slice("value"));
5632     batch.Put(Slice("key2"), Slice("value2"));
5633     // duplicate the keys
5634     batch.Put(Slice("key"), Slice("value3"));
5635     // duplicate the 2nd key. It should not be counted duplicate since a
5636     // sub-patch is cut after the last duplicate.
5637     batch.Put(Slice("key2"), Slice("value4"));
5638     // duplicate the keys but in a different cf. It should not be counted as
5639     // duplicate keys
5640     batch.Put(cf_handle, Slice("key"), Slice("value5"));
5641 
5642     ASSERT_OK(db->Write(write_options, &batch));
5643 
5644     ReadOptions ropt;
5645     PinnableSlice pinnable_val;
5646     auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
5647     ASSERT_OK(s);
5648     ASSERT_TRUE(pinnable_val == ("value3"));
5649     s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
5650     ASSERT_OK(s);
5651     ASSERT_TRUE(pinnable_val == ("value4"));
5652     s = db->Get(ropt, cf_handle, "key", &pinnable_val);
5653     ASSERT_OK(s);
5654     ASSERT_TRUE(pinnable_val == ("value5"));
5655 
5656     delete cf_handle;
5657   }
5658 
5659   // Test with non-bytewise comparator
5660   {
5661     ASSERT_OK(ReOpen());
5662     std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5663     cf_options.comparator = comp_gc.get();
5664     ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5665     WriteOptions write_options;
5666     WriteBatch batch;
5667     batch.Put(cf_handle, Slice("key"), Slice("value"));
5668     // The first three bytes are the same, do it must be counted as duplicate
5669     batch.Put(cf_handle, Slice("key2"), Slice("value2"));
5670     // check for 2nd duplicate key in cf with non-default comparator
5671     batch.Put(cf_handle, Slice("key2b"), Slice("value2b"));
5672     ASSERT_OK(db->Write(write_options, &batch));
5673 
5674     // The value must be the most recent value for all the keys equal to "key",
5675     // including "key2"
5676     ReadOptions ropt;
5677     PinnableSlice pinnable_val;
5678     ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
5679     ASSERT_TRUE(pinnable_val == ("value2b"));
5680 
5681     // Test duplicate keys with rollback
5682     TransactionOptions txn_options;
5683     Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5684     ASSERT_OK(txn0->SetName("xid"));
5685     ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
5686     ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
5687     ASSERT_OK(txn0->Rollback());
5688     ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
5689     ASSERT_TRUE(pinnable_val == ("value2b"));
5690     delete txn0;
5691 
5692     delete cf_handle;
5693     cf_options.comparator = BytewiseComparator();
5694   }
5695 
5696   for (bool do_prepare : {true, false}) {
5697     for (bool do_rollback : {true, false}) {
5698       for (bool with_commit_batch : {true, false}) {
5699         if (with_commit_batch && !do_prepare) {
5700           continue;
5701         }
5702         if (with_commit_batch && do_rollback) {
5703           continue;
5704         }
5705         ASSERT_OK(ReOpen());
5706         ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5707         TransactionOptions txn_options;
5708         txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
5709         WriteOptions write_options;
5710         Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5711         auto s = txn0->SetName("xid");
5712         ASSERT_OK(s);
5713         s = txn0->Put(Slice("foo0"), Slice("bar0a"));
5714         ASSERT_OK(s);
5715         s = txn0->Put(Slice("foo0"), Slice("bar0b"));
5716         ASSERT_OK(s);
5717         s = txn0->Put(Slice("foo1"), Slice("bar1"));
5718         ASSERT_OK(s);
5719         s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
5720         ASSERT_OK(s);
5721         // Repeat a key after the start of a sub-patch. This should not cause a
5722         // duplicate in the most recent sub-patch and hence not creating a new
5723         // sub-patch.
5724         s = txn0->Put(Slice("foo0"), Slice("bar0c"));
5725         ASSERT_OK(s);
5726         s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
5727         ASSERT_OK(s);
5728         // duplicate the keys but in a different cf. It should not be counted as
5729         // duplicate.
5730         s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
5731         ASSERT_OK(s);
5732         s = txn0->Put(Slice("foo3"), Slice("bar3"));
5733         ASSERT_OK(s);
5734         s = txn0->Merge(Slice("foo3"), Slice("bar3"));
5735         ASSERT_OK(s);
5736         s = txn0->Put(Slice("foo4"), Slice("bar4"));
5737         ASSERT_OK(s);
5738         s = txn0->Delete(Slice("foo4"));
5739         ASSERT_OK(s);
5740         s = txn0->SingleDelete(Slice("foo4"));
5741         ASSERT_OK(s);
5742         if (do_prepare) {
5743           s = txn0->Prepare();
5744           ASSERT_OK(s);
5745         }
5746         if (do_rollback) {
5747           // Test rolling back the batch with duplicates
5748           s = txn0->Rollback();
5749           ASSERT_OK(s);
5750         } else {
5751           if (with_commit_batch) {
5752             assert(do_prepare);
5753             auto cb = txn0->GetCommitTimeWriteBatch();
5754             // duplicate a key in the original batch
5755             // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
5756             // conflicting with the prepared batch is currently undefined and
5757             // gives different results in different implementations.
5758 
5759             // s = cb->Put(Slice("foo0"), Slice("bar0d"));
5760             // ASSERT_OK(s);
5761             // add a new duplicate key
5762             s = cb->Put(Slice("foo6"), Slice("bar6a"));
5763             ASSERT_OK(s);
5764             s = cb->Put(Slice("foo6"), Slice("bar6b"));
5765             ASSERT_OK(s);
5766             // add a duplicate key that is removed in the same batch
5767             s = cb->Put(Slice("foo7"), Slice("bar7a"));
5768             ASSERT_OK(s);
5769             s = cb->Delete(Slice("foo7"));
5770             ASSERT_OK(s);
5771           }
5772           s = txn0->Commit();
5773           ASSERT_OK(s);
5774         }
5775         delete txn0;
5776         ReadOptions ropt;
5777         PinnableSlice pinnable_val;
5778 
5779         if (do_rollback) {
5780           s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5781           ASSERT_TRUE(s.IsNotFound());
5782           s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5783           ASSERT_TRUE(s.IsNotFound());
5784           s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5785           ASSERT_TRUE(s.IsNotFound());
5786           s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5787           ASSERT_TRUE(s.IsNotFound());
5788           s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5789           ASSERT_TRUE(s.IsNotFound());
5790           s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5791           ASSERT_TRUE(s.IsNotFound());
5792         } else {
5793           s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5794           ASSERT_OK(s);
5795           ASSERT_TRUE(pinnable_val == ("bar0c"));
5796           s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5797           ASSERT_OK(s);
5798           ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
5799           s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5800           ASSERT_OK(s);
5801           ASSERT_TRUE(pinnable_val == ("bar1"));
5802           s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5803           ASSERT_OK(s);
5804           ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
5805           s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5806           ASSERT_OK(s);
5807           ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
5808           s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5809           ASSERT_TRUE(s.IsNotFound());
5810           if (with_commit_batch) {
5811             s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
5812             ASSERT_OK(s);
5813             ASSERT_TRUE(pinnable_val == ("bar6b"));
5814             s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
5815             ASSERT_TRUE(s.IsNotFound());
5816           }
5817         }
5818         delete cf_handle;
5819       }  // with_commit_batch
5820     }    // do_rollback
5821   }      // do_prepare
5822 
5823   if (!options.unordered_write) {
5824     // Also test with max_successive_merges > 0. max_successive_merges will not
5825     // affect our algorithm for duplicate key insertion but we add the test to
5826     // verify that.
5827     cf_options.max_successive_merges = 2;
5828     cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5829     ASSERT_OK(ReOpen());
5830     db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
5831     WriteOptions write_options;
5832     // Ensure one value for the key
5833     ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
5834     WriteBatch batch;
5835     // Merge more than max_successive_merges times
5836     batch.Merge(cf_handle, Slice("key"), Slice("1"));
5837     batch.Merge(cf_handle, Slice("key"), Slice("2"));
5838     batch.Merge(cf_handle, Slice("key"), Slice("3"));
5839     batch.Merge(cf_handle, Slice("key"), Slice("4"));
5840     ASSERT_OK(db->Write(write_options, &batch));
5841     ReadOptions read_options;
5842     string value;
5843     ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
5844     ASSERT_EQ(value, "value,1,2,3,4");
5845     delete cf_handle;
5846   }
5847 
5848   {
5849     // Test that the duplicate detection is not compromised after rolling back
5850     // to a save point
5851     TransactionOptions txn_options;
5852     WriteOptions write_options;
5853     Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5854     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5855     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5856     txn0->SetSavePoint();
5857     ASSERT_OK(txn0->RollbackToSavePoint());
5858     ASSERT_OK(txn0->Commit());
5859     delete txn0;
5860   }
5861 
5862   // Test sucessfull recovery after a crash
5863   {
5864     ASSERT_OK(ReOpen());
5865     TransactionOptions txn_options;
5866     WriteOptions write_options;
5867     ReadOptions ropt;
5868     Transaction* txn0;
5869     PinnableSlice pinnable_val;
5870     Status s;
5871 
5872     std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5873     cf_options.comparator = comp_gc.get();
5874     cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5875     ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5876     delete cf_handle;
5877     std::vector<ColumnFamilyDescriptor> cfds{
5878         ColumnFamilyDescriptor(kDefaultColumnFamilyName,
5879                                ColumnFamilyOptions(options)),
5880         ColumnFamilyDescriptor(cf_name, cf_options),
5881     };
5882     std::vector<ColumnFamilyHandle*> handles;
5883     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5884 
5885     ASSERT_OK(db->Put(write_options, "foo0", "init"));
5886     ASSERT_OK(db->Put(write_options, "foo1", "init"));
5887     ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init"));
5888     ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init"));
5889 
5890     // one entry
5891     txn0 = db->BeginTransaction(write_options, txn_options);
5892     ASSERT_OK(txn0->SetName("xid"));
5893     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5894     ASSERT_OK(txn0->Prepare());
5895     delete txn0;
5896     // This will check the asserts inside recovery code
5897     ASSERT_OK(db->FlushWAL(true));
5898     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5899     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5900     txn0 = db->GetTransactionByName("xid");
5901     ASSERT_TRUE(txn0 != nullptr);
5902     ASSERT_OK(txn0->Commit());
5903     delete txn0;
5904     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5905     ASSERT_OK(s);
5906     ASSERT_TRUE(pinnable_val == ("bar0a"));
5907 
5908     // two entries, no duplicate
5909     txn0 = db->BeginTransaction(write_options, txn_options);
5910     ASSERT_OK(txn0->SetName("xid"));
5911     ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b")));
5912     ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b")));
5913     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5914     ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b")));
5915     ASSERT_OK(txn0->Prepare());
5916     delete txn0;
5917     // This will check the asserts inside recovery code
5918     db->FlushWAL(true);
5919     // Flush only cf 1
5920     reinterpret_cast<DBImpl*>(db->GetRootDB())
5921         ->TEST_FlushMemTable(true, false, handles[1]);
5922     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5923     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5924     txn0 = db->GetTransactionByName("xid");
5925     ASSERT_TRUE(txn0 != nullptr);
5926     ASSERT_OK(txn0->Commit());
5927     delete txn0;
5928     pinnable_val.Reset();
5929     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5930     ASSERT_OK(s);
5931     ASSERT_TRUE(pinnable_val == ("bar0b"));
5932     pinnable_val.Reset();
5933     s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5934     ASSERT_OK(s);
5935     ASSERT_TRUE(pinnable_val == ("bar1b"));
5936     pinnable_val.Reset();
5937     s = db->Get(ropt, handles[1], "foo0", &pinnable_val);
5938     ASSERT_OK(s);
5939     ASSERT_TRUE(pinnable_val == ("bar0b"));
5940     pinnable_val.Reset();
5941     s = db->Get(ropt, handles[1], "fol1", &pinnable_val);
5942     ASSERT_OK(s);
5943     ASSERT_TRUE(pinnable_val == ("bar1b"));
5944 
5945     // one duplicate with ::Put
5946     txn0 = db->BeginTransaction(write_options, txn_options);
5947     ASSERT_OK(txn0->SetName("xid"));
5948     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c")));
5949     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d")));
5950     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c")));
5951     ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c")));
5952     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d")));
5953     ASSERT_OK(txn0->Prepare());
5954     delete txn0;
5955     // This will check the asserts inside recovery code
5956     ASSERT_OK(db->FlushWAL(true));
5957     // Flush only cf 1
5958     reinterpret_cast<DBImpl*>(db->GetRootDB())
5959         ->TEST_FlushMemTable(true, false, handles[1]);
5960     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5961     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5962     txn0 = db->GetTransactionByName("xid");
5963     ASSERT_TRUE(txn0 != nullptr);
5964     ASSERT_OK(txn0->Commit());
5965     delete txn0;
5966     pinnable_val.Reset();
5967     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5968     ASSERT_OK(s);
5969     ASSERT_TRUE(pinnable_val == ("bar0d"));
5970     pinnable_val.Reset();
5971     s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5972     ASSERT_OK(s);
5973     ASSERT_TRUE(pinnable_val == ("bar1c"));
5974     pinnable_val.Reset();
5975     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
5976     ASSERT_OK(s);
5977     ASSERT_TRUE(pinnable_val == ("bar1d"));
5978 
5979     // Duplicate with ::Put, ::Delete
5980     txn0 = db->BeginTransaction(write_options, txn_options);
5981     ASSERT_OK(txn0->SetName("xid"));
5982     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e")));
5983     ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1")));
5984     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
5985     ASSERT_OK(txn0->Delete(Slice("foo0")));
5986     ASSERT_OK(txn0->Prepare());
5987     delete txn0;
5988     // This will check the asserts inside recovery code
5989     ASSERT_OK(db->FlushWAL(true));
5990     // Flush only cf 1
5991     reinterpret_cast<DBImpl*>(db->GetRootDB())
5992         ->TEST_FlushMemTable(true, false, handles[1]);
5993     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5994     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5995     txn0 = db->GetTransactionByName("xid");
5996     ASSERT_TRUE(txn0 != nullptr);
5997     ASSERT_OK(txn0->Commit());
5998     delete txn0;
5999     pinnable_val.Reset();
6000     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6001     ASSERT_TRUE(s.IsNotFound());
6002     pinnable_val.Reset();
6003     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6004     ASSERT_TRUE(s.IsNotFound());
6005 
6006     // Duplicate with ::Put, ::SingleDelete
6007     txn0 = db->BeginTransaction(write_options, txn_options);
6008     ASSERT_OK(txn0->SetName("xid"));
6009     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g")));
6010     ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1")));
6011     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6012     ASSERT_OK(txn0->SingleDelete(Slice("foo0")));
6013     ASSERT_OK(txn0->Prepare());
6014     delete txn0;
6015     // This will check the asserts inside recovery code
6016     ASSERT_OK(db->FlushWAL(true));
6017     // Flush only cf 1
6018     reinterpret_cast<DBImpl*>(db->GetRootDB())
6019         ->TEST_FlushMemTable(true, false, handles[1]);
6020     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6021     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6022     txn0 = db->GetTransactionByName("xid");
6023     ASSERT_TRUE(txn0 != nullptr);
6024     ASSERT_OK(txn0->Commit());
6025     delete txn0;
6026     pinnable_val.Reset();
6027     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6028     ASSERT_TRUE(s.IsNotFound());
6029     pinnable_val.Reset();
6030     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6031     ASSERT_TRUE(s.IsNotFound());
6032 
6033     // Duplicate with ::Put, ::Merge
6034     txn0 = db->BeginTransaction(write_options, txn_options);
6035     ASSERT_OK(txn0->SetName("xid"));
6036     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i")));
6037     ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j")));
6038     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f")));
6039     ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g")));
6040     ASSERT_OK(txn0->Prepare());
6041     delete txn0;
6042     // This will check the asserts inside recovery code
6043     ASSERT_OK(db->FlushWAL(true));
6044     // Flush only cf 1
6045     reinterpret_cast<DBImpl*>(db->GetRootDB())
6046         ->TEST_FlushMemTable(true, false, handles[1]);
6047     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6048     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6049     txn0 = db->GetTransactionByName("xid");
6050     ASSERT_TRUE(txn0 != nullptr);
6051     ASSERT_OK(txn0->Commit());
6052     delete txn0;
6053     pinnable_val.Reset();
6054     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6055     ASSERT_OK(s);
6056     ASSERT_TRUE(pinnable_val == ("bar0f,bar0g"));
6057     pinnable_val.Reset();
6058     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6059     ASSERT_OK(s);
6060     ASSERT_TRUE(pinnable_val == ("bar1i,bar1j"));
6061 
6062     for (auto h : handles) {
6063       delete h;
6064     }
6065     delete db;
6066     db = nullptr;
6067   }
6068 }
6069 
6070 // Test that the reseek optimization in iterators will not result in an infinite
6071 // loop if there are too many uncommitted entries before the snapshot.
TEST_P(TransactionTest,ReseekOptimization)6072 TEST_P(TransactionTest, ReseekOptimization) {
6073   WriteOptions write_options;
6074   write_options.sync = true;
6075   write_options.disableWAL = false;
6076   ColumnFamilyDescriptor cfd;
6077   db->DefaultColumnFamily()->GetDescriptor(&cfd);
6078   auto max_skip = cfd.options.max_sequential_skip_in_iterations;
6079 
6080   ASSERT_OK(db->Put(write_options, Slice("foo0"), Slice("initv")));
6081 
6082   TransactionOptions txn_options;
6083   Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
6084   ASSERT_OK(txn0->SetName("xid"));
6085   // Duplicate keys will result into separate sequence numbers in WritePrepared
6086   // and WriteUnPrepared
6087   for (size_t i = 0; i < 2 * max_skip; i++) {
6088     ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar")));
6089   }
6090   ASSERT_OK(txn0->Prepare());
6091   ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("initv")));
6092 
6093   ReadOptions read_options;
6094   // To avoid loops
6095   read_options.max_skippable_internal_keys = 10 * max_skip;
6096   Iterator* iter = db->NewIterator(read_options);
6097   ASSERT_OK(iter->status());
6098   size_t cnt = 0;
6099   iter->SeekToFirst();
6100   while (iter->Valid()) {
6101     iter->Next();
6102     ASSERT_OK(iter->status());
6103     cnt++;
6104   }
6105   ASSERT_EQ(cnt, 2);
6106   cnt = 0;
6107   iter->SeekToLast();
6108   while (iter->Valid()) {
6109     iter->Prev();
6110     ASSERT_OK(iter->status());
6111     cnt++;
6112   }
6113   ASSERT_EQ(cnt, 2);
6114   delete iter;
6115   txn0->Rollback();
6116   delete txn0;
6117 }
6118 
6119 // After recovery in kPointInTimeRecovery mode, the corrupted log file remains
6120 // there. The new log files should be still read succesfully during recovery of
6121 // the 2nd crash.
TEST_P(TransactionTest,DoubleCrashInRecovery)6122 TEST_P(TransactionTest, DoubleCrashInRecovery) {
6123   for (const bool manual_wal_flush : {false, true}) {
6124     for (const bool write_after_recovery : {false, true}) {
6125       options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
6126       options.manual_wal_flush = manual_wal_flush;
6127       ReOpen();
6128       std::string cf_name = "two";
6129       ColumnFamilyOptions cf_options;
6130       ColumnFamilyHandle* cf_handle = nullptr;
6131       ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
6132 
6133       // Add a prepare entry to prevent the older logs from being deleted.
6134       WriteOptions write_options;
6135       TransactionOptions txn_options;
6136       Transaction* txn = db->BeginTransaction(write_options, txn_options);
6137       ASSERT_OK(txn->SetName("xid"));
6138       ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
6139       ASSERT_OK(txn->Prepare());
6140 
6141       FlushOptions flush_ops;
6142       db->Flush(flush_ops);
6143       // Now we have a log that cannot be deleted
6144 
6145       ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
6146       // Flush only the 2nd cf
6147       db->Flush(flush_ops, cf_handle);
6148 
6149       // The value is large enough to be touched by the corruption we ingest
6150       // below.
6151       std::string large_value(400, ' ');
6152       // key/value not touched by corruption
6153       ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
6154       // key/value touched by corruption
6155       ASSERT_OK(db->Put(write_options, "foo3", large_value));
6156       // key/value not touched by corruption
6157       ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
6158 
6159       db->FlushWAL(true);
6160       DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
6161       uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
6162       std::string fname = LogFileName(dbname, wal_file_id);
6163       reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6164       delete txn;
6165       delete cf_handle;
6166       delete db;
6167       db = nullptr;
6168 
6169       // Corrupt the last log file in the middle, so that it is not corrupted
6170       // in the tail.
6171       std::string file_content;
6172       ASSERT_OK(ReadFileToString(env, fname, &file_content));
6173       file_content[400] = 'h';
6174       file_content[401] = 'a';
6175       ASSERT_OK(env->DeleteFile(fname));
6176       ASSERT_OK(WriteStringToFile(env, file_content, fname, true));
6177 
6178       // Recover from corruption
6179       std::vector<ColumnFamilyHandle*> handles;
6180       std::vector<ColumnFamilyDescriptor> column_families;
6181       column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
6182                                                        ColumnFamilyOptions()));
6183       column_families.push_back(
6184           ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
6185       ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6186 
6187       if (write_after_recovery) {
6188         // Write data to the log right after the corrupted log
6189         ASSERT_OK(db->Put(write_options, "foo5", large_value));
6190       }
6191 
6192       // Persist data written to WAL during recovery or by the last Put
6193       db->FlushWAL(true);
6194       // 2nd crash to recover while having a valid log after the corrupted one.
6195       ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6196       assert(db != nullptr);
6197       txn = db->GetTransactionByName("xid");
6198       ASSERT_TRUE(txn != nullptr);
6199       ASSERT_OK(txn->Commit());
6200       delete txn;
6201       for (auto handle : handles) {
6202         delete handle;
6203       }
6204     }
6205   }
6206 }
6207 
6208 }  // namespace ROCKSDB_NAMESPACE
6209 
main(int argc,char ** argv)6210 int main(int argc, char** argv) {
6211   ::testing::InitGoogleTest(&argc, argv);
6212   return RUN_ALL_TESTS();
6213 }
6214 
6215 #else
6216 #include <stdio.h>
6217 
// Stand-in entry point for ROCKSDB_LITE builds: prints a notice on stderr that
// the suite is skipped and exits with status 0.
int main(int /*argc*/, char** /*argv*/) {
  static const char kSkipNotice[] =
      "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n";
  fputs(kSkipNotice, stderr);
  return 0;
}
6223 
6224 #endif  // ROCKSDB_LITE
6225