1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include <functional>
9 #include <string>
10 #include <thread>
11 
12 #include "db/db_impl/db_impl.h"
13 #include "logging/logging.h"
14 #include "port/port.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/perf_context.h"
17 #include "rocksdb/utilities/optimistic_transaction_db.h"
18 #include "rocksdb/utilities/transaction.h"
19 #include "test_util/sync_point.h"
20 #include "test_util/testharness.h"
21 #include "test_util/transaction_test_util.h"
22 #include "util/crc32c.h"
23 #include "util/random.h"
24 
25 using std::string;
26 
27 namespace ROCKSDB_NAMESPACE {
28 
29 class OptimisticTransactionTest
30     : public testing::Test,
31       public testing::WithParamInterface<OccValidationPolicy> {
32  public:
33   OptimisticTransactionDB* txn_db;
34   string dbname;
35   Options options;
36 
OptimisticTransactionTest()37   OptimisticTransactionTest() {
38     options.create_if_missing = true;
39     options.max_write_buffer_number = 2;
40     options.max_write_buffer_size_to_maintain = 1600;
41     dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
42 
43     DestroyDB(dbname, options);
44     Open();
45   }
~OptimisticTransactionTest()46   ~OptimisticTransactionTest() override {
47     delete txn_db;
48     DestroyDB(dbname, options);
49   }
50 
Reopen()51   void Reopen() {
52     delete txn_db;
53     txn_db = nullptr;
54     Open();
55   }
56 
57 private:
Open()58   void Open() {
59     ColumnFamilyOptions cf_options(options);
60     OptimisticTransactionDBOptions occ_opts;
61     occ_opts.validate_policy = GetParam();
62     std::vector<ColumnFamilyDescriptor> column_families;
63     std::vector<ColumnFamilyHandle*> handles;
64     column_families.push_back(
65         ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
66     Status s =
67         OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname,
68                                       column_families, &handles, &txn_db);
69 
70     assert(s.ok());
71     assert(txn_db != nullptr);
72     assert(handles.size() == 1);
73     delete handles[0];
74   }
75 };
76 
TEST_P(OptimisticTransactionTest,SuccessTest)77 TEST_P(OptimisticTransactionTest, SuccessTest) {
78   WriteOptions write_options;
79   ReadOptions read_options;
80   string value;
81   Status s;
82 
83   txn_db->Put(write_options, Slice("foo"), Slice("bar"));
84   txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
85 
86   Transaction* txn = txn_db->BeginTransaction(write_options);
87   ASSERT_TRUE(txn);
88 
89   txn->GetForUpdate(read_options, "foo", &value);
90   ASSERT_EQ(value, "bar");
91 
92   txn->Put(Slice("foo"), Slice("bar2"));
93 
94   txn->GetForUpdate(read_options, "foo", &value);
95   ASSERT_EQ(value, "bar2");
96 
97   s = txn->Commit();
98   ASSERT_OK(s);
99 
100   txn_db->Get(read_options, "foo", &value);
101   ASSERT_EQ(value, "bar2");
102 
103   delete txn;
104 }
105 
TEST_P(OptimisticTransactionTest,WriteConflictTest)106 TEST_P(OptimisticTransactionTest, WriteConflictTest) {
107   WriteOptions write_options;
108   ReadOptions read_options;
109   string value;
110   Status s;
111 
112   txn_db->Put(write_options, "foo", "bar");
113   txn_db->Put(write_options, "foo2", "bar");
114 
115   Transaction* txn = txn_db->BeginTransaction(write_options);
116   ASSERT_TRUE(txn);
117 
118   txn->Put("foo", "bar2");
119 
120   // This Put outside of a transaction will conflict with the previous write
121   s = txn_db->Put(write_options, "foo", "barz");
122   ASSERT_OK(s);
123 
124   s = txn_db->Get(read_options, "foo", &value);
125   ASSERT_EQ(value, "barz");
126   ASSERT_EQ(1, txn->GetNumKeys());
127 
128   s = txn->Commit();
129   ASSERT_TRUE(s.IsBusy());  // Txn should not commit
130 
131   // Verify that transaction did not write anything
132   txn_db->Get(read_options, "foo", &value);
133   ASSERT_EQ(value, "barz");
134   txn_db->Get(read_options, "foo2", &value);
135   ASSERT_EQ(value, "bar");
136 
137   delete txn;
138 }
139 
TEST_P(OptimisticTransactionTest,WriteConflictTest2)140 TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
141   WriteOptions write_options;
142   ReadOptions read_options;
143   OptimisticTransactionOptions txn_options;
144   string value;
145   Status s;
146 
147   txn_db->Put(write_options, "foo", "bar");
148   txn_db->Put(write_options, "foo2", "bar");
149 
150   txn_options.set_snapshot = true;
151   Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
152   ASSERT_TRUE(txn);
153 
154   // This Put outside of a transaction will conflict with a later write
155   s = txn_db->Put(write_options, "foo", "barz");
156   ASSERT_OK(s);
157 
158   txn->Put("foo", "bar2");  // Conflicts with write done after snapshot taken
159 
160   s = txn_db->Get(read_options, "foo", &value);
161   ASSERT_EQ(value, "barz");
162 
163   s = txn->Commit();
164   ASSERT_TRUE(s.IsBusy());  // Txn should not commit
165 
166   // Verify that transaction did not write anything
167   txn_db->Get(read_options, "foo", &value);
168   ASSERT_EQ(value, "barz");
169   txn_db->Get(read_options, "foo2", &value);
170   ASSERT_EQ(value, "bar");
171 
172   delete txn;
173 }
174 
TEST_P(OptimisticTransactionTest,ReadConflictTest)175 TEST_P(OptimisticTransactionTest, ReadConflictTest) {
176   WriteOptions write_options;
177   ReadOptions read_options, snapshot_read_options;
178   OptimisticTransactionOptions txn_options;
179   string value;
180   Status s;
181 
182   txn_db->Put(write_options, "foo", "bar");
183   txn_db->Put(write_options, "foo2", "bar");
184 
185   txn_options.set_snapshot = true;
186   Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
187   ASSERT_TRUE(txn);
188 
189   txn->SetSnapshot();
190   snapshot_read_options.snapshot = txn->GetSnapshot();
191 
192   txn->GetForUpdate(snapshot_read_options, "foo", &value);
193   ASSERT_EQ(value, "bar");
194 
195   // This Put outside of a transaction will conflict with the previous read
196   s = txn_db->Put(write_options, "foo", "barz");
197   ASSERT_OK(s);
198 
199   s = txn_db->Get(read_options, "foo", &value);
200   ASSERT_EQ(value, "barz");
201 
202   s = txn->Commit();
203   ASSERT_TRUE(s.IsBusy());  // Txn should not commit
204 
205   // Verify that transaction did not write anything
206   txn->GetForUpdate(read_options, "foo", &value);
207   ASSERT_EQ(value, "barz");
208   txn->GetForUpdate(read_options, "foo2", &value);
209   ASSERT_EQ(value, "bar");
210 
211   delete txn;
212 }
213 
TEST_P(OptimisticTransactionTest,TxnOnlyTest)214 TEST_P(OptimisticTransactionTest, TxnOnlyTest) {
215   // Test to make sure transactions work when there are no other writes in an
216   // empty db.
217 
218   WriteOptions write_options;
219   ReadOptions read_options;
220   string value;
221   Status s;
222 
223   Transaction* txn = txn_db->BeginTransaction(write_options);
224   ASSERT_TRUE(txn);
225 
226   txn->Put("x", "y");
227 
228   s = txn->Commit();
229   ASSERT_OK(s);
230 
231   delete txn;
232 }
233 
TEST_P(OptimisticTransactionTest,FlushTest)234 TEST_P(OptimisticTransactionTest, FlushTest) {
235   WriteOptions write_options;
236   ReadOptions read_options, snapshot_read_options;
237   string value;
238   Status s;
239 
240   txn_db->Put(write_options, Slice("foo"), Slice("bar"));
241   txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
242 
243   Transaction* txn = txn_db->BeginTransaction(write_options);
244   ASSERT_TRUE(txn);
245 
246   snapshot_read_options.snapshot = txn->GetSnapshot();
247 
248   txn->GetForUpdate(snapshot_read_options, "foo", &value);
249   ASSERT_EQ(value, "bar");
250 
251   txn->Put(Slice("foo"), Slice("bar2"));
252 
253   txn->GetForUpdate(snapshot_read_options, "foo", &value);
254   ASSERT_EQ(value, "bar2");
255 
256   // Put a random key so we have a memtable to flush
257   s = txn_db->Put(write_options, "dummy", "dummy");
258   ASSERT_OK(s);
259 
260   // force a memtable flush
261   FlushOptions flush_ops;
262   txn_db->Flush(flush_ops);
263 
264   s = txn->Commit();
265   // txn should commit since the flushed table is still in MemtableList History
266   ASSERT_OK(s);
267 
268   txn_db->Get(read_options, "foo", &value);
269   ASSERT_EQ(value, "bar2");
270 
271   delete txn;
272 }
273 
TEST_P(OptimisticTransactionTest,FlushTest2)274 TEST_P(OptimisticTransactionTest, FlushTest2) {
275   WriteOptions write_options;
276   ReadOptions read_options, snapshot_read_options;
277   string value;
278   Status s;
279 
280   txn_db->Put(write_options, Slice("foo"), Slice("bar"));
281   txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
282 
283   Transaction* txn = txn_db->BeginTransaction(write_options);
284   ASSERT_TRUE(txn);
285 
286   snapshot_read_options.snapshot = txn->GetSnapshot();
287 
288   txn->GetForUpdate(snapshot_read_options, "foo", &value);
289   ASSERT_EQ(value, "bar");
290 
291   txn->Put(Slice("foo"), Slice("bar2"));
292 
293   txn->GetForUpdate(snapshot_read_options, "foo", &value);
294   ASSERT_EQ(value, "bar2");
295 
296   // Put a random key so we have a MemTable to flush
297   s = txn_db->Put(write_options, "dummy", "dummy");
298   ASSERT_OK(s);
299 
300   // force a memtable flush
301   FlushOptions flush_ops;
302   txn_db->Flush(flush_ops);
303 
304   // Put a random key so we have a MemTable to flush
305   s = txn_db->Put(write_options, "dummy", "dummy2");
306   ASSERT_OK(s);
307 
308   // force a memtable flush
309   txn_db->Flush(flush_ops);
310 
311   s = txn_db->Put(write_options, "dummy", "dummy3");
312   ASSERT_OK(s);
313 
314   // force a memtable flush
315   // Since our test db has max_write_buffer_number=2, this flush will cause
316   // the first memtable to get purged from the MemtableList history.
317   txn_db->Flush(flush_ops);
318 
319   s = txn->Commit();
320   // txn should not commit since MemTableList History is not large enough
321   ASSERT_TRUE(s.IsTryAgain());
322 
323   txn_db->Get(read_options, "foo", &value);
324   ASSERT_EQ(value, "bar");
325 
326   delete txn;
327 }
328 
329 // Trigger the condition where some old memtables are skipped when doing
330 // TransactionUtil::CheckKey(), and make sure the result is still correct.
TEST_P(OptimisticTransactionTest,CheckKeySkipOldMemtable)331 TEST_P(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
332   const int kAttemptHistoryMemtable = 0;
333   const int kAttemptImmMemTable = 1;
334   for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
335        attempt++) {
336     options.max_write_buffer_number_to_maintain = 3;
337     Reopen();
338 
339     WriteOptions write_options;
340     ReadOptions read_options;
341     ReadOptions snapshot_read_options;
342     ReadOptions snapshot_read_options2;
343     string value;
344     Status s;
345 
346     ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
347     ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
348 
349     Transaction* txn = txn_db->BeginTransaction(write_options);
350     ASSERT_TRUE(txn != nullptr);
351 
352     Transaction* txn2 = txn_db->BeginTransaction(write_options);
353     ASSERT_TRUE(txn2 != nullptr);
354 
355     snapshot_read_options.snapshot = txn->GetSnapshot();
356     ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
357     ASSERT_EQ(value, "bar");
358     ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
359 
360     snapshot_read_options2.snapshot = txn2->GetSnapshot();
361     ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value));
362     ASSERT_EQ(value, "bar");
363     ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2")));
364 
365     // txn updates "foo" and txn2 updates "foo2", and now a write is
366     // issued for "foo", which conflicts with txn but not txn2
367     ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
368 
369     if (attempt == kAttemptImmMemTable) {
370       // For the second attempt, hold flush from beginning. The memtable
371       // will be switched to immutable after calling TEST_SwitchMemtable()
372       // while CheckKey() is called.
373       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
374           {{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
375             "FlushJob::Start"}});
376       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
377     }
378 
379     // force a memtable flush. The memtable should still be kept
380     FlushOptions flush_ops;
381     if (attempt == kAttemptHistoryMemtable) {
382       ASSERT_OK(txn_db->Flush(flush_ops));
383     } else {
384       assert(attempt == kAttemptImmMemTable);
385       DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB());
386       db_impl->TEST_SwitchMemtable();
387     }
388     uint64_t num_imm_mems;
389     ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
390                                        &num_imm_mems));
391     if (attempt == kAttemptHistoryMemtable) {
392       ASSERT_EQ(0, num_imm_mems);
393     } else {
394       assert(attempt == kAttemptImmMemTable);
395       ASSERT_EQ(1, num_imm_mems);
396     }
397 
398     // Put something in active memtable
399     ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar")));
400 
401     // Create txn3 after flushing, when this transaction is commited,
402     // only need to check the active memtable
403     Transaction* txn3 = txn_db->BeginTransaction(write_options);
404     ASSERT_TRUE(txn3 != nullptr);
405 
406     // Commit both of txn and txn2. txn will conflict but txn2 will
407     // pass. In both ways, both memtables are queried.
408     SetPerfLevel(PerfLevel::kEnableCount);
409 
410     get_perf_context()->Reset();
411     s = txn->Commit();
412     // We should have checked two memtables
413     ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
414     // txn should fail because of conflict, even if the memtable
415     // has flushed, because it is still preserved in history.
416     ASSERT_TRUE(s.IsBusy());
417 
418     get_perf_context()->Reset();
419     s = txn2->Commit();
420     // We should have checked two memtables
421     ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
422     ASSERT_TRUE(s.ok());
423 
424     txn3->Put(Slice("foo2"), Slice("bar2"));
425     get_perf_context()->Reset();
426     s = txn3->Commit();
427     // txn3 is created after the active memtable is created, so that is the only
428     // memtable to check.
429     ASSERT_EQ(1, get_perf_context()->get_from_memtable_count);
430     ASSERT_TRUE(s.ok());
431 
432     TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
433     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
434 
435     SetPerfLevel(PerfLevel::kDisable);
436 
437     delete txn;
438     delete txn2;
439     delete txn3;
440   }
441 }
442 
TEST_P(OptimisticTransactionTest,NoSnapshotTest)443 TEST_P(OptimisticTransactionTest, NoSnapshotTest) {
444   WriteOptions write_options;
445   ReadOptions read_options;
446   string value;
447   Status s;
448 
449   txn_db->Put(write_options, "AAA", "bar");
450 
451   Transaction* txn = txn_db->BeginTransaction(write_options);
452   ASSERT_TRUE(txn);
453 
454   // Modify key after transaction start
455   txn_db->Put(write_options, "AAA", "bar1");
456 
457   // Read and write without a snapshot
458   txn->GetForUpdate(read_options, "AAA", &value);
459   ASSERT_EQ(value, "bar1");
460   txn->Put("AAA", "bar2");
461 
462   // Should commit since read/write was done after data changed
463   s = txn->Commit();
464   ASSERT_OK(s);
465 
466   txn->GetForUpdate(read_options, "AAA", &value);
467   ASSERT_EQ(value, "bar2");
468 
469   delete txn;
470 }
471 
TEST_P(OptimisticTransactionTest,MultipleSnapshotTest)472 TEST_P(OptimisticTransactionTest, MultipleSnapshotTest) {
473   WriteOptions write_options;
474   ReadOptions read_options, snapshot_read_options;
475   string value;
476   Status s;
477 
478   txn_db->Put(write_options, "AAA", "bar");
479   txn_db->Put(write_options, "BBB", "bar");
480   txn_db->Put(write_options, "CCC", "bar");
481 
482   Transaction* txn = txn_db->BeginTransaction(write_options);
483   ASSERT_TRUE(txn);
484 
485   txn_db->Put(write_options, "AAA", "bar1");
486 
487   // Read and write without a snapshot
488   txn->GetForUpdate(read_options, "AAA", &value);
489   ASSERT_EQ(value, "bar1");
490   txn->Put("AAA", "bar2");
491 
492   // Modify BBB before snapshot is taken
493   txn_db->Put(write_options, "BBB", "bar1");
494 
495   txn->SetSnapshot();
496   snapshot_read_options.snapshot = txn->GetSnapshot();
497 
498   // Read and write with snapshot
499   txn->GetForUpdate(snapshot_read_options, "BBB", &value);
500   ASSERT_EQ(value, "bar1");
501   txn->Put("BBB", "bar2");
502 
503   txn_db->Put(write_options, "CCC", "bar1");
504 
505   // Set a new snapshot
506   txn->SetSnapshot();
507   snapshot_read_options.snapshot = txn->GetSnapshot();
508 
509   // Read and write with snapshot
510   txn->GetForUpdate(snapshot_read_options, "CCC", &value);
511   ASSERT_EQ(value, "bar1");
512   txn->Put("CCC", "bar2");
513 
514   s = txn->GetForUpdate(read_options, "AAA", &value);
515   ASSERT_OK(s);
516   ASSERT_EQ(value, "bar2");
517   s = txn->GetForUpdate(read_options, "BBB", &value);
518   ASSERT_OK(s);
519   ASSERT_EQ(value, "bar2");
520   s = txn->GetForUpdate(read_options, "CCC", &value);
521   ASSERT_OK(s);
522   ASSERT_EQ(value, "bar2");
523 
524   s = txn_db->Get(read_options, "AAA", &value);
525   ASSERT_OK(s);
526   ASSERT_EQ(value, "bar1");
527   s = txn_db->Get(read_options, "BBB", &value);
528   ASSERT_OK(s);
529   ASSERT_EQ(value, "bar1");
530   s = txn_db->Get(read_options, "CCC", &value);
531   ASSERT_OK(s);
532   ASSERT_EQ(value, "bar1");
533 
534   s = txn->Commit();
535   ASSERT_OK(s);
536 
537   s = txn_db->Get(read_options, "AAA", &value);
538   ASSERT_OK(s);
539   ASSERT_EQ(value, "bar2");
540   s = txn_db->Get(read_options, "BBB", &value);
541   ASSERT_OK(s);
542   ASSERT_EQ(value, "bar2");
543   s = txn_db->Get(read_options, "CCC", &value);
544   ASSERT_OK(s);
545   ASSERT_EQ(value, "bar2");
546 
547   // verify that we track multiple writes to the same key at different snapshots
548   delete txn;
549   txn = txn_db->BeginTransaction(write_options);
550 
551   // Potentially conflicting writes
552   txn_db->Put(write_options, "ZZZ", "zzz");
553   txn_db->Put(write_options, "XXX", "xxx");
554 
555   txn->SetSnapshot();
556 
557   OptimisticTransactionOptions txn_options;
558   txn_options.set_snapshot = true;
559   Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
560   txn2->SetSnapshot();
561 
562   // This should not conflict in txn since the snapshot is later than the
563   // previous write (spoiler alert:  it will later conflict with txn2).
564   txn->Put("ZZZ", "zzzz");
565   s = txn->Commit();
566   ASSERT_OK(s);
567 
568   delete txn;
569 
570   // This will conflict since the snapshot is earlier than another write to ZZZ
571   txn2->Put("ZZZ", "xxxxx");
572 
573   s = txn2->Commit();
574   ASSERT_TRUE(s.IsBusy());
575 
576   delete txn2;
577 }
578 
TEST_P(OptimisticTransactionTest,ColumnFamiliesTest)579 TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
580   WriteOptions write_options;
581   ReadOptions read_options, snapshot_read_options;
582   OptimisticTransactionOptions txn_options;
583   string value;
584   Status s;
585 
586   ColumnFamilyHandle *cfa, *cfb;
587   ColumnFamilyOptions cf_options;
588 
589   // Create 2 new column families
590   s = txn_db->CreateColumnFamily(cf_options, "CFA", &cfa);
591   ASSERT_OK(s);
592   s = txn_db->CreateColumnFamily(cf_options, "CFB", &cfb);
593   ASSERT_OK(s);
594 
595   delete cfa;
596   delete cfb;
597   delete txn_db;
598   txn_db = nullptr;
599 
600   // open DB with three column families
601   std::vector<ColumnFamilyDescriptor> column_families;
602   // have to open default column family
603   column_families.push_back(
604       ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
605   // open the new column families
606   column_families.push_back(
607       ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
608   column_families.push_back(
609       ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
610   std::vector<ColumnFamilyHandle*> handles;
611   s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles,
612                                     &txn_db);
613   ASSERT_OK(s);
614   assert(txn_db != nullptr);
615 
616   Transaction* txn = txn_db->BeginTransaction(write_options);
617   ASSERT_TRUE(txn);
618 
619   txn->SetSnapshot();
620   snapshot_read_options.snapshot = txn->GetSnapshot();
621 
622   txn_options.set_snapshot = true;
623   Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
624   ASSERT_TRUE(txn2);
625 
626   // Write some data to the db
627   WriteBatch batch;
628   batch.Put("foo", "foo");
629   batch.Put(handles[1], "AAA", "bar");
630   batch.Put(handles[1], "AAAZZZ", "bar");
631   s = txn_db->Write(write_options, &batch);
632   ASSERT_OK(s);
633   txn_db->Delete(write_options, handles[1], "AAAZZZ");
634 
635   // These keys do no conflict with existing writes since they're in
636   // different column families
637   txn->Delete("AAA");
638   txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
639   Slice key_slice("AAAZZZ");
640   Slice value_slices[2] = {Slice("bar"), Slice("bar")};
641   txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2));
642 
643   ASSERT_EQ(3, txn->GetNumKeys());
644 
645   // Txn should commit
646   s = txn->Commit();
647   ASSERT_OK(s);
648   s = txn_db->Get(read_options, "AAA", &value);
649   ASSERT_TRUE(s.IsNotFound());
650   s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value);
651   ASSERT_EQ(value, "barbar");
652 
653   Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
654   Slice value_slice("barbarbar");
655   // This write will cause a conflict with the earlier batch write
656   txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1));
657 
658   txn2->Delete(handles[2], "XXX");
659   txn2->Delete(handles[1], "XXX");
660   s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value);
661   ASSERT_TRUE(s.IsNotFound());
662 
663   // Verify txn did not commit
664   s = txn2->Commit();
665   ASSERT_TRUE(s.IsBusy());
666   s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value);
667   ASSERT_EQ(value, "barbar");
668 
669   delete txn;
670   delete txn2;
671 
672   txn = txn_db->BeginTransaction(write_options, txn_options);
673   snapshot_read_options.snapshot = txn->GetSnapshot();
674 
675   txn2 = txn_db->BeginTransaction(write_options, txn_options);
676   ASSERT_TRUE(txn);
677 
678   std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
679                                                    handles[0], handles[2]};
680   std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
681   std::vector<std::string> values(4);
682 
683   std::vector<Status> results = txn->MultiGetForUpdate(
684       snapshot_read_options, multiget_cfh, multiget_keys, &values);
685   ASSERT_OK(results[0]);
686   ASSERT_OK(results[1]);
687   ASSERT_OK(results[2]);
688   ASSERT_TRUE(results[3].IsNotFound());
689   ASSERT_EQ(values[0], "bar");
690   ASSERT_EQ(values[1], "barbar");
691   ASSERT_EQ(values[2], "foo");
692 
693   txn->Delete(handles[2], "ZZZ");
694   txn->Put(handles[2], "ZZZ", "YYY");
695   txn->Put(handles[2], "ZZZ", "YYYY");
696   txn->Delete(handles[2], "ZZZ");
697   txn->Put(handles[2], "AAAZZZ", "barbarbar");
698 
699   ASSERT_EQ(5, txn->GetNumKeys());
700 
701   // Txn should commit
702   s = txn->Commit();
703   ASSERT_OK(s);
704   s = txn_db->Get(read_options, handles[2], "ZZZ", &value);
705   ASSERT_TRUE(s.IsNotFound());
706 
707   // Put a key which will conflict with the next txn using the previous snapshot
708   txn_db->Put(write_options, handles[2], "foo", "000");
709 
710   results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
711                                     multiget_keys, &values);
712   ASSERT_OK(results[0]);
713   ASSERT_OK(results[1]);
714   ASSERT_OK(results[2]);
715   ASSERT_TRUE(results[3].IsNotFound());
716   ASSERT_EQ(values[0], "bar");
717   ASSERT_EQ(values[1], "barbar");
718   ASSERT_EQ(values[2], "foo");
719 
720   // Verify Txn Did not Commit
721   s = txn2->Commit();
722   ASSERT_TRUE(s.IsBusy());
723 
724   s = txn_db->DropColumnFamily(handles[1]);
725   ASSERT_OK(s);
726   s = txn_db->DropColumnFamily(handles[2]);
727   ASSERT_OK(s);
728 
729   delete txn;
730   delete txn2;
731 
732   for (auto handle : handles) {
733     delete handle;
734   }
735 }
736 
TEST_P(OptimisticTransactionTest,EmptyTest)737 TEST_P(OptimisticTransactionTest, EmptyTest) {
738   WriteOptions write_options;
739   ReadOptions read_options;
740   string value;
741   Status s;
742 
743   s = txn_db->Put(write_options, "aaa", "aaa");
744   ASSERT_OK(s);
745 
746   Transaction* txn = txn_db->BeginTransaction(write_options);
747   s = txn->Commit();
748   ASSERT_OK(s);
749   delete txn;
750 
751   txn = txn_db->BeginTransaction(write_options);
752   txn->Rollback();
753   delete txn;
754 
755   txn = txn_db->BeginTransaction(write_options);
756   s = txn->GetForUpdate(read_options, "aaa", &value);
757   ASSERT_EQ(value, "aaa");
758 
759   s = txn->Commit();
760   ASSERT_OK(s);
761   delete txn;
762 
763   txn = txn_db->BeginTransaction(write_options);
764   txn->SetSnapshot();
765   s = txn->GetForUpdate(read_options, "aaa", &value);
766   ASSERT_EQ(value, "aaa");
767 
768   s = txn_db->Put(write_options, "aaa", "xxx");
769   s = txn->Commit();
770   ASSERT_TRUE(s.IsBusy());
771   delete txn;
772 }
773 
TEST_P(OptimisticTransactionTest,PredicateManyPreceders)774 TEST_P(OptimisticTransactionTest, PredicateManyPreceders) {
775   WriteOptions write_options;
776   ReadOptions read_options1, read_options2;
777   OptimisticTransactionOptions txn_options;
778   string value;
779   Status s;
780 
781   txn_options.set_snapshot = true;
782   Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options);
783   read_options1.snapshot = txn1->GetSnapshot();
784 
785   Transaction* txn2 = txn_db->BeginTransaction(write_options);
786   txn2->SetSnapshot();
787   read_options2.snapshot = txn2->GetSnapshot();
788 
789   std::vector<Slice> multiget_keys = {"1", "2", "3"};
790   std::vector<std::string> multiget_values;
791 
792   std::vector<Status> results =
793       txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
794   ASSERT_TRUE(results[1].IsNotFound());
795 
796   txn2->Put("2", "x");
797 
798   s = txn2->Commit();
799   ASSERT_OK(s);
800 
801   multiget_values.clear();
802   results =
803       txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
804   ASSERT_TRUE(results[1].IsNotFound());
805 
806   // should not commit since txn2 wrote a key txn has read
807   s = txn1->Commit();
808   ASSERT_TRUE(s.IsBusy());
809 
810   delete txn1;
811   delete txn2;
812 
813   txn1 = txn_db->BeginTransaction(write_options, txn_options);
814   read_options1.snapshot = txn1->GetSnapshot();
815 
816   txn2 = txn_db->BeginTransaction(write_options, txn_options);
817   read_options2.snapshot = txn2->GetSnapshot();
818 
819   txn1->Put("4", "x");
820 
821   txn2->Delete("4");
822 
823   // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
824   s = txn1->Commit();
825   ASSERT_OK(s);
826 
827   s = txn2->GetForUpdate(read_options2, "4", &value);
828   ASSERT_TRUE(s.IsNotFound());
829 
830   // txn2 cannot commit since txn1 changed "4"
831   s = txn2->Commit();
832   ASSERT_TRUE(s.IsBusy());
833 
834   delete txn1;
835   delete txn2;
836 }
837 
TEST_P(OptimisticTransactionTest,LostUpdate)838 TEST_P(OptimisticTransactionTest, LostUpdate) {
839   WriteOptions write_options;
840   ReadOptions read_options, read_options1, read_options2;
841   OptimisticTransactionOptions txn_options;
842   string value;
843   Status s;
844 
845   // Test 2 transactions writing to the same key in multiple orders and
846   // with/without snapshots
847 
848   Transaction* txn1 = txn_db->BeginTransaction(write_options);
849   Transaction* txn2 = txn_db->BeginTransaction(write_options);
850 
851   txn1->Put("1", "1");
852   txn2->Put("1", "2");
853 
854   s = txn1->Commit();
855   ASSERT_OK(s);
856 
857   s = txn2->Commit();
858   ASSERT_TRUE(s.IsBusy());
859 
860   delete txn1;
861   delete txn2;
862 
863   txn_options.set_snapshot = true;
864   txn1 = txn_db->BeginTransaction(write_options, txn_options);
865   read_options1.snapshot = txn1->GetSnapshot();
866 
867   txn2 = txn_db->BeginTransaction(write_options, txn_options);
868   read_options2.snapshot = txn2->GetSnapshot();
869 
870   txn1->Put("1", "3");
871   txn2->Put("1", "4");
872 
873   s = txn1->Commit();
874   ASSERT_OK(s);
875 
876   s = txn2->Commit();
877   ASSERT_TRUE(s.IsBusy());
878 
879   delete txn1;
880   delete txn2;
881 
882   txn1 = txn_db->BeginTransaction(write_options, txn_options);
883   read_options1.snapshot = txn1->GetSnapshot();
884 
885   txn2 = txn_db->BeginTransaction(write_options, txn_options);
886   read_options2.snapshot = txn2->GetSnapshot();
887 
888   txn1->Put("1", "5");
889   s = txn1->Commit();
890   ASSERT_OK(s);
891 
892   txn2->Put("1", "6");
893   s = txn2->Commit();
894   ASSERT_TRUE(s.IsBusy());
895 
896   delete txn1;
897   delete txn2;
898 
899   txn1 = txn_db->BeginTransaction(write_options, txn_options);
900   read_options1.snapshot = txn1->GetSnapshot();
901 
902   txn2 = txn_db->BeginTransaction(write_options, txn_options);
903   read_options2.snapshot = txn2->GetSnapshot();
904 
905   txn1->Put("1", "5");
906   s = txn1->Commit();
907   ASSERT_OK(s);
908 
909   txn2->SetSnapshot();
910   txn2->Put("1", "6");
911   s = txn2->Commit();
912   ASSERT_OK(s);
913 
914   delete txn1;
915   delete txn2;
916 
917   txn1 = txn_db->BeginTransaction(write_options);
918   txn2 = txn_db->BeginTransaction(write_options);
919 
920   txn1->Put("1", "7");
921   s = txn1->Commit();
922   ASSERT_OK(s);
923 
924   txn2->Put("1", "8");
925   s = txn2->Commit();
926   ASSERT_OK(s);
927 
928   delete txn1;
929   delete txn2;
930 
931   s = txn_db->Get(read_options, "1", &value);
932   ASSERT_OK(s);
933   ASSERT_EQ(value, "8");
934 }
935 
TEST_P(OptimisticTransactionTest,UntrackedWrites)936 TEST_P(OptimisticTransactionTest, UntrackedWrites) {
937   WriteOptions write_options;
938   ReadOptions read_options;
939   string value;
940   Status s;
941 
942   // Verify transaction rollback works for untracked keys.
943   Transaction* txn = txn_db->BeginTransaction(write_options);
944   txn->PutUntracked("untracked", "0");
945   txn->Rollback();
946   s = txn_db->Get(read_options, "untracked", &value);
947   ASSERT_TRUE(s.IsNotFound());
948 
949   delete txn;
950   txn = txn_db->BeginTransaction(write_options);
951 
952   txn->Put("tracked", "1");
953   txn->PutUntracked("untracked", "1");
954   txn->MergeUntracked("untracked", "2");
955   txn->DeleteUntracked("untracked");
956 
957   // Write to the untracked key outside of the transaction and verify
958   // it doesn't prevent the transaction from committing.
959   s = txn_db->Put(write_options, "untracked", "x");
960   ASSERT_OK(s);
961 
962   s = txn->Commit();
963   ASSERT_OK(s);
964 
965   s = txn_db->Get(read_options, "untracked", &value);
966   ASSERT_TRUE(s.IsNotFound());
967 
968   delete txn;
969   txn = txn_db->BeginTransaction(write_options);
970 
971   txn->Put("tracked", "10");
972   txn->PutUntracked("untracked", "A");
973 
974   // Write to tracked key outside of the transaction and verify that the
975   // untracked keys are not written when the commit fails.
976   s = txn_db->Delete(write_options, "tracked");
977 
978   s = txn->Commit();
979   ASSERT_TRUE(s.IsBusy());
980 
981   s = txn_db->Get(read_options, "untracked", &value);
982   ASSERT_TRUE(s.IsNotFound());
983 
984   delete txn;
985 }
986 
TEST_P(OptimisticTransactionTest,IteratorTest)987 TEST_P(OptimisticTransactionTest, IteratorTest) {
988   WriteOptions write_options;
989   ReadOptions read_options, snapshot_read_options;
990   OptimisticTransactionOptions txn_options;
991   string value;
992   Status s;
993 
994   // Write some keys to the db
995   s = txn_db->Put(write_options, "A", "a");
996   ASSERT_OK(s);
997 
998   s = txn_db->Put(write_options, "G", "g");
999   ASSERT_OK(s);
1000 
1001   s = txn_db->Put(write_options, "F", "f");
1002   ASSERT_OK(s);
1003 
1004   s = txn_db->Put(write_options, "C", "c");
1005   ASSERT_OK(s);
1006 
1007   s = txn_db->Put(write_options, "D", "d");
1008   ASSERT_OK(s);
1009 
1010   Transaction* txn = txn_db->BeginTransaction(write_options);
1011   ASSERT_TRUE(txn);
1012 
1013   // Write some keys in a txn
1014   s = txn->Put("B", "b");
1015   ASSERT_OK(s);
1016 
1017   s = txn->Put("H", "h");
1018   ASSERT_OK(s);
1019 
1020   s = txn->Delete("D");
1021   ASSERT_OK(s);
1022 
1023   s = txn->Put("E", "e");
1024   ASSERT_OK(s);
1025 
1026   txn->SetSnapshot();
1027   const Snapshot* snapshot = txn->GetSnapshot();
1028 
1029   // Write some keys to the db after the snapshot
1030   s = txn_db->Put(write_options, "BB", "xx");
1031   ASSERT_OK(s);
1032 
1033   s = txn_db->Put(write_options, "C", "xx");
1034   ASSERT_OK(s);
1035 
1036   read_options.snapshot = snapshot;
1037   Iterator* iter = txn->GetIterator(read_options);
1038   ASSERT_OK(iter->status());
1039   iter->SeekToFirst();
1040 
1041   // Read all keys via iter and lock them all
1042   std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
1043   for (int i = 0; i < 7; i++) {
1044     ASSERT_OK(iter->status());
1045     ASSERT_TRUE(iter->Valid());
1046     ASSERT_EQ(results[i], iter->value().ToString());
1047 
1048     s = txn->GetForUpdate(read_options, iter->key(), nullptr);
1049     ASSERT_OK(s);
1050 
1051     iter->Next();
1052   }
1053   ASSERT_FALSE(iter->Valid());
1054 
1055   iter->Seek("G");
1056   ASSERT_OK(iter->status());
1057   ASSERT_TRUE(iter->Valid());
1058   ASSERT_EQ("g", iter->value().ToString());
1059 
1060   iter->Prev();
1061   ASSERT_OK(iter->status());
1062   ASSERT_TRUE(iter->Valid());
1063   ASSERT_EQ("f", iter->value().ToString());
1064 
1065   iter->Seek("D");
1066   ASSERT_OK(iter->status());
1067   ASSERT_TRUE(iter->Valid());
1068   ASSERT_EQ("e", iter->value().ToString());
1069 
1070   iter->Seek("C");
1071   ASSERT_OK(iter->status());
1072   ASSERT_TRUE(iter->Valid());
1073   ASSERT_EQ("c", iter->value().ToString());
1074 
1075   iter->Next();
1076   ASSERT_OK(iter->status());
1077   ASSERT_TRUE(iter->Valid());
1078   ASSERT_EQ("e", iter->value().ToString());
1079 
1080   iter->Seek("");
1081   ASSERT_OK(iter->status());
1082   ASSERT_TRUE(iter->Valid());
1083   ASSERT_EQ("a", iter->value().ToString());
1084 
1085   iter->Seek("X");
1086   ASSERT_OK(iter->status());
1087   ASSERT_FALSE(iter->Valid());
1088 
1089   iter->SeekToLast();
1090   ASSERT_OK(iter->status());
1091   ASSERT_TRUE(iter->Valid());
1092   ASSERT_EQ("h", iter->value().ToString());
1093 
1094   // key "C" was modified in the db after txn's snapshot.  txn will not commit.
1095   s = txn->Commit();
1096   ASSERT_TRUE(s.IsBusy());
1097 
1098   delete iter;
1099   delete txn;
1100 }
1101 
TEST_P(OptimisticTransactionTest,SavepointTest)1102 TEST_P(OptimisticTransactionTest, SavepointTest) {
1103   WriteOptions write_options;
1104   ReadOptions read_options, snapshot_read_options;
1105   OptimisticTransactionOptions txn_options;
1106   string value;
1107   Status s;
1108 
1109   Transaction* txn = txn_db->BeginTransaction(write_options);
1110   ASSERT_TRUE(txn);
1111 
1112   s = txn->RollbackToSavePoint();
1113   ASSERT_TRUE(s.IsNotFound());
1114 
1115   txn->SetSavePoint();  // 1
1116 
1117   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to beginning of txn
1118   s = txn->RollbackToSavePoint();
1119   ASSERT_TRUE(s.IsNotFound());
1120 
1121   s = txn->Put("B", "b");
1122   ASSERT_OK(s);
1123 
1124   s = txn->Commit();
1125   ASSERT_OK(s);
1126 
1127   s = txn_db->Get(read_options, "B", &value);
1128   ASSERT_OK(s);
1129   ASSERT_EQ("b", value);
1130 
1131   delete txn;
1132   txn = txn_db->BeginTransaction(write_options);
1133   ASSERT_TRUE(txn);
1134 
1135   s = txn->Put("A", "a");
1136   ASSERT_OK(s);
1137 
1138   s = txn->Put("B", "bb");
1139   ASSERT_OK(s);
1140 
1141   s = txn->Put("C", "c");
1142   ASSERT_OK(s);
1143 
1144   txn->SetSavePoint();  // 2
1145 
1146   s = txn->Delete("B");
1147   ASSERT_OK(s);
1148 
1149   s = txn->Put("C", "cc");
1150   ASSERT_OK(s);
1151 
1152   s = txn->Put("D", "d");
1153   ASSERT_OK(s);
1154 
1155   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2
1156 
1157   s = txn->Get(read_options, "A", &value);
1158   ASSERT_OK(s);
1159   ASSERT_EQ("a", value);
1160 
1161   s = txn->Get(read_options, "B", &value);
1162   ASSERT_OK(s);
1163   ASSERT_EQ("bb", value);
1164 
1165   s = txn->Get(read_options, "C", &value);
1166   ASSERT_OK(s);
1167   ASSERT_EQ("c", value);
1168 
1169   s = txn->Get(read_options, "D", &value);
1170   ASSERT_TRUE(s.IsNotFound());
1171 
1172   s = txn->Put("A", "a");
1173   ASSERT_OK(s);
1174 
1175   s = txn->Put("E", "e");
1176   ASSERT_OK(s);
1177 
1178   // Rollback to beginning of txn
1179   s = txn->RollbackToSavePoint();
1180   ASSERT_TRUE(s.IsNotFound());
1181   txn->Rollback();
1182 
1183   s = txn->Get(read_options, "A", &value);
1184   ASSERT_TRUE(s.IsNotFound());
1185 
1186   s = txn->Get(read_options, "B", &value);
1187   ASSERT_OK(s);
1188   ASSERT_EQ("b", value);
1189 
1190   s = txn->Get(read_options, "D", &value);
1191   ASSERT_TRUE(s.IsNotFound());
1192 
1193   s = txn->Get(read_options, "D", &value);
1194   ASSERT_TRUE(s.IsNotFound());
1195 
1196   s = txn->Get(read_options, "E", &value);
1197   ASSERT_TRUE(s.IsNotFound());
1198 
1199   s = txn->Put("A", "aa");
1200   ASSERT_OK(s);
1201 
1202   s = txn->Put("F", "f");
1203   ASSERT_OK(s);
1204 
1205   txn->SetSavePoint();  // 3
1206   txn->SetSavePoint();  // 4
1207 
1208   s = txn->Put("G", "g");
1209   ASSERT_OK(s);
1210 
1211   s = txn->Delete("F");
1212   ASSERT_OK(s);
1213 
1214   s = txn->Delete("B");
1215   ASSERT_OK(s);
1216 
1217   s = txn->Get(read_options, "A", &value);
1218   ASSERT_OK(s);
1219   ASSERT_EQ("aa", value);
1220 
1221   s = txn->Get(read_options, "F", &value);
1222   ASSERT_TRUE(s.IsNotFound());
1223 
1224   s = txn->Get(read_options, "B", &value);
1225   ASSERT_TRUE(s.IsNotFound());
1226 
1227   ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 3
1228 
1229   s = txn->Get(read_options, "F", &value);
1230   ASSERT_OK(s);
1231   ASSERT_EQ("f", value);
1232 
1233   s = txn->Get(read_options, "G", &value);
1234   ASSERT_TRUE(s.IsNotFound());
1235 
1236   s = txn->Commit();
1237   ASSERT_OK(s);
1238 
1239   s = txn_db->Get(read_options, "F", &value);
1240   ASSERT_OK(s);
1241   ASSERT_EQ("f", value);
1242 
1243   s = txn_db->Get(read_options, "G", &value);
1244   ASSERT_TRUE(s.IsNotFound());
1245 
1246   s = txn_db->Get(read_options, "A", &value);
1247   ASSERT_OK(s);
1248   ASSERT_EQ("aa", value);
1249 
1250   s = txn_db->Get(read_options, "B", &value);
1251   ASSERT_OK(s);
1252   ASSERT_EQ("b", value);
1253 
1254   s = txn_db->Get(read_options, "C", &value);
1255   ASSERT_TRUE(s.IsNotFound());
1256 
1257   s = txn_db->Get(read_options, "D", &value);
1258   ASSERT_TRUE(s.IsNotFound());
1259 
1260   s = txn_db->Get(read_options, "E", &value);
1261   ASSERT_TRUE(s.IsNotFound());
1262 
1263   delete txn;
1264 }
1265 
TEST_P(OptimisticTransactionTest,UndoGetForUpdateTest)1266 TEST_P(OptimisticTransactionTest, UndoGetForUpdateTest) {
1267   WriteOptions write_options;
1268   ReadOptions read_options, snapshot_read_options;
1269   OptimisticTransactionOptions txn_options;
1270   string value;
1271   Status s;
1272 
1273   txn_db->Put(write_options, "A", "");
1274 
1275   Transaction* txn1 = txn_db->BeginTransaction(write_options);
1276   ASSERT_TRUE(txn1);
1277 
1278   s = txn1->GetForUpdate(read_options, "A", &value);
1279   ASSERT_OK(s);
1280 
1281   txn1->UndoGetForUpdate("A");
1282 
1283   Transaction* txn2 = txn_db->BeginTransaction(write_options);
1284   txn2->Put("A", "x");
1285   s = txn2->Commit();
1286   ASSERT_OK(s);
1287   delete txn2;
1288 
1289   // Verify that txn1 can commit since A isn't conflict checked
1290   s = txn1->Commit();
1291   ASSERT_OK(s);
1292   delete txn1;
1293 
1294   txn1 = txn_db->BeginTransaction(write_options);
1295   txn1->Put("A", "a");
1296 
1297   s = txn1->GetForUpdate(read_options, "A", &value);
1298   ASSERT_OK(s);
1299 
1300   txn1->UndoGetForUpdate("A");
1301 
1302   txn2 = txn_db->BeginTransaction(write_options);
1303   txn2->Put("A", "x");
1304   s = txn2->Commit();
1305   ASSERT_OK(s);
1306   delete txn2;
1307 
1308   // Verify that txn1 cannot commit since A will still be conflict checked
1309   s = txn1->Commit();
1310   ASSERT_TRUE(s.IsBusy());
1311   delete txn1;
1312 
1313   txn1 = txn_db->BeginTransaction(write_options);
1314 
1315   s = txn1->GetForUpdate(read_options, "A", &value);
1316   ASSERT_OK(s);
1317   s = txn1->GetForUpdate(read_options, "A", &value);
1318   ASSERT_OK(s);
1319 
1320   txn1->UndoGetForUpdate("A");
1321 
1322   txn2 = txn_db->BeginTransaction(write_options);
1323   txn2->Put("A", "x");
1324   s = txn2->Commit();
1325   ASSERT_OK(s);
1326   delete txn2;
1327 
1328   // Verify that txn1 cannot commit since A will still be conflict checked
1329   s = txn1->Commit();
1330   ASSERT_TRUE(s.IsBusy());
1331   delete txn1;
1332 
1333   txn1 = txn_db->BeginTransaction(write_options);
1334 
1335   s = txn1->GetForUpdate(read_options, "A", &value);
1336   ASSERT_OK(s);
1337   s = txn1->GetForUpdate(read_options, "A", &value);
1338   ASSERT_OK(s);
1339 
1340   txn1->UndoGetForUpdate("A");
1341   txn1->UndoGetForUpdate("A");
1342 
1343   txn2 = txn_db->BeginTransaction(write_options);
1344   txn2->Put("A", "x");
1345   s = txn2->Commit();
1346   ASSERT_OK(s);
1347   delete txn2;
1348 
1349   // Verify that txn1 can commit since A isn't conflict checked
1350   s = txn1->Commit();
1351   ASSERT_OK(s);
1352   delete txn1;
1353 
1354   txn1 = txn_db->BeginTransaction(write_options);
1355 
1356   s = txn1->GetForUpdate(read_options, "A", &value);
1357   ASSERT_OK(s);
1358 
1359   txn1->SetSavePoint();
1360   txn1->UndoGetForUpdate("A");
1361 
1362   txn2 = txn_db->BeginTransaction(write_options);
1363   txn2->Put("A", "x");
1364   s = txn2->Commit();
1365   ASSERT_OK(s);
1366   delete txn2;
1367 
1368   // Verify that txn1 cannot commit since A will still be conflict checked
1369   s = txn1->Commit();
1370   ASSERT_TRUE(s.IsBusy());
1371   delete txn1;
1372 
1373   txn1 = txn_db->BeginTransaction(write_options);
1374 
1375   s = txn1->GetForUpdate(read_options, "A", &value);
1376   ASSERT_OK(s);
1377 
1378   txn1->SetSavePoint();
1379   s = txn1->GetForUpdate(read_options, "A", &value);
1380   ASSERT_OK(s);
1381   txn1->UndoGetForUpdate("A");
1382 
1383   txn2 = txn_db->BeginTransaction(write_options);
1384   txn2->Put("A", "x");
1385   s = txn2->Commit();
1386   ASSERT_OK(s);
1387   delete txn2;
1388 
1389   // Verify that txn1 cannot commit since A will still be conflict checked
1390   s = txn1->Commit();
1391   ASSERT_TRUE(s.IsBusy());
1392   delete txn1;
1393 
1394   txn1 = txn_db->BeginTransaction(write_options);
1395 
1396   s = txn1->GetForUpdate(read_options, "A", &value);
1397   ASSERT_OK(s);
1398 
1399   txn1->SetSavePoint();
1400   s = txn1->GetForUpdate(read_options, "A", &value);
1401   ASSERT_OK(s);
1402   txn1->UndoGetForUpdate("A");
1403 
1404   txn1->RollbackToSavePoint();
1405   txn1->UndoGetForUpdate("A");
1406 
1407   txn2 = txn_db->BeginTransaction(write_options);
1408   txn2->Put("A", "x");
1409   s = txn2->Commit();
1410   ASSERT_OK(s);
1411   delete txn2;
1412 
1413   // Verify that txn1 can commit since A isn't conflict checked
1414   s = txn1->Commit();
1415   ASSERT_OK(s);
1416   delete txn1;
1417 }
1418 
1419 namespace {
OptimisticTransactionStressTestInserter(OptimisticTransactionDB * db,const size_t num_transactions,const size_t num_sets,const size_t num_keys_per_set)1420 Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db,
1421                                                const size_t num_transactions,
1422                                                const size_t num_sets,
1423                                                const size_t num_keys_per_set) {
1424   size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
1425   Random64 _rand(seed);
1426   WriteOptions write_options;
1427   ReadOptions read_options;
1428   OptimisticTransactionOptions txn_options;
1429   txn_options.set_snapshot = true;
1430 
1431   RandomTransactionInserter inserter(&_rand, write_options, read_options,
1432                                      num_keys_per_set,
1433                                      static_cast<uint16_t>(num_sets));
1434 
1435   for (size_t t = 0; t < num_transactions; t++) {
1436     bool success = inserter.OptimisticTransactionDBInsert(db, txn_options);
1437     if (!success) {
1438       // unexpected failure
1439       return inserter.GetLastStatus();
1440     }
1441   }
1442 
1443   // Make sure at least some of the transactions succeeded.  It's ok if
1444   // some failed due to write-conflicts.
1445   if (inserter.GetFailureCount() > num_transactions / 2) {
1446     return Status::TryAgain("Too many transactions failed! " +
1447                             std::to_string(inserter.GetFailureCount()) + " / " +
1448                             std::to_string(num_transactions));
1449   }
1450 
1451   return Status::OK();
1452 }
1453 }  // namespace
1454 
TEST_P(OptimisticTransactionTest,OptimisticTransactionStressTest)1455 TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
1456   const size_t num_threads = 4;
1457   const size_t num_transactions_per_thread = 10000;
1458   const size_t num_sets = 3;
1459   const size_t num_keys_per_set = 100;
1460   // Setting the key-space to be 100 keys should cause enough write-conflicts
1461   // to make this test interesting.
1462 
1463   std::vector<port::Thread> threads;
1464 
1465   std::function<void()> call_inserter = [&] {
1466     ASSERT_OK(OptimisticTransactionStressTestInserter(
1467         txn_db, num_transactions_per_thread, num_sets, num_keys_per_set));
1468   };
1469 
1470   // Create N threads that use RandomTransactionInserter to write
1471   // many transactions.
1472   for (uint32_t i = 0; i < num_threads; i++) {
1473     threads.emplace_back(call_inserter);
1474   }
1475 
1476   // Wait for all threads to run
1477   for (auto& t : threads) {
1478     t.join();
1479   }
1480 
1481   // Verify that data is consistent
1482   Status s = RandomTransactionInserter::Verify(txn_db, num_sets);
1483   ASSERT_OK(s);
1484 }
1485 
TEST_P(OptimisticTransactionTest,SequenceNumberAfterRecoverTest)1486 TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
1487   WriteOptions write_options;
1488   OptimisticTransactionOptions transaction_options;
1489 
1490   Transaction* transaction(txn_db->BeginTransaction(write_options, transaction_options));
1491   Status s = transaction->Put("foo", "val");
1492   ASSERT_OK(s);
1493   s = transaction->Put("foo2", "val");
1494   ASSERT_OK(s);
1495   s = transaction->Put("foo3", "val");
1496   ASSERT_OK(s);
1497   s = transaction->Commit();
1498   ASSERT_OK(s);
1499   delete transaction;
1500 
1501   Reopen();
1502   transaction = txn_db->BeginTransaction(write_options, transaction_options);
1503   s = transaction->Put("bar", "val");
1504   ASSERT_OK(s);
1505   s = transaction->Put("bar2", "val");
1506   ASSERT_OK(s);
1507   s = transaction->Commit();
1508   ASSERT_OK(s);
1509 
1510   delete transaction;
1511 }
1512 
1513 INSTANTIATE_TEST_CASE_P(
1514     InstanceOccGroup, OptimisticTransactionTest,
1515     testing::Values(OccValidationPolicy::kValidateSerial,
1516                     OccValidationPolicy::kValidateParallel));
1517 
1518 }  // namespace ROCKSDB_NAMESPACE
1519 
main(int argc,char ** argv)1520 int main(int argc, char** argv) {
1521   ::testing::InitGoogleTest(&argc, argv);
1522   return RUN_ALL_TESTS();
1523 }
1524 
1525 #else
1526 #include <stdio.h>
1527 
main(int,char **)1528 int main(int /*argc*/, char** /*argv*/) {
1529   fprintf(
1530       stderr,
1531       "SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n");
1532   return 0;
1533 }
1534 
1535 #endif  // !ROCKSDB_LITE
1536