1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_test.h"
9
10 #include <algorithm>
11 #include <functional>
12 #include <string>
13 #include <thread>
14
15 #include "db/db_impl/db_impl.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/options.h"
18 #include "rocksdb/perf_context.h"
19 #include "rocksdb/utilities/transaction.h"
20 #include "rocksdb/utilities/transaction_db.h"
21 #include "table/mock_table.h"
22 #include "test_util/fault_injection_test_env.h"
23 #include "test_util/sync_point.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "test_util/transaction_test_util.h"
27 #include "util/random.h"
28 #include "util/string_util.h"
29 #include "utilities/merge_operators.h"
30 #include "utilities/merge_operators/string_append/stringappend.h"
31 #include "utilities/transactions/pessimistic_transaction_db.h"
32
33 #include "port/port.h"
34
35 using std::string;
36
37 namespace ROCKSDB_NAMESPACE {
38
39 INSTANTIATE_TEST_CASE_P(
40 DBAsBaseDB, TransactionTest,
41 ::testing::Values(
42 std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
43 std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
44 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
45 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
46 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
47 std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
48 std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
49 INSTANTIATE_TEST_CASE_P(
50 DBAsBaseDB, TransactionStressTest,
51 ::testing::Values(
52 std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
53 std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
54 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
55 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
56 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
57 std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
58 std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
59 INSTANTIATE_TEST_CASE_P(
60 StackableDBAsBaseDB, TransactionTest,
61 ::testing::Values(
62 std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite),
63 std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite),
64 std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite)));
65
66 // MySQLStyleTransactionTest takes far too long for valgrind to run.
67 #ifndef ROCKSDB_VALGRIND_RUN
68 INSTANTIATE_TEST_CASE_P(
69 MySQLStyleTransactionTest, MySQLStyleTransactionTest,
70 ::testing::Values(
71 std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false),
72 std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false),
73 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false),
74 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true),
75 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false),
76 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true),
77 std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false),
78 std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true),
79 std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false),
80 std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true),
81 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false),
82 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
83 #endif // ROCKSDB_VALGRIND_RUN
84
// Writes an empty batch twice, commits an empty two-phase transaction, and
// finally checks that a prepared transaction can be looked up by name and
// committed after TEST_Crash() followed by a reopen.
TEST_P(TransactionTest, DoubleEmptyWrite) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;

  WriteBatch batch;

  ASSERT_OK(db->Write(write_options, &batch));
  ASSERT_OK(db->Write(write_options, &batch));

  // Also test committing empty transactions in 2PC
  TransactionOptions txn_options;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Prepare());
  ASSERT_OK(txn0->Commit());
  delete txn0;

  // Also test that it works during recovery
  txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->SetName("xid2"));
  // The write must succeed, otherwise there is nothing to recover below.
  ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
  ASSERT_OK(txn0->Prepare());
  delete txn0;
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  txn0 = db->GetTransactionByName("xid2");
  // Fail the test cleanly instead of dereferencing a null pointer when the
  // prepared transaction was not recovered.
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->Commit());
  delete txn0;
}
116
// Happy path: a transaction locks and reads "foo", overwrites it, sees its own
// write through GetForUpdate, and the new value is visible through the DB
// after the commit.
TEST_P(TransactionTest, SuccessTest) {
  ASSERT_OK(db->ResetStats());

  WriteOptions wopts;
  ReadOptions ropts;
  TransactionOptions txn_opts;
  std::string read_value;

  // Seed two committed keys.
  ASSERT_OK(db->Put(wopts, Slice("foo"), Slice("bar")));
  ASSERT_OK(db->Put(wopts, Slice("foo2"), Slice("bar")));

  Transaction* txn = db->BeginTransaction(wopts, txn_opts);
  ASSERT_TRUE(txn);

  // A fresh transaction has recorded no puts yet.
  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  // The locking read returns the committed value.
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &read_value));
  ASSERT_EQ(read_value, "bar");

  ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  ASSERT_EQ(1, txn->GetNumPuts());

  // The transaction observes its own uncommitted write.
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &read_value));
  ASSERT_EQ(read_value, "bar2");

  ASSERT_OK(txn->Commit());

  // After the commit the write is visible through the DB as well.
  ASSERT_OK(db->Get(ropts, "foo", &read_value));
  ASSERT_EQ(read_value, "bar2");

  delete txn;
}
150
151 // The test clarifies the contract of do_validate and assume_tracked
152 // in GetForUpdate and Put/Merge/Delete
TEST_P(TransactionTest,AssumeExclusiveTracked)153 TEST_P(TransactionTest, AssumeExclusiveTracked) {
154 WriteOptions write_options;
155 ReadOptions read_options;
156 std::string value;
157 Status s;
158 TransactionOptions txn_options;
159 txn_options.lock_timeout = 1;
160 const bool EXCLUSIVE = true;
161 const bool DO_VALIDATE = true;
162 const bool ASSUME_LOCKED = true;
163
164 Transaction* txn = db->BeginTransaction(write_options, txn_options);
165 ASSERT_TRUE(txn);
166 txn->SetSnapshot();
167
168 // commit a value after the snapshot is taken
169 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
170
171 // By default write should fail to the commit after our snapshot
172 s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE);
173 ASSERT_TRUE(s.IsBusy());
174 // But the user could direct the db to skip validating the snapshot. The read
175 // value then should be the most recently committed
176 ASSERT_OK(
177 txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE));
178 ASSERT_EQ(value, "bar");
179
180 // Although ValidateSnapshot is skipped the key must have still got locked
181 s = db->Put(write_options, Slice("foo"), Slice("bar"));
182 ASSERT_TRUE(s.IsTimedOut());
183
184 // By default the write operations should fail due to the commit after the
185 // snapshot
186 s = txn->Put(Slice("foo"), Slice("bar1"));
187 ASSERT_TRUE(s.IsBusy());
188 s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
189 !ASSUME_LOCKED);
190 ASSERT_TRUE(s.IsBusy());
191 // But the user could direct the db that it already assumes exclusive lock on
192 // the key due to the previous GetForUpdate call.
193 ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
194 ASSUME_LOCKED));
195 ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"),
196 ASSUME_LOCKED));
197 ASSERT_OK(
198 txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED));
199 ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"),
200 ASSUME_LOCKED));
201
202 txn->Rollback();
203 delete txn;
204 }
205
206 // This test clarifies the contract of ValidateSnapshot
TEST_P(TransactionTest,ValidateSnapshotTest)207 TEST_P(TransactionTest, ValidateSnapshotTest) {
208 for (bool with_flush : {true}) {
209 for (bool with_2pc : {true}) {
210 ASSERT_OK(ReOpen());
211 WriteOptions write_options;
212 ReadOptions read_options;
213 std::string value;
214
215 assert(db != nullptr);
216 Transaction* txn1 =
217 db->BeginTransaction(write_options, TransactionOptions());
218 ASSERT_TRUE(txn1);
219 ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
220 if (with_2pc) {
221 ASSERT_OK(txn1->SetName("xid1"));
222 ASSERT_OK(txn1->Prepare());
223 }
224
225 if (with_flush) {
226 auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
227 db_impl->TEST_FlushMemTable(true);
228 // Make sure the flushed memtable is not kept in memory
229 int max_memtable_in_history =
230 std::max(
231 options.max_write_buffer_number,
232 static_cast<int>(options.max_write_buffer_size_to_maintain) /
233 static_cast<int>(options.write_buffer_size)) +
234 1;
235 for (int i = 0; i < max_memtable_in_history; i++) {
236 db->Put(write_options, Slice("key"), Slice("value"));
237 db_impl->TEST_FlushMemTable(true);
238 }
239 }
240
241 Transaction* txn2 =
242 db->BeginTransaction(write_options, TransactionOptions());
243 ASSERT_TRUE(txn2);
244 txn2->SetSnapshot();
245
246 ASSERT_OK(txn1->Commit());
247 delete txn1;
248
249 auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
250 // Test the simple case where the key is not tracked yet
251 auto trakced_seq = kMaxSequenceNumber;
252 auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
253 &trakced_seq);
254 ASSERT_TRUE(s.IsBusy());
255 delete txn2;
256 }
257 }
258 }
259
// txn1 locks "foo" in the default column family and in "CFA". The test then
// checks the reported lock status, and that txn2 timing out on "foo" reports
// txn1 as the transaction it waits for and bumps the lock-wait perf counter.
TEST_P(TransactionTest, WaitingTxn) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  txn_options.lock_timeout = 1;
  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  /* create second cf */
  ColumnFamilyHandle* cfa;
  ColumnFamilyOptions cf_options;
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->Put(write_options, cfa, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  // Validate both handles before the first dereference.
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);
  const TransactionID id1 = txn1->GetID();

  // Invoked while txn2 waits for the lock: it must report that it waits for
  // txn1 on key "foo" in column family 0.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
        std::string key;
        uint32_t cf_id;
        std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
        ASSERT_EQ(key, "foo");
        ASSERT_EQ(wait.size(), 1);
        ASSERT_EQ(wait[0], id1);
        ASSERT_EQ(cf_id, 0U);
      });

  get_perf_context()->Reset();
  // lock key in default cf
  s = txn1->GetForUpdate(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");
  ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);

  // lock key in cfa
  s = txn1->GetForUpdate(read_options, cfa, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");
  ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);

  auto lock_data = db->GetLockStatusData();
  // Locked keys exist in both column families.
  ASSERT_EQ(lock_data.size(), 2);

  // The lock data is an unordered_multimap, thus the test can not assume any
  // particular order of the two entries.
  for (const auto& cf_and_lock : lock_data) {
    // Column family is 0 (default) or 1 (cfa).
    ASSERT_TRUE(cf_and_lock.first == 0 || cf_and_lock.first == 1);
    // The locked key is "foo" and is locked by txn1
    ASSERT_EQ(cf_and_lock.second.key, "foo");
    ASSERT_EQ(cf_and_lock.second.ids.size(), 1);
    ASSERT_EQ(cf_and_lock.second.ids[0], id1);
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  s = txn2->GetForUpdate(read_options, "foo", &value);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
  ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1);
  ASSERT_GE(get_perf_context()->key_lock_wait_time, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  delete cfa;
  delete txn1;
  delete txn2;
}
353
// Exercises shared (non-exclusive) GetForUpdate locks on a single key: several
// transactions sharing the lock, an exclusive request blocked by shared
// holders, lock upgrade, "downgrade", and a shared request blocked by an
// exclusive holder.
TEST_P(TransactionTest, SharedLocks) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;
  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);
  ASSERT_TRUE(txn3);

  // Test shared access between txns
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  auto lock_data = db->GetLockStatusData();
  ASSERT_EQ(lock_data.size(), 1);

  auto cf_iterator = lock_data.begin();
  ASSERT_EQ(cf_iterator->second.key, "foo");

  // We compare whether the set of txns locking this key is the same. To do
  // this, we sort both vectors so that the comparison does not depend on the
  // order in which the ids are reported.
  std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
                                              txn3->GetID()};
  std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
  std::sort(expected_txns.begin(), expected_txns.end());
  std::sort(lock_txns.begin(), lock_txns.end());
  ASSERT_EQ(expected_txns, lock_txns);
  ASSERT_FALSE(cf_iterator->second.exclusive);

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());
  ASSERT_OK(txn3->Rollback());

  // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn3->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  // txn2 still holds the shared lock, so txn3 keeps timing out.
  txn1->UndoGetForUpdate("foo");
  s = txn3->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn2->UndoGetForUpdate("foo");
  s = txn3->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_OK(s);

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());
  ASSERT_OK(txn3->Rollback());

  // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn1->UndoGetForUpdate("foo");
  s = txn2->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_OK(s);

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());

  // Test txn1 trying to downgrade its lock.
  s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  // Should still fail after "downgrading".
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());

  // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
  // access.
  s = txn1->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn1->UndoGetForUpdate("foo");
  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  delete txn1;
  delete txn2;
  delete txn3;
}
478
// Builds a binary tree of transactions that wait on each other through shared
// locks, closes the cycle from every leaf and checks the recorded deadlock
// paths. Afterwards exercises resizing of the deadlock info buffer and
// verifies that a shared lock request taking part in a deadlock is reported as
// non-exclusive.
TEST_P(TransactionTest, DeadlockCycleShared) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  // Set up a wait for chain like this:
  //
  // Tn -> T(n*2)
  // Tn -> T(n*2 + 1)
  //
  // So we have:
  // T1 -> T2 -> T4 ...
  //    |     |> T5 ...
  //    |> T3 -> T6 ...
  //          |> T7 ...
  // up to T31, then T[16 - 31] -> T1.
  // Note that Tn holds lock on floor(n / 2).

  // Total number of transactions in the tree.
  const uint32_t kNumTxns = 31;
  // Transactions [0, kNumInnerTxns) block in background threads; the rest are
  // the leaves that complete the cycle on the main thread.
  const uint32_t kNumInnerTxns = 15;

  std::vector<Transaction*> txns(kNumTxns);

  for (uint32_t i = 0; i < kNumTxns; i++) {
    txns[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns[i]);
    auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
                                   false /* exclusive */);
    ASSERT_OK(s);
  }

  // Counts how many transactions have started waiting for a lock.
  std::atomic<uint32_t> checkpoints(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // We want the leaf transactions to block and hold everyone back.
  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < kNumInnerTxns; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
                                     true /* exclusive */);
      ASSERT_OK(s);
      ASSERT_OK(txns[i]->Rollback());
      delete txns[i];
    };
    threads.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints.load() != kNumInnerTxns) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T[16 - 31] -> T1
  for (uint32_t i = kNumInnerTxns; i < kNumTxns; i++) {
    auto s =
        txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
    ASSERT_TRUE(s.IsDeadlock());

    // Calculate next buffer len, plateau at 5 when 5 records are inserted.
    const uint32_t num_deadlocks_so_far = i - kNumInnerTxns + 1;
    const uint32_t expected_dlock_buffer_len =
        (num_deadlocks_so_far > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks
                                                      : num_deadlocks_so_far;

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), expected_dlock_buffer_len);
    const auto& dlock_entry = dlock_buffer[0].path;
    ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
    // Walking the buffer from the front, deadlock times must not increase.
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    int64_t curr_waiting_key = 0;

    // Offset of each txn id from the root of the shared dlock tree's txn id.
    int64_t offset_root = dlock_entry[0].m_txn_id - 1;
    // Offset of the final entry in the dlock path from the root's txn id.
    TransactionID leaf_id =
        dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;

    // Walk the path from the leaf back to the root; each step halves the
    // (root-relative) txn id and the key being waited on.
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      const auto& dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
      ASSERT_EQ(dl_node.m_cf_id, 0U);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_TRUE(dl_node.m_exclusive);

      if (curr_waiting_key == 0) {
        curr_waiting_key = leaf_id;
      }
      curr_waiting_key /= 2;
      leaf_id /= 2;
    }
  }

  // Rollback the leaf transaction.
  for (uint32_t i = kNumInnerTxns; i < kNumTxns; i++) {
    ASSERT_OK(txns[i]->Rollback());
    delete txns[i];
  }

  for (auto& t : threads) {
    t.join();
  }

  // Downsize the buffer and verify the 3 latest deadlocks are preserved.
  auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Upsize the buffer and verify the 3 latest deadlocks are preserved.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(5);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Downsize to 0 and verify the size is consistent.
  db->SetDeadlockInfoBufferSize(0);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Upsize from 0 and verify the buffer stays empty.
  db->SetDeadlockInfoBufferSize(3);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Contrived case of shared lock of cycle size 2 to verify that a shared
  // lock causing a deadlock is correctly reported as "shared" in the buffer.
  std::vector<Transaction*> txns_shared(2);

  // Create a cycle of size 2.
  for (uint32_t i = 0; i < 2; i++) {
    txns_shared[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns_shared[i]);
    auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
    ASSERT_OK(s);
  }

  std::atomic<uint32_t> checkpoints_shared(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<port::Thread> threads_shared;
  for (uint32_t i = 0; i < 1; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s =
          txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
      ASSERT_OK(s);
      ASSERT_OK(txns_shared[i]->Rollback());
      delete txns_shared[i];
    };
    threads_shared.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints_shared.load() != 1) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T2 -> T1 with a shared lock.
  auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
  ASSERT_TRUE(s.IsDeadlock());

  auto dlock_buffer = db->GetDeadlockInfoBuffer();

  // Verify the size of the buffer and the single path.
  ASSERT_EQ(dlock_buffer.size(), 1);
  ASSERT_EQ(dlock_buffer[0].path.size(), 2);

  // Verify the exclusivity field of the transactions in the deadlock path.
  ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
  ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
  ASSERT_OK(txns_shared[1]->Rollback());
  delete txns_shared[1];

  for (auto& t : threads_shared) {
    t.join();
  }
}
689
690 #ifndef ROCKSDB_VALGRIND_RUN
// For every cycle length in [2, kMaxCycleLength) builds a linear wait-for chain
// of transactions, closes it into a cycle and verifies the deadlock status as
// well as the path recorded in the deadlock info buffer.
TEST_P(TransactionStressTest, DeadlockCycle) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  // offset by 2 from the max depth to test edge case
  const uint32_t kMaxCycleLength = 52;
  // Longest cycle for which a deadlock path is expected to be recorded; for
  // longer cycles the entry is expected to be flagged limit_exceeded instead.
  const uint32_t kMaxRecordedDepth = 50;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  for (uint32_t len = 2; len < kMaxCycleLength; len++) {
    // Set up a long wait for chain like this:
    //
    // T1 -> T2 -> T3 -> ... -> Tlen

    std::vector<Transaction*> txns(len);

    for (uint32_t i = 0; i < len; i++) {
      txns[i] = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txns[i]);
      auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
      ASSERT_OK(s);
    }

    // Counts how many transactions have started waiting for a lock.
    std::atomic<uint32_t> checkpoints(0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
        [&](void* /*arg*/) { checkpoints.fetch_add(1); });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // We want the last transaction in the chain to block and hold everyone
    // back.
    std::vector<port::Thread> threads;
    for (uint32_t i = 0; i < len - 1; i++) {
      std::function<void()> blocking_thread = [&, i] {
        auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
        ASSERT_OK(s);
        ASSERT_OK(txns[i]->Rollback());
        delete txns[i];
      };
      threads.emplace_back(blocking_thread);
    }

    // Wait until all threads are waiting on each other.
    while (checkpoints.load() != len - 1) {
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

    // Complete the cycle Tlen -> T1
    auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
    ASSERT_TRUE(s.IsDeadlock());

    // One deadlock is recorded per iteration; the buffer is expected to hold
    // at most 5 of them.
    const uint32_t expected_dlock_buffer_size = (len - 1 > 5) ? 5 : (len - 1);
    uint32_t curr_waiting_key = 0;
    TransactionID curr_txn_id = txns[0]->GetID();

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), expected_dlock_buffer_size);
    uint32_t check_len = len;
    bool check_limit_flag = false;

    // Special case for a deadlock path that exceeds the maximum depth.
    if (len > kMaxRecordedDepth) {
      check_len = 0;
      check_limit_flag = true;
    }
    const auto& dlock_entry = dlock_buffer[0].path;
    ASSERT_EQ(dlock_entry.size(), check_len);
    ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);

    // Walking the buffer from the front, deadlock times must not increase.
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    // Iterates backwards over path verifying decreasing txn_ids.
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      const auto& dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
      ASSERT_EQ(dl_node.m_cf_id, 0u);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_TRUE(dl_node.m_exclusive);

      curr_txn_id--;
      if (curr_waiting_key == 0) {
        curr_waiting_key = len;
      }
      curr_waiting_key--;
    }

    // Rollback the last transaction.
    ASSERT_OK(txns[len - 1]->Rollback());
    delete txns[len - 1];

    for (auto& t : threads) {
      t.join();
    }
  }
}
798
// Runs NUM_TXN_THREADS threads that each repeatedly lock all keys in a random
// order, mixing shared and exclusive requests. With deadlock detection on,
// the only failure a lock request may report is a deadlock.
TEST_P(TransactionStressTest, DeadlockStress) {
  const uint32_t NUM_TXN_THREADS = 10;
  const uint32_t NUM_KEYS = 100;
  const uint32_t NUM_ITERS = 10000;

  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;
  std::vector<std::string> keys;
  keys.reserve(NUM_KEYS);

  for (uint32_t i = 0; i < NUM_KEYS; i++) {
    keys.push_back(ToString(i));
    ASSERT_OK(db->Put(write_options, Slice(keys.back()), Slice("")));
  }

  // Seed the per-thread seeds from the id of the current thread.
  size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
  Random rnd(static_cast<uint32_t>(tid));
  std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) {
    std::default_random_engine g(seed);

    for (uint32_t i = 0; i < NUM_ITERS; i++) {
      Transaction* txn = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txn != nullptr);
      auto random_keys = keys;
      std::shuffle(random_keys.begin(), random_keys.end(), g);

      // Lock keys in random order.
      for (const auto& k : random_keys) {
        // Lock mostly for shared access, but exclusive 1/4 of the time.
        auto s =
            txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
        if (!s.ok()) {
          ASSERT_TRUE(s.IsDeadlock());
          ASSERT_OK(txn->Rollback());
          break;
        }
      }

      delete txn;
    }
  };

  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
    threads.emplace_back(stress_thread, rnd.Next());
  }

  for (auto& t : threads) {
    t.join();
  }
}
853 #endif // ROCKSDB_VALGRIND_RUN
854
// Committing a transaction (without Prepare) whose commit-time write batch is
// non-empty must be rejected with InvalidArgument.
TEST_P(TransactionTest, CommitTimeBatchFailTest) {
  WriteOptions write_options;
  TransactionOptions txn_options;

  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"));

  s = txn1->Put("foo", "bar");
  ASSERT_OK(s);

  // fails due to non-empty commit-time batch
  s = txn1->Commit();
  ASSERT_EQ(s, Status::InvalidArgument());

  delete txn1;
}
876
// Prepares 100 transactions, committing most right away and some only at the
// end, flushing the memtable after each one. Once everything is committed no
// log may be reported as containing outstanding prepared data and the related
// bookkeeping structures must be empty.
TEST_P(TransactionTest, LogMarkLeakTest) {
  TransactionOptions txn_options;
  WriteOptions write_options;
  options.write_buffer_size = 1024;
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  Random rnd(47);
  // Prepared transactions whose commit is deferred until after the loop.
  std::vector<Transaction*> txns;
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  // At the beginning there should be no log containing prepare data
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  for (size_t i = 0; i < 100; i++) {
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn != nullptr);
    ASSERT_OK(txn->SetName("xid" + ToString(i)));
    ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar")));
    ASSERT_OK(txn->Prepare());
    ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
    if (rnd.OneIn(5)) {
      txns.push_back(txn);
    } else {
      ASSERT_OK(txn->Commit());
      delete txn;
    }
    // A failed flush would make the final checks meaningless.
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));
  }
  for (auto txn : txns) {
    ASSERT_OK(txn->Commit());
    delete txn;
  }
  // At the end there should be no log left containing prepare data
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  // Make sure that the underlying data structures are properly truncated and
  // cause no leak
  ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0);
  ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0);
}
913
// Walks one named transaction through Put -> Prepare -> Commit and checks
// data visibility, the prepared-transaction list and the WAL bookkeeping at
// each step. Runs once with
// use_only_the_last_commit_time_batch_for_recovery enabled and once disabled;
// when enabled, the commit-time batch entries are only checked after a
// memtable flush and again after a reopen.
TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
  for (bool cwb4recovery : {true, false}) {
    ASSERT_OK(ReOpen());
    WriteOptions write_options;
    ReadOptions read_options;

    TransactionOptions txn_options;
    txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;

    std::string value;
    Status s;

    DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    s = txn->SetName("xid");
    ASSERT_OK(s);

    ASSERT_EQ(db->GetTransactionByName("xid"), txn);

    // transaction put
    s = txn->Put(Slice("foo"), Slice("bar"));
    ASSERT_OK(s);
    ASSERT_EQ(1, txn->GetNumPuts());

    // regular db put
    s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
    ASSERT_OK(s);
    ASSERT_EQ(1, txn->GetNumPuts());

    // regular db read
    s = db->Get(read_options, "foo2", &value);
    ASSERT_OK(s);
    ASSERT_EQ(value, "bar2");

    // commit time put
    s = txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
    ASSERT_OK(s);
    s = txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
    ASSERT_OK(s);

    // nothing has been prepped yet
    ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

    s = txn->Prepare();
    ASSERT_OK(s);

    // data not in mem yet
    s = db->Get(read_options, Slice("foo"), &value);
    ASSERT_TRUE(s.IsNotFound());
    s = db->Get(read_options, Slice("gtid"), &value);
    ASSERT_TRUE(s.IsNotFound());

    // find trans in list of prepared transactions
    std::vector<Transaction*> prepared_trans;
    db->GetAllPreparedTransactions(&prepared_trans);
    ASSERT_EQ(prepared_trans.size(), 1);
    ASSERT_EQ(prepared_trans.front()->GetName(), "xid");

    auto log_containing_prep =
        db_impl->TEST_FindMinLogContainingOutstandingPrep();
    ASSERT_GT(log_containing_prep, 0);

    // make commit
    s = txn->Commit();
    ASSERT_OK(s);

    // value is now available
    s = db->Get(read_options, "foo", &value);
    ASSERT_OK(s);
    ASSERT_EQ(value, "bar");

    if (!cwb4recovery) {
      s = db->Get(read_options, "gtid", &value);
      ASSERT_OK(s);
      ASSERT_EQ(value, "dogs");

      s = db->Get(read_options, "gtid2", &value);
      ASSERT_OK(s);
      ASSERT_EQ(value, "cats");
    }

    // we already committed
    s = txn->Commit();
    ASSERT_EQ(s, Status::InvalidArgument());

    // no longer in prepared results
    db->GetAllPreparedTransactions(&prepared_trans);
    ASSERT_EQ(prepared_trans.size(), 0);
    ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);

    // heap should not care about prepared section anymore
    ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

    switch (txn_db_options.write_policy) {
      case WRITE_COMMITTED:
        // but now our memtable should be referencing the prep section
        ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
        ASSERT_EQ(log_containing_prep,
                  db_impl->TEST_FindMinPrepLogReferencedByMemTable());
        break;
      case WRITE_PREPARED:
      case WRITE_UNPREPARED:
        // In these modes memtable do not ref the prep sections
        ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
        break;
      default:
        assert(false);
    }

    // The checks below depend on the flush having succeeded.
    s = db_impl->TEST_FlushMemTable(true);
    ASSERT_OK(s);
    // After flush the recoverable state must be visible
    if (cwb4recovery) {
      s = db->Get(read_options, "gtid", &value);
      ASSERT_OK(s);
      ASSERT_EQ(value, "dogs");

      s = db->Get(read_options, "gtid2", &value);
      ASSERT_OK(s);
      ASSERT_EQ(value, "cats");
    }

    // after memtable flush we can now release the log
    ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
    ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());

    delete txn;

    if (cwb4recovery) {
      // kill and reopen to trigger recovery
      s = ReOpenNoDelete();
      ASSERT_OK(s);
      assert(db != nullptr);
      s = db->Get(read_options, "gtid", &value);
      ASSERT_OK(s);
      ASSERT_EQ(value, "dogs");

      s = db->Get(read_options, "gtid2", &value);
      ASSERT_OK(s);
      ASSERT_EQ(value, "cats");
    }
  }
}
1054
// Checks the naming rules this test expects for two-phase transactions:
// Prepare without a name, an empty name, a 513-character name, a duplicate
// name, and renaming (before or after Prepare) must all return
// InvalidArgument.
TEST_P(TransactionTest, TwoPhaseNameTest) {
  Status s;

  WriteOptions write_options;
  TransactionOptions txn_options;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
  // All three handles are dereferenced below, so check each of them.
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);
  ASSERT_TRUE(txn3);
  delete txn3;

  // cant prepare txn without name
  s = txn1->Prepare();
  ASSERT_EQ(s, Status::InvalidArgument());

  // name too short
  s = txn1->SetName("");
  ASSERT_EQ(s, Status::InvalidArgument());

  // name too long
  s = txn1->SetName(std::string(513, 'x'));
  ASSERT_EQ(s, Status::InvalidArgument());

  // valid set name
  s = txn1->SetName("name1");
  ASSERT_OK(s);

  // cant have duplicate name
  s = txn2->SetName("name1");
  ASSERT_EQ(s, Status::InvalidArgument());

  // shouldn't be able to prepare
  s = txn2->Prepare();
  ASSERT_EQ(s, Status::InvalidArgument());

  // valid name set
  s = txn2->SetName("name2");
  ASSERT_OK(s);

  // cant reset name
  s = txn2->SetName("name3");
  ASSERT_EQ(s, Status::InvalidArgument());

  ASSERT_EQ(txn1->GetName(), "name1");
  ASSERT_EQ(txn2->GetName(), "name2");

  s = txn1->Prepare();
  ASSERT_OK(s);

  // can't rename after prepare
  s = txn1->SetName("name4");
  ASSERT_EQ(s, Status::InvalidArgument());

  // Roll back both transactions (txn1 is prepared, txn2 only named) and make
  // sure the rollbacks themselves succeed.
  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());
  delete txn1;
  delete txn2;
}
1113
// Prepares and commits two transactions that contain no regular writes. The
// second one carries a single commit-time batch entry ("foo" -> "bar"), which
// must be readable afterwards: immediately when
// use_only_the_last_commit_time_batch_for_recovery is off, otherwise after a
// memtable flush (test_with_empty_wal) and after a reopen.
TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
  for (bool cwb4recovery : {true, false}) {
    for (bool test_with_empty_wal : {true, false}) {
      if (!cwb4recovery && test_with_empty_wal) {
        // test_with_empty_wal is only exercised on the cwb4recovery path.
        continue;
      }
      ASSERT_OK(ReOpen());
      Status s;
      std::string value;

      WriteOptions write_options;
      ReadOptions read_options;
      TransactionOptions txn_options;
      txn_options.use_only_the_last_commit_time_batch_for_recovery =
          cwb4recovery;
      Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txn1);
      Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txn2);

      s = txn1->SetName("joe");
      ASSERT_OK(s);

      s = txn2->SetName("bob");
      ASSERT_OK(s);

      s = txn1->Prepare();
      ASSERT_OK(s);

      s = txn1->Commit();
      ASSERT_OK(s);

      delete txn1;

      s = txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
      ASSERT_OK(s);

      s = txn2->Prepare();
      ASSERT_OK(s);

      s = txn2->Commit();
      ASSERT_OK(s);

      delete txn2;
      if (!cwb4recovery) {
        s = db->Get(read_options, "foo", &value);
        ASSERT_OK(s);
        ASSERT_EQ(value, "bar");
      } else {
        if (test_with_empty_wal) {
          DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
          s = db_impl->TEST_FlushMemTable(true);
          ASSERT_OK(s);
          // After flush the state must be visible
          s = db->Get(read_options, "foo", &value);
          ASSERT_OK(s);
          ASSERT_EQ(value, "bar");
        }
        s = db->FlushWAL(true);
        ASSERT_OK(s);
        // kill and reopen to trigger recovery
        s = ReOpenNoDelete();
        ASSERT_OK(s);
        assert(db != nullptr);
        s = db->Get(read_options, "foo", &value);
        ASSERT_OK(s);
        ASSERT_EQ(value, "bar");
      }
    }
  }
}
1182
1183 #ifndef ROCKSDB_VALGRIND_RUN
// Two transactions are created with a 500ms expiration. txn1 is prepared
// before a 1000ms sleep and is still expected to commit afterwards; txn2 is
// only prepared after the sleep and is expected to fail with Expired.
TEST_P(TransactionStressTest, TwoPhaseExpirationTest) {
  Status s;

  WriteOptions write_options;
  TransactionOptions txn_options;
  txn_options.expiration = 500;  // 500ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);

  s = txn1->SetName("joe");
  ASSERT_OK(s);
  s = txn2->SetName("bob");
  ASSERT_OK(s);

  s = txn1->Prepare();
  ASSERT_OK(s);

  // Sleep well past the expiration so that the not-yet-prepared txn2 expires.
  /* sleep override */
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));

  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Prepare();
  ASSERT_EQ(s, Status::Expired());

  delete txn1;
  delete txn2;
}
1215
// Rolls the same transaction back twice: once before Prepare and once after
// Prepare (with a memtable flush in between). After each rollback the
// transaction's own writes must no longer be readable through it, and a
// subsequent Commit or a further Rollback must return InvalidArgument.
TEST_P(TransactionTest, TwoPhaseRollbackTest) {
  WriteOptions write_options;
  ReadOptions read_options;

  TransactionOptions txn_options;

  std::string value;
  Status s;

  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("xid");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("tfoo"), Slice("tbar"));
  ASSERT_OK(s);

  // value is readable from txn
  s = txn->Get(read_options, Slice("tfoo"), &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "tbar");

  // issue rollback
  s = txn->Rollback();
  ASSERT_OK(s);

  // value is no longer readable
  s = txn->Get(read_options, Slice("tfoo"), &value);
  ASSERT_TRUE(s.IsNotFound());
  ASSERT_EQ(txn->GetNumPuts(), 0);

  // put new txn values
  s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
  ASSERT_OK(s);

  // new value is readable from txn
  s = txn->Get(read_options, Slice("tfoo2"), &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "tbar2");

  s = txn->Prepare();
  ASSERT_OK(s);

  // flush to next wal
  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // issue rollback (marker written to WAL)
  s = txn->Rollback();
  ASSERT_OK(s);

  // value is no longer readable
  s = txn->Get(read_options, Slice("tfoo2"), &value);
  ASSERT_TRUE(s.IsNotFound());
  ASSERT_EQ(txn->GetNumPuts(), 0);

  // make commit
  s = txn->Commit();
  ASSERT_EQ(s, Status::InvalidArgument());

  // try rollback again
  s = txn->Rollback();
  ASSERT_EQ(s, Status::InvalidArgument());

  delete txn;
}
1284
// Prepares a named transaction, simulates a crash via TEST_Crash() plus a
// reopen, and then checks that the prepared transaction can be found again,
// still exposes its own write, can be committed, and that the WAL
// bookkeeping is released after a subsequent memtable flush.
TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;

  TransactionOptions txn_options;

  std::string value;
  Status s;

  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("xid");
  ASSERT_OK(s);

  ASSERT_EQ(db->GetTransactionByName("xid"), txn);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  ASSERT_EQ(1, txn->GetNumPuts());

  // txn read
  s = txn->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  // regular db put
  s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
  ASSERT_OK(s);
  ASSERT_EQ(1, txn->GetNumPuts());

  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // regular db read
  s = db->Get(read_options, "foo2", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // nothing has been prepped yet
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  // still not available to db
  s = db->Get(read_options, Slice("foo"), &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->FlushWAL(false);
  ASSERT_OK(s);
  delete txn;
  // kill and reopen
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  s = ReOpenNoDelete();
  ASSERT_OK(s);
  assert(db != nullptr);
  // The reopen produced a new DB object, so refresh the DBImpl pointer.
  db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

  // find trans in list of prepared transactions
  std::vector<Transaction*> prepared_trans;
  db->GetAllPreparedTransactions(&prepared_trans);
  ASSERT_EQ(prepared_trans.size(), 1);

  txn = prepared_trans.front();
  ASSERT_TRUE(txn);
  ASSERT_EQ(txn->GetName(), "xid");
  ASSERT_EQ(db->GetTransactionByName("xid"), txn);

  // log has been marked
  auto log_containing_prep =
      db_impl->TEST_FindMinLogContainingOutstandingPrep();
  ASSERT_GT(log_containing_prep, 0);

  // value is readable from txn
  s = txn->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  // make commit
  s = txn->Commit();
  ASSERT_OK(s);

  // value is now available
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  // we already committed
  s = txn->Commit();
  ASSERT_EQ(s, Status::InvalidArgument());

  // no longer in prepared results
  prepared_trans.clear();
  db->GetAllPreparedTransactions(&prepared_trans);
  ASSERT_EQ(prepared_trans.size(), 0);

  // transaction should no longer be visible
  ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);

  // heap should not care about prepared section anymore
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

  switch (txn_db_options.write_policy) {
    case WRITE_COMMITTED:
      // but now our memtable should be referencing the prep section
      ASSERT_EQ(log_containing_prep,
                db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());

      break;
    case WRITE_PREPARED:
    case WRITE_UNPREPARED:
      // In these modes memtable do not ref the prep sections
      ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    default:
      assert(false);
  }

  // Add a dummy record to memtable before a flush. Otherwise, the
  // memtable will be empty and flush will be skipped.
  s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
  ASSERT_OK(s);

  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // after memtable flush we can now release the log
  ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
  ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());

  delete txn;

  // deleting transaction should unregister transaction
  ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
}
1421 #endif // ROCKSDB_VALGRIND_RUN
1422
1423 // TODO this test needs to be updated with serial commits
// Spawns 50 threads that each prepare and commit one named transaction with
// 10 keys. A sync-point callback on "WriteThread::JoinBatchGroup:Wait" holds
// every writer until all threads have arrived, and afterwards every key is
// read back from the DB.
TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
  // mix transaction writes and regular writes
  const uint32_t kNumTxnThreads = 50;
  const int kKeysPerTxn = 10;
  std::atomic<uint32_t> next_thread_id(0);

  // Name of the transaction owned by the worker with the given index.
  auto txn_name_for = [](uint32_t idx) {
    const char suffix = static_cast<char>('A' + idx);
    return TransactionName("xid_" + std::string(1, suffix));
  };
  // i-th key written by the transaction with the given name.
  auto key_for = [](const TransactionName& txn_name, int i) {
    return std::string(txn_name + "_" +
                       std::string(1, static_cast<char>('A' + i)));
  };

  std::function<void()> txn_write_thread = [&]() {
    const uint32_t id = next_thread_id.fetch_add(1);

    WriteOptions write_options;
    write_options.sync = true;
    write_options.disableWAL = false;
    TransactionOptions txn_options;
    txn_options.lock_timeout = 1000000;
    // Only even-numbered workers get an expiration.
    if (id % 2 == 0) {
      txn_options.expiration = 1000000;
    }
    const TransactionName name = txn_name_for(id);
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_OK(txn->SetName(name));
    for (int i = 0; i < kKeysPerTxn; ++i) {
      ASSERT_OK(txn->Put(key_for(name, i), "val"));
    }
    ASSERT_OK(txn->Prepare());
    ASSERT_OK(txn->Commit());
    delete txn;
  };

  // assure that all threads are in the same write group
  std::atomic<uint32_t> t_wait_on_prepare(0);
  std::atomic<uint32_t> t_wait_on_commit(0);

  // Registers the caller on `arrived` and spins until every worker has done
  // the same.
  auto wait_for_friends = [&](std::atomic<uint32_t>& arrived) {
    arrived.fetch_add(1);
    while (arrived.load() < kNumTxnThreads) {
      env->SleepForMicroseconds(10);
    }
  };

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);

        if (writer->ShouldWriteToWAL()) {
          wait_for_friends(t_wait_on_prepare);
        } else if (writer->ShouldWriteToMemtable()) {
          wait_for_friends(t_wait_on_commit);
        } else {
          FAIL();
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // do all the writes
  std::vector<port::Thread> workers;
  for (uint32_t n = 0; n < kNumTxnThreads; ++n) {
    workers.emplace_back(txn_write_thread);
  }
  for (auto& worker : workers) {
    worker.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Every key of every transaction must now be readable from the DB.
  ReadOptions read_options;
  std::string value;
  for (uint32_t t = 0; t < kNumTxnThreads; ++t) {
    const TransactionName name = txn_name_for(t);
    for (int i = 0; i < kKeysPerTxn; ++i) {
      ASSERT_OK(db->Get(read_options, key_for(name, i), &value));
      ASSERT_EQ(value, "val");
    }
  }
}
1504
// Prepares transaction "bob", then performs 1000 regular writes while
// periodically crashing (every 29th write) or cleanly reopening (every 37th
// write) the DB. Afterwards the prepared transaction must still be found by
// name and commit, and all regular writes must be readable.
TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;
  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("bob");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  delete txn;

  // The same value is written under every key; keys are i repetitions of 'k'.
  const std::string val(1000, 'v');

  for (int i = 0; i < 1000; i++) {
    std::string key(i, 'k');
    assert(db != nullptr);
    s = db->Put(write_options, key, val);
    ASSERT_OK(s);

    if (i % 29 == 0) {
      // crash
      env->SetFilesystemActive(false);
      reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
      // A failed reopen would leave no usable DB for the next iteration.
      ASSERT_OK(ReOpenNoDelete());
    } else if (i % 37 == 0) {
      // close
      ASSERT_OK(ReOpenNoDelete());
    }
  }

  // commit old txn
  txn = db->GetTransactionByName("bob");
  ASSERT_TRUE(txn);
  s = txn->Commit();
  ASSERT_OK(s);

  // verify txn data
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  // verify non txn data
  for (int i = 0; i < 1000; i++) {
    std::string key(i, 'k');
    s = db->Get(read_options, key, &value);
    ASSERT_OK(s);
    ASSERT_EQ(value, val);
  }

  delete txn;
}
1569
// Commits a prepared transaction containing four puts, deactivates the
// filesystem, reopens the DB, and checks that the last of the four values is
// readable after the reopen.
TEST_P(TransactionTest, TwoPhaseSequenceTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;

  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("xid");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo2"), Slice("bar2"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo3"), Slice("bar3"));
  ASSERT_OK(s);
  s = txn->Put(Slice("foo4"), Slice("bar4"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  // make commit
  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // kill and reopen
  env->SetFilesystemActive(false);
  s = ReOpenNoDelete();
  ASSERT_OK(s);
  assert(db != nullptr);

  // value is now available
  s = db->Get(read_options, "foo4", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar4");
}
1615
// Goes through two reopen cycles: transaction "a" is prepared, the DB is
// crashed and reopened, and "a" is committed; then transaction "b" is
// prepared and committed, followed by a second reopen. Both transactions'
// values must be readable at the end.
TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;
  ReadOptions read_options;

  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("a");
  ASSERT_OK(s);

  // transaction put
  s = txn->Put(Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  // prepare
  s = txn->Prepare();
  ASSERT_OK(s);

  delete txn;

  // kill and reopen
  env->SetFilesystemActive(false);
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  s = ReOpenNoDelete();
  ASSERT_OK(s);
  assert(db != nullptr);

  // commit old txn
  txn = db->GetTransactionByName("a");
  // Fail cleanly instead of dereferencing null if the prepared transaction
  // was not found after the reopen.
  ASSERT_TRUE(txn);
  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  delete txn;

  txn = db->BeginTransaction(write_options, txn_options);
  s = txn->SetName("b");
  ASSERT_OK(s);

  s = txn->Put(Slice("foo2"), Slice("bar2"));
  ASSERT_OK(s);

  s = txn->Prepare();
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // kill and reopen
  env->SetFilesystemActive(false);
  s = ReOpenNoDelete();
  ASSERT_OK(s);
  assert(db != nullptr);

  // value is now available
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = db->Get(read_options, "foo2", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
}
1686
// Prepares two transactions in different WAL files (the log is rolled by
// flushing the default column family between the two Prepare calls) and
// checks, after each commit and flush, which log is reported as holding
// outstanding prepared data and which prepare log the memtables reference.
TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

  Status s;
  ColumnFamilyHandle *cfa, *cfb;

  // Create 2 new column families
  ColumnFamilyOptions cf_options;
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  WriteOptions wopts;
  wopts.disableWAL = false;
  wopts.sync = true;

  TransactionOptions topts1;
  Transaction* txn1 = db->BeginTransaction(wopts, topts1);
  s = txn1->SetName("xid1");
  ASSERT_OK(s);

  TransactionOptions topts2;
  Transaction* txn2 = db->BeginTransaction(wopts, topts2);
  s = txn2->SetName("xid2");
  ASSERT_OK(s);

  // txn1 writes to column family A only
  s = txn1->Put(cfa, "ka1", "va1");
  ASSERT_OK(s);

  // txn2 writes to both column families
  s = txn2->Put(cfa, "ka2", "va2");
  ASSERT_OK(s);
  s = txn2->Put(cfb, "kb2", "vb2");
  ASSERT_OK(s);

  // write prep section to wal
  s = txn1->Prepare();
  ASSERT_OK(s);

  // our log should be in the heap
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
            txn1->GetLogNumber());
  ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());

  // flush default cf to create new log
  s = db->Put(wopts, "foo", "bar");
  ASSERT_OK(s);
  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // make sure we are on a new log
  ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());

  // put txn2 prep section in this log
  s = txn2->Prepare();
  ASSERT_OK(s);
  ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());

  // heap should still see first log
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
            txn1->GetLogNumber());

  // commit txn1
  s = txn1->Commit();
  ASSERT_OK(s);

  // heap should now show txn2s log
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
            txn2->GetLogNumber());

  switch (txn_db_options.write_policy) {
    case WRITE_COMMITTED:
      // we should see txn1s log referenced by the memtables
      ASSERT_EQ(txn1->GetLogNumber(),
                db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    case WRITE_PREPARED:
    case WRITE_UNPREPARED:
      // In these modes memtable do not ref the prep sections
      ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    default:
      assert(false);
  }

  // flush default cf to create new log
  s = db->Put(wopts, "foo", "bar2");
  ASSERT_OK(s);
  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // make sure we are on a new log
  ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());

  // commit txn2
  s = txn2->Commit();
  ASSERT_OK(s);

  // heap should not show any logs
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

  switch (txn_db_options.write_policy) {
    case WRITE_COMMITTED:
      // should show the first txn log
      ASSERT_EQ(txn1->GetLogNumber(),
                db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    case WRITE_PREPARED:
    case WRITE_UNPREPARED:
      // In these modes memtable do not ref the prep sections
      ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    default:
      assert(false);
  }

  // flush only cfa memtable
  s = db_impl->TEST_FlushMemTable(true, false, cfa);
  ASSERT_OK(s);

  switch (txn_db_options.write_policy) {
    case WRITE_COMMITTED:
      // with cfa flushed, the minimum referenced prep log is now txn2's
      ASSERT_EQ(txn2->GetLogNumber(),
                db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    case WRITE_PREPARED:
    case WRITE_UNPREPARED:
      // In these modes memtable do not ref the prep sections
      ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
      break;
    default:
      assert(false);
  }

  // flush only cfb memtable
  s = db_impl->TEST_FlushMemTable(true, false, cfb);
  ASSERT_OK(s);

  // should show no dependency on logs
  ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

  delete txn1;
  delete txn2;
  delete cfa;
  delete cfb;
}
1838
// Prepares two transactions (one per column family) in the same WAL, rolls
// the log, pauses background work, and then checks which column families get
// a flush requested by TEST_SwitchWAL() before and after the second
// transaction commits.
TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

  ColumnFamilyHandle* cfa;
  ColumnFamilyHandle* cfb;

  ColumnFamilyOptions cf_options;
  ASSERT_OK(db->CreateColumnFamily(cf_options, "CFA", &cfa));
  ASSERT_OK(db->CreateColumnFamily(cf_options, "CFB", &cfb));

  WriteOptions wopts;
  wopts.disableWAL = false;
  wopts.sync = true;

  auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
  auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);

  const auto write_policy = txn_db_options.write_policy;
  const bool is_write_committed = (write_policy == WRITE_COMMITTED);
  const bool is_prepared_family =
      (write_policy == WRITE_PREPARED || write_policy == WRITE_UNPREPARED);

  TransactionOptions topts1;
  Transaction* txn1 = db->BeginTransaction(wopts, topts1);
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put(cfa, "boys", "girls1"));

  Transaction* txn2 = db->BeginTransaction(wopts, topts1);
  ASSERT_OK(txn2->SetName("xid2"));
  ASSERT_OK(txn2->Put(cfb, "up", "down1"));

  // prepare both transactions in LOG A
  ASSERT_OK(txn1->Prepare());
  ASSERT_OK(txn2->Prepare());

  // regular put so that mem table can actually be flushed for log rolling
  ASSERT_OK(db->Put(wopts, "cats", "dogs1"));

  auto prepare_log_no = txn1->GetLastLogNumber();

  // roll to LOG B
  ASSERT_OK(db_impl->TEST_FlushMemTable(true));

  // now we pause background work so that
  // imm()s are not flushed before we can check their status
  ASSERT_OK(db_impl->PauseBackgroundWork());

  ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
  if (is_write_committed) {
    // This cf is empty and should ref the latest log
    ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
    ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
  } else if (is_prepared_family) {
    // This cf is not flushed yet and should ref the log that has its data
    ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
  } else {
    assert(false);
  }
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
            txn1->GetLogNumber());
  ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);

  // commit in LOG B
  ASSERT_OK(txn1->Commit());

  if (is_write_committed) {
    ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
              prepare_log_no);
  } else if (is_prepared_family) {
    // In these modes memtable do not ref the prep sections
    ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
  } else {
    assert(false);
  }

  ASSERT_FALSE(db_impl->TEST_UnableToReleaseOldestLog());

  // request a flush for all column families such that the earliest
  // alive log file can be killed
  db_impl->TEST_SwitchWAL();
  // log cannot be flushed because txn2 has not been committed
  ASSERT_FALSE(db_impl->TEST_IsLogGettingFlushed());
  ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());

  // assert that cfa has a flush requested
  ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());

  if (is_write_committed) {
    // cfb should not be flushed because it has no data from LOG A
    ASSERT_FALSE(cfh_b->cfd()->imm()->HasFlushRequested());
  } else if (is_prepared_family) {
    // cfb should be flushed because it has prepared data from LOG A
    ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
  } else {
    assert(false);
  }

  // cfb now has data from LOG A
  ASSERT_OK(txn2->Commit());

  db_impl->TEST_SwitchWAL();
  ASSERT_FALSE(db_impl->TEST_UnableToReleaseOldestLog());

  // we should see that cfb now has a flush requested
  ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());

  // all data in LOG A resides in a memtable that has been
  // requested for a flush
  ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());

  delete txn1;
  delete txn2;
  delete cfa;
  delete cfb;
}
1976 /*
1977 * 1) use prepare to keep first log around to determine starting sequence
1978 * during recovery.
1979 * 2) insert many values, skipping wal, to increase seqid.
1980 * 3) insert final value into wal
1981 * 4) recover and see that final value was properly recovered - not
1982 * hidden behind improperly summed sequence ids
1983 */
TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  // wal_on writes are synced to the WAL; wal_off writes skip the WAL.
  WriteOptions wal_on, wal_off;
  wal_on.sync = true;
  wal_on.disableWAL = false;
  wal_off.disableWAL = true;
  ReadOptions read_options;
  TransactionOptions txn_options;

  std::string value;
  Status s;

  // NOTE(review): txn1 is never deleted in this test; confirm that this is
  // intentional given the simulated crash below.
  Transaction* txn1 = db->BeginTransaction(wal_on, txn_options);

  s = txn1->SetName("1");
  ASSERT_OK(s);

  s = db->Put(wal_on, "first", "first");
  ASSERT_OK(s);

  // Prepare, but do not commit, a named transaction.
  s = txn1->Put(Slice("dummy"), Slice("dummy"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  ASSERT_OK(s);

  // Writes that skip the WAL.
  s = db->Put(wal_off, "cats", "dogs1");
  ASSERT_OK(s);
  s = db->Put(wal_off, "cats", "dogs2");
  ASSERT_OK(s);
  s = db->Put(wal_off, "cats", "dogs3");
  ASSERT_OK(s);

  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // Final write, this time through the WAL.
  s = db->Put(wal_on, "cats", "dogs4");
  ASSERT_OK(s);

  ASSERT_OK(db->FlushWAL(false));

  // kill and reopen
  env->SetFilesystemActive(false);
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  // Check the reopen status explicitly: a plain assert() is compiled out in
  // release builds and would let a failed reopen crash on the reads below.
  ASSERT_OK(ReOpenNoDelete());
  ASSERT_TRUE(db != nullptr);

  s = db->Get(read_options, "first", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "first");

  // The last WAL-backed write must be the value visible after recovery.
  s = db->Get(read_options, "cats", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "dogs4");
}
2038
TEST_P(TransactionTest, FirstWriteTest) {
  WriteOptions write_options;

  // Test conflict checking against the very first write to a db.
  // The transaction's snapshot will have seq 1 and the following write
  // will have sequence 1.
  // The status is checked right away so that a failed Put is reported before
  // the transaction is allocated (an early return would otherwise leak it).
  ASSERT_OK(db->Put(write_options, "A", "a"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  txn->SetSnapshot();

  // The snapshot was taken after the write to "A", so this must not conflict.
  Status s = txn->Put("A", "b");
  ASSERT_OK(s);

  delete txn;
}
2057
TEST_P(TransactionTest, FirstWriteTest2) {
  WriteOptions wopts;

  // Take the transaction's snapshot before anything is written to the db.
  Transaction* txn = db->BeginTransaction(wopts);
  txn->SetSnapshot();

  // Test conflict checking against the very first write to a db.
  // The transaction's snapshot is a seq 0 while the following write
  // will have sequence 1.
  ASSERT_OK(db->Put(wopts, "A", "a"));

  // "A" was written after the snapshot, so the transactional write is
  // expected to be rejected as Busy.
  const Status txn_put = txn->Put("A", "b");
  ASSERT_TRUE(txn_put.IsBusy());

  delete txn;
}
2075
TEST_P(TransactionTest, WriteOptionsTest) {
  // Start a transaction with both sync and disableWAL turned on.
  WriteOptions wopts;
  wopts.sync = true;
  wopts.disableWAL = true;

  Transaction* txn = db->BeginTransaction(wopts);
  ASSERT_TRUE(txn);

  // The options passed to BeginTransaction are reported back.
  ASSERT_TRUE(txn->GetWriteOptions()->sync);

  // Replace the options with sync off; disableWAL stays on.
  wopts.sync = false;
  txn->SetWriteOptions(wopts);

  ASSERT_FALSE(txn->GetWriteOptions()->sync);
  ASSERT_TRUE(txn->GetWriteOptions()->disableWAL);

  delete txn;
}
2093
TEST_P(TransactionTest, WriteConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  // Seed the db; setup failures are reported here rather than surfacing as
  // confusing value mismatches later on.
  ASSERT_OK(db->Put(write_options, "foo", "A"));
  ASSERT_OK(db->Put(write_options, "foo2", "B"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("foo", "A2");
  ASSERT_OK(s);

  s = txn->Put("foo2", "B2");
  ASSERT_OK(s);

  // This Put outside of a transaction will conflict with the previous write
  s = db->Put(write_options, "foo", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The uncommitted transactional write is not visible yet.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "A");

  s = txn->Commit();
  ASSERT_OK(s);

  // After the commit both transactional writes are visible.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "A2");
  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "B2");

  delete txn;
}
2129
TEST_P(TransactionTest, WriteConflictTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));

  // The transaction takes its snapshot at creation time.
  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  // This Put outside of a transaction will conflict with a later write
  s = db->Put(write_options, "foo", "barz");
  ASSERT_OK(s);

  s = txn->Put("foo2", "X");
  ASSERT_OK(s);

  s = txn->Put("foo",
               "bar2");  // Conflicts with write done after snapshot taken
  ASSERT_TRUE(s.IsBusy());

  s = txn->Put("foo3", "Y");
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barz");

  // Only the two successful Puts are counted.
  ASSERT_EQ(2, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);  // Txn should commit, but only write foo2 and foo3

  // Verify that transaction wrote foo2 and foo3 but not foo
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "barz");

  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "X");

  ASSERT_OK(db->Get(read_options, "foo3", &value));
  ASSERT_EQ(value, "Y");

  delete txn;
}
2177
TEST_P(TransactionTest, ReadConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));
  ASSERT_OK(db->Put(write_options, "foo2", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read "foo" for update inside the transaction.
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  // This Put outside of a transaction will conflict with the previous read
  s = db->Put(write_options, "foo", "barz");
  ASSERT_TRUE(s.IsTimedOut());

  // The rejected Put must not have changed the stored value.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
2213
TEST_P(TransactionTest, TxnOnlyTest) {
  // Test to make sure transactions work when there are no other writes in an
  // empty db.

  WriteOptions write_options;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("x", "y");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
2234
TEST_P(TransactionTest, FlushTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
  ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  // The transaction sees its own pending write.
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  // Put a random key so we have a memtable to flush
  s = db->Put(write_options, "dummy", "dummy");
  ASSERT_OK(s);

  // force a memtable flush
  FlushOptions flush_ops;
  ASSERT_OK(db->Flush(flush_ops));

  s = txn->Commit();
  // txn should commit since the flushed table is still in MemtableList History
  ASSERT_OK(s);

  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}
2275
TEST_P(TransactionTest, FlushTest2) {
  const size_t num_tests = 3;

  // Each iteration reopens the db with a different table factory and runs
  // the same conflict-detection scenario across memtable flushes.
  for (size_t n = 0; n < num_tests; n++) {
    // Test different table factories
    switch (n) {
      case 0:
        break;
      case 1:
        options.table_factory.reset(new mock::MockTableFactory());
        break;
      case 2: {
        PlainTableOptions pt_opts;
        pt_opts.hash_table_ratio = 0;
        options.table_factory.reset(NewPlainTableFactory(pt_opts));
        break;
      }
    }

    Status s = ReOpen();
    ASSERT_OK(s);
    // Checked with a gtest assertion because assert() is compiled out in
    // release builds.
    ASSERT_TRUE(db != nullptr);

    WriteOptions write_options;
    ReadOptions read_options, snapshot_read_options;
    TransactionOptions txn_options;
    std::string value;

    DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

    ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
    ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar2")));
    ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar3")));

    txn_options.set_snapshot = true;
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn);

    snapshot_read_options.snapshot = txn->GetSnapshot();

    ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
    ASSERT_EQ(value, "bar");

    s = txn->Put(Slice("foo"), Slice("bar2"));
    ASSERT_OK(s);

    ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
    ASSERT_EQ(value, "bar2");
    // verify foo is locked by txn
    s = db->Delete(write_options, "foo");
    ASSERT_TRUE(s.IsTimedOut());

    s = db->Put(write_options, "Z", "z");
    ASSERT_OK(s);
    s = db->Put(write_options, "dummy", "dummy");
    ASSERT_OK(s);

    s = db->Put(write_options, "S", "s");
    ASSERT_OK(s);
    s = db->SingleDelete(write_options, "S");
    ASSERT_OK(s);

    s = txn->Delete("S");
    // Should fail after encountering a write to S in memtable
    ASSERT_TRUE(s.IsBusy());

    // force a memtable flush
    s = db_impl->TEST_FlushMemTable(true);
    ASSERT_OK(s);

    // Put a random key so we have a MemTable to flush
    s = db->Put(write_options, "dummy", "dummy2");
    ASSERT_OK(s);

    // force a memtable flush
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));

    s = db->Put(write_options, "dummy", "dummy3");
    ASSERT_OK(s);

    // force a memtable flush
    // Since our test db has max_write_buffer_number=2, this flush will cause
    // the first memtable to get purged from the MemtableList history.
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));

    s = txn->Put("X", "Y");
    // Should succeed after verifying there is no write to X in SST file
    ASSERT_OK(s);

    s = txn->Put("Z", "zz");
    // Should fail after encountering a write to Z in SST file
    ASSERT_TRUE(s.IsBusy());

    s = txn->GetForUpdate(read_options, "foo2", &value);
    // should succeed since key was written before txn started
    ASSERT_OK(s);
    // verify foo2 is locked by txn
    s = db->Delete(write_options, "foo2");
    ASSERT_TRUE(s.IsTimedOut());

    s = txn->Delete("S");
    // Should fail after encountering a write to S in SST file
    ASSERT_TRUE(s.IsBusy());

    // Write a bunch of keys to db to force a compaction
    Random rnd(47);
    for (int i = 0; i < 1000; i++) {
      s = db->Put(write_options, std::to_string(i),
                  test::CompressibleString(&rnd, 0.8, 100, &value));
      ASSERT_OK(s);
    }

    s = txn->Put("X", "yy");
    // Should succeed after verifying there is no write to X in SST file
    ASSERT_OK(s);

    s = txn->Put("Z", "zzz");
    // Should fail after encountering a write to Z in SST file
    ASSERT_TRUE(s.IsBusy());

    s = txn->Delete("S");
    // Should fail after encountering a write to S in SST file
    ASSERT_TRUE(s.IsBusy());

    s = txn->GetForUpdate(read_options, "foo3", &value);
    // should succeed since key was written before txn started
    ASSERT_OK(s);
    // verify foo3 is locked by txn
    s = db->Delete(write_options, "foo3");
    ASSERT_TRUE(s.IsTimedOut());

    ASSERT_OK(db_impl->TEST_WaitForCompact());

    s = txn->Commit();
    ASSERT_OK(s);

    // Transaction should only write the keys that succeeded.
    s = db->Get(read_options, "foo", &value);
    ASSERT_OK(s);
    ASSERT_EQ(value, "bar2");

    s = db->Get(read_options, "X", &value);
    ASSERT_OK(s);
    ASSERT_EQ("yy", value);

    s = db->Get(read_options, "Z", &value);
    ASSERT_OK(s);
    ASSERT_EQ("z", value);

    delete txn;
  }
}
2427
TEST_P(TransactionTest, NoSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "AAA", "bar"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Modify key after transaction start
  ASSERT_OK(db->Put(write_options, "AAA", "bar1"));

  // Read and write without a snap
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("AAA", "bar2");
  ASSERT_OK(s);

  // Should commit since read/write was done after data changed
  s = txn->Commit();
  ASSERT_OK(s);

  // Reading through the same transaction object after the commit returns the
  // committed value.
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}
2457
TEST_P(TransactionTest, MultipleSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "AAA", "bar"));
  ASSERT_OK(db->Put(write_options, "BBB", "bar"));
  ASSERT_OK(db->Put(write_options, "CCC", "bar"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_OK(db->Put(write_options, "AAA", "bar1"));

  // Read and write without a snapshot
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("AAA", "bar2");
  ASSERT_OK(s);

  // Modify BBB before snapshot is taken
  ASSERT_OK(db->Put(write_options, "BBB", "bar1"));

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read and write with snapshot
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("BBB", "bar2");
  ASSERT_OK(s);

  ASSERT_OK(db->Put(write_options, "CCC", "bar1"));

  // Set a new snapshot
  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read and write with snapshot
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("CCC", "bar2");
  ASSERT_OK(s);

  // Inside the transaction all three keys show the pending values.
  s = txn->GetForUpdate(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = txn->GetForUpdate(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = txn->GetForUpdate(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // Outside the transaction the pending writes are not visible yet.
  s = db->Get(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = db->Get(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = db->Get(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = db->Get(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = db->Get(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // verify that we track multiple writes to the same key at different snapshots
  delete txn;
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Potentially conflicting writes
  ASSERT_OK(db->Put(write_options, "ZZZ", "zzz"));
  ASSERT_OK(db->Put(write_options, "XXX", "xxx"));

  txn->SetSnapshot();

  TransactionOptions txn_options;
  txn_options.set_snapshot = true;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  txn2->SetSnapshot();

  // This should not conflict in txn since the snapshot is later than the
  // previous write (spoiler alert: it will later conflict with txn2).
  s = txn->Put("ZZZ", "zzzz");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // This will conflict since the snapshot is earlier than another write to ZZZ
  s = txn2->Put("ZZZ", "xxxxx");
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);

  // The rejected write from txn2 must not have replaced txn's value.
  s = db->Get(read_options, "ZZZ", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  delete txn2;
}
2574
TEST_P(TransactionTest, ColumnFamiliesTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ColumnFamilyHandle *cfa, *cfb;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  // handles[0] is the default column family, handles[1] is CFA and
  // handles[2] is CFB (same order as column_families).
  std::vector<ColumnFamilyHandle*> handles;

  ASSERT_OK(ReOpenNoDelete(column_families, &handles));
  // Checked with a gtest assertion because assert() is compiled out in
  // release builds.
  ASSERT_TRUE(db != nullptr);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  txn_options.set_snapshot = true;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // Write some data to the db
  WriteBatch batch;
  batch.Put("foo", "foo");
  batch.Put(handles[1], "AAA", "bar");
  batch.Put(handles[1], "AAAZZZ", "bar");
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);
  ASSERT_OK(db->Delete(write_options, handles[1], "AAAZZZ"));

  // These keys do not conflict with existing writes since they're in
  // different column families
  s = txn->Delete("AAA");
  ASSERT_OK(s);
  s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
  ASSERT_TRUE(s.IsNotFound());
  Slice key_slice("AAAZZZ");
  Slice value_slices[2] = {Slice("bar"), Slice("bar")};
  s = txn->Put(handles[2], SliceParts(&key_slice, 1),
               SliceParts(value_slices, 2));
  ASSERT_OK(s);
  ASSERT_EQ(3, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, "AAA", &value);
  ASSERT_TRUE(s.IsNotFound());
  // The two value parts are stored concatenated.
  s = db->Get(read_options, handles[2], "AAAZZZ", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barbar");

  // The three key parts concatenate to "AAAZZZ".
  Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
  Slice value_slice("barbarbar");

  s = txn2->Delete(handles[2], "XXX");
  ASSERT_OK(s);
  s = txn2->Delete(handles[1], "XXX");
  ASSERT_OK(s);

  // This write will cause a conflict with the earlier batch write
  s = txn2->Put(handles[1], SliceParts(key_slices, 3),
                SliceParts(&value_slice, 1));
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);
  // In the above the latest change to AAAZZZ in handles[1] is delete.
  s = db->Get(read_options, handles[1], "AAAZZZ", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  delete txn2;

  // Second round: two fresh transactions that both take a snapshot at
  // creation time.
  txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);
  snapshot_read_options.snapshot = txn->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
                                                   handles[0], handles[2]};
  std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
  std::vector<std::string> values(4);
  std::vector<Status> results = txn->MultiGetForUpdate(
      snapshot_read_options, multiget_cfh, multiget_keys, &values);
  ASSERT_OK(results[0]);
  ASSERT_OK(results[1]);
  ASSERT_OK(results[2]);
  ASSERT_TRUE(results[3].IsNotFound());
  ASSERT_EQ(values[0], "bar");
  ASSERT_EQ(values[1], "barbar");
  ASSERT_EQ(values[2], "foo");

  s = txn->SingleDelete(handles[2], "ZZZ");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "ZZZ", "YYY");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "ZZZ", "YYYY");
  ASSERT_OK(s);
  s = txn->Delete(handles[2], "ZZZ");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumKeys());

  // Txn should commit
  s = txn->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, handles[2], "ZZZ", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Put a key which will conflict with the next txn using the previous snapshot
  ASSERT_OK(db->Put(write_options, handles[2], "foo", "000"));

  results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
                                    multiget_keys, &values);
  // All results should fail since there was a conflict
  ASSERT_TRUE(results[0].IsBusy());
  ASSERT_TRUE(results[1].IsBusy());
  ASSERT_TRUE(results[2].IsBusy());
  ASSERT_TRUE(results[3].IsBusy());

  s = db->Get(read_options, handles[2], "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "000");

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->DropColumnFamily(handles[1]);
  ASSERT_OK(s);
  s = db->DropColumnFamily(handles[2]);
  ASSERT_OK(s);

  delete txn;
  delete txn2;

  for (auto handle : handles) {
    delete handle;
  }
}
2741
TEST_P(TransactionTest, MultiGetBatchedTest) {
  WriteOptions write_options;
  ReadOptions snapshot_read_options;
  Status s;

  ColumnFamilyHandle* cf;
  ColumnFamilyOptions cf_options;

  // Create a new column family
  s = db->CreateColumnFamily(cf_options, "CF", &cf);
  ASSERT_OK(s);

  delete cf;
  delete db;
  db = nullptr;

  // open DB with two column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column family
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));

  // handles[0] is the default column family, handles[1] is "CF".
  std::vector<ColumnFamilyHandle*> handles;

  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  ASSERT_OK(ReOpenNoDelete(column_families, &handles));
  // Checked with a gtest assertion because assert() is compiled out in
  // release builds.
  ASSERT_TRUE(db != nullptr);

  // Write some data to the db
  WriteBatch batch;
  batch.Put(handles[1], "aaa", "val1");
  batch.Put(handles[1], "bbb", "val2");
  batch.Put(handles[1], "ccc", "val3");
  batch.Put(handles[1], "ddd", "foo");
  batch.Put(handles[1], "eee", "val5");
  batch.Put(handles[1], "fff", "val6");
  batch.Merge(handles[1], "ggg", "foo");
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Stage some uncommitted changes in the transaction
  s = txn->Delete(handles[1], "bbb");
  ASSERT_OK(s);
  s = txn->Put(handles[1], "ccc", "val3_new");
  ASSERT_OK(s);
  s = txn->Merge(handles[1], "ddd", "bar");
  ASSERT_OK(s);

  std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
                values.data(), statuses.data());
  // "aaa": untouched by the transaction.
  ASSERT_OK(statuses[0]);
  ASSERT_EQ(values[0], "val1");
  // "bbb": deleted in the transaction.
  ASSERT_TRUE(statuses[1].IsNotFound());
  // "ccc": overwritten in the transaction.
  ASSERT_OK(statuses[2]);
  ASSERT_EQ(values[2], "val3_new");
  // "ddd": has a pending Merge in the transaction.
  ASSERT_TRUE(statuses[3].IsMergeInProgress());
  ASSERT_OK(statuses[4]);
  ASSERT_EQ(values[4], "val5");
  ASSERT_OK(statuses[5]);
  ASSERT_EQ(values[5], "val6");
  // "ggg": written to the db with a Merge only.
  ASSERT_OK(statuses[6]);
  ASSERT_EQ(values[6], "foo");
  delete txn;
  for (auto handle : handles) {
    delete handle;
  }
}
2825
// This test calls WriteBatchWithIndex::MultiGetFromBatchAndDB with a large
// number of keys, i.e. greater than MultiGetContext::MAX_BATCH_SIZE, which is
// 32. This forces autovector allocations in the MultiGet code paths
// to use std::vector in addition to stack allocations. The MultiGet keys
// include Merges, which are handled specially in MultiGetFromBatchAndDB by
// allocating an autovector of MergeContexts.
TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
  WriteOptions write_options;
  ReadOptions snapshot_read_options;
  Status s;

  ColumnFamilyHandle* cf;
  ColumnFamilyOptions cf_options;

  // All sizes are derived from MultiGetContext::MAX_BATCH_SIZE so that the
  // write loops and the verification loop cannot drift apart.
  const size_t kBatchSize =
      static_cast<size_t>(MultiGetContext::MAX_BATCH_SIZE);
  // Number of keys written to the db.
  const size_t kNumDbKeys = 3 * kBatchSize;
  // Keys in [kMergeBegin, kMergeEnd) get a Merge in the write batch.
  const size_t kMergeBegin = 8;
  const size_t kMergeEnd = kBatchSize + 24;
  // Number of keys looked up in a single MultiGet.
  const size_t kNumLookupKeys = kBatchSize + 32;

  // The key strings must outlive the Slices built from them below.
  std::vector<std::string> key_str;
  const size_t kNumKeyStrings = std::max(kNumDbKeys, kNumLookupKeys);
  key_str.reserve(kNumKeyStrings);
  for (size_t i = 0; i < kNumKeyStrings; ++i) {
    key_str.emplace_back(std::to_string(i));
  }
  // Create a new column family
  s = db->CreateColumnFamily(cf_options, "CF", &cf);
  ASSERT_OK(s);

  delete cf;
  delete db;
  db = nullptr;

  // open DB with two column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column family
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));

  // handles[0] is the default column family, handles[1] is "CF".
  std::vector<ColumnFamilyHandle*> handles;

  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  ASSERT_OK(ReOpenNoDelete(column_families, &handles));
  // Checked with a gtest assertion because assert() is compiled out in
  // release builds.
  ASSERT_TRUE(db != nullptr);

  // Write some data to the db
  WriteBatch batch;
  for (size_t i = 0; i < kNumDbKeys; ++i) {
    std::string val = "val" + std::to_string(i);
    batch.Put(handles[1], key_str[i], val);
  }
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);

  WriteBatchWithIndex wb;
  // Stage some changes in the indexed write batch
  s = wb.Delete(handles[1], std::to_string(1));
  ASSERT_OK(s);
  s = wb.Put(handles[1], std::to_string(2), "new_val" + std::to_string(2));
  ASSERT_OK(s);
  // Write a lot of merges so when we call MultiGetFromBatchAndDB later on,
  // it is forced to use std::vector in ROCKSDB_NAMESPACE::autovector to
  // allocate MergeContexts. The number of merges needs to be >
  // MultiGetContext::MAX_BATCH_SIZE
  for (size_t i = kMergeBegin; i < kMergeEnd; ++i) {
    s = wb.Merge(handles[1], std::to_string(i), "merge");
    ASSERT_OK(s);
  }

  // MultiGet a lot of keys in order to force std::vector reallocations
  std::vector<Slice> keys;
  keys.reserve(kNumLookupKeys);
  for (size_t i = 0; i < kNumLookupKeys; ++i) {
    keys.emplace_back(key_str[i]);
  }
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  wb.MultiGetFromBatchAndDB(db, snapshot_read_options, handles[1], keys.size(),
                            keys.data(), values.data(), statuses.data(), false);
  for (size_t i = 0; i < keys.size(); ++i) {
    if (i == 1) {
      // Deleted in the write batch.
      ASSERT_TRUE(statuses[i].IsNotFound());
      continue;
    }
    ASSERT_OK(statuses[i]);
    const std::string db_val = "val" + std::to_string(i);
    if (i == 2) {
      // Overwritten in the write batch.
      ASSERT_EQ(values[i], "new_val" + std::to_string(2));
    } else if (i >= kMergeBegin && i < kMergeEnd) {
      // db value with the batch's merge operand appended.
      ASSERT_EQ(values[i], db_val + ",merge");
    } else {
      // Untouched by the write batch.
      ASSERT_EQ(values[i], db_val);
    }
  }

  for (auto handle : handles) {
    delete handle;
  }
}
2923
TEST_P(TransactionTest, ColumnFamiliesTest2) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string fetched;

  // Create 2 new column families
  ColumnFamilyOptions cf_opts;
  ColumnFamilyHandle* one;
  ColumnFamilyHandle* two;
  ASSERT_OK(db->CreateColumnFamily(cf_opts, "ONE", &one));
  ASSERT_OK(db->CreateColumnFamily(cf_opts, "TWO", &two));

  Transaction* txn1 = db->BeginTransaction(wopts);
  ASSERT_TRUE(txn1);
  Transaction* txn2 = db->BeginTransaction(wopts);
  ASSERT_TRUE(txn2);

  // txn1 writes key "X" in both new column families and in the default one.
  ASSERT_OK(txn1->Put(one, "X", "1"));
  ASSERT_OK(txn1->Put(two, "X", "2"));
  ASSERT_OK(txn1->Put("X", "0"));

  // txn2 cannot write the key txn1 already wrote in column family ONE.
  ASSERT_TRUE(txn2->Put(one, "X", "11").IsTimedOut());

  ASSERT_OK(txn1->Commit());

  // Drop first column family
  ASSERT_OK(db->DropColumnFamily(one));

  // txn2's only write was rejected above; its commit is expected to succeed.
  ASSERT_OK(txn2->Commit());

  delete txn1;
  txn1 = db->BeginTransaction(wopts);
  ASSERT_TRUE(txn1);

  // Should fail since column family was dropped
  ASSERT_TRUE(txn1->Put(one, "X", "111").IsInvalidArgument());

  // Writes to the remaining column families still go through.
  ASSERT_OK(txn1->Put(two, "X", "222"));
  ASSERT_OK(txn1->Put("X", "000"));
  ASSERT_OK(txn1->Commit());

  ASSERT_OK(db->Get(ropts, two, "X", &fetched));
  ASSERT_EQ("222", fetched);

  ASSERT_OK(db->Get(ropts, "X", &fetched));
  ASSERT_EQ("000", fetched);

  ASSERT_OK(db->DropColumnFamily(two));

  delete txn1;
  delete txn2;

  delete one;
  delete two;
}
2999
TEST_P(TransactionTest, EmptyTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  // Committing a transaction with no operations succeeds.
  Transaction* txn = db->BeginTransaction(write_options);
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // Rolling back a transaction with no operations succeeds as well.
  txn = db->BeginTransaction(write_options);
  ASSERT_OK(txn->Rollback());
  delete txn;

  // A transaction that only reads can be committed.
  txn = db->BeginTransaction(write_options);
  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  // Conflicts with previous GetForUpdate
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The conflicting Put was rejected, so the transaction still commits.
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;
}
3041
// Checks conflict detection between two transactions: first when txn1 has
// locked keys through MultiGetForUpdate, then when txn1 has written (and later
// committed) a key that txn2, holding an older snapshot, tries to touch.
TEST_P(TransactionTest, PredicateManyPreceders) {
  WriteOptions write_options;
  ReadOptions read_options1, read_options2;
  TransactionOptions txn_options;
  string value;
  Status s;

  txn_options.set_snapshot = true;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  Transaction* txn2 = db->BeginTransaction(write_options);
  txn2->SetSnapshot();
  read_options2.snapshot = txn2->GetSnapshot();

  std::vector<Slice> multiget_keys = {"1", "2", "3"};
  std::vector<std::string> multiget_values;

  std::vector<Status> results =
      txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  // Guard the index used below.
  ASSERT_EQ(multiget_keys.size(), results.size());
  ASSERT_TRUE(results[1].IsNotFound());

  s = txn2->Put("2", "x");  // Conflicts with txn1's MultiGetForUpdate
  ASSERT_TRUE(s.IsTimedOut());

  s = txn2->Rollback();
  ASSERT_OK(s);

  multiget_values.clear();
  results =
      txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  ASSERT_EQ(multiget_keys.size(), results.size());
  ASSERT_TRUE(results[1].IsNotFound());

  s = txn1->Commit();
  ASSERT_OK(s);

  delete txn1;
  delete txn2;

  // Second scenario: both transactions take a snapshot at creation time.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  s = txn1->Put("4", "x");
  ASSERT_OK(s);

  s = txn2->Delete("4");  // conflict
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Commit();
  ASSERT_OK(s);

  // "4" was committed after txn2's snapshot, so locking it now reports Busy.
  s = txn2->GetForUpdate(read_options2, "4", &value);
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Rollback();
  ASSERT_OK(s);

  delete txn1;
  delete txn2;
}
3103
// Two transactions write the same key "1" in several interleavings, with and
// without snapshots. After each round the committed value is read back from
// the DB to confirm which write won.
TEST_P(TransactionTest, LostUpdate) {
  WriteOptions write_options;
  ReadOptions read_options, read_options1, read_options2;
  TransactionOptions txn_options;
  std::string stored;

  // Round 1: no snapshots. txn1 writes first, so txn2's write times out.
  Transaction* txn1 = db->BeginTransaction(write_options);
  Transaction* txn2 = db->BeginTransaction(write_options);

  ASSERT_OK(txn1->Put("1", "1"));
  ASSERT_TRUE(txn2->Put("1", "2").IsTimedOut());  // conflict

  ASSERT_OK(txn2->Commit());
  ASSERT_OK(txn1->Commit());

  ASSERT_OK(db->Get(read_options, "1", &stored));
  ASSERT_EQ("1", stored);

  delete txn1;
  delete txn2;

  // Round 2: both transactions take a snapshot at creation; txn1 writes
  // first, so txn2's write times out.
  txn_options.set_snapshot = true;
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("1", "3"));
  ASSERT_TRUE(txn2->Put("1", "4").IsTimedOut());  // conflict

  ASSERT_OK(txn1->Commit());
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "1", &stored));
  ASSERT_EQ("3", stored);

  delete txn1;
  delete txn2;

  // Round 3: txn1 commits before txn2 writes; txn2's write is reported Busy.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("1", "5"));
  ASSERT_OK(txn1->Commit());

  ASSERT_TRUE(txn2->Put("1", "6").IsBusy());
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "1", &stored));
  ASSERT_EQ("5", stored);

  delete txn1;
  delete txn2;

  // Round 4: like round 3, but txn2 calls SetSnapshot() after txn1's commit,
  // so its write goes through.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("1", "7"));
  ASSERT_OK(txn1->Commit());

  txn2->SetSnapshot();
  ASSERT_OK(txn2->Put("1", "8"));
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "1", &stored));
  ASSERT_EQ("8", stored);

  delete txn1;
  delete txn2;

  // Round 5: no snapshots; the writes are strictly sequential, so both
  // succeed and the later one wins.
  txn1 = db->BeginTransaction(write_options);
  txn2 = db->BeginTransaction(write_options);

  ASSERT_OK(txn1->Put("1", "9"));
  ASSERT_OK(txn1->Commit());

  ASSERT_OK(txn2->Put("1", "10"));
  ASSERT_OK(txn2->Commit());

  delete txn1;
  delete txn2;

  ASSERT_OK(db->Get(read_options, "1", &stored));
  ASSERT_EQ(stored, "10");
}
3229
// Verifies the *Untracked write variants: they can be rolled back, and they
// succeed on a key that was modified after the transaction's snapshot, where
// a regular (tracked) Put is rejected as Busy.
TEST_P(TransactionTest, UntrackedWrites) {
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    // TODO(lth): For WriteUnprepared, validate that untracked writes are
    // not supported.
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  // Verify transaction rollback works for untracked keys.
  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  s = txn->PutUntracked("untracked", "0");
  ASSERT_OK(s);
  s = txn->Rollback();
  ASSERT_OK(s);
  s = db->Get(read_options, "untracked", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // Modify the key outside the transaction, after its snapshot was taken.
  s = db->Put(write_options, "untracked", "x");
  ASSERT_OK(s);

  // Untracked writes should succeed even though key was written after snapshot
  s = txn->PutUntracked("untracked", "1");
  ASSERT_OK(s);
  s = txn->MergeUntracked("untracked", "2");
  ASSERT_OK(s);
  s = txn->DeleteUntracked("untracked");
  ASSERT_OK(s);

  // Conflict: a tracked write to the same key is rejected.
  s = txn->Put("untracked", "3");
  ASSERT_TRUE(s.IsBusy());

  s = txn->Commit();
  ASSERT_OK(s);

  // The last successful operation in the transaction was DeleteUntracked.
  s = db->Get(read_options, "untracked", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
3279
// A transaction created with expiration 0 loses its locks to other
// transactions and cannot commit; none of its writes reach the DB.
TEST_P(TransactionTest, ExpiredTransaction) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string stored;

  // An expiration of 0 makes the transaction expire instantly.
  txn_options.expiration = 0;
  Transaction* expired_txn = db->BeginTransaction(write_options, txn_options);

  ASSERT_OK(expired_txn->Put("X", "1"));
  ASSERT_OK(expired_txn->Put("Y", "1"));

  Transaction* live_txn = db->BeginTransaction(write_options);

  // The expired transaction no longer blocks writers of "X".
  ASSERT_OK(live_txn->Put("X", "2"));
  ASSERT_OK(live_txn->Commit());

  ASSERT_OK(db->Get(read_options, "X", &stored));
  ASSERT_EQ("2", stored);

  ASSERT_OK(expired_txn->Put("Z", "1"));

  // Committing an expired transaction must fail ...
  ASSERT_TRUE(expired_txn->Commit().IsExpired());

  // ... and leave none of its other keys behind.
  ASSERT_TRUE(db->Get(read_options, "Y", &stored).IsNotFound());
  ASSERT_TRUE(db->Get(read_options, "Z", &stored).IsNotFound());

  delete expired_txn;
  delete live_txn;
}
3325
// Verifies that passing an existing Transaction* back into BeginTransaction
// reinitializes it: expiration, held locks, snapshot state and the
// transaction name are all reset according to the new options.
TEST_P(TransactionTest, ReinitializeTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  // Set txn expiration timeout to 0 (expires instantly)
  txn_options.expiration = 0;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);

  // Reinitialize transaction to no longer expire
  txn_options.expiration = -1;
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->Put("Z", "z");
  ASSERT_OK(s);

  // Should commit since not expired
  s = txn1->Commit();
  ASSERT_OK(s);

  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->Put("Z", "zz");
  ASSERT_OK(s);

  // Reinitialize txn1 and verify that Z gets unlocked
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
  s = txn2->Put("Z", "zzz");
  ASSERT_OK(s);
  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzz");

  // Verify snapshots get reinitialized correctly
  txn1->SetSnapshot();
  s = txn1->Put("Z", "zzzz");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  // Reinitializing without set_snapshot clears the snapshot taken above.
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);
  const Snapshot* snapshot = txn1->GetSnapshot();
  ASSERT_FALSE(snapshot);

  txn_options.set_snapshot = true;
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);
  snapshot = txn1->GetSnapshot();
  ASSERT_TRUE(snapshot);

  s = txn1->Put("Z", "a");
  ASSERT_OK(s);

  s = txn1->Rollback();
  ASSERT_OK(s);

  s = txn1->Put("Y", "y");
  ASSERT_OK(s);

  // Reinitializing discards the uncommitted write to "Y" made above.
  txn_options.set_snapshot = false;
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);
  snapshot = txn1->GetSnapshot();
  ASSERT_FALSE(snapshot);

  s = txn1->Put("X", "x");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  s = db->Get(read_options, "Y", &value);
  ASSERT_TRUE(s.IsNotFound());

  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->SetName("name");
  ASSERT_OK(s);

  s = txn1->Prepare();
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  // After reinitialization the same name can be assigned again.
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->SetName("name");
  ASSERT_OK(s);

  delete txn1;
}
3431
// Verifies that rolling back a transaction releases its key locks so that a
// previously blocked transaction can write and commit the same key.
TEST_P(TransactionTest, Rollback) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->Put("X", "1");
  ASSERT_OK(s);

  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  // txn2 should not be able to write to X since txn1 has it locked
  s = txn2->Put("X", "2");
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Rollback();
  ASSERT_OK(s);
  delete txn1;

  // txn2 should now be able to write to X
  s = txn2->Put("X", "3");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("3", value);

  delete txn2;
}
3468
// Reopens the DB with max_num_locks = 3 and verifies that attempts to lock
// more keys than the limit are rejected as Busy, that re-locking an already
// held key does not count against the limit, and that committing frees the
// locks for other transactions.
TEST_P(TransactionTest, LockLimitTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  delete db;
  db = nullptr;

  // Open DB with a lock limit of 3
  txn_db_options.max_num_locks = 3;
  ASSERT_OK(ReOpen());
  assert(db != nullptr);

  // Create a txn and verify we can only lock up to 3 keys
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  s = txn->Put("X", "x");
  ASSERT_OK(s);

  s = txn->Put("Y", "y");
  ASSERT_OK(s);

  s = txn->Put("Z", "z");
  ASSERT_OK(s);

  // lock limit reached
  s = txn->Put("W", "w");
  ASSERT_TRUE(s.IsBusy());

  // re-locking same key shouldn't put us over the limit
  s = txn->Put("X", "xx");
  ASSERT_OK(s);

  // GetForUpdate needs a lock too, so new keys are rejected as well.
  s = txn->GetForUpdate(read_options, "W", &value);
  ASSERT_TRUE(s.IsBusy());
  s = txn->GetForUpdate(read_options, "V", &value);
  ASSERT_TRUE(s.IsBusy());

  // re-locking same key shouldn't put us over the limit
  s = txn->GetForUpdate(read_options, "Y", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  // A plain Get is not rejected as Busy; "W" was never written.
  s = txn->Get(read_options, "W", &value);
  ASSERT_TRUE(s.IsNotFound());

  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // "X" currently locked
  s = txn2->Put("X", "x");
  ASSERT_TRUE(s.IsTimedOut());

  // lock limit reached
  s = txn2->Put("M", "m");
  ASSERT_TRUE(s.IsBusy());

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xx", value);

  s = db->Get(read_options, "W", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Committing txn should release its locks and allow txn2 to proceed
  s = txn2->Put("X", "x2");
  ASSERT_OK(s);

  s = txn2->Delete("X");
  ASSERT_OK(s);

  s = txn2->Put("M", "m");
  ASSERT_OK(s);

  s = txn2->Put("Z", "z2");
  ASSERT_OK(s);

  // lock limit reached
  s = txn2->Delete("Y");
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ("z2", value);

  s = db->Get(read_options, "Y", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  s = db->Get(read_options, "X", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  delete txn2;
}
3574
// Verifies that a transaction iterator merges the transaction's own pending
// writes with the DB contents visible at the transaction's snapshot, and that
// Seek/Next/Prev/SeekToLast behave as expected on the merged view.
TEST_P(TransactionTest, IteratorTest) {
  // This test does writes without snapshot validation, and then tries to create
  // iterator later, which is unsupported in write unprepared.
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  Status s;

  // Write some keys to the db
  s = db->Put(write_options, "A", "a");
  ASSERT_OK(s);

  s = db->Put(write_options, "G", "g");
  ASSERT_OK(s);

  s = db->Put(write_options, "F", "f");
  ASSERT_OK(s);

  s = db->Put(write_options, "C", "c");
  ASSERT_OK(s);

  s = db->Put(write_options, "D", "d");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Write some keys in a txn
  s = txn->Put("B", "b");
  ASSERT_OK(s);

  s = txn->Put("H", "h");
  ASSERT_OK(s);

  s = txn->Delete("D");
  ASSERT_OK(s);

  s = txn->Put("E", "e");
  ASSERT_OK(s);

  txn->SetSnapshot();
  const Snapshot* snapshot = txn->GetSnapshot();

  // Write some keys to the db after the snapshot
  s = db->Put(write_options, "BB", "xx");
  ASSERT_OK(s);

  s = db->Put(write_options, "C", "xx");
  ASSERT_OK(s);

  read_options.snapshot = snapshot;
  Iterator* iter = txn->GetIterator(read_options);
  ASSERT_OK(iter->status());
  iter->SeekToFirst();

  // Read all keys via iter and lock them all. "D" is deleted by the txn and
  // "BB" was written after the snapshot, so neither is expected here.
  const std::vector<std::string> expected_values = {"a", "b", "c", "e",
                                                    "f", "g", "h"};
  for (const std::string& expected : expected_values) {
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(expected, iter->value().ToString());

    s = txn->GetForUpdate(read_options, iter->key(), nullptr);
    if (iter->key() == Slice("C")) {
      // "C" was modified after txn's snapshot
      ASSERT_TRUE(s.IsBusy());
    } else {
      ASSERT_OK(s);
    }

    iter->Next();
  }
  ASSERT_FALSE(iter->Valid());

  iter->Seek("G");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("g", iter->value().ToString());

  iter->Prev();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("f", iter->value().ToString());

  // "D" is deleted in the txn, so seeking to it lands on the next key, "E".
  iter->Seek("D");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->value().ToString());

  iter->Seek("C");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("c", iter->value().ToString());

  iter->Next();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->value().ToString());

  iter->Seek("");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("a", iter->value().ToString());

  // Seeking past the last key leaves the iterator invalid.
  iter->Seek("X");
  ASSERT_OK(iter->status());
  ASSERT_FALSE(iter->Valid());

  iter->SeekToLast();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("h", iter->value().ToString());

  s = txn->Commit();
  ASSERT_OK(s);

  delete iter;
  delete txn;
}
3698
// Verifies that writes made between DisableIndexing() and EnableIndexing()
// are not visible to the transaction's own Get()/iterator, while writes made
// with indexing enabled are.
TEST_P(TransactionTest, DisableIndexingTest) {
  // Skip this test for write unprepared. It does not solely rely on WBWI for
  // read your own writes, so depending on whether batches are flushed or not,
  // only some writes will be visible.
  //
  // Also, write unprepared does not support creating iterators if there has
  // been txn->Put() without snapshot validation.
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn->DisableIndexing();

  // Written while indexing is disabled: not visible to reads in this txn.
  s = txn->Put("B", "b");
  ASSERT_OK(s);

  s = txn->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  Iterator* iter = txn->GetIterator(read_options);
  ASSERT_OK(iter->status());

  iter->Seek("B");
  ASSERT_OK(iter->status());
  ASSERT_FALSE(iter->Valid());

  // The delete itself succeeds, but because it is not indexed the earlier
  // indexed value of "A" is still what Get() returns.
  s = txn->Delete("A");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn->EnableIndexing();

  s = txn->Put("B", "bb");
  ASSERT_OK(s);

  iter->Seek("B");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bb", iter->value().ToString());

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("bb", value);

  s = txn->Put("A", "aa");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  delete iter;
  delete txn;
}
3770
// Exercises SetSavePoint()/RollbackToSavePoint() together with full
// Rollback(): checks the put/delete counters and the values visible through
// the transaction after each rollback, and the DB contents after commit.
TEST_P(TransactionTest, SavepointTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_EQ(0, txn->GetNumPuts());

  // No savepoint has been set yet.
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  txn->SetSavePoint();  // 1

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to beginning of txn
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Put("B", "b");
  ASSERT_OK(s);

  ASSERT_EQ(1, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  delete txn;
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Put("B", "bb");
  ASSERT_OK(s);

  s = txn->Put("C", "c");
  ASSERT_OK(s);

  txn->SetSavePoint();  // 2

  s = txn->Delete("B");
  ASSERT_OK(s);

  s = txn->Put("C", "cc");
  ASSERT_OK(s);

  s = txn->Put("D", "d");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumPuts());
  ASSERT_EQ(1, txn->GetNumDeletes());

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2

  // Counters and visible values are back to their state at savepoint 2.
  ASSERT_EQ(3, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("bb", value);

  s = txn->Get(read_options, "C", &value);
  ASSERT_OK(s);
  ASSERT_EQ("c", value);

  s = txn->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Put("E", "e");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  // No savepoint is left, so roll the whole transaction back instead.
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());
  s = txn->Rollback();
  ASSERT_OK(s);

  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  // Only the previously committed "B" -> "b" remains visible.
  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  s = txn->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "E", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Put("A", "aa");
  ASSERT_OK(s);

  s = txn->Put("F", "f");
  ASSERT_OK(s);

  ASSERT_EQ(2, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  txn->SetSavePoint();  // 3
  txn->SetSavePoint();  // 4

  s = txn->Put("G", "g");
  ASSERT_OK(s);

  s = txn->SingleDelete("F");
  ASSERT_OK(s);

  s = txn->Delete("B");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  s = txn->Get(read_options, "F", &value);
  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior.  So it is unclear what the
  // result of fetching "F" should be. The current implementation will
  // return NotFound in this case.
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_EQ(3, txn->GetNumPuts());
  ASSERT_EQ(2, txn->GetNumDeletes());

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 3

  ASSERT_EQ(2, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  s = txn->Get(read_options, "F", &value);
  ASSERT_OK(s);
  ASSERT_EQ("f", value);

  s = txn->Get(read_options, "G", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Commit();
  ASSERT_OK(s);

  // Only the writes that survived the rollbacks are committed.
  s = db->Get(read_options, "F", &value);
  ASSERT_OK(s);
  ASSERT_EQ("f", value);

  s = db->Get(read_options, "G", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  s = db->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  s = db->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "E", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
3963
// Checks which key locks a transaction keeps after RollbackToSavePoint():
// keys first written before the savepoint stay locked, keys first written
// after it are released. A second transaction with a 1 ms lock timeout
// probes the locks.
TEST_P(TransactionTest, SavepointTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* holder = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(holder);

  ASSERT_OK(holder->Put("A", ""));

  holder->SetSavePoint();  // 1

  ASSERT_OK(holder->Put("A", "a"));
  ASSERT_OK(holder->Put("C", "c"));

  holder->SetSavePoint();  // 2

  ASSERT_OK(holder->Put("A", "a"));
  ASSERT_OK(holder->Put("B", "b"));

  ASSERT_OK(holder->RollbackToSavePoint());  // Rollback to 2

  // "A" and "C" must still be locked by holder, while "B" must be free.
  Transaction* prober = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(prober);

  ASSERT_TRUE(prober->Put("A", "a2").IsTimedOut());
  ASSERT_TRUE(prober->Put("C", "c2").IsTimedOut());
  ASSERT_OK(prober->Put("B", "b2"));

  // prober now owns "B", so holder cannot take it back yet.
  ASSERT_OK(holder->Put("A", "aa"));
  ASSERT_TRUE(holder->Put("B", "bb").IsTimedOut());

  ASSERT_OK(prober->Commit());
  delete prober;

  ASSERT_OK(holder->Put("A", "aaa"));
  ASSERT_OK(holder->Put("B", "bbb"));
  ASSERT_OK(holder->Put("C", "ccc"));

  holder->SetSavePoint();                    // 3
  ASSERT_OK(holder->RollbackToSavePoint());  // Rollback to 3

  // Nothing was written after savepoint 3: "A", "B", "C" all stay locked.
  prober = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(prober);

  ASSERT_TRUE(prober->Put("A", "a2").IsTimedOut());
  ASSERT_TRUE(prober->Put("B", "b2").IsTimedOut());
  ASSERT_TRUE(prober->Put("C", "c2").IsTimedOut());

  ASSERT_OK(holder->RollbackToSavePoint());  // Rollback to 1

  // Only "A" was written before savepoint 1, so only "A" remains locked.
  ASSERT_TRUE(prober->Put("A", "a3").IsTimedOut());
  ASSERT_OK(prober->Put("B", "b3"));
  ASSERT_OK(prober->Put("C", "c3po"));

  ASSERT_OK(holder->Commit());
  delete holder;

  // With holder committed, none of "A", "B", "C" is locked by it any more.
  ASSERT_OK(prober->Put("A", "a4"));
  ASSERT_OK(prober->Put("B", "b4"));
  ASSERT_OK(prober->Put("C", "c4"));

  ASSERT_OK(prober->Commit());
  delete prober;
}
4061
// Exercises PopSavePoint(): popping with no savepoint reports NotFound,
// popping a savepoint keeps the writes (and locks) made after it, and a
// popped savepoint can no longer be rolled back to.
TEST_P(TransactionTest, SavepointTest3) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->PopSavePoint();  // No SavePoint present
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("A", "");
  ASSERT_OK(s);

  s = txn1->PopSavePoint();  // Still no SavePoint present
  ASSERT_TRUE(s.IsNotFound());

  txn1->SetSavePoint();  // 1

  s = txn1->Put("A", "a");
  ASSERT_OK(s);

  s = txn1->PopSavePoint();  // Remove 1
  ASSERT_OK(s);
  // Savepoint 1 is gone, so there is nothing left to roll back to.
  ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());

  // Verify that "A" is still locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  delete txn2;

  txn1->SetSavePoint();  // 2

  s = txn1->Put("B", "b");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 3

  s = txn1->Put("B", "b2");
  ASSERT_OK(s);

  // Rolling back consumes savepoint 3 and undoes the "b2" write.
  ASSERT_OK(txn1->RollbackToSavePoint());

  // Pop the remaining savepoint 2 ...
  s = txn1->PopSavePoint();
  ASSERT_OK(s);

  // ... after which no savepoint is left.
  s = txn1->PopSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;

  std::string value;

  // txn1 should have modified "A" to "a"
  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  // txn1 should have set "B" to just "b"
  s = db->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  s = db->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
}
4134
// Popping an inner savepoint must keep the writes made after it, while a
// later rollback to the outer savepoint must undo everything written since
// that outer savepoint and release the corresponding locks.
TEST_P(TransactionTest, SavepointTest4) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* writer = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(writer);

  writer->SetSavePoint();  // 1
  ASSERT_OK(writer->Put("A", "a"));

  writer->SetSavePoint();  // 2
  ASSERT_OK(writer->Put("B", "b"));

  ASSERT_OK(writer->PopSavePoint());  // Remove 2

  // Both writes are still visible through the transaction.
  std::string fetched;
  ASSERT_OK(writer->Get(read_options, "A", &fetched));
  ASSERT_EQ("a", fetched);

  ASSERT_OK(writer->Get(read_options, "B", &fetched));
  ASSERT_EQ("b", fetched);

  ASSERT_OK(writer->RollbackToSavePoint());  // Rollback to 1

  // Rolling back to savepoint 1 discards both "A" and "B".
  ASSERT_TRUE(writer->Get(read_options, "A", &fetched).IsNotFound());
  ASSERT_TRUE(writer->Get(read_options, "B", &fetched).IsNotFound());

  // No locks should remain: another transaction can write both keys.
  Transaction* other = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(other);

  ASSERT_OK(other->Put("A", ""));
  ASSERT_OK(other->Put("B", ""));

  delete other;
  delete writer;
}
4186
// Exercises Transaction::UndoGetForUpdate() against a second transaction that
// uses a 1 ms lock timeout to probe which keys are currently locked.
// The expectations encoded below are:
//  - undoing a key that was never read is harmless;
//  - a key read N times via GetForUpdate() stays locked until it has been
//    undone N times;
//  - a key that was also written (Put/Delete) stays locked regardless of how
//    many times UndoGetForUpdate() is called on it.
TEST_P(TransactionTest, UndoGetForUpdateTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  // Undo on a key that was never fetched.
  txn1->UndoGetForUpdate("A");

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;

  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  txn1->UndoGetForUpdate("A");
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Verify that A is locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a");
  ASSERT_TRUE(s.IsTimedOut());

  txn1->UndoGetForUpdate("A");

  // Verify that A is now unlocked
  s = txn2->Put("A", "a2");
  ASSERT_OK(s);
  // The commit status must be checked: the Get below relies on it succeeding.
  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;
  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a2", value);

  s = txn1->Delete("A");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("B", "b3");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_OK(s);

  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");

  // Verify that A and B are still locked (both were written by txn1)
  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a4");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b4");
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Rollback();
  ASSERT_OK(s);
  delete txn1;

  // Verify that A and B are no longer locked
  s = txn2->Put("A", "a5");
  ASSERT_OK(s);
  s = txn2->Put("B", "b5");
  ASSERT_OK(s);
  s = txn2->Commit();
  delete txn2;
  ASSERT_OK(s);

  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  // A is fetched three times, C twice, and B twice with a Put in between.
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_OK(s);
  s = txn1->Put("B", "b5");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_OK(s);

  // First round of undos.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("X");

  // Verify A,B,C are locked
  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a6");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Delete("B");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c6");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("X", "x6");
  ASSERT_OK(s);

  // Second round of undos.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("X");

  // Verify A,B are locked and C is not
  s = txn2->Put("A", "a6");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Delete("B");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c6");
  ASSERT_OK(s);
  s = txn2->Put("X", "x6");
  ASSERT_OK(s);

  // Third round of undos.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("X");

  // Verify B is locked and A and C are not
  s = txn2->Put("A", "a7");
  ASSERT_OK(s);
  s = txn2->Delete("B");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c7");
  ASSERT_OK(s);
  s = txn2->Put("X", "x7");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;
}
4330
// Exercises Transaction::UndoGetForUpdate() in combination with save points.
// txn2 (1 ms lock timeout) is used as a probe: a Put that times out means the
// key is still locked by txn1. The expectations encoded below are that keys
// fetched before a save point cannot be released by UndoGetForUpdate() while
// that save point is still on the stack, and that written keys (E, F, H) are
// only released by rolling back the write.
TEST_P(TransactionTest, UndoGetForUpdateTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "A", "");
  ASSERT_OK(s);

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("F", "f");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 1

  txn1->UndoGetForUpdate("A");

  s = txn1->GetForUpdate(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn1->GetForUpdate(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("E", "e");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "E", &value);
  ASSERT_OK(s);

  s = txn1->GetForUpdate(read_options, "F", &value);
  ASSERT_OK(s);

  // Verify A,B,C,D,E,F are still locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f1");
  ASSERT_TRUE(s.IsTimedOut());

  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("E");

  // Verify A,B,D,E,F are still locked and C is not.
  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c2");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 2

  s = txn1->Put("H", "h");
  ASSERT_OK(s);

  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("D");
  txn1->UndoGetForUpdate("E");
  txn1->UndoGetForUpdate("F");
  txn1->UndoGetForUpdate("G");
  txn1->UndoGetForUpdate("H");

  // Verify A,B,D,E,F,H are still locked and C,G are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("H", "h3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);

  // The rollback status is checked so that a failed rollback is reported
  // here rather than as a confusing lock-state mismatch further down.
  s = txn1->RollbackToSavePoint();  // rollback to 2
  ASSERT_OK(s);

  // Verify A,B,D,E,F are still locked and C,G,H are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("D");
  txn1->UndoGetForUpdate("E");
  txn1->UndoGetForUpdate("F");
  txn1->UndoGetForUpdate("G");
  txn1->UndoGetForUpdate("H");

  // Verify A,B,E,F are still locked and C,D,G,H are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("D", "d3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  s = txn1->RollbackToSavePoint();  // rollback to 1
  ASSERT_OK(s);

  // Verify A,B,F are still locked and C,D,E,G,H are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("D", "d3");
  ASSERT_OK(s);
  s = txn2->Put("E", "e3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("D");
  txn1->UndoGetForUpdate("E");
  txn1->UndoGetForUpdate("F");
  txn1->UndoGetForUpdate("G");
  txn1->UndoGetForUpdate("H");

  // Verify F is still locked and A,B,C,D,E,G,H are not.
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("A", "a3");
  ASSERT_OK(s);
  s = txn2->Put("B", "b3");
  ASSERT_OK(s);
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("D", "d3");
  ASSERT_OK(s);
  s = txn2->Put("E", "e3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);
  s = txn2->Commit();
  ASSERT_OK(s);

  delete txn1;
  delete txn2;
}
4536
// Exercises the interaction between lock timeouts and transaction expiration.
// The DB is reopened twice with different TransactionDBOptions timeouts; in
// each phase a non-transactional write or a competing transaction contends
// for a key held by an expirable transaction.
TEST_P(TransactionTest, TimeoutTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  delete db;
  db = nullptr;

  // transaction writes have an infinite timeout,
  // but we will override this when we start a txn
  // db writes have infinite timeout
  txn_db_options.transaction_lock_timeout = -1;
  txn_db_options.default_lock_timeout = -1;

  s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  // Check the status first so that a failed open is reported with its
  // message instead of tripping the null check.
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  TransactionOptions txn_options0;
  txn_options0.expiration = 100;  // 100ms
  txn_options0.lock_timeout = 50;  // txn timeout no longer infinite
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
  ASSERT_TRUE(txn1);

  s = txn1->GetForUpdate(read_options, "aaa", nullptr);
  ASSERT_OK(s);

  // Conflicts with previous GetForUpdate.
  // Since db writes do not have a timeout, this should eventually succeed when
  // the transaction expires.
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_OK(s);

  ASSERT_GE(txn1->GetElapsedTime(),
            static_cast<uint64_t>(txn_options0.expiration));

  s = txn1->Commit();
  ASSERT_TRUE(s.IsExpired());  // expired!

  s = db->Get(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xxx", value);

  delete txn1;
  delete db;
  // Reset the pointer so it never dangles if the reopen below fails.
  db = nullptr;

  // transaction writes have 50ms timeout,
  // db writes have infinite timeout
  txn_db_options.transaction_lock_timeout = 50;
  txn_db_options.default_lock_timeout = -1;

  s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  TransactionOptions txn_options;
  txn_options.expiration = 100;  // 100ms
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->GetForUpdate(read_options, "aaa", nullptr);
  ASSERT_OK(s);

  // Conflicts with previous GetForUpdate.
  // Since db writes do not have a timeout, this should eventually succeed when
  // the transaction expires.
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_NOK(s);  // expired!

  s = db->Get(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xxx", value);

  delete txn1;
  txn_options.expiration = 6000000;  // 100 minutes
  txn_options.lock_timeout = 1;      // 1ms
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  // Overrides the 1ms lock timeout configured through txn_options above.
  txn1->SetLockTimeout(100);

  TransactionOptions txn_options2;
  txn_options2.expiration = 10;  // 10ms
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
  // Validate the new transaction itself rather than re-checking a stale
  // status from the previous operation.
  ASSERT_TRUE(txn2);

  s = txn2->Put("a", "2");
  ASSERT_OK(s);

  // txn1 has a lock timeout longer than txn2's expiration, so it will win
  s = txn1->Delete("a");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  // txn2 should have expired, since txn1 waited for the lock past txn2's
  // expiration.
  s = txn2->Commit();
  ASSERT_TRUE(s.IsExpired());

  delete txn1;
  delete txn2;
  txn_options.expiration = 6000000;  // 100 minutes
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  txn_options2.expiration = 100000000;
  txn2 = db->BeginTransaction(write_options, txn_options2);
  ASSERT_TRUE(txn2);

  s = txn1->Delete("asdf");
  ASSERT_OK(s);

  // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
  s = txn2->Delete("asdf");
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Put("asdf", "asdf");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "asdf", &value);
  ASSERT_OK(s);
  ASSERT_EQ("asdf", value);

  delete txn1;
  delete txn2;
}
4673
// Covers Transaction::SingleDelete(): on a missing key, followed by a Put in
// the same transaction, on a committed key, and on a key overwritten inside
// the same transaction while a second transaction contends for a lock.
TEST_P(TransactionTest, SingleDeleteTest) {
  WriteOptions write_opts;
  ReadOptions read_opts;
  std::string fetched;

  // SingleDelete of a key that does not exist.
  Transaction* txn = db->BeginTransaction(write_opts);
  ASSERT_TRUE(txn);

  ASSERT_OK(txn->SingleDelete("A"));
  ASSERT_TRUE(txn->Get(read_opts, "A", &fetched).IsNotFound());

  ASSERT_OK(txn->Commit());
  delete txn;

  // SingleDelete followed by a Put of the same key in one transaction.
  txn = db->BeginTransaction(write_opts);

  ASSERT_OK(txn->SingleDelete("A"));
  ASSERT_OK(txn->Put("A", "a"));

  ASSERT_OK(txn->Get(read_opts, "A", &fetched));
  ASSERT_EQ("a", fetched);

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_OK(db->Get(read_opts, "A", &fetched));
  ASSERT_EQ("a", fetched);

  // SingleDelete of the committed key.
  txn = db->BeginTransaction(write_opts);

  ASSERT_OK(txn->SingleDelete("A"));
  ASSERT_TRUE(txn->Get(read_opts, "A", &fetched).IsNotFound());

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_TRUE(db->Get(read_opts, "A", &fetched).IsNotFound());

  // Two concurrent transactions; the second one takes a snapshot up front.
  txn = db->BeginTransaction(write_opts);
  Transaction* contender = db->BeginTransaction(write_opts);
  contender->SetSnapshot();

  ASSERT_OK(txn->Put("A", "a"));
  ASSERT_OK(txn->Put("A", "a2"));
  ASSERT_OK(txn->SingleDelete("A"));
  ASSERT_OK(txn->SingleDelete("B"));

  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten has undefined behavior, so it is unclear what the result of
  // fetching "A" should be. The current implementation returns NotFound in
  // this case.
  ASSERT_TRUE(txn->Get(read_opts, "A", &fetched).IsNotFound());

  // "B" is held by the first transaction, so the contender cannot write it.
  ASSERT_TRUE(contender->Put("B", "b").IsTimedOut());
  ASSERT_OK(contender->Commit());
  delete contender;

  ASSERT_OK(txn->Commit());
  delete txn;

  // Same caveat as above: SingleDelete on an overwritten key is undefined
  // per db.h; the current implementation reports NotFound.
  ASSERT_TRUE(db->Get(read_opts, "A", &fetched).IsNotFound());
  ASSERT_TRUE(db->Get(read_opts, "B", &fetched).IsNotFound());
}
4771
// Covers Merge inside a transaction: reads of a key with pending merge
// operands are expected to report MergeInProgress, a Put replaces the pending
// operands, and a merged key is locked against other transactions.
TEST_P(TransactionTest, MergeTest) {
  WriteOptions write_opts;
  ReadOptions read_opts;
  std::string fetched;

  Transaction* merger = db->BeginTransaction(write_opts, TransactionOptions());
  ASSERT_TRUE(merger);

  ASSERT_OK(db->Put(write_opts, "A", "a0"));

  ASSERT_OK(merger->Merge("A", "1"));
  ASSERT_OK(merger->Merge("A", "2"));
  ASSERT_TRUE(merger->Get(read_opts, "A", &fetched).IsMergeInProgress());

  ASSERT_OK(merger->Put("A", "a"));
  ASSERT_OK(merger->Get(read_opts, "A", &fetched));
  ASSERT_EQ("a", fetched);

  ASSERT_OK(merger->Merge("A", "3"));
  ASSERT_TRUE(merger->Get(read_opts, "A", &fetched).IsMergeInProgress());

  TransactionOptions short_lock_opts;
  short_lock_opts.lock_timeout = 1;  // 1 ms
  Transaction* contender = db->BeginTransaction(write_opts, short_lock_opts);
  ASSERT_TRUE(contender);

  // verify that the first transaction has "A" locked
  ASSERT_TRUE(contender->Merge("A", "4").IsTimedOut());

  ASSERT_OK(contender->Commit());
  delete contender;

  ASSERT_OK(merger->Commit());
  delete merger;

  ASSERT_OK(db->Get(read_opts, "A", &fetched));
  ASSERT_EQ("a,3", fetched);
}
4827
// Covers SetSnapshotOnNextOperation(): no snapshot exists right after the
// call, and writes committed before the next operation do not conflict while
// writes committed afterwards do.
TEST_P(TransactionTest, DeferSnapshotTest) {
  WriteOptions write_opts;
  ReadOptions read_opts;
  std::string fetched;

  ASSERT_OK(db->Put(write_opts, "A", "a0"));

  Transaction* deferred = db->BeginTransaction(write_opts);
  Transaction* other = db->BeginTransaction(write_opts);

  deferred->SetSnapshotOnNextOperation();
  const auto initial_snapshot = deferred->GetSnapshot();
  ASSERT_FALSE(initial_snapshot);

  ASSERT_OK(other->Put("A", "a2"));
  ASSERT_OK(other->Commit());
  delete other;

  // Should not conflict with the other transaction since the snapshot wasn't
  // set until GetForUpdate was called.
  ASSERT_OK(deferred->GetForUpdate(read_opts, "A", &fetched));
  ASSERT_EQ("a2", fetched);

  ASSERT_OK(deferred->Put("A", "a1"));

  ASSERT_OK(db->Put(write_opts, "B", "b0"));

  // Cannot lock B since it was written after the snapshot was set
  ASSERT_TRUE(deferred->Put("B", "b1").IsBusy());

  ASSERT_OK(deferred->Commit());
  delete deferred;

  ASSERT_OK(db->Get(read_opts, "A", &fetched));
  ASSERT_EQ("a1", fetched);

  ASSERT_OK(db->Get(read_opts, "B", &fetched));
  ASSERT_EQ("b0", fetched);
}
4878
// Covers SetSnapshotOnNextOperation() on a transaction that already holds a
// snapshot: plain Get() calls are expected to leave the existing snapshot in
// place, while the next GetForUpdate() replaces it.
TEST_P(TransactionTest, DeferSnapshotTest2) {
  WriteOptions write_opts;
  ReadOptions latest_read_opts, snap_read_opts;
  std::string fetched;

  Transaction* txn = db->BeginTransaction(write_opts);

  txn->SetSnapshot();

  ASSERT_OK(txn->Put("A", "a1"));

  ASSERT_OK(db->Put(write_opts, "C", "c0"));
  ASSERT_OK(db->Put(write_opts, "D", "d0"));

  snap_read_opts.snapshot = txn->GetSnapshot();

  txn->SetSnapshotOnNextOperation();

  // Snapshot was set before C and D were written
  ASSERT_TRUE(txn->Get(snap_read_opts, "C", &fetched).IsNotFound());
  ASSERT_TRUE(txn->Get(snap_read_opts, "D", &fetched).IsNotFound());

  // Snapshot should not have changed yet.
  snap_read_opts.snapshot = txn->GetSnapshot();

  // Still the old snapshot, taken before C and D were written
  ASSERT_TRUE(txn->Get(snap_read_opts, "C", &fetched).IsNotFound());
  ASSERT_TRUE(txn->Get(snap_read_opts, "D", &fetched).IsNotFound());

  ASSERT_OK(txn->GetForUpdate(latest_read_opts, "C", &fetched));
  ASSERT_EQ("c0", fetched);

  ASSERT_OK(db->Put(write_opts, "D", "d00"));

  // Snapshot is now set
  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(snap_read_opts, "D", &fetched));
  ASSERT_EQ("d0", fetched);

  ASSERT_OK(txn->Commit());
  delete txn;
}
4935
// Covers how save points interact with snapshots: key "T" is bumped outside
// the transaction between save points, and after each rollback the value
// visible through the transaction's snapshot is compared with the value that
// was current when that save point was taken.
TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
  WriteOptions write_opts;
  ReadOptions latest_read_opts, snap_read_opts;
  std::string fetched;

  Transaction* txn = db->BeginTransaction(write_opts);

  txn->SetSavePoint();  // 1

  ASSERT_OK(db->Put(write_opts, "T", "1"));

  txn->SetSnapshotOnNextOperation();

  ASSERT_OK(db->Put(write_opts, "T", "2"));

  txn->SetSavePoint();  // 2

  ASSERT_OK(db->Put(write_opts, "T", "3"));

  ASSERT_OK(txn->Put("A", "a"));

  txn->SetSavePoint();  // 3

  ASSERT_OK(db->Put(write_opts, "T", "4"));

  txn->SetSnapshot();
  txn->SetSnapshotOnNextOperation();

  txn->SetSavePoint();  // 4

  ASSERT_OK(db->Put(write_opts, "T", "5"));

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("4", fetched);

  ASSERT_OK(txn->Put("A", "a1"));

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("5", fetched);

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 4

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("4", fetched);

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 3

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("3", fetched);

  // A read without a snapshot still sees the latest value.
  ASSERT_OK(txn->Get(latest_read_opts, "T", &fetched));
  ASSERT_EQ("5", fetched);

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snap_read_opts.snapshot);
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("5", fetched);

  ASSERT_OK(txn->Delete("A"));

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_TRUE(snap_read_opts.snapshot);
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("5", fetched);

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 1

  ASSERT_OK(txn->Delete("A"));

  snap_read_opts.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snap_read_opts.snapshot);
  ASSERT_OK(txn->Get(snap_read_opts, "T", &fetched));
  ASSERT_EQ("5", fetched);

  ASSERT_OK(txn->Commit());

  delete txn;
}
5043
// Covers SetSnapshotOnNextOperation() with a TransactionNotifier: the
// notifier stores the newly created snapshot into read_opts.snapshot, which
// lets the test observe when the snapshot is actually taken.
TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
  WriteOptions write_opts;
  ReadOptions read_opts;
  std::string fetched;

  // Forwards the snapshot handed to SnapshotCreated() into a caller-owned
  // pointer slot.
  class Notifier : public TransactionNotifier {
   public:
    explicit Notifier(const Snapshot** target) : target_(target) {}

    void SnapshotCreated(const Snapshot* created) override {
      *target_ = created;
    }

   private:
    const Snapshot** target_;
  };

  auto notifier = std::make_shared<Notifier>(&read_opts.snapshot);

  ASSERT_OK(db->Put(write_opts, "B", "0"));

  Transaction* txn = db->BeginTransaction(write_opts);

  txn->SetSnapshotOnNextOperation(notifier);
  ASSERT_FALSE(read_opts.snapshot);

  ASSERT_OK(db->Put(write_opts, "B", "1"));

  // A Get does not generate the snapshot
  ASSERT_OK(txn->Get(read_opts, "B", &fetched));
  ASSERT_FALSE(read_opts.snapshot);
  ASSERT_EQ(fetched, "1");

  // Any other operation does
  ASSERT_OK(txn->Put("A", "0"));

  // Now change "B".
  ASSERT_OK(db->Put(write_opts, "B", "2"));

  // The original value should still be read
  ASSERT_OK(txn->Get(read_opts, "B", &fetched));
  ASSERT_TRUE(read_opts.snapshot);
  ASSERT_EQ(fetched, "1");

  ASSERT_OK(txn->Commit());

  delete txn;
}
5102
// Covers Transaction::ClearSnapshot(): reads through the transaction see the
// latest value before a snapshot is set, the snapshotted value while it is
// set, and the latest value again once the snapshot has been cleared.
TEST_P(TransactionTest, ClearSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "foo", "0");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = db->Put(write_options, "foo", "1");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // No snapshot created yet
  s = txn->Get(snapshot_read_options, "foo", &value);
  // Check the read status before trusting `value`; a failed Get would
  // otherwise leave a stale value to be compared.
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_TRUE(snapshot_read_options.snapshot);

  s = db->Put(write_options, "foo", "2");
  ASSERT_OK(s);

  // Snapshot was created before change to '2'
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->ClearSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // Snapshot has now been cleared
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "2");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
5149
// Verifies that per-column-family disable_auto_compactions settings passed to
// TransactionDB::Open() are reflected in each column family's latest mutable
// options.
TEST_P(TransactionTest, ToggleAutoCompactionTest) {
  Status s;

  ColumnFamilyHandle *cfa, *cfb;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  // Reset the pointer so it never dangles if the reopen below fails.
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
  ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
  ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
  cf_opt_default->disable_auto_compactions = false;
  cf_opt_cfa->disable_auto_compactions = true;
  cf_opt_cfb->disable_auto_compactions = false;

  std::vector<ColumnFamilyHandle*> handles;

  s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
                          &handles, &db);
  ASSERT_OK(s);
  // Guard the indexed accesses below.
  ASSERT_EQ(handles.size(), column_families.size());

  // static_cast is the appropriate cast for a base-to-derived pointer
  // conversion within a class hierarchy.
  auto cfh_default = static_cast<ColumnFamilyHandleImpl*>(handles[0]);
  auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();

  auto cfh_a = static_cast<ColumnFamilyHandleImpl*>(handles[1]);
  auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();

  auto cfh_b = static_cast<ColumnFamilyHandleImpl*>(handles[2]);
  auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();

  ASSERT_FALSE(opt_default.disable_auto_compactions);
  ASSERT_TRUE(opt_a.disable_auto_compactions);
  ASSERT_FALSE(opt_b.disable_auto_compactions);

  for (auto handle : handles) {
    delete handle;
  }
}
5207
// Registers a sync-point callback that sleeps past txn1's expiration and then
// has a second transaction contend for the same key, and checks that txn1's
// commit still succeeds and its value is the one stored.
TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
  // In this test, txn1 should succeed committing,
  // as the callback is called after txn1 starts committing.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"TransactionTest::ExpirableTransactionDataRace:1"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) {
        WriteOptions write_options;
        TransactionOptions txn_options;

        // Force txn1 to expire
        /* sleep override */
        std::this_thread::sleep_for(std::chrono::milliseconds(150));

        Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
        Status s;
        s = txn2->Put("X", "2");
        ASSERT_TRUE(s.IsTimedOut());
        s = txn2->Commit();
        ASSERT_OK(s);
        delete txn2;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions write_options;
  TransactionOptions txn_options;

  txn_options.expiration = 100;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  Status s;
  s = txn1->Put("X", "1");
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  ReadOptions read_options;
  std::string value;
  s = db->Get(read_options, "X", &value);
  // Check the read status before trusting `value`.
  ASSERT_OK(s);
  ASSERT_EQ("1", value);

  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // The callback captures this test's state by reference; drop it so it
  // cannot be invoked after the test has finished.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
5253
5254 #ifndef ROCKSDB_VALGRIND_RUN
5255 namespace {
5256 // cmt_delay_ms is the delay between prepare and commit
5257 // first_id is the id of the first transaction
TransactionStressTestInserter(TransactionDB * db,const size_t num_transactions,const size_t num_sets,const size_t num_keys_per_set,Random64 * rand,const uint64_t cmt_delay_ms=0,const uint64_t first_id=0)5258 Status TransactionStressTestInserter(
5259 TransactionDB* db, const size_t num_transactions, const size_t num_sets,
5260 const size_t num_keys_per_set, Random64* rand,
5261 const uint64_t cmt_delay_ms = 0, const uint64_t first_id = 0) {
5262 WriteOptions write_options;
5263 ReadOptions read_options;
5264 TransactionOptions txn_options;
5265 if (rand->OneIn(2)) {
5266 txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
5267 }
5268 // Inside the inserter we might also retake the snapshot. We do both since two
5269 // separte functions are engaged for each.
5270 txn_options.set_snapshot = rand->OneIn(2);
5271
5272 RandomTransactionInserter inserter(
5273 rand, write_options, read_options, num_keys_per_set,
5274 static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
5275
5276 for (size_t t = 0; t < num_transactions; t++) {
5277 bool success = inserter.TransactionDBInsert(db, txn_options);
5278 if (!success) {
5279 // unexpected failure
5280 return inserter.GetLastStatus();
5281 }
5282 }
5283
5284 // Make sure at least some of the transactions succeeded. It's ok if
5285 // some failed due to write-conflicts.
5286 if (num_transactions != 1 &&
5287 inserter.GetFailureCount() > num_transactions / 2) {
5288 return Status::TryAgain("Too many transactions failed! " +
5289 std::to_string(inserter.GetFailureCount()) + " / " +
5290 std::to_string(num_transactions));
5291 }
5292
5293 return Status::OK();
5294 }
5295 } // namespace
5296
5297 // Worker threads add a number to a key from each set of keys. The checker
5298 // threads verify that the sum of all keys in each set are equal.
TEST_P(MySQLStyleTransactionTest,TransactionStressTest)5299 TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
5300 // Small write buffer to trigger more compactions
5301 options.write_buffer_size = 1024;
5302 ReOpenNoDelete();
5303 const size_t num_workers = 4; // worker threads count
5304 const size_t num_checkers = 2; // checker threads count
5305 const size_t num_slow_checkers = 2; // checker threads emulating backups
5306 const size_t num_slow_workers = 1; // slow worker threads count
5307 const size_t num_transactions_per_thread = 10000;
5308 const uint16_t num_sets = 3;
5309 const size_t num_keys_per_set = 100;
5310 // Setting the key-space to be 100 keys should cause enough write-conflicts
5311 // to make this test interesting.
5312
5313 std::vector<port::Thread> threads;
5314 std::atomic<uint32_t> finished = {0};
5315 bool TAKE_SNAPSHOT = true;
5316 uint64_t time_seed = env->NowMicros();
5317 printf("time_seed is %" PRIu64 "\n", time_seed); // would help to reproduce
5318
5319 std::function<void()> call_inserter = [&] {
5320 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5321 Random64 rand(time_seed * thd_seed);
5322 ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,
5323 num_sets, num_keys_per_set, &rand));
5324 finished++;
5325 };
5326 std::function<void()> call_checker = [&] {
5327 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5328 Random64 rand(time_seed * thd_seed);
5329 // Verify that data is consistent
5330 while (finished < num_workers) {
5331 Status s = RandomTransactionInserter::Verify(
5332 db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand);
5333 ASSERT_OK(s);
5334 }
5335 };
5336 std::function<void()> call_slow_checker = [&] {
5337 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5338 Random64 rand(time_seed * thd_seed);
5339 // Verify that data is consistent
5340 while (finished < num_workers) {
5341 uint64_t delay_ms = rand.Uniform(100) + 1;
5342 Status s = RandomTransactionInserter::Verify(
5343 db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
5344 ASSERT_OK(s);
5345 }
5346 };
5347 std::function<void()> call_slow_inserter = [&] {
5348 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5349 Random64 rand(time_seed * thd_seed);
5350 uint64_t id = 0;
5351 // Verify that data is consistent
5352 while (finished < num_workers) {
5353 uint64_t delay_ms = rand.Uniform(500) + 1;
5354 ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
5355 &rand, delay_ms, id++));
5356 }
5357 };
5358
5359 for (uint32_t i = 0; i < num_workers; i++) {
5360 threads.emplace_back(call_inserter);
5361 }
5362 for (uint32_t i = 0; i < num_checkers; i++) {
5363 threads.emplace_back(call_checker);
5364 }
5365 if (with_slow_threads_) {
5366 for (uint32_t i = 0; i < num_slow_checkers; i++) {
5367 threads.emplace_back(call_slow_checker);
5368 }
5369 for (uint32_t i = 0; i < num_slow_workers; i++) {
5370 threads.emplace_back(call_slow_inserter);
5371 }
5372 }
5373
5374 // Wait for all threads to finish
5375 for (auto& t : threads) {
5376 t.join();
5377 }
5378
5379 // Verify that data is consistent
5380 Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set,
5381 !TAKE_SNAPSHOT);
5382 ASSERT_OK(s);
5383 }
5384 #endif // ROCKSDB_VALGRIND_RUN
5385
// Checks that a transaction rejects a write with a MemoryLimit status once
// its write batch would exceed max_write_batch_size, and that the rejected
// write is not counted.
TEST_P(TransactionTest, MemoryLimitTest) {
  TransactionOptions txn_options;
  // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
  txn_options.max_write_batch_size = 29;
  // Set threshold to unlimited so that the write batch does not get flushed,
  // and can hit the memory limit.
  txn_options.write_batch_flush_threshold = 0;
  Status s;

  Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
  ASSERT_TRUE(txn);

  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  s = txn->Put(Slice("a"), Slice("...."));
  ASSERT_OK(s);
  ASSERT_EQ(1, txn->GetNumPuts());

  s = txn->Put(Slice("b"), Slice("...."));
  ASSERT_OK(s);
  ASSERT_EQ(2, txn->GetNumPuts());

  // The third put does not fit in the batch any more; the put count must
  // stay unchanged.
  s = txn->Put(Slice("b"), Slice("...."));
  ASSERT_TRUE(s.IsMemoryLimit());
  ASSERT_EQ(2, txn->GetNumPuts());

  // Rolling back must still succeed after the rejected write.
  ASSERT_OK(txn->Rollback());
  delete txn;
}
5417
5418 // This test clarifies the existing expectation from the sequence number
5419 // algorithm. It could detect mistakes in updating the code but it is not
5420 // necessarily the one acceptable way. If the algorithm is legitimately changed,
5421 // this unit test should be updated as well.
TEST_P(TransactionStressTest,SeqAdvanceTest)5422 TEST_P(TransactionStressTest, SeqAdvanceTest) {
5423 // TODO(myabandeh): must be test with false before new releases
5424 const bool short_test = true;
5425 WriteOptions wopts;
5426 FlushOptions fopt;
5427
5428 options.disable_auto_compactions = true;
5429 ASSERT_OK(ReOpen());
5430
5431 // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
5432 // of the branches. This is the same as counting a binary number where i-th
5433 // bit represents whether we take branch i in the represented by the number.
5434 const size_t NUM_BRANCHES = short_test ? 6 : 10;
5435 // Helper function that shows if the branch is to be taken in the run
5436 // represented by the number n.
5437 auto branch_do = [&](size_t n, size_t* branch) {
5438 assert(*branch < NUM_BRANCHES);
5439 const size_t filter = static_cast<size_t>(1) << *branch;
5440 return n & filter;
5441 };
5442 const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
5443 for (size_t n = 0; n < max_n; n++) {
5444 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5445 size_t branch = 0;
5446 auto seq = db_impl->GetLatestSequenceNumber();
5447 exp_seq = seq;
5448 txn_t0(0);
5449 seq = db_impl->TEST_GetLastVisibleSequence();
5450 ASSERT_EQ(exp_seq, seq);
5451
5452 if (branch_do(n, &branch)) {
5453 ASSERT_OK(db_impl->Flush(fopt));
5454 seq = db_impl->TEST_GetLastVisibleSequence();
5455 ASSERT_EQ(exp_seq, seq);
5456 }
5457 if (!short_test && branch_do(n, &branch)) {
5458 ASSERT_OK(db_impl->FlushWAL(true));
5459 ASSERT_OK(ReOpenNoDelete());
5460 db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5461 seq = db_impl->GetLatestSequenceNumber();
5462 ASSERT_EQ(exp_seq, seq);
5463 }
5464
5465 // Doing it twice might detect some bugs
5466 txn_t0(1);
5467 seq = db_impl->TEST_GetLastVisibleSequence();
5468 ASSERT_EQ(exp_seq, seq);
5469
5470 txn_t1(0);
5471 seq = db_impl->TEST_GetLastVisibleSequence();
5472 ASSERT_EQ(exp_seq, seq);
5473
5474 if (branch_do(n, &branch)) {
5475 ASSERT_OK(db_impl->Flush(fopt));
5476 seq = db_impl->TEST_GetLastVisibleSequence();
5477 ASSERT_EQ(exp_seq, seq);
5478 }
5479 if (!short_test && branch_do(n, &branch)) {
5480 ASSERT_OK(db_impl->FlushWAL(true));
5481 ASSERT_OK(ReOpenNoDelete());
5482 db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5483 seq = db_impl->GetLatestSequenceNumber();
5484 ASSERT_EQ(exp_seq, seq);
5485 }
5486
5487 txn_t3(0);
5488 seq = db_impl->TEST_GetLastVisibleSequence();
5489 ASSERT_EQ(exp_seq, seq);
5490
5491 if (branch_do(n, &branch)) {
5492 ASSERT_OK(db_impl->Flush(fopt));
5493 seq = db_impl->TEST_GetLastVisibleSequence();
5494 ASSERT_EQ(exp_seq, seq);
5495 }
5496 if (!short_test && branch_do(n, &branch)) {
5497 ASSERT_OK(db_impl->FlushWAL(true));
5498 ASSERT_OK(ReOpenNoDelete());
5499 db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5500 seq = db_impl->GetLatestSequenceNumber();
5501 ASSERT_EQ(exp_seq, seq);
5502 }
5503
5504 txn_t4(0);
5505 seq = db_impl->TEST_GetLastVisibleSequence();
5506
5507 ASSERT_EQ(exp_seq, seq);
5508
5509 if (branch_do(n, &branch)) {
5510 ASSERT_OK(db_impl->Flush(fopt));
5511 seq = db_impl->TEST_GetLastVisibleSequence();
5512 ASSERT_EQ(exp_seq, seq);
5513 }
5514 if (!short_test && branch_do(n, &branch)) {
5515 ASSERT_OK(db_impl->FlushWAL(true));
5516 ASSERT_OK(ReOpenNoDelete());
5517 db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5518 seq = db_impl->GetLatestSequenceNumber();
5519 ASSERT_EQ(exp_seq, seq);
5520 }
5521
5522 txn_t2(0);
5523 seq = db_impl->TEST_GetLastVisibleSequence();
5524 ASSERT_EQ(exp_seq, seq);
5525
5526 if (branch_do(n, &branch)) {
5527 ASSERT_OK(db_impl->Flush(fopt));
5528 seq = db_impl->TEST_GetLastVisibleSequence();
5529 ASSERT_EQ(exp_seq, seq);
5530 }
5531 if (!short_test && branch_do(n, &branch)) {
5532 ASSERT_OK(db_impl->FlushWAL(true));
5533 ASSERT_OK(ReOpenNoDelete());
5534 db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
5535 seq = db_impl->GetLatestSequenceNumber();
5536 ASSERT_EQ(exp_seq, seq);
5537 }
5538 ASSERT_OK(ReOpen());
5539 }
5540 }
5541
5542 // Verify that the optimization would not compromize the correctness
TEST_P(TransactionTest,Optimizations)5543 TEST_P(TransactionTest, Optimizations) {
5544 size_t comb_cnt = size_t(1) << 2; // 2 is number of optimization vars
5545 for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
5546 TransactionDBWriteOptimizations optimizations;
5547 optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
5548 optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
5549
5550 ASSERT_OK(ReOpen());
5551 WriteOptions write_options;
5552 WriteBatch batch;
5553 batch.Put(Slice("k"), Slice("v1"));
5554 ASSERT_OK(db->Write(write_options, &batch));
5555
5556 ReadOptions ropt;
5557 PinnableSlice pinnable_val;
5558 ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
5559 ASSERT_TRUE(pinnable_val == ("v1"));
5560 }
5561 }
5562
5563 // A comparator that uses only the first three bytes
5564 class ThreeBytewiseComparator : public Comparator {
5565 public:
ThreeBytewiseComparator()5566 ThreeBytewiseComparator() {}
Name() const5567 const char* Name() const override { return "test.ThreeBytewiseComparator"; }
Compare(const Slice & a,const Slice & b) const5568 int Compare(const Slice& a, const Slice& b) const override {
5569 Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5570 Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5571 return na.compare(nb);
5572 }
Equal(const Slice & a,const Slice & b) const5573 bool Equal(const Slice& a, const Slice& b) const override {
5574 Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5575 Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5576 return na == nb;
5577 }
5578 // This methods below dont seem relevant to this test. Implement them if
5579 // proven othersize.
FindShortestSeparator(std::string * start,const Slice & limit) const5580 void FindShortestSeparator(std::string* start,
5581 const Slice& limit) const override {
5582 const Comparator* bytewise_comp = BytewiseComparator();
5583 bytewise_comp->FindShortestSeparator(start, limit);
5584 }
FindShortSuccessor(std::string * key) const5585 void FindShortSuccessor(std::string* key) const override {
5586 const Comparator* bytewise_comp = BytewiseComparator();
5587 bytewise_comp->FindShortSuccessor(key);
5588 }
5589 };
5590
5591 #ifndef ROCKSDB_VALGRIND_RUN
// A reader that does not specify a snapshot must only ever see "value" for
// "key", while a concurrent thread keeps committing transactions that first
// write a different value to the key and then write "value" again.
TEST_P(TransactionTest, GetWithoutSnapshot) {
  WriteOptions write_options;
  std::atomic<bool> finish = {false};
  // The reader below expects the key to exist from the very beginning.
  ASSERT_OK(db->Put(write_options, "key", "value"));
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    // The commits run in their own lambda so that a failed ASSERT only leaves
    // that lambda; `finish` is then still set and the reader thread is not
    // left spinning forever.
    auto commit_all = [&]() {
      for (int i = 0; i < 100; i++) {
        TransactionOptions txn_options;
        Transaction* txn = db->BeginTransaction(write_options, txn_options);
        ASSERT_OK(txn->SetName("xid"));
        ASSERT_OK(txn->Put("key", "overridedvalue"));
        ASSERT_OK(txn->Put("key", "value"));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    };
    commit_all();
    finish = true;
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    while (!finish) {
      ReadOptions ropt;
      PinnableSlice pinnable_val;
      ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val));
      ASSERT_TRUE(pinnable_val == ("value"));
    }
  });
  commit_thread.join();
  read_thread.join();
}
5620 #endif // ROCKSDB_VALGRIND_RUN
5621
5622 // Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest,DuplicateKeys)5623 TEST_P(TransactionTest, DuplicateKeys) {
5624 ColumnFamilyOptions cf_options;
5625 std::string cf_name = "two";
5626 ColumnFamilyHandle* cf_handle = nullptr;
5627 {
5628 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5629 WriteOptions write_options;
5630 WriteBatch batch;
5631 batch.Put(Slice("key"), Slice("value"));
5632 batch.Put(Slice("key2"), Slice("value2"));
5633 // duplicate the keys
5634 batch.Put(Slice("key"), Slice("value3"));
5635 // duplicate the 2nd key. It should not be counted duplicate since a
5636 // sub-patch is cut after the last duplicate.
5637 batch.Put(Slice("key2"), Slice("value4"));
5638 // duplicate the keys but in a different cf. It should not be counted as
5639 // duplicate keys
5640 batch.Put(cf_handle, Slice("key"), Slice("value5"));
5641
5642 ASSERT_OK(db->Write(write_options, &batch));
5643
5644 ReadOptions ropt;
5645 PinnableSlice pinnable_val;
5646 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
5647 ASSERT_OK(s);
5648 ASSERT_TRUE(pinnable_val == ("value3"));
5649 s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
5650 ASSERT_OK(s);
5651 ASSERT_TRUE(pinnable_val == ("value4"));
5652 s = db->Get(ropt, cf_handle, "key", &pinnable_val);
5653 ASSERT_OK(s);
5654 ASSERT_TRUE(pinnable_val == ("value5"));
5655
5656 delete cf_handle;
5657 }
5658
5659 // Test with non-bytewise comparator
5660 {
5661 ASSERT_OK(ReOpen());
5662 std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5663 cf_options.comparator = comp_gc.get();
5664 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5665 WriteOptions write_options;
5666 WriteBatch batch;
5667 batch.Put(cf_handle, Slice("key"), Slice("value"));
5668 // The first three bytes are the same, do it must be counted as duplicate
5669 batch.Put(cf_handle, Slice("key2"), Slice("value2"));
5670 // check for 2nd duplicate key in cf with non-default comparator
5671 batch.Put(cf_handle, Slice("key2b"), Slice("value2b"));
5672 ASSERT_OK(db->Write(write_options, &batch));
5673
5674 // The value must be the most recent value for all the keys equal to "key",
5675 // including "key2"
5676 ReadOptions ropt;
5677 PinnableSlice pinnable_val;
5678 ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
5679 ASSERT_TRUE(pinnable_val == ("value2b"));
5680
5681 // Test duplicate keys with rollback
5682 TransactionOptions txn_options;
5683 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5684 ASSERT_OK(txn0->SetName("xid"));
5685 ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
5686 ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
5687 ASSERT_OK(txn0->Rollback());
5688 ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
5689 ASSERT_TRUE(pinnable_val == ("value2b"));
5690 delete txn0;
5691
5692 delete cf_handle;
5693 cf_options.comparator = BytewiseComparator();
5694 }
5695
5696 for (bool do_prepare : {true, false}) {
5697 for (bool do_rollback : {true, false}) {
5698 for (bool with_commit_batch : {true, false}) {
5699 if (with_commit_batch && !do_prepare) {
5700 continue;
5701 }
5702 if (with_commit_batch && do_rollback) {
5703 continue;
5704 }
5705 ASSERT_OK(ReOpen());
5706 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5707 TransactionOptions txn_options;
5708 txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
5709 WriteOptions write_options;
5710 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5711 auto s = txn0->SetName("xid");
5712 ASSERT_OK(s);
5713 s = txn0->Put(Slice("foo0"), Slice("bar0a"));
5714 ASSERT_OK(s);
5715 s = txn0->Put(Slice("foo0"), Slice("bar0b"));
5716 ASSERT_OK(s);
5717 s = txn0->Put(Slice("foo1"), Slice("bar1"));
5718 ASSERT_OK(s);
5719 s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
5720 ASSERT_OK(s);
5721 // Repeat a key after the start of a sub-patch. This should not cause a
5722 // duplicate in the most recent sub-patch and hence not creating a new
5723 // sub-patch.
5724 s = txn0->Put(Slice("foo0"), Slice("bar0c"));
5725 ASSERT_OK(s);
5726 s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
5727 ASSERT_OK(s);
5728 // duplicate the keys but in a different cf. It should not be counted as
5729 // duplicate.
5730 s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
5731 ASSERT_OK(s);
5732 s = txn0->Put(Slice("foo3"), Slice("bar3"));
5733 ASSERT_OK(s);
5734 s = txn0->Merge(Slice("foo3"), Slice("bar3"));
5735 ASSERT_OK(s);
5736 s = txn0->Put(Slice("foo4"), Slice("bar4"));
5737 ASSERT_OK(s);
5738 s = txn0->Delete(Slice("foo4"));
5739 ASSERT_OK(s);
5740 s = txn0->SingleDelete(Slice("foo4"));
5741 ASSERT_OK(s);
5742 if (do_prepare) {
5743 s = txn0->Prepare();
5744 ASSERT_OK(s);
5745 }
5746 if (do_rollback) {
5747 // Test rolling back the batch with duplicates
5748 s = txn0->Rollback();
5749 ASSERT_OK(s);
5750 } else {
5751 if (with_commit_batch) {
5752 assert(do_prepare);
5753 auto cb = txn0->GetCommitTimeWriteBatch();
5754 // duplicate a key in the original batch
5755 // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
5756 // conflicting with the prepared batch is currently undefined and
5757 // gives different results in different implementations.
5758
5759 // s = cb->Put(Slice("foo0"), Slice("bar0d"));
5760 // ASSERT_OK(s);
5761 // add a new duplicate key
5762 s = cb->Put(Slice("foo6"), Slice("bar6a"));
5763 ASSERT_OK(s);
5764 s = cb->Put(Slice("foo6"), Slice("bar6b"));
5765 ASSERT_OK(s);
5766 // add a duplicate key that is removed in the same batch
5767 s = cb->Put(Slice("foo7"), Slice("bar7a"));
5768 ASSERT_OK(s);
5769 s = cb->Delete(Slice("foo7"));
5770 ASSERT_OK(s);
5771 }
5772 s = txn0->Commit();
5773 ASSERT_OK(s);
5774 }
5775 delete txn0;
5776 ReadOptions ropt;
5777 PinnableSlice pinnable_val;
5778
5779 if (do_rollback) {
5780 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5781 ASSERT_TRUE(s.IsNotFound());
5782 s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5783 ASSERT_TRUE(s.IsNotFound());
5784 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5785 ASSERT_TRUE(s.IsNotFound());
5786 s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5787 ASSERT_TRUE(s.IsNotFound());
5788 s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5789 ASSERT_TRUE(s.IsNotFound());
5790 s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5791 ASSERT_TRUE(s.IsNotFound());
5792 } else {
5793 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5794 ASSERT_OK(s);
5795 ASSERT_TRUE(pinnable_val == ("bar0c"));
5796 s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5797 ASSERT_OK(s);
5798 ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
5799 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5800 ASSERT_OK(s);
5801 ASSERT_TRUE(pinnable_val == ("bar1"));
5802 s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5803 ASSERT_OK(s);
5804 ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
5805 s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5806 ASSERT_OK(s);
5807 ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
5808 s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5809 ASSERT_TRUE(s.IsNotFound());
5810 if (with_commit_batch) {
5811 s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
5812 ASSERT_OK(s);
5813 ASSERT_TRUE(pinnable_val == ("bar6b"));
5814 s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
5815 ASSERT_TRUE(s.IsNotFound());
5816 }
5817 }
5818 delete cf_handle;
5819 } // with_commit_batch
5820 } // do_rollback
5821 } // do_prepare
5822
5823 if (!options.unordered_write) {
5824 // Also test with max_successive_merges > 0. max_successive_merges will not
5825 // affect our algorithm for duplicate key insertion but we add the test to
5826 // verify that.
5827 cf_options.max_successive_merges = 2;
5828 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5829 ASSERT_OK(ReOpen());
5830 db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
5831 WriteOptions write_options;
5832 // Ensure one value for the key
5833 ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
5834 WriteBatch batch;
5835 // Merge more than max_successive_merges times
5836 batch.Merge(cf_handle, Slice("key"), Slice("1"));
5837 batch.Merge(cf_handle, Slice("key"), Slice("2"));
5838 batch.Merge(cf_handle, Slice("key"), Slice("3"));
5839 batch.Merge(cf_handle, Slice("key"), Slice("4"));
5840 ASSERT_OK(db->Write(write_options, &batch));
5841 ReadOptions read_options;
5842 string value;
5843 ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
5844 ASSERT_EQ(value, "value,1,2,3,4");
5845 delete cf_handle;
5846 }
5847
5848 {
5849 // Test that the duplicate detection is not compromised after rolling back
5850 // to a save point
5851 TransactionOptions txn_options;
5852 WriteOptions write_options;
5853 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5854 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5855 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5856 txn0->SetSavePoint();
5857 ASSERT_OK(txn0->RollbackToSavePoint());
5858 ASSERT_OK(txn0->Commit());
5859 delete txn0;
5860 }
5861
5862 // Test sucessfull recovery after a crash
5863 {
5864 ASSERT_OK(ReOpen());
5865 TransactionOptions txn_options;
5866 WriteOptions write_options;
5867 ReadOptions ropt;
5868 Transaction* txn0;
5869 PinnableSlice pinnable_val;
5870 Status s;
5871
5872 std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5873 cf_options.comparator = comp_gc.get();
5874 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5875 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5876 delete cf_handle;
5877 std::vector<ColumnFamilyDescriptor> cfds{
5878 ColumnFamilyDescriptor(kDefaultColumnFamilyName,
5879 ColumnFamilyOptions(options)),
5880 ColumnFamilyDescriptor(cf_name, cf_options),
5881 };
5882 std::vector<ColumnFamilyHandle*> handles;
5883 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5884
5885 ASSERT_OK(db->Put(write_options, "foo0", "init"));
5886 ASSERT_OK(db->Put(write_options, "foo1", "init"));
5887 ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init"));
5888 ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init"));
5889
5890 // one entry
5891 txn0 = db->BeginTransaction(write_options, txn_options);
5892 ASSERT_OK(txn0->SetName("xid"));
5893 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5894 ASSERT_OK(txn0->Prepare());
5895 delete txn0;
5896 // This will check the asserts inside recovery code
5897 ASSERT_OK(db->FlushWAL(true));
5898 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5899 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5900 txn0 = db->GetTransactionByName("xid");
5901 ASSERT_TRUE(txn0 != nullptr);
5902 ASSERT_OK(txn0->Commit());
5903 delete txn0;
5904 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5905 ASSERT_OK(s);
5906 ASSERT_TRUE(pinnable_val == ("bar0a"));
5907
5908 // two entries, no duplicate
5909 txn0 = db->BeginTransaction(write_options, txn_options);
5910 ASSERT_OK(txn0->SetName("xid"));
5911 ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b")));
5912 ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b")));
5913 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5914 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b")));
5915 ASSERT_OK(txn0->Prepare());
5916 delete txn0;
5917 // This will check the asserts inside recovery code
5918 db->FlushWAL(true);
5919 // Flush only cf 1
5920 reinterpret_cast<DBImpl*>(db->GetRootDB())
5921 ->TEST_FlushMemTable(true, false, handles[1]);
5922 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5923 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5924 txn0 = db->GetTransactionByName("xid");
5925 ASSERT_TRUE(txn0 != nullptr);
5926 ASSERT_OK(txn0->Commit());
5927 delete txn0;
5928 pinnable_val.Reset();
5929 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5930 ASSERT_OK(s);
5931 ASSERT_TRUE(pinnable_val == ("bar0b"));
5932 pinnable_val.Reset();
5933 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5934 ASSERT_OK(s);
5935 ASSERT_TRUE(pinnable_val == ("bar1b"));
5936 pinnable_val.Reset();
5937 s = db->Get(ropt, handles[1], "foo0", &pinnable_val);
5938 ASSERT_OK(s);
5939 ASSERT_TRUE(pinnable_val == ("bar0b"));
5940 pinnable_val.Reset();
5941 s = db->Get(ropt, handles[1], "fol1", &pinnable_val);
5942 ASSERT_OK(s);
5943 ASSERT_TRUE(pinnable_val == ("bar1b"));
5944
5945 // one duplicate with ::Put
5946 txn0 = db->BeginTransaction(write_options, txn_options);
5947 ASSERT_OK(txn0->SetName("xid"));
5948 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c")));
5949 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d")));
5950 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c")));
5951 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c")));
5952 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d")));
5953 ASSERT_OK(txn0->Prepare());
5954 delete txn0;
5955 // This will check the asserts inside recovery code
5956 ASSERT_OK(db->FlushWAL(true));
5957 // Flush only cf 1
5958 reinterpret_cast<DBImpl*>(db->GetRootDB())
5959 ->TEST_FlushMemTable(true, false, handles[1]);
5960 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5961 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5962 txn0 = db->GetTransactionByName("xid");
5963 ASSERT_TRUE(txn0 != nullptr);
5964 ASSERT_OK(txn0->Commit());
5965 delete txn0;
5966 pinnable_val.Reset();
5967 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5968 ASSERT_OK(s);
5969 ASSERT_TRUE(pinnable_val == ("bar0d"));
5970 pinnable_val.Reset();
5971 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5972 ASSERT_OK(s);
5973 ASSERT_TRUE(pinnable_val == ("bar1c"));
5974 pinnable_val.Reset();
5975 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
5976 ASSERT_OK(s);
5977 ASSERT_TRUE(pinnable_val == ("bar1d"));
5978
5979 // Duplicate with ::Put, ::Delete
5980 txn0 = db->BeginTransaction(write_options, txn_options);
5981 ASSERT_OK(txn0->SetName("xid"));
5982 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e")));
5983 ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1")));
5984 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
5985 ASSERT_OK(txn0->Delete(Slice("foo0")));
5986 ASSERT_OK(txn0->Prepare());
5987 delete txn0;
5988 // This will check the asserts inside recovery code
5989 ASSERT_OK(db->FlushWAL(true));
5990 // Flush only cf 1
5991 reinterpret_cast<DBImpl*>(db->GetRootDB())
5992 ->TEST_FlushMemTable(true, false, handles[1]);
5993 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5994 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5995 txn0 = db->GetTransactionByName("xid");
5996 ASSERT_TRUE(txn0 != nullptr);
5997 ASSERT_OK(txn0->Commit());
5998 delete txn0;
5999 pinnable_val.Reset();
6000 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6001 ASSERT_TRUE(s.IsNotFound());
6002 pinnable_val.Reset();
6003 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6004 ASSERT_TRUE(s.IsNotFound());
6005
6006 // Duplicate with ::Put, ::SingleDelete
6007 txn0 = db->BeginTransaction(write_options, txn_options);
6008 ASSERT_OK(txn0->SetName("xid"));
6009 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g")));
6010 ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1")));
6011 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6012 ASSERT_OK(txn0->SingleDelete(Slice("foo0")));
6013 ASSERT_OK(txn0->Prepare());
6014 delete txn0;
6015 // This will check the asserts inside recovery code
6016 ASSERT_OK(db->FlushWAL(true));
6017 // Flush only cf 1
6018 reinterpret_cast<DBImpl*>(db->GetRootDB())
6019 ->TEST_FlushMemTable(true, false, handles[1]);
6020 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6021 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6022 txn0 = db->GetTransactionByName("xid");
6023 ASSERT_TRUE(txn0 != nullptr);
6024 ASSERT_OK(txn0->Commit());
6025 delete txn0;
6026 pinnable_val.Reset();
6027 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6028 ASSERT_TRUE(s.IsNotFound());
6029 pinnable_val.Reset();
6030 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6031 ASSERT_TRUE(s.IsNotFound());
6032
6033 // Duplicate with ::Put, ::Merge
6034 txn0 = db->BeginTransaction(write_options, txn_options);
6035 ASSERT_OK(txn0->SetName("xid"));
6036 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i")));
6037 ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j")));
6038 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f")));
6039 ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g")));
6040 ASSERT_OK(txn0->Prepare());
6041 delete txn0;
6042 // This will check the asserts inside recovery code
6043 ASSERT_OK(db->FlushWAL(true));
6044 // Flush only cf 1
6045 reinterpret_cast<DBImpl*>(db->GetRootDB())
6046 ->TEST_FlushMemTable(true, false, handles[1]);
6047 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6048 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6049 txn0 = db->GetTransactionByName("xid");
6050 ASSERT_TRUE(txn0 != nullptr);
6051 ASSERT_OK(txn0->Commit());
6052 delete txn0;
6053 pinnable_val.Reset();
6054 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6055 ASSERT_OK(s);
6056 ASSERT_TRUE(pinnable_val == ("bar0f,bar0g"));
6057 pinnable_val.Reset();
6058 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6059 ASSERT_OK(s);
6060 ASSERT_TRUE(pinnable_val == ("bar1i,bar1j"));
6061
6062 for (auto h : handles) {
6063 delete h;
6064 }
6065 delete db;
6066 db = nullptr;
6067 }
6068 }
6069
6070 // Test that the reseek optimization in iterators will not result in an infinite
6071 // loop if there are too many uncommitted entries before the snapshot.
TEST_P(TransactionTest,ReseekOptimization)6072 TEST_P(TransactionTest, ReseekOptimization) {
6073 WriteOptions write_options;
6074 write_options.sync = true;
6075 write_options.disableWAL = false;
6076 ColumnFamilyDescriptor cfd;
6077 db->DefaultColumnFamily()->GetDescriptor(&cfd);
6078 auto max_skip = cfd.options.max_sequential_skip_in_iterations;
6079
6080 ASSERT_OK(db->Put(write_options, Slice("foo0"), Slice("initv")));
6081
6082 TransactionOptions txn_options;
6083 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
6084 ASSERT_OK(txn0->SetName("xid"));
6085 // Duplicate keys will result into separate sequence numbers in WritePrepared
6086 // and WriteUnPrepared
6087 for (size_t i = 0; i < 2 * max_skip; i++) {
6088 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar")));
6089 }
6090 ASSERT_OK(txn0->Prepare());
6091 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("initv")));
6092
6093 ReadOptions read_options;
6094 // To avoid loops
6095 read_options.max_skippable_internal_keys = 10 * max_skip;
6096 Iterator* iter = db->NewIterator(read_options);
6097 ASSERT_OK(iter->status());
6098 size_t cnt = 0;
6099 iter->SeekToFirst();
6100 while (iter->Valid()) {
6101 iter->Next();
6102 ASSERT_OK(iter->status());
6103 cnt++;
6104 }
6105 ASSERT_EQ(cnt, 2);
6106 cnt = 0;
6107 iter->SeekToLast();
6108 while (iter->Valid()) {
6109 iter->Prev();
6110 ASSERT_OK(iter->status());
6111 cnt++;
6112 }
6113 ASSERT_EQ(cnt, 2);
6114 delete iter;
6115 txn0->Rollback();
6116 delete txn0;
6117 }
6118
// After recovery in kPointInTimeRecovery mode, the corrupted log file remains
// there. The new log files should still be read successfully during recovery
// from the 2nd crash.
TEST_P(TransactionTest, DoubleCrashInRecovery) {
  // Scenario, repeated for every combination of manual_wal_flush and
  // write_after_recovery:
  //   1. Leave a prepared transaction ("xid") behind and write three keys to
  //      the newest WAL file.
  //   2. Crash, then overwrite two bytes in the middle of that WAL file.
  //   3. Reopen (first recovery), optionally write more data, then reopen
  //      again (second recovery).
  //   4. The prepared transaction must still be found and must commit.

  // Offset of the first of the two adjacent WAL bytes that get overwritten.
  const size_t kCorruptionOffset = 400;
  // Size of the value that is expected to overlap the overwritten bytes.
  const size_t kLargeValueSize = 400;

  for (const bool manual_wal_flush : {false, true}) {
    for (const bool write_after_recovery : {false, true}) {
      options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
      options.manual_wal_flush = manual_wal_flush;
      ReOpen();
      const std::string cf_name = "two";
      ColumnFamilyOptions cf_options;
      ColumnFamilyHandle* cf_handle = nullptr;
      ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));

      // Add a prepare entry to prevent the older logs from being deleted.
      WriteOptions write_options;
      TransactionOptions txn_options;
      Transaction* txn = db->BeginTransaction(write_options, txn_options);
      ASSERT_OK(txn->SetName("xid"));
      ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
      ASSERT_OK(txn->Prepare());

      // A failed flush would silently change which log files exist, so every
      // flush status is checked.
      FlushOptions flush_ops;
      ASSERT_OK(db->Flush(flush_ops));
      // Now we have a log that cannot be deleted

      ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
      // Flush only the 2nd cf
      ASSERT_OK(db->Flush(flush_ops, cf_handle));

      // The value is large enough to be touched by the corruption we ingest
      // below.
      const std::string large_value(kLargeValueSize, ' ');
      // key/value not touched by corruption
      ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
      // key/value touched by corruption
      ASSERT_OK(db->Put(write_options, "foo3", large_value));
      // key/value not touched by corruption
      ASSERT_OK(db->Put(write_options, "foo4", "bar4"));

      ASSERT_OK(db->FlushWAL(true));
      // Remember the name of the current WAL file before the DB goes away.
      DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
      const uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
      const std::string fname = LogFileName(dbname, wal_file_id);
      reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
      delete txn;
      delete cf_handle;
      delete db;
      db = nullptr;

      // Corrupt the last log file in the middle, so that it is not corrupted
      // in the tail.
      std::string file_content;
      ASSERT_OK(ReadFileToString(env, fname, &file_content));
      // Guard the raw indexing below: writing past the end of the string
      // would be undefined behaviour rather than a test failure.
      ASSERT_GT(file_content.size(), kCorruptionOffset + 1);
      file_content[kCorruptionOffset] = 'h';
      file_content[kCorruptionOffset + 1] = 'a';
      ASSERT_OK(env->DeleteFile(fname));
      ASSERT_OK(WriteStringToFile(env, file_content, fname, true));

      // Recover from corruption
      std::vector<ColumnFamilyHandle*> handles;
      std::vector<ColumnFamilyDescriptor> column_families;
      column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
                                                       ColumnFamilyOptions()));
      column_families.push_back(
          ColumnFamilyDescriptor(cf_name, ColumnFamilyOptions()));
      ASSERT_OK(ReOpenNoDelete(column_families, &handles));

      if (write_after_recovery) {
        // Write data to the log right after the corrupted log
        ASSERT_OK(db->Put(write_options, "foo5", large_value));
      }

      // Persist data written to WAL during recovery or by the last Put
      ASSERT_OK(db->FlushWAL(true));
      // 2nd crash to recover while having a valid log after the corrupted one.
      ASSERT_OK(ReOpenNoDelete(column_families, &handles));
      // Use a gtest assertion (not assert()) so the check also runs in
      // NDEBUG builds.
      ASSERT_TRUE(db != nullptr);
      txn = db->GetTransactionByName("xid");
      ASSERT_TRUE(txn != nullptr);
      ASSERT_OK(txn->Commit());
      delete txn;
      for (auto* handle : handles) {
        delete handle;
      }
    }
  }
}
6207
6208 } // namespace ROCKSDB_NAMESPACE
6209
main(int argc,char ** argv)6210 int main(int argc, char** argv) {
6211 ::testing::InitGoogleTest(&argc, argv);
6212 return RUN_ALL_TESTS();
6213 }
6214
6215 #else
6216 #include <stdio.h>
6217
int main(int /*argc*/, char** /*argv*/) {
  // This branch is built when ROCKSDB_LITE is defined: there are no tests to
  // run, so say why on stderr and exit successfully.
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
6223
6224 #endif // ROCKSDB_LITE
6225