1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_test.h"
9
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <random>
#include <string>
#include <thread>
#include <vector>
14
15 #include "db/db_impl/db_impl.h"
16 #include "port/port.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/options.h"
19 #include "rocksdb/perf_context.h"
20 #include "rocksdb/utilities/transaction.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "table/mock_table.h"
23 #include "test_util/sync_point.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "test_util/transaction_test_util.h"
27 #include "util/random.h"
28 #include "util/string_util.h"
29 #include "utilities/fault_injection_env.h"
30 #include "utilities/merge_operators.h"
31 #include "utilities/merge_operators/string_append/stringappend.h"
32 #include "utilities/transactions/pessimistic_transaction_db.h"
33
34 using std::string;
35
36 namespace ROCKSDB_NAMESPACE {
37
// Value-parameterized instantiations of the transaction test fixtures.
//
// Each tuple is forwarded to the fixtures declared in transaction_test.h.
// The first element appears to choose whether the TransactionDB is layered on
// a StackableDB: it is false for every DBAsBaseDB case and true for every
// StackableDBAsBaseDB case. The third and fourth elements select the write
// policy (WRITE_COMMITTED / WRITE_PREPARED / WRITE_UNPREPARED) and the write
// ordering (kOrderedWrite / kUnorderedWrite). kUnorderedWrite is only combined
// with WRITE_PREPARED in these lists.
// NOTE(review): the meaning of the second boolean, and of the fifth element
// used by MySQLStyleTransactionTest, is defined by the fixtures — confirm it in
// transaction_test.h before adding or removing cases.
INSTANTIATE_TEST_CASE_P(
    DBAsBaseDB, TransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
INSTANTIATE_TEST_CASE_P(
    DBAsBaseDB, TransactionStressTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
INSTANTIATE_TEST_CASE_P(
    StackableDBAsBaseDB, TransactionTest,
    ::testing::Values(
        std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite)));

// MySQLStyleTransactionTest takes far too long for valgrind to run.
#ifndef ROCKSDB_VALGRIND_RUN
INSTANTIATE_TEST_CASE_P(
    MySQLStyleTransactionTest, MySQLStyleTransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
#endif  // ROCKSDB_VALGRIND_RUN
83
// Verifies that:
//  * writing the same empty WriteBatch twice succeeds,
//  * an empty named (2PC) transaction can be prepared and committed, and
//  * a prepared transaction that survives a simulated crash can be looked up
//    by name after reopening the DB, committed, and its write read back.
TEST_P(TransactionTest, DoubleEmptyWrite) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = false;

  WriteBatch batch;

  ASSERT_OK(db->Write(write_options, &batch));
  ASSERT_OK(db->Write(write_options, &batch));

  // Also test committing empty transactions in 2PC
  TransactionOptions txn_options;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Prepare());
  ASSERT_OK(txn0->Commit());
  delete txn0;

  // Also test that it works during recovery
  txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn0);
  ASSERT_OK(txn0->SetName("xid2"));
  ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
  ASSERT_OK(txn0->Prepare());
  delete txn0;
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  // The prepared transaction must be rediscovered by recovery. Fail the test
  // cleanly here rather than dereferencing a null pointer if it is not.
  txn0 = db->GetTransactionByName("xid2");
  ASSERT_TRUE(txn0);
  ASSERT_OK(txn0->Commit());
  delete txn0;

  // The recovered-and-committed write must now be visible.
  std::string value;
  ASSERT_OK(db->Get(ReadOptions(), "foo0", &value));
  ASSERT_EQ(value, "bar0a");
}
115
// Happy-path smoke test. A transaction:
//  * locks and reads a committed key via GetForUpdate,
//  * overwrites it and sees its own write, and
//  * commits, after which the base DB must return the new value.
TEST_P(TransactionTest, SuccessTest) {
  ASSERT_OK(db->ResetStats());

  WriteOptions wopts;
  ReadOptions ropts;
  std::string read_back;

  // Seed two committed keys outside of any transaction.
  for (const char* seed_key : {"foo", "foo2"}) {
    ASSERT_OK(db->Put(wopts, Slice(seed_key), Slice("bar")));
  }

  Transaction* txn = db->BeginTransaction(wopts, TransactionOptions());
  ASSERT_TRUE(txn);

  // A brand-new transaction has buffered no writes.
  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  // Lock "foo" and observe the committed value.
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &read_back));
  ASSERT_EQ(read_back, "bar");

  // Overwrite inside the transaction. The put is counted and is visible to
  // the transaction's own reads before commit.
  ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  ASSERT_EQ(1, txn->GetNumPuts());
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &read_back));
  ASSERT_EQ(read_back, "bar2");

  ASSERT_OK(txn->Commit());

  // Once committed, the base DB returns the new value.
  ASSERT_OK(db->Get(ropts, "foo", &read_back));
  ASSERT_EQ(read_back, "bar2");

  delete txn;
}
149
150 // The test clarifies the contract of do_validate and assume_tracked
151 // in GetForUpdate and Put/Merge/Delete
// Pins down the contract of the `do_validate` argument of GetForUpdate and
// the `assume_tracked` argument of Put/Merge/Delete/SingleDelete when a key
// was committed by someone else after the transaction's snapshot.
TEST_P(TransactionTest, AssumeExclusiveTracked) {
  constexpr bool kExclusive = true;
  constexpr bool kDoValidate = true;
  constexpr bool kAssumeLocked = true;

  WriteOptions wopts;
  ReadOptions ropts;
  TransactionOptions topts;
  topts.lock_timeout = 1;
  std::string value;

  Transaction* txn = db->BeginTransaction(wopts, topts);
  ASSERT_TRUE(txn);
  txn->SetSnapshot();

  // Commit a value only after the snapshot has been taken.
  ASSERT_OK(db->Put(wopts, Slice("foo"), Slice("bar")));

  // With validation (the default), GetForUpdate reports the conflict with the
  // post-snapshot commit...
  ASSERT_TRUE(txn->GetForUpdate(ropts, "foo", &value, kExclusive).IsBusy());
  // ...whereas skipping validation succeeds and returns the most recently
  // committed value.
  ASSERT_OK(txn->GetForUpdate(ropts, "foo", &value, kExclusive, !kDoValidate));
  ASSERT_EQ(value, "bar");

  // Skipping validation still acquired the lock: an outside write times out.
  ASSERT_TRUE(db->Put(wopts, Slice("foo"), Slice("bar")).IsTimedOut());

  ColumnFamilyHandle* default_cf = db->DefaultColumnFamily();

  // Without assume_tracked, writes re-validate the snapshot and hit the
  // same conflict.
  ASSERT_TRUE(txn->Put(Slice("foo"), Slice("bar1")).IsBusy());
  ASSERT_TRUE(
      txn->Put(default_cf, Slice("foo"), Slice("bar1"), !kAssumeLocked)
          .IsBusy());

  // With assume_tracked the caller vouches for the exclusive lock taken by
  // the earlier GetForUpdate, so every write kind goes through.
  ASSERT_OK(txn->Put(default_cf, Slice("foo"), Slice("bar1"), kAssumeLocked));
  ASSERT_OK(
      txn->Merge(default_cf, Slice("foo"), Slice("bar2"), kAssumeLocked));
  ASSERT_OK(txn->Delete(default_cf, Slice("foo"), kAssumeLocked));
  ASSERT_OK(txn->SingleDelete(default_cf, Slice("foo"), kAssumeLocked));

  ASSERT_OK(txn->Rollback());
  delete txn;
}
204
205 // This test clarifies the contract of ValidateSnapshot
// Clarifies the contract of PessimisticTransaction::ValidateSnapshot. Suppose
// a key is written by a transaction that commits after another transaction
// took its snapshot. Then validating that key against the snapshot must
// return Busy.
//
// Here the writing transaction is prepared (2PC), and the memtables are
// flushed repeatedly before the snapshot is taken.
TEST_P(TransactionTest, ValidateSnapshotTest) {
  // NOTE(review): both loops iterate over {true} only, so the no-flush and
  // no-2PC variants are currently not exercised.
  for (bool with_flush : {true}) {
    for (bool with_2pc : {true}) {
      ASSERT_OK(ReOpen());
      WriteOptions write_options;

      assert(db != nullptr);
      Transaction* txn1 =
          db->BeginTransaction(write_options, TransactionOptions());
      ASSERT_TRUE(txn1);
      ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
      if (with_2pc) {
        ASSERT_OK(txn1->SetName("xid1"));
        ASSERT_OK(txn1->Prepare());
      }

      if (with_flush) {
        auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
        ASSERT_OK(db_impl->TEST_FlushMemTable(true));
        // Make sure the flushed memtable is not kept in memory. To do that,
        // write and flush more memtables than the history bound derived
        // from the options.
        int max_memtable_in_history =
            std::max(
                options.max_write_buffer_number,
                static_cast<int>(options.max_write_buffer_size_to_maintain) /
                    static_cast<int>(options.write_buffer_size)) +
            1;
        for (int i = 0; i < max_memtable_in_history; i++) {
          ASSERT_OK(db->Put(write_options, Slice("key"), Slice("value")));
          ASSERT_OK(db_impl->TEST_FlushMemTable(true));
        }
      }

      Transaction* txn2 =
          db->BeginTransaction(write_options, TransactionOptions());
      ASSERT_TRUE(txn2);
      txn2->SetSnapshot();

      ASSERT_OK(txn1->Commit());
      delete txn1;

      // ValidateSnapshot is invoked through the PessimisticTransaction type.
      // Fail cleanly instead of dereferencing nullptr if the cast fails.
      auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
      ASSERT_TRUE(pes_txn2);
      // Test the simple case where the key is not tracked yet
      auto tracked_seq = kMaxSequenceNumber;
      auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
                                          &tracked_seq);
      ASSERT_TRUE(s.IsBusy());
      delete txn2;
    }
  }
}
258
// Checks three things:
//  * a transaction blocked on a key lock reports the holder through
//    GetWaitingTxns;
//  * GetLockStatusData lists the key locked in each column family;
//  * the key-lock-wait perf counters are updated when the wait times out.
TEST_P(TransactionTest, WaitingTxn) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  txn_options.lock_timeout = 1;
  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  /* create second cf */
  ColumnFamilyHandle* cfa;
  ColumnFamilyOptions cf_options;
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->Put(write_options, cfa, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  // Validate both handles before txn1 is dereferenced below.
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);
  TransactionID id1 = txn1->GetID();

  // While txn2 is waiting for the lock, it must report that it waits on txn1
  // for key "foo" in the default column family (id 0).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
        std::string key;
        uint32_t cf_id;
        std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
        ASSERT_EQ(key, "foo");
        ASSERT_EQ(wait.size(), 1);
        ASSERT_EQ(wait[0], id1);
        ASSERT_EQ(cf_id, 0U);
      });

  get_perf_context()->Reset();
  // lock key in default cf
  s = txn1->GetForUpdate(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");
  ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);

  // lock key in cfa
  s = txn1->GetForUpdate(read_options, cfa, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");
  ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);

  auto lock_data = db->GetLockStatusData();
  // Locked keys exist in both column families.
  ASSERT_EQ(lock_data.size(), 2);

  // lock_data is an unordered multimap keyed by column family id, so the
  // test cannot assume any particular order. Check every entry, then verify
  // that together the entries cover exactly the default CF and cfa.
  std::vector<uint32_t> locked_cf_ids;
  for (const auto& entry : lock_data) {
    // The locked key is "foo" and is locked by txn1 alone.
    ASSERT_EQ(entry.second.key, "foo");
    ASSERT_EQ(entry.second.ids.size(), 1);
    ASSERT_EQ(entry.second.ids[0], txn1->GetID());
    locked_cf_ids.push_back(entry.first);
  }
  std::sort(locked_cf_ids.begin(), locked_cf_ids.end());
  const std::vector<uint32_t> expected_cf_ids = {0, cfa->GetID()};
  ASSERT_EQ(locked_cf_ids, expected_cf_ids);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  s = txn2->GetForUpdate(read_options, "foo", &value);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
  ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1);
  ASSERT_GE(get_perf_context()->key_lock_wait_time, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  delete cfa;
  delete txn1;
  delete txn2;
}
352
// Exercises shared (non-exclusive) GetForUpdate locks:
//  * concurrent sharing among transactions;
//  * exclusive requests blocked by sharers;
//  * upgrading a shared lock to exclusive;
//  * an attempted downgrade, which keeps the lock exclusive;
//  * shared requests blocked by an exclusive holder.
TEST_P(TransactionTest, SharedLocks) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;
  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  ASSERT_TRUE(txn2);
  ASSERT_TRUE(txn3);

  // Test shared access between txns
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  auto lock_data = db->GetLockStatusData();
  ASSERT_EQ(lock_data.size(), 1);

  auto cf_iterator = lock_data.begin();
  ASSERT_EQ(cf_iterator->second.key, "foo");

  // Check that the set of txns holding this key is as expected. The test
  // must not depend on the order in which holder ids are reported, so sort
  // both vectors before comparing them.
  std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
                                              txn3->GetID()};
  std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
  std::sort(expected_txns.begin(), expected_txns.end());
  std::sort(lock_txns.begin(), lock_txns.end());
  ASSERT_EQ(expected_txns, lock_txns);
  ASSERT_FALSE(cf_iterator->second.exclusive);

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());
  ASSERT_OK(txn3->Rollback());

  // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn3->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn1->UndoGetForUpdate("foo");
  s = txn3->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn2->UndoGetForUpdate("foo");
  s = txn3->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_OK(s);

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());
  ASSERT_OK(txn3->Rollback());

  // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn1->UndoGetForUpdate("foo");
  s = txn2->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_OK(s);

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());

  // Test txn1 trying to downgrade its lock.
  s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  // Should still fail after "downgrading".
  s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  ASSERT_OK(txn1->Rollback());
  ASSERT_OK(txn2->Rollback());

  // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
  // access.
  s = txn1->GetForUpdate(read_options, "foo", nullptr);
  ASSERT_OK(s);

  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  txn1->UndoGetForUpdate("foo");
  s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
  ASSERT_OK(s);

  delete txn1;
  delete txn2;
  delete txn3;
}
477
// Deadlock detection with shared locks. The test:
//  1. Builds a binary tree of 31 transactions whose wait-for edges are closed
//     into cycles, and checks the deadlock info buffer (size, newest-first
//     timestamps, path contents) after each detected deadlock.
//  2. Checks that SetDeadlockInfoBufferSize keeps the newest entries when
//     shrinking, growing, and resizing to/from 0.
//  3. Checks that a deadlock completed by a shared lock request is recorded
//     with m_exclusive == false for that request.
TEST_P(TransactionTest, DeadlockCycleShared) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  // A very long lock timeout, so waits below end through deadlock detection
  // rather than by timing out.
  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  // Set up a wait for chain like this:
  //
  // Tn -> T(n*2)
  // Tn -> T(n*2 + 1)
  //
  // So we have:
  // T1 -> T2 -> T4 ...
  //    |     |> T5 ...
  //    |> T3 -> T6 ...
  //          |> T7 ...
  // up to T31, then T[16 - 31] -> T1.
  // Note that Tn holds lock on floor(n / 2).

  std::vector<Transaction*> txns(31);

  // txns[i] is Tn with n = i + 1; it takes a shared lock on key floor(n / 2).
  for (uint32_t i = 0; i < 31; i++) {
    txns[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns[i]);
    auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
                                   false /* exclusive */);
    ASSERT_OK(s);
  }

  // Counts lock requests that have started waiting (sync point callback).
  std::atomic<uint32_t> checkpoints(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // We want the leaf transactions to block and hold everyone back.
  // Each internal node Tn (n = 1..15) requests an exclusive lock on key n.
  // That key is held (shared) by its children T(2n) and T(2n + 1), so every
  // internal node blocks and the leaves T16..T31 hold everyone back.
  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < 15; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
                                     true /* exclusive */);
      ASSERT_OK(s);
      ASSERT_OK(txns[i]->Rollback());
      delete txns[i];
    };
    threads.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints.load() != 15) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T[16 - 31] -> T1
  for (uint32_t i = 15; i < 31; i++) {
    auto s =
        txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
    ASSERT_TRUE(s.IsDeadlock());

    // Calculate next buffer len, plateau at 5 when 5 records are inserted.
    const uint32_t curr_dlock_buffer_len_ =
        (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
    auto dlock_entry = dlock_buffer[0].path;
    // NOTE(review): this compares the path length, which is the tree depth:
    // T1 -> T2 -> T4 -> T8 -> T16.., i.e. 5 nodes. The value it is compared
    // against is the deadlock *buffer* capacity constant. The two only
    // coincide because both are 5.
    ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
    // Buffer entries must be ordered newest first (non-increasing times).
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    int64_t curr_waiting_key = 0;

    // Offset of each txn id from the root of the shared dlock tree's txn id.
    int64_t offset_root = dlock_entry[0].m_txn_id - 1;
    // Offset of the final entry in the dlock path from the root's txn id.
    TransactionID leaf_id =
        dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;

    // Walk the path from the leaf back to the root. The leaf waits on key
    // "0"; each parent waits on the key equal to its own tree index.
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      auto dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
      ASSERT_EQ(dl_node.m_cf_id, 0U);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_EQ(dl_node.m_exclusive, true);

      if (curr_waiting_key == 0) {
        curr_waiting_key = leaf_id;
      }
      curr_waiting_key /= 2;
      leaf_id /= 2;
    }
  }

  // Rollback the leaf transaction.
  for (uint32_t i = 15; i < 31; i++) {
    ASSERT_OK(txns[i]->Rollback());
    delete txns[i];
  }

  for (auto& t : threads) {
    t.join();
  }

  // Downsize the buffer and verify the 3 latest deadlocks are preserved.
  auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Upsize the buffer and verify the 3 latest deadlocks are preserved.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(5);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Downsize to 0 and verify the size is consistent.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(0);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Upsize from 0; the (now empty) buffer must stay empty.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Contrived case of shared lock of cycle size 2 to verify that a shared
  // lock causing a deadlock is correctly reported as "shared" in the buffer.
  std::vector<Transaction*> txns_shared(2);

  // Create a cycle of size 2.
  // txns_shared[i] takes an exclusive lock on key i.
  for (uint32_t i = 0; i < 2; i++) {
    txns_shared[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns_shared[i]);
    auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
    ASSERT_OK(s);
  }

  std::atomic<uint32_t> checkpoints_shared(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // A single background thread: txns_shared[0] blocks requesting key "1",
  // which is held by txns_shared[1].
  std::vector<port::Thread> threads_shared;
  for (uint32_t i = 0; i < 1; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s =
          txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
      ASSERT_OK(s);
      ASSERT_OK(txns_shared[i]->Rollback());
      delete txns_shared[i];
    };
    threads_shared.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints_shared.load() != 1) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T2 -> T1 with a shared lock.
  auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
  ASSERT_TRUE(s.IsDeadlock());

  auto dlock_buffer = db->GetDeadlockInfoBuffer();

  // Verify the size of the buffer and the single path.
  ASSERT_EQ(dlock_buffer.size(), 1);
  ASSERT_EQ(dlock_buffer[0].path.size(), 2);

  // Verify the exclusivity field of the transactions in the deadlock path.
  ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
  ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
  ASSERT_OK(txns_shared[1]->Rollback());
  delete txns_shared[1];

  for (auto& t : threads_shared) {
    t.join();
  }
}
688
689 #ifndef ROCKSDB_VALGRIND_RUN
// Builds exclusive wait-for cycles T1 -> T2 -> ... -> Tlen -> T1 for every
// cycle length up to two past the detection depth. For each cycle it checks
// that the closing request is reported as a deadlock. It also checks the
// newest deadlock-buffer entry:
//  * it records the full path, or
//  * once the cycle is longer than txn_options.deadlock_detect_depth, it
//    records an empty path flagged limit_exceeded.
TEST_P(TransactionStressTest, DeadlockCycle) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  // Cycles longer than this are expected to be reported with an empty path
  // and limit_exceeded set (checked below).
  const int64_t max_detect_depth = txn_options.deadlock_detect_depth;
  // offset by 2 from the max depth to test edge case
  const uint32_t kMaxCycleLength = static_cast<uint32_t>(max_detect_depth) + 2;

  for (uint32_t len = 2; len < kMaxCycleLength; len++) {
    // Set up a long wait for chain like this:
    //
    // T1 -> T2 -> T3 -> ... -> Tlen

    std::vector<Transaction*> txns(len);

    // txns[i] holds an exclusive lock on key i.
    for (uint32_t i = 0; i < len; i++) {
      txns[i] = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txns[i]);
      auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
      ASSERT_OK(s);
    }

    std::atomic<uint32_t> checkpoints(0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "PointLockManager::AcquireWithTimeout:WaitingTxn",
        [&](void* /*arg*/) { checkpoints.fetch_add(1); });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // We want the last transaction in the chain to block and hold everyone
    // back.
    std::vector<port::Thread> threads;
    for (uint32_t i = 0; i + 1 < len; i++) {
      std::function<void()> blocking_thread = [&, i] {
        auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
        ASSERT_OK(s);
        ASSERT_OK(txns[i]->Rollback());
        delete txns[i];
      };
      threads.emplace_back(blocking_thread);
    }

    // Wait until all threads are waiting on each other.
    while (checkpoints.load() != len - 1) {
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

    // Complete the cycle Tlen -> T1
    auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
    ASSERT_TRUE(s.IsDeadlock());

    // One deadlock is recorded per cycle length tried so far (len - 1),
    // capped at the buffer capacity.
    const uint32_t dlock_buffer_size_ =
        (len - 1 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (len - 1);
    uint32_t curr_waiting_key = 0;
    TransactionID curr_txn_id = txns[0]->GetID();

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
    uint32_t check_len = len;
    bool check_limit_flag = false;

    // Special case for a deadlock path that exceeds the maximum depth.
    if (static_cast<int64_t>(len) > max_detect_depth) {
      check_len = 0;
      check_limit_flag = true;
    }
    auto dlock_entry = dlock_buffer[0].path;
    ASSERT_EQ(dlock_entry.size(), check_len);
    ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);

    // Buffer entries must be ordered newest first (non-increasing times).
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    // Iterates backwards over path verifying decreasing txn_ids.
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      auto dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
      ASSERT_EQ(dl_node.m_cf_id, 0u);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_EQ(dl_node.m_exclusive, true);

      curr_txn_id--;
      if (curr_waiting_key == 0) {
        curr_waiting_key = len;
      }
      curr_waiting_key--;
    }

    // Rollback the last transaction.
    ASSERT_OK(txns[len - 1]->Rollback());
    delete txns[len - 1];

    for (auto& t : threads) {
      t.join();
    }
  }
}
797
// Stress test for deadlock detection. NUM_TXN_THREADS threads each run
// NUM_ITERS transactions, and each transaction locks all NUM_KEYS keys in a
// freshly shuffled order. Because lock_timeout is very large, any failed lock
// request must be a detected deadlock. The transaction is then rolled back
// and the thread moves on.
TEST_P(TransactionStressTest, DeadlockStress) {
  const uint32_t NUM_TXN_THREADS = 10;
  const uint32_t NUM_KEYS = 100;
  const uint32_t NUM_ITERS = 10000;

  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;
  std::vector<std::string> keys;
  keys.reserve(NUM_KEYS);

  for (uint32_t i = 0; i < NUM_KEYS; i++) {
    const std::string key = ToString(i);
    ASSERT_OK(db->Put(write_options, Slice(key), Slice("")));
    keys.push_back(key);
  }

  // Per-thread seeds come from a Random seeded with this thread's id hash,
  // so key orders vary from run to run.
  size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
  Random rnd(static_cast<uint32_t>(tid));
  std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) {
    std::default_random_engine g(seed);

    for (uint32_t i = 0; i < NUM_ITERS; i++) {
      Transaction* txn = db->BeginTransaction(write_options, txn_options);
      ASSERT_TRUE(txn);
      auto random_keys = keys;
      std::shuffle(random_keys.begin(), random_keys.end(), g);

      // The lock mode is fixed per transaction. Transactions whose id is a
      // multiple of 4 (roughly one in four) lock exclusively; the rest lock
      // for shared access.
      const bool exclusive = txn->GetID() % 4 == 0;

      // Lock keys in random order.
      for (const auto& k : random_keys) {
        auto s = txn->GetForUpdate(read_options, k, nullptr, exclusive);
        if (!s.ok()) {
          ASSERT_TRUE(s.IsDeadlock());
          ASSERT_OK(txn->Rollback());
          break;
        }
      }

      delete txn;
    }
  };

  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
    threads.emplace_back(stress_thread, rnd.Next());
  }

  for (auto& t : threads) {
    t.join();
  }
}
852 #endif // ROCKSDB_VALGRIND_RUN
853
// Commit() must return InvalidArgument when the transaction's commit-time
// write batch is non-empty. In this test the transaction is committed
// directly, with no SetName()/Prepare() step.
TEST_P(TransactionTest, CommitTimeBatchFailTest) {
  WriteOptions wopts;
  TransactionOptions topts;

  Transaction* txn = db->BeginTransaction(wopts, topts);
  ASSERT_TRUE(txn);

  // Stage an entry in the commit-time batch alongside a regular write.
  ASSERT_OK(txn->GetCommitTimeWriteBatch()->Put("cat", "dog"));
  ASSERT_OK(txn->Put("foo", "bar"));

  // The non-empty commit-time batch makes the commit fail.
  ASSERT_EQ(txn->Commit(), Status::InvalidArgument());

  delete txn;
}
875
// Prepares 100 named transactions with a tiny write buffer, flushing the
// memtable after each one.
//  * Most transactions are committed right away; roughly one in five
//    (rnd.OneIn(5)) is held back and committed at the end.
//  * Once everything is committed, no log may still be marked as containing
//    outstanding prepared data.
//  * The DB's prepared-section bookkeeping must then be empty, i.e. no leak.
TEST_P(TransactionTest, LogMarkLeakTest) {
  TransactionOptions txn_options;
  WriteOptions write_options;
  options.write_buffer_size = 1024;
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  Random rnd(47);
  std::vector<Transaction*> deferred;
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());

  // Nothing has been prepared yet, so no log holds prepared data.
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);

  for (size_t i = 0; i < 100; ++i) {
    const std::string suffix = ToString(i);
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_OK(txn->SetName("xid" + suffix));
    ASSERT_OK(txn->Put(Slice("foo" + suffix), Slice("bar")));
    ASSERT_OK(txn->Prepare());
    // The just-prepared transaction pins some log.
    ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
    if (!rnd.OneIn(5)) {
      ASSERT_OK(txn->Commit());
      delete txn;
    } else {
      deferred.push_back(txn);
    }
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));
  }

  // Commit the held-back transactions.
  for (Transaction* pending : deferred) {
    ASSERT_OK(pending->Commit());
    delete pending;
  }

  // No log should remain marked as holding prepared data...
  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  // ...and the tracking structures must be fully truncated, i.e. no leak.
  ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0);
  ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0);
}
912
TEST_P(TransactionTest,SimpleTwoPhaseTransactionTest)913 TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
914 for (bool cwb4recovery : {true, false}) {
915 ASSERT_OK(ReOpen());
916 WriteOptions write_options;
917 ReadOptions read_options;
918
919 TransactionOptions txn_options;
920 txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;
921
922 string value;
923 Status s;
924
925 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
926
927 Transaction* txn = db->BeginTransaction(write_options, txn_options);
928 s = txn->SetName("xid");
929 ASSERT_OK(s);
930
931 ASSERT_EQ(db->GetTransactionByName("xid"), txn);
932
933 // transaction put
934 s = txn->Put(Slice("foo"), Slice("bar"));
935 ASSERT_OK(s);
936 ASSERT_EQ(1, txn->GetNumPuts());
937
938 // regular db put
939 s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
940 ASSERT_OK(s);
941 ASSERT_EQ(1, txn->GetNumPuts());
942
943 // regular db read
944 ASSERT_OK(db->Get(read_options, "foo2", &value));
945 ASSERT_EQ(value, "bar2");
946
947 // commit time put
948 ASSERT_OK(
949 txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")));
950 ASSERT_OK(
951 txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")));
952
953 // nothing has been prepped yet
954 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
955
956 s = txn->Prepare();
957 ASSERT_OK(s);
958
959 // data not im mem yet
960 s = db->Get(read_options, Slice("foo"), &value);
961 ASSERT_TRUE(s.IsNotFound());
962 s = db->Get(read_options, Slice("gtid"), &value);
963 ASSERT_TRUE(s.IsNotFound());
964
965 // find trans in list of prepared transactions
966 std::vector<Transaction*> prepared_trans;
967 db->GetAllPreparedTransactions(&prepared_trans);
968 ASSERT_EQ(prepared_trans.size(), 1);
969 ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
970
971 auto log_containing_prep =
972 db_impl->TEST_FindMinLogContainingOutstandingPrep();
973 ASSERT_GT(log_containing_prep, 0);
974
975 // make commit
976 s = txn->Commit();
977 ASSERT_OK(s);
978
979 // value is now available
980 s = db->Get(read_options, "foo", &value);
981 ASSERT_OK(s);
982 ASSERT_EQ(value, "bar");
983
984 if (!cwb4recovery) {
985 s = db->Get(read_options, "gtid", &value);
986 ASSERT_OK(s);
987 ASSERT_EQ(value, "dogs");
988
989 s = db->Get(read_options, "gtid2", &value);
990 ASSERT_OK(s);
991 ASSERT_EQ(value, "cats");
992 }
993
994 // we already committed
995 s = txn->Commit();
996 ASSERT_EQ(s, Status::InvalidArgument());
997
998 // no longer is prepared results
999 db->GetAllPreparedTransactions(&prepared_trans);
1000 ASSERT_EQ(prepared_trans.size(), 0);
1001 ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1002
1003 // heap should not care about prepared section anymore
1004 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1005
1006 switch (txn_db_options.write_policy) {
1007 case WRITE_COMMITTED:
1008 // but now our memtable should be referencing the prep section
1009 ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1010 ASSERT_EQ(log_containing_prep,
1011 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1012 break;
1013 case WRITE_PREPARED:
1014 case WRITE_UNPREPARED:
1015 // In these modes memtable do not ref the prep sections
1016 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1017 break;
1018 default:
1019 assert(false);
1020 }
1021
1022 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1023 // After flush the recoverable state must be visible
1024 if (cwb4recovery) {
1025 s = db->Get(read_options, "gtid", &value);
1026 ASSERT_OK(s);
1027 ASSERT_EQ(value, "dogs");
1028
1029 s = db->Get(read_options, "gtid2", &value);
1030 ASSERT_OK(s);
1031 ASSERT_EQ(value, "cats");
1032 }
1033
1034 // after memtable flush we can now relese the log
1035 ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1036 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1037
1038 delete txn;
1039
1040 if (cwb4recovery) {
1041 // kill and reopen to trigger recovery
1042 s = ReOpenNoDelete();
1043 ASSERT_OK(s);
1044 assert(db != nullptr);
1045 s = db->Get(read_options, "gtid", &value);
1046 ASSERT_OK(s);
1047 ASSERT_EQ(value, "dogs");
1048
1049 s = db->Get(read_options, "gtid2", &value);
1050 ASSERT_OK(s);
1051 ASSERT_EQ(value, "cats");
1052 }
1053 }
1054 }
1055
TEST_P(TransactionTest,TwoPhaseNameTest)1056 TEST_P(TransactionTest, TwoPhaseNameTest) {
1057 Status s;
1058
1059 WriteOptions write_options;
1060 TransactionOptions txn_options;
1061 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1062 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1063 Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
1064 ASSERT_TRUE(txn3);
1065 delete txn3;
1066
1067 // cant prepare txn without name
1068 s = txn1->Prepare();
1069 ASSERT_EQ(s, Status::InvalidArgument());
1070
1071 // name too short
1072 s = txn1->SetName("");
1073 ASSERT_EQ(s, Status::InvalidArgument());
1074
1075 // name too long
1076 s = txn1->SetName(std::string(513, 'x'));
1077 ASSERT_EQ(s, Status::InvalidArgument());
1078
1079 // valid set name
1080 s = txn1->SetName("name1");
1081 ASSERT_OK(s);
1082
1083 // cant have duplicate name
1084 s = txn2->SetName("name1");
1085 ASSERT_EQ(s, Status::InvalidArgument());
1086
1087 // shouldn't be able to prepare
1088 s = txn2->Prepare();
1089 ASSERT_EQ(s, Status::InvalidArgument());
1090
1091 // valid name set
1092 s = txn2->SetName("name2");
1093 ASSERT_OK(s);
1094
1095 // cant reset name
1096 s = txn2->SetName("name3");
1097 ASSERT_EQ(s, Status::InvalidArgument());
1098
1099 ASSERT_EQ(txn1->GetName(), "name1");
1100 ASSERT_EQ(txn2->GetName(), "name2");
1101
1102 s = txn1->Prepare();
1103 ASSERT_OK(s);
1104
1105 // can't rename after prepare
1106 s = txn1->SetName("name4");
1107 ASSERT_EQ(s, Status::InvalidArgument());
1108
1109 ASSERT_OK(txn1->Rollback());
1110 ASSERT_OK(txn2->Rollback());
1111 delete txn1;
1112 delete txn2;
1113 }
1114
TEST_P(TransactionTest,TwoPhaseEmptyWriteTest)1115 TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
1116 for (bool cwb4recovery : {true, false}) {
1117 for (bool test_with_empty_wal : {true, false}) {
1118 if (!cwb4recovery && test_with_empty_wal) {
1119 continue;
1120 }
1121 ASSERT_OK(ReOpen());
1122 Status s;
1123 std::string value;
1124
1125 WriteOptions write_options;
1126 ReadOptions read_options;
1127 TransactionOptions txn_options;
1128 txn_options.use_only_the_last_commit_time_batch_for_recovery =
1129 cwb4recovery;
1130 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1131 ASSERT_TRUE(txn1);
1132 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1133 ASSERT_TRUE(txn2);
1134
1135 s = txn1->SetName("joe");
1136 ASSERT_OK(s);
1137
1138 s = txn2->SetName("bob");
1139 ASSERT_OK(s);
1140
1141 s = txn1->Prepare();
1142 ASSERT_OK(s);
1143
1144 s = txn1->Commit();
1145 ASSERT_OK(s);
1146
1147 delete txn1;
1148
1149 ASSERT_OK(
1150 txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")));
1151
1152 s = txn2->Prepare();
1153 ASSERT_OK(s);
1154
1155 s = txn2->Commit();
1156 ASSERT_OK(s);
1157
1158 delete txn2;
1159 if (!cwb4recovery) {
1160 s = db->Get(read_options, "foo", &value);
1161 ASSERT_OK(s);
1162 ASSERT_EQ(value, "bar");
1163 } else {
1164 if (test_with_empty_wal) {
1165 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1166 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1167 // After flush the state must be visible
1168 s = db->Get(read_options, "foo", &value);
1169 ASSERT_OK(s);
1170 ASSERT_EQ(value, "bar");
1171 }
1172 ASSERT_OK(db->FlushWAL(true));
1173 // kill and reopen to trigger recovery
1174 s = ReOpenNoDelete();
1175 ASSERT_OK(s);
1176 assert(db != nullptr);
1177 s = db->Get(read_options, "foo", &value);
1178 ASSERT_OK(s);
1179 ASSERT_EQ(value, "bar");
1180 }
1181 }
1182 }
1183 }
1184
1185 #ifndef ROCKSDB_VALGRIND_RUN
TEST_P(TransactionStressTest,TwoPhaseExpirationTest)1186 TEST_P(TransactionStressTest, TwoPhaseExpirationTest) {
1187 Status s;
1188
1189 WriteOptions write_options;
1190 TransactionOptions txn_options;
1191 txn_options.expiration = 500; // 500ms
1192 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1193 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1194 ASSERT_TRUE(txn1);
1195 ASSERT_TRUE(txn1);
1196
1197 s = txn1->SetName("joe");
1198 ASSERT_OK(s);
1199 s = txn2->SetName("bob");
1200 ASSERT_OK(s);
1201
1202 s = txn1->Prepare();
1203 ASSERT_OK(s);
1204
1205 /* sleep override */
1206 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1207
1208 s = txn1->Commit();
1209 ASSERT_OK(s);
1210
1211 s = txn2->Prepare();
1212 ASSERT_EQ(s, Status::Expired());
1213
1214 delete txn1;
1215 delete txn2;
1216 }
1217
TEST_P(TransactionTest,TwoPhaseRollbackTest)1218 TEST_P(TransactionTest, TwoPhaseRollbackTest) {
1219 WriteOptions write_options;
1220 ReadOptions read_options;
1221
1222 TransactionOptions txn_options;
1223
1224 std::string value;
1225 Status s;
1226
1227 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1228 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1229 s = txn->SetName("xid");
1230 ASSERT_OK(s);
1231
1232 // transaction put
1233 s = txn->Put(Slice("tfoo"), Slice("tbar"));
1234 ASSERT_OK(s);
1235
1236 // value is readable form txn
1237 s = txn->Get(read_options, Slice("tfoo"), &value);
1238 ASSERT_OK(s);
1239 ASSERT_EQ(value, "tbar");
1240
1241 // issue rollback
1242 s = txn->Rollback();
1243 ASSERT_OK(s);
1244
1245 // value is nolonger readable
1246 s = txn->Get(read_options, Slice("tfoo"), &value);
1247 ASSERT_TRUE(s.IsNotFound());
1248 ASSERT_EQ(txn->GetNumPuts(), 0);
1249
1250 // put new txn values
1251 s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
1252 ASSERT_OK(s);
1253
1254 // new value is readable from txn
1255 s = txn->Get(read_options, Slice("tfoo2"), &value);
1256 ASSERT_OK(s);
1257 ASSERT_EQ(value, "tbar2");
1258
1259 s = txn->Prepare();
1260 ASSERT_OK(s);
1261
1262 // flush to next wal
1263 s = db->Put(write_options, Slice("foo"), Slice("bar"));
1264 ASSERT_OK(s);
1265 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1266
1267 // issue rollback (marker written to WAL)
1268 s = txn->Rollback();
1269 ASSERT_OK(s);
1270
1271 // value is nolonger readable
1272 s = txn->Get(read_options, Slice("tfoo2"), &value);
1273 ASSERT_TRUE(s.IsNotFound());
1274 ASSERT_EQ(txn->GetNumPuts(), 0);
1275
1276 // make commit
1277 s = txn->Commit();
1278 ASSERT_EQ(s, Status::InvalidArgument());
1279
1280 // try rollback again
1281 s = txn->Rollback();
1282 ASSERT_EQ(s, Status::InvalidArgument());
1283
1284 delete txn;
1285 }
1286
TEST_P(TransactionTest,PersistentTwoPhaseTransactionTest)1287 TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
1288 WriteOptions write_options;
1289 write_options.sync = true;
1290 write_options.disableWAL = false;
1291 ReadOptions read_options;
1292
1293 TransactionOptions txn_options;
1294
1295 std::string value;
1296 Status s;
1297
1298 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1299
1300 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1301 s = txn->SetName("xid");
1302 ASSERT_OK(s);
1303
1304 ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1305
1306 // transaction put
1307 s = txn->Put(Slice("foo"), Slice("bar"));
1308 ASSERT_OK(s);
1309 ASSERT_EQ(1, txn->GetNumPuts());
1310
1311 // txn read
1312 s = txn->Get(read_options, "foo", &value);
1313 ASSERT_OK(s);
1314 ASSERT_EQ(value, "bar");
1315
1316 // regular db put
1317 s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
1318 ASSERT_OK(s);
1319 ASSERT_EQ(1, txn->GetNumPuts());
1320
1321 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1322
1323 // regular db read
1324 db->Get(read_options, "foo2", &value);
1325 ASSERT_EQ(value, "bar2");
1326
1327 // nothing has been prepped yet
1328 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1329
1330 // prepare
1331 s = txn->Prepare();
1332 ASSERT_OK(s);
1333
1334 // still not available to db
1335 s = db->Get(read_options, Slice("foo"), &value);
1336 ASSERT_TRUE(s.IsNotFound());
1337
1338 ASSERT_OK(db->FlushWAL(false));
1339 delete txn;
1340 // kill and reopen
1341 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1342 s = ReOpenNoDelete();
1343 ASSERT_OK(s);
1344 assert(db != nullptr);
1345 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1346
1347 // find trans in list of prepared transactions
1348 std::vector<Transaction*> prepared_trans;
1349 db->GetAllPreparedTransactions(&prepared_trans);
1350 ASSERT_EQ(prepared_trans.size(), 1);
1351
1352 txn = prepared_trans.front();
1353 ASSERT_TRUE(txn);
1354 ASSERT_EQ(txn->GetName(), "xid");
1355 ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1356
1357 // log has been marked
1358 auto log_containing_prep =
1359 db_impl->TEST_FindMinLogContainingOutstandingPrep();
1360 ASSERT_GT(log_containing_prep, 0);
1361
1362 // value is readable from txn
1363 s = txn->Get(read_options, "foo", &value);
1364 ASSERT_OK(s);
1365 ASSERT_EQ(value, "bar");
1366
1367 // make commit
1368 s = txn->Commit();
1369 ASSERT_OK(s);
1370
1371 // value is now available
1372 db->Get(read_options, "foo", &value);
1373 ASSERT_EQ(value, "bar");
1374
1375 // we already committed
1376 s = txn->Commit();
1377 ASSERT_EQ(s, Status::InvalidArgument());
1378
1379 // no longer is prepared results
1380 prepared_trans.clear();
1381 db->GetAllPreparedTransactions(&prepared_trans);
1382 ASSERT_EQ(prepared_trans.size(), 0);
1383
1384 // transaction should no longer be visible
1385 ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1386
1387 // heap should not care about prepared section anymore
1388 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1389
1390 switch (txn_db_options.write_policy) {
1391 case WRITE_COMMITTED:
1392 // but now our memtable should be referencing the prep section
1393 ASSERT_EQ(log_containing_prep,
1394 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1395 ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1396
1397 break;
1398 case WRITE_PREPARED:
1399 case WRITE_UNPREPARED:
1400 // In these modes memtable do not ref the prep sections
1401 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1402 break;
1403 default:
1404 assert(false);
1405 }
1406
1407 // Add a dummy record to memtable before a flush. Otherwise, the
1408 // memtable will be empty and flush will be skipped.
1409 s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
1410 ASSERT_OK(s);
1411
1412 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
1413
1414 // after memtable flush we can now release the log
1415 ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1416 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1417
1418 delete txn;
1419
1420 // deleting transaction should unregister transaction
1421 ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1422 }
1423 #endif // ROCKSDB_VALGRIND_RUN
1424
1425 // TODO this test needs to be updated with serial commits
TEST_P(TransactionTest,DISABLED_TwoPhaseMultiThreadTest)1426 TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
1427 // mix transaction writes and regular writes
1428 const uint32_t NUM_TXN_THREADS = 50;
1429 std::atomic<uint32_t> txn_thread_num(0);
1430
1431 std::function<void()> txn_write_thread = [&]() {
1432 uint32_t id = txn_thread_num.fetch_add(1);
1433
1434 WriteOptions write_options;
1435 write_options.sync = true;
1436 write_options.disableWAL = false;
1437 TransactionOptions txn_options;
1438 txn_options.lock_timeout = 1000000;
1439 if (id % 2 == 0) {
1440 txn_options.expiration = 1000000;
1441 }
1442 TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(id)));
1443 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1444 ASSERT_OK(txn->SetName(name));
1445 for (int i = 0; i < 10; i++) {
1446 std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
1447 ASSERT_OK(txn->Put(key, "val"));
1448 }
1449 ASSERT_OK(txn->Prepare());
1450 ASSERT_OK(txn->Commit());
1451 delete txn;
1452 };
1453
1454 // assure that all thread are in the same write group
1455 std::atomic<uint32_t> t_wait_on_prepare(0);
1456 std::atomic<uint32_t> t_wait_on_commit(0);
1457
1458 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1459 "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
1460 auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
1461
1462 if (writer->ShouldWriteToWAL()) {
1463 t_wait_on_prepare.fetch_add(1);
1464 // wait for friends
1465 while (t_wait_on_prepare.load() < NUM_TXN_THREADS) {
1466 env->SleepForMicroseconds(10);
1467 }
1468 } else if (writer->ShouldWriteToMemtable()) {
1469 t_wait_on_commit.fetch_add(1);
1470 // wait for friends
1471 while (t_wait_on_commit.load() < NUM_TXN_THREADS) {
1472 env->SleepForMicroseconds(10);
1473 }
1474 } else {
1475 FAIL();
1476 }
1477 });
1478
1479 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1480
1481 // do all the writes
1482 std::vector<port::Thread> threads;
1483 for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
1484 threads.emplace_back(txn_write_thread);
1485 }
1486 for (auto& t : threads) {
1487 t.join();
1488 }
1489
1490 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1491 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1492
1493 ReadOptions read_options;
1494 std::string value;
1495 Status s;
1496 for (uint32_t t = 0; t < NUM_TXN_THREADS; t++) {
1497 TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(t)));
1498 for (int i = 0; i < 10; i++) {
1499 std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
1500 s = db->Get(read_options, key, &value);
1501 ASSERT_OK(s);
1502 ASSERT_EQ(value, "val");
1503 }
1504 }
1505 }
1506
TEST_P(TransactionStressTest,TwoPhaseLongPrepareTest)1507 TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) {
1508 WriteOptions write_options;
1509 write_options.sync = true;
1510 write_options.disableWAL = false;
1511 ReadOptions read_options;
1512 TransactionOptions txn_options;
1513
1514 std::string value;
1515 Status s;
1516
1517 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1518 s = txn->SetName("bob");
1519 ASSERT_OK(s);
1520
1521 // transaction put
1522 s = txn->Put(Slice("foo"), Slice("bar"));
1523 ASSERT_OK(s);
1524
1525 // prepare
1526 s = txn->Prepare();
1527 ASSERT_OK(s);
1528
1529 delete txn;
1530
1531 for (int i = 0; i < 1000; i++) {
1532 std::string key(i, 'k');
1533 std::string val(1000, 'v');
1534 assert(db != nullptr);
1535 s = db->Put(write_options, key, val);
1536 ASSERT_OK(s);
1537
1538 if (i % 29 == 0) {
1539 // crash
1540 env->SetFilesystemActive(false);
1541 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1542 ReOpenNoDelete();
1543 } else if (i % 37 == 0) {
1544 // close
1545 ReOpenNoDelete();
1546 }
1547 }
1548
1549 // commit old txn
1550 txn = db->GetTransactionByName("bob");
1551 ASSERT_TRUE(txn);
1552 s = txn->Commit();
1553 ASSERT_OK(s);
1554
1555 // verify data txn data
1556 s = db->Get(read_options, "foo", &value);
1557 ASSERT_EQ(s, Status::OK());
1558 ASSERT_EQ(value, "bar");
1559
1560 // verify non txn data
1561 for (int i = 0; i < 1000; i++) {
1562 std::string key(i, 'k');
1563 std::string val(1000, 'v');
1564 s = db->Get(read_options, key, &value);
1565 ASSERT_EQ(s, Status::OK());
1566 ASSERT_EQ(value, val);
1567 }
1568
1569 delete txn;
1570 }
1571
TEST_P(TransactionTest,TwoPhaseSequenceTest)1572 TEST_P(TransactionTest, TwoPhaseSequenceTest) {
1573 WriteOptions write_options;
1574 write_options.sync = true;
1575 write_options.disableWAL = false;
1576 ReadOptions read_options;
1577
1578 TransactionOptions txn_options;
1579
1580 std::string value;
1581 Status s;
1582
1583 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1584 s = txn->SetName("xid");
1585 ASSERT_OK(s);
1586
1587 // transaction put
1588 s = txn->Put(Slice("foo"), Slice("bar"));
1589 ASSERT_OK(s);
1590 s = txn->Put(Slice("foo2"), Slice("bar2"));
1591 ASSERT_OK(s);
1592 s = txn->Put(Slice("foo3"), Slice("bar3"));
1593 ASSERT_OK(s);
1594 s = txn->Put(Slice("foo4"), Slice("bar4"));
1595 ASSERT_OK(s);
1596
1597 // prepare
1598 s = txn->Prepare();
1599 ASSERT_OK(s);
1600
1601 // make commit
1602 s = txn->Commit();
1603 ASSERT_OK(s);
1604
1605 delete txn;
1606
1607 // kill and reopen
1608 env->SetFilesystemActive(false);
1609 ReOpenNoDelete();
1610 assert(db != nullptr);
1611
1612 // value is now available
1613 s = db->Get(read_options, "foo4", &value);
1614 ASSERT_EQ(s, Status::OK());
1615 ASSERT_EQ(value, "bar4");
1616 }
1617
TEST_P(TransactionTest,TwoPhaseDoubleRecoveryTest)1618 TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
1619 WriteOptions write_options;
1620 write_options.sync = true;
1621 write_options.disableWAL = false;
1622 ReadOptions read_options;
1623
1624 TransactionOptions txn_options;
1625
1626 std::string value;
1627 Status s;
1628
1629 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1630 s = txn->SetName("a");
1631 ASSERT_OK(s);
1632
1633 // transaction put
1634 s = txn->Put(Slice("foo"), Slice("bar"));
1635 ASSERT_OK(s);
1636
1637 // prepare
1638 s = txn->Prepare();
1639 ASSERT_OK(s);
1640
1641 delete txn;
1642
1643 // kill and reopen
1644 env->SetFilesystemActive(false);
1645 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
1646 ReOpenNoDelete();
1647
1648 // commit old txn
1649 assert(db != nullptr); // Make clang analyze happy.
1650 txn = db->GetTransactionByName("a");
1651 assert(txn != nullptr);
1652 s = txn->Commit();
1653 ASSERT_OK(s);
1654
1655 s = db->Get(read_options, "foo", &value);
1656 ASSERT_EQ(s, Status::OK());
1657 ASSERT_EQ(value, "bar");
1658
1659 delete txn;
1660
1661 txn = db->BeginTransaction(write_options, txn_options);
1662 s = txn->SetName("b");
1663 ASSERT_OK(s);
1664
1665 s = txn->Put(Slice("foo2"), Slice("bar2"));
1666 ASSERT_OK(s);
1667
1668 s = txn->Prepare();
1669 ASSERT_OK(s);
1670
1671 s = txn->Commit();
1672 ASSERT_OK(s);
1673
1674 delete txn;
1675
1676 // kill and reopen
1677 env->SetFilesystemActive(false);
1678 ASSERT_OK(ReOpenNoDelete());
1679 assert(db != nullptr);
1680
1681 // value is now available
1682 s = db->Get(read_options, "foo", &value);
1683 ASSERT_EQ(s, Status::OK());
1684 ASSERT_EQ(value, "bar");
1685
1686 s = db->Get(read_options, "foo2", &value);
1687 ASSERT_EQ(s, Status::OK());
1688 ASSERT_EQ(value, "bar2");
1689 }
1690
TEST_P(TransactionTest,TwoPhaseLogRollingTest)1691 TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
1692 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1693
1694 Status s;
1695 std::string v;
1696 ColumnFamilyHandle *cfa, *cfb;
1697
1698 // Create 2 new column families
1699 ColumnFamilyOptions cf_options;
1700 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1701 ASSERT_OK(s);
1702 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1703 ASSERT_OK(s);
1704
1705 WriteOptions wopts;
1706 wopts.disableWAL = false;
1707 wopts.sync = true;
1708
1709 TransactionOptions topts1;
1710 Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1711 s = txn1->SetName("xid1");
1712 ASSERT_OK(s);
1713
1714 TransactionOptions topts2;
1715 Transaction* txn2 = db->BeginTransaction(wopts, topts2);
1716 s = txn2->SetName("xid2");
1717 ASSERT_OK(s);
1718
1719 // transaction put in two column families
1720 s = txn1->Put(cfa, "ka1", "va1");
1721 ASSERT_OK(s);
1722
1723 // transaction put in two column families
1724 s = txn2->Put(cfa, "ka2", "va2");
1725 ASSERT_OK(s);
1726 s = txn2->Put(cfb, "kb2", "vb2");
1727 ASSERT_OK(s);
1728
1729 // write prep section to wal
1730 s = txn1->Prepare();
1731 ASSERT_OK(s);
1732
1733 // our log should be in the heap
1734 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1735 txn1->GetLogNumber());
1736 ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
1737
1738 // flush default cf to crate new log
1739 s = db->Put(wopts, "foo", "bar");
1740 ASSERT_OK(s);
1741 s = db_impl->TEST_FlushMemTable(true);
1742 ASSERT_OK(s);
1743
1744 // make sure we are on a new log
1745 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
1746
1747 // put txn2 prep section in this log
1748 s = txn2->Prepare();
1749 ASSERT_OK(s);
1750 ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
1751
1752 // heap should still see first log
1753 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1754 txn1->GetLogNumber());
1755
1756 // commit txn1
1757 s = txn1->Commit();
1758 ASSERT_OK(s);
1759
1760 // heap should now show txn2s log
1761 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1762 txn2->GetLogNumber());
1763
1764 switch (txn_db_options.write_policy) {
1765 case WRITE_COMMITTED:
1766 // we should see txn1s log refernced by the memtables
1767 ASSERT_EQ(txn1->GetLogNumber(),
1768 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1769 break;
1770 case WRITE_PREPARED:
1771 case WRITE_UNPREPARED:
1772 // In these modes memtable do not ref the prep sections
1773 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1774 break;
1775 default:
1776 assert(false);
1777 }
1778
1779 // flush default cf to crate new log
1780 s = db->Put(wopts, "foo", "bar2");
1781 ASSERT_OK(s);
1782 s = db_impl->TEST_FlushMemTable(true);
1783 ASSERT_OK(s);
1784
1785 // make sure we are on a new log
1786 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
1787
1788 // commit txn2
1789 s = txn2->Commit();
1790 ASSERT_OK(s);
1791
1792 // heap should not show any logs
1793 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1794
1795 switch (txn_db_options.write_policy) {
1796 case WRITE_COMMITTED:
1797 // should show the first txn log
1798 ASSERT_EQ(txn1->GetLogNumber(),
1799 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1800 break;
1801 case WRITE_PREPARED:
1802 case WRITE_UNPREPARED:
1803 // In these modes memtable do not ref the prep sections
1804 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1805 break;
1806 default:
1807 assert(false);
1808 }
1809
1810 // flush only cfa memtable
1811 s = db_impl->TEST_FlushMemTable(true, false, cfa);
1812 ASSERT_OK(s);
1813
1814 switch (txn_db_options.write_policy) {
1815 case WRITE_COMMITTED:
1816 // should show the first txn log
1817 ASSERT_EQ(txn2->GetLogNumber(),
1818 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1819 break;
1820 case WRITE_PREPARED:
1821 case WRITE_UNPREPARED:
1822 // In these modes memtable do not ref the prep sections
1823 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1824 break;
1825 default:
1826 assert(false);
1827 }
1828
1829 // flush only cfb memtable
1830 s = db_impl->TEST_FlushMemTable(true, false, cfb);
1831 ASSERT_OK(s);
1832
1833 // should show not dependency on logs
1834 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1835 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1836
1837 delete txn1;
1838 delete txn2;
1839 delete cfa;
1840 delete cfb;
1841 }
1842
TEST_P(TransactionTest,TwoPhaseLogRollingTest2)1843 TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
1844 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1845
1846 Status s;
1847 ColumnFamilyHandle *cfa, *cfb;
1848
1849 ColumnFamilyOptions cf_options;
1850 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1851 ASSERT_OK(s);
1852 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1853 ASSERT_OK(s);
1854
1855 WriteOptions wopts;
1856 wopts.disableWAL = false;
1857 wopts.sync = true;
1858
1859 auto cfh_a = static_cast_with_check<ColumnFamilyHandleImpl>(cfa);
1860 auto cfh_b = static_cast_with_check<ColumnFamilyHandleImpl>(cfb);
1861
1862 TransactionOptions topts1;
1863 Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1864 s = txn1->SetName("xid1");
1865 ASSERT_OK(s);
1866 s = txn1->Put(cfa, "boys", "girls1");
1867 ASSERT_OK(s);
1868
1869 Transaction* txn2 = db->BeginTransaction(wopts, topts1);
1870 s = txn2->SetName("xid2");
1871 ASSERT_OK(s);
1872 s = txn2->Put(cfb, "up", "down1");
1873 ASSERT_OK(s);
1874
1875 // prepre transaction in LOG A
1876 s = txn1->Prepare();
1877 ASSERT_OK(s);
1878
1879 // prepre transaction in LOG A
1880 s = txn2->Prepare();
1881 ASSERT_OK(s);
1882
1883 // regular put so that mem table can actually be flushed for log rolling
1884 s = db->Put(wopts, "cats", "dogs1");
1885 ASSERT_OK(s);
1886
1887 auto prepare_log_no = txn1->GetLastLogNumber();
1888
1889 // roll to LOG B
1890 s = db_impl->TEST_FlushMemTable(true);
1891 ASSERT_OK(s);
1892
1893 // now we pause background work so that
1894 // imm()s are not flushed before we can check their status
1895 s = db_impl->PauseBackgroundWork();
1896 ASSERT_OK(s);
1897
1898 ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
1899 switch (txn_db_options.write_policy) {
1900 case WRITE_COMMITTED:
1901 // This cf is empty and should ref the latest log
1902 ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1903 ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
1904 break;
1905 case WRITE_PREPARED:
1906 case WRITE_UNPREPARED:
1907 // This cf is not flushed yet and should ref the log that has its data
1908 ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1909 break;
1910 default:
1911 assert(false);
1912 }
1913 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1914 txn1->GetLogNumber());
1915 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1916
1917 // commit in LOG B
1918 s = txn1->Commit();
1919 ASSERT_OK(s);
1920
1921 switch (txn_db_options.write_policy) {
1922 case WRITE_COMMITTED:
1923 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
1924 prepare_log_no);
1925 break;
1926 case WRITE_PREPARED:
1927 case WRITE_UNPREPARED:
1928 // In these modes memtable do not ref the prep sections
1929 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1930 break;
1931 default:
1932 assert(false);
1933 }
1934
1935 ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
1936
1937 // request a flush for all column families such that the earliest
1938 // alive log file can be killed
1939 ASSERT_OK(db_impl->TEST_SwitchWAL());
1940 // log cannot be flushed because txn2 has not been commited
1941 ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
1942 ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
1943
1944 // assert that cfa has a flush requested
1945 ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
1946
1947 switch (txn_db_options.write_policy) {
1948 case WRITE_COMMITTED:
1949 // cfb should not be flushed becuse it has no data from LOG A
1950 ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
1951 break;
1952 case WRITE_PREPARED:
1953 case WRITE_UNPREPARED:
1954 // cfb should be flushed becuse it has prepared data from LOG A
1955 ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1956 break;
1957 default:
1958 assert(false);
1959 }
1960
1961 // cfb now has data from LOG A
1962 s = txn2->Commit();
1963 ASSERT_OK(s);
1964
1965 ASSERT_OK(db_impl->TEST_SwitchWAL());
1966 ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
1967
1968 // we should see that cfb now has a flush requested
1969 ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1970
1971 // all data in LOG A resides in a memtable that has been
1972 // requested for a flush
1973 ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
1974
1975 delete txn1;
1976 delete txn2;
1977 delete cfa;
1978 delete cfb;
1979 }
1980 /*
1981 * 1) use prepare to keep first log around to determine starting sequence
1982 * during recovery.
1983 * 2) insert many values, skipping wal, to increase seqid.
1984 * 3) insert final value into wal
1985 * 4) recover and see that final value was properly recovered - not
1986 * hidden behind improperly summed sequence ids
1987 */
TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  // wal_on: synced writes that go through the WAL.
  // wal_off: writes that skip the WAL, so after the crash they survive only
  // if they reached an SST file.
  WriteOptions wal_on, wal_off;
  wal_on.sync = true;
  wal_on.disableWAL = false;
  wal_off.disableWAL = true;
  ReadOptions read_options;
  TransactionOptions txn_options;

  std::string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(wal_on, txn_options);

  s = txn1->SetName("1");
  ASSERT_OK(s);

  s = db->Put(wal_on, "first", "first");
  ASSERT_OK(s);

  // Prepare (and never commit) txn1 so that its prepare section keeps the
  // first log alive until recovery.
  s = txn1->Put(Slice("dummy"), Slice("dummy"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  ASSERT_OK(s);

  // Advance the sequence number with writes that bypass the WAL.
  s = db->Put(wal_off, "cats", "dogs1");
  ASSERT_OK(s);
  s = db->Put(wal_off, "cats", "dogs2");
  ASSERT_OK(s);
  s = db->Put(wal_off, "cats", "dogs3");
  ASSERT_OK(s);

  // Flush the memtable so the WAL-less writes above are persisted.
  s = db_impl->TEST_FlushMemTable(true);
  ASSERT_OK(s);

  // The final value goes through the WAL; recovery must replay it with a
  // sequence number newer than the flushed writes.
  s = db->Put(wal_on, "cats", "dogs4");
  ASSERT_OK(s);

  ASSERT_OK(db->FlushWAL(false));

  // kill and reopen
  env->SetFilesystemActive(false);
  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  assert(db != nullptr);
  // NOTE(review): txn1 is never deleted. Presumably intentional because it
  // belongs to the pre-crash DB instance replaced by ReOpenNoDelete -- confirm
  // before adding a delete.

  s = db->Get(read_options, "first", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "first");

  // If recovery mis-summed sequence ids, "dogs4" would be hidden behind the
  // flushed "dogs3".
  s = db->Get(read_options, "cats", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "dogs4");
}
2042
// Conflict checking against the very first write to a db. The Put below gets
// sequence number 1, and the transaction's snapshot is taken after it, so the
// snapshot is also at sequence 1. The write is therefore not newer than the
// snapshot, and the transaction's own Put on the same key must not conflict.
TEST_P(TransactionTest, FirstWriteTest) {
  WriteOptions write_options;

  Status s = db->Put(write_options, "A", "a");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  txn->SetSnapshot();

  s = txn->Put("A", "b");
  ASSERT_OK(s);

  delete txn;
}
2061
// Conflict checking against the very first write to a db, with the snapshot
// taken *before* that write. The snapshot sits at sequence 0 while the Put
// lands at sequence 1, so the transaction's later write to the same key must
// be rejected with Busy.
TEST_P(TransactionTest, FirstWriteTest2) {
  WriteOptions opts;

  Transaction* txn = db->BeginTransaction(opts);
  txn->SetSnapshot();

  ASSERT_OK(db->Put(opts, "A", "a"));

  const Status conflict = txn->Put("A", "b");
  ASSERT_TRUE(conflict.IsBusy());

  delete txn;
}
2079
// A transaction reports the WriteOptions it was created with, and
// SetWriteOptions() installs the new set.
TEST_P(TransactionTest, WriteOptionsTest) {
  WriteOptions wopts;
  wopts.sync = true;
  wopts.disableWAL = true;

  Transaction* txn = db->BeginTransaction(wopts);
  ASSERT_TRUE(txn);
  ASSERT_TRUE(txn->GetWriteOptions()->sync);

  // Flip only `sync`; `disableWAL` is still true in `wopts`.
  wopts.sync = false;
  txn->SetWriteOptions(wopts);
  const WriteOptions* current = txn->GetWriteOptions();
  ASSERT_FALSE(current->sync);
  ASSERT_TRUE(current->disableWAL);

  delete txn;
}
2097
// A non-transactional Put on a key the transaction has already written (and
// therefore locked) must time out. The transaction's writes become visible to
// plain DB reads only after Commit().
TEST_P(TransactionTest, WriteConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "A"));
  ASSERT_OK(db->Put(write_options, "foo2", "B"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("foo", "A2");
  ASSERT_OK(s);

  s = txn->Put("foo2", "B2");
  ASSERT_OK(s);

  // This Put outside of a transaction will conflict with the previous write
  s = db->Put(write_options, "foo", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // Uncommitted transaction writes are not visible to DB reads.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "A");

  s = txn->Commit();
  ASSERT_OK(s);

  // Check the read status too, so that a failed Get cannot be masked by a
  // stale `value` left over from an earlier read.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "A2");
  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "B2");

  delete txn;
}
2133
// With a snapshot taken at BeginTransaction(), a transactional write to a key
// that was modified after the snapshot fails with Busy. The failed write is
// dropped, and the transaction's other writes still commit.
TEST_P(TransactionTest, WriteConflictTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  // This Put outside of a transaction will conflict with a later write
  s = db->Put(write_options, "foo", "barz");
  ASSERT_OK(s);

  s = txn->Put("foo2", "X");
  ASSERT_OK(s);

  s = txn->Put("foo",
               "bar2");  // Conflicts with write done after snapshot taken
  ASSERT_TRUE(s.IsBusy());

  s = txn->Put("foo3", "Y");
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barz");

  // The rejected write to "foo" is not counted by GetNumKeys().
  ASSERT_EQ(2, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);  // Txn should commit, but only write foo2 and foo3

  // Verify that transaction wrote foo2 and foo3 but not foo. Statuses are
  // checked so a failed read cannot pass on a stale `value`.
  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "barz");

  ASSERT_OK(db->Get(read_options, "foo2", &value));
  ASSERT_EQ(value, "X");

  ASSERT_OK(db->Get(read_options, "foo3", &value));
  ASSERT_EQ(value, "Y");

  delete txn;
}
2181
// GetForUpdate() in a transaction locks the key it reads, so a concurrent
// non-transactional write to that key times out and leaves it unchanged.
TEST_P(TransactionTest, ReadConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "foo", "bar"));
  ASSERT_OK(db->Put(write_options, "foo2", "bar"));

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  // This Put outside of a transaction will conflict with the previous read
  s = db->Put(write_options, "foo", "barz");
  ASSERT_TRUE(s.IsTimedOut());

  // The rejected write must not be visible, neither via the DB nor the txn.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
2217
// Transactions must work when there are no other writes in an empty db: the
// transaction's write commits and is then readable through the DB.
TEST_P(TransactionTest, TxnOnlyTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("x", "y");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  // The committed write must be visible outside the transaction.
  s = db->Get(read_options, "x", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  delete txn;
}
2238
// A transaction that read and wrote a key before a memtable flush can still
// commit after the flush.
TEST_P(TransactionTest, FlushTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
  ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // NOTE(review): no snapshot has been set on txn, so this is presumably
  // nullptr and the reads below see the latest data -- confirm.
  snapshot_read_options.snapshot = txn->GetSnapshot();

  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  // The transaction sees its own uncommitted write.
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  // Put a random key so we have a memtable to flush
  s = db->Put(write_options, "dummy", "dummy");
  ASSERT_OK(s);

  // force a memtable flush
  FlushOptions flush_ops;
  ASSERT_OK(db->Flush(flush_ops));

  s = txn->Commit();
  // txn should commit since the flushed table is still in MemtableList History
  ASSERT_OK(s);

  ASSERT_OK(db->Get(read_options, "foo", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}
2279
// Write-conflict checks must keep working after the data they depend on has
// left the active memtable: flushed memtables, SST files, and after a burst of
// writes meant to trigger compaction. The whole scenario is run once per
// table factory (case 0 keeps the fixture's configured factory, then mock
// table, then plain table).
TEST_P(TransactionTest, FlushTest2) {
  const size_t num_tests = 3;

  for (size_t n = 0; n < num_tests; n++) {
    // Test different table factories
    switch (n) {
      case 0:
        break;
      case 1:
        options.table_factory.reset(new mock::MockTableFactory());
        break;
      case 2: {
        PlainTableOptions pt_opts;
        pt_opts.hash_table_ratio = 0;
        options.table_factory.reset(NewPlainTableFactory(pt_opts));
        break;
      }
    }

    Status s = ReOpen();
    ASSERT_OK(s);
    assert(db != nullptr);

    WriteOptions write_options;
    ReadOptions read_options, snapshot_read_options;
    TransactionOptions txn_options;
    string value;

    DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());

    ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
    ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar2")));
    ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar3")));

    txn_options.set_snapshot = true;
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn);

    snapshot_read_options.snapshot = txn->GetSnapshot();

    ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
    ASSERT_EQ(value, "bar");

    s = txn->Put(Slice("foo"), Slice("bar2"));
    ASSERT_OK(s);

    ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
    ASSERT_EQ(value, "bar2");
    // verify foo is locked by txn
    s = db->Delete(write_options, "foo");
    ASSERT_TRUE(s.IsTimedOut());

    // Writes made after txn's snapshot; later txn writes to Z and S must
    // conflict with them.
    s = db->Put(write_options, "Z", "z");
    ASSERT_OK(s);
    s = db->Put(write_options, "dummy", "dummy");
    ASSERT_OK(s);

    s = db->Put(write_options, "S", "s");
    ASSERT_OK(s);
    s = db->SingleDelete(write_options, "S");
    ASSERT_OK(s);

    s = txn->Delete("S");
    // Should fail after encountering a write to S in memtable
    ASSERT_TRUE(s.IsBusy());

    // force a memtable flush
    s = db_impl->TEST_FlushMemTable(true);
    ASSERT_OK(s);

    // Put a random key so we have a MemTable to flush
    s = db->Put(write_options, "dummy", "dummy2");
    ASSERT_OK(s);

    // force a memtable flush
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));

    s = db->Put(write_options, "dummy", "dummy3");
    ASSERT_OK(s);

    // force a memtable flush
    // Since our test db has max_write_buffer_number=2, this flush will cause
    // the first memtable to get purged from the MemtableList history.
    ASSERT_OK(db_impl->TEST_FlushMemTable(true));

    s = txn->Put("X", "Y");
    // Should succeed after verifying there is no write to X in SST file
    ASSERT_OK(s);

    s = txn->Put("Z", "zz");
    // Should fail after encountering a write to Z in SST file
    ASSERT_TRUE(s.IsBusy());

    s = txn->GetForUpdate(read_options, "foo2", &value);
    // should succeed since key was written before txn started
    ASSERT_OK(s);
    // verify foo2 is locked by txn
    s = db->Delete(write_options, "foo2");
    ASSERT_TRUE(s.IsTimedOut());

    s = txn->Delete("S");
    // Should fail after encountering a write to S in SST file
    ASSERT_TRUE(s.IsBusy());

    // Write a bunch of keys to db to force a compaction
    Random rnd(47);
    for (int i = 0; i < 1000; i++) {
      s = db->Put(write_options, std::to_string(i),
                  test::CompressibleString(&rnd, 0.8, 100, &value));
      ASSERT_OK(s);
    }

    s = txn->Put("X", "yy");
    // Should succeed after verifying there is no write to X in SST file
    ASSERT_OK(s);

    s = txn->Put("Z", "zzz");
    // Should fail after encountering a write to Z in SST file
    ASSERT_TRUE(s.IsBusy());

    s = txn->Delete("S");
    // Should fail after encountering a write to S in SST file
    ASSERT_TRUE(s.IsBusy());

    s = txn->GetForUpdate(read_options, "foo3", &value);
    // should succeed since key was written before txn started
    ASSERT_OK(s);
    // verify foo3 is locked by txn
    s = db->Delete(write_options, "foo3");
    ASSERT_TRUE(s.IsTimedOut());

    ASSERT_OK(db_impl->TEST_WaitForCompact());

    s = txn->Commit();
    ASSERT_OK(s);

    // Transaction should only write the keys that succeeded.
    s = db->Get(read_options, "foo", &value);
    ASSERT_OK(s);
    ASSERT_EQ(value, "bar2");

    s = db->Get(read_options, "X", &value);
    ASSERT_OK(s);
    ASSERT_EQ("yy", value);

    s = db->Get(read_options, "Z", &value);
    ASSERT_OK(s);
    ASSERT_EQ("z", value);

    delete txn;
  }
}
2431
// Without a snapshot, GetForUpdate() reads (and locks) the latest value, so a
// change made after BeginTransaction() does not make the transaction's
// subsequent write conflict.
TEST_P(TransactionTest, NoSnapshotTest) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string result;

  ASSERT_OK(db->Put(wopts, "AAA", "bar"));

  Transaction* txn = db->BeginTransaction(wopts);
  ASSERT_TRUE(txn);

  // Modify key after transaction start
  ASSERT_OK(db->Put(wopts, "AAA", "bar1"));

  // Read and write without a snapshot
  ASSERT_OK(txn->GetForUpdate(ropts, "AAA", &result));
  ASSERT_EQ(result, "bar1");
  ASSERT_OK(txn->Put("AAA", "bar2"));

  // Should commit since read/write was done after data changed
  ASSERT_OK(txn->Commit());

  ASSERT_OK(txn->GetForUpdate(ropts, "AAA", &result));
  ASSERT_EQ(result, "bar2");

  delete txn;
}
2461
// Exercises a transaction that calls SetSnapshot() more than once. Keys are
// read and written with no snapshot, after a first snapshot, and after a
// second snapshot. The second half checks conflict detection between two
// transactions whose snapshots are taken at different points relative to a
// committed write.
TEST_P(TransactionTest, MultipleSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  ASSERT_OK(db->Put(write_options, "AAA", "bar"));
  ASSERT_OK(db->Put(write_options, "BBB", "bar"));
  ASSERT_OK(db->Put(write_options, "CCC", "bar"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_OK(db->Put(write_options, "AAA", "bar1"));

  // Read and write without a snapshot
  ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("AAA", "bar2");
  ASSERT_OK(s);

  // Modify BBB before snapshot is taken
  ASSERT_OK(db->Put(write_options, "BBB", "bar1"));

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read and write with snapshot
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("BBB", "bar2");
  ASSERT_OK(s);

  // Modify CCC after the first snapshot but before the second one.
  ASSERT_OK(db->Put(write_options, "CCC", "bar1"));

  // Set a new snapshot
  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read and write with snapshot
  ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value));
  ASSERT_EQ(value, "bar1");
  s = txn->Put("CCC", "bar2");
  ASSERT_OK(s);

  // Reads through the transaction see its own uncommitted writes...
  s = txn->GetForUpdate(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = txn->GetForUpdate(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = txn->GetForUpdate(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // ...while reads through the DB still see the last committed values.
  s = db->Get(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = db->Get(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = db->Get(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");

  s = txn->Commit();
  ASSERT_OK(s);

  // After commit, all three keys hold the transaction's values.
  s = db->Get(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = db->Get(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = db->Get(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // verify that we track multiple writes to the same key at different snapshots
  delete txn;
  txn = db->BeginTransaction(write_options);

  // Potentially conflicting writes
  ASSERT_OK(db->Put(write_options, "ZZZ", "zzz"));
  ASSERT_OK(db->Put(write_options, "XXX", "xxx"));

  // txn's snapshot is taken after the ZZZ write above.
  txn->SetSnapshot();

  // txn2 also snapshots here, before txn commits its own ZZZ write below.
  TransactionOptions txn_options;
  txn_options.set_snapshot = true;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  txn2->SetSnapshot();

  // This should not conflict in txn since the snapshot is later than the
  // previous write (spoiler alert: it will later conflict with txn2).
  s = txn->Put("ZZZ", "zzzz");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // This will conflict since the snapshot is earlier than another write to ZZZ
  s = txn2->Put("ZZZ", "xxxxx");
  ASSERT_TRUE(s.IsBusy());

  // txn2's only write was rejected, so committing it leaves ZZZ as txn wrote it.
  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "ZZZ", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  delete txn2;
}
2578
// Transactions spanning several column families. Conflict detection is per
// column family, SliceParts keys and values are concatenated, and, after a
// conflicting write, every MultiGetForUpdate result is reported as Busy.
TEST_P(TransactionTest, ColumnFamiliesTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  ColumnFamilyHandle *cfa, *cfb;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  // Presumably returned in descriptor order: [0]=default, [1]=CFA, [2]=CFB.
  std::vector<ColumnFamilyHandle*> handles;

  ASSERT_OK(ReOpenNoDelete(column_families, &handles));
  assert(db != nullptr);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  txn_options.set_snapshot = true;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // Write some data to the db (after both snapshots were taken)
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "foo"));
  ASSERT_OK(batch.Put(handles[1], "AAA", "bar"));
  ASSERT_OK(batch.Put(handles[1], "AAAZZZ", "bar"));
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);
  ASSERT_OK(db->Delete(write_options, handles[1], "AAAZZZ"));

  // These keys do not conflict with existing writes since they're in
  // different column families
  s = txn->Delete("AAA");
  ASSERT_OK(s);
  s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
  ASSERT_TRUE(s.IsNotFound());
  // A one-part key and a two-part value: stored as "AAAZZZ" -> "barbar".
  Slice key_slice("AAAZZZ");
  Slice value_slices[2] = {Slice("bar"), Slice("bar")};
  s = txn->Put(handles[2], SliceParts(&key_slice, 1),
               SliceParts(value_slices, 2));
  ASSERT_OK(s);
  ASSERT_EQ(3, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, "AAA", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = db->Get(read_options, handles[2], "AAAZZZ", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barbar");

  // A three-part key that concatenates to "AAAZZZ".
  Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
  Slice value_slice("barbarbar");

  s = txn2->Delete(handles[2], "XXX");
  ASSERT_OK(s);
  s = txn2->Delete(handles[1], "XXX");
  ASSERT_OK(s);

  // This write will cause a conflict with the earlier batch write
  s = txn2->Put(handles[1], SliceParts(key_slices, 3),
                SliceParts(&value_slice, 1));
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);
  // In the above the latest change to AAAZZZ in handles[1] is delete.
  s = db->Get(read_options, handles[1], "AAAZZZ", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  delete txn2;

  txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);
  snapshot_read_options.snapshot = txn->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
                                                   handles[0], handles[2]};
  std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
  std::vector<std::string> values(4);
  std::vector<Status> results = txn->MultiGetForUpdate(
      snapshot_read_options, multiget_cfh, multiget_keys, &values);
  ASSERT_OK(results[0]);
  ASSERT_OK(results[1]);
  ASSERT_OK(results[2]);
  ASSERT_TRUE(results[3].IsNotFound());
  ASSERT_EQ(values[0], "bar");
  ASSERT_EQ(values[1], "barbar");
  ASSERT_EQ(values[2], "foo");

  s = txn->SingleDelete(handles[2], "ZZZ");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "ZZZ", "YYY");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "ZZZ", "YYYY");
  ASSERT_OK(s);
  s = txn->Delete(handles[2], "ZZZ");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumKeys());

  // Txn should commit
  s = txn->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, handles[2], "ZZZ", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Put a key which will conflict with the next txn using the previous snapshot
  ASSERT_OK(db->Put(write_options, handles[2], "foo", "000"));

  results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
                                    multiget_keys, &values);
  // All results should fail since there was a conflict
  ASSERT_TRUE(results[0].IsBusy());
  ASSERT_TRUE(results[1].IsBusy());
  ASSERT_TRUE(results[2].IsBusy());
  ASSERT_TRUE(results[3].IsBusy());

  s = db->Get(read_options, handles[2], "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "000");

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->DropColumnFamily(handles[1]);
  ASSERT_OK(s);
  s = db->DropColumnFamily(handles[2]);
  ASSERT_OK(s);

  delete txn;
  delete txn2;

  for (auto handle : handles) {
    delete handle;
  }
}
2745
// Batched Transaction::MultiGet against a snapshot must combine, key by key,
// the transaction's own uncommitted Delete/Put/Merge with the DB contents.
TEST_P(TransactionTest, MultiGetBatchedTest) {
  WriteOptions write_options;
  ReadOptions snapshot_read_options;
  Status s;

  ColumnFamilyHandle* cf;
  ColumnFamilyOptions cf_options;

  // Create a new column family
  s = db->CreateColumnFamily(cf_options, "CF", &cf);
  ASSERT_OK(s);

  delete cf;
  delete db;
  db = nullptr;

  // open DB with two column families: default and "CF"
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // "CF" gets a string-append merge operator, so merged values read back as
  // "<base>,<operand>"
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));

  std::vector<ColumnFamilyHandle*> handles;

  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  ASSERT_OK(ReOpenNoDelete(column_families, &handles));
  assert(db != nullptr);

  // Write some data to the db
  WriteBatch batch;
  ASSERT_OK(batch.Put(handles[1], "aaa", "val1"));
  ASSERT_OK(batch.Put(handles[1], "bbb", "val2"));
  ASSERT_OK(batch.Put(handles[1], "ccc", "val3"));
  ASSERT_OK(batch.Put(handles[1], "ddd", "foo"));
  ASSERT_OK(batch.Put(handles[1], "eee", "val5"));
  ASSERT_OK(batch.Put(handles[1], "fff", "val6"));
  ASSERT_OK(batch.Merge(handles[1], "ggg", "foo"));
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Uncommitted writes in the transaction that MultiGet must take into account
  s = txn->Delete(handles[1], "bbb");
  ASSERT_OK(s);
  s = txn->Put(handles[1], "ccc", "val3_new");
  ASSERT_OK(s);
  s = txn->Merge(handles[1], "ddd", "bar");
  ASSERT_OK(s);

  std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
                values.data(), statuses.data());
  ASSERT_OK(statuses[0]);
  ASSERT_EQ(values[0], "val1");
  // Deleted by the transaction.
  ASSERT_TRUE(statuses[1].IsNotFound());
  // Overwritten by the transaction.
  ASSERT_OK(statuses[2]);
  ASSERT_EQ(values[2], "val3_new");
  // Transaction's merge operand applied on top of the DB value.
  ASSERT_OK(statuses[3]);
  ASSERT_EQ(values[3], "foo,bar");
  ASSERT_OK(statuses[4]);
  ASSERT_EQ(values[4], "val5");
  ASSERT_OK(statuses[5]);
  ASSERT_EQ(values[5], "val6");
  // Merge with no base value in the DB.
  ASSERT_OK(statuses[6]);
  ASSERT_EQ(values[6], "foo");
  delete txn;
  for (auto handle : handles) {
    delete handle;
  }
}
2830
// This test calls WriteBatchWithIndex::MultiGetFromBatchAndDB with a large
// number of keys, i.e. greater than MultiGetContext::MAX_BATCH_SIZE, which is
// 32. This forces autovector allocations in the MultiGet code paths
// to use std::vector in addition to stack allocations. The MultiGet keys
// include Merges, which are handled specially in MultiGetFromBatchAndDB by
// allocating an autovector of MergeContexts.
TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
  WriteOptions write_options;
  // No snapshot is set: reads see the latest DB state overlaid with the batch.
  ReadOptions read_options;
  Status s;

  // Keys in [kFirstMergedKey, kEndMergedKey) receive a merge operand in the
  // batch. Their count (MAX_BATCH_SIZE + 16) must exceed
  // MultiGetContext::MAX_BATCH_SIZE. The same bounds drive both the writes
  // and the verification below, so the two cannot drift apart.
  const size_t kFirstMergedKey = 8;
  const size_t kEndMergedKey = MultiGetContext::MAX_BATCH_SIZE + 24;

  ColumnFamilyHandle* cf;
  ColumnFamilyOptions cf_options;

  std::vector<std::string> key_str;
  for (int i = 0; i < 100; ++i) {
    key_str.emplace_back(std::to_string(i));
  }
  // Create a new column family
  s = db->CreateColumnFamily(cf_options, "CF", &cf);
  ASSERT_OK(s);

  delete cf;
  delete db;
  db = nullptr;

  // open DB with two column families: default and "CF"
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // "CF" gets a string-append merge operator, so merged values read back as
  // "<base>,<operand>"
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));

  std::vector<ColumnFamilyHandle*> handles;

  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  ASSERT_OK(ReOpenNoDelete(column_families, &handles));
  assert(db != nullptr);

  // Write "val<i>" for keys 0 .. 3 * MAX_BATCH_SIZE - 1 to the db
  WriteBatch batch;
  for (int i = 0; i < 3 * MultiGetContext::MAX_BATCH_SIZE; ++i) {
    std::string val = "val" + std::to_string(i);
    ASSERT_OK(batch.Put(handles[1], key_str[i], val));
  }
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);

  // Stage overrides in an indexed batch (not written to the db): delete key 1,
  // overwrite key 2, and merge into every key in [kFirstMergedKey,
  // kEndMergedKey). The large number of merges forces the autovector of
  // MergeContexts in MultiGetFromBatchAndDB to spill into std::vector.
  WriteBatchWithIndex wb;
  s = wb.Delete(handles[1], std::to_string(1));
  ASSERT_OK(s);
  s = wb.Put(handles[1], std::to_string(2), "new_val" + std::to_string(2));
  ASSERT_OK(s);
  for (size_t i = kFirstMergedKey; i < kEndMergedKey; ++i) {
    s = wb.Merge(handles[1], std::to_string(i), "merge");
    ASSERT_OK(s);
  }

  // MultiGet a lot of keys in order to force std::vector reallocations
  std::vector<Slice> keys;
  for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE + 32; ++i) {
    keys.emplace_back(key_str[i]);
  }
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  wb.MultiGetFromBatchAndDB(db, read_options, handles[1], keys.size(),
                            keys.data(), values.data(), statuses.data(),
                            false);
  for (size_t i = 0; i < keys.size(); ++i) {
    if (i == 1) {
      // Deleted in the batch.
      ASSERT_TRUE(statuses[i].IsNotFound());
    } else if (i == 2) {
      // Overwritten in the batch.
      ASSERT_OK(statuses[i]);
      ASSERT_EQ(values[i], "new_val" + std::to_string(i));
    } else if (i >= kFirstMergedKey && i < kEndMergedKey) {
      // DB value with the batch's merge operand appended.
      ASSERT_OK(statuses[i]);
      ASSERT_EQ(values[i], "val" + std::to_string(i) + ",merge");
    } else {
      // Untouched by the batch: plain DB value.
      ASSERT_OK(statuses[i]);
      ASSERT_EQ(values[i], "val" + std::to_string(i));
    }
  }

  for (auto handle : handles) {
    delete handle;
  }
}
2928
// A snapshot taken between Prepare() and Commit() must not see the prepared
// transaction's data, even when it is read later through another
// transaction's batched MultiGet.
TEST_P(TransactionTest, MultiGetSnapshot) {
  WriteOptions wopts;
  TransactionOptions topts;
  const Slice key = "foo";

  Transaction* writer = db->BeginTransaction(wopts, topts);
  ASSERT_OK(writer->Put(key, "bar"));
  ASSERT_OK(writer->SetName("test"));
  ASSERT_OK(writer->Prepare());

  // Get snapshot between prepare and commit
  // Un-committed data should be invisible to other transactions
  const Snapshot* snap = db->GetSnapshot();

  ASSERT_OK(writer->Commit());
  delete writer;

  Transaction* reader = db->BeginTransaction(wopts, topts);
  ReadOptions ropts;
  ropts.snapshot = snap;

  std::vector<Slice> keys{key};
  std::vector<PinnableSlice> values(1);
  std::vector<Status> statuses(1);
  ColumnFamilyHandle* default_cf = db->DefaultColumnFamily();
  reader->MultiGet(ropts, default_cf, 1, keys.data(), values.data(),
                   statuses.data());
  ASSERT_TRUE(statuses[0].IsNotFound());
  delete reader;

  db->ReleaseSnapshot(snap);
}
2969
// Dropping a column family while transactions are open. Later writes to the
// dropped family are rejected with InvalidArgument, while writes to the
// remaining families still commit.
TEST_P(TransactionTest, ColumnFamiliesTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  ColumnFamilyHandle *one, *two;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "ONE", &one);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "TWO", &two);
  ASSERT_OK(s);

  Transaction* txn1 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn1);
  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  s = txn1->Put(one, "X", "1");
  ASSERT_OK(s);
  s = txn1->Put(two, "X", "2");
  ASSERT_OK(s);
  s = txn1->Put("X", "0");
  ASSERT_OK(s);

  // txn1 holds the lock on "X" in ONE, so txn2's write to it times out.
  s = txn2->Put(one, "X", "11");
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Commit();
  ASSERT_OK(s);

  // Drop first column family
  s = db->DropColumnFamily(one);
  ASSERT_OK(s);

  // txn2's only write failed above, so it presumably has nothing to apply and
  // its commit succeeds even though column family ONE has been dropped.
  s = txn2->Commit();
  ASSERT_OK(s);

  delete txn1;
  txn1 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn1);

  // Should fail since column family was dropped
  s = txn1->Put(one, "X", "111");
  ASSERT_TRUE(s.IsInvalidArgument());

  s = txn1->Put(two, "X", "222");
  ASSERT_OK(s);

  s = txn1->Put("X", "000");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, two, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("222", value);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("000", value);

  s = db->DropColumnFamily(two);
  ASSERT_OK(s);

  delete txn1;
  delete txn2;

  delete one;
  delete two;
}
3045
// Verifies that transactions without writes can be committed or rolled back.
// Also verifies that a transaction whose only operation is a GetForUpdate
// still holds a lock that blocks a conflicting db->Put.
TEST_P(TransactionTest, EmptyTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  // Commit a transaction that did nothing.
  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // Roll back a transaction that did nothing.
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  ASSERT_OK(txn->Rollback());
  delete txn;

  // Commit a transaction whose only operation is a locking read.
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // Same, but with a snapshot set and a conflicting writer.
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  txn->SetSnapshot();

  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  // Conflicts with previous GetForUpdate
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // No expiration is configured for txn, so it still commits successfully
  // even though the conflicting db->Put above timed out.
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;
}
3087
// Verifies that MultiGetForUpdate locks the requested keys even when none of
// them exist. A writer blocked by a held lock gets TimedOut. A locking read
// through a snapshot of a key committed after that snapshot gets Busy.
TEST_P(TransactionTest, PredicateManyPreceders) {
  WriteOptions write_options;
  ReadOptions read_options1, read_options2;
  TransactionOptions txn_options;
  string value;
  Status s;

  txn_options.set_snapshot = true;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  Transaction* txn2 = db->BeginTransaction(write_options);
  txn2->SetSnapshot();
  read_options2.snapshot = txn2->GetSnapshot();

  std::vector<Slice> multiget_keys = {"1", "2", "3"};
  std::vector<std::string> multiget_values;

  // None of the keys exist, yet txn1 still acquires locks on them (see the
  // conflict on "2" below).
  std::vector<Status> results =
      txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  ASSERT_EQ(results.size(), 3);
  ASSERT_TRUE(results[0].IsNotFound());
  ASSERT_TRUE(results[1].IsNotFound());
  ASSERT_TRUE(results[2].IsNotFound());

  s = txn2->Put("2", "x");  // Conflicts with txn1's MultiGetForUpdate
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn2->Rollback());

  // Repeating the locking read after txn2 gave up still succeeds.
  multiget_values.clear();
  results =
      txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  ASSERT_EQ(results.size(), 3);
  ASSERT_TRUE(results[0].IsNotFound());
  ASSERT_TRUE(results[1].IsNotFound());
  ASSERT_TRUE(results[2].IsNotFound());

  s = txn1->Commit();
  ASSERT_OK(s);

  delete txn1;
  delete txn2;

  // Second scenario: both transactions start with snapshots.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  s = txn1->Put("4", "x");
  ASSERT_OK(s);

  s = txn2->Delete("4");  // conflict
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Commit();
  ASSERT_OK(s);

  // The lock is free now, but "4" was committed after txn2's snapshot.
  s = txn2->GetForUpdate(read_options2, "4", &value);
  ASSERT_TRUE(s.IsBusy());

  ASSERT_OK(txn2->Rollback());

  delete txn1;
  delete txn2;
}
3155
// Two transactions write the same key "1" in several orders, with and
// without snapshots. Each phase checks which write wins and how the losing
// write is reported (TimedOut for a held lock, Busy for a post-snapshot
// change).
TEST_P(TransactionTest, LostUpdate) {
  WriteOptions write_options;
  ReadOptions read_options, read_options1, read_options2;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  // Test 2 transactions writing to the same key in multiple orders and
  // with/without snapshots

  // Phase 1: no snapshots; txn2's write is blocked by txn1's lock.
  Transaction* txn1 = db->BeginTransaction(write_options);
  Transaction* txn2 = db->BeginTransaction(write_options);

  s = txn1->Put("1", "1");
  ASSERT_OK(s);

  s = txn2->Put("1", "2");  // conflict
  ASSERT_TRUE(s.IsTimedOut());

  s = txn2->Commit();
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "1", &value);
  ASSERT_OK(s);
  ASSERT_EQ("1", value);

  delete txn1;
  delete txn2;

  // Phase 2: both with snapshots; txn2's write is still blocked by the lock.
  txn_options.set_snapshot = true;
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  s = txn1->Put("1", "3");
  ASSERT_OK(s);
  s = txn2->Put("1", "4");  // conflict
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "1", &value);
  ASSERT_OK(s);
  ASSERT_EQ("3", value);

  delete txn1;
  delete txn2;

  // Phase 3: txn1 commits before txn2 writes; txn2's snapshot predates that
  // commit, so its write is rejected with Busy.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  s = txn1->Put("1", "5");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Put("1", "6");
  ASSERT_TRUE(s.IsBusy());
  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "1", &value);
  ASSERT_OK(s);
  ASSERT_EQ("5", value);

  delete txn1;
  delete txn2;

  // Phase 4: like phase 3, but txn2 refreshes its snapshot after txn1's
  // commit, so its write is accepted.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  s = txn1->Put("1", "7");
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  txn2->SetSnapshot();
  s = txn2->Put("1", "8");
  ASSERT_OK(s);
  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "1", &value);
  ASSERT_OK(s);
  ASSERT_EQ("8", value);

  delete txn1;
  delete txn2;

  // Phase 5: no snapshots and no overlapping locks; the last writer wins.
  txn1 = db->BeginTransaction(write_options);
  txn2 = db->BeginTransaction(write_options);

  s = txn1->Put("1", "9");
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Put("1", "10");
  ASSERT_OK(s);
  s = txn2->Commit();
  ASSERT_OK(s);

  delete txn1;
  delete txn2;

  s = db->Get(read_options, "1", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "10");
}
3281
// Verifies the *Untracked write APIs. They are discarded by Rollback(). They
// also succeed on a key modified after the transaction's snapshot, whereas a
// tracked Put on that key fails with Busy.
TEST_P(TransactionTest, UntrackedWrites) {
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    // TODO(lth): For WriteUnprepared, validate that untracked writes are
    // not supported.
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  // Verify transaction rollback works for untracked keys.
  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  s = txn->PutUntracked("untracked", "0");
  ASSERT_OK(s);
  ASSERT_OK(txn->Rollback());
  s = db->Get(read_options, "untracked", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // Modify the key outside the transaction, after txn's snapshot.
  s = db->Put(write_options, "untracked", "x");
  ASSERT_OK(s);

  // Untracked writes should succeed even though key was written after snapshot
  s = txn->PutUntracked("untracked", "1");
  ASSERT_OK(s);
  s = txn->MergeUntracked("untracked", "2");
  ASSERT_OK(s);
  s = txn->DeleteUntracked("untracked");
  ASSERT_OK(s);

  // Conflict
  s = txn->Put("untracked", "3");
  ASSERT_TRUE(s.IsBusy());

  s = txn->Commit();
  ASSERT_OK(s);

  // The last successful operation on the key was DeleteUntracked.
  s = db->Get(read_options, "untracked", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
3331
// Verifies that once a transaction with expiration = 0 has expired, another
// transaction can write a key it had locked. The expired transaction's
// Commit() then fails with Expired and none of its writes are applied.
TEST_P(TransactionTest, ExpiredTransaction) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  // Set txn expiration timeout to 0 microseconds (expires instantly)
  txn_options.expiration = 0;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);

  s = txn1->Put("X", "1");
  ASSERT_OK(s);

  s = txn1->Put("Y", "1");
  ASSERT_OK(s);

  Transaction* txn2 = db->BeginTransaction(write_options);

  // txn2 should be able to write to X since txn1 has expired
  s = txn2->Put("X", "2");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("2", value);

  // An expired transaction still accepts new writes; the failure surfaces at
  // commit time.
  s = txn1->Put("Z", "1");
  ASSERT_OK(s);

  // txn1 should fail to commit since it is expired
  s = txn1->Commit();
  ASSERT_TRUE(s.IsExpired());

  // None of txn1's writes were applied.
  s = db->Get(read_options, "Y", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "Z", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn1;
  delete txn2;
}
3377
// Verifies reuse of a Transaction object by passing it as the old_txn argument
// of BeginTransaction. Reinitializing applies the new options (expiration,
// set_snapshot), releases the old locks, discards uncommitted writes, and
// allows the transaction name to be set again.
TEST_P(TransactionTest, ReinitializeTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  // Set txn expiration timeout to 0 microseconds (expires instantly)
  txn_options.expiration = 0;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);

  // Reinitialize transaction to no longer expire
  txn_options.expiration = -1;
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->Put("Z", "z");
  ASSERT_OK(s);

  // Should commit since not expired
  s = txn1->Commit();
  ASSERT_OK(s);

  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->Put("Z", "zz");
  ASSERT_OK(s);

  // Reinitialize txn1 and verify that Z gets unlocked
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
  s = txn2->Put("Z", "zzz");
  ASSERT_OK(s);
  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;

  // txn1's uncommitted "zz" was discarded by the reinitialization.
  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzz");

  // Verify snapshots get reinitialized correctly
  txn1->SetSnapshot();
  s = txn1->Put("Z", "zzzz");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  // set_snapshot is false here, so the reinitialized txn has no snapshot.
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);
  const Snapshot* snapshot = txn1->GetSnapshot();
  ASSERT_FALSE(snapshot);

  txn_options.set_snapshot = true;
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);
  snapshot = txn1->GetSnapshot();
  ASSERT_TRUE(snapshot);

  s = txn1->Put("Z", "a");
  ASSERT_OK(s);

  ASSERT_OK(txn1->Rollback());

  // This write is never committed: txn1 is reinitialized below first.
  s = txn1->Put("Y", "y");
  ASSERT_OK(s);

  txn_options.set_snapshot = false;
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);
  snapshot = txn1->GetSnapshot();
  ASSERT_FALSE(snapshot);

  s = txn1->Put("X", "x");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  s = db->Get(read_options, "Y", &value);
  ASSERT_TRUE(s.IsNotFound());

  // A named transaction can be prepared, committed, reinitialized, and then
  // given the same name again.
  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->SetName("name");
  ASSERT_OK(s);

  s = txn1->Prepare();
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  txn1 = db->BeginTransaction(write_options, txn_options, txn1);

  s = txn1->SetName("name");
  ASSERT_OK(s);

  delete txn1;
}
3483
// Verifies that rolling back a transaction releases its locks, so that another
// transaction can then write the previously locked key.
TEST_P(TransactionTest, Rollback) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->Put("X", "1");
  ASSERT_OK(s);

  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  // txn2 should not be able to write to X since txn1 has it locked
  s = txn2->Put("X", "2");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->Rollback());
  delete txn1;

  // txn2 should now be able to write to X
  s = txn2->Put("X", "3");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("3", value);

  delete txn2;
}
3520
// Reopens the DB with txn_db_options.max_num_locks = 3. Verifies that the cap
// applies across transactions, that re-locking an already-held key does not
// count against it, and that committing frees the locks.
TEST_P(TransactionTest, LockLimitTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  delete db;
  db = nullptr;

  // Open DB with a lock limit of 3
  txn_db_options.max_num_locks = 3;
  ASSERT_OK(ReOpen());
  assert(db != nullptr);

  // Create a txn and verify we can only lock up to 3 keys
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  s = txn->Put("X", "x");
  ASSERT_OK(s);

  s = txn->Put("Y", "y");
  ASSERT_OK(s);

  s = txn->Put("Z", "z");
  ASSERT_OK(s);

  // lock limit reached
  s = txn->Put("W", "w");
  ASSERT_TRUE(s.IsBusy());

  // re-locking same key shouldn't put us over the limit
  s = txn->Put("X", "xx");
  ASSERT_OK(s);

  // Locking reads of new keys are subject to the same limit.
  s = txn->GetForUpdate(read_options, "W", &value);
  ASSERT_TRUE(s.IsBusy());
  s = txn->GetForUpdate(read_options, "V", &value);
  ASSERT_TRUE(s.IsBusy());

  // re-locking same key shouldn't put us over the limit
  s = txn->GetForUpdate(read_options, "Y", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  // A plain (non-locking) read is not affected by the limit.
  s = txn->Get(read_options, "W", &value);
  ASSERT_TRUE(s.IsNotFound());

  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // "X" currently locked
  s = txn2->Put("X", "x");
  ASSERT_TRUE(s.IsTimedOut());

  // lock limit reached
  s = txn2->Put("M", "m");
  ASSERT_TRUE(s.IsBusy());

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xx", value);

  s = db->Get(read_options, "W", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Committing txn should release its locks and allow txn2 to proceed
  s = txn2->Put("X", "x2");
  ASSERT_OK(s);

  s = txn2->Delete("X");
  ASSERT_OK(s);

  s = txn2->Put("M", "m");
  ASSERT_OK(s);

  s = txn2->Put("Z", "z2");
  ASSERT_OK(s);

  // lock limit reached
  s = txn2->Delete("Y");
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ("z2", value);

  s = db->Get(read_options, "Y", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  s = db->Get(read_options, "X", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  delete txn2;
}
3626
// Verifies a transaction iterator read at the transaction's snapshot. It
// merges txn's pending writes (puts and a delete) with DB contents and hides
// DB writes made after the snapshot. It also verifies that GetForUpdate on a
// key modified after the snapshot fails with Busy.
TEST_P(TransactionTest, IteratorTest) {
  // This test does writes without snapshot validation, and then tries to create
  // iterator later, which is unsupported in write unprepared.
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  Status s;

  // Write some keys to the db
  s = db->Put(write_options, "A", "a");
  ASSERT_OK(s);

  s = db->Put(write_options, "G", "g");
  ASSERT_OK(s);

  s = db->Put(write_options, "F", "f");
  ASSERT_OK(s);

  s = db->Put(write_options, "C", "c");
  ASSERT_OK(s);

  s = db->Put(write_options, "D", "d");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Write some keys in a txn
  s = txn->Put("B", "b");
  ASSERT_OK(s);

  s = txn->Put("H", "h");
  ASSERT_OK(s);

  s = txn->Delete("D");
  ASSERT_OK(s);

  s = txn->Put("E", "e");
  ASSERT_OK(s);

  txn->SetSnapshot();
  const Snapshot* snapshot = txn->GetSnapshot();

  // Write some keys to the db after the snapshot
  s = db->Put(write_options, "BB", "xx");
  ASSERT_OK(s);

  s = db->Put(write_options, "C", "xx");
  ASSERT_OK(s);

  read_options.snapshot = snapshot;
  Iterator* iter = txn->GetIterator(read_options);
  ASSERT_OK(iter->status());
  iter->SeekToFirst();

  // Read all keys via iter and lock them all. "D" is absent (deleted in txn);
  // "BB" and the newer "C" value are hidden because they postdate snapshot.
  const std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
  const size_t num_results = sizeof(results) / sizeof(results[0]);
  for (size_t i = 0; i < num_results; i++) {
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(results[i], iter->value().ToString());

    s = txn->GetForUpdate(read_options, iter->key(), nullptr);
    if (i == 2) {
      // "C" was modified after txn's snapshot
      ASSERT_TRUE(s.IsBusy());
    } else {
      ASSERT_OK(s);
    }

    iter->Next();
  }
  ASSERT_FALSE(iter->Valid());

  iter->Seek("G");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("g", iter->value().ToString());

  iter->Prev();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("f", iter->value().ToString());

  // Seeking to the deleted "D" lands on the next visible key.
  iter->Seek("D");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->value().ToString());

  iter->Seek("C");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("c", iter->value().ToString());

  iter->Next();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->value().ToString());

  iter->Seek("");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("a", iter->value().ToString());

  iter->Seek("X");
  ASSERT_OK(iter->status());
  ASSERT_FALSE(iter->Valid());

  iter->SeekToLast();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("h", iter->value().ToString());

  s = txn->Commit();
  ASSERT_OK(s);

  delete iter;
  delete txn;
}
3750
// Verifies DisableIndexing()/EnableIndexing(). Writes made while indexing is
// disabled succeed but are not visible to the transaction's own Get or
// iterator. Writes made after indexing is re-enabled are visible again.
TEST_P(TransactionTest, DisableIndexingTest) {
  // Skip this test for write unprepared. It does not solely rely on WBWI for
  // read your own writes, so depending on whether batches are flushed or not,
  // only some writes will be visible.
  //
  // Also, write unprepared does not support creating iterators if there has
  // been txn->Put() without snapshot validation.
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn->DisableIndexing();

  // Not indexed, so txn's own reads do not see it.
  s = txn->Put("B", "b");
  ASSERT_OK(s);

  s = txn->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  Iterator* iter = txn->GetIterator(read_options);
  ASSERT_OK(iter->status());

  iter->Seek("B");
  ASSERT_OK(iter->status());
  ASSERT_FALSE(iter->Valid());

  // The delete succeeds, but because it is not indexed, reads still see the
  // earlier indexed value of "A".
  s = txn->Delete("A");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn->EnableIndexing();

  s = txn->Put("B", "bb");
  ASSERT_OK(s);

  // The already-open iterator observes the newly indexed write.
  iter->Seek("B");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bb", iter->value().ToString());

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("bb", value);

  s = txn->Put("A", "aa");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  delete iter;
  delete txn;
}
3822
// Verifies SetSavePoint()/RollbackToSavePoint(). Rolling back restores the
// transaction's pending writes and its GetNumPuts()/GetNumDeletes() counters.
// Each rollback consumes the most recent save point, and RollbackToSavePoint
// returns NotFound when none is left.
TEST_P(TransactionTest, SavepointTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_EQ(0, txn->GetNumPuts());

  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  txn->SetSavePoint();  // 1

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to beginning of txn
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Put("B", "b");
  ASSERT_OK(s);

  ASSERT_EQ(1, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  delete txn;
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Put("B", "bb");
  ASSERT_OK(s);

  s = txn->Put("C", "c");
  ASSERT_OK(s);

  txn->SetSavePoint();  // 2

  s = txn->Delete("B");
  ASSERT_OK(s);

  s = txn->Put("C", "cc");
  ASSERT_OK(s);

  s = txn->Put("D", "d");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumPuts());
  ASSERT_EQ(1, txn->GetNumDeletes());

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2

  ASSERT_EQ(3, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("bb", value);

  s = txn->Get(read_options, "C", &value);
  ASSERT_OK(s);
  ASSERT_EQ("c", value);

  s = txn->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Put("E", "e");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  // Save point 2 was consumed by the rollback above, so none is left.
  // Rollback() then discards everything back to the beginning of txn.
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());
  ASSERT_OK(txn->Rollback());

  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  // Only the previously committed "B" remains visible.
  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  s = txn->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "E", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Put("A", "aa");
  ASSERT_OK(s);

  s = txn->Put("F", "f");
  ASSERT_OK(s);

  ASSERT_EQ(2, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  txn->SetSavePoint();  // 3
  txn->SetSavePoint();  // 4

  s = txn->Put("G", "g");
  ASSERT_OK(s);

  s = txn->SingleDelete("F");
  ASSERT_OK(s);

  s = txn->Delete("B");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  s = txn->Get(read_options, "F", &value);
  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior. So it is unclear what the
  // result of fetching "F" should be. The current implementation will
  // return NotFound in this case.
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_EQ(3, txn->GetNumPuts());
  ASSERT_EQ(2, txn->GetNumDeletes());

  // Rollback to 4, which was set at the same point as 3.
  ASSERT_OK(txn->RollbackToSavePoint());

  ASSERT_EQ(2, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  s = txn->Get(read_options, "F", &value);
  ASSERT_OK(s);
  ASSERT_EQ("f", value);

  s = txn->Get(read_options, "G", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "F", &value);
  ASSERT_OK(s);
  ASSERT_EQ("f", value);

  s = db->Get(read_options, "G", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  s = db->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  s = db->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "E", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
4015
// Verifies how RollbackToSavePoint() interacts with locks. Keys first locked
// after the save point are unlocked by the rollback. Keys already locked
// before the save point stay locked.
TEST_P(TransactionTest, SavepointTest2) {
  WriteOptions write_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->Put("A", "");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 1

  s = txn1->Put("A", "a");
  ASSERT_OK(s);

  s = txn1->Put("C", "c");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 2

  s = txn1->Put("A", "a");
  ASSERT_OK(s);
  s = txn1->Put("B", "b");
  ASSERT_OK(s);

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 2

  // Verify that "A" and "C" are still locked while "B" is not
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b2");
  ASSERT_OK(s);

  // txn2 now holds "B", so txn1 cannot re-acquire it.
  s = txn1->Put("A", "aa");
  ASSERT_OK(s);
  s = txn1->Put("B", "bb");
  ASSERT_TRUE(s.IsTimedOut());

  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;

  s = txn1->Put("A", "aaa");
  ASSERT_OK(s);
  s = txn1->Put("B", "bbb");
  ASSERT_OK(s);
  s = txn1->Put("C", "ccc");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 3
  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 3

  // Verify that "A", "B", "C" are still locked
  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c2");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1

  // Verify that only "A" is locked
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_OK(s);
  s = txn2->Put("C", "c3po");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;

  // Verify "A" "C" "B" are no longer locked
  s = txn2->Put("A", "a4");
  ASSERT_OK(s);
  s = txn2->Put("B", "b4");
  ASSERT_OK(s);
  s = txn2->Put("C", "c4");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;
}
4113
// Verifies PopSavePoint(). It removes the most recent save point without
// undoing writes or releasing locks, and returns NotFound when no save point
// exists.
TEST_P(TransactionTest, SavepointTest3) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->PopSavePoint();  // No SavePoint present
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("A", "");
  ASSERT_OK(s);

  s = txn1->PopSavePoint();  // Still no SavePoint present
  ASSERT_TRUE(s.IsNotFound());

  txn1->SetSavePoint();  // 1

  s = txn1->Put("A", "a");
  ASSERT_OK(s);

  s = txn1->PopSavePoint();  // Remove 1
  ASSERT_OK(s);
  ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());

  // Verify that "A" is still locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  delete txn2;

  txn1->SetSavePoint();  // 2

  s = txn1->Put("B", "b");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 3

  s = txn1->Put("B", "b2");
  ASSERT_OK(s);

  // Roll back to 3 (undoes "b2"); save point 2 remains.
  ASSERT_OK(txn1->RollbackToSavePoint());

  s = txn1->PopSavePoint();  // Remove 2
  ASSERT_OK(s);

  s = txn1->PopSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;

  std::string value;

  // txn1 should have modified "A" to "a"
  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  // txn1 should have set "B" to just "b"
  s = db->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b", value);

  s = db->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
}
4186
// Verifies that after PopSavePoint() removes a nested save point, rolling back
// to the enclosing save point also undoes the writes made after the popped
// one. Those writes' locks are released too.
TEST_P(TransactionTest, SavepointTest4) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  txn1->SetSavePoint();  // 1
  s = txn1->Put("A", "a");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 2
  s = txn1->Put("B", "b");
  ASSERT_OK(s);

  s = txn1->PopSavePoint();  // Remove 2
  ASSERT_OK(s);

  // Verify that A/B still exists.
  std::string value;
  ASSERT_OK(txn1->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn1->Get(read_options, "B", &value));
  ASSERT_EQ("b", value);

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1

  // Verify that everything was rolled back.
  s = txn1->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Nothing should be locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "");
  ASSERT_OK(s);

  s = txn2->Put("B", "");
  ASSERT_OK(s);

  delete txn2;
  delete txn1;
}
4238
// Verifies UndoGetForUpdate() lock-release rules when no save point is set:
//  - an undo on a key that was never read with GetForUpdate() is a no-op;
//  - a read lock is released only after as many undos as there were
//    GetForUpdate() calls on that key;
//  - locks held because the transaction wrote the key (Put/Delete) are not
//    released by an undo, only by Commit()/Rollback().
TEST_P(TransactionTest, UndoGetForUpdateTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  // Undo on a key that was never read is harmless.
  txn1->UndoGetForUpdate("A");

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;

  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  // An undo issued *before* the read does not cancel the later read.
  txn1->UndoGetForUpdate("A");
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Verify that A is locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a");
  ASSERT_TRUE(s.IsTimedOut());

  txn1->UndoGetForUpdate("A");

  // Verify that A is now unlocked
  s = txn2->Put("A", "a2");
  ASSERT_OK(s);
  ASSERT_OK(txn2->Commit());
  delete txn2;
  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a2", value);

  // Keys written by txn1 (Delete/Put) stay locked even after undoing the
  // GetForUpdate() calls made on them.
  s = txn1->Delete("A");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("B", "b3");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_OK(s);

  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");

  // Verify that A and B are still locked
  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a4");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b4");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->Rollback());
  delete txn1;

  // Verify that A and B are no longer locked
  s = txn2->Put("A", "a5");
  ASSERT_OK(s);
  s = txn2->Put("B", "b5");
  ASSERT_OK(s);
  s = txn2->Commit();
  delete txn2;
  ASSERT_OK(s);

  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  // Read A three times and C twice (C does not exist, but is still locked).
  // Read B, write it, then read it again.
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_OK(s);
  s = txn1->Put("B", "b5");
  // Previously this status was silently overwritten by the next call.
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_OK(s);

  // First round of undos (X was never read, so undoing it is a no-op).
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("X");

  // Verify A,B,C are locked
  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a6");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Delete("B");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c6");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("X", "x6");
  ASSERT_OK(s);

  // Second round: C has now been undone as often as it was read.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("X");

  // Verify A,B are locked and C is not
  s = txn2->Put("A", "a6");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Delete("B");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c6");
  ASSERT_OK(s);
  s = txn2->Put("X", "x6");
  ASSERT_OK(s);

  // Third round: A is released; B stays locked because txn1 wrote it.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("X");

  // Verify B is locked and A and C are not
  s = txn2->Put("A", "a7");
  ASSERT_OK(s);
  s = txn2->Delete("B");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c7");
  ASSERT_OK(s);
  s = txn2->Put("X", "x7");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;

  s = txn1->Commit();
  ASSERT_OK(s);
  delete txn1;
}
4382
// Checks how UndoGetForUpdate() interacts with save points. As the
// expectations below show:
//  - an undo only releases a lock taken by GetForUpdate() since the most
//    recent save point;
//  - keys the transaction wrote stay locked regardless of undos;
//  - RollbackToSavePoint() releases locks acquired after that save point and
//    makes the previous save point the scope for later undos.
TEST_P(TransactionTest, UndoGetForUpdateTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "A", "");
  ASSERT_OK(s);

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  // Locks taken before save point 1: A, B (read) and F (written).
  s = txn1->GetForUpdate(read_options, "A", &value);
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("F", "f");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 1

  // A was read before save point 1, so this undo does not release it.
  txn1->UndoGetForUpdate("A");

  // Locks taken after save point 1: C, D (read), E (written and read) and a
  // second read of F.
  s = txn1->GetForUpdate(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn1->GetForUpdate(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn1->Put("E", "e");
  ASSERT_OK(s);
  s = txn1->GetForUpdate(read_options, "E", &value);
  ASSERT_OK(s);

  s = txn1->GetForUpdate(read_options, "F", &value);
  ASSERT_OK(s);

  // Verify A,B,C,D,E,F are still locked
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  s = txn2->Put("A", "a1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e1");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f1");
  ASSERT_TRUE(s.IsTimedOut());

  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("E");

  // Verify A,B,D,E,F are still locked and C is not.
  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c2");
  ASSERT_OK(s);

  txn1->SetSavePoint();  // 2

  s = txn1->Put("H", "h");
  ASSERT_OK(s);

  // Nothing was read since save point 2, so none of these undos release a
  // lock (G was never touched at all).
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("D");
  txn1->UndoGetForUpdate("E");
  txn1->UndoGetForUpdate("F");
  txn1->UndoGetForUpdate("G");
  txn1->UndoGetForUpdate("H");

  // Verify A,B,D,E,F,H are still locked and C,G are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("H", "h3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);

  // Rolling back to 2 releases H, which was written after save point 2.
  ASSERT_OK(txn1->RollbackToSavePoint());  // rollback to 2

  // Verify A,B,D,E,F are still locked and C,G,H are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("D", "d3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  // Save point 1 is current again, so the read of D (made after it) can now
  // be undone; E stays locked because it was written.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("D");
  txn1->UndoGetForUpdate("E");
  txn1->UndoGetForUpdate("F");
  txn1->UndoGetForUpdate("G");
  txn1->UndoGetForUpdate("H");

  // Verify A,B,E,F are still locked and C,D,G,H are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("E", "e3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("D", "d3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  // Rolling back to 1 releases E, which was written after save point 1.
  ASSERT_OK(txn1->RollbackToSavePoint());  // rollback to 1

  // Verify A,B,F are still locked and C,D,E,G,H are not.
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("D", "d3");
  ASSERT_OK(s);
  s = txn2->Put("E", "e3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  // No save points remain, so the reads of A and B can finally be undone;
  // F stays locked because it was written.
  txn1->UndoGetForUpdate("A");
  txn1->UndoGetForUpdate("B");
  txn1->UndoGetForUpdate("C");
  txn1->UndoGetForUpdate("D");
  txn1->UndoGetForUpdate("E");
  txn1->UndoGetForUpdate("F");
  txn1->UndoGetForUpdate("G");
  txn1->UndoGetForUpdate("H");

  // Verify F is still locked and A,B,C,D,E,G,H are not.
  s = txn2->Put("F", "f3");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("A", "a3");
  ASSERT_OK(s);
  s = txn2->Put("B", "b3");
  ASSERT_OK(s);
  s = txn2->Put("C", "c3");
  ASSERT_OK(s);
  s = txn2->Put("D", "d3");
  ASSERT_OK(s);
  s = txn2->Put("E", "e3");
  ASSERT_OK(s);
  s = txn2->Put("G", "g3");
  ASSERT_OK(s);
  s = txn2->Put("H", "h3");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);
  s = txn2->Commit();
  ASSERT_OK(s);

  delete txn1;
  delete txn2;
}
4588
// Exercises transaction expiration and lock-timeout interactions:
//  1. A DB write blocked by an expirable transaction's lock succeeds once the
//     transaction expires, and the expired transaction can no longer commit.
//  2. The same holds with a finite transaction_lock_timeout on the DB.
//  3. A waiter whose lock timeout exceeds the holder's expiration wins the
//     lock, and the holder then fails to commit as expired.
//  4. A waiter whose lock timeout is shorter than the holder's expiration
//     times out with the "Timeout waiting to lock key" message.
TEST_P(TransactionTest, TimeoutTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  delete db;
  db = nullptr;

  // transaction writes have an infinite timeout,
  // but we will override this when we start a txn
  // db writes have infinite timeout
  txn_db_options.transaction_lock_timeout = -1;
  txn_db_options.default_lock_timeout = -1;

  s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  // Check the status first (a bare assert() is compiled out in release builds
  // and aborts the whole binary in debug builds).
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  TransactionOptions txn_options0;
  txn_options0.expiration = 100;   // 100ms
  txn_options0.lock_timeout = 50;  // txn timeout no longer infinite
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
  ASSERT_TRUE(txn1);

  s = txn1->GetForUpdate(read_options, "aaa", nullptr);
  ASSERT_OK(s);

  // Conflicts with previous GetForUpdate.
  // Since db writes do not have a timeout, this should eventually succeed when
  // the transaction expires.
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_OK(s);

  ASSERT_GE(txn1->GetElapsedTime(),
            static_cast<uint64_t>(txn_options0.expiration));

  s = txn1->Commit();
  ASSERT_TRUE(s.IsExpired());  // expired!

  s = db->Get(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xxx", value);

  delete txn1;
  delete db;
  // Mirror the reset at the top of the test so `db` never dangles if the
  // Open() below fails.
  db = nullptr;

  // transaction writes have 50ms timeout,
  // db writes have infinite timeout
  txn_db_options.transaction_lock_timeout = 50;
  txn_db_options.default_lock_timeout = -1;

  s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  TransactionOptions txn_options;
  txn_options.expiration = 100;  // 100ms
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  s = txn1->GetForUpdate(read_options, "aaa", nullptr);
  ASSERT_OK(s);

  // Conflicts with previous GetForUpdate.
  // Since db writes do not have a timeout, this should eventually succeed when
  // the transaction expires.
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_NOK(s);  // expired!

  s = db->Get(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xxx", value);

  delete txn1;
  txn_options.expiration = 6000000;  // 100 minutes
  txn_options.lock_timeout = 1;      // 1ms
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  txn1->SetLockTimeout(100);

  TransactionOptions txn_options2;
  txn_options2.expiration = 10;  // 10ms
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
  // Check the transaction just created (this used to re-check a stale status).
  ASSERT_TRUE(txn2);

  s = txn2->Put("a", "2");
  ASSERT_OK(s);

  // txn1 has a lock timeout longer than txn2's expiration, so it will win
  s = txn1->Delete("a");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  // txn2 should be expired out since txn1 waiting until its timeout expired.
  s = txn2->Commit();
  ASSERT_TRUE(s.IsExpired());

  delete txn1;
  delete txn2;
  txn_options.expiration = 6000000;  // 100 minutes
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  txn_options2.expiration = 100000000;
  txn2 = db->BeginTransaction(write_options, txn_options2);
  ASSERT_TRUE(txn2);

  s = txn1->Delete("asdf");
  ASSERT_OK(s);

  // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
  s = txn2->Delete("asdf");
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  s = txn1->Commit();
  ASSERT_OK(s);

  s = txn2->Put("asdf", "asdf");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "asdf", &value);
  ASSERT_OK(s);
  ASSERT_EQ("asdf", value);

  delete txn1;
  delete txn2;
}
4725
// Verifies SingleDelete() inside transactions: it hides the key from reads,
// can be followed by a Put(), is persisted on commit, and locks the key like
// any other write.
TEST_P(TransactionTest, SingleDeleteTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  std::string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->SingleDelete("A");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->SingleDelete("A");
  ASSERT_OK(s);

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  s = db->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->SingleDelete("A");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  s = db->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);
  txn2->SetSnapshot();

  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Put("A", "a2");
  ASSERT_OK(s);

  s = txn->SingleDelete("A");
  ASSERT_OK(s);

  s = txn->SingleDelete("B");
  ASSERT_OK(s);

  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior. So it is unclear what the
  // result of fetching "A" should be. The current implementation will
  // return NotFound in this case.
  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  // txn holds the lock on "B" through its SingleDelete.
  s = txn2->Put("B", "b");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Commit();
  ASSERT_OK(s);
  delete txn2;

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior. So it is unclear what the
  // result of fetching "A" should be. The current implementation will
  // return NotFound in this case.
  s = db->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());
}
4823
// Merges inside a transaction build on the committed value (or on an earlier
// Put in the same transaction), and they lock the key against other
// transactions.
TEST_P(TransactionTest, MergeTest) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string merged;

  Transaction* writer = db->BeginTransaction(wopts, TransactionOptions());
  ASSERT_TRUE(writer);

  // Base value committed outside the transaction.
  ASSERT_OK(db->Put(wopts, "A", "a0"));

  // Two merge operands are stacked on top of the committed base.
  ASSERT_OK(writer->Merge("A", "1"));
  ASSERT_OK(writer->Merge("A", "2"));
  ASSERT_OK(writer->Get(ropts, "A", &merged));
  ASSERT_EQ("a0,1,2", merged);

  // A Put inside the transaction replaces everything merged so far...
  ASSERT_OK(writer->Put("A", "a"));
  ASSERT_OK(writer->Get(ropts, "A", &merged));
  ASSERT_EQ("a", merged);

  // ...and later merges build on the Put value.
  ASSERT_OK(writer->Merge("A", "3"));
  ASSERT_OK(writer->Get(ropts, "A", &merged));
  ASSERT_EQ("a,3", merged);

  // A second transaction with a 1 ms lock timeout cannot merge into "A"
  // while the first transaction holds its lock.
  TransactionOptions short_wait;
  short_wait.lock_timeout = 1;  // 1 ms
  Transaction* rival = db->BeginTransaction(wopts, short_wait);
  ASSERT_TRUE(rival);
  ASSERT_TRUE(rival->Merge("A", "4").IsTimedOut());
  ASSERT_OK(rival->Commit());
  delete rival;

  ASSERT_OK(writer->Commit());
  delete writer;

  // Only the first transaction's result is persisted.
  ASSERT_OK(db->Get(ropts, "A", &merged));
  ASSERT_EQ("a,3", merged);
}
4881
// DeleteRange() is rejected on a TransactionDB. A range deletion is accepted
// through Write() only when the caller's TransactionDBWriteOptimizations
// promises are strong enough for the configured write policy.
TEST_P(TransactionTest, DeleteRangeSupportTest) {
  // The `DeleteRange()` API is banned everywhere.
  ASSERT_TRUE(
      db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(), "a", "b")
          .IsNotSupported());

  // But range deletions can be added via the `Write()` API by specifying the
  // proper flags to promise there are no conflicts according to the DB type
  // (see `TransactionDB::DeleteRange()` API doc for details).
  for (const bool skip_cc : {false, true}) {
    for (const bool skip_dup : {false, true}) {
      ASSERT_OK(db->Put(WriteOptions(), "a", "val"));

      WriteBatch batch;
      ASSERT_OK(batch.DeleteRange("a", "b"));

      TransactionDBWriteOptimizations promises;
      promises.skip_concurrency_control = skip_cc;
      promises.skip_duplicate_key_check = skip_dup;
      Status write_status = db->Write(WriteOptions(), promises, &batch);

      // WRITE_COMMITTED only needs the concurrency-control promise; the
      // WRITE_PREPARED / WRITE_UNPREPARED policies need both promises.
      const bool expect_applied =
          (txn_db_options.write_policy == WRITE_COMMITTED)
              ? skip_cc
              : (skip_cc && skip_dup);

      std::string value;
      if (expect_applied) {
        ASSERT_OK(write_status);
        ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
      } else {
        ASSERT_NOK(write_status);
        ASSERT_OK(db->Get(ReadOptions(), "a", &value));
      }

      // Without any promises from the user, range deletion via the other
      // `Write()` API is still banned.
      ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
      ASSERT_NOK(db->Write(WriteOptions(), &batch));
      ASSERT_OK(db->Get(ReadOptions(), "a", &value));
    }
  }
}
4931
// SetSnapshotOnNextOperation() defers the snapshot to the next locking
// operation. Writes committed before that point therefore do not conflict,
// while writes committed after it do.
TEST_P(TransactionTest, DeferSnapshotTest) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string got;

  ASSERT_OK(db->Put(wopts, "A", "a0"));

  Transaction* deferred = db->BeginTransaction(wopts);
  Transaction* early = db->BeginTransaction(wopts);

  // Requesting a deferred snapshot does not create one yet.
  deferred->SetSnapshotOnNextOperation();
  ASSERT_FALSE(deferred->GetSnapshot());

  // Another transaction commits a new value of "A" before the snapshot exists.
  ASSERT_OK(early->Put("A", "a2"));
  ASSERT_OK(early->Commit());
  delete early;

  // The snapshot is only taken at this GetForUpdate, after "a2" was
  // committed, so there is no conflict and "a2" is visible.
  ASSERT_OK(deferred->GetForUpdate(ropts, "A", &got));
  ASSERT_EQ("a2", got);
  ASSERT_OK(deferred->Put("A", "a1"));

  // "B" is written after the snapshot was taken, so locking it must fail.
  ASSERT_OK(db->Put(wopts, "B", "b0"));
  ASSERT_TRUE(deferred->Put("B", "b1").IsBusy());

  ASSERT_OK(deferred->Commit());
  delete deferred;

  ASSERT_OK(db->Get(ropts, "A", &got));
  ASSERT_EQ("a1", got);
  ASSERT_OK(db->Get(ropts, "B", &got));
  ASSERT_EQ("b0", got);
}
4982
// Plain Get() calls do not consume a SetSnapshotOnNextOperation() request,
// so the existing snapshot stays in effect. GetForUpdate() does consume it
// and installs a newer snapshot.
TEST_P(TransactionTest, DeferSnapshotTest2) {
  WriteOptions wopts;
  ReadOptions latest_ropts, snap_ropts;
  std::string got;

  Transaction* txn = db->BeginTransaction(wopts);

  // Explicit snapshot taken before C and D exist.
  txn->SetSnapshot();
  ASSERT_OK(txn->Put("A", "a1"));
  ASSERT_OK(db->Put(wopts, "C", "c0"));
  ASSERT_OK(db->Put(wopts, "D", "d0"));

  snap_ropts.snapshot = txn->GetSnapshot();
  txn->SetSnapshotOnNextOperation();

  // Two rounds of snapshot reads: C and D were written after the snapshot, so
  // both stay invisible, and re-fetching the snapshot after each round shows
  // it has not changed yet.
  for (int round = 0; round < 2; ++round) {
    ASSERT_TRUE(txn->Get(snap_ropts, "C", &got).IsNotFound());
    ASSERT_TRUE(txn->Get(snap_ropts, "D", &got).IsNotFound());
    snap_ropts.snapshot = txn->GetSnapshot();
  }

  // GetForUpdate is the "next operation", so the deferred snapshot is taken
  // here.
  ASSERT_OK(txn->GetForUpdate(latest_ropts, "C", &got));
  ASSERT_EQ("c0", got);

  // The new snapshot sees D == "d0" but not the later "d00".
  ASSERT_OK(db->Put(wopts, "D", "d00"));
  snap_ropts.snapshot = txn->GetSnapshot();
  ASSERT_OK(txn->Get(snap_ropts, "D", &got));
  ASSERT_EQ("d0", got);

  ASSERT_OK(txn->Commit());
  delete txn;
}
5039
// Checks that save points capture the transaction's snapshot state, including
// a still-pending SetSnapshotOnNextOperation() request, and that
// RollbackToSavePoint() restores it. The committed value of "T" is bumped
// between steps, so the value read through GetSnapshot() shows which snapshot
// is in effect.
TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options);

  // Save point 1: no snapshot and no deferred snapshot requested.
  txn1->SetSavePoint();  // 1

  s = db->Put(write_options, "T", "1");
  ASSERT_OK(s);

  // Request a snapshot on the next operation; none is taken yet.
  txn1->SetSnapshotOnNextOperation();

  s = db->Put(write_options, "T", "2");
  ASSERT_OK(s);

  // Save point 2: records the still-pending request (after rolling back to it
  // below there is no snapshot until the next write).
  txn1->SetSavePoint();  // 2

  s = db->Put(write_options, "T", "3");
  ASSERT_OK(s);

  // This write takes the deferred snapshot, which sees T == "3".
  s = txn1->Put("A", "a");
  ASSERT_OK(s);

  // Save point 3: snapshot sees T == "3".
  txn1->SetSavePoint();  // 3

  s = db->Put(write_options, "T", "4");
  ASSERT_OK(s);

  // Take an explicit snapshot (sees T == "4") and request yet another one on
  // the next operation.
  txn1->SetSnapshot();
  txn1->SetSnapshotOnNextOperation();

  // Save point 4: snapshot sees T == "4", with a request pending.
  txn1->SetSavePoint();  // 4

  s = db->Put(write_options, "T", "5");
  ASSERT_OK(s);

  // A Get does not consume the pending request; still reading T == "4".
  snapshot_read_options.snapshot = txn1->GetSnapshot();
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("4", value);

  // The Put consumes the pending request; the new snapshot sees T == "5".
  s = txn1->Put("A", "a1");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn1->GetSnapshot();
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("5", value);

  // Rolling back to 4 restores the snapshot that sees T == "4".
  s = txn1->RollbackToSavePoint();  // Rollback to 4
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn1->GetSnapshot();
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("4", value);

  // Rolling back to 3 restores the snapshot that sees T == "3".
  s = txn1->RollbackToSavePoint();  // Rollback to 3
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn1->GetSnapshot();
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("3", value);

  // Reads without a snapshot still see the latest committed value.
  s = txn1->Get(read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("5", value);

  // Rolling back to 2: no snapshot exists yet, so reads see the latest value.
  s = txn1->RollbackToSavePoint();  // Rollback to 2
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn1->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("5", value);

  // The pending request restored by the rollback is honored by this write.
  s = txn1->Delete("A");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn1->GetSnapshot();
  ASSERT_TRUE(snapshot_read_options.snapshot);
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("5", value);

  // Rolling back to 1 leaves neither a snapshot nor a pending request, so even
  // a subsequent write does not create a snapshot.
  s = txn1->RollbackToSavePoint();  // Rollback to 1
  ASSERT_OK(s);

  s = txn1->Delete("A");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn1->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);
  s = txn1->Get(snapshot_read_options, "T", &value);
  ASSERT_OK(s);
  ASSERT_EQ("5", value);

  s = txn1->Commit();
  ASSERT_OK(s);

  delete txn1;
}
5147
// SetSnapshotOnNextOperation() with a notifier reports the snapshot once it
// is actually created. A Get does not create it; the next write does.
TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
  WriteOptions wopts;
  ReadOptions ropts;
  std::string got;

  // Publishes each newly created snapshot into a caller-owned slot (here,
  // ropts.snapshot), so later reads through ropts use it automatically.
  class SnapshotSink : public TransactionNotifier {
   public:
    explicit SnapshotSink(const Snapshot** slot) : slot_(slot) {}

    void SnapshotCreated(const Snapshot* created) override { *slot_ = created; }

   private:
    const Snapshot** slot_;
  };

  auto sink = std::make_shared<SnapshotSink>(&ropts.snapshot);

  ASSERT_OK(db->Put(wopts, "B", "0"));

  Transaction* txn1 = db->BeginTransaction(wopts);

  txn1->SetSnapshotOnNextOperation(sink);
  ASSERT_FALSE(ropts.snapshot);

  ASSERT_OK(db->Put(wopts, "B", "1"));

  // A Get does not generate the snapshot
  ASSERT_OK(txn1->Get(ropts, "B", &got));
  ASSERT_FALSE(ropts.snapshot);
  ASSERT_EQ(got, "1");

  // Any other operation does
  ASSERT_OK(txn1->Put("A", "0"));

  // Now change "B".
  ASSERT_OK(db->Put(wopts, "B", "2"));

  // The original value should still be read
  ASSERT_OK(txn1->Get(ropts, "B", &got));
  ASSERT_TRUE(ropts.snapshot);
  ASSERT_EQ(got, "1");

  ASSERT_OK(txn1->Commit());

  delete txn1;
}
5206
// Verifies that SetSnapshot() pins reads made through GetSnapshot() to the
// point it was called, and that ClearSnapshot() drops the snapshot again so
// reads see the latest committed value.
TEST_P(TransactionTest, ClearSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  std::string value;
  Status s;

  s = db->Put(write_options, "foo", "0");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = db->Put(write_options, "foo", "1");
  ASSERT_OK(s);

  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // No snapshot created yet
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_TRUE(snapshot_read_options.snapshot);

  s = db->Put(write_options, "foo", "2");
  ASSERT_OK(s);

  // Snapshot was created before change to '2'
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "1");

  txn->ClearSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();
  ASSERT_FALSE(snapshot_read_options.snapshot);

  // Snapshot has now been cleared
  s = txn->Get(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "2");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
5253
// Reopens the TransactionDB with per-column-family disable_auto_compactions
// settings and checks that each family's latest mutable options reflect them.
TEST_P(TransactionTest, ToggleAutoCompactionTest) {
  Status s;

  ColumnFamilyHandle* cfa = nullptr;
  ColumnFamilyHandle* cfb = nullptr;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  // Reset so `db` never dangles if the Open() below fails.
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
  ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
  ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
  cf_opt_default->disable_auto_compactions = false;
  cf_opt_cfa->disable_auto_compactions = true;
  cf_opt_cfb->disable_auto_compactions = false;

  std::vector<ColumnFamilyHandle*> handles;

  s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
                          &handles, &db);
  ASSERT_OK(s);
  // Guard the indexing below.
  ASSERT_EQ(3u, handles.size());

  // Copies of each family's latest mutable options (handles are in the same
  // order as column_families).
  auto cfh_default = static_cast_with_check<ColumnFamilyHandleImpl>(handles[0]);
  auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();

  auto cfh_a = static_cast_with_check<ColumnFamilyHandleImpl>(handles[1]);
  auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();

  auto cfh_b = static_cast_with_check<ColumnFamilyHandleImpl>(handles[2]);
  auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();

  // Release the handles before asserting, so a failed expectation does not
  // leak them; the option structs above are independent copies.
  for (auto* handle : handles) {
    delete handle;
  }

  ASSERT_FALSE(opt_default.disable_auto_compactions);
  ASSERT_TRUE(opt_a.disable_auto_compactions);
  ASSERT_FALSE(opt_b.disable_auto_compactions);
}
5311
TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
  // In this test, txn1 should succeed committing,
  // as the callback is called after txn1 starts committing.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"TransactionTest::ExpirableTransactionDataRace:1"}});
  // The callback captures this fixture (and its `db`) by reference.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) {
        WriteOptions write_options;
        TransactionOptions txn_options;

        // Force txn1 to expire
        /* sleep override */
        std::this_thread::sleep_for(std::chrono::milliseconds(1500));

        // txn1 already holds the lock on "X", so txn2 times out on the Put.
        Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
        Status s;
        s = txn2->Put("X", "2");
        ASSERT_TRUE(s.IsTimedOut());
        s = txn2->Commit();
        ASSERT_OK(s);
        delete txn2;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions write_options;
  TransactionOptions txn_options;

  txn_options.expiration = 1000;  // 1 second
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);

  Status s;
  s = txn1->Put("X", "1");
  ASSERT_OK(s);
  s = txn1->Commit();
  ASSERT_OK(s);

  ReadOptions read_options;
  string value;
  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("1", value);

  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // Unregister the callback: it references this (soon to be destroyed)
  // fixture, and must not fire if a later test re-enables processing.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
5358
5359 #ifndef ROCKSDB_VALGRIND_RUN
5360 namespace {
5361 // cmt_delay_ms is the delay between prepare and commit
5362 // first_id is the id of the first transaction
TransactionStressTestInserter(TransactionDB * db,const size_t num_transactions,const size_t num_sets,const size_t num_keys_per_set,Random64 * rand,const uint64_t cmt_delay_ms=0,const uint64_t first_id=0)5363 Status TransactionStressTestInserter(
5364 TransactionDB* db, const size_t num_transactions, const size_t num_sets,
5365 const size_t num_keys_per_set, Random64* rand,
5366 const uint64_t cmt_delay_ms = 0, const uint64_t first_id = 0) {
5367 WriteOptions write_options;
5368 ReadOptions read_options;
5369 TransactionOptions txn_options;
5370 if (rand->OneIn(2)) {
5371 txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
5372 }
5373 // Inside the inserter we might also retake the snapshot. We do both since two
5374 // separte functions are engaged for each.
5375 txn_options.set_snapshot = rand->OneIn(2);
5376
5377 RandomTransactionInserter inserter(
5378 rand, write_options, read_options, num_keys_per_set,
5379 static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
5380
5381 for (size_t t = 0; t < num_transactions; t++) {
5382 bool success = inserter.TransactionDBInsert(db, txn_options);
5383 if (!success) {
5384 // unexpected failure
5385 return inserter.GetLastStatus();
5386 }
5387 }
5388 inserter.GetLastStatus().PermitUncheckedError();
5389
5390 // Make sure at least some of the transactions succeeded. It's ok if
5391 // some failed due to write-conflicts.
5392 if (num_transactions != 1 &&
5393 inserter.GetFailureCount() > num_transactions / 2) {
5394 return Status::TryAgain("Too many transactions failed! " +
5395 std::to_string(inserter.GetFailureCount()) + " / " +
5396 std::to_string(num_transactions));
5397 }
5398
5399 return Status::OK();
5400 }
5401 } // namespace
5402
5403 // Worker threads add a number to a key from each set of keys. The checker
5404 // threads verify that the sum of all keys in each set are equal.
TEST_P(MySQLStyleTransactionTest,TransactionStressTest)5405 TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
5406 // Small write buffer to trigger more compactions
5407 options.write_buffer_size = 1024;
5408 ASSERT_OK(ReOpenNoDelete());
5409 constexpr size_t num_workers = 4; // worker threads count
5410 constexpr size_t num_checkers = 2; // checker threads count
5411 constexpr size_t num_slow_checkers = 2; // checker threads emulating backups
5412 constexpr size_t num_slow_workers = 1; // slow worker threads count
5413 constexpr size_t num_transactions_per_thread = 10000;
5414 constexpr uint16_t num_sets = 3;
5415 constexpr size_t num_keys_per_set = 100;
5416 // Setting the key-space to be 100 keys should cause enough write-conflicts
5417 // to make this test interesting.
5418
5419 std::vector<port::Thread> threads;
5420 std::atomic<uint32_t> finished = {0};
5421 constexpr bool TAKE_SNAPSHOT = true;
5422 uint64_t time_seed = env->NowMicros();
5423 printf("time_seed is %" PRIu64 "\n", time_seed); // would help to reproduce
5424
5425 std::function<void()> call_inserter = [&] {
5426 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5427 Random64 rand(time_seed * thd_seed);
5428 ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,
5429 num_sets, num_keys_per_set, &rand));
5430 finished++;
5431 };
5432 std::function<void()> call_checker = [&] {
5433 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5434 Random64 rand(time_seed * thd_seed);
5435 // Verify that data is consistent
5436 while (finished < num_workers) {
5437 ASSERT_OK(RandomTransactionInserter::Verify(
5438 db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand));
5439 }
5440 };
5441 std::function<void()> call_slow_checker = [&] {
5442 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5443 Random64 rand(time_seed * thd_seed);
5444 // Verify that data is consistent
5445 while (finished < num_workers) {
5446 uint64_t delay_ms = rand.Uniform(100) + 1;
5447 Status s = RandomTransactionInserter::Verify(
5448 db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
5449 ASSERT_OK(s);
5450 }
5451 };
5452 std::function<void()> call_slow_inserter = [&] {
5453 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5454 Random64 rand(time_seed * thd_seed);
5455 uint64_t id = 0;
5456 // Verify that data is consistent
5457 while (finished < num_workers) {
5458 uint64_t delay_ms = rand.Uniform(500) + 1;
5459 ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
5460 &rand, delay_ms, id++));
5461 }
5462 };
5463
5464 for (uint32_t i = 0; i < num_workers; i++) {
5465 threads.emplace_back(call_inserter);
5466 }
5467 for (uint32_t i = 0; i < num_checkers; i++) {
5468 threads.emplace_back(call_checker);
5469 }
5470 if (with_slow_threads_) {
5471 for (uint32_t i = 0; i < num_slow_checkers; i++) {
5472 threads.emplace_back(call_slow_checker);
5473 }
5474 for (uint32_t i = 0; i < num_slow_workers; i++) {
5475 threads.emplace_back(call_slow_inserter);
5476 }
5477 }
5478
5479 // Wait for all threads to finish
5480 for (auto& t : threads) {
5481 t.join();
5482 }
5483
5484 // Verify that data is consistent
5485 Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set,
5486 !TAKE_SNAPSHOT);
5487 ASSERT_OK(s);
5488 }
5489 #endif // ROCKSDB_VALGRIND_RUN
5490
TEST_P(TransactionTest, MemoryLimitTest) {
  TransactionOptions txn_options;
  // Room for exactly two puts: header (12 bytes) + NOOP (1 byte) + 2 * 8
  // bytes for data.
  txn_options.max_write_batch_size = 29;
  // An unlimited flush threshold keeps the write batch from being flushed,
  // so it can actually hit the memory limit.
  txn_options.write_batch_flush_threshold = 0;

  Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
  ASSERT_TRUE(txn);
  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_LE(0, txn->GetID());

  // The first two puts fit within the limit and are counted.
  ASSERT_OK(txn->Put(Slice("a"), Slice("....")));
  ASSERT_EQ(1, txn->GetNumPuts());
  ASSERT_OK(txn->Put(Slice("b"), Slice("....")));
  ASSERT_EQ(2, txn->GetNumPuts());

  // A third put exceeds max_write_batch_size; it is rejected and not counted.
  Status s = txn->Put(Slice("b"), Slice("...."));
  ASSERT_TRUE(s.IsMemoryLimit());
  ASSERT_EQ(2, txn->GetNumPuts());

  ASSERT_OK(txn->Rollback());
  delete txn;
}
5522
5523 // This test clarifies the existing expectation from the sequence number
5524 // algorithm. It could detect mistakes in updating the code but it is not
5525 // necessarily the one acceptable way. If the algorithm is legitimately changed,
5526 // this unit test should be updated as well.
TEST_P(TransactionStressTest,SeqAdvanceTest)5527 TEST_P(TransactionStressTest, SeqAdvanceTest) {
5528 // TODO(myabandeh): must be test with false before new releases
5529 const bool short_test = true;
5530 WriteOptions wopts;
5531 FlushOptions fopt;
5532
5533 options.disable_auto_compactions = true;
5534 ASSERT_OK(ReOpen());
5535
5536 // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
5537 // of the branches. This is the same as counting a binary number where i-th
5538 // bit represents whether we take branch i in the represented by the number.
5539 const size_t NUM_BRANCHES = short_test ? 6 : 10;
5540 // Helper function that shows if the branch is to be taken in the run
5541 // represented by the number n.
5542 auto branch_do = [&](size_t n, size_t* branch) {
5543 assert(*branch < NUM_BRANCHES);
5544 const size_t filter = static_cast<size_t>(1) << *branch;
5545 return n & filter;
5546 };
5547 const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
5548 for (size_t n = 0; n < max_n; n++) {
5549 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5550 size_t branch = 0;
5551 auto seq = db_impl->GetLatestSequenceNumber();
5552 exp_seq = seq;
5553 txn_t0(0);
5554 seq = db_impl->TEST_GetLastVisibleSequence();
5555 ASSERT_EQ(exp_seq, seq);
5556
5557 if (branch_do(n, &branch)) {
5558 ASSERT_OK(db_impl->Flush(fopt));
5559 seq = db_impl->TEST_GetLastVisibleSequence();
5560 ASSERT_EQ(exp_seq, seq);
5561 }
5562 if (!short_test && branch_do(n, &branch)) {
5563 ASSERT_OK(db_impl->FlushWAL(true));
5564 ASSERT_OK(ReOpenNoDelete());
5565 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5566 seq = db_impl->GetLatestSequenceNumber();
5567 ASSERT_EQ(exp_seq, seq);
5568 }
5569
5570 // Doing it twice might detect some bugs
5571 txn_t0(1);
5572 seq = db_impl->TEST_GetLastVisibleSequence();
5573 ASSERT_EQ(exp_seq, seq);
5574
5575 txn_t1(0);
5576 seq = db_impl->TEST_GetLastVisibleSequence();
5577 ASSERT_EQ(exp_seq, seq);
5578
5579 if (branch_do(n, &branch)) {
5580 ASSERT_OK(db_impl->Flush(fopt));
5581 seq = db_impl->TEST_GetLastVisibleSequence();
5582 ASSERT_EQ(exp_seq, seq);
5583 }
5584 if (!short_test && branch_do(n, &branch)) {
5585 ASSERT_OK(db_impl->FlushWAL(true));
5586 ASSERT_OK(ReOpenNoDelete());
5587 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5588 seq = db_impl->GetLatestSequenceNumber();
5589 ASSERT_EQ(exp_seq, seq);
5590 }
5591
5592 txn_t3(0);
5593 seq = db_impl->TEST_GetLastVisibleSequence();
5594 ASSERT_EQ(exp_seq, seq);
5595
5596 if (branch_do(n, &branch)) {
5597 ASSERT_OK(db_impl->Flush(fopt));
5598 seq = db_impl->TEST_GetLastVisibleSequence();
5599 ASSERT_EQ(exp_seq, seq);
5600 }
5601 if (!short_test && branch_do(n, &branch)) {
5602 ASSERT_OK(db_impl->FlushWAL(true));
5603 ASSERT_OK(ReOpenNoDelete());
5604 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5605 seq = db_impl->GetLatestSequenceNumber();
5606 ASSERT_EQ(exp_seq, seq);
5607 }
5608
5609 txn_t4(0);
5610 seq = db_impl->TEST_GetLastVisibleSequence();
5611
5612 ASSERT_EQ(exp_seq, seq);
5613
5614 if (branch_do(n, &branch)) {
5615 ASSERT_OK(db_impl->Flush(fopt));
5616 seq = db_impl->TEST_GetLastVisibleSequence();
5617 ASSERT_EQ(exp_seq, seq);
5618 }
5619 if (!short_test && branch_do(n, &branch)) {
5620 ASSERT_OK(db_impl->FlushWAL(true));
5621 ASSERT_OK(ReOpenNoDelete());
5622 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5623 seq = db_impl->GetLatestSequenceNumber();
5624 ASSERT_EQ(exp_seq, seq);
5625 }
5626
5627 txn_t2(0);
5628 seq = db_impl->TEST_GetLastVisibleSequence();
5629 ASSERT_EQ(exp_seq, seq);
5630
5631 if (branch_do(n, &branch)) {
5632 ASSERT_OK(db_impl->Flush(fopt));
5633 seq = db_impl->TEST_GetLastVisibleSequence();
5634 ASSERT_EQ(exp_seq, seq);
5635 }
5636 if (!short_test && branch_do(n, &branch)) {
5637 ASSERT_OK(db_impl->FlushWAL(true));
5638 ASSERT_OK(ReOpenNoDelete());
5639 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
5640 seq = db_impl->GetLatestSequenceNumber();
5641 ASSERT_EQ(exp_seq, seq);
5642 }
5643 ASSERT_OK(ReOpen());
5644 }
5645 }
5646
5647 // Verify that the optimization would not compromize the correctness
TEST_P(TransactionTest,Optimizations)5648 TEST_P(TransactionTest, Optimizations) {
5649 size_t comb_cnt = size_t(1) << 2; // 2 is number of optimization vars
5650 for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
5651 TransactionDBWriteOptimizations optimizations;
5652 optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
5653 optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
5654
5655 ASSERT_OK(ReOpen());
5656 WriteOptions write_options;
5657 WriteBatch batch;
5658 ASSERT_OK(batch.Put(Slice("k"), Slice("v1")));
5659 ASSERT_OK(db->Write(write_options, &batch));
5660
5661 ReadOptions ropt;
5662 PinnableSlice pinnable_val;
5663 ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
5664 ASSERT_TRUE(pinnable_val == ("v1"));
5665 }
5666 }
5667
5668 // A comparator that uses only the first three bytes
5669 class ThreeBytewiseComparator : public Comparator {
5670 public:
ThreeBytewiseComparator()5671 ThreeBytewiseComparator() {}
Name() const5672 const char* Name() const override { return "test.ThreeBytewiseComparator"; }
Compare(const Slice & a,const Slice & b) const5673 int Compare(const Slice& a, const Slice& b) const override {
5674 Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5675 Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5676 return na.compare(nb);
5677 }
Equal(const Slice & a,const Slice & b) const5678 bool Equal(const Slice& a, const Slice& b) const override {
5679 Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5680 Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5681 return na == nb;
5682 }
5683 // These methods below don't seem relevant to this test. Implement them if
5684 // proven othersize.
FindShortestSeparator(std::string * start,const Slice & limit) const5685 void FindShortestSeparator(std::string* start,
5686 const Slice& limit) const override {
5687 const Comparator* bytewise_comp = BytewiseComparator();
5688 bytewise_comp->FindShortestSeparator(start, limit);
5689 }
FindShortSuccessor(std::string * key) const5690 void FindShortSuccessor(std::string* key) const override {
5691 const Comparator* bytewise_comp = BytewiseComparator();
5692 bytewise_comp->FindShortSuccessor(key);
5693 }
5694 };
5695
5696 #ifndef ROCKSDB_VALGRIND_RUN
TEST_P(TransactionTest, GetWithoutSnapshot) {
  WriteOptions write_options;
  // Set by the committer once it is done; the reader polls it.
  std::atomic<bool> finish = {false};
  ASSERT_OK(db->Put(write_options, "key", "value"));
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    // The commits run in a nested lambda because a failed ASSERT_* only
    // returns from the enclosing function. Running them here guarantees that
    // `finish` is still set below; otherwise read_thread would spin forever.
    auto commit_loop = [&]() {
      for (int i = 0; i < 100; i++) {
        TransactionOptions txn_options;
        Transaction* txn = db->BeginTransaction(write_options, txn_options);
        ASSERT_OK(txn->SetName("xid"));
        // Overwrite within the same transaction; the committed value stays
        // "value", so a reader must never observe the intermediate one.
        ASSERT_OK(txn->Put("key", "overridedvalue"));
        ASSERT_OK(txn->Put("key", "value"));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    };
    commit_loop();
    finish = true;
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    while (!finish) {
      ReadOptions ropt;
      PinnableSlice pinnable_val;
      ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val));
      ASSERT_TRUE(pinnable_val == ("value"));
    }
  });
  commit_thread.join();
  read_thread.join();
}
5725 #endif // ROCKSDB_VALGRIND_RUN
5726
5727 // Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest,DuplicateKeys)5728 TEST_P(TransactionTest, DuplicateKeys) {
5729 ColumnFamilyOptions cf_options;
5730 std::string cf_name = "two";
5731 ColumnFamilyHandle* cf_handle = nullptr;
5732 {
5733 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5734 WriteOptions write_options;
5735 WriteBatch batch;
5736 ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
5737 ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
5738 // duplicate the keys
5739 ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
5740 // duplicate the 2nd key. It should not be counted duplicate since a
5741 // sub-patch is cut after the last duplicate.
5742 ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
5743 // duplicate the keys but in a different cf. It should not be counted as
5744 // duplicate keys
5745 ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
5746
5747 ASSERT_OK(db->Write(write_options, &batch));
5748
5749 ReadOptions ropt;
5750 PinnableSlice pinnable_val;
5751 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
5752 ASSERT_OK(s);
5753 ASSERT_TRUE(pinnable_val == ("value3"));
5754 s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
5755 ASSERT_OK(s);
5756 ASSERT_TRUE(pinnable_val == ("value4"));
5757 s = db->Get(ropt, cf_handle, "key", &pinnable_val);
5758 ASSERT_OK(s);
5759 ASSERT_TRUE(pinnable_val == ("value5"));
5760
5761 delete cf_handle;
5762 }
5763
5764 // Test with non-bytewise comparator
5765 {
5766 ASSERT_OK(ReOpen());
5767 std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5768 cf_options.comparator = comp_gc.get();
5769 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5770 WriteOptions write_options;
5771 WriteBatch batch;
5772 ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value")));
5773 // The first three bytes are the same, do it must be counted as duplicate
5774 ASSERT_OK(batch.Put(cf_handle, Slice("key2"), Slice("value2")));
5775 // check for 2nd duplicate key in cf with non-default comparator
5776 ASSERT_OK(batch.Put(cf_handle, Slice("key2b"), Slice("value2b")));
5777 ASSERT_OK(db->Write(write_options, &batch));
5778
5779 // The value must be the most recent value for all the keys equal to "key",
5780 // including "key2"
5781 ReadOptions ropt;
5782 PinnableSlice pinnable_val;
5783 ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
5784 ASSERT_TRUE(pinnable_val == ("value2b"));
5785
5786 // Test duplicate keys with rollback
5787 TransactionOptions txn_options;
5788 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5789 ASSERT_OK(txn0->SetName("xid"));
5790 ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
5791 ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
5792 ASSERT_OK(txn0->Rollback());
5793 ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
5794 ASSERT_TRUE(pinnable_val == ("value2b"));
5795 delete txn0;
5796
5797 delete cf_handle;
5798 cf_options.comparator = BytewiseComparator();
5799 }
5800
5801 for (bool do_prepare : {true, false}) {
5802 for (bool do_rollback : {true, false}) {
5803 for (bool with_commit_batch : {true, false}) {
5804 if (with_commit_batch && !do_prepare) {
5805 continue;
5806 }
5807 if (with_commit_batch && do_rollback) {
5808 continue;
5809 }
5810 ASSERT_OK(ReOpen());
5811 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5812 TransactionOptions txn_options;
5813 txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
5814 WriteOptions write_options;
5815 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5816 auto s = txn0->SetName("xid");
5817 ASSERT_OK(s);
5818 s = txn0->Put(Slice("foo0"), Slice("bar0a"));
5819 ASSERT_OK(s);
5820 s = txn0->Put(Slice("foo0"), Slice("bar0b"));
5821 ASSERT_OK(s);
5822 s = txn0->Put(Slice("foo1"), Slice("bar1"));
5823 ASSERT_OK(s);
5824 s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
5825 ASSERT_OK(s);
5826 // Repeat a key after the start of a sub-patch. This should not cause a
5827 // duplicate in the most recent sub-patch and hence not creating a new
5828 // sub-patch.
5829 s = txn0->Put(Slice("foo0"), Slice("bar0c"));
5830 ASSERT_OK(s);
5831 s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
5832 ASSERT_OK(s);
5833 // duplicate the keys but in a different cf. It should not be counted as
5834 // duplicate.
5835 s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
5836 ASSERT_OK(s);
5837 s = txn0->Put(Slice("foo3"), Slice("bar3"));
5838 ASSERT_OK(s);
5839 s = txn0->Merge(Slice("foo3"), Slice("bar3"));
5840 ASSERT_OK(s);
5841 s = txn0->Put(Slice("foo4"), Slice("bar4"));
5842 ASSERT_OK(s);
5843 s = txn0->Delete(Slice("foo4"));
5844 ASSERT_OK(s);
5845 s = txn0->SingleDelete(Slice("foo4"));
5846 ASSERT_OK(s);
5847 if (do_prepare) {
5848 s = txn0->Prepare();
5849 ASSERT_OK(s);
5850 }
5851 if (do_rollback) {
5852 // Test rolling back the batch with duplicates
5853 s = txn0->Rollback();
5854 ASSERT_OK(s);
5855 } else {
5856 if (with_commit_batch) {
5857 assert(do_prepare);
5858 auto cb = txn0->GetCommitTimeWriteBatch();
5859 // duplicate a key in the original batch
5860 // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
5861 // conflicting with the prepared batch is currently undefined and
5862 // gives different results in different implementations.
5863
5864 // s = cb->Put(Slice("foo0"), Slice("bar0d"));
5865 // ASSERT_OK(s);
5866 // add a new duplicate key
5867 s = cb->Put(Slice("foo6"), Slice("bar6a"));
5868 ASSERT_OK(s);
5869 s = cb->Put(Slice("foo6"), Slice("bar6b"));
5870 ASSERT_OK(s);
5871 // add a duplicate key that is removed in the same batch
5872 s = cb->Put(Slice("foo7"), Slice("bar7a"));
5873 ASSERT_OK(s);
5874 s = cb->Delete(Slice("foo7"));
5875 ASSERT_OK(s);
5876 }
5877 s = txn0->Commit();
5878 ASSERT_OK(s);
5879 }
5880 delete txn0;
5881 ReadOptions ropt;
5882 PinnableSlice pinnable_val;
5883
5884 if (do_rollback) {
5885 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5886 ASSERT_TRUE(s.IsNotFound());
5887 s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5888 ASSERT_TRUE(s.IsNotFound());
5889 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5890 ASSERT_TRUE(s.IsNotFound());
5891 s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5892 ASSERT_TRUE(s.IsNotFound());
5893 s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5894 ASSERT_TRUE(s.IsNotFound());
5895 s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5896 ASSERT_TRUE(s.IsNotFound());
5897 } else {
5898 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5899 ASSERT_OK(s);
5900 ASSERT_TRUE(pinnable_val == ("bar0c"));
5901 s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5902 ASSERT_OK(s);
5903 ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
5904 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5905 ASSERT_OK(s);
5906 ASSERT_TRUE(pinnable_val == ("bar1"));
5907 s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5908 ASSERT_OK(s);
5909 ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
5910 s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5911 ASSERT_OK(s);
5912 ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
5913 s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5914 ASSERT_TRUE(s.IsNotFound());
5915 if (with_commit_batch) {
5916 s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
5917 ASSERT_OK(s);
5918 ASSERT_TRUE(pinnable_val == ("bar6b"));
5919 s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
5920 ASSERT_TRUE(s.IsNotFound());
5921 }
5922 }
5923 delete cf_handle;
5924 } // with_commit_batch
5925 } // do_rollback
5926 } // do_prepare
5927
5928 if (!options.unordered_write) {
5929 // Also test with max_successive_merges > 0. max_successive_merges will not
5930 // affect our algorithm for duplicate key insertion but we add the test to
5931 // verify that.
5932 cf_options.max_successive_merges = 2;
5933 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5934 ASSERT_OK(ReOpen());
5935 db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
5936 WriteOptions write_options;
5937 // Ensure one value for the key
5938 ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
5939 WriteBatch batch;
5940 // Merge more than max_successive_merges times
5941 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("1")));
5942 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("2")));
5943 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("3")));
5944 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("4")));
5945 ASSERT_OK(db->Write(write_options, &batch));
5946 ReadOptions read_options;
5947 string value;
5948 ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
5949 ASSERT_EQ(value, "value,1,2,3,4");
5950 delete cf_handle;
5951 }
5952
5953 {
5954 // Test that the duplicate detection is not compromised after rolling back
5955 // to a save point
5956 TransactionOptions txn_options;
5957 WriteOptions write_options;
5958 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5959 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5960 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5961 txn0->SetSavePoint();
5962 ASSERT_OK(txn0->RollbackToSavePoint());
5963 ASSERT_OK(txn0->Commit());
5964 delete txn0;
5965 }
5966
5967 // Test sucessfull recovery after a crash
5968 {
5969 ASSERT_OK(ReOpen());
5970 TransactionOptions txn_options;
5971 WriteOptions write_options;
5972 ReadOptions ropt;
5973 Transaction* txn0;
5974 PinnableSlice pinnable_val;
5975 Status s;
5976
5977 std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5978 cf_options.comparator = comp_gc.get();
5979 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5980 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5981 delete cf_handle;
5982 std::vector<ColumnFamilyDescriptor> cfds{
5983 ColumnFamilyDescriptor(kDefaultColumnFamilyName,
5984 ColumnFamilyOptions(options)),
5985 ColumnFamilyDescriptor(cf_name, cf_options),
5986 };
5987 std::vector<ColumnFamilyHandle*> handles;
5988 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5989
5990 assert(db != nullptr);
5991 ASSERT_OK(db->Put(write_options, "foo0", "init"));
5992 ASSERT_OK(db->Put(write_options, "foo1", "init"));
5993 ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init"));
5994 ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init"));
5995
5996 // one entry
5997 txn0 = db->BeginTransaction(write_options, txn_options);
5998 ASSERT_OK(txn0->SetName("xid"));
5999 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
6000 ASSERT_OK(txn0->Prepare());
6001 delete txn0;
6002 // This will check the asserts inside recovery code
6003 ASSERT_OK(db->FlushWAL(true));
6004 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6005 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6006 txn0 = db->GetTransactionByName("xid");
6007 ASSERT_TRUE(txn0 != nullptr);
6008 ASSERT_OK(txn0->Commit());
6009 delete txn0;
6010 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6011 ASSERT_OK(s);
6012 ASSERT_TRUE(pinnable_val == ("bar0a"));
6013
6014 // two entries, no duplicate
6015 txn0 = db->BeginTransaction(write_options, txn_options);
6016 ASSERT_OK(txn0->SetName("xid"));
6017 ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b")));
6018 ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b")));
6019 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
6020 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b")));
6021 ASSERT_OK(txn0->Prepare());
6022 delete txn0;
6023 // This will check the asserts inside recovery code
6024 ASSERT_OK(db->FlushWAL(true));
6025 // Flush only cf 1
6026 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6027 ->TEST_FlushMemTable(true, false, handles[1]));
6028 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6029 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6030 txn0 = db->GetTransactionByName("xid");
6031 ASSERT_TRUE(txn0 != nullptr);
6032 ASSERT_OK(txn0->Commit());
6033 delete txn0;
6034 pinnable_val.Reset();
6035 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6036 ASSERT_OK(s);
6037 ASSERT_TRUE(pinnable_val == ("bar0b"));
6038 pinnable_val.Reset();
6039 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
6040 ASSERT_OK(s);
6041 ASSERT_TRUE(pinnable_val == ("bar1b"));
6042 pinnable_val.Reset();
6043 s = db->Get(ropt, handles[1], "foo0", &pinnable_val);
6044 ASSERT_OK(s);
6045 ASSERT_TRUE(pinnable_val == ("bar0b"));
6046 pinnable_val.Reset();
6047 s = db->Get(ropt, handles[1], "fol1", &pinnable_val);
6048 ASSERT_OK(s);
6049 ASSERT_TRUE(pinnable_val == ("bar1b"));
6050
6051 // one duplicate with ::Put
6052 txn0 = db->BeginTransaction(write_options, txn_options);
6053 ASSERT_OK(txn0->SetName("xid"));
6054 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c")));
6055 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d")));
6056 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c")));
6057 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c")));
6058 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d")));
6059 ASSERT_OK(txn0->Prepare());
6060 delete txn0;
6061 // This will check the asserts inside recovery code
6062 ASSERT_OK(db->FlushWAL(true));
6063 // Flush only cf 1
6064 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6065 ->TEST_FlushMemTable(true, false, handles[1]));
6066 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6067 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6068 txn0 = db->GetTransactionByName("xid");
6069 ASSERT_TRUE(txn0 != nullptr);
6070 ASSERT_OK(txn0->Commit());
6071 delete txn0;
6072 pinnable_val.Reset();
6073 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6074 ASSERT_OK(s);
6075 ASSERT_TRUE(pinnable_val == ("bar0d"));
6076 pinnable_val.Reset();
6077 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
6078 ASSERT_OK(s);
6079 ASSERT_TRUE(pinnable_val == ("bar1c"));
6080 pinnable_val.Reset();
6081 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6082 ASSERT_OK(s);
6083 ASSERT_TRUE(pinnable_val == ("bar1d"));
6084
6085 // Duplicate with ::Put, ::Delete
6086 txn0 = db->BeginTransaction(write_options, txn_options);
6087 ASSERT_OK(txn0->SetName("xid"));
6088 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e")));
6089 ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1")));
6090 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6091 ASSERT_OK(txn0->Delete(Slice("foo0")));
6092 ASSERT_OK(txn0->Prepare());
6093 delete txn0;
6094 // This will check the asserts inside recovery code
6095 ASSERT_OK(db->FlushWAL(true));
6096 // Flush only cf 1
6097 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6098 ->TEST_FlushMemTable(true, false, handles[1]));
6099 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6100 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6101 txn0 = db->GetTransactionByName("xid");
6102 ASSERT_TRUE(txn0 != nullptr);
6103 ASSERT_OK(txn0->Commit());
6104 delete txn0;
6105 pinnable_val.Reset();
6106 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6107 ASSERT_TRUE(s.IsNotFound());
6108 pinnable_val.Reset();
6109 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6110 ASSERT_TRUE(s.IsNotFound());
6111
6112 // Duplicate with ::Put, ::SingleDelete
6113 txn0 = db->BeginTransaction(write_options, txn_options);
6114 ASSERT_OK(txn0->SetName("xid"));
6115 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g")));
6116 ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1")));
6117 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6118 ASSERT_OK(txn0->SingleDelete(Slice("foo0")));
6119 ASSERT_OK(txn0->Prepare());
6120 delete txn0;
6121 // This will check the asserts inside recovery code
6122 ASSERT_OK(db->FlushWAL(true));
6123 // Flush only cf 1
6124 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6125 ->TEST_FlushMemTable(true, false, handles[1]));
6126 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6127 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6128 txn0 = db->GetTransactionByName("xid");
6129 ASSERT_TRUE(txn0 != nullptr);
6130 ASSERT_OK(txn0->Commit());
6131 delete txn0;
6132 pinnable_val.Reset();
6133 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6134 ASSERT_TRUE(s.IsNotFound());
6135 pinnable_val.Reset();
6136 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6137 ASSERT_TRUE(s.IsNotFound());
6138
6139 // Duplicate with ::Put, ::Merge
6140 txn0 = db->BeginTransaction(write_options, txn_options);
6141 ASSERT_OK(txn0->SetName("xid"));
6142 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i")));
6143 ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j")));
6144 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f")));
6145 ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g")));
6146 ASSERT_OK(txn0->Prepare());
6147 delete txn0;
6148 // This will check the asserts inside recovery code
6149 ASSERT_OK(db->FlushWAL(true));
6150 // Flush only cf 1
6151 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6152 ->TEST_FlushMemTable(true, false, handles[1]));
6153 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6154 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6155 txn0 = db->GetTransactionByName("xid");
6156 ASSERT_TRUE(txn0 != nullptr);
6157 ASSERT_OK(txn0->Commit());
6158 delete txn0;
6159 pinnable_val.Reset();
6160 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6161 ASSERT_OK(s);
6162 ASSERT_TRUE(pinnable_val == ("bar0f,bar0g"));
6163 pinnable_val.Reset();
6164 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6165 ASSERT_OK(s);
6166 ASSERT_TRUE(pinnable_val == ("bar1i,bar1j"));
6167
6168 for (auto h : handles) {
6169 delete h;
6170 }
6171 delete db;
6172 db = nullptr;
6173 }
6174 }
6175
6176 // Test that the reseek optimization in iterators will not result in an infinite
6177 // loop if there are too many uncommitted entries before the snapshot.
TEST_P(TransactionTest,ReseekOptimization)6178 TEST_P(TransactionTest, ReseekOptimization) {
6179 WriteOptions write_options;
6180 write_options.sync = true;
6181 write_options.disableWAL = false;
6182 ColumnFamilyDescriptor cfd;
6183 ASSERT_OK(db->DefaultColumnFamily()->GetDescriptor(&cfd));
6184 auto max_skip = cfd.options.max_sequential_skip_in_iterations;
6185
6186 ASSERT_OK(db->Put(write_options, Slice("foo0"), Slice("initv")));
6187
6188 TransactionOptions txn_options;
6189 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
6190 ASSERT_OK(txn0->SetName("xid"));
6191 // Duplicate keys will result into separate sequence numbers in WritePrepared
6192 // and WriteUnPrepared
6193 for (size_t i = 0; i < 2 * max_skip; i++) {
6194 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar")));
6195 }
6196 ASSERT_OK(txn0->Prepare());
6197 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("initv")));
6198
6199 ReadOptions read_options;
6200 // To avoid loops
6201 read_options.max_skippable_internal_keys = 10 * max_skip;
6202 Iterator* iter = db->NewIterator(read_options);
6203 ASSERT_OK(iter->status());
6204 size_t cnt = 0;
6205 iter->SeekToFirst();
6206 while (iter->Valid()) {
6207 iter->Next();
6208 ASSERT_OK(iter->status());
6209 cnt++;
6210 }
6211 ASSERT_EQ(cnt, 2);
6212 cnt = 0;
6213 iter->SeekToLast();
6214 while (iter->Valid()) {
6215 iter->Prev();
6216 ASSERT_OK(iter->status());
6217 cnt++;
6218 }
6219 ASSERT_EQ(cnt, 2);
6220 delete iter;
6221 ASSERT_OK(txn0->Rollback());
6222 delete txn0;
6223 }
6224
6225 // After recovery in kPointInTimeRecovery mode, the corrupted log file remains
6226 // there. The new log files should be still read succesfully during recovery of
6227 // the 2nd crash.
TEST_P(TransactionTest,DoubleCrashInRecovery)6228 TEST_P(TransactionTest, DoubleCrashInRecovery) {
6229 for (const bool manual_wal_flush : {false, true}) {
6230 for (const bool write_after_recovery : {false, true}) {
6231 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
6232 options.manual_wal_flush = manual_wal_flush;
6233 ASSERT_OK(ReOpen());
6234 std::string cf_name = "two";
6235 ColumnFamilyOptions cf_options;
6236 ColumnFamilyHandle* cf_handle = nullptr;
6237 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
6238
6239 // Add a prepare entry to prevent the older logs from being deleted.
6240 WriteOptions write_options;
6241 TransactionOptions txn_options;
6242 Transaction* txn = db->BeginTransaction(write_options, txn_options);
6243 ASSERT_OK(txn->SetName("xid"));
6244 ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
6245 ASSERT_OK(txn->Prepare());
6246
6247 FlushOptions flush_ops;
6248 ASSERT_OK(db->Flush(flush_ops));
6249 // Now we have a log that cannot be deleted
6250
6251 ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
6252 // Flush only the 2nd cf
6253 ASSERT_OK(db->Flush(flush_ops, cf_handle));
6254
6255 // The value is large enough to be touched by the corruption we ingest
6256 // below.
6257 std::string large_value(400, ' ');
6258 // key/value not touched by corruption
6259 ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
6260 // key/value touched by corruption
6261 ASSERT_OK(db->Put(write_options, "foo3", large_value));
6262 // key/value not touched by corruption
6263 ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
6264
6265 ASSERT_OK(db->FlushWAL(true));
6266 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
6267 uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
6268 std::string fname = LogFileName(dbname, wal_file_id);
6269 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6270 delete txn;
6271 delete cf_handle;
6272 delete db;
6273 db = nullptr;
6274
6275 // Corrupt the last log file in the middle, so that it is not corrupted
6276 // in the tail.
6277 std::string file_content;
6278 ASSERT_OK(ReadFileToString(env, fname, &file_content));
6279 file_content[400] = 'h';
6280 file_content[401] = 'a';
6281 ASSERT_OK(env->DeleteFile(fname));
6282 ASSERT_OK(WriteStringToFile(env, file_content, fname, true));
6283
6284 // Recover from corruption
6285 std::vector<ColumnFamilyHandle*> handles;
6286 std::vector<ColumnFamilyDescriptor> column_families;
6287 column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
6288 ColumnFamilyOptions()));
6289 column_families.push_back(
6290 ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
6291 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6292 assert(db != nullptr);
6293
6294 if (write_after_recovery) {
6295 // Write data to the log right after the corrupted log
6296 ASSERT_OK(db->Put(write_options, "foo5", large_value));
6297 }
6298
6299 // Persist data written to WAL during recovery or by the last Put
6300 ASSERT_OK(db->FlushWAL(true));
6301 // 2nd crash to recover while having a valid log after the corrupted one.
6302 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6303 assert(db != nullptr);
6304 txn = db->GetTransactionByName("xid");
6305 ASSERT_TRUE(txn != nullptr);
6306 ASSERT_OK(txn->Commit());
6307 delete txn;
6308 for (auto handle : handles) {
6309 delete handle;
6310 }
6311 }
6312 }
6313 }
6314
// Checks Commit() on a transaction that never called Prepare(): it must report
// TxnNotPrepared when skip_prepare is false, and succeed when skip_prepare is
// true.
TEST_P(TransactionTest, CommitWithoutPrepare) {
  for (const bool skip : {false, true}) {
    WriteOptions wopts;
    TransactionOptions topts;
    topts.skip_prepare = skip;
    Transaction* t = db->BeginTransaction(wopts, topts);
    if (skip) {
      ASSERT_OK(t->Commit());
    } else {
      ASSERT_TRUE(t->Commit().IsTxnNotPrepared());
    }
    delete t;
  }
}
6336
6337 } // namespace ROCKSDB_NAMESPACE
6338
main(int argc,char ** argv)6339 int main(int argc, char** argv) {
6340 ::testing::InitGoogleTest(&argc, argv);
6341 return RUN_ALL_TESTS();
6342 }
6343
6344 #else
6345 #include <stdio.h>
6346
main(int,char **)6347 int main(int /*argc*/, char** /*argv*/) {
6348 fprintf(stderr,
6349 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
6350 return 0;
6351 }
6352
6353 #endif // ROCKSDB_LITE
6354