1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/transaction_test.h"
9 
10 #include <algorithm>
11 #include <functional>
12 #include <string>
13 #include <thread>
14 
15 #include "db/db_impl/db_impl.h"
16 #include "port/port.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/options.h"
19 #include "rocksdb/perf_context.h"
20 #include "rocksdb/utilities/transaction.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "table/mock_table.h"
23 #include "test_util/sync_point.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "test_util/transaction_test_util.h"
27 #include "util/random.h"
28 #include "util/string_util.h"
29 #include "utilities/fault_injection_env.h"
30 #include "utilities/merge_operators.h"
31 #include "utilities/merge_operators/string_append/stringappend.h"
32 #include "utilities/transactions/pessimistic_transaction_db.h"
33 
34 using std::string;
35 
36 namespace ROCKSDB_NAMESPACE {
37 
// Instantiates TransactionTest under the "DBAsBaseDB" prefix. Every tuple has
// `false` as its first element, and the set covers the WRITE_COMMITTED,
// WRITE_PREPARED and WRITE_UNPREPARED policies (kUnorderedWrite appears only
// together with WRITE_PREPARED).
// NOTE(review): the tuple layout is defined by the fixture in
// transaction_test.h, which is not visible here -- presumably
// (use_stackable_db, two_write_queue, write_policy, write_ordering); confirm.
INSTANTIATE_TEST_CASE_P(
    DBAsBaseDB, TransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
// Instantiates TransactionStressTest under the "DBAsBaseDB" prefix with the
// same seven parameter tuples that are listed for TransactionTest above.
// NOTE(review): tuple field meanings come from the fixture declaration, which
// is outside this file's visible portion -- confirm before editing values.
INSTANTIATE_TEST_CASE_P(
    DBAsBaseDB, TransactionStressTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
// Instantiates TransactionTest under the "StackableDBAsBaseDB" prefix: one
// tuple per write policy, each with `true` in the first two positions and
// kOrderedWrite as the ordering.
// NOTE(review): presumably the first element selects a StackableDB as the base
// DB (suggested by the instantiation name) -- confirm against the fixture.
INSTANTIATE_TEST_CASE_P(
    StackableDBAsBaseDB, TransactionTest,
    ::testing::Values(
        std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite)));
64 
65 // MySQLStyleTransactionTest takes far too long for valgrind to run.
66 #ifndef ROCKSDB_VALGRIND_RUN
// Instantiates MySQLStyleTransactionTest. Unlike the instantiations above, its
// tuples carry a fifth boolean; for WRITE_COMMITTED only `false` is listed,
// while WRITE_PREPARED and WRITE_UNPREPARED are listed with both values.
// NOTE(review): the meaning of the fifth element is defined by the fixture,
// which is not visible here -- confirm before adding or removing tuples.
INSTANTIATE_TEST_CASE_P(
    MySQLStyleTransactionTest, MySQLStyleTransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
82 #endif  // ROCKSDB_VALGRIND_RUN
83 
// Checks that writing an empty batch twice succeeds, that an empty named
// transaction can be prepared and committed, and that a prepared transaction
// can be looked up by name and committed after TEST_Crash() followed by a
// reopen.
TEST_P(TransactionTest, DoubleEmptyWrite) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;

  WriteBatch batch;

  // The same (empty) batch is written twice on purpose.
  ASSERT_OK(db->Write(write_options, &batch));
  ASSERT_OK(db->Write(write_options, &batch));

  // Also test committing empty transactions in 2PC
  TransactionOptions txn_options;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  // Fail the test (rather than crash) if no transaction was handed back.
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Prepare());
  ASSERT_OK(txn0->Commit());
  delete txn0;

  // Also test that it works during recovery
  txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->SetName("xid2"));
  ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
  ASSERT_OK(txn0->Prepare());
  delete txn0;
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  // The prepared transaction is expected to be reachable by name after the
  // reopen; a null result must be reported as a test failure, not dereferenced.
  txn0 = db->GetTransactionByName("xid2");
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->Commit());
  delete txn0;
}
115 
// Basic success path for one transaction: read-and-lock a key, overwrite it,
// see the pending value through the transaction, commit, then read the
// committed value back through the DB.
TEST_P(TransactionTest, SuccessTest) {
  ASSERT_OK(db->ResetStats());

  WriteOptions wopts;
  ReadOptions ropts;
  std::string fetched;

  // Seed two keys outside of any transaction.
  ASSERT_OK(db->Put(wopts, Slice("foo"), Slice("bar")));
  ASSERT_OK(db->Put(wopts, Slice("foo2"), Slice("bar")));

  Transaction* txn = db->BeginTransaction(wopts, TransactionOptions());
  ASSERT_TRUE(txn);

  // A fresh transaction has recorded no puts yet.
  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  // Before the transaction writes, it sees the seeded value.
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &fetched));
  ASSERT_EQ(fetched, "bar");

  ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  ASSERT_EQ(1, txn->GetNumPuts());

  // After the put, the transaction reads its own pending value.
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &fetched));
  ASSERT_EQ(fetched, "bar2");

  ASSERT_OK(txn->Commit());

  // Once committed, the new value is visible through the DB itself.
  ASSERT_OK(db->Get(ropts, "foo", &fetched));
  ASSERT_EQ(fetched, "bar2");

  delete txn;
}
149 
150 // The test clarifies the contract of do_validate and assume_tracked
151 // in GetForUpdate and Put/Merge/Delete
TEST_P(TransactionTest,AssumeExclusiveTracked)152 TEST_P(TransactionTest, AssumeExclusiveTracked) {
153   WriteOptions write_options;
154   ReadOptions read_options;
155   std::string value;
156   Status s;
157   TransactionOptions txn_options;
158   txn_options.lock_timeout = 1;
159   const bool EXCLUSIVE = true;
160   const bool DO_VALIDATE = true;
161   const bool ASSUME_LOCKED = true;
162 
163   Transaction* txn = db->BeginTransaction(write_options, txn_options);
164   ASSERT_TRUE(txn);
165   txn->SetSnapshot();
166 
167   // commit a value after the snapshot is taken
168   ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
169 
170   // By default write should fail to the commit after our snapshot
171   s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE);
172   ASSERT_TRUE(s.IsBusy());
173   // But the user could direct the db to skip validating the snapshot. The read
174   // value then should be the most recently committed
175   ASSERT_OK(
176       txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE));
177   ASSERT_EQ(value, "bar");
178 
179   // Although ValidateSnapshot is skipped the key must have still got locked
180   s = db->Put(write_options, Slice("foo"), Slice("bar"));
181   ASSERT_TRUE(s.IsTimedOut());
182 
183   // By default the write operations should fail due to the commit after the
184   // snapshot
185   s = txn->Put(Slice("foo"), Slice("bar1"));
186   ASSERT_TRUE(s.IsBusy());
187   s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
188                !ASSUME_LOCKED);
189   ASSERT_TRUE(s.IsBusy());
190   // But the user could direct the db that it already assumes exclusive lock on
191   // the key due to the previous GetForUpdate call.
192   ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
193                      ASSUME_LOCKED));
194   ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"),
195                        ASSUME_LOCKED));
196   ASSERT_OK(
197       txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED));
198   ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"),
199                               ASSUME_LOCKED));
200 
201   ASSERT_OK(txn->Rollback());
202   delete txn;
203 }
204 
205 // This test clarifies the contract of ValidateSnapshot
TEST_P(TransactionTest,ValidateSnapshotTest)206 TEST_P(TransactionTest, ValidateSnapshotTest) {
207   for (bool with_flush : {true}) {
208     for (bool with_2pc : {true}) {
209       ASSERT_OK(ReOpen());
210       WriteOptions write_options;
211       ReadOptions read_options;
212       std::string value;
213 
214       assert(db != nullptr);
215       Transaction* txn1 =
216           db->BeginTransaction(write_options, TransactionOptions());
217       ASSERT_TRUE(txn1);
218       ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
219       if (with_2pc) {
220         ASSERT_OK(txn1->SetName("xid1"));
221         ASSERT_OK(txn1->Prepare());
222       }
223 
224       if (with_flush) {
225         auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
226         ASSERT_OK(db_impl->TEST_FlushMemTable(true));
227         // Make sure the flushed memtable is not kept in memory
228         int max_memtable_in_history =
229             std::max(
230                 options.max_write_buffer_number,
231                 static_cast<int>(options.max_write_buffer_size_to_maintain) /
232                     static_cast<int>(options.write_buffer_size)) +
233             1;
234         for (int i = 0; i < max_memtable_in_history; i++) {
235           ASSERT_OK(db->Put(write_options, Slice("key"), Slice("value")));
236           ASSERT_OK(db_impl->TEST_FlushMemTable(true));
237         }
238       }
239 
240       Transaction* txn2 =
241           db->BeginTransaction(write_options, TransactionOptions());
242       ASSERT_TRUE(txn2);
243       txn2->SetSnapshot();
244 
245       ASSERT_OK(txn1->Commit());
246       delete txn1;
247 
248       auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
249       // Test the simple case where the key is not tracked yet
250       auto trakced_seq = kMaxSequenceNumber;
251       auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
252                                           &trakced_seq);
253       ASSERT_TRUE(s.IsBusy());
254       delete txn2;
255     }
256   }
257 }
258 
TEST_P(TransactionTest,WaitingTxn)259 TEST_P(TransactionTest, WaitingTxn) {
260   WriteOptions write_options;
261   ReadOptions read_options;
262   TransactionOptions txn_options;
263   string value;
264   Status s;
265 
266   txn_options.lock_timeout = 1;
267   s = db->Put(write_options, Slice("foo"), Slice("bar"));
268   ASSERT_OK(s);
269 
270   /* create second cf */
271   ColumnFamilyHandle* cfa;
272   ColumnFamilyOptions cf_options;
273   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
274   ASSERT_OK(s);
275   s = db->Put(write_options, cfa, Slice("foo"), Slice("bar"));
276   ASSERT_OK(s);
277 
278   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
279   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
280   TransactionID id1 = txn1->GetID();
281   ASSERT_TRUE(txn1);
282   ASSERT_TRUE(txn2);
283 
284   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
285       "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
286         std::string key;
287         uint32_t cf_id;
288         std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
289         ASSERT_EQ(key, "foo");
290         ASSERT_EQ(wait.size(), 1);
291         ASSERT_EQ(wait[0], id1);
292         ASSERT_EQ(cf_id, 0U);
293       });
294 
295   get_perf_context()->Reset();
296   // lock key in default cf
297   s = txn1->GetForUpdate(read_options, "foo", &value);
298   ASSERT_OK(s);
299   ASSERT_EQ(value, "bar");
300   ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
301 
302   // lock key in cfa
303   s = txn1->GetForUpdate(read_options, cfa, "foo", &value);
304   ASSERT_OK(s);
305   ASSERT_EQ(value, "bar");
306   ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
307 
308   auto lock_data = db->GetLockStatusData();
309   // Locked keys exist in both column family.
310   ASSERT_EQ(lock_data.size(), 2);
311 
312   auto cf_iterator = lock_data.begin();
313 
314   // The iterator points to an unordered_multimap
315   // thus the test can not assume any particular order.
316 
317   // Column family is 1 or 0 (cfa).
318   if (cf_iterator->first != 1 && cf_iterator->first != 0) {
319     FAIL();
320   }
321   // The locked key is "foo" and is locked by txn1
322   ASSERT_EQ(cf_iterator->second.key, "foo");
323   ASSERT_EQ(cf_iterator->second.ids.size(), 1);
324   ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
325 
326   cf_iterator++;
327 
328   // Column family is 0 (default) or 1.
329   if (cf_iterator->first != 1 && cf_iterator->first != 0) {
330     FAIL();
331   }
332   // The locked key is "foo" and is locked by txn1
333   ASSERT_EQ(cf_iterator->second.key, "foo");
334   ASSERT_EQ(cf_iterator->second.ids.size(), 1);
335   ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
336 
337   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
338 
339   s = txn2->GetForUpdate(read_options, "foo", &value);
340   ASSERT_TRUE(s.IsTimedOut());
341   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
342   ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1);
343   ASSERT_GE(get_perf_context()->key_lock_wait_time, 0);
344 
345   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
346   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
347 
348   delete cfa;
349   delete txn1;
350   delete txn2;
351 }
352 
TEST_P(TransactionTest,SharedLocks)353 TEST_P(TransactionTest, SharedLocks) {
354   WriteOptions write_options;
355   ReadOptions read_options;
356   TransactionOptions txn_options;
357   Status s;
358 
359   txn_options.lock_timeout = 1;
360   s = db->Put(write_options, Slice("foo"), Slice("bar"));
361   ASSERT_OK(s);
362 
363   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
364   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
365   Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
366   ASSERT_TRUE(txn1);
367   ASSERT_TRUE(txn2);
368   ASSERT_TRUE(txn3);
369 
370   // Test shared access between txns
371   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
372   ASSERT_OK(s);
373 
374   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
375   ASSERT_OK(s);
376 
377   s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
378   ASSERT_OK(s);
379 
380   auto lock_data = db->GetLockStatusData();
381   ASSERT_EQ(lock_data.size(), 1);
382 
383   auto cf_iterator = lock_data.begin();
384   ASSERT_EQ(cf_iterator->second.key, "foo");
385 
386   // We compare whether the set of txns locking this key is the same. To do
387   // this, we need to sort both vectors so that the comparison is done
388   // correctly.
389   std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
390                                               txn3->GetID()};
391   std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
392   ASSERT_EQ(expected_txns, lock_txns);
393   ASSERT_FALSE(cf_iterator->second.exclusive);
394 
395   ASSERT_OK(txn1->Rollback());
396   ASSERT_OK(txn2->Rollback());
397   ASSERT_OK(txn3->Rollback());
398 
399   // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
400   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
401   ASSERT_OK(s);
402 
403   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
404   ASSERT_OK(s);
405 
406   s = txn3->GetForUpdate(read_options, "foo", nullptr);
407   ASSERT_TRUE(s.IsTimedOut());
408   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
409 
410   txn1->UndoGetForUpdate("foo");
411   s = txn3->GetForUpdate(read_options, "foo", nullptr);
412   ASSERT_TRUE(s.IsTimedOut());
413   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
414 
415   txn2->UndoGetForUpdate("foo");
416   s = txn3->GetForUpdate(read_options, "foo", nullptr);
417   ASSERT_OK(s);
418 
419   ASSERT_OK(txn1->Rollback());
420   ASSERT_OK(txn2->Rollback());
421   ASSERT_OK(txn3->Rollback());
422 
423   // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
424   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
425   ASSERT_OK(s);
426 
427   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
428   ASSERT_OK(s);
429 
430   s = txn2->GetForUpdate(read_options, "foo", nullptr);
431   ASSERT_TRUE(s.IsTimedOut());
432   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
433 
434   txn1->UndoGetForUpdate("foo");
435   s = txn2->GetForUpdate(read_options, "foo", nullptr);
436   ASSERT_OK(s);
437 
438   ASSERT_OK(txn1->Rollback());
439   ASSERT_OK(txn2->Rollback());
440 
441   // Test txn1 trying to downgrade its lock.
442   s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
443   ASSERT_OK(s);
444 
445   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
446   ASSERT_TRUE(s.IsTimedOut());
447   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
448 
449   // Should still fail after "downgrading".
450   s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
451   ASSERT_OK(s);
452 
453   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
454   ASSERT_TRUE(s.IsTimedOut());
455   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
456 
457   ASSERT_OK(txn1->Rollback());
458   ASSERT_OK(txn2->Rollback());
459 
460   // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
461   // access.
462   s = txn1->GetForUpdate(read_options, "foo", nullptr);
463   ASSERT_OK(s);
464 
465   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
466   ASSERT_TRUE(s.IsTimedOut());
467   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
468 
469   txn1->UndoGetForUpdate("foo");
470   s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
471   ASSERT_OK(s);
472 
473   delete txn1;
474   delete txn2;
475   delete txn3;
476 }
477 
// Builds a tree-shaped wait-for graph out of shared locks, closes it into a
// cycle, and checks that each closing request is reported as a deadlock with
// the expected path in the deadlock info buffer. Afterwards it resizes the
// deadlock info buffer (down, up, to zero, up again) and finally checks that a
// shared request that closes a two-transaction cycle is recorded as
// non-exclusive.
TEST_P(TransactionTest, DeadlockCycleShared) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  // Set up a wait for chain like this:
  //
  // Tn -> T(n*2)
  // Tn -> T(n*2 + 1)
  //
  // So we have:
  // T1 -> T2 -> T4 ...
  //    |     |> T5 ...
  //    |> T3 -> T6 ...
  //          |> T7 ...
  // up to T31, then T[16 - 31] -> T1.
  // Note that Tn holds lock on floor(n / 2).

  std::vector<Transaction*> txns(31);

  // txns[i] corresponds to T(i+1) in the diagram above and takes a shared
  // lock on key (i + 1) / 2.
  for (uint32_t i = 0; i < 31; i++) {
    txns[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns[i]);
    auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
                                   false /* exclusive */);
    ASSERT_OK(s);
  }

  // Counts how many times the WaitingTxn sync point has been hit.
  std::atomic<uint32_t> checkpoints(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // We want the leaf transactions to block and hold everyone back.
  // Each of the first 15 transactions requests, from its own thread, an
  // exclusive lock on key i + 1; once that request returns OK the thread
  // rolls the transaction back and deletes it.
  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < 15; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
                                     true /* exclusive */);
      ASSERT_OK(s);
      ASSERT_OK(txns[i]->Rollback());
      delete txns[i];
    };
    threads.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints.load() != 15) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T[16 - 31] -> T1
  for (uint32_t i = 15; i < 31; i++) {
    auto s =
        txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
    ASSERT_TRUE(s.IsDeadlock());

    // Calculate next buffer len, plateau at 5 when 5 records are inserted.
    const uint32_t curr_dlock_buffer_len_ =
        (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
    auto dlock_entry = dlock_buffer[0].path;
    // NOTE(review): the path length is compared against kInitialMaxDeadlocks;
    // presumably this relies on the tree depth (T1 -> T2 -> T4 -> T8 -> T16)
    // coinciding with that constant -- confirm.
    ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
    // Buffer entries must have non-zero, non-increasing deadlock times.
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    int64_t curr_waiting_key = 0;

    // Offset of each txn id from the root of the shared dlock tree's txn id.
    int64_t offset_root = dlock_entry[0].m_txn_id - 1;
    // Offset of the final entry in the dlock path from the root's txn id.
    TransactionID leaf_id =
        dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;

    // Walk the path from the leaf back to the root: both the expected txn id
    // and the expected waiting key are halved at each step.
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      auto dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
      ASSERT_EQ(dl_node.m_cf_id, 0U);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_EQ(dl_node.m_exclusive, true);

      if (curr_waiting_key == 0) {
        curr_waiting_key = leaf_id;
      }
      curr_waiting_key /= 2;
      leaf_id /= 2;
    }
  }

  // Rollback the leaf transaction.
  for (uint32_t i = 15; i < 31; i++) {
    ASSERT_OK(txns[i]->Rollback());
    delete txns[i];
  }

  for (auto& t : threads) {
    t.join();
  }

  // Downsize the buffer and verify the 3 latest deadlocks are preserved.
  auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Upsize the buffer and verify the 3 latest deadlocks are preserved.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(5);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Downsize to 0 and verify the size is consistent.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(0);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Upsize from 0 to verify the size is persistent.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Contrived case of shared lock of cycle size 2 to verify that a shared
  // lock causing a deadlock is correctly reported as "shared" in the buffer.
  std::vector<Transaction*> txns_shared(2);

  // Create a cycle of size 2.
  for (uint32_t i = 0; i < 2; i++) {
    txns_shared[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns_shared[i]);
    auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
    ASSERT_OK(s);
  }

  std::atomic<uint32_t> checkpoints_shared(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Only txns_shared[0] gets a blocking thread; it requests key "1", which
  // txns_shared[1] locked in the loop above.
  std::vector<port::Thread> threads_shared;
  for (uint32_t i = 0; i < 1; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s =
          txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
      ASSERT_OK(s);
      ASSERT_OK(txns_shared[i]->Rollback());
      delete txns_shared[i];
    };
    threads_shared.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints_shared.load() != 1) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T2 -> T1 with a shared lock.
  auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
  ASSERT_TRUE(s.IsDeadlock());

  auto dlock_buffer = db->GetDeadlockInfoBuffer();

  // Verify the size of the buffer and the single path.
  ASSERT_EQ(dlock_buffer.size(), 1);
  ASSERT_EQ(dlock_buffer[0].path.size(), 2);

  // Verify the exclusivity field of the transactions in the deadlock path.
  ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
  ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
  ASSERT_OK(txns_shared[1]->Rollback());
  delete txns_shared[1];

  for (auto& t : threads_shared) {
    t.join();
  }
}
688 
689 #ifndef ROCKSDB_VALGRIND_RUN
// For every cycle length from 2 up to (but excluding) kMaxCycleLength, builds
// a linear wait-for chain of that many transactions, closes it into a cycle,
// and checks that the closing request is reported as a deadlock with the
// expected entry in the deadlock info buffer.
TEST_P(TransactionStressTest, DeadlockCycle) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  // offset by 2 from the max depth to test edge case
  const uint32_t kMaxCycleLength = 52;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  for (uint32_t len = 2; len < kMaxCycleLength; len++) {
    // Set up a long wait for chain like this:
    //
    // T1 -> T2 -> T3 -> ... -> Tlen

    std::vector<Transaction*> txns(len);

    // txns[i] first locks its own key i.
    for (uint32_t i = 0; i < len; i++) {
      txns[i] = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txns[i]);
      auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
      ASSERT_OK(s);
    }

    // Counts how many times the WaitingTxn sync point has been hit.
    std::atomic<uint32_t> checkpoints(0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "PointLockManager::AcquireWithTimeout:WaitingTxn",
        [&](void* /*arg*/) { checkpoints.fetch_add(1); });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // We want the last transaction in the chain to block and hold everyone
    // back.
    // Every transaction except the last requests, from its own thread, the key
    // held by its successor; once that request returns OK the thread rolls
    // the transaction back and deletes it.
    std::vector<port::Thread> threads;
    for (uint32_t i = 0; i + 1 < len; i++) {
      std::function<void()> blocking_thread = [&, i] {
        auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
        ASSERT_OK(s);
        ASSERT_OK(txns[i]->Rollback());
        delete txns[i];
      };
      threads.emplace_back(blocking_thread);
    }

    // Wait until all threads are waiting on each other.
    while (checkpoints.load() != len - 1) {
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

    // Complete the cycle Tlen -> T1
    auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
    ASSERT_TRUE(s.IsDeadlock());

    // One deadlock is recorded per loop iteration; the expected buffer size is
    // the number of iterations so far, capped at 5.
    const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
    uint32_t curr_waiting_key = 0;
    TransactionID curr_txn_id = txns[0]->GetID();

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
    uint32_t check_len = len;
    bool check_limit_flag = false;

    // Special case for a deadlock path that exceeds the maximum depth.
    // In that case the test expects an empty path and limit_exceeded set.
    if (len > 50) {
      check_len = 0;
      check_limit_flag = true;
    }
    auto dlock_entry = dlock_buffer[0].path;
    ASSERT_EQ(dlock_entry.size(), check_len);
    ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);

    // Buffer entries must have non-zero, non-increasing deadlock times.
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    // Iterates backwards over path verifying decreasing txn_ids.
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      auto dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
      ASSERT_EQ(dl_node.m_cf_id, 0u);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_EQ(dl_node.m_exclusive, true);

      curr_txn_id--;
      // The expected waiting key goes 0, len - 1, len - 2, ... as the path is
      // walked backwards.
      if (curr_waiting_key == 0) {
        curr_waiting_key = len;
      }
      curr_waiting_key--;
    }

    // Rollback the last transaction.
    ASSERT_OK(txns[len - 1]->Rollback());
    delete txns[len - 1];

    for (auto& t : threads) {
      t.join();
    }
  }
}
797 
// Stress test for deadlock detection: several threads repeatedly start a
// transaction and lock every key in a freshly shuffled order. The only failure
// a lock request is allowed to report is a deadlock, after which the
// transaction is rolled back.
TEST_P(TransactionStressTest, DeadlockStress) {
  const uint32_t kNumTxnThreads = 10;
  const uint32_t kNumKeys = 100;
  const uint32_t kNumIters = 10000;

  WriteOptions wopts;
  ReadOptions ropts;
  TransactionOptions topts;
  topts.lock_timeout = 1000000;
  topts.deadlock_detect = true;

  // Populate the key space and remember the key names for the workers.
  std::vector<std::string> keys;
  for (uint32_t k = 0; k < kNumKeys; ++k) {
    const std::string key = ToString(k);
    ASSERT_OK(db->Put(wopts, Slice(key), Slice("")));
    keys.push_back(key);
  }

  // Per-thread shuffle seeds are drawn from a generator seeded with a hash of
  // the current thread id.
  const size_t thread_hash =
      std::hash<std::thread::id>()(std::this_thread::get_id());
  Random seed_source(static_cast<uint32_t>(thread_hash));

  auto worker = [&](uint32_t seed) {
    std::default_random_engine engine(seed);

    for (uint32_t iter = 0; iter < kNumIters; ++iter) {
      Transaction* txn = db->BeginTransaction(wopts, topts);
      std::vector<std::string> shuffled_keys(keys);
      std::shuffle(shuffled_keys.begin(), shuffled_keys.end(), engine);

      // Lock keys in random order.
      for (const std::string& key : shuffled_keys) {
        // Shared access unless the transaction id is a multiple of 4, in
        // which case the lock is requested exclusively.
        const bool exclusive = (txn->GetID() % 4 == 0);
        Status lock_status = txn->GetForUpdate(ropts, key, nullptr, exclusive);
        if (lock_status.ok()) {
          continue;
        }
        ASSERT_TRUE(lock_status.IsDeadlock());
        ASSERT_OK(txn->Rollback());
        break;
      }

      delete txn;
    }
  };

  std::vector<port::Thread> workers;
  for (uint32_t t = 0; t < kNumTxnThreads; ++t) {
    workers.emplace_back(worker, seed_source.Next());
  }

  for (port::Thread& w : workers) {
    w.join();
  }
}
852 #endif  // ROCKSDB_VALGRIND_RUN
853 
// Committing a transaction that was never prepared but already has entries
// in its commit-time write batch must be rejected with InvalidArgument.
TEST_P(TransactionTest, CommitTimeBatchFailTest) {
  WriteOptions wopts;
  TransactionOptions topts;

  Transaction* txn = db->BeginTransaction(wopts, topts);
  ASSERT_TRUE(txn);

  // Stage one entry in the commit-time batch, then one regular write.
  ASSERT_OK(txn->GetCommitTimeWriteBatch()->Put("cat", "dog"));
  ASSERT_OK(txn->Put("foo", "bar"));

  // The commit is refused because the commit-time batch is not empty.
  Status commit_status = txn->Commit();
  ASSERT_EQ(commit_status, Status::InvalidArgument());

  delete txn;
}
875 
// Prepares 100 named transactions with a tiny write buffer, committing most
// of them immediately and a random subset only at the end, flushing the
// memtable after each one. Once everything is committed, no log may still be
// tracked as containing prepared data and the bookkeeping containers must be
// empty.
TEST_P(TransactionTest, LogMarkLeakTest) {
  TransactionOptions topts;
  WriteOptions wopts;
  options.write_buffer_size = 1024;
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);

  Random rnd(47);
  std::vector<Transaction*> deferred;
  DBImpl* root = static_cast_with_check<DBImpl>(db->GetRootDB());

  // Nothing has been prepared yet, so no log holds prepare data.
  ASSERT_EQ(root->TEST_FindMinLogContainingOutstandingPrep(), 0);

  for (size_t idx = 0; idx < 100; ++idx) {
    Transaction* txn = db->BeginTransaction(wopts, topts);
    ASSERT_OK(txn->SetName("xid" + ToString(idx)));
    ASSERT_OK(txn->Put(Slice("foo" + ToString(idx)), Slice("bar")));
    ASSERT_OK(txn->Prepare());
    ASSERT_GT(root->TEST_FindMinLogContainingOutstandingPrep(), 0);

    if (!rnd.OneIn(5)) {
      // Common case: commit right away.
      ASSERT_OK(txn->Commit());
      delete txn;
    } else {
      // Keep roughly one in five prepared until the end of the test.
      deferred.push_back(txn);
    }
    ASSERT_OK(root->TEST_FlushMemTable(true));
  }

  for (Transaction* pending : deferred) {
    ASSERT_OK(pending->Commit());
    delete pending;
  }

  // With every transaction committed, no log may hold prepare data...
  ASSERT_EQ(root->TEST_FindMinLogContainingOutstandingPrep(), 0);
  // ...and the underlying data structures must have been truncated so that
  // nothing leaks.
  ASSERT_EQ(root->TEST_PreparedSectionCompletedSize(), 0);
  ASSERT_EQ(root->TEST_LogsWithPrepSize(), 0);
}
912 
TEST_P(TransactionTest,SimpleTwoPhaseTransactionTest)913 TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
914   for (bool cwb4recovery : {true, false}) {
915     ASSERT_OK(ReOpen());
916     WriteOptions write_options;
917     ReadOptions read_options;
918 
919     TransactionOptions txn_options;
920     txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;
921 
922     string value;
923     Status s;
924 
925     DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
926 
927     Transaction* txn = db->BeginTransaction(write_options, txn_options);
928     s = txn->SetName("xid");
929     ASSERT_OK(s);
930 
931     ASSERT_EQ(db->GetTransactionByName("xid"), txn);
932 
933     // transaction put
934     s = txn->Put(Slice("foo"), Slice("bar"));
935     ASSERT_OK(s);
936     ASSERT_EQ(1, txn->GetNumPuts());
937 
938     // regular db put
939     s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
940     ASSERT_OK(s);
941     ASSERT_EQ(1, txn->GetNumPuts());
942 
943     // regular db read
944     ASSERT_OK(db->Get(read_options, "foo2", &value));
945     ASSERT_EQ(value, "bar2");
946 
947     // commit time put
948     ASSERT_OK(
949         txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")));
950     ASSERT_OK(
951         txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")));
952 
953     // nothing has been prepped yet
954     ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
955 
956     s = txn->Prepare();
957     ASSERT_OK(s);
958 
959     // data not im mem yet
960     s = db->Get(read_options, Slice("foo"), &value);
961     ASSERT_TRUE(s.IsNotFound());
962     s = db->Get(read_options, Slice("gtid"), &value);
963     ASSERT_TRUE(s.IsNotFound());
964 
965     // find trans in list of prepared transactions
966     std::vector<Transaction*> prepared_trans;
967     db->GetAllPreparedTransactions(&prepared_trans);
968     ASSERT_EQ(prepared_trans.size(), 1);
969     ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
970 
971     auto log_containing_prep =
972         db_impl->TEST_FindMinLogContainingOutstandingPrep();
973     ASSERT_GT(log_containing_prep, 0);
974 
975     // make commit
976     s = txn->Commit();
977     ASSERT_OK(s);
978 
979     // value is now available
980     s = db->Get(read_options, "foo", &value);
981     ASSERT_OK(s);
982     ASSERT_EQ(value, "bar");
983 
984     if (!cwb4recovery) {
985       s = db->Get(read_options, "gtid", &value);
986       ASSERT_OK(s);
987       ASSERT_EQ(value, "dogs");
988 
989       s = db->Get(read_options, "gtid2", &value);
990       ASSERT_OK(s);
991       ASSERT_EQ(value, "cats");
992     }
993 
994     // we already committed
995     s = txn->Commit();
996     ASSERT_EQ(s, Status::InvalidArgument());
997 
998     // no longer is prepared results
999     db->GetAllPreparedTransactions(&prepared_trans);
1000     ASSERT_EQ(prepared_trans.size(), 0);
1001     ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1002 
1003     // heap should not care about prepared section anymore
1004     ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1005 
1006     switch (txn_db_options.write_policy) {
1007       case WRITE_COMMITTED:
1008         // but now our memtable should be referencing the prep section
1009         ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1010         ASSERT_EQ(log_containing_prep,
1011                   db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1012         break;
1013       case WRITE_PREPARED:
1014       case WRITE_UNPREPARED:
1015         // In these modes memtable do not ref the prep sections
1016         ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1017         break;
1018       default:
1019         assert(false);
1020     }
1021 
1022     ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1023     // After flush the recoverable state must be visible
1024     if (cwb4recovery) {
1025       s = db->Get(read_options, "gtid", &value);
1026       ASSERT_OK(s);
1027       ASSERT_EQ(value, "dogs");
1028 
1029       s = db->Get(read_options, "gtid2", &value);
1030       ASSERT_OK(s);
1031       ASSERT_EQ(value, "cats");
1032     }
1033 
1034     // after memtable flush we can now relese the log
1035     ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1036     ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1037 
1038     delete txn;
1039 
1040     if (cwb4recovery) {
1041       // kill and reopen to trigger recovery
1042       s = ReOpenNoDelete();
1043       ASSERT_OK(s);
1044       assert(db != nullptr);
1045       s = db->Get(read_options, "gtid", &value);
1046       ASSERT_OK(s);
1047       ASSERT_EQ(value, "dogs");
1048 
1049       s = db->Get(read_options, "gtid2", &value);
1050       ASSERT_OK(s);
1051       ASSERT_EQ(value, "cats");
1052     }
1053   }
1054 }
1055 
// Exercises the naming rules for two-phase transactions: a name is required
// before Prepare, must be non-empty, not too long, unique, and cannot be
// changed once set or after Prepare.
TEST_P(TransactionTest, TwoPhaseNameTest) {
  WriteOptions wopts;
  TransactionOptions topts;
  Transaction* first = db->BeginTransaction(wopts, topts);
  Transaction* second = db->BeginTransaction(wopts, topts);
  Transaction* third = db->BeginTransaction(wopts, topts);
  ASSERT_TRUE(third);
  delete third;

  // Prepare without a name is rejected.
  ASSERT_EQ(first->Prepare(), Status::InvalidArgument());

  // An empty name is too short.
  ASSERT_EQ(first->SetName(""), Status::InvalidArgument());

  // A 513-character name is too long.
  ASSERT_EQ(first->SetName(std::string(513, 'x')), Status::InvalidArgument());

  // A valid name is accepted.
  ASSERT_OK(first->SetName("name1"));

  // A second transaction cannot reuse the same name.
  ASSERT_EQ(second->SetName("name1"), Status::InvalidArgument());

  // It is still unnamed, so it cannot be prepared.
  ASSERT_EQ(second->Prepare(), Status::InvalidArgument());

  // A distinct name is accepted.
  ASSERT_OK(second->SetName("name2"));

  // A name that is already set cannot be replaced.
  ASSERT_EQ(second->SetName("name3"), Status::InvalidArgument());

  ASSERT_EQ(first->GetName(), "name1");
  ASSERT_EQ(second->GetName(), "name2");

  ASSERT_OK(first->Prepare());

  // Renaming after Prepare is rejected as well.
  ASSERT_EQ(first->SetName("name4"), Status::InvalidArgument());

  ASSERT_OK(first->Rollback());
  ASSERT_OK(second->Rollback());
  delete first;
  delete second;
}
1114 
TEST_P(TransactionTest,TwoPhaseEmptyWriteTest)1115 TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
1116   for (bool cwb4recovery : {true, false}) {
1117     for (bool test_with_empty_wal : {true, false}) {
1118       if (!cwb4recovery && test_with_empty_wal) {
1119         continue;
1120       }
1121       ASSERT_OK(ReOpen());
1122       Status s;
1123       std::string value;
1124 
1125       WriteOptions write_options;
1126       ReadOptions read_options;
1127       TransactionOptions txn_options;
1128       txn_options.use_only_the_last_commit_time_batch_for_recovery =
1129           cwb4recovery;
1130       Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1131       ASSERT_TRUE(txn1);
1132       Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1133       ASSERT_TRUE(txn2);
1134 
1135       s = txn1->SetName("joe");
1136       ASSERT_OK(s);
1137 
1138       s = txn2->SetName("bob");
1139       ASSERT_OK(s);
1140 
1141       s = txn1->Prepare();
1142       ASSERT_OK(s);
1143 
1144       s = txn1->Commit();
1145       ASSERT_OK(s);
1146 
1147       delete txn1;
1148 
1149       ASSERT_OK(
1150           txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")));
1151 
1152       s = txn2->Prepare();
1153       ASSERT_OK(s);
1154 
1155       s = txn2->Commit();
1156       ASSERT_OK(s);
1157 
1158       delete txn2;
1159       if (!cwb4recovery) {
1160         s = db->Get(read_options, "foo", &value);
1161         ASSERT_OK(s);
1162         ASSERT_EQ(value, "bar");
1163       } else {
1164         if (test_with_empty_wal) {
1165           DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1166           ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1167           // After flush the state must be visible
1168           s = db->Get(read_options, "foo", &value);
1169           ASSERT_OK(s);
1170           ASSERT_EQ(value, "bar");
1171         }
1172         ASSERT_OK(db->FlushWAL(true));
1173         // kill and reopen to trigger recovery
1174         s = ReOpenNoDelete();
1175         ASSERT_OK(s);
1176         assert(db != nullptr);
1177         s = db->Get(read_options, "foo", &value);
1178         ASSERT_OK(s);
1179         ASSERT_EQ(value, "bar");
1180       }
1181     }
1182   }
1183 }
1184 
1185 #ifndef ROCKSDB_VALGRIND_RUN
// Checks how a 500ms expiration interacts with two-phase commit. Two
// transactions are started with the same expiration; txn1 is prepared before
// the expiration elapses and txn2 is not. After sleeping for one second the
// test expects txn1 to still commit successfully, while preparing txn2 must
// fail with Status::Expired.
TEST_P(TransactionStressTest, TwoPhaseExpirationTest) {
  Status s;

  WriteOptions write_options;
  TransactionOptions txn_options;
  txn_options.expiration = 500;  // 500ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  // Both handles are dereferenced below, so both must be validated.
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);

  s = txn1->SetName("joe");
  ASSERT_OK(s);
  s = txn2->SetName("bob");
  ASSERT_OK(s);

  // Prepare txn1 while it is still within its expiration window.
  s = txn1->Prepare();
  ASSERT_OK(s);

  // Sleep past the 500ms expiration of both transactions.
  /* sleep override */
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));

  // The already-prepared transaction is expected to commit.
  s = txn1->Commit();
  ASSERT_OK(s);

  // The unprepared transaction has expired and can no longer be prepared.
  s = txn2->Prepare();
  ASSERT_EQ(s, Status::Expired());

  delete txn1;
  delete txn2;
}
1217 
TEST_P(TransactionTest,TwoPhaseRollbackTest)1218 TEST_P(TransactionTest, TwoPhaseRollbackTest) {
1219   WriteOptions write_options;
1220   ReadOptions read_options;
1221 
1222   TransactionOptions txn_options;
1223 
1224   std::string value;
1225   Status s;
1226 
1227   DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1228   Transaction* txn = db->BeginTransaction(write_options, txn_options);
1229   s = txn->SetName("xid");
1230   ASSERT_OK(s);
1231 
1232   // transaction put
1233   s = txn->Put(Slice("tfoo"), Slice("tbar"));
1234   ASSERT_OK(s);
1235 
1236   // value is readable form txn
1237   s = txn->Get(read_options, Slice("tfoo"), &value);
1238   ASSERT_OK(s);
1239   ASSERT_EQ(value, "tbar");
1240 
1241   // issue rollback
1242   s = txn->Rollback();
1243   ASSERT_OK(s);
1244 
1245   // value is nolonger readable
1246   s = txn->Get(read_options, Slice("tfoo"), &value);
1247   ASSERT_TRUE(s.IsNotFound());
1248   ASSERT_EQ(txn->GetNumPuts(), 0);
1249 
1250   // put new txn values
1251   s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
1252   ASSERT_OK(s);
1253 
1254   // new value is readable from txn
1255   s = txn->Get(read_options, Slice("tfoo2"), &value);
1256   ASSERT_OK(s);
1257   ASSERT_EQ(value, "tbar2");
1258 
1259   s = txn->Prepare();
1260   ASSERT_OK(s);
1261 
1262   // flush to next wal
1263   s = db->Put(write_options, Slice("foo"), Slice("bar"));
1264   ASSERT_OK(s);
1265   ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1266 
1267   // issue rollback (marker written to WAL)
1268   s = txn->Rollback();
1269   ASSERT_OK(s);
1270 
1271   // value is nolonger readable
1272   s = txn->Get(read_options, Slice("tfoo2"), &value);
1273   ASSERT_TRUE(s.IsNotFound());
1274   ASSERT_EQ(txn->GetNumPuts(), 0);
1275 
1276   // make commit
1277   s = txn->Commit();
1278   ASSERT_EQ(s, Status::InvalidArgument());
1279 
1280   // try rollback again
1281   s = txn->Rollback();
1282   ASSERT_EQ(s, Status::InvalidArgument());
1283 
1284   delete txn;
1285 }
1286 
TEST_P(TransactionTest,PersistentTwoPhaseTransactionTest)1287 TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
1288   WriteOptions write_options;
1289   write_options.sync = true;
1290   write_options.disableWAL = false;
1291   ReadOptions read_options;
1292 
1293   TransactionOptions txn_options;
1294 
1295   std::string value;
1296   Status s;
1297 
1298   DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1299 
1300   Transaction* txn = db->BeginTransaction(write_options, txn_options);
1301   s = txn->SetName("xid");
1302   ASSERT_OK(s);
1303 
1304   ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1305 
1306   // transaction put
1307   s = txn->Put(Slice("foo"), Slice("bar"));
1308   ASSERT_OK(s);
1309   ASSERT_EQ(1, txn->GetNumPuts());
1310 
1311   // txn read
1312   s = txn->Get(read_options, "foo", &value);
1313   ASSERT_OK(s);
1314   ASSERT_EQ(value, "bar");
1315 
1316   // regular db put
1317   s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
1318   ASSERT_OK(s);
1319   ASSERT_EQ(1, txn->GetNumPuts());
1320 
1321   ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1322 
1323   // regular db read
1324   db->Get(read_options, "foo2", &value);
1325   ASSERT_EQ(value, "bar2");
1326 
1327   // nothing has been prepped yet
1328   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1329 
1330   // prepare
1331   s = txn->Prepare();
1332   ASSERT_OK(s);
1333 
1334   // still not available to db
1335   s = db->Get(read_options, Slice("foo"), &value);
1336   ASSERT_TRUE(s.IsNotFound());
1337 
1338   ASSERT_OK(db->FlushWAL(false));
1339   delete txn;
1340   // kill and reopen
1341   reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1342   s = ReOpenNoDelete();
1343   ASSERT_OK(s);
1344   assert(db != nullptr);
1345   db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1346 
1347   // find trans in list of prepared transactions
1348   std::vector<Transaction*> prepared_trans;
1349   db->GetAllPreparedTransactions(&prepared_trans);
1350   ASSERT_EQ(prepared_trans.size(), 1);
1351 
1352   txn = prepared_trans.front();
1353   ASSERT_TRUE(txn);
1354   ASSERT_EQ(txn->GetName(), "xid");
1355   ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1356 
1357   // log has been marked
1358   auto log_containing_prep =
1359       db_impl->TEST_FindMinLogContainingOutstandingPrep();
1360   ASSERT_GT(log_containing_prep, 0);
1361 
1362   // value is readable from txn
1363   s = txn->Get(read_options, "foo", &value);
1364   ASSERT_OK(s);
1365   ASSERT_EQ(value, "bar");
1366 
1367   // make commit
1368   s = txn->Commit();
1369   ASSERT_OK(s);
1370 
1371   // value is now available
1372   db->Get(read_options, "foo", &value);
1373   ASSERT_EQ(value, "bar");
1374 
1375   // we already committed
1376   s = txn->Commit();
1377   ASSERT_EQ(s, Status::InvalidArgument());
1378 
1379   // no longer is prepared results
1380   prepared_trans.clear();
1381   db->GetAllPreparedTransactions(&prepared_trans);
1382   ASSERT_EQ(prepared_trans.size(), 0);
1383 
1384   // transaction should no longer be visible
1385   ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1386 
1387   // heap should not care about prepared section anymore
1388   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1389 
1390   switch (txn_db_options.write_policy) {
1391     case WRITE_COMMITTED:
1392       // but now our memtable should be referencing the prep section
1393       ASSERT_EQ(log_containing_prep,
1394                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1395       ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1396 
1397       break;
1398     case WRITE_PREPARED:
1399     case WRITE_UNPREPARED:
1400       // In these modes memtable do not ref the prep sections
1401       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1402       break;
1403     default:
1404       assert(false);
1405   }
1406 
1407   // Add a dummy record to memtable before a flush. Otherwise, the
1408   // memtable will be empty and flush will be skipped.
1409   s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
1410   ASSERT_OK(s);
1411 
1412   ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1413 
1414   // after memtable flush we can now release the log
1415   ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1416   ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1417 
1418   delete txn;
1419 
1420   // deleting transaction should unregister transaction
1421   ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1422 }
1423 #endif  // ROCKSDB_VALGRIND_RUN
1424 
1425 // TODO this test needs to be updated with serial commits
TEST_P(TransactionTest,DISABLED_TwoPhaseMultiThreadTest)1426 TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
1427   // mix transaction writes and regular writes
1428   const uint32_t NUM_TXN_THREADS = 50;
1429   std::atomic<uint32_t> txn_thread_num(0);
1430 
1431   std::function<void()> txn_write_thread = [&]() {
1432     uint32_t id = txn_thread_num.fetch_add(1);
1433 
1434     WriteOptions write_options;
1435     write_options.sync = true;
1436     write_options.disableWAL = false;
1437     TransactionOptions txn_options;
1438     txn_options.lock_timeout = 1000000;
1439     if (id % 2 == 0) {
1440       txn_options.expiration = 1000000;
1441     }
1442     TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(id)));
1443     Transaction* txn = db->BeginTransaction(write_options, txn_options);
1444     ASSERT_OK(txn->SetName(name));
1445     for (int i = 0; i < 10; i++) {
1446       std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
1447       ASSERT_OK(txn->Put(key, "val"));
1448     }
1449     ASSERT_OK(txn->Prepare());
1450     ASSERT_OK(txn->Commit());
1451     delete txn;
1452   };
1453 
1454   // assure that all thread are in the same write group
1455   std::atomic<uint32_t> t_wait_on_prepare(0);
1456   std::atomic<uint32_t> t_wait_on_commit(0);
1457 
1458   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1459       "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
1460         auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
1461 
1462         if (writer->ShouldWriteToWAL()) {
1463           t_wait_on_prepare.fetch_add(1);
1464           // wait for friends
1465           while (t_wait_on_prepare.load() < NUM_TXN_THREADS) {
1466             env->SleepForMicroseconds(10);
1467           }
1468         } else if (writer->ShouldWriteToMemtable()) {
1469           t_wait_on_commit.fetch_add(1);
1470           // wait for friends
1471           while (t_wait_on_commit.load() < NUM_TXN_THREADS) {
1472             env->SleepForMicroseconds(10);
1473           }
1474         } else {
1475           FAIL();
1476         }
1477       });
1478 
1479   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1480 
1481   // do all the writes
1482   std::vector<port::Thread> threads;
1483   for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
1484     threads.emplace_back(txn_write_thread);
1485   }
1486   for (auto& t : threads) {
1487     t.join();
1488   }
1489 
1490   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1491   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1492 
1493   ReadOptions read_options;
1494   std::string value;
1495   Status s;
1496   for (uint32_t t = 0; t < NUM_TXN_THREADS; t++) {
1497     TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(t)));
1498     for (int i = 0; i < 10; i++) {
1499       std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
1500       s = db->Get(read_options, key, &value);
1501       ASSERT_OK(s);
1502       ASSERT_EQ(value, "val");
1503     }
1504   }
1505 }
1506 
// Keeps a transaction in the prepared state across many restarts. After the
// transaction is prepared, 1000 regular writes are issued while the DB is
// periodically crashed or cleanly reopened. Finally the prepared transaction
// is looked up by name and committed, and both its data and all regular
// writes must be readable.
TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;
  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("bob");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  delete txn;

  // Every regular write uses the same 1000-byte value; keys are runs of 'k'
  // of increasing length (the first key is the empty string).
  const std::string val(1000, 'v');

  for (int i = 0; i < 1000; i++) {
    std::string key(i, 'k');
    assert(db != nullptr);
    s = db->Put(write_options, key, val);
    ASSERT_OK(s);

    // A failed reopen must abort the test here instead of surfacing later
    // as a use of an unusable `db`.
    if (i % 29 == 0) {
      // crash
      env->SetFilesystemActive(false);
      reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
      ASSERT_OK(ReOpenNoDelete());
    } else if (i % 37 == 0) {
      // close
      ASSERT_OK(ReOpenNoDelete());
    }
  }

  // commit old txn
  assert(db != nullptr);
  txn = db->GetTransactionByName("bob");
  ASSERT_TRUE(txn);
  s = txn->Commit();
  ASSERT_OK(s);

  // verify data txn data
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  // verify non txn data
  for (int i = 0; i < 1000; i++) {
    std::string key(i, 'k');
    s = db->Get(read_options, key, &value);
    ASSERT_OK(s);
    ASSERT_EQ(value, val);
  }

  delete txn;
}
1571 
// Prepares and commits a transaction with four puts using synced writes,
// then deactivates the filesystem and reopens the DB. The last key written
// by the transaction must be readable after the reopen.
TEST_P(TransactionTest, TwoPhaseSequenceTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;

  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("xid");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo2"), Slice("bar2"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo3"), Slice("bar3"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo4"), Slice("bar4"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  // make commit
  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // kill and reopen; a failed reopen must fail the test right here, because
  // assert() alone is compiled out in release builds
  env->SetFilesystemActive(false);
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);

  // value is now available
  s = db->Get(read_options, "foo4", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar4");
}
1617 
TEST_P(TransactionTest,TwoPhaseDoubleRecoveryTest)1618 TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
1619   WriteOptions write_options;
1620   write_options.sync = true;
1621   write_options.disableWAL = false;
1622   ReadOptions read_options;
1623 
1624   TransactionOptions txn_options;
1625 
1626   std::string value;
1627   Status s;
1628 
1629   Transaction* txn = db->BeginTransaction(write_options, txn_options);
1630   s = txn->SetName("a");
1631   ASSERT_OK(s);
1632 
1633   // transaction put
1634   s = txn->Put(Slice("foo"), Slice("bar"));
1635   ASSERT_OK(s);
1636 
1637   // prepare
1638   s = txn->Prepare();
1639   ASSERT_OK(s);
1640 
1641   delete txn;
1642 
1643   // kill and reopen
1644   env->SetFilesystemActive(false);
1645   reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1646   ReOpenNoDelete();
1647 
1648   // commit old txn
1649   assert(db != nullptr);  // Make clang analyze happy.
1650   txn = db->GetTransactionByName("a");
1651   assert(txn != nullptr);
1652   s = txn->Commit();
1653   ASSERT_OK(s);
1654 
1655   s = db->Get(read_options, "foo", &value);
1656   ASSERT_EQ(s, Status::OK());
1657   ASSERT_EQ(value, "bar");
1658 
1659   delete txn;
1660 
1661   txn = db->BeginTransaction(write_options, txn_options);
1662   s = txn->SetName("b");
1663   ASSERT_OK(s);
1664 
1665   s = txn->Put(Slice("foo2"), Slice("bar2"));
1666   ASSERT_OK(s);
1667 
1668   s = txn->Prepare();
1669   ASSERT_OK(s);
1670 
1671   s = txn->Commit();
1672   ASSERT_OK(s);
1673 
1674   delete txn;
1675 
1676   // kill and reopen
1677   env->SetFilesystemActive(false);
1678   ASSERT_OK(ReOpenNoDelete());
1679   assert(db != nullptr);
1680 
1681   // value is now available
1682   s = db->Get(read_options, "foo", &value);
1683   ASSERT_EQ(s, Status::OK());
1684   ASSERT_EQ(value, "bar");
1685 
1686   s = db->Get(read_options, "foo2", &value);
1687   ASSERT_EQ(s, Status::OK());
1688   ASSERT_EQ(value, "bar2");
1689 }
1690 
TEST_P(TransactionTest,TwoPhaseLogRollingTest)1691 TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
1692   DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1693 
1694   Status s;
1695   std::string v;
1696   ColumnFamilyHandle *cfa, *cfb;
1697 
1698   // Create 2 new column families
1699   ColumnFamilyOptions cf_options;
1700   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1701   ASSERT_OK(s);
1702   s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1703   ASSERT_OK(s);
1704 
1705   WriteOptions wopts;
1706   wopts.disableWAL = false;
1707   wopts.sync = true;
1708 
1709   TransactionOptions topts1;
1710   Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1711   s = txn1->SetName("xid1");
1712   ASSERT_OK(s);
1713 
1714   TransactionOptions topts2;
1715   Transaction* txn2 = db->BeginTransaction(wopts, topts2);
1716   s = txn2->SetName("xid2");
1717   ASSERT_OK(s);
1718 
1719   // transaction put in two column families
1720   s = txn1->Put(cfa, "ka1", "va1");
1721   ASSERT_OK(s);
1722 
1723   // transaction put in two column families
1724   s = txn2->Put(cfa, "ka2", "va2");
1725   ASSERT_OK(s);
1726   s = txn2->Put(cfb, "kb2", "vb2");
1727   ASSERT_OK(s);
1728 
1729   // write prep section to wal
1730   s = txn1->Prepare();
1731   ASSERT_OK(s);
1732 
1733   // our log should be in the heap
1734   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1735             txn1->GetLogNumber());
1736   ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
1737 
1738   // flush default cf to crate new log
1739   s = db->Put(wopts, "foo", "bar");
1740   ASSERT_OK(s);
1741   s = db_impl->TEST_FlushMemTable(true);
1742   ASSERT_OK(s);
1743 
1744   // make sure we are on a new log
1745   ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
1746 
1747   // put txn2 prep section in this log
1748   s = txn2->Prepare();
1749   ASSERT_OK(s);
1750   ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
1751 
1752   // heap should still see first log
1753   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1754             txn1->GetLogNumber());
1755 
1756   // commit txn1
1757   s = txn1->Commit();
1758   ASSERT_OK(s);
1759 
1760   // heap should now show txn2s log
1761   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1762             txn2->GetLogNumber());
1763 
1764   switch (txn_db_options.write_policy) {
1765     case WRITE_COMMITTED:
1766       // we should see txn1s log refernced by the memtables
1767       ASSERT_EQ(txn1->GetLogNumber(),
1768                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1769       break;
1770     case WRITE_PREPARED:
1771     case WRITE_UNPREPARED:
1772       // In these modes memtable do not ref the prep sections
1773       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1774       break;
1775     default:
1776       assert(false);
1777   }
1778 
1779   // flush default cf to crate new log
1780   s = db->Put(wopts, "foo", "bar2");
1781   ASSERT_OK(s);
1782   s = db_impl->TEST_FlushMemTable(true);
1783   ASSERT_OK(s);
1784 
1785   // make sure we are on a new log
1786   ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
1787 
1788   // commit txn2
1789   s = txn2->Commit();
1790   ASSERT_OK(s);
1791 
1792   // heap should not show any logs
1793   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1794 
1795   switch (txn_db_options.write_policy) {
1796     case WRITE_COMMITTED:
1797       // should show the first txn log
1798       ASSERT_EQ(txn1->GetLogNumber(),
1799                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1800       break;
1801     case WRITE_PREPARED:
1802     case WRITE_UNPREPARED:
1803       // In these modes memtable do not ref the prep sections
1804       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1805       break;
1806     default:
1807       assert(false);
1808   }
1809 
1810   // flush only cfa memtable
1811   s = db_impl->TEST_FlushMemTable(true, false, cfa);
1812   ASSERT_OK(s);
1813 
1814   switch (txn_db_options.write_policy) {
1815     case WRITE_COMMITTED:
1816       // should show the first txn log
1817       ASSERT_EQ(txn2->GetLogNumber(),
1818                 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1819       break;
1820     case WRITE_PREPARED:
1821     case WRITE_UNPREPARED:
1822       // In these modes memtable do not ref the prep sections
1823       ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1824       break;
1825     default:
1826       assert(false);
1827   }
1828 
1829   // flush only cfb memtable
1830   s = db_impl->TEST_FlushMemTable(true, false, cfb);
1831   ASSERT_OK(s);
1832 
1833   // should show not dependency on logs
1834   ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1835   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1836 
1837   delete txn1;
1838   delete txn2;
1839   delete cfa;
1840   delete cfb;
1841 }
1842 
TEST_P(TransactionTest,TwoPhaseLogRollingTest2)1843 TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
1844   DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1845 
1846   Status s;
1847   ColumnFamilyHandle *cfa, *cfb;
1848 
1849   ColumnFamilyOptions cf_options;
1850   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1851   ASSERT_OK(s);
1852   s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1853   ASSERT_OK(s);
1854 
1855   WriteOptions wopts;
1856   wopts.disableWAL = false;
1857   wopts.sync = true;
1858 
1859   auto cfh_a = static_cast_with_check<ColumnFamilyHandleImpl>(cfa);
1860   auto cfh_b = static_cast_with_check<ColumnFamilyHandleImpl>(cfb);
1861 
1862   TransactionOptions topts1;
1863   Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1864   s = txn1->SetName("xid1");
1865   ASSERT_OK(s);
1866   s = txn1->Put(cfa, "boys", "girls1");
1867   ASSERT_OK(s);
1868 
1869   Transaction* txn2 = db->BeginTransaction(wopts, topts1);
1870   s = txn2->SetName("xid2");
1871   ASSERT_OK(s);
1872   s = txn2->Put(cfb, "up", "down1");
1873   ASSERT_OK(s);
1874 
1875   // prepre transaction in LOG A
1876   s = txn1->Prepare();
1877   ASSERT_OK(s);
1878 
1879   // prepre transaction in LOG A
1880   s = txn2->Prepare();
1881   ASSERT_OK(s);
1882 
1883   // regular put so that mem table can actually be flushed for log rolling
1884   s = db->Put(wopts, "cats", "dogs1");
1885   ASSERT_OK(s);
1886 
1887   auto prepare_log_no = txn1->GetLastLogNumber();
1888 
1889   // roll to LOG B
1890   s = db_impl->TEST_FlushMemTable(true);
1891   ASSERT_OK(s);
1892 
1893   // now we pause background work so that
1894   // imm()s are not flushed before we can check their status
1895   s = db_impl->PauseBackgroundWork();
1896   ASSERT_OK(s);
1897 
1898   ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
1899   switch (txn_db_options.write_policy) {
1900     case WRITE_COMMITTED:
1901       // This cf is empty and should ref the latest log
1902       ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1903       ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
1904       break;
1905     case WRITE_PREPARED:
1906     case WRITE_UNPREPARED:
1907       // This cf is not flushed yet and should ref the log that has its data
1908       ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1909       break;
1910     default:
1911       assert(false);
1912   }
1913   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1914             txn1->GetLogNumber());
1915   ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1916 
1917   // commit in LOG B
1918   s = txn1->Commit();
1919   ASSERT_OK(s);
1920 
1921   switch (txn_db_options.write_policy) {
1922     case WRITE_COMMITTED:
1923       ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
1924                 prepare_log_no);
1925       break;
1926     case WRITE_PREPARED:
1927     case WRITE_UNPREPARED:
1928       // In these modes memtable do not ref the prep sections
1929       ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1930       break;
1931     default:
1932       assert(false);
1933   }
1934 
1935   ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
1936 
1937   // request a flush for all column families such that the earliest
1938   // alive log file can be killed
1939   ASSERT_OK(db_impl->TEST_SwitchWAL());
1940   // log cannot be flushed because txn2 has not been commited
1941   ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
1942   ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
1943 
1944   // assert that cfa has a flush requested
1945   ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
1946 
1947   switch (txn_db_options.write_policy) {
1948     case WRITE_COMMITTED:
1949       // cfb should not be flushed becuse it has no data from LOG A
1950       ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
1951       break;
1952     case WRITE_PREPARED:
1953     case WRITE_UNPREPARED:
1954       // cfb should be flushed becuse it has prepared data from LOG A
1955       ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1956       break;
1957     default:
1958       assert(false);
1959   }
1960 
1961   // cfb now has data from LOG A
1962   s = txn2->Commit();
1963   ASSERT_OK(s);
1964 
1965   ASSERT_OK(db_impl->TEST_SwitchWAL());
1966   ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
1967 
1968   // we should see that cfb now has a flush requested
1969   ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1970 
1971   // all data in LOG A resides in a memtable that has been
1972   // requested for a flush
1973   ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
1974 
1975   delete txn1;
1976   delete txn2;
1977   delete cfa;
1978   delete cfb;
1979 }
1980 /*
1981  * 1) use prepare to keep first log around to determine starting sequence
1982  * during recovery.
1983  * 2) insert many values, skipping wal, to increase seqid.
1984  * 3) insert final value into wal
1985  * 4) recover and see that final value was properly recovered - not
1986  * hidden behind improperly summed sequence ids
1987  */
// Prepares a transaction, performs several WAL-less writes followed by one
// WAL-backed write, then crashes and reopens the DB and checks that both
// WAL-backed values are readable.
TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
  DBImpl* root_db = static_cast_with_check<DBImpl>(db->GetRootDB());

  WriteOptions with_wal;
  with_wal.sync = true;
  with_wal.disableWAL = false;
  WriteOptions without_wal;
  without_wal.disableWAL = true;
  ReadOptions read_options;
  TransactionOptions txn_options;

  // NOTE(review): txn1 is never deleted in this test; presumably ownership is
  // resolved when the DB is torn down on reopen -- confirm.
  Transaction* txn1 = db->BeginTransaction(with_wal, txn_options);
  ASSERT_OK(txn1->SetName("1"));

  ASSERT_OK(db->Put(with_wal, "first", "first"));

  ASSERT_OK(txn1->Put(Slice("dummy"), Slice("dummy")));
  ASSERT_OK(txn1->Prepare());

  // Writes that skip the WAL.
  for (const char* skipped : {"dogs1", "dogs2", "dogs3"}) {
    ASSERT_OK(db->Put(without_wal, "cats", skipped));
  }

  ASSERT_OK(root_db->TEST_FlushMemTable(true));

  // Final value goes through the WAL.
  ASSERT_OK(db->Put(with_wal, "cats", "dogs4"));

  ASSERT_OK(db->FlushWAL(false));

  // Kill and reopen.
  env->SetFilesystemActive(false);
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);

  std::string recovered;
  ASSERT_OK(db->Get(read_options, "first", &recovered));
  ASSERT_EQ(recovered, "first");

  ASSERT_OK(db->Get(read_options, "cats", &recovered));
  ASSERT_EQ(recovered, "dogs4");
}
2042 
// Conflict checking against the very first write to a db, where the write
// precedes the transaction's snapshot.
TEST_P(TransactionTest, FirstWriteTest) {
  WriteOptions write_options;

  // Test conflict checking against the very first write to a db.
  // The transaction's snapshot will have seq 1 and the following write
  // will have sequence 1.
  // Check the seeding write immediately so a failure stops the test before a
  // transaction is allocated (and would otherwise be leaked).
  ASSERT_OK(db->Put(write_options, "A", "a"));

  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // The write to "A" happened before the snapshot was taken, so the
  // transactional Put is expected to succeed.
  ASSERT_OK(txn->Put("A", "b"));

  delete txn;
}
2061 
// Conflict checking against the very first write to a db, where the
// transaction takes its snapshot before that write happens.
TEST_P(TransactionTest, FirstWriteTest2) {
  WriteOptions write_options;

  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // The transaction's snapshot is at seq 0 while the write below will have
  // sequence 1.
  ASSERT_OK(db->Put(write_options, "A", "a"));

  // Writing the same key through the transaction must be reported as busy.
  const Status conflict = txn->Put("A", "b");
  ASSERT_TRUE(conflict.IsBusy());

  delete txn;
}
2079 
// Checks that a transaction reports the WriteOptions it was created with and
// picks up replacements passed to SetWriteOptions().
TEST_P(TransactionTest, WriteOptionsTest) {
  WriteOptions opts;
  opts.sync = true;
  opts.disableWAL = true;

  Transaction* txn = db->BeginTransaction(opts);
  ASSERT_TRUE(txn != nullptr);

  // Options supplied at creation are visible through the transaction.
  ASSERT_TRUE(txn->GetWriteOptions()->sync);

  // Replace the options: sync is turned off, disableWAL stays on.
  opts.sync = false;
  txn->SetWriteOptions(opts);
  ASSERT_FALSE(txn->GetWriteOptions()->sync);
  ASSERT_TRUE(txn->GetWriteOptions()->disableWAL);

  delete txn;
}
2097 
// A non-transactional write to a key already written by an open transaction
// must time out, and the transaction's values become visible after commit.
TEST_P(TransactionTest, WriteConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "A"));
  ASSERT_OK(db->Put(write_options, "foo2", "B"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("foo", "A2");
  ASSERT_OK(s);

  s = txn->Put("foo2", "B2");
  ASSERT_OK(s);

  // This Put outside of a transaction will conflict with the previous write
  s = db->Put(write_options, "foo", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The uncommitted value must not be visible yet. Check the read status so a
  // failed Get cannot be masked by a stale `value`.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "A");

  s = txn->Commit();
  ASSERT_OK(s);

  // After commit both transactional writes are visible.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "A2");
  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "B2");

  delete txn;
}
2133 
// A transaction with a snapshot must reject a write to a key that was modified
// after the snapshot was taken, while its other writes still commit.
TEST_P(TransactionTest, WriteConflictTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  // This Put outside of a transaction will conflict with a later write
  s = db->Put(write_options, "foo", "barz");
  ASSERT_OK(s);

  s = txn->Put("foo2", "X");
  ASSERT_OK(s);

  s = txn->Put("foo",
               "bar2");  // Conflicts with write done after snapshot taken
  ASSERT_TRUE(s.IsBusy());

  s = txn->Put("foo3", "Y");
  ASSERT_OK(s);

  // Check the read status so a failed Get cannot be masked by a stale `value`.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "barz");

  // Only foo2 and foo3 are counted; the rejected write to foo is not.
  ASSERT_EQ(2, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);  // Txn should commit, but only write foo2 and foo3

  // Verify that transaction wrote foo2 and foo3 but not foo
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "barz");

  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "X");

  ASSERT_OK(db->Get(read_options, "foo3", &value));
  ASSERT_EQ(value, "Y");

  delete txn;
}
2181 
// A key read with GetForUpdate() inside a transaction must block a
// non-transactional write to that key until the transaction finishes.
TEST_P(TransactionTest, ReadConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));
  ASSERT_OK(db->Put(write_options, "foo2", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  // This Put outside of a transaction will conflict with the previous read
  s = db->Put(write_options, "foo", "barz");
  ASSERT_TRUE(s.IsTimedOut());

  // The rejected write must not have changed the value, seen either directly
  // or through the transaction. Check the read statuses so a failed Get cannot
  // be masked by a stale `value`.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  ASSERT_OK(txn->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
2217 
// Test to make sure transactions work when there are no other writes in an
// empty db.
TEST_P(TransactionTest, TxnOnlyTest) {
  WriteOptions write_options;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // A single write followed by a commit; both must succeed.
  ASSERT_OK(txn->Put("x", "y"));
  ASSERT_OK(txn->Commit());

  delete txn;
}
2238 
// A transaction must still be able to commit after the memtable holding the
// keys it touched has been flushed.
TEST_P(TransactionTest, FlushTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
  ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  // The transaction sees its own pending write.
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  // Put a random key so we have a memtable to flush
  s = db->Put(write_options, "dummy", "dummy");
  ASSERT_OK(s);

  // force a memtable flush; the flush must succeed, otherwise the commit
  // below would not be exercising the post-flush path at all.
  FlushOptions flush_ops;
  ASSERT_OK(db->Flush(flush_ops));

  s = txn->Commit();
  // txn should commit since the flushed table is still in MemtableList History
  ASSERT_OK(s);

  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}
2279 
TEST_P(TransactionTest,FlushTest2)2280 TEST_P(TransactionTest, FlushTest2) {
2281   const size_t num_tests = 3;
2282 
2283   for (size_t n = 0; n < num_tests; n++) {
2284     // Test different table factories
2285     switch (n) {
2286       case 0:
2287         break;
2288       case 1:
2289         options.table_factory.reset(new mock::MockTableFactory());
2290         break;
2291       case 2: {
2292         PlainTableOptions pt_opts;
2293         pt_opts.hash_table_ratio = 0;
2294         options.table_factory.reset(NewPlainTableFactory(pt_opts));
2295         break;
2296       }
2297     }
2298 
2299     Status s = ReOpen();
2300     ASSERT_OK(s);
2301     assert(db != nullptr);
2302 
2303     WriteOptions write_options;
2304     ReadOptions read_options, snapshot_read_options;
2305     TransactionOptions txn_options;
2306     string value;
2307 
2308     DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2309 
2310     ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
2311     ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar2")));
2312     ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar3")));
2313 
2314     txn_options.set_snapshot = true;
2315     Transaction* txn = db->BeginTransaction(write_options, txn_options);
2316     ASSERT_TRUE(txn);
2317 
2318     snapshot_read_options.snapshot = txn->GetSnapshot();
2319 
2320     ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
2321     ASSERT_EQ(value, "bar");
2322 
2323     s = txn->Put(Slice("foo"), Slice("bar2"));
2324     ASSERT_OK(s);
2325 
2326     ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
2327     ASSERT_EQ(value, "bar2");
2328     // verify foo is locked by txn
2329     s = db->Delete(write_options, "foo");
2330     ASSERT_TRUE(s.IsTimedOut());
2331 
2332     s = db->Put(write_options, "Z", "z");
2333     ASSERT_OK(s);
2334     s = db->Put(write_options, "dummy", "dummy");
2335     ASSERT_OK(s);
2336 
2337     s = db->Put(write_options, "S", "s");
2338     ASSERT_OK(s);
2339     s = db->SingleDelete(write_options, "S");
2340     ASSERT_OK(s);
2341 
2342     s = txn->Delete("S");
2343     // Should fail after encountering a write to S in memtable
2344     ASSERT_TRUE(s.IsBusy());
2345 
2346     // force a memtable flush
2347     s = db_impl->TEST_FlushMemTable(true);
2348     ASSERT_OK(s);
2349 
2350     // Put a random key so we have a MemTable to flush
2351     s = db->Put(write_options, "dummy", "dummy2");
2352     ASSERT_OK(s);
2353 
2354     // force a memtable flush
2355     ASSERT_OK(db_impl->TEST_FlushMemTable(true));
2356 
2357     s = db->Put(write_options, "dummy", "dummy3");
2358     ASSERT_OK(s);
2359 
2360     // force a memtable flush
2361     // Since our test db has max_write_buffer_number=2, this flush will cause
2362     // the first memtable to get purged from the MemtableList history.
2363     ASSERT_OK(db_impl->TEST_FlushMemTable(true));
2364 
2365     s = txn->Put("X", "Y");
2366     // Should succeed after verifying there is no write to X in SST file
2367     ASSERT_OK(s);
2368 
2369     s = txn->Put("Z", "zz");
2370     // Should fail after encountering a write to Z in SST file
2371     ASSERT_TRUE(s.IsBusy());
2372 
2373     s = txn->GetForUpdate(read_options, "foo2", &value);
2374     // should succeed since key was written before txn started
2375     ASSERT_OK(s);
2376     // verify foo2 is locked by txn
2377     s = db->Delete(write_options, "foo2");
2378     ASSERT_TRUE(s.IsTimedOut());
2379 
2380     s = txn->Delete("S");
2381     // Should fail after encountering a write to S in SST file
2382     ASSERT_TRUE(s.IsBusy());
2383 
2384     // Write a bunch of keys to db to force a compaction
2385     Random rnd(47);
2386     for (int i = 0; i < 1000; i++) {
2387       s = db->Put(write_options, std::to_string(i),
2388                   test::CompressibleString(&rnd, 0.8, 100, &value));
2389       ASSERT_OK(s);
2390     }
2391 
2392     s = txn->Put("X", "yy");
2393     // Should succeed after verifying there is no write to X in SST file
2394     ASSERT_OK(s);
2395 
2396     s = txn->Put("Z", "zzz");
2397     // Should fail after encountering a write to Z in SST file
2398     ASSERT_TRUE(s.IsBusy());
2399 
2400     s = txn->Delete("S");
2401     // Should fail after encountering a write to S in SST file
2402     ASSERT_TRUE(s.IsBusy());
2403 
2404     s = txn->GetForUpdate(read_options, "foo3", &value);
2405     // should succeed since key was written before txn started
2406     ASSERT_OK(s);
2407     // verify foo3 is locked by txn
2408     s = db->Delete(write_options, "foo3");
2409     ASSERT_TRUE(s.IsTimedOut());
2410 
2411     ASSERT_OK(db_impl->TEST_WaitForCompact());
2412 
2413     s = txn->Commit();
2414     ASSERT_OK(s);
2415 
2416     // Transaction should only write the keys that succeeded.
2417     s = db->Get(read_options, "foo", &value);
2418     ASSERT_EQ(value, "bar2");
2419 
2420     s = db->Get(read_options, "X", &value);
2421     ASSERT_OK(s);
2422     ASSERT_EQ("yy", value);
2423 
2424     s = db->Get(read_options, "Z", &value);
2425     ASSERT_OK(s);
2426     ASSERT_EQ("z", value);
2427 
2428   delete txn;
2429   }
2430 }
2431 
// A transaction without a snapshot that reads and writes a key only after an
// outside modification of that key must still commit.
TEST_P(TransactionTest, NoSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string current;

  ASSERT_OK(db->Put(write_options, "AAA", "bar"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn != nullptr);

  // Modify the key after the transaction has started.
  ASSERT_OK(db->Put(write_options, "AAA", "bar1"));

  // Read and write without a snapshot.
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &current));
  ASSERT_EQ(current, "bar1");
  ASSERT_OK(txn->Put("AAA", "bar2"));

  // Should commit since the read/write was done after the data changed.
  ASSERT_OK(txn->Commit());

  // Reading through the (already committed) transaction yields the new value.
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &current));
  ASSERT_EQ(current, "bar2");

  delete txn;
}
2461 
TEST_P(TransactionTest,MultipleSnapshotTest)2462 TEST_P(TransactionTest, MultipleSnapshotTest) {
2463   WriteOptions write_options;
2464   ReadOptions read_options, snapshot_read_options;
2465   std::string value;
2466   Status s;
2467 
2468   ASSERT_OK(db->Put(write_options, "AAA", "bar"));
2469   ASSERT_OK(db->Put(write_options, "BBB", "bar"));
2470   ASSERT_OK(db->Put(write_options, "CCC", "bar"));
2471 
2472   Transaction* txn = db->BeginTransaction(write_options);
2473   ASSERT_TRUE(txn);
2474 
2475   ASSERT_OK(db->Put(write_options, "AAA", "bar1"));
2476 
2477   // Read and write without a snapshot
2478   ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
2479   ASSERT_EQ(value, "bar1");
2480   s = txn->Put("AAA", "bar2");
2481   ASSERT_OK(s);
2482 
2483   // Modify BBB before snapshot is taken
2484   ASSERT_OK(db->Put(write_options, "BBB", "bar1"));
2485 
2486   txn->SetSnapshot();
2487   snapshot_read_options.snapshot = txn->GetSnapshot();
2488 
2489   // Read and write with snapshot
2490   ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
2491   ASSERT_EQ(value, "bar1");
2492   s = txn->Put("BBB", "bar2");
2493   ASSERT_OK(s);
2494 
2495   ASSERT_OK(db->Put(write_options, "CCC", "bar1"));
2496 
2497   // Set a new snapshot
2498   txn->SetSnapshot();
2499   snapshot_read_options.snapshot = txn->GetSnapshot();
2500 
2501   // Read and write with snapshot
2502   ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value));
2503   ASSERT_EQ(value, "bar1");
2504   s = txn->Put("CCC", "bar2");
2505   ASSERT_OK(s);
2506 
2507   s = txn->GetForUpdate(read_options, "AAA", &value);
2508   ASSERT_OK(s);
2509   ASSERT_EQ(value, "bar2");
2510   s = txn->GetForUpdate(read_options, "BBB", &value);
2511   ASSERT_OK(s);
2512   ASSERT_EQ(value, "bar2");
2513   s = txn->GetForUpdate(read_options, "CCC", &value);
2514   ASSERT_OK(s);
2515   ASSERT_EQ(value, "bar2");
2516 
2517   s = db->Get(read_options, "AAA", &value);
2518   ASSERT_OK(s);
2519   ASSERT_EQ(value, "bar1");
2520   s = db->Get(read_options, "BBB", &value);
2521   ASSERT_OK(s);
2522   ASSERT_EQ(value, "bar1");
2523   s = db->Get(read_options, "CCC", &value);
2524   ASSERT_OK(s);
2525   ASSERT_EQ(value, "bar1");
2526 
2527   s = txn->Commit();
2528   ASSERT_OK(s);
2529 
2530   s = db->Get(read_options, "AAA", &value);
2531   ASSERT_OK(s);
2532   ASSERT_EQ(value, "bar2");
2533   s = db->Get(read_options, "BBB", &value);
2534   ASSERT_OK(s);
2535   ASSERT_EQ(value, "bar2");
2536   s = db->Get(read_options, "CCC", &value);
2537   ASSERT_OK(s);
2538   ASSERT_EQ(value, "bar2");
2539 
2540   // verify that we track multiple writes to the same key at different snapshots
2541   delete txn;
2542   txn = db->BeginTransaction(write_options);
2543 
2544   // Potentially conflicting writes
2545   ASSERT_OK(db->Put(write_options, "ZZZ", "zzz"));
2546   ASSERT_OK(db->Put(write_options, "XXX", "xxx"));
2547 
2548   txn->SetSnapshot();
2549 
2550   TransactionOptions txn_options;
2551   txn_options.set_snapshot = true;
2552   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
2553   txn2->SetSnapshot();
2554 
2555   // This should not conflict in txn since the snapshot is later than the
2556   // previous write (spoiler alert:  it will later conflict with txn2).
2557   s = txn->Put("ZZZ", "zzzz");
2558   ASSERT_OK(s);
2559 
2560   s = txn->Commit();
2561   ASSERT_OK(s);
2562 
2563   delete txn;
2564 
2565   // This will conflict since the snapshot is earlier than another write to ZZZ
2566   s = txn2->Put("ZZZ", "xxxxx");
2567   ASSERT_TRUE(s.IsBusy());
2568 
2569   s = txn2->Commit();
2570   ASSERT_OK(s);
2571 
2572   s = db->Get(read_options, "ZZZ", &value);
2573   ASSERT_OK(s);
2574   ASSERT_EQ(value, "zzzz");
2575 
2576   delete txn2;
2577 }
2578 
TEST_P(TransactionTest,ColumnFamiliesTest)2579 TEST_P(TransactionTest, ColumnFamiliesTest) {
2580   WriteOptions write_options;
2581   ReadOptions read_options, snapshot_read_options;
2582   TransactionOptions txn_options;
2583   string value;
2584   Status s;
2585 
2586   ColumnFamilyHandle *cfa, *cfb;
2587   ColumnFamilyOptions cf_options;
2588 
2589   // Create 2 new column families
2590   s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
2591   ASSERT_OK(s);
2592   s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
2593   ASSERT_OK(s);
2594 
2595   delete cfa;
2596   delete cfb;
2597   delete db;
2598   db = nullptr;
2599 
2600   // open DB with three column families
2601   std::vector<ColumnFamilyDescriptor> column_families;
2602   // have to open default column family
2603   column_families.push_back(
2604       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2605   // open the new column families
2606   column_families.push_back(
2607       ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
2608   column_families.push_back(
2609       ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
2610 
2611   std::vector<ColumnFamilyHandle*> handles;
2612 
2613   ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2614   assert(db != nullptr);
2615 
2616   Transaction* txn = db->BeginTransaction(write_options);
2617   ASSERT_TRUE(txn);
2618 
2619   txn->SetSnapshot();
2620   snapshot_read_options.snapshot = txn->GetSnapshot();
2621 
2622   txn_options.set_snapshot = true;
2623   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
2624   ASSERT_TRUE(txn2);
2625 
2626   // Write some data to the db
2627   WriteBatch batch;
2628   ASSERT_OK(batch.Put("foo", "foo"));
2629   ASSERT_OK(batch.Put(handles[1], "AAA", "bar"));
2630   ASSERT_OK(batch.Put(handles[1], "AAAZZZ", "bar"));
2631   s = db->Write(write_options, &batch);
2632   ASSERT_OK(s);
2633   ASSERT_OK(db->Delete(write_options, handles[1], "AAAZZZ"));
2634 
2635   // These keys do not conflict with existing writes since they're in
2636   // different column families
2637   s = txn->Delete("AAA");
2638   ASSERT_OK(s);
2639   s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
2640   ASSERT_TRUE(s.IsNotFound());
2641   Slice key_slice("AAAZZZ");
2642   Slice value_slices[2] = {Slice("bar"), Slice("bar")};
2643   s = txn->Put(handles[2], SliceParts(&key_slice, 1),
2644                SliceParts(value_slices, 2));
2645   ASSERT_OK(s);
2646   ASSERT_EQ(3, txn->GetNumKeys());
2647 
2648   s = txn->Commit();
2649   ASSERT_OK(s);
2650   s = db->Get(read_options, "AAA", &value);
2651   ASSERT_TRUE(s.IsNotFound());
2652   s = db->Get(read_options, handles[2], "AAAZZZ", &value);
2653   ASSERT_EQ(value, "barbar");
2654 
2655   Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
2656   Slice value_slice("barbarbar");
2657 
2658   s = txn2->Delete(handles[2], "XXX");
2659   ASSERT_OK(s);
2660   s = txn2->Delete(handles[1], "XXX");
2661   ASSERT_OK(s);
2662 
2663   // This write will cause a conflict with the earlier batch write
2664   s = txn2->Put(handles[1], SliceParts(key_slices, 3),
2665                 SliceParts(&value_slice, 1));
2666   ASSERT_TRUE(s.IsBusy());
2667 
2668   s = txn2->Commit();
2669   ASSERT_OK(s);
2670   // In the above the latest change to AAAZZZ in handles[1] is delete.
2671   s = db->Get(read_options, handles[1], "AAAZZZ", &value);
2672   ASSERT_TRUE(s.IsNotFound());
2673 
2674   delete txn;
2675   delete txn2;
2676 
2677   txn = db->BeginTransaction(write_options, txn_options);
2678   snapshot_read_options.snapshot = txn->GetSnapshot();
2679 
2680   txn2 = db->BeginTransaction(write_options, txn_options);
2681   ASSERT_TRUE(txn);
2682 
2683   std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
2684                                                    handles[0], handles[2]};
2685   std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
2686   std::vector<std::string> values(4);
2687   std::vector<Status> results = txn->MultiGetForUpdate(
2688       snapshot_read_options, multiget_cfh, multiget_keys, &values);
2689   ASSERT_OK(results[0]);
2690   ASSERT_OK(results[1]);
2691   ASSERT_OK(results[2]);
2692   ASSERT_TRUE(results[3].IsNotFound());
2693   ASSERT_EQ(values[0], "bar");
2694   ASSERT_EQ(values[1], "barbar");
2695   ASSERT_EQ(values[2], "foo");
2696 
2697   s = txn->SingleDelete(handles[2], "ZZZ");
2698   ASSERT_OK(s);
2699   s = txn->Put(handles[2], "ZZZ", "YYY");
2700   ASSERT_OK(s);
2701   s = txn->Put(handles[2], "ZZZ", "YYYY");
2702   ASSERT_OK(s);
2703   s = txn->Delete(handles[2], "ZZZ");
2704   ASSERT_OK(s);
2705   s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
2706   ASSERT_OK(s);
2707 
2708   ASSERT_EQ(5, txn->GetNumKeys());
2709 
2710   // Txn should commit
2711   s = txn->Commit();
2712   ASSERT_OK(s);
2713   s = db->Get(read_options, handles[2], "ZZZ", &value);
2714   ASSERT_TRUE(s.IsNotFound());
2715 
2716   // Put a key which will conflict with the next txn using the previous snapshot
2717   ASSERT_OK(db->Put(write_options, handles[2], "foo", "000"));
2718 
2719   results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
2720                                     multiget_keys, &values);
2721   // All results should fail since there was a conflict
2722   ASSERT_TRUE(results[0].IsBusy());
2723   ASSERT_TRUE(results[1].IsBusy());
2724   ASSERT_TRUE(results[2].IsBusy());
2725   ASSERT_TRUE(results[3].IsBusy());
2726 
2727   s = db->Get(read_options, handles[2], "foo", &value);
2728   ASSERT_EQ(value, "000");
2729 
2730   s = txn2->Commit();
2731   ASSERT_OK(s);
2732 
2733   s = db->DropColumnFamily(handles[1]);
2734   ASSERT_OK(s);
2735   s = db->DropColumnFamily(handles[2]);
2736   ASSERT_OK(s);
2737 
2738   delete txn;
2739   delete txn2;
2740 
2741   for (auto handle : handles) {
2742     delete handle;
2743   }
2744 }
2745 
TEST_P(TransactionTest,MultiGetBatchedTest)2746 TEST_P(TransactionTest, MultiGetBatchedTest) {
2747   WriteOptions write_options;
2748   ReadOptions read_options, snapshot_read_options;
2749   TransactionOptions txn_options;
2750   string value;
2751   Status s;
2752 
2753   ColumnFamilyHandle* cf;
2754   ColumnFamilyOptions cf_options;
2755 
2756   // Create a new column families
2757   s = db->CreateColumnFamily(cf_options, "CF", &cf);
2758   ASSERT_OK(s);
2759 
2760   delete cf;
2761   delete db;
2762   db = nullptr;
2763 
2764   // open DB with three column families
2765   std::vector<ColumnFamilyDescriptor> column_families;
2766   // have to open default column family
2767   column_families.push_back(
2768       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2769   // open the new column families
2770   cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
2771   column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
2772 
2773   std::vector<ColumnFamilyHandle*> handles;
2774 
2775   options.merge_operator = MergeOperators::CreateStringAppendOperator();
2776   ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2777   assert(db != nullptr);
2778 
2779   // Write some data to the db
2780   WriteBatch batch;
2781   ASSERT_OK(batch.Put(handles[1], "aaa", "val1"));
2782   ASSERT_OK(batch.Put(handles[1], "bbb", "val2"));
2783   ASSERT_OK(batch.Put(handles[1], "ccc", "val3"));
2784   ASSERT_OK(batch.Put(handles[1], "ddd", "foo"));
2785   ASSERT_OK(batch.Put(handles[1], "eee", "val5"));
2786   ASSERT_OK(batch.Put(handles[1], "fff", "val6"));
2787   ASSERT_OK(batch.Merge(handles[1], "ggg", "foo"));
2788   s = db->Write(write_options, &batch);
2789   ASSERT_OK(s);
2790 
2791   Transaction* txn = db->BeginTransaction(write_options);
2792   ASSERT_TRUE(txn);
2793 
2794   txn->SetSnapshot();
2795   snapshot_read_options.snapshot = txn->GetSnapshot();
2796 
2797   txn_options.set_snapshot = true;
2798   // Write some data to the db
2799   s = txn->Delete(handles[1], "bbb");
2800   ASSERT_OK(s);
2801   s = txn->Put(handles[1], "ccc", "val3_new");
2802   ASSERT_OK(s);
2803   s = txn->Merge(handles[1], "ddd", "bar");
2804   ASSERT_OK(s);
2805 
2806   std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
2807   std::vector<PinnableSlice> values(keys.size());
2808   std::vector<Status> statuses(keys.size());
2809 
2810   txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
2811                 values.data(), statuses.data());
2812   ASSERT_TRUE(statuses[0].ok());
2813   ASSERT_EQ(values[0], "val1");
2814   ASSERT_TRUE(statuses[1].IsNotFound());
2815   ASSERT_TRUE(statuses[2].ok());
2816   ASSERT_EQ(values[2], "val3_new");
2817   ASSERT_TRUE(statuses[3].ok());
2818   ASSERT_EQ(values[3], "foo,bar");
2819   ASSERT_TRUE(statuses[4].ok());
2820   ASSERT_EQ(values[4], "val5");
2821   ASSERT_TRUE(statuses[5].ok());
2822   ASSERT_EQ(values[5], "val6");
2823   ASSERT_TRUE(statuses[6].ok());
2824   ASSERT_EQ(values[6], "foo");
2825   delete txn;
2826   for (auto handle : handles) {
2827     delete handle;
2828   }
2829 }
2830 
// This test calls WriteBatchWithIndex::MultiGetFromBatchAndDB with a large
// number of keys, i.e. greater than MultiGetContext::MAX_BATCH_SIZE, which
// is 32. This forces autovector allocations in the MultiGet code paths
// to use std::vector in addition to stack allocations. The MultiGet keys
// include Merges, which are handled specially in MultiGetFromBatchAndDB by
// allocating an autovector of MergeContexts.
TEST_P(TransactionTest,MultiGetLargeBatchedTest)2837 TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
2838   WriteOptions write_options;
2839   ReadOptions read_options, snapshot_read_options;
2840   string value;
2841   Status s;
2842 
2843   ColumnFamilyHandle* cf;
2844   ColumnFamilyOptions cf_options;
2845 
2846   std::vector<std::string> key_str;
2847   for (int i = 0; i < 100; ++i) {
2848     key_str.emplace_back(std::to_string(i));
2849   }
2850   // Create a new column families
2851   s = db->CreateColumnFamily(cf_options, "CF", &cf);
2852   ASSERT_OK(s);
2853 
2854   delete cf;
2855   delete db;
2856   db = nullptr;
2857 
2858   // open DB with three column families
2859   std::vector<ColumnFamilyDescriptor> column_families;
2860   // have to open default column family
2861   column_families.push_back(
2862       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2863   // open the new column families
2864   cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
2865   column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
2866 
2867   std::vector<ColumnFamilyHandle*> handles;
2868 
2869   options.merge_operator = MergeOperators::CreateStringAppendOperator();
2870   ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2871   assert(db != nullptr);
2872 
2873   // Write some data to the db
2874   WriteBatch batch;
2875   for (int i = 0; i < 3 * MultiGetContext::MAX_BATCH_SIZE; ++i) {
2876     std::string val = "val" + std::to_string(i);
2877     ASSERT_OK(batch.Put(handles[1], key_str[i], val));
2878   }
2879   s = db->Write(write_options, &batch);
2880   ASSERT_OK(s);
2881 
2882   WriteBatchWithIndex wb;
2883   // Write some data to the db
2884   s = wb.Delete(handles[1], std::to_string(1));
2885   ASSERT_OK(s);
2886   s = wb.Put(handles[1], std::to_string(2), "new_val" + std::to_string(2));
2887   ASSERT_OK(s);
2888   // Write a lot of merges so when we call MultiGetFromBatchAndDB later on,
2889   // it is forced to use std::vector in ROCKSDB_NAMESPACE::autovector to
2890   // allocate MergeContexts. The number of merges needs to be >
2891   // MultiGetContext::MAX_BATCH_SIZE
2892   for (int i = 8; i < MultiGetContext::MAX_BATCH_SIZE + 24; ++i) {
2893     s = wb.Merge(handles[1], std::to_string(i), "merge");
2894     ASSERT_OK(s);
2895   }
2896 
2897   // MultiGet a lot of keys in order to force std::vector reallocations
2898   std::vector<Slice> keys;
2899   for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE + 32; ++i) {
2900     keys.emplace_back(key_str[i]);
2901   }
2902   std::vector<PinnableSlice> values(keys.size());
2903   std::vector<Status> statuses(keys.size());
2904 
2905   wb.MultiGetFromBatchAndDB(db, snapshot_read_options, handles[1], keys.size(), keys.data(),
2906                 values.data(), statuses.data(), false);
2907   for (size_t i =0; i < keys.size(); ++i) {
2908     if (i == 1) {
2909       ASSERT_TRUE(statuses[1].IsNotFound());
2910     } else if (i == 2) {
2911       ASSERT_TRUE(statuses[2].ok());
2912       ASSERT_EQ(values[2], "new_val" + std::to_string(2));
2913     } else if (i >= 8 && i < 56) {
2914       ASSERT_TRUE(statuses[i].ok());
2915       ASSERT_EQ(values[i], "val" + std::to_string(i) + ",merge");
2916     } else {
2917       ASSERT_TRUE(statuses[i].ok());
2918       if (values[i] != "val" + std::to_string(i)) {
2919         ASSERT_EQ(values[i], "val" + std::to_string(i));
2920       }
2921     }
2922   }
2923 
2924   for (auto handle : handles) {
2925     delete handle;
2926   }
2927 }
2928 
// A snapshot taken between Prepare() and Commit() of a transaction must not
// show that transaction's write when a later transaction reads through the
// snapshot with the batched MultiGet.
TEST_P(TransactionTest, MultiGetSnapshot) {
  WriteOptions write_options;
  TransactionOptions transaction_options;
  Transaction* writer =
      db->BeginTransaction(write_options, transaction_options);

  Slice key = "foo";

  ASSERT_OK(writer->Put(key, "bar"));
  ASSERT_OK(writer->SetName("test"));
  ASSERT_OK(writer->Prepare());

  // Taken after Prepare() but before Commit(): the prepared data is still
  // uncommitted and has to stay invisible through this snapshot.
  const Snapshot* pre_commit_snapshot = db->GetSnapshot();

  ASSERT_OK(writer->Commit());
  delete writer;

  Transaction* reader =
      db->BeginTransaction(write_options, transaction_options);
  ReadOptions read_options;
  read_options.snapshot = pre_commit_snapshot;

  std::vector<Slice> keys(1, key);
  std::vector<PinnableSlice> values(1);
  std::vector<Status> statuses(1);
  reader->MultiGet(read_options, db->DefaultColumnFamily(), 1, keys.data(),
                   values.data(), statuses.data());
  ASSERT_TRUE(statuses[0].IsNotFound());
  delete reader;

  db->ReleaseSnapshot(pre_commit_snapshot);
}
2969 
TEST_P(TransactionTest,ColumnFamiliesTest2)2970 TEST_P(TransactionTest, ColumnFamiliesTest2) {
2971   WriteOptions write_options;
2972   ReadOptions read_options, snapshot_read_options;
2973   string value;
2974   Status s;
2975 
2976   ColumnFamilyHandle *one, *two;
2977   ColumnFamilyOptions cf_options;
2978 
2979   // Create 2 new column families
2980   s = db->CreateColumnFamily(cf_options, "ONE", &one);
2981   ASSERT_OK(s);
2982   s = db->CreateColumnFamily(cf_options, "TWO", &two);
2983   ASSERT_OK(s);
2984 
2985   Transaction* txn1 = db->BeginTransaction(write_options);
2986   ASSERT_TRUE(txn1);
2987   Transaction* txn2 = db->BeginTransaction(write_options);
2988   ASSERT_TRUE(txn2);
2989 
2990   s = txn1->Put(one, "X", "1");
2991   ASSERT_OK(s);
2992   s = txn1->Put(two, "X", "2");
2993   ASSERT_OK(s);
2994   s = txn1->Put("X", "0");
2995   ASSERT_OK(s);
2996 
2997   s = txn2->Put(one, "X", "11");
2998   ASSERT_TRUE(s.IsTimedOut());
2999 
3000   s = txn1->Commit();
3001   ASSERT_OK(s);
3002 
3003   // Drop first column family
3004   s = db->DropColumnFamily(one);
3005   ASSERT_OK(s);
3006 
3007   // Should fail since column family was dropped.
3008   s = txn2->Commit();
3009   ASSERT_OK(s);
3010 
3011   delete txn1;
3012   txn1 = db->BeginTransaction(write_options);
3013   ASSERT_TRUE(txn1);
3014 
3015   // Should fail since column family was dropped
3016   s = txn1->Put(one, "X", "111");
3017   ASSERT_TRUE(s.IsInvalidArgument());
3018 
3019   s = txn1->Put(two, "X", "222");
3020   ASSERT_OK(s);
3021 
3022   s = txn1->Put("X", "000");
3023   ASSERT_OK(s);
3024 
3025   s = txn1->Commit();
3026   ASSERT_OK(s);
3027 
3028   s = db->Get(read_options, two, "X", &value);
3029   ASSERT_OK(s);
3030   ASSERT_EQ("222", value);
3031 
3032   s = db->Get(read_options, "X", &value);
3033   ASSERT_OK(s);
3034   ASSERT_EQ("000", value);
3035 
3036   s = db->DropColumnFamily(two);
3037   ASSERT_OK(s);
3038 
3039   delete txn1;
3040   delete txn2;
3041 
3042   delete one;
3043   delete two;
3044 }
3045 
// Covers transactions that contain no writes: an empty commit, an empty
// rollback, and transactions that only lock a key through GetForUpdate.
TEST_P(TransactionTest, EmptyTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  // Commit with nothing in the transaction.
  Transaction* txn = db->BeginTransaction(write_options);
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // Rollback with nothing in the transaction.
  txn = db->BeginTransaction(write_options);
  ASSERT_OK(txn->Rollback());
  delete txn;

  // Transaction that only reads-for-update.
  txn = db->BeginTransaction(write_options);
  s = txn->GetForUpdate(read_options, "aaa", &value);
  // Check the status as well, so a failed read cannot pass because `value`
  // still holds a previous result.
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // Same again, this time with a snapshot set on the transaction.
  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  // Conflicts with previous GetForUpdate
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // No expiration was configured for this transaction and it made no writes;
  // the commit is expected to succeed.
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;
}
3087 
TEST_P(TransactionTest,PredicateManyPreceders)3088 TEST_P(TransactionTest, PredicateManyPreceders) {
3089   WriteOptions write_options;
3090   ReadOptions read_options1, read_options2;
3091   TransactionOptions txn_options;
3092   string value;
3093   Status s;
3094 
3095   txn_options.set_snapshot = true;
3096   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3097   read_options1.snapshot = txn1->GetSnapshot();
3098 
3099   Transaction* txn2 = db->BeginTransaction(write_options);
3100   txn2->SetSnapshot();
3101   read_options2.snapshot = txn2->GetSnapshot();
3102 
3103   std::vector<Slice> multiget_keys = {"1", "2", "3"};
3104   std::vector<std::string> multiget_values;
3105 
3106   std::vector<Status> results =
3107       txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
3108   ASSERT_EQ(results.size(), 3);
3109   ASSERT_TRUE(results[0].IsNotFound());
3110   ASSERT_TRUE(results[1].IsNotFound());
3111   ASSERT_TRUE(results[2].IsNotFound());
3112 
3113   s = txn2->Put("2", "x");  // Conflict's with txn1's MultiGetForUpdate
3114   ASSERT_TRUE(s.IsTimedOut());
3115 
3116   ASSERT_OK(txn2->Rollback());
3117 
3118   multiget_values.clear();
3119   results =
3120       txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
3121   ASSERT_EQ(results.size(), 3);
3122   ASSERT_TRUE(results[0].IsNotFound());
3123   ASSERT_TRUE(results[1].IsNotFound());
3124   ASSERT_TRUE(results[2].IsNotFound());
3125 
3126   s = txn1->Commit();
3127   ASSERT_OK(s);
3128 
3129   delete txn1;
3130   delete txn2;
3131 
3132   txn1 = db->BeginTransaction(write_options, txn_options);
3133   read_options1.snapshot = txn1->GetSnapshot();
3134 
3135   txn2 = db->BeginTransaction(write_options, txn_options);
3136   read_options2.snapshot = txn2->GetSnapshot();
3137 
3138   s = txn1->Put("4", "x");
3139   ASSERT_OK(s);
3140 
3141   s = txn2->Delete("4");  // conflict
3142   ASSERT_TRUE(s.IsTimedOut());
3143 
3144   s = txn1->Commit();
3145   ASSERT_OK(s);
3146 
3147   s = txn2->GetForUpdate(read_options2, "4", &value);
3148   ASSERT_TRUE(s.IsBusy());
3149 
3150   ASSERT_OK(txn2->Rollback());
3151 
3152   delete txn1;
3153   delete txn2;
3154 }
3155 
TEST_P(TransactionTest,LostUpdate)3156 TEST_P(TransactionTest, LostUpdate) {
3157   WriteOptions write_options;
3158   ReadOptions read_options, read_options1, read_options2;
3159   TransactionOptions txn_options;
3160   std::string value;
3161   Status s;
3162 
3163   // Test 2 transactions writing to the same key in multiple orders and
3164   // with/without snapshots
3165 
3166   Transaction* txn1 = db->BeginTransaction(write_options);
3167   Transaction* txn2 = db->BeginTransaction(write_options);
3168 
3169   s = txn1->Put("1", "1");
3170   ASSERT_OK(s);
3171 
3172   s = txn2->Put("1", "2");  // conflict
3173   ASSERT_TRUE(s.IsTimedOut());
3174 
3175   s = txn2->Commit();
3176   ASSERT_OK(s);
3177 
3178   s = txn1->Commit();
3179   ASSERT_OK(s);
3180 
3181   s = db->Get(read_options, "1", &value);
3182   ASSERT_OK(s);
3183   ASSERT_EQ("1", value);
3184 
3185   delete txn1;
3186   delete txn2;
3187 
3188   txn_options.set_snapshot = true;
3189   txn1 = db->BeginTransaction(write_options, txn_options);
3190   read_options1.snapshot = txn1->GetSnapshot();
3191 
3192   txn2 = db->BeginTransaction(write_options, txn_options);
3193   read_options2.snapshot = txn2->GetSnapshot();
3194 
3195   s = txn1->Put("1", "3");
3196   ASSERT_OK(s);
3197   s = txn2->Put("1", "4");  // conflict
3198   ASSERT_TRUE(s.IsTimedOut());
3199 
3200   s = txn1->Commit();
3201   ASSERT_OK(s);
3202 
3203   s = txn2->Commit();
3204   ASSERT_OK(s);
3205 
3206   s = db->Get(read_options, "1", &value);
3207   ASSERT_OK(s);
3208   ASSERT_EQ("3", value);
3209 
3210   delete txn1;
3211   delete txn2;
3212 
3213   txn1 = db->BeginTransaction(write_options, txn_options);
3214   read_options1.snapshot = txn1->GetSnapshot();
3215 
3216   txn2 = db->BeginTransaction(write_options, txn_options);
3217   read_options2.snapshot = txn2->GetSnapshot();
3218 
3219   s = txn1->Put("1", "5");
3220   ASSERT_OK(s);
3221 
3222   s = txn1->Commit();
3223   ASSERT_OK(s);
3224 
3225   s = txn2->Put("1", "6");
3226   ASSERT_TRUE(s.IsBusy());
3227   s = txn2->Commit();
3228   ASSERT_OK(s);
3229 
3230   s = db->Get(read_options, "1", &value);
3231   ASSERT_OK(s);
3232   ASSERT_EQ("5", value);
3233 
3234   delete txn1;
3235   delete txn2;
3236 
3237   txn1 = db->BeginTransaction(write_options, txn_options);
3238   read_options1.snapshot = txn1->GetSnapshot();
3239 
3240   txn2 = db->BeginTransaction(write_options, txn_options);
3241   read_options2.snapshot = txn2->GetSnapshot();
3242 
3243   s = txn1->Put("1", "7");
3244   ASSERT_OK(s);
3245   s = txn1->Commit();
3246   ASSERT_OK(s);
3247 
3248   txn2->SetSnapshot();
3249   s = txn2->Put("1", "8");
3250   ASSERT_OK(s);
3251   s = txn2->Commit();
3252   ASSERT_OK(s);
3253 
3254   s = db->Get(read_options, "1", &value);
3255   ASSERT_OK(s);
3256   ASSERT_EQ("8", value);
3257 
3258   delete txn1;
3259   delete txn2;
3260 
3261   txn1 = db->BeginTransaction(write_options);
3262   txn2 = db->BeginTransaction(write_options);
3263 
3264   s = txn1->Put("1", "9");
3265   ASSERT_OK(s);
3266   s = txn1->Commit();
3267   ASSERT_OK(s);
3268 
3269   s = txn2->Put("1", "10");
3270   ASSERT_OK(s);
3271   s = txn2->Commit();
3272   ASSERT_OK(s);
3273 
3274   delete txn1;
3275   delete txn2;
3276 
3277   s = db->Get(read_options, "1", &value);
3278   ASSERT_OK(s);
3279   ASSERT_EQ(value, "10");
3280 }
3281 
// Checks the *Untracked write APIs: they can be rolled back, and they succeed
// on a key that changed after the transaction's snapshot, where a regular Put
// reports a conflict.
TEST_P(TransactionTest, UntrackedWrites) {
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    // TODO(lth): For WriteUnprepared, validate that untracked writes are
    // not supported.
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;

  // Verify transaction rollback works for untracked keys.
  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  ASSERT_OK(txn->PutUntracked("untracked", "0"));
  ASSERT_OK(txn->Rollback());
  ASSERT_TRUE(db->Get(read_options, "untracked", &value).IsNotFound());

  delete txn;
  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // Change the key outside the transaction, after its snapshot was taken.
  ASSERT_OK(db->Put(write_options, "untracked", "x"));

  // Untracked writes should succeed even though key was written after snapshot
  ASSERT_OK(txn->PutUntracked("untracked", "1"));
  ASSERT_OK(txn->MergeUntracked("untracked", "2"));
  ASSERT_OK(txn->DeleteUntracked("untracked"));

  // A regular Put on the same key reports the conflict.
  ASSERT_TRUE(txn->Put("untracked", "3").IsBusy());

  ASSERT_OK(txn->Commit());

  // The last successful operation on the key was the untracked delete.
  ASSERT_TRUE(db->Get(read_options, "untracked", &value).IsNotFound());

  delete txn;
}
3331 
// A transaction created with an expiration of 0 must not block other writers
// and must fail to commit with an Expired status, leaving none of its writes
// in the DB.
TEST_P(TransactionTest, ExpiredTransaction) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;

  // Set txn expiration timeout to 0 microseconds (expires instantly)
  txn_options.expiration = 0;
  Transaction* expired_txn = db->BeginTransaction(write_options, txn_options);

  ASSERT_OK(expired_txn->Put("X", "1"));
  ASSERT_OK(expired_txn->Put("Y", "1"));

  Transaction* live_txn = db->BeginTransaction(write_options);

  // live_txn should be able to write to X since expired_txn has expired
  ASSERT_OK(live_txn->Put("X", "2"));
  ASSERT_OK(live_txn->Commit());

  ASSERT_OK(db->Get(read_options, "X", &value));
  ASSERT_EQ("2", value);

  // Further writes on the expired transaction are still accepted ...
  ASSERT_OK(expired_txn->Put("Z", "1"));

  // ... but it should fail to commit since it is expired
  ASSERT_TRUE(expired_txn->Commit().IsExpired());

  // Nothing written only by the expired transaction reached the DB.
  ASSERT_TRUE(db->Get(read_options, "Y", &value).IsNotFound());
  ASSERT_TRUE(db->Get(read_options, "Z", &value).IsNotFound());

  delete expired_txn;
  delete live_txn;
}
3377 
TEST_P(TransactionTest,ReinitializeTest)3378 TEST_P(TransactionTest, ReinitializeTest) {
3379   WriteOptions write_options;
3380   ReadOptions read_options;
3381   TransactionOptions txn_options;
3382   std::string value;
3383   Status s;
3384 
3385   // Set txn expiration timeout to 0 microseconds (expires instantly)
3386   txn_options.expiration = 0;
3387   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3388 
3389   // Reinitialize transaction to no long expire
3390   txn_options.expiration = -1;
3391   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3392 
3393   s = txn1->Put("Z", "z");
3394   ASSERT_OK(s);
3395 
3396   // Should commit since not expired
3397   s = txn1->Commit();
3398   ASSERT_OK(s);
3399 
3400   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3401 
3402   s = txn1->Put("Z", "zz");
3403   ASSERT_OK(s);
3404 
3405   // Reinitilize txn1 and verify that Z gets unlocked
3406   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3407 
3408   Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
3409   s = txn2->Put("Z", "zzz");
3410   ASSERT_OK(s);
3411   s = txn2->Commit();
3412   ASSERT_OK(s);
3413   delete txn2;
3414 
3415   s = db->Get(read_options, "Z", &value);
3416   ASSERT_OK(s);
3417   ASSERT_EQ(value, "zzz");
3418 
3419   // Verify snapshots get reinitialized correctly
3420   txn1->SetSnapshot();
3421   s = txn1->Put("Z", "zzzz");
3422   ASSERT_OK(s);
3423 
3424   s = txn1->Commit();
3425   ASSERT_OK(s);
3426 
3427   s = db->Get(read_options, "Z", &value);
3428   ASSERT_OK(s);
3429   ASSERT_EQ(value, "zzzz");
3430 
3431   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3432   const Snapshot* snapshot = txn1->GetSnapshot();
3433   ASSERT_FALSE(snapshot);
3434 
3435   txn_options.set_snapshot = true;
3436   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3437   snapshot = txn1->GetSnapshot();
3438   ASSERT_TRUE(snapshot);
3439 
3440   s = txn1->Put("Z", "a");
3441   ASSERT_OK(s);
3442 
3443   ASSERT_OK(txn1->Rollback());
3444 
3445   s = txn1->Put("Y", "y");
3446   ASSERT_OK(s);
3447 
3448   txn_options.set_snapshot = false;
3449   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3450   snapshot = txn1->GetSnapshot();
3451   ASSERT_FALSE(snapshot);
3452 
3453   s = txn1->Put("X", "x");
3454   ASSERT_OK(s);
3455 
3456   s = txn1->Commit();
3457   ASSERT_OK(s);
3458 
3459   s = db->Get(read_options, "Z", &value);
3460   ASSERT_OK(s);
3461   ASSERT_EQ(value, "zzzz");
3462 
3463   s = db->Get(read_options, "Y", &value);
3464   ASSERT_TRUE(s.IsNotFound());
3465 
3466   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3467 
3468   s = txn1->SetName("name");
3469   ASSERT_OK(s);
3470 
3471   s = txn1->Prepare();
3472   ASSERT_OK(s);
3473   s = txn1->Commit();
3474   ASSERT_OK(s);
3475 
3476   txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3477 
3478   s = txn1->SetName("name");
3479   ASSERT_OK(s);
3480 
3481   delete txn1;
3482 }
3483 
// Rolling back a transaction must release its key locks so that another
// transaction can write the same key afterwards.
TEST_P(TransactionTest, Rollback) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  // Check the returned transaction itself; asserting on `s` here would be
  // vacuous because nothing has been assigned to it yet.
  ASSERT_TRUE(txn1);

  s = txn1->Put("X", "1");
  ASSERT_OK(s);

  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  // txn2 should not be able to write to X since txn1 has it locked
  s = txn2->Put("X", "2");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->Rollback());
  delete txn1;

  // txn2 should now be able to write to X
  s = txn2->Put("X", "3");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  // Only txn2's value is visible; txn1's write was rolled back.
  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("3", value);

  delete txn2;
}
3520 
TEST_P(TransactionTest,LockLimitTest)3521 TEST_P(TransactionTest, LockLimitTest) {
3522   WriteOptions write_options;
3523   ReadOptions read_options, snapshot_read_options;
3524   TransactionOptions txn_options;
3525   string value;
3526   Status s;
3527 
3528   delete db;
3529   db = nullptr;
3530 
3531   // Open DB with a lock limit of 3
3532   txn_db_options.max_num_locks = 3;
3533   ASSERT_OK(ReOpen());
3534   assert(db != nullptr);
3535   ASSERT_OK(s);
3536 
3537   // Create a txn and verify we can only lock up to 3 keys
3538   Transaction* txn = db->BeginTransaction(write_options, txn_options);
3539   ASSERT_TRUE(txn);
3540 
3541   s = txn->Put("X", "x");
3542   ASSERT_OK(s);
3543 
3544   s = txn->Put("Y", "y");
3545   ASSERT_OK(s);
3546 
3547   s = txn->Put("Z", "z");
3548   ASSERT_OK(s);
3549 
3550   // lock limit reached
3551   s = txn->Put("W", "w");
3552   ASSERT_TRUE(s.IsBusy());
3553 
3554   // re-locking same key shouldn't put us over the limit
3555   s = txn->Put("X", "xx");
3556   ASSERT_OK(s);
3557 
3558   s = txn->GetForUpdate(read_options, "W", &value);
3559   ASSERT_TRUE(s.IsBusy());
3560   s = txn->GetForUpdate(read_options, "V", &value);
3561   ASSERT_TRUE(s.IsBusy());
3562 
3563   // re-locking same key shouldn't put us over the limit
3564   s = txn->GetForUpdate(read_options, "Y", &value);
3565   ASSERT_OK(s);
3566   ASSERT_EQ("y", value);
3567 
3568   s = txn->Get(read_options, "W", &value);
3569   ASSERT_TRUE(s.IsNotFound());
3570 
3571   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
3572   ASSERT_TRUE(txn2);
3573 
3574   // "X" currently locked
3575   s = txn2->Put("X", "x");
3576   ASSERT_TRUE(s.IsTimedOut());
3577 
3578   // lock limit reached
3579   s = txn2->Put("M", "m");
3580   ASSERT_TRUE(s.IsBusy());
3581 
3582   s = txn->Commit();
3583   ASSERT_OK(s);
3584 
3585   s = db->Get(read_options, "X", &value);
3586   ASSERT_OK(s);
3587   ASSERT_EQ("xx", value);
3588 
3589   s = db->Get(read_options, "W", &value);
3590   ASSERT_TRUE(s.IsNotFound());
3591 
3592   // Committing txn should release its locks and allow txn2 to proceed
3593   s = txn2->Put("X", "x2");
3594   ASSERT_OK(s);
3595 
3596   s = txn2->Delete("X");
3597   ASSERT_OK(s);
3598 
3599   s = txn2->Put("M", "m");
3600   ASSERT_OK(s);
3601 
3602   s = txn2->Put("Z", "z2");
3603   ASSERT_OK(s);
3604 
3605   // lock limit reached
3606   s = txn2->Delete("Y");
3607   ASSERT_TRUE(s.IsBusy());
3608 
3609   s = txn2->Commit();
3610   ASSERT_OK(s);
3611 
3612   s = db->Get(read_options, "Z", &value);
3613   ASSERT_OK(s);
3614   ASSERT_EQ("z2", value);
3615 
3616   s = db->Get(read_options, "Y", &value);
3617   ASSERT_OK(s);
3618   ASSERT_EQ("y", value);
3619 
3620   s = db->Get(read_options, "X", &value);
3621   ASSERT_TRUE(s.IsNotFound());
3622 
3623   delete txn;
3624   delete txn2;
3625 }
3626 
TEST_P(TransactionTest,IteratorTest)3627 TEST_P(TransactionTest, IteratorTest) {
3628   // This test does writes without snapshot validation, and then tries to create
3629   // iterator later, which is unsupported in write unprepared.
3630   if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3631     return;
3632   }
3633 
3634   WriteOptions write_options;
3635   ReadOptions read_options, snapshot_read_options;
3636   std::string value;
3637   Status s;
3638 
3639   // Write some keys to the db
3640   s = db->Put(write_options, "A", "a");
3641   ASSERT_OK(s);
3642 
3643   s = db->Put(write_options, "G", "g");
3644   ASSERT_OK(s);
3645 
3646   s = db->Put(write_options, "F", "f");
3647   ASSERT_OK(s);
3648 
3649   s = db->Put(write_options, "C", "c");
3650   ASSERT_OK(s);
3651 
3652   s = db->Put(write_options, "D", "d");
3653   ASSERT_OK(s);
3654 
3655   Transaction* txn = db->BeginTransaction(write_options);
3656   ASSERT_TRUE(txn);
3657 
3658   // Write some keys in a txn
3659   s = txn->Put("B", "b");
3660   ASSERT_OK(s);
3661 
3662   s = txn->Put("H", "h");
3663   ASSERT_OK(s);
3664 
3665   s = txn->Delete("D");
3666   ASSERT_OK(s);
3667 
3668   s = txn->Put("E", "e");
3669   ASSERT_OK(s);
3670 
3671   txn->SetSnapshot();
3672   const Snapshot* snapshot = txn->GetSnapshot();
3673 
3674   // Write some keys to the db after the snapshot
3675   s = db->Put(write_options, "BB", "xx");
3676   ASSERT_OK(s);
3677 
3678   s = db->Put(write_options, "C", "xx");
3679   ASSERT_OK(s);
3680 
3681   read_options.snapshot = snapshot;
3682   Iterator* iter = txn->GetIterator(read_options);
3683   ASSERT_OK(iter->status());
3684   iter->SeekToFirst();
3685 
3686   // Read all keys via iter and lock them all
3687   std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
3688   for (int i = 0; i < 7; i++) {
3689     ASSERT_OK(iter->status());
3690     ASSERT_TRUE(iter->Valid());
3691     ASSERT_EQ(results[i], iter->value().ToString());
3692 
3693     s = txn->GetForUpdate(read_options, iter->key(), nullptr);
3694     if (i == 2) {
3695       // "C" was modified after txn's snapshot
3696       ASSERT_TRUE(s.IsBusy());
3697     } else {
3698       ASSERT_OK(s);
3699     }
3700 
3701     iter->Next();
3702   }
3703   ASSERT_FALSE(iter->Valid());
3704 
3705   iter->Seek("G");
3706   ASSERT_OK(iter->status());
3707   ASSERT_TRUE(iter->Valid());
3708   ASSERT_EQ("g", iter->value().ToString());
3709 
3710   iter->Prev();
3711   ASSERT_OK(iter->status());
3712   ASSERT_TRUE(iter->Valid());
3713   ASSERT_EQ("f", iter->value().ToString());
3714 
3715   iter->Seek("D");
3716   ASSERT_OK(iter->status());
3717   ASSERT_TRUE(iter->Valid());
3718   ASSERT_EQ("e", iter->value().ToString());
3719 
3720   iter->Seek("C");
3721   ASSERT_OK(iter->status());
3722   ASSERT_TRUE(iter->Valid());
3723   ASSERT_EQ("c", iter->value().ToString());
3724 
3725   iter->Next();
3726   ASSERT_OK(iter->status());
3727   ASSERT_TRUE(iter->Valid());
3728   ASSERT_EQ("e", iter->value().ToString());
3729 
3730   iter->Seek("");
3731   ASSERT_OK(iter->status());
3732   ASSERT_TRUE(iter->Valid());
3733   ASSERT_EQ("a", iter->value().ToString());
3734 
3735   iter->Seek("X");
3736   ASSERT_OK(iter->status());
3737   ASSERT_FALSE(iter->Valid());
3738 
3739   iter->SeekToLast();
3740   ASSERT_OK(iter->status());
3741   ASSERT_TRUE(iter->Valid());
3742   ASSERT_EQ("h", iter->value().ToString());
3743 
3744   s = txn->Commit();
3745   ASSERT_OK(s);
3746 
3747   delete iter;
3748   delete txn;
3749 }
3750 
TEST_P(TransactionTest,DisableIndexingTest)3751 TEST_P(TransactionTest, DisableIndexingTest) {
3752   // Skip this test for write unprepared. It does not solely rely on WBWI for
3753   // read your own writes, so depending on whether batches are flushed or not,
3754   // only some writes will be visible.
3755   //
3756   // Also, write unprepared does not support creating iterators if there has
3757   // been txn->Put() without snapshot validation.
3758   if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3759     return;
3760   }
3761 
3762   WriteOptions write_options;
3763   ReadOptions read_options;
3764   std::string value;
3765   Status s;
3766 
3767   Transaction* txn = db->BeginTransaction(write_options);
3768   ASSERT_TRUE(txn);
3769 
3770   s = txn->Put("A", "a");
3771   ASSERT_OK(s);
3772 
3773   s = txn->Get(read_options, "A", &value);
3774   ASSERT_OK(s);
3775   ASSERT_EQ("a", value);
3776 
3777   txn->DisableIndexing();
3778 
3779   s = txn->Put("B", "b");
3780   ASSERT_OK(s);
3781 
3782   s = txn->Get(read_options, "B", &value);
3783   ASSERT_TRUE(s.IsNotFound());
3784 
3785   Iterator* iter = txn->GetIterator(read_options);
3786   ASSERT_OK(iter->status());
3787 
3788   iter->Seek("B");
3789   ASSERT_OK(iter->status());
3790   ASSERT_FALSE(iter->Valid());
3791 
3792   s = txn->Delete("A");
3793 
3794   s = txn->Get(read_options, "A", &value);
3795   ASSERT_OK(s);
3796   ASSERT_EQ("a", value);
3797 
3798   txn->EnableIndexing();
3799 
3800   s = txn->Put("B", "bb");
3801   ASSERT_OK(s);
3802 
3803   iter->Seek("B");
3804   ASSERT_OK(iter->status());
3805   ASSERT_TRUE(iter->Valid());
3806   ASSERT_EQ("bb", iter->value().ToString());
3807 
3808   s = txn->Get(read_options, "B", &value);
3809   ASSERT_OK(s);
3810   ASSERT_EQ("bb", value);
3811 
3812   s = txn->Put("A", "aa");
3813   ASSERT_OK(s);
3814 
3815   s = txn->Get(read_options, "A", &value);
3816   ASSERT_OK(s);
3817   ASSERT_EQ("aa", value);
3818 
3819   delete iter;
3820   delete txn;
3821 }
3822 
TEST_P(TransactionTest,SavepointTest)3823 TEST_P(TransactionTest, SavepointTest) {
3824   WriteOptions write_options;
3825   ReadOptions read_options, snapshot_read_options;
3826   std::string value;
3827   Status s;
3828 
3829   Transaction* txn = db->BeginTransaction(write_options);
3830   ASSERT_TRUE(txn);
3831 
3832   ASSERT_EQ(0, txn->GetNumPuts());
3833 
3834   s = txn->RollbackToSavePoint();
3835   ASSERT_TRUE(s.IsNotFound());
3836 
3837   txn->SetSavePoint();  // 1
3838 
3839   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to beginning of txn
3840   s = txn->RollbackToSavePoint();
3841   ASSERT_TRUE(s.IsNotFound());
3842 
3843   s = txn->Put("B", "b");
3844   ASSERT_OK(s);
3845 
3846   ASSERT_EQ(1, txn->GetNumPuts());
3847   ASSERT_EQ(0, txn->GetNumDeletes());
3848 
3849   s = txn->Commit();
3850   ASSERT_OK(s);
3851 
3852   s = db->Get(read_options, "B", &value);
3853   ASSERT_OK(s);
3854   ASSERT_EQ("b", value);
3855 
3856   delete txn;
3857   txn = db->BeginTransaction(write_options);
3858   ASSERT_TRUE(txn);
3859 
3860   s = txn->Put("A", "a");
3861   ASSERT_OK(s);
3862 
3863   s = txn->Put("B", "bb");
3864   ASSERT_OK(s);
3865 
3866   s = txn->Put("C", "c");
3867   ASSERT_OK(s);
3868 
3869   txn->SetSavePoint();  // 2
3870 
3871   s = txn->Delete("B");
3872   ASSERT_OK(s);
3873 
3874   s = txn->Put("C", "cc");
3875   ASSERT_OK(s);
3876 
3877   s = txn->Put("D", "d");
3878   ASSERT_OK(s);
3879 
3880   ASSERT_EQ(5, txn->GetNumPuts());
3881   ASSERT_EQ(1, txn->GetNumDeletes());
3882 
3883   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2
3884 
3885   ASSERT_EQ(3, txn->GetNumPuts());
3886   ASSERT_EQ(0, txn->GetNumDeletes());
3887 
3888   s = txn->Get(read_options, "A", &value);
3889   ASSERT_OK(s);
3890   ASSERT_EQ("a", value);
3891 
3892   s = txn->Get(read_options, "B", &value);
3893   ASSERT_OK(s);
3894   ASSERT_EQ("bb", value);
3895 
3896   s = txn->Get(read_options, "C", &value);
3897   ASSERT_OK(s);
3898   ASSERT_EQ("c", value);
3899 
3900   s = txn->Get(read_options, "D", &value);
3901   ASSERT_TRUE(s.IsNotFound());
3902 
3903   s = txn->Put("A", "a");
3904   ASSERT_OK(s);
3905 
3906   s = txn->Put("E", "e");
3907   ASSERT_OK(s);
3908 
3909   ASSERT_EQ(5, txn->GetNumPuts());
3910   ASSERT_EQ(0, txn->GetNumDeletes());
3911 
3912   // Rollback to beginning of txn
3913   s = txn->RollbackToSavePoint();
3914   ASSERT_TRUE(s.IsNotFound());
3915   ASSERT_OK(txn->Rollback());
3916 
3917   ASSERT_EQ(0, txn->GetNumPuts());
3918   ASSERT_EQ(0, txn->GetNumDeletes());
3919 
3920   s = txn->Get(read_options, "A", &value);
3921   ASSERT_TRUE(s.IsNotFound());
3922 
3923   s = txn->Get(read_options, "B", &value);
3924   ASSERT_OK(s);
3925   ASSERT_EQ("b", value);
3926 
3927   s = txn->Get(read_options, "D", &value);
3928   ASSERT_TRUE(s.IsNotFound());
3929 
3930   s = txn->Get(read_options, "D", &value);
3931   ASSERT_TRUE(s.IsNotFound());
3932 
3933   s = txn->Get(read_options, "E", &value);
3934   ASSERT_TRUE(s.IsNotFound());
3935 
3936   s = txn->Put("A", "aa");
3937   ASSERT_OK(s);
3938 
3939   s = txn->Put("F", "f");
3940   ASSERT_OK(s);
3941 
3942   ASSERT_EQ(2, txn->GetNumPuts());
3943   ASSERT_EQ(0, txn->GetNumDeletes());
3944 
3945   txn->SetSavePoint();  // 3
3946   txn->SetSavePoint();  // 4
3947 
3948   s = txn->Put("G", "g");
3949   ASSERT_OK(s);
3950 
3951   s = txn->SingleDelete("F");
3952   ASSERT_OK(s);
3953 
3954   s = txn->Delete("B");
3955   ASSERT_OK(s);
3956 
3957   s = txn->Get(read_options, "A", &value);
3958   ASSERT_OK(s);
3959   ASSERT_EQ("aa", value);
3960 
3961   s = txn->Get(read_options, "F", &value);
3962   // According to db.h, doing a SingleDelete on a key that has been
3963   // overwritten will have undefinied behavior.  So it is unclear what the
3964   // result of fetching "F" should be. The current implementation will
3965   // return NotFound in this case.
3966   ASSERT_TRUE(s.IsNotFound());
3967 
3968   s = txn->Get(read_options, "B", &value);
3969   ASSERT_TRUE(s.IsNotFound());
3970 
3971   ASSERT_EQ(3, txn->GetNumPuts());
3972   ASSERT_EQ(2, txn->GetNumDeletes());
3973 
3974   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 3
3975 
3976   ASSERT_EQ(2, txn->GetNumPuts());
3977   ASSERT_EQ(0, txn->GetNumDeletes());
3978 
3979   s = txn->Get(read_options, "F", &value);
3980   ASSERT_OK(s);
3981   ASSERT_EQ("f", value);
3982 
3983   s = txn->Get(read_options, "G", &value);
3984   ASSERT_TRUE(s.IsNotFound());
3985 
3986   s = txn->Commit();
3987   ASSERT_OK(s);
3988 
3989   s = db->Get(read_options, "F", &value);
3990   ASSERT_OK(s);
3991   ASSERT_EQ("f", value);
3992 
3993   s = db->Get(read_options, "G", &value);
3994   ASSERT_TRUE(s.IsNotFound());
3995 
3996   s = db->Get(read_options, "A", &value);
3997   ASSERT_OK(s);
3998   ASSERT_EQ("aa", value);
3999 
4000   s = db->Get(read_options, "B", &value);
4001   ASSERT_OK(s);
4002   ASSERT_EQ("b", value);
4003 
4004   s = db->Get(read_options, "C", &value);
4005   ASSERT_TRUE(s.IsNotFound());
4006 
4007   s = db->Get(read_options, "D", &value);
4008   ASSERT_TRUE(s.IsNotFound());
4009 
4010   s = db->Get(read_options, "E", &value);
4011   ASSERT_TRUE(s.IsNotFound());
4012 
4013   delete txn;
4014 }
4015 
TEST_P(TransactionTest,SavepointTest2)4016 TEST_P(TransactionTest, SavepointTest2) {
4017   WriteOptions write_options;
4018   ReadOptions read_options;
4019   TransactionOptions txn_options;
4020   Status s;
4021 
4022   txn_options.lock_timeout = 1;  // 1 ms
4023   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4024   ASSERT_TRUE(txn1);
4025 
4026   s = txn1->Put("A", "");
4027   ASSERT_OK(s);
4028 
4029   txn1->SetSavePoint();  // 1
4030 
4031   s = txn1->Put("A", "a");
4032   ASSERT_OK(s);
4033 
4034   s = txn1->Put("C", "c");
4035   ASSERT_OK(s);
4036 
4037   txn1->SetSavePoint();  // 2
4038 
4039   s = txn1->Put("A", "a");
4040   ASSERT_OK(s);
4041   s = txn1->Put("B", "b");
4042   ASSERT_OK(s);
4043 
4044   ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 2
4045 
4046   // Verify that "A" and "C" is still locked while "B" is not
4047   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4048   ASSERT_TRUE(txn2);
4049 
4050   s = txn2->Put("A", "a2");
4051   ASSERT_TRUE(s.IsTimedOut());
4052   s = txn2->Put("C", "c2");
4053   ASSERT_TRUE(s.IsTimedOut());
4054   s = txn2->Put("B", "b2");
4055   ASSERT_OK(s);
4056 
4057   s = txn1->Put("A", "aa");
4058   ASSERT_OK(s);
4059   s = txn1->Put("B", "bb");
4060   ASSERT_TRUE(s.IsTimedOut());
4061 
4062   s = txn2->Commit();
4063   ASSERT_OK(s);
4064   delete txn2;
4065 
4066   s = txn1->Put("A", "aaa");
4067   ASSERT_OK(s);
4068   s = txn1->Put("B", "bbb");
4069   ASSERT_OK(s);
4070   s = txn1->Put("C", "ccc");
4071   ASSERT_OK(s);
4072 
4073   txn1->SetSavePoint();                    // 3
4074   ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 3
4075 
4076   // Verify that "A", "B", "C" are still locked
4077   txn2 = db->BeginTransaction(write_options, txn_options);
4078   ASSERT_TRUE(txn2);
4079 
4080   s = txn2->Put("A", "a2");
4081   ASSERT_TRUE(s.IsTimedOut());
4082   s = txn2->Put("B", "b2");
4083   ASSERT_TRUE(s.IsTimedOut());
4084   s = txn2->Put("C", "c2");
4085   ASSERT_TRUE(s.IsTimedOut());
4086 
4087   ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1
4088 
4089   // Verify that only "A" is locked
4090   s = txn2->Put("A", "a3");
4091   ASSERT_TRUE(s.IsTimedOut());
4092   s = txn2->Put("B", "b3");
4093   ASSERT_OK(s);
4094   s = txn2->Put("C", "c3po");
4095   ASSERT_OK(s);
4096 
4097   s = txn1->Commit();
4098   ASSERT_OK(s);
4099   delete txn1;
4100 
4101   // Verify "A" "C" "B" are no longer locked
4102   s = txn2->Put("A", "a4");
4103   ASSERT_OK(s);
4104   s = txn2->Put("B", "b4");
4105   ASSERT_OK(s);
4106   s = txn2->Put("C", "c4");
4107   ASSERT_OK(s);
4108 
4109   s = txn2->Commit();
4110   ASSERT_OK(s);
4111   delete txn2;
4112 }
4113 
TEST_P(TransactionTest,SavepointTest3)4114 TEST_P(TransactionTest, SavepointTest3) {
4115   WriteOptions write_options;
4116   ReadOptions read_options;
4117   TransactionOptions txn_options;
4118   Status s;
4119 
4120   txn_options.lock_timeout = 1;  // 1 ms
4121   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4122   ASSERT_TRUE(txn1);
4123 
4124   s = txn1->PopSavePoint();  // No SavePoint present
4125   ASSERT_TRUE(s.IsNotFound());
4126 
4127   s = txn1->Put("A", "");
4128   ASSERT_OK(s);
4129 
4130   s = txn1->PopSavePoint();  // Still no SavePoint present
4131   ASSERT_TRUE(s.IsNotFound());
4132 
4133   txn1->SetSavePoint();  // 1
4134 
4135   s = txn1->Put("A", "a");
4136   ASSERT_OK(s);
4137 
4138   s = txn1->PopSavePoint();  // Remove 1
4139   ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
4140 
4141   // Verify that "A" is still locked
4142   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4143   ASSERT_TRUE(txn2);
4144 
4145   s = txn2->Put("A", "a2");
4146   ASSERT_TRUE(s.IsTimedOut());
4147   delete txn2;
4148 
4149   txn1->SetSavePoint();  // 2
4150 
4151   s = txn1->Put("B", "b");
4152   ASSERT_OK(s);
4153 
4154   txn1->SetSavePoint();  // 3
4155 
4156   s = txn1->Put("B", "b2");
4157   ASSERT_OK(s);
4158 
4159   ASSERT_OK(txn1->RollbackToSavePoint());  // Roll back to 2
4160 
4161   s = txn1->PopSavePoint();
4162   ASSERT_OK(s);
4163 
4164   s = txn1->PopSavePoint();
4165   ASSERT_TRUE(s.IsNotFound());
4166 
4167   s = txn1->Commit();
4168   ASSERT_OK(s);
4169   delete txn1;
4170 
4171   std::string value;
4172 
4173   // tnx1 should have modified "A" to "a"
4174   s = db->Get(read_options, "A", &value);
4175   ASSERT_OK(s);
4176   ASSERT_EQ("a", value);
4177 
4178   // tnx1 should have set "B" to just "b"
4179   s = db->Get(read_options, "B", &value);
4180   ASSERT_OK(s);
4181   ASSERT_EQ("b", value);
4182 
4183   s = db->Get(read_options, "C", &value);
4184   ASSERT_TRUE(s.IsNotFound());
4185 }
4186 
// Pops the inner of two savepoints and then rolls back to the outer one,
// checking that every write is undone and that no key stays locked.
TEST_P(TransactionTest, SavepointTest4) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  txn_options.lock_timeout = 1;  // 1 ms

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  txn1->SetSavePoint();  // 1
  ASSERT_OK(txn1->Put("A", "a"));

  txn1->SetSavePoint();  // 2
  ASSERT_OK(txn1->Put("B", "b"));

  ASSERT_OK(txn1->PopSavePoint());  // Remove 2

  // Popping a savepoint must not discard the writes: A and B still exist.
  std::string value;
  ASSERT_OK(txn1->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn1->Get(read_options, "B", &value));
  ASSERT_EQ("b", value);

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1

  // Everything written after savepoint 1 is gone.
  ASSERT_TRUE(txn1->Get(read_options, "A", &value).IsNotFound());
  ASSERT_TRUE(txn1->Get(read_options, "B", &value).IsNotFound());

  // A second transaction must be able to take both keys: nothing is locked.
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  ASSERT_OK(txn2->Put("A", ""));
  ASSERT_OK(txn2->Put("B", ""));

  delete txn2;
  delete txn1;
}
4238 
TEST_P(TransactionTest,UndoGetForUpdateTest)4239 TEST_P(TransactionTest, UndoGetForUpdateTest) {
4240   WriteOptions write_options;
4241   ReadOptions read_options;
4242   TransactionOptions txn_options;
4243   std::string value;
4244   Status s;
4245 
4246   txn_options.lock_timeout = 1;  // 1 ms
4247   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4248   ASSERT_TRUE(txn1);
4249 
4250   txn1->UndoGetForUpdate("A");
4251 
4252   s = txn1->Commit();
4253   ASSERT_OK(s);
4254   delete txn1;
4255 
4256   txn1 = db->BeginTransaction(write_options, txn_options);
4257 
4258   txn1->UndoGetForUpdate("A");
4259   s = txn1->GetForUpdate(read_options, "A", &value);
4260   ASSERT_TRUE(s.IsNotFound());
4261 
4262   // Verify that A is locked
4263   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4264   s = txn2->Put("A", "a");
4265   ASSERT_TRUE(s.IsTimedOut());
4266 
4267   txn1->UndoGetForUpdate("A");
4268 
4269   // Verify that A is now unlocked
4270   s = txn2->Put("A", "a2");
4271   ASSERT_OK(s);
4272   ASSERT_OK(txn2->Commit());
4273   delete txn2;
4274   s = db->Get(read_options, "A", &value);
4275   ASSERT_OK(s);
4276   ASSERT_EQ("a2", value);
4277 
4278   s = txn1->Delete("A");
4279   ASSERT_OK(s);
4280   s = txn1->GetForUpdate(read_options, "A", &value);
4281   ASSERT_TRUE(s.IsNotFound());
4282 
4283   s = txn1->Put("B", "b3");
4284   ASSERT_OK(s);
4285   s = txn1->GetForUpdate(read_options, "B", &value);
4286   ASSERT_OK(s);
4287 
4288   txn1->UndoGetForUpdate("A");
4289   txn1->UndoGetForUpdate("B");
4290 
4291   // Verify that A and B are still locked
4292   txn2 = db->BeginTransaction(write_options, txn_options);
4293   s = txn2->Put("A", "a4");
4294   ASSERT_TRUE(s.IsTimedOut());
4295   s = txn2->Put("B", "b4");
4296   ASSERT_TRUE(s.IsTimedOut());
4297 
4298   ASSERT_OK(txn1->Rollback());
4299   delete txn1;
4300 
4301   // Verify that A and B are no longer locked
4302   s = txn2->Put("A", "a5");
4303   ASSERT_OK(s);
4304   s = txn2->Put("B", "b5");
4305   ASSERT_OK(s);
4306   s = txn2->Commit();
4307   delete txn2;
4308   ASSERT_OK(s);
4309 
4310   txn1 = db->BeginTransaction(write_options, txn_options);
4311 
4312   s = txn1->GetForUpdate(read_options, "A", &value);
4313   ASSERT_OK(s);
4314   s = txn1->GetForUpdate(read_options, "A", &value);
4315   ASSERT_OK(s);
4316   s = txn1->GetForUpdate(read_options, "C", &value);
4317   ASSERT_TRUE(s.IsNotFound());
4318   s = txn1->GetForUpdate(read_options, "A", &value);
4319   ASSERT_OK(s);
4320   s = txn1->GetForUpdate(read_options, "C", &value);
4321   ASSERT_TRUE(s.IsNotFound());
4322   s = txn1->GetForUpdate(read_options, "B", &value);
4323   ASSERT_OK(s);
4324   s = txn1->Put("B", "b5");
4325   s = txn1->GetForUpdate(read_options, "B", &value);
4326   ASSERT_OK(s);
4327 
4328   txn1->UndoGetForUpdate("A");
4329   txn1->UndoGetForUpdate("B");
4330   txn1->UndoGetForUpdate("C");
4331   txn1->UndoGetForUpdate("X");
4332 
4333   // Verify A,B,C are locked
4334   txn2 = db->BeginTransaction(write_options, txn_options);
4335   s = txn2->Put("A", "a6");
4336   ASSERT_TRUE(s.IsTimedOut());
4337   s = txn2->Delete("B");
4338   ASSERT_TRUE(s.IsTimedOut());
4339   s = txn2->Put("C", "c6");
4340   ASSERT_TRUE(s.IsTimedOut());
4341   s = txn2->Put("X", "x6");
4342   ASSERT_OK(s);
4343 
4344   txn1->UndoGetForUpdate("A");
4345   txn1->UndoGetForUpdate("B");
4346   txn1->UndoGetForUpdate("C");
4347   txn1->UndoGetForUpdate("X");
4348 
4349   // Verify A,B are locked and C is not
4350   s = txn2->Put("A", "a6");
4351   ASSERT_TRUE(s.IsTimedOut());
4352   s = txn2->Delete("B");
4353   ASSERT_TRUE(s.IsTimedOut());
4354   s = txn2->Put("C", "c6");
4355   ASSERT_OK(s);
4356   s = txn2->Put("X", "x6");
4357   ASSERT_OK(s);
4358 
4359   txn1->UndoGetForUpdate("A");
4360   txn1->UndoGetForUpdate("B");
4361   txn1->UndoGetForUpdate("C");
4362   txn1->UndoGetForUpdate("X");
4363 
4364   // Verify B is locked and A and C are not
4365   s = txn2->Put("A", "a7");
4366   ASSERT_OK(s);
4367   s = txn2->Delete("B");
4368   ASSERT_TRUE(s.IsTimedOut());
4369   s = txn2->Put("C", "c7");
4370   ASSERT_OK(s);
4371   s = txn2->Put("X", "x7");
4372   ASSERT_OK(s);
4373 
4374   s = txn2->Commit();
4375   ASSERT_OK(s);
4376   delete txn2;
4377 
4378   s = txn1->Commit();
4379   ASSERT_OK(s);
4380   delete txn1;
4381 }
4382 
TEST_P(TransactionTest,UndoGetForUpdateTest2)4383 TEST_P(TransactionTest, UndoGetForUpdateTest2) {
4384   WriteOptions write_options;
4385   ReadOptions read_options;
4386   TransactionOptions txn_options;
4387   std::string value;
4388   Status s;
4389 
4390   s = db->Put(write_options, "A", "");
4391   ASSERT_OK(s);
4392 
4393   txn_options.lock_timeout = 1;  // 1 ms
4394   Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4395   ASSERT_TRUE(txn1);
4396 
4397   s = txn1->GetForUpdate(read_options, "A", &value);
4398   ASSERT_OK(s);
4399   s = txn1->GetForUpdate(read_options, "B", &value);
4400   ASSERT_TRUE(s.IsNotFound());
4401 
4402   s = txn1->Put("F", "f");
4403   ASSERT_OK(s);
4404 
4405   txn1->SetSavePoint();  // 1
4406 
4407   txn1->UndoGetForUpdate("A");
4408 
4409   s = txn1->GetForUpdate(read_options, "C", &value);
4410   ASSERT_TRUE(s.IsNotFound());
4411   s = txn1->GetForUpdate(read_options, "D", &value);
4412   ASSERT_TRUE(s.IsNotFound());
4413 
4414   s = txn1->Put("E", "e");
4415   ASSERT_OK(s);
4416   s = txn1->GetForUpdate(read_options, "E", &value);
4417   ASSERT_OK(s);
4418 
4419   s = txn1->GetForUpdate(read_options, "F", &value);
4420   ASSERT_OK(s);
4421 
4422   // Verify A,B,C,D,E,F are still locked
4423   Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4424   s = txn2->Put("A", "a1");
4425   ASSERT_TRUE(s.IsTimedOut());
4426   s = txn2->Put("B", "b1");
4427   ASSERT_TRUE(s.IsTimedOut());
4428   s = txn2->Put("C", "c1");
4429   ASSERT_TRUE(s.IsTimedOut());
4430   s = txn2->Put("D", "d1");
4431   ASSERT_TRUE(s.IsTimedOut());
4432   s = txn2->Put("E", "e1");
4433   ASSERT_TRUE(s.IsTimedOut());
4434   s = txn2->Put("F", "f1");
4435   ASSERT_TRUE(s.IsTimedOut());
4436 
4437   txn1->UndoGetForUpdate("C");
4438   txn1->UndoGetForUpdate("E");
4439 
4440   // Verify A,B,D,E,F are still locked and C is not.
4441   s = txn2->Put("A", "a2");
4442   ASSERT_TRUE(s.IsTimedOut());
4443   s = txn2->Put("B", "b2");
4444   ASSERT_TRUE(s.IsTimedOut());
4445   s = txn2->Put("D", "d2");
4446   ASSERT_TRUE(s.IsTimedOut());
4447   s = txn2->Put("E", "e2");
4448   ASSERT_TRUE(s.IsTimedOut());
4449   s = txn2->Put("F", "f2");
4450   ASSERT_TRUE(s.IsTimedOut());
4451   s = txn2->Put("C", "c2");
4452   ASSERT_OK(s);
4453 
4454   txn1->SetSavePoint();  // 2
4455 
4456   s = txn1->Put("H", "h");
4457   ASSERT_OK(s);
4458 
4459   txn1->UndoGetForUpdate("A");
4460   txn1->UndoGetForUpdate("B");
4461   txn1->UndoGetForUpdate("C");
4462   txn1->UndoGetForUpdate("D");
4463   txn1->UndoGetForUpdate("E");
4464   txn1->UndoGetForUpdate("F");
4465   txn1->UndoGetForUpdate("G");
4466   txn1->UndoGetForUpdate("H");
4467 
4468   // Verify A,B,D,E,F,H are still locked and C,G are not.
4469   s = txn2->Put("A", "a3");
4470   ASSERT_TRUE(s.IsTimedOut());
4471   s = txn2->Put("B", "b3");
4472   ASSERT_TRUE(s.IsTimedOut());
4473   s = txn2->Put("D", "d3");
4474   ASSERT_TRUE(s.IsTimedOut());
4475   s = txn2->Put("E", "e3");
4476   ASSERT_TRUE(s.IsTimedOut());
4477   s = txn2->Put("F", "f3");
4478   ASSERT_TRUE(s.IsTimedOut());
4479   s = txn2->Put("H", "h3");
4480   ASSERT_TRUE(s.IsTimedOut());
4481   s = txn2->Put("C", "c3");
4482   ASSERT_OK(s);
4483   s = txn2->Put("G", "g3");
4484   ASSERT_OK(s);
4485 
4486   ASSERT_OK(txn1->RollbackToSavePoint());  // rollback to 2
4487 
4488   // Verify A,B,D,E,F are still locked and C,G,H are not.
4489   s = txn2->Put("A", "a3");
4490   ASSERT_TRUE(s.IsTimedOut());
4491   s = txn2->Put("B", "b3");
4492   ASSERT_TRUE(s.IsTimedOut());
4493   s = txn2->Put("D", "d3");
4494   ASSERT_TRUE(s.IsTimedOut());
4495   s = txn2->Put("E", "e3");
4496   ASSERT_TRUE(s.IsTimedOut());
4497   s = txn2->Put("F", "f3");
4498   ASSERT_TRUE(s.IsTimedOut());
4499   s = txn2->Put("C", "c3");
4500   ASSERT_OK(s);
4501   s = txn2->Put("G", "g3");
4502   ASSERT_OK(s);
4503   s = txn2->Put("H", "h3");
4504   ASSERT_OK(s);
4505 
4506   txn1->UndoGetForUpdate("A");
4507   txn1->UndoGetForUpdate("B");
4508   txn1->UndoGetForUpdate("C");
4509   txn1->UndoGetForUpdate("D");
4510   txn1->UndoGetForUpdate("E");
4511   txn1->UndoGetForUpdate("F");
4512   txn1->UndoGetForUpdate("G");
4513   txn1->UndoGetForUpdate("H");
4514 
4515   // Verify A,B,E,F are still locked and C,D,G,H are not.
4516   s = txn2->Put("A", "a3");
4517   ASSERT_TRUE(s.IsTimedOut());
4518   s = txn2->Put("B", "b3");
4519   ASSERT_TRUE(s.IsTimedOut());
4520   s = txn2->Put("E", "e3");
4521   ASSERT_TRUE(s.IsTimedOut());
4522   s = txn2->Put("F", "f3");
4523   ASSERT_TRUE(s.IsTimedOut());
4524   s = txn2->Put("C", "c3");
4525   ASSERT_OK(s);
4526   s = txn2->Put("D", "d3");
4527   ASSERT_OK(s);
4528   s = txn2->Put("G", "g3");
4529   ASSERT_OK(s);
4530   s = txn2->Put("H", "h3");
4531   ASSERT_OK(s);
4532 
4533   ASSERT_OK(txn1->RollbackToSavePoint());  // rollback to 1
4534 
4535   // Verify A,B,F are still locked and C,D,E,G,H are not.
4536   s = txn2->Put("A", "a3");
4537   ASSERT_TRUE(s.IsTimedOut());
4538   s = txn2->Put("B", "b3");
4539   ASSERT_TRUE(s.IsTimedOut());
4540   s = txn2->Put("F", "f3");
4541   ASSERT_TRUE(s.IsTimedOut());
4542   s = txn2->Put("C", "c3");
4543   ASSERT_OK(s);
4544   s = txn2->Put("D", "d3");
4545   ASSERT_OK(s);
4546   s = txn2->Put("E", "e3");
4547   ASSERT_OK(s);
4548   s = txn2->Put("G", "g3");
4549   ASSERT_OK(s);
4550   s = txn2->Put("H", "h3");
4551   ASSERT_OK(s);
4552 
4553   txn1->UndoGetForUpdate("A");
4554   txn1->UndoGetForUpdate("B");
4555   txn1->UndoGetForUpdate("C");
4556   txn1->UndoGetForUpdate("D");
4557   txn1->UndoGetForUpdate("E");
4558   txn1->UndoGetForUpdate("F");
4559   txn1->UndoGetForUpdate("G");
4560   txn1->UndoGetForUpdate("H");
4561 
4562   // Verify F is still locked and A,B,C,D,E,G,H are not.
4563   s = txn2->Put("F", "f3");
4564   ASSERT_TRUE(s.IsTimedOut());
4565   s = txn2->Put("A", "a3");
4566   ASSERT_OK(s);
4567   s = txn2->Put("B", "b3");
4568   ASSERT_OK(s);
4569   s = txn2->Put("C", "c3");
4570   ASSERT_OK(s);
4571   s = txn2->Put("D", "d3");
4572   ASSERT_OK(s);
4573   s = txn2->Put("E", "e3");
4574   ASSERT_OK(s);
4575   s = txn2->Put("G", "g3");
4576   ASSERT_OK(s);
4577   s = txn2->Put("H", "h3");
4578   ASSERT_OK(s);
4579 
4580   s = txn1->Commit();
4581   ASSERT_OK(s);
4582   s = txn2->Commit();
4583   ASSERT_OK(s);
4584 
4585   delete txn1;
4586   delete txn2;
4587 }
4588 
TEST_P(TransactionTest,TimeoutTest)4589 TEST_P(TransactionTest, TimeoutTest) {
4590   WriteOptions write_options;
4591   ReadOptions read_options;
4592   std::string value;
4593   Status s;
4594 
4595   delete db;
4596   db = nullptr;
4597 
4598   // transaction writes have an infinite timeout,
4599   // but we will override this when we start a txn
4600   // db writes have infinite timeout
4601   txn_db_options.transaction_lock_timeout = -1;
4602   txn_db_options.default_lock_timeout = -1;
4603 
4604   s = TransactionDB::Open(options, txn_db_options, dbname, &db);
4605   assert(db != nullptr);
4606   ASSERT_OK(s);
4607 
4608   s = db->Put(write_options, "aaa", "aaa");
4609   ASSERT_OK(s);
4610 
4611   TransactionOptions txn_options0;
4612   txn_options0.expiration = 100;  // 100ms
4613   txn_options0.lock_timeout = 50;  // txn timeout no longer infinite
4614   Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
4615 
4616   s = txn1->GetForUpdate(read_options, "aaa", nullptr);
4617   ASSERT_OK(s);
4618 
4619   // Conflicts with previous GetForUpdate.
4620   // Since db writes do not have a timeout, this should eventually succeed when
4621   // the transaction expires.
4622   s = db->Put(write_options, "aaa", "xxx");
4623   ASSERT_OK(s);
4624 
4625   ASSERT_GE(txn1->GetElapsedTime(),
4626             static_cast<uint64_t>(txn_options0.expiration));
4627 
4628   s = txn1->Commit();
4629   ASSERT_TRUE(s.IsExpired());  // expired!
4630 
4631   s = db->Get(read_options, "aaa", &value);
4632   ASSERT_OK(s);
4633   ASSERT_EQ("xxx", value);
4634 
4635   delete txn1;
4636   delete db;
4637 
4638   // transaction writes have 10ms timeout,
4639   // db writes have infinite timeout
4640   txn_db_options.transaction_lock_timeout = 50;
4641   txn_db_options.default_lock_timeout = -1;
4642 
4643   s = TransactionDB::Open(options, txn_db_options, dbname, &db);
4644   ASSERT_OK(s);
4645 
4646   s = db->Put(write_options, "aaa", "aaa");
4647   ASSERT_OK(s);
4648 
4649   TransactionOptions txn_options;
4650   txn_options.expiration = 100;  // 100ms
4651   txn1 = db->BeginTransaction(write_options, txn_options);
4652 
4653   s = txn1->GetForUpdate(read_options, "aaa", nullptr);
4654   ASSERT_OK(s);
4655 
4656   // Conflicts with previous GetForUpdate.
4657   // Since db writes do not have a timeout, this should eventually succeed when
4658   // the transaction expires.
4659   s = db->Put(write_options, "aaa", "xxx");
4660   ASSERT_OK(s);
4661 
4662   s = txn1->Commit();
4663   ASSERT_NOK(s);  // expired!
4664 
4665   s = db->Get(read_options, "aaa", &value);
4666   ASSERT_OK(s);
4667   ASSERT_EQ("xxx", value);
4668 
4669   delete txn1;
4670   txn_options.expiration = 6000000;  // 100 minutes
4671   txn_options.lock_timeout = 1;      // 1ms
4672   txn1 = db->BeginTransaction(write_options, txn_options);
4673   txn1->SetLockTimeout(100);
4674 
4675   TransactionOptions txn_options2;
4676   txn_options2.expiration = 10;  // 10ms
4677   Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
4678   ASSERT_OK(s);
4679 
4680   s = txn2->Put("a", "2");
4681   ASSERT_OK(s);
4682 
4683   // txn1 has a lock timeout longer than txn2's expiration, so it will win
4684   s = txn1->Delete("a");
4685   ASSERT_OK(s);
4686 
4687   s = txn1->Commit();
4688   ASSERT_OK(s);
4689 
4690   // txn2 should be expired out since txn1 waiting until its timeout expired.
4691   s = txn2->Commit();
4692   ASSERT_TRUE(s.IsExpired());
4693 
4694   delete txn1;
4695   delete txn2;
4696   txn_options.expiration = 6000000;  // 100 minutes
4697   txn1 = db->BeginTransaction(write_options, txn_options);
4698   txn_options2.expiration = 100000000;
4699   txn2 = db->BeginTransaction(write_options, txn_options2);
4700 
4701   s = txn1->Delete("asdf");
4702   ASSERT_OK(s);
4703 
4704   // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
4705   s = txn2->Delete("asdf");
4706   ASSERT_TRUE(s.IsTimedOut());
4707   ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
4708 
4709   s = txn1->Commit();
4710   ASSERT_OK(s);
4711 
4712   s = txn2->Put("asdf", "asdf");
4713   ASSERT_OK(s);
4714 
4715   s = txn2->Commit();
4716   ASSERT_OK(s);
4717 
4718   s = db->Get(read_options, "asdf", &value);
4719   ASSERT_OK(s);
4720   ASSERT_EQ("asdf", value);
4721 
4722   delete txn1;
4723   delete txn2;
4724 }
4725 
TEST_P(TransactionTest,SingleDeleteTest)4726 TEST_P(TransactionTest, SingleDeleteTest) {
4727   WriteOptions write_options;
4728   ReadOptions read_options;
4729   std::string value;
4730   Status s;
4731 
4732   Transaction* txn = db->BeginTransaction(write_options);
4733   ASSERT_TRUE(txn);
4734 
4735   s = txn->SingleDelete("A");
4736   ASSERT_OK(s);
4737 
4738   s = txn->Get(read_options, "A", &value);
4739   ASSERT_TRUE(s.IsNotFound());
4740 
4741   s = txn->Commit();
4742   ASSERT_OK(s);
4743   delete txn;
4744 
4745   txn = db->BeginTransaction(write_options);
4746 
4747   s = txn->SingleDelete("A");
4748   ASSERT_OK(s);
4749 
4750   s = txn->Put("A", "a");
4751   ASSERT_OK(s);
4752 
4753   s = txn->Get(read_options, "A", &value);
4754   ASSERT_OK(s);
4755   ASSERT_EQ("a", value);
4756 
4757   s = txn->Commit();
4758   ASSERT_OK(s);
4759   delete txn;
4760 
4761   s = db->Get(read_options, "A", &value);
4762   ASSERT_OK(s);
4763   ASSERT_EQ("a", value);
4764 
4765   txn = db->BeginTransaction(write_options);
4766 
4767   s = txn->SingleDelete("A");
4768   ASSERT_OK(s);
4769 
4770   s = txn->Get(read_options, "A", &value);
4771   ASSERT_TRUE(s.IsNotFound());
4772 
4773   s = txn->Commit();
4774   ASSERT_OK(s);
4775   delete txn;
4776 
4777   s = db->Get(read_options, "A", &value);
4778   ASSERT_TRUE(s.IsNotFound());
4779 
4780   txn = db->BeginTransaction(write_options);
4781   Transaction* txn2 = db->BeginTransaction(write_options);
4782   txn2->SetSnapshot();
4783 
4784   s = txn->Put("A", "a");
4785   ASSERT_OK(s);
4786 
4787   s = txn->Put("A", "a2");
4788   ASSERT_OK(s);
4789 
4790   s = txn->SingleDelete("A");
4791   ASSERT_OK(s);
4792 
4793   s = txn->SingleDelete("B");
4794   ASSERT_OK(s);
4795 
4796   // According to db.h, doing a SingleDelete on a key that has been
4797   // overwritten will have undefinied behavior.  So it is unclear what the
4798   // result of fetching "A" should be. The current implementation will
4799   // return NotFound in this case.
4800   s = txn->Get(read_options, "A", &value);
4801   ASSERT_TRUE(s.IsNotFound());
4802 
4803   s = txn2->Put("B", "b");
4804   ASSERT_TRUE(s.IsTimedOut());
4805   s = txn2->Commit();
4806   ASSERT_OK(s);
4807   delete txn2;
4808 
4809   s = txn->Commit();
4810   ASSERT_OK(s);
4811   delete txn;
4812 
4813   // According to db.h, doing a SingleDelete on a key that has been
4814   // overwritten will have undefinied behavior.  So it is unclear what the
4815   // result of fetching "A" should be. The current implementation will
4816   // return NotFound in this case.
4817   s = db->Get(read_options, "A", &value);
4818   ASSERT_TRUE(s.IsNotFound());
4819 
4820   s = db->Get(read_options, "B", &value);
4821   ASSERT_TRUE(s.IsNotFound());
4822 }
4823 
// Checks that Merge() operands written in a transaction are visible to its
// own reads, interact correctly with Put(), lock the key against other
// transactions, and produce the expected value once committed.
TEST_P(TransactionTest, MergeTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;

  Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
  ASSERT_TRUE(txn);

  ASSERT_OK(db->Put(write_options, "A", "a0"));

  // Two merge operands on top of the value stored in the DB.
  ASSERT_OK(txn->Merge("A", "1"));
  ASSERT_OK(txn->Merge("A", "2"));

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a0,1,2", value);

  // A Put replaces the value and the operands accumulated so far.
  ASSERT_OK(txn->Put("A", "a"));

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn->Merge("A", "3"));

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a,3", value);

  TransactionOptions txn_options;
  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // txn must be holding the lock on "A".
  ASSERT_TRUE(txn2->Merge("A", "4").IsTimedOut());

  ASSERT_OK(txn2->Commit());
  delete txn2;

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_OK(db->Get(read_options, "A", &value));
  ASSERT_EQ("a,3", value);
}
4881 
// Checks when range deletions are accepted by a TransactionDB: never through
// `DeleteRange()` or a plain `Write()`, and through the optimized `Write()`
// overload only for certain flag combinations that depend on the write policy.
TEST_P(TransactionTest, DeleteRangeSupportTest) {
  // The `DeleteRange()` API is banned everywhere.
  ASSERT_TRUE(
      db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(), "a", "b")
          .IsNotSupported());

  // But range deletions can be added via the `Write()` API by specifying the
  // proper flags to promise there are no conflicts according to the DB type
  // (see `TransactionDB::DeleteRange()` API doc for details).
  for (bool skip_concurrency_control : {false, true}) {
    for (bool skip_duplicate_key_check : {false, true}) {
      ASSERT_OK(db->Put(WriteOptions(), "a", "val"));

      WriteBatch wb;
      ASSERT_OK(wb.DeleteRange("a", "b"));

      TransactionDBWriteOptimizations flags;
      flags.skip_concurrency_control = skip_concurrency_control;
      flags.skip_duplicate_key_check = skip_duplicate_key_check;
      Status s = db->Write(WriteOptions(), flags, &wb);

      // Work out whether this flag combination is expected to let the range
      // deletion through for the write policy under test.
      bool range_deletion_accepted = false;
      switch (txn_db_options.write_policy) {
        case WRITE_COMMITTED:
          range_deletion_accepted = skip_concurrency_control;
          break;
        case WRITE_PREPARED:
          // Intentional fall-through
        case WRITE_UNPREPARED:
          range_deletion_accepted =
              skip_concurrency_control && skip_duplicate_key_check;
          break;
      }

      std::string value;
      if (range_deletion_accepted) {
        // The write succeeded and "a" was removed by the range deletion.
        ASSERT_OK(s);
        ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
      } else {
        // The write was rejected and "a" is untouched.
        ASSERT_NOK(s);
        ASSERT_OK(db->Get(ReadOptions(), "a", &value));
      }

      // Without any promises from the user, range deletion via other `Write()`
      // APIs are still banned.
      ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
      ASSERT_NOK(db->Write(WriteOptions(), &wb));
      ASSERT_OK(db->Get(ReadOptions(), "a", &value));
    }
  }
}
4931 
TEST_P(TransactionTest, DeferSnapshotTest) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string fetched;

  ASSERT_OK(db->Put(wopts, "A", "a0"));

  Transaction* deferred_txn = db->BeginTransaction(wopts);
  Transaction* racing_txn = db->BeginTransaction(wopts);

  // Requesting a snapshot "on next operation" must not create one right away.
  deferred_txn->SetSnapshotOnNextOperation();
  ASSERT_FALSE(deferred_txn->GetSnapshot());

  // Commit a competing write to "A" before deferred_txn does anything.
  ASSERT_OK(racing_txn->Put("A", "a2"));
  ASSERT_OK(racing_txn->Commit());
  delete racing_txn;

  // No conflict with the competing commit is expected, since the snapshot
  // was not set until this GetForUpdate call; the new value is visible.
  ASSERT_OK(deferred_txn->GetForUpdate(ropts, "A", &fetched));
  ASSERT_EQ("a2", fetched);

  ASSERT_OK(deferred_txn->Put("A", "a1"));

  ASSERT_OK(db->Put(wopts, "B", "b0"));

  // "B" was written after the snapshot was set, so it cannot be locked.
  ASSERT_TRUE(deferred_txn->Put("B", "b1").IsBusy());

  ASSERT_OK(deferred_txn->Commit());
  delete deferred_txn;

  // The transaction's write to "A" is committed; "B" keeps the value written
  // outside the transaction.
  ASSERT_OK(db->Get(ropts, "A", &fetched));
  ASSERT_EQ("a1", fetched);

  ASSERT_OK(db->Get(ropts, "B", &fetched));
  ASSERT_EQ("b0", fetched);
}
4982 
TEST_P(TransactionTest, DeferSnapshotTest2) {
  WriteOptions wopts;
  ReadOptions latest_ropts, pinned_ropts;
  std::string fetched;

  Transaction* txn = db->BeginTransaction(wopts);

  // Take an explicit snapshot before anything else is written.
  txn->SetSnapshot();

  ASSERT_OK(txn->Put("A", "a1"));

  ASSERT_OK(db->Put(wopts, "C", "c0"));
  ASSERT_OK(db->Put(wopts, "D", "d0"));

  pinned_ropts.snapshot = txn->GetSnapshot();

  txn->SetSnapshotOnNextOperation();

  // The snapshot was set before C and D were written.
  ASSERT_TRUE(txn->Get(pinned_ropts, "C", &fetched).IsNotFound());
  ASSERT_TRUE(txn->Get(pinned_ropts, "D", &fetched).IsNotFound());

  // Snapshot should not have changed yet.
  pinned_ropts.snapshot = txn->GetSnapshot();

  // Still reading through a snapshot that predates C and D.
  ASSERT_TRUE(txn->Get(pinned_ropts, "C", &fetched).IsNotFound());
  ASSERT_TRUE(txn->Get(pinned_ropts, "D", &fetched).IsNotFound());

  // Read C for update with no snapshot in the read options: the latest
  // value is returned.
  ASSERT_OK(txn->GetForUpdate(latest_ropts, "C", &fetched));
  ASSERT_EQ("c0", fetched);

  ASSERT_OK(db->Put(wopts, "D", "d00"));

  // Snapshot is now set: it sees "d0" but not the later "d00".
  pinned_ropts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(pinned_ropts, "D", &fetched));
  ASSERT_EQ("d0", fetched);

  ASSERT_OK(txn->Commit());
  delete txn;
}
5039 
TEST_P(TransactionTest,DeferSnapshotSavePointTest)5040 TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
5041   WriteOptions write_options;
5042   ReadOptions read_options, snapshot_read_options;
5043   std::string value;
5044   Status s;
5045 
5046   Transaction* txn1 = db->BeginTransaction(write_options);
5047 
5048   txn1->SetSavePoint();  // 1
5049 
5050   s = db->Put(write_options, "T", "1");
5051   ASSERT_OK(s);
5052 
5053   txn1->SetSnapshotOnNextOperation();
5054 
5055   s = db->Put(write_options, "T", "2");
5056   ASSERT_OK(s);
5057 
5058   txn1->SetSavePoint();  // 2
5059 
5060   s = db->Put(write_options, "T", "3");
5061   ASSERT_OK(s);
5062 
5063   s = txn1->Put("A", "a");
5064   ASSERT_OK(s);
5065 
5066   txn1->SetSavePoint();  // 3
5067 
5068   s = db->Put(write_options, "T", "4");
5069   ASSERT_OK(s);
5070 
5071   txn1->SetSnapshot();
5072   txn1->SetSnapshotOnNextOperation();
5073 
5074   txn1->SetSavePoint();  // 4
5075 
5076   s = db->Put(write_options, "T", "5");
5077   ASSERT_OK(s);
5078 
5079   snapshot_read_options.snapshot = txn1->GetSnapshot();
5080   s = txn1->Get(snapshot_read_options, "T", &value);
5081   ASSERT_OK(s);
5082   ASSERT_EQ("4", value);
5083 
5084   s = txn1->Put("A", "a1");
5085   ASSERT_OK(s);
5086 
5087   snapshot_read_options.snapshot = txn1->GetSnapshot();
5088   s = txn1->Get(snapshot_read_options, "T", &value);
5089   ASSERT_OK(s);
5090   ASSERT_EQ("5", value);
5091 
5092   s = txn1->RollbackToSavePoint();  // Rollback to 4
5093   ASSERT_OK(s);
5094 
5095   snapshot_read_options.snapshot = txn1->GetSnapshot();
5096   s = txn1->Get(snapshot_read_options, "T", &value);
5097   ASSERT_OK(s);
5098   ASSERT_EQ("4", value);
5099 
5100   s = txn1->RollbackToSavePoint();  // Rollback to 3
5101   ASSERT_OK(s);
5102 
5103   snapshot_read_options.snapshot = txn1->GetSnapshot();
5104   s = txn1->Get(snapshot_read_options, "T", &value);
5105   ASSERT_OK(s);
5106   ASSERT_EQ("3", value);
5107 
5108   s = txn1->Get(read_options, "T", &value);
5109   ASSERT_OK(s);
5110   ASSERT_EQ("5", value);
5111 
5112   s = txn1->RollbackToSavePoint();  // Rollback to 2
5113   ASSERT_OK(s);
5114 
5115   snapshot_read_options.snapshot = txn1->GetSnapshot();
5116   ASSERT_FALSE(snapshot_read_options.snapshot);
5117   s = txn1->Get(snapshot_read_options, "T", &value);
5118   ASSERT_OK(s);
5119   ASSERT_EQ("5", value);
5120 
5121   s = txn1->Delete("A");
5122   ASSERT_OK(s);
5123 
5124   snapshot_read_options.snapshot = txn1->GetSnapshot();
5125   ASSERT_TRUE(snapshot_read_options.snapshot);
5126   s = txn1->Get(snapshot_read_options, "T", &value);
5127   ASSERT_OK(s);
5128   ASSERT_EQ("5", value);
5129 
5130   s = txn1->RollbackToSavePoint();  // Rollback to 1
5131   ASSERT_OK(s);
5132 
5133   s = txn1->Delete("A");
5134   ASSERT_OK(s);
5135 
5136   snapshot_read_options.snapshot = txn1->GetSnapshot();
5137   ASSERT_FALSE(snapshot_read_options.snapshot);
5138   s = txn1->Get(snapshot_read_options, "T", &value);
5139   ASSERT_OK(s);
5140   ASSERT_EQ("5", value);
5141 
5142   s = txn1->Commit();
5143   ASSERT_OK(s);
5144 
5145   delete txn1;
5146 }
5147 
TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string fetched;

  // Notifier that stores every snapshot it is told about into a
  // caller-provided slot.
  class SnapshotRecorder : public TransactionNotifier {
   public:
    explicit SnapshotRecorder(const Snapshot** slot) : slot_(slot) {}

    void SnapshotCreated(const Snapshot* created) override {
      *slot_ = created;
    }

   private:
    const Snapshot** slot_;
  };

  // The recorder writes straight into ropts.snapshot, so later reads through
  // ropts use the notified snapshot.
  auto recorder = std::make_shared<SnapshotRecorder>(&ropts.snapshot);

  ASSERT_OK(db->Put(wopts, "B", "0"));

  Transaction* txn = db->BeginTransaction(wopts);

  txn->SetSnapshotOnNextOperation(recorder);
  ASSERT_FALSE(ropts.snapshot);

  ASSERT_OK(db->Put(wopts, "B", "1"));

  // A Get does not generate the snapshot
  ASSERT_OK(txn->Get(ropts, "B", &fetched));
  ASSERT_FALSE(ropts.snapshot);
  ASSERT_EQ(fetched, "1");

  // Any other operation does
  ASSERT_OK(txn->Put("A", "0"));

  // Now change "B".
  ASSERT_OK(db->Put(wopts, "B", "2"));

  // The original value should still be read
  ASSERT_OK(txn->Get(ropts, "B", &fetched));
  ASSERT_TRUE(ropts.snapshot);
  ASSERT_EQ(fetched, "1");

  ASSERT_OK(txn->Commit());

  delete txn;
}
5206 
// Checks that a transaction reads the latest data while it has no snapshot,
// reads pinned data after SetSnapshot(), and goes back to reading the latest
// data after ClearSnapshot().
TEST_P(TransactionTest, ClearSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "foo", "0");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = db->Put(write_options, "foo", "1");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // No snapshot created yet
  s = txn->Get(snapshot_read_options, "foo", &value);
  // The status must be checked: a failed Get would leave `value` untouched
  // and the comparison below would then be made against stale data.
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_TRUE(snapshot_read_options.snapshot);

  s = db->Put(write_options, "foo", "2");
  ASSERT_OK(s);

  // Snapshot was created before change to '2'
  s = txn->Get(snapshot_read_options, "foo", &value);
  // Without this check the test could pass spuriously, since `value` already
  // holds "1" from the previous read.
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->ClearSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // Snapshot has now been cleared
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "2");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
5253 
// Reopens the DB with three column families that carry different
// `disable_auto_compactions` settings and checks that each column family's
// latest mutable options report the value it was opened with.
TEST_P(TransactionTest, ToggleAutoCompactionTest) {
  Status s;

  ColumnFamilyHandle *cfa, *cfb;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  // Do not leave `db` dangling: if the reopen below fails without assigning
  // `db`, any later cleanup of `db` (presumably done by the fixture) would
  // otherwise delete the same object twice.
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  // Only CFA is opened with auto compactions disabled.
  ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
  ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
  ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
  cf_opt_default->disable_auto_compactions = false;
  cf_opt_cfa->disable_auto_compactions = true;
  cf_opt_cfb->disable_auto_compactions = false;

  std::vector<ColumnFamilyHandle*> handles;

  s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
                          &handles, &db);
  ASSERT_OK(s);

  // Handles are in the same order as the descriptors passed to Open().
  auto cfh_default = static_cast_with_check<ColumnFamilyHandleImpl>(handles[0]);
  auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();

  auto cfh_a = static_cast_with_check<ColumnFamilyHandleImpl>(handles[1]);
  auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();

  auto cfh_b = static_cast_with_check<ColumnFamilyHandleImpl>(handles[2]);
  auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();

  ASSERT_EQ(opt_default.disable_auto_compactions, false);
  ASSERT_EQ(opt_a.disable_auto_compactions, true);
  ASSERT_EQ(opt_b.disable_auto_compactions, false);

  for (auto handle : handles) {
    delete handle;
  }
}
5311 
TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
  // In this test, txn1 should succeed committing,
  // as the callback is called after txn1 starts committing.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"TransactionTest::ExpirableTransactionDataRace:1"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) {
        WriteOptions write_options;
        TransactionOptions txn_options;

        // Force txn1 to expire: sleep longer than its 1 second expiration.
        /* sleep override */
        std::this_thread::sleep_for(std::chrono::milliseconds(1500));

        // A second transaction writing the same key is expected to time out
        // rather than take over the key from txn1.
        Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
        Status s;
        s = txn2->Put("X", "2");
        ASSERT_TRUE(s.IsTimedOut());
        s = txn2->Commit();
        ASSERT_OK(s);
        delete txn2;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions write_options;
  TransactionOptions txn_options;

  txn_options.expiration = 1000;  // 1 second
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);

  Status s;
  s = txn1->Put("X", "1");
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  // txn1's value must have been committed.
  ReadOptions read_options;
  std::string value;
  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("1", value);

  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // The callback captures this fixture by reference and SyncPoint is a
  // process-wide singleton, so unregister it before the fixture goes away;
  // otherwise a later test that enables processing could run it against a
  // destroyed fixture.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
5358 
5359 #ifndef ROCKSDB_VALGRIND_RUN
5360 namespace {
5361 // cmt_delay_ms is the delay between prepare and commit
5362 // first_id is the id of the first transaction
TransactionStressTestInserter(TransactionDB * db,const size_t num_transactions,const size_t num_sets,const size_t num_keys_per_set,Random64 * rand,const uint64_t cmt_delay_ms=0,const uint64_t first_id=0)5363 Status TransactionStressTestInserter(
5364     TransactionDB* db, const size_t num_transactions, const size_t num_sets,
5365     const size_t num_keys_per_set, Random64* rand,
5366     const uint64_t cmt_delay_ms = 0, const uint64_t first_id = 0) {
5367   WriteOptions write_options;
5368   ReadOptions read_options;
5369   TransactionOptions txn_options;
5370   if (rand->OneIn(2)) {
5371     txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
5372   }
5373   // Inside the inserter we might also retake the snapshot. We do both since two
5374   // separte functions are engaged for each.
5375   txn_options.set_snapshot = rand->OneIn(2);
5376 
5377   RandomTransactionInserter inserter(
5378       rand, write_options, read_options, num_keys_per_set,
5379       static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
5380 
5381   for (size_t t = 0; t < num_transactions; t++) {
5382     bool success = inserter.TransactionDBInsert(db, txn_options);
5383     if (!success) {
5384       // unexpected failure
5385       return inserter.GetLastStatus();
5386     }
5387   }
5388   inserter.GetLastStatus().PermitUncheckedError();
5389 
5390   // Make sure at least some of the transactions succeeded.  It's ok if
5391   // some failed due to write-conflicts.
5392   if (num_transactions != 1 &&
5393       inserter.GetFailureCount() > num_transactions / 2) {
5394     return Status::TryAgain("Too many transactions failed! " +
5395                             std::to_string(inserter.GetFailureCount()) + " / " +
5396                             std::to_string(num_transactions));
5397   }
5398 
5399   return Status::OK();
5400 }
5401 }  // namespace
5402 
5403 // Worker threads add a number to a key from each set of keys. The checker
5404 // threads verify that the sum of all keys in each set are equal.
TEST_P(MySQLStyleTransactionTest,TransactionStressTest)5405 TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
5406   // Small write buffer to trigger more compactions
5407   options.write_buffer_size = 1024;
5408   ASSERT_OK(ReOpenNoDelete());
5409   constexpr size_t num_workers = 4;        // worker threads count
5410   constexpr size_t num_checkers = 2;       // checker threads count
5411   constexpr size_t num_slow_checkers = 2;  // checker threads emulating backups
5412   constexpr size_t num_slow_workers = 1;   // slow worker threads count
5413   constexpr size_t num_transactions_per_thread = 10000;
5414   constexpr uint16_t num_sets = 3;
5415   constexpr size_t num_keys_per_set = 100;
5416   // Setting the key-space to be 100 keys should cause enough write-conflicts
5417   // to make this test interesting.
5418 
5419   std::vector<port::Thread> threads;
5420   std::atomic<uint32_t> finished = {0};
5421   constexpr bool TAKE_SNAPSHOT = true;
5422   uint64_t time_seed = env->NowMicros();
5423   printf("time_seed is %" PRIu64 "\n", time_seed);  // would help to reproduce
5424 
5425   std::function<void()> call_inserter = [&] {
5426     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5427     Random64 rand(time_seed * thd_seed);
5428     ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,
5429                                             num_sets, num_keys_per_set, &rand));
5430     finished++;
5431   };
5432   std::function<void()> call_checker = [&] {
5433     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5434     Random64 rand(time_seed * thd_seed);
5435     // Verify that data is consistent
5436     while (finished < num_workers) {
5437       ASSERT_OK(RandomTransactionInserter::Verify(
5438           db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand));
5439     }
5440   };
5441   std::function<void()> call_slow_checker = [&] {
5442     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5443     Random64 rand(time_seed * thd_seed);
5444     // Verify that data is consistent
5445     while (finished < num_workers) {
5446       uint64_t delay_ms = rand.Uniform(100) + 1;
5447       Status s = RandomTransactionInserter::Verify(
5448           db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
5449       ASSERT_OK(s);
5450     }
5451   };
5452   std::function<void()> call_slow_inserter = [&] {
5453     size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5454     Random64 rand(time_seed * thd_seed);
5455     uint64_t id = 0;
5456     // Verify that data is consistent
5457     while (finished < num_workers) {
5458       uint64_t delay_ms = rand.Uniform(500) + 1;
5459       ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
5460                                               &rand, delay_ms, id++));
5461     }
5462   };
5463 
5464   for (uint32_t i = 0; i < num_workers; i++) {
5465     threads.emplace_back(call_inserter);
5466   }
5467   for (uint32_t i = 0; i < num_checkers; i++) {
5468     threads.emplace_back(call_checker);
5469   }
5470   if (with_slow_threads_) {
5471     for (uint32_t i = 0; i < num_slow_checkers; i++) {
5472       threads.emplace_back(call_slow_checker);
5473     }
5474     for (uint32_t i = 0; i < num_slow_workers; i++) {
5475       threads.emplace_back(call_slow_inserter);
5476     }
5477   }
5478 
5479   // Wait for all threads to finish
5480   for (auto& t : threads) {
5481     t.join();
5482   }
5483 
5484   // Verify that data is consistent
5485   Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set,
5486                                                !TAKE_SNAPSHOT);
5487   ASSERT_OK(s);
5488 }
5489 #endif  // ROCKSDB_VALGRIND_RUN
5490 
TEST_P(TransactionTest, MemoryLimitTest) {
  TransactionOptions limited_opts;
  // Budget: header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
  limited_opts.max_write_batch_size = 29;
  // Set threshold to unlimited so that the write batch does not get flushed,
  // and can hit the memory limit.
  limited_opts.write_batch_flush_threshold = 0;

  Transaction* txn = db->BeginTransaction(WriteOptions(), limited_opts);
  ASSERT_TRUE(txn);

  // A fresh transaction has no puts and a non-negative id.
  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  // The first two puts fit into the write batch budget.
  ASSERT_OK(txn->Put(Slice("a"), Slice("....")));
  ASSERT_EQ(1, txn->GetNumPuts());

  ASSERT_OK(txn->Put(Slice("b"), Slice("....")));
  ASSERT_EQ(2, txn->GetNumPuts());

  // A third put exceeds the budget: it is rejected with a memory limit error
  // and is not counted.
  ASSERT_TRUE(txn->Put(Slice("b"), Slice("....")).IsMemoryLimit());
  ASSERT_EQ(2, txn->GetNumPuts());

  ASSERT_OK(txn->Rollback());
  delete txn;
}
5522 
5523 // This test clarifies the existing expectation from the sequence number
5524 // algorithm. It could detect mistakes in updating the code but it is not
5525 // necessarily the one acceptable way. If the algorithm is legitimately changed,
5526 // this unit test should be updated as well.
TEST_P(TransactionStressTest,SeqAdvanceTest)5527 TEST_P(TransactionStressTest, SeqAdvanceTest) {
5528   // TODO(myabandeh): must be test with false before new releases
5529   const bool short_test = true;
5530   WriteOptions wopts;
5531   FlushOptions fopt;
5532 
5533   options.disable_auto_compactions = true;
5534   ASSERT_OK(ReOpen());
5535 
5536   // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
5537   // of the branches. This is the same as counting a binary number where i-th
5538   // bit represents whether we take branch i in the represented by the number.
5539   const size_t NUM_BRANCHES = short_test ? 6 : 10;
5540   // Helper function that shows if the branch is to be taken in the run
5541   // represented by the number n.
5542   auto branch_do = [&](size_t n, size_t* branch) {
5543     assert(*branch < NUM_BRANCHES);
5544     const size_t filter = static_cast<size_t>(1) << *branch;
5545     return n & filter;
5546   };
5547   const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
5548   for (size_t n = 0; n < max_n; n++) {
5549     DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5550     size_t branch = 0;
5551     auto seq = db_impl->GetLatestSequenceNumber();
5552     exp_seq = seq;
5553     txn_t0(0);
5554     seq = db_impl->TEST_GetLastVisibleSequence();
5555     ASSERT_EQ(exp_seq, seq);
5556 
5557     if (branch_do(n, &branch)) {
5558       ASSERT_OK(db_impl->Flush(fopt));
5559       seq = db_impl->TEST_GetLastVisibleSequence();
5560       ASSERT_EQ(exp_seq, seq);
5561     }
5562     if (!short_test && branch_do(n, &branch)) {
5563       ASSERT_OK(db_impl->FlushWAL(true));
5564       ASSERT_OK(ReOpenNoDelete());
5565       db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5566       seq = db_impl->GetLatestSequenceNumber();
5567       ASSERT_EQ(exp_seq, seq);
5568     }
5569 
5570     // Doing it twice might detect some bugs
5571     txn_t0(1);
5572     seq = db_impl->TEST_GetLastVisibleSequence();
5573     ASSERT_EQ(exp_seq, seq);
5574 
5575     txn_t1(0);
5576     seq = db_impl->TEST_GetLastVisibleSequence();
5577     ASSERT_EQ(exp_seq, seq);
5578 
5579     if (branch_do(n, &branch)) {
5580       ASSERT_OK(db_impl->Flush(fopt));
5581       seq = db_impl->TEST_GetLastVisibleSequence();
5582       ASSERT_EQ(exp_seq, seq);
5583     }
5584     if (!short_test && branch_do(n, &branch)) {
5585       ASSERT_OK(db_impl->FlushWAL(true));
5586       ASSERT_OK(ReOpenNoDelete());
5587       db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5588       seq = db_impl->GetLatestSequenceNumber();
5589       ASSERT_EQ(exp_seq, seq);
5590     }
5591 
5592     txn_t3(0);
5593     seq = db_impl->TEST_GetLastVisibleSequence();
5594     ASSERT_EQ(exp_seq, seq);
5595 
5596     if (branch_do(n, &branch)) {
5597       ASSERT_OK(db_impl->Flush(fopt));
5598       seq = db_impl->TEST_GetLastVisibleSequence();
5599       ASSERT_EQ(exp_seq, seq);
5600     }
5601     if (!short_test && branch_do(n, &branch)) {
5602       ASSERT_OK(db_impl->FlushWAL(true));
5603       ASSERT_OK(ReOpenNoDelete());
5604       db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5605       seq = db_impl->GetLatestSequenceNumber();
5606       ASSERT_EQ(exp_seq, seq);
5607     }
5608 
5609     txn_t4(0);
5610     seq = db_impl->TEST_GetLastVisibleSequence();
5611 
5612     ASSERT_EQ(exp_seq, seq);
5613 
5614     if (branch_do(n, &branch)) {
5615       ASSERT_OK(db_impl->Flush(fopt));
5616       seq = db_impl->TEST_GetLastVisibleSequence();
5617       ASSERT_EQ(exp_seq, seq);
5618     }
5619     if (!short_test && branch_do(n, &branch)) {
5620       ASSERT_OK(db_impl->FlushWAL(true));
5621       ASSERT_OK(ReOpenNoDelete());
5622       db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5623       seq = db_impl->GetLatestSequenceNumber();
5624       ASSERT_EQ(exp_seq, seq);
5625     }
5626 
5627     txn_t2(0);
5628     seq = db_impl->TEST_GetLastVisibleSequence();
5629     ASSERT_EQ(exp_seq, seq);
5630 
5631     if (branch_do(n, &branch)) {
5632       ASSERT_OK(db_impl->Flush(fopt));
5633       seq = db_impl->TEST_GetLastVisibleSequence();
5634       ASSERT_EQ(exp_seq, seq);
5635     }
5636     if (!short_test && branch_do(n, &branch)) {
5637       ASSERT_OK(db_impl->FlushWAL(true));
5638       ASSERT_OK(ReOpenNoDelete());
5639       db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5640       seq = db_impl->GetLatestSequenceNumber();
5641       ASSERT_EQ(exp_seq, seq);
5642     }
5643     ASSERT_OK(ReOpen());
5644   }
5645 }
5646 
// Verify that the optimization would not compromise the correctness
// Writes a one-key batch under every combination of the two write
// optimization flags and checks that the key reads back correctly.
TEST_P(TransactionTest, Optimizations) {
  size_t comb_cnt = size_t(1) << 2;  // 2 is number of optimization vars
  for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
    // Bit i of new_comb selects whether optimization i is turned on.
    TransactionDBWriteOptimizations optimizations;
    optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
    optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);

    ASSERT_OK(ReOpen());
    WriteOptions write_options;
    WriteBatch batch;
    ASSERT_OK(batch.Put(Slice("k"), Slice("v1")));
    // Use the overload that takes the optimization flags; the plain
    // Write(options, batch) overload would ignore `optimizations` and leave
    // the flags untested.
    ASSERT_OK(db->Write(write_options, optimizations, &batch));

    ReadOptions ropt;
    PinnableSlice pinnable_val;
    ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
    ASSERT_TRUE(pinnable_val == ("v1"));
  }
}
5667 
5668 // A comparator that uses only the first three bytes
5669 class ThreeBytewiseComparator : public Comparator {
5670  public:
ThreeBytewiseComparator()5671   ThreeBytewiseComparator() {}
Name() const5672   const char* Name() const override { return "test.ThreeBytewiseComparator"; }
Compare(const Slice & a,const Slice & b) const5673   int Compare(const Slice& a, const Slice& b) const override {
5674     Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5675     Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5676     return na.compare(nb);
5677   }
Equal(const Slice & a,const Slice & b) const5678   bool Equal(const Slice& a, const Slice& b) const override {
5679     Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5680     Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5681     return na == nb;
5682   }
5683   // These methods below don't seem relevant to this test. Implement them if
5684   // proven othersize.
FindShortestSeparator(std::string * start,const Slice & limit) const5685   void FindShortestSeparator(std::string* start,
5686                              const Slice& limit) const override {
5687     const Comparator* bytewise_comp = BytewiseComparator();
5688     bytewise_comp->FindShortestSeparator(start, limit);
5689   }
FindShortSuccessor(std::string * key) const5690   void FindShortSuccessor(std::string* key) const override {
5691     const Comparator* bytewise_comp = BytewiseComparator();
5692     bytewise_comp->FindShortSuccessor(key);
5693   }
5694 };
5695 
5696 #ifndef ROCKSDB_VALGRIND_RUN
// Runs a thread that repeatedly commits transactions which overwrite "key"
// and then restore it to "value", concurrently with a thread that reads "key"
// without a snapshot. The reader must only ever observe "value".
TEST_P(TransactionTest, GetWithoutSnapshot) {
  WriteOptions write_options;
  std::atomic<bool> finish = {false};
  ASSERT_OK(db->Put(write_options, "key", "value"));
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    // The loop lives in its own lambda because a failing ASSERT_* returns
    // from the function it appears in. This way `finish` is still set below
    // on failure, and the reader thread cannot spin forever and hang the
    // test.
    auto commit_loop = [&]() {
      for (int i = 0; i < 100; i++) {
        TransactionOptions txn_options;
        Transaction* txn = db->BeginTransaction(write_options, txn_options);
        ASSERT_OK(txn->SetName("xid"));
        // The intermediate value is overwritten within the same transaction,
        // so the committed value is always "value".
        ASSERT_OK(txn->Put("key", "overridedvalue"));
        ASSERT_OK(txn->Put("key", "value"));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    };
    commit_loop();
    finish = true;
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    while (!finish) {
      ReadOptions ropt;
      PinnableSlice pinnable_val;
      ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val));
      ASSERT_TRUE(pinnable_val == ("value"));
    }
  });
  commit_thread.join();
  read_thread.join();
}
5725 #endif  // ROCKSDB_VALGRIND_RUN
5726 
5727 // Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest,DuplicateKeys)5728 TEST_P(TransactionTest, DuplicateKeys) {
5729   ColumnFamilyOptions cf_options;
5730   std::string cf_name = "two";
5731   ColumnFamilyHandle* cf_handle = nullptr;
5732   {
5733     ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5734     WriteOptions write_options;
5735     WriteBatch batch;
5736     ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
5737     ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
5738     // duplicate the keys
5739     ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
5740     // duplicate the 2nd key. It should not be counted duplicate since a
5741     // sub-patch is cut after the last duplicate.
5742     ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
5743     // duplicate the keys but in a different cf. It should not be counted as
5744     // duplicate keys
5745     ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
5746 
5747     ASSERT_OK(db->Write(write_options, &batch));
5748 
5749     ReadOptions ropt;
5750     PinnableSlice pinnable_val;
5751     auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
5752     ASSERT_OK(s);
5753     ASSERT_TRUE(pinnable_val == ("value3"));
5754     s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
5755     ASSERT_OK(s);
5756     ASSERT_TRUE(pinnable_val == ("value4"));
5757     s = db->Get(ropt, cf_handle, "key", &pinnable_val);
5758     ASSERT_OK(s);
5759     ASSERT_TRUE(pinnable_val == ("value5"));
5760 
5761     delete cf_handle;
5762   }
5763 
5764   // Test with non-bytewise comparator
5765   {
5766     ASSERT_OK(ReOpen());
5767     std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5768     cf_options.comparator = comp_gc.get();
5769     ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5770     WriteOptions write_options;
5771     WriteBatch batch;
5772     ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value")));
5773     // The first three bytes are the same, do it must be counted as duplicate
5774     ASSERT_OK(batch.Put(cf_handle, Slice("key2"), Slice("value2")));
5775     // check for 2nd duplicate key in cf with non-default comparator
5776     ASSERT_OK(batch.Put(cf_handle, Slice("key2b"), Slice("value2b")));
5777     ASSERT_OK(db->Write(write_options, &batch));
5778 
5779     // The value must be the most recent value for all the keys equal to "key",
5780     // including "key2"
5781     ReadOptions ropt;
5782     PinnableSlice pinnable_val;
5783     ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
5784     ASSERT_TRUE(pinnable_val == ("value2b"));
5785 
5786     // Test duplicate keys with rollback
5787     TransactionOptions txn_options;
5788     Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5789     ASSERT_OK(txn0->SetName("xid"));
5790     ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
5791     ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
5792     ASSERT_OK(txn0->Rollback());
5793     ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
5794     ASSERT_TRUE(pinnable_val == ("value2b"));
5795     delete txn0;
5796 
5797     delete cf_handle;
5798     cf_options.comparator = BytewiseComparator();
5799   }
5800 
5801   for (bool do_prepare : {true, false}) {
5802     for (bool do_rollback : {true, false}) {
5803       for (bool with_commit_batch : {true, false}) {
5804         if (with_commit_batch && !do_prepare) {
5805           continue;
5806         }
5807         if (with_commit_batch && do_rollback) {
5808           continue;
5809         }
5810         ASSERT_OK(ReOpen());
5811         ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5812         TransactionOptions txn_options;
5813         txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
5814         WriteOptions write_options;
5815         Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5816         auto s = txn0->SetName("xid");
5817         ASSERT_OK(s);
5818         s = txn0->Put(Slice("foo0"), Slice("bar0a"));
5819         ASSERT_OK(s);
5820         s = txn0->Put(Slice("foo0"), Slice("bar0b"));
5821         ASSERT_OK(s);
5822         s = txn0->Put(Slice("foo1"), Slice("bar1"));
5823         ASSERT_OK(s);
5824         s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
5825         ASSERT_OK(s);
5826         // Repeat a key after the start of a sub-patch. This should not cause a
5827         // duplicate in the most recent sub-patch and hence not creating a new
5828         // sub-patch.
5829         s = txn0->Put(Slice("foo0"), Slice("bar0c"));
5830         ASSERT_OK(s);
5831         s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
5832         ASSERT_OK(s);
5833         // duplicate the keys but in a different cf. It should not be counted as
5834         // duplicate.
5835         s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
5836         ASSERT_OK(s);
5837         s = txn0->Put(Slice("foo3"), Slice("bar3"));
5838         ASSERT_OK(s);
5839         s = txn0->Merge(Slice("foo3"), Slice("bar3"));
5840         ASSERT_OK(s);
5841         s = txn0->Put(Slice("foo4"), Slice("bar4"));
5842         ASSERT_OK(s);
5843         s = txn0->Delete(Slice("foo4"));
5844         ASSERT_OK(s);
5845         s = txn0->SingleDelete(Slice("foo4"));
5846         ASSERT_OK(s);
5847         if (do_prepare) {
5848           s = txn0->Prepare();
5849           ASSERT_OK(s);
5850         }
5851         if (do_rollback) {
5852           // Test rolling back the batch with duplicates
5853           s = txn0->Rollback();
5854           ASSERT_OK(s);
5855         } else {
5856           if (with_commit_batch) {
5857             assert(do_prepare);
5858             auto cb = txn0->GetCommitTimeWriteBatch();
5859             // duplicate a key in the original batch
5860             // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
5861             // conflicting with the prepared batch is currently undefined and
5862             // gives different results in different implementations.
5863 
5864             // s = cb->Put(Slice("foo0"), Slice("bar0d"));
5865             // ASSERT_OK(s);
5866             // add a new duplicate key
5867             s = cb->Put(Slice("foo6"), Slice("bar6a"));
5868             ASSERT_OK(s);
5869             s = cb->Put(Slice("foo6"), Slice("bar6b"));
5870             ASSERT_OK(s);
5871             // add a duplicate key that is removed in the same batch
5872             s = cb->Put(Slice("foo7"), Slice("bar7a"));
5873             ASSERT_OK(s);
5874             s = cb->Delete(Slice("foo7"));
5875             ASSERT_OK(s);
5876           }
5877           s = txn0->Commit();
5878           ASSERT_OK(s);
5879         }
5880         delete txn0;
5881         ReadOptions ropt;
5882         PinnableSlice pinnable_val;
5883 
5884         if (do_rollback) {
5885           s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5886           ASSERT_TRUE(s.IsNotFound());
5887           s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5888           ASSERT_TRUE(s.IsNotFound());
5889           s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5890           ASSERT_TRUE(s.IsNotFound());
5891           s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5892           ASSERT_TRUE(s.IsNotFound());
5893           s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5894           ASSERT_TRUE(s.IsNotFound());
5895           s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5896           ASSERT_TRUE(s.IsNotFound());
5897         } else {
5898           s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5899           ASSERT_OK(s);
5900           ASSERT_TRUE(pinnable_val == ("bar0c"));
5901           s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5902           ASSERT_OK(s);
5903           ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
5904           s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5905           ASSERT_OK(s);
5906           ASSERT_TRUE(pinnable_val == ("bar1"));
5907           s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5908           ASSERT_OK(s);
5909           ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
5910           s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5911           ASSERT_OK(s);
5912           ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
5913           s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5914           ASSERT_TRUE(s.IsNotFound());
5915           if (with_commit_batch) {
5916             s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
5917             ASSERT_OK(s);
5918             ASSERT_TRUE(pinnable_val == ("bar6b"));
5919             s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
5920             ASSERT_TRUE(s.IsNotFound());
5921           }
5922         }
5923         delete cf_handle;
5924       }  // with_commit_batch
5925     }    // do_rollback
5926   }      // do_prepare
5927 
5928   if (!options.unordered_write) {
5929     // Also test with max_successive_merges > 0. max_successive_merges will not
5930     // affect our algorithm for duplicate key insertion but we add the test to
5931     // verify that.
5932     cf_options.max_successive_merges = 2;
5933     cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5934     ASSERT_OK(ReOpen());
5935     db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
5936     WriteOptions write_options;
5937     // Ensure one value for the key
5938     ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
5939     WriteBatch batch;
5940     // Merge more than max_successive_merges times
5941     ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("1")));
5942     ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("2")));
5943     ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("3")));
5944     ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("4")));
5945     ASSERT_OK(db->Write(write_options, &batch));
5946     ReadOptions read_options;
5947     string value;
5948     ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
5949     ASSERT_EQ(value, "value,1,2,3,4");
5950     delete cf_handle;
5951   }
5952 
5953   {
5954     // Test that the duplicate detection is not compromised after rolling back
5955     // to a save point
5956     TransactionOptions txn_options;
5957     WriteOptions write_options;
5958     Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5959     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5960     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5961     txn0->SetSavePoint();
5962     ASSERT_OK(txn0->RollbackToSavePoint());
5963     ASSERT_OK(txn0->Commit());
5964     delete txn0;
5965   }
5966 
5967   // Test sucessfull recovery after a crash
5968   {
5969     ASSERT_OK(ReOpen());
5970     TransactionOptions txn_options;
5971     WriteOptions write_options;
5972     ReadOptions ropt;
5973     Transaction* txn0;
5974     PinnableSlice pinnable_val;
5975     Status s;
5976 
5977     std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5978     cf_options.comparator = comp_gc.get();
5979     cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5980     ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5981     delete cf_handle;
5982     std::vector<ColumnFamilyDescriptor> cfds{
5983         ColumnFamilyDescriptor(kDefaultColumnFamilyName,
5984                                ColumnFamilyOptions(options)),
5985         ColumnFamilyDescriptor(cf_name, cf_options),
5986     };
5987     std::vector<ColumnFamilyHandle*> handles;
5988     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5989 
5990     assert(db != nullptr);
5991     ASSERT_OK(db->Put(write_options, "foo0", "init"));
5992     ASSERT_OK(db->Put(write_options, "foo1", "init"));
5993     ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init"));
5994     ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init"));
5995 
5996     // one entry
5997     txn0 = db->BeginTransaction(write_options, txn_options);
5998     ASSERT_OK(txn0->SetName("xid"));
5999     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
6000     ASSERT_OK(txn0->Prepare());
6001     delete txn0;
6002     // This will check the asserts inside recovery code
6003     ASSERT_OK(db->FlushWAL(true));
6004     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6005     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6006     txn0 = db->GetTransactionByName("xid");
6007     ASSERT_TRUE(txn0 != nullptr);
6008     ASSERT_OK(txn0->Commit());
6009     delete txn0;
6010     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6011     ASSERT_OK(s);
6012     ASSERT_TRUE(pinnable_val == ("bar0a"));
6013 
6014     // two entries, no duplicate
6015     txn0 = db->BeginTransaction(write_options, txn_options);
6016     ASSERT_OK(txn0->SetName("xid"));
6017     ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b")));
6018     ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b")));
6019     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
6020     ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b")));
6021     ASSERT_OK(txn0->Prepare());
6022     delete txn0;
6023     // This will check the asserts inside recovery code
6024     ASSERT_OK(db->FlushWAL(true));
6025     // Flush only cf 1
6026     ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6027                   ->TEST_FlushMemTable(true, false, handles[1]));
6028     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6029     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6030     txn0 = db->GetTransactionByName("xid");
6031     ASSERT_TRUE(txn0 != nullptr);
6032     ASSERT_OK(txn0->Commit());
6033     delete txn0;
6034     pinnable_val.Reset();
6035     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6036     ASSERT_OK(s);
6037     ASSERT_TRUE(pinnable_val == ("bar0b"));
6038     pinnable_val.Reset();
6039     s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
6040     ASSERT_OK(s);
6041     ASSERT_TRUE(pinnable_val == ("bar1b"));
6042     pinnable_val.Reset();
6043     s = db->Get(ropt, handles[1], "foo0", &pinnable_val);
6044     ASSERT_OK(s);
6045     ASSERT_TRUE(pinnable_val == ("bar0b"));
6046     pinnable_val.Reset();
6047     s = db->Get(ropt, handles[1], "fol1", &pinnable_val);
6048     ASSERT_OK(s);
6049     ASSERT_TRUE(pinnable_val == ("bar1b"));
6050 
6051     // one duplicate with ::Put
6052     txn0 = db->BeginTransaction(write_options, txn_options);
6053     ASSERT_OK(txn0->SetName("xid"));
6054     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c")));
6055     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d")));
6056     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c")));
6057     ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c")));
6058     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d")));
6059     ASSERT_OK(txn0->Prepare());
6060     delete txn0;
6061     // This will check the asserts inside recovery code
6062     ASSERT_OK(db->FlushWAL(true));
6063     // Flush only cf 1
6064     ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6065                   ->TEST_FlushMemTable(true, false, handles[1]));
6066     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6067     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6068     txn0 = db->GetTransactionByName("xid");
6069     ASSERT_TRUE(txn0 != nullptr);
6070     ASSERT_OK(txn0->Commit());
6071     delete txn0;
6072     pinnable_val.Reset();
6073     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6074     ASSERT_OK(s);
6075     ASSERT_TRUE(pinnable_val == ("bar0d"));
6076     pinnable_val.Reset();
6077     s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
6078     ASSERT_OK(s);
6079     ASSERT_TRUE(pinnable_val == ("bar1c"));
6080     pinnable_val.Reset();
6081     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6082     ASSERT_OK(s);
6083     ASSERT_TRUE(pinnable_val == ("bar1d"));
6084 
6085     // Duplicate with ::Put, ::Delete
6086     txn0 = db->BeginTransaction(write_options, txn_options);
6087     ASSERT_OK(txn0->SetName("xid"));
6088     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e")));
6089     ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1")));
6090     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6091     ASSERT_OK(txn0->Delete(Slice("foo0")));
6092     ASSERT_OK(txn0->Prepare());
6093     delete txn0;
6094     // This will check the asserts inside recovery code
6095     ASSERT_OK(db->FlushWAL(true));
6096     // Flush only cf 1
6097     ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6098                   ->TEST_FlushMemTable(true, false, handles[1]));
6099     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6100     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6101     txn0 = db->GetTransactionByName("xid");
6102     ASSERT_TRUE(txn0 != nullptr);
6103     ASSERT_OK(txn0->Commit());
6104     delete txn0;
6105     pinnable_val.Reset();
6106     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6107     ASSERT_TRUE(s.IsNotFound());
6108     pinnable_val.Reset();
6109     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6110     ASSERT_TRUE(s.IsNotFound());
6111 
6112     // Duplicate with ::Put, ::SingleDelete
6113     txn0 = db->BeginTransaction(write_options, txn_options);
6114     ASSERT_OK(txn0->SetName("xid"));
6115     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g")));
6116     ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1")));
6117     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6118     ASSERT_OK(txn0->SingleDelete(Slice("foo0")));
6119     ASSERT_OK(txn0->Prepare());
6120     delete txn0;
6121     // This will check the asserts inside recovery code
6122     ASSERT_OK(db->FlushWAL(true));
6123     // Flush only cf 1
6124     ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6125                   ->TEST_FlushMemTable(true, false, handles[1]));
6126     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6127     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6128     txn0 = db->GetTransactionByName("xid");
6129     ASSERT_TRUE(txn0 != nullptr);
6130     ASSERT_OK(txn0->Commit());
6131     delete txn0;
6132     pinnable_val.Reset();
6133     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6134     ASSERT_TRUE(s.IsNotFound());
6135     pinnable_val.Reset();
6136     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6137     ASSERT_TRUE(s.IsNotFound());
6138 
6139     // Duplicate with ::Put, ::Merge
6140     txn0 = db->BeginTransaction(write_options, txn_options);
6141     ASSERT_OK(txn0->SetName("xid"));
6142     ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i")));
6143     ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j")));
6144     ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f")));
6145     ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g")));
6146     ASSERT_OK(txn0->Prepare());
6147     delete txn0;
6148     // This will check the asserts inside recovery code
6149     ASSERT_OK(db->FlushWAL(true));
6150     // Flush only cf 1
6151     ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6152                   ->TEST_FlushMemTable(true, false, handles[1]));
6153     reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6154     ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6155     txn0 = db->GetTransactionByName("xid");
6156     ASSERT_TRUE(txn0 != nullptr);
6157     ASSERT_OK(txn0->Commit());
6158     delete txn0;
6159     pinnable_val.Reset();
6160     s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6161     ASSERT_OK(s);
6162     ASSERT_TRUE(pinnable_val == ("bar0f,bar0g"));
6163     pinnable_val.Reset();
6164     s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6165     ASSERT_OK(s);
6166     ASSERT_TRUE(pinnable_val == ("bar1i,bar1j"));
6167 
6168     for (auto h : handles) {
6169       delete h;
6170     }
6171     delete db;
6172     db = nullptr;
6173   }
6174 }
6175 
6176 // Test that the reseek optimization in iterators will not result in an infinite
6177 // loop if there are too many uncommitted entries before the snapshot.
TEST_P(TransactionTest, ReseekOptimization) {
  // Sync every write and keep the WAL enabled.
  WriteOptions wopts;
  wopts.disableWAL = false;
  wopts.sync = true;

  // Read the sequential-skip threshold configured on the default cf.
  ColumnFamilyDescriptor default_cf_desc;
  ASSERT_OK(db->DefaultColumnFamily()->GetDescriptor(&default_cf_desc));
  auto skip_limit = default_cf_desc.options.max_sequential_skip_in_iterations;

  ASSERT_OK(db->Put(wopts, Slice("foo0"), Slice("initv")));

  TransactionOptions topts;
  Transaction* prepared_txn = db->BeginTransaction(wopts, topts);
  ASSERT_OK(prepared_txn->SetName("xid"));
  // Writing the same key repeatedly yields separate sequence numbers in
  // WritePrepared and WriteUnPrepared, so "foo1" gets twice as many
  // uncommitted entries as the skip threshold.
  const auto dup_put_count = 2 * skip_limit;
  for (size_t n = 0; n < dup_put_count; ++n) {
    ASSERT_OK(prepared_txn->Put(Slice("foo1"), Slice("bar")));
  }
  ASSERT_OK(prepared_txn->Prepare());
  ASSERT_OK(db->Put(wopts, Slice("foo2"), Slice("initv")));

  ReadOptions ropts;
  // Bound the number of skipped internal keys so a bug cannot loop forever.
  ropts.max_skippable_internal_keys = 10 * skip_limit;
  Iterator* it = db->NewIterator(ropts);
  ASSERT_OK(it->status());

  // Forward scan: only the two committed keys are expected to be counted.
  size_t visited = 0;
  for (it->SeekToFirst(); it->Valid(); ++visited) {
    it->Next();
    ASSERT_OK(it->status());
  }
  ASSERT_EQ(visited, 2);

  // Backward scan: same expectation in the reverse direction.
  visited = 0;
  for (it->SeekToLast(); it->Valid(); ++visited) {
    it->Prev();
    ASSERT_OK(it->status());
  }
  ASSERT_EQ(visited, 2);

  delete it;
  ASSERT_OK(prepared_txn->Rollback());
  delete prepared_txn;
}
6224 
// After recovery in kPointInTimeRecovery mode, the corrupted log file remains
// there. The new log files should still be read successfully during recovery
// of the 2nd crash.
TEST_P(TransactionTest,DoubleCrashInRecovery)6228 TEST_P(TransactionTest, DoubleCrashInRecovery) {
6229   for (const bool manual_wal_flush : {false, true}) {
6230     for (const bool write_after_recovery : {false, true}) {
6231       options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
6232       options.manual_wal_flush = manual_wal_flush;
6233       ASSERT_OK(ReOpen());
6234       std::string cf_name = "two";
6235       ColumnFamilyOptions cf_options;
6236       ColumnFamilyHandle* cf_handle = nullptr;
6237       ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
6238 
6239       // Add a prepare entry to prevent the older logs from being deleted.
6240       WriteOptions write_options;
6241       TransactionOptions txn_options;
6242       Transaction* txn = db->BeginTransaction(write_options, txn_options);
6243       ASSERT_OK(txn->SetName("xid"));
6244       ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
6245       ASSERT_OK(txn->Prepare());
6246 
6247       FlushOptions flush_ops;
6248       ASSERT_OK(db->Flush(flush_ops));
6249       // Now we have a log that cannot be deleted
6250 
6251       ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
6252       // Flush only the 2nd cf
6253       ASSERT_OK(db->Flush(flush_ops, cf_handle));
6254 
6255       // The value is large enough to be touched by the corruption we ingest
6256       // below.
6257       std::string large_value(400, ' ');
6258       // key/value not touched by corruption
6259       ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
6260       // key/value touched by corruption
6261       ASSERT_OK(db->Put(write_options, "foo3", large_value));
6262       // key/value not touched by corruption
6263       ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
6264 
6265       ASSERT_OK(db->FlushWAL(true));
6266       DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
6267       uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
6268       std::string fname = LogFileName(dbname, wal_file_id);
6269       reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6270       delete txn;
6271       delete cf_handle;
6272       delete db;
6273       db = nullptr;
6274 
6275       // Corrupt the last log file in the middle, so that it is not corrupted
6276       // in the tail.
6277       std::string file_content;
6278       ASSERT_OK(ReadFileToString(env, fname, &file_content));
6279       file_content[400] = 'h';
6280       file_content[401] = 'a';
6281       ASSERT_OK(env->DeleteFile(fname));
6282       ASSERT_OK(WriteStringToFile(env, file_content, fname, true));
6283 
6284       // Recover from corruption
6285       std::vector<ColumnFamilyHandle*> handles;
6286       std::vector<ColumnFamilyDescriptor> column_families;
6287       column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
6288                                                        ColumnFamilyOptions()));
6289       column_families.push_back(
6290           ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
6291       ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6292       assert(db != nullptr);
6293 
6294       if (write_after_recovery) {
6295         // Write data to the log right after the corrupted log
6296         ASSERT_OK(db->Put(write_options, "foo5", large_value));
6297       }
6298 
6299       // Persist data written to WAL during recovery or by the last Put
6300       ASSERT_OK(db->FlushWAL(true));
6301       // 2nd crash to recover while having a valid log after the corrupted one.
6302       ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6303       assert(db != nullptr);
6304       txn = db->GetTransactionByName("xid");
6305       ASSERT_TRUE(txn != nullptr);
6306       ASSERT_OK(txn->Commit());
6307       delete txn;
6308       for (auto handle : handles) {
6309         delete handle;
6310       }
6311     }
6312   }
6313 }
6314 
TEST_P(TransactionTest, CommitWithoutPrepare) {
  // Case 1: skip_prepare disabled. Committing a transaction that was never
  // prepared must be rejected with TxnNotPrepared.
  {
    WriteOptions wopts;
    TransactionOptions topts;
    topts.skip_prepare = false;
    Transaction* unprepared_txn = db->BeginTransaction(wopts, topts);
    Status commit_status = unprepared_txn->Commit();
    ASSERT_TRUE(commit_status.IsTxnNotPrepared());
    delete unprepared_txn;
  }

  // Case 2: skip_prepare enabled. The same commit must succeed.
  {
    WriteOptions wopts;
    TransactionOptions topts;
    topts.skip_prepare = true;
    Transaction* direct_txn = db->BeginTransaction(wopts, topts);
    ASSERT_OK(direct_txn->Commit());
    delete direct_txn;
  }
}
6336 
6337 }  // namespace ROCKSDB_NAMESPACE
6338 
main(int argc,char ** argv)6339 int main(int argc, char** argv) {
6340   ::testing::InitGoogleTest(&argc, argv);
6341   return RUN_ALL_TESTS();
6342 }
6343 
6344 #else
6345 #include <stdio.h>
6346 
int main(int /*argc*/, char** /*argv*/) {
  // Nothing to run in ROCKSDB_LITE builds: report the skip on stderr and exit
  // successfully.
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
6352 
6353 #endif  // ROCKSDB_LITE
6354