// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <cinttypes>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/db_iter.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"

namespace ROCKSDB_NAMESPACE {
enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };

// A PessimisticTransactionDB that writes data to the DB after the prepare
// phase of 2PC. In this way some data in the DB might not be committed. The
// DB provides mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  virtual ~WritePreparedTxnDB();

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  using TransactionDB::Write;
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;

  // Optimized version of ::Write that accepts further optimization requests
  // such as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;

  // Write the batch to the underlying DB and mark it as committed. Can be
  // used either directly from the TxnDB or through a transaction.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);

  using DB::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;

  // Check whether the transaction that wrote the value with prepare sequence
  // number prep_seq is visible to the snapshot with sequence number
  // snapshot_seq. Returns true if commit_seq <= snapshot_seq.
  // If the snapshot_seq is already released and snapshot_seq <= max, sets
  // *snap_released to true and returns true as well.
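  // A brief illustration of the visibility rule above (illustrative numbers,
  // not taken from the code): a value prepared at seq 10 and committed at
  // seq 12 is visible to a snapshot at seq 15 (12 <= 15) but not to a
  // snapshot at seq 11 (12 > 11); a value prepared at seq 10 that is still
  // uncommitted is not visible to any snapshot.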
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = kMinUnCommittedSeq,
                           bool* snap_released = nullptr) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    assert(min_uncommitted >= kMinUnCommittedSeq);
    // Caller is responsible for initializing snap_released.
    assert(snap_released == nullptr || *snap_released == false);
    // Here we try to infer the return value without looking into the prepare
    // list. This helps avoid synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but is not necessarily efficient
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (prep_seq < min_uncommitted) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    // Commit of a delayed prepared has two non-atomic steps: add to the commit
    // cache, remove from delayed_prepared_. Our reads of these two are also
    // non-atomic. Thus, by looking into the commit cache first, we might find
    // the prep_seq neither in the commit cache nor in delayed_prepared_. To
    // fix that, i) we check if there was any delayed prepared BEFORE looking
    // into the commit cache, and ii) if there was, the search steps become:
    // i) commit cache, ii) delayed prepared, iii) commit cache again. In this
    // way, if the first query to the commit cache missed the commit, the
    // second will catch it.
    bool was_empty;
    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
    CommitEntry64b dont_care;
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
    size_t repeats = 0;
    do {
      repeats++;
      assert(repeats < 100);
      if (UNLIKELY(repeats >= 100)) {
        throw std::runtime_error(
            "The read was interrupted 100 times by updates to "
            "max_evicted_seq_. This is unexpected in all setups");
      }
      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
      CommitEntry cached;
      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
      if (exist && prep_seq == cached.prep_seq) {
        // It is committed and also not evicted from the commit cache
        ROCKS_LOG_DETAILS(
            info_log_,
            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
        return cached.commit_seq <= snapshot_seq;
      }
      // else it could be committed but not inserted into the map, which could
      // happen after recovery, or it could be committed and evicted by another
      // commit, or never committed.

      // At this point we don't know if it was committed or is still prepared
      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
        continue;
      }
      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
      if (max_evicted_seq_ub < prep_seq) {
        // Not evicted from the cache and also not present, so it must still be
        // prepared
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
      if (!was_empty) {
        // We should not normally reach here
        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
        ReadLock rl(&prepared_mutex_);
        ROCKS_LOG_WARN(
            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
          // This is the order: 1) delayed_prepared_commits_ update, 2) publish,
          // 3) delayed_prepared_ clean up. So check if this is the case of a
          // late cleanup.
          auto it = delayed_prepared_commits_.find(prep_seq);
          if (it == delayed_prepared_commits_.end()) {
            // Then it is not committed yet
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " returns %" PRId32,
                              prep_seq, snapshot_seq, 0);
            return false;
          } else {
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " commit: %" PRIu64 " returns %" PRId32,
                              prep_seq, snapshot_seq, it->second,
                              snapshot_seq <= it->second);
            return it->second <= snapshot_seq;
          }
        } else {
          // 2nd query to commit cache. Refer to was_empty comment above.
          exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
          if (exist && prep_seq == cached.prep_seq) {
            ROCKS_LOG_DETAILS(
                info_log_,
                "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
                prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
            return cached.commit_seq <= snapshot_seq;
          }
          max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
        }
      }
    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
    // When advancing max_evicted_seq_, we move older entries from prepared to
    // delayed_prepared_. Also we move evicted entries from the commit cache to
    // old_commit_map_ if they overlap with any snapshot. Since prep_seq <=
    // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
    // old_commit_map_, iii) committed with no conflict with any snapshot. Case
    // (i), delayed_prepared_, is checked above.
    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32 " released=1",
                        prep_seq, snapshot_seq, 0);
      assert(snap_released);
      // This snapshot is not valid anymore. We cannot tell if prep_seq is
      // committed before or after the snapshot. Return true but also set
      // snap_released to true.
      *snap_released = true;
      return true;
    }
    {
      // We should not normally reach here unless snapshot_seq is old. This is
      // a rare case and it is ok to pay the cost of mutex ReadLock for such
      // old, reading transactions.
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
      bool found = prep_set_entry != old_commit_map_.end();
      if (found) {
        auto& vec = prep_set_entry->second;
        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
      } else {
        // coming from compaction
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32 " released=1",
                          prep_seq, snapshot_seq, 0);
        // This snapshot is not valid anymore. We cannot tell if prep_seq is
        // committed before or after the snapshot. Return true but also set
        // snap_released to true.
        assert(snap_released);
        *snap_released = true;
        return true;
      }

      if (!found) {
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 1);
        return true;
      }
    }
    // (ii) is the case: it is committed, but after the snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }

  // Add the transaction with prepare sequence seq to the prepared list.
  // Note: must be called serially with increasing seq on each call.
  // locked is true if prepared_mutex_ is already locked.
  void AddPrepared(uint64_t seq, bool locked = false);
  // Check if any of the prepared txns are less than new max_evicted_seq_. Must
  // be called with prepared_mutex_ write locked.
  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
  // Remove the transaction with prepare sequence seq from the prepared list
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  // Note: must be called serially.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);
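
  // A rough sketch of the intended call sequence for a 2PC txn with a single
  // sub-batch (illustrative; see AddPreparedCallback and
  // WritePreparedCommitEntryPreReleaseCallback below for the real call sites):
  //   AddPrepared(prepare_seq);               // at prepare time
  //   ...
  //   AddCommitted(prepare_seq, commit_seq);  // at commit time
  //   RemovePrepared(prepare_seq);            // after the commit seq is
  //                                           // published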

  struct CommitEntry {
    uint64_t prep_seq;
    uint64_t commit_seq;
    CommitEntry() : prep_seq(0), commit_seq(0) {}
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
    bool operator==(const CommitEntry& rhs) const {
      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
    }
  };

  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
    // Number of higher bits of a sequence number that are not used. They are
    // used to encode the value type, ...
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of lower bits from the prepare seq that can be skipped as they
    // are implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS;
    // Number of bits we use to encode the commit seq.
    const size_t COMMIT_BITS;
    // Filter to encode/decode commit seq
    const uint64_t COMMIT_FILTER;
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND;
  };

  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
  // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA
  // PAD: the first bits of a seq that are reserved for tagging and hence
  // ignored
  // PREP/INDEX: the used bits in a prepare seq number
  // INDEX: the bits that do not have to be encoded (will be provided
  // externally)
  // DELTA: commit seq - prepare seq + 1
  // The number of DELTA bits should be equal to the number of INDEX bits + PADs
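  // A worked example of the encoding (illustrative only; the numbers assume an
  // index width of 23 bits, i.e., the FORMAT(COMMIT_CACHE_BITS) member below
  // with COMMIT_CACHE_BITS = 23):
  //   PAD_BITS = 8, PREP_BITS = 64 - 8 - 23 = 33, COMMIT_BITS = 64 - 33 = 31,
  //   DELTA_UPPERBOUND = 2^31.
  // A commit entry <ps, cs> is stored at slot ps % 2^23 of the commit cache.
  // The 64-bit word keeps the upper 33 bits of ps (everything above the 23
  // index bits) in its top 33 bits, and delta = cs - ps + 1 in its low 31
  // bits. Parse() reconstructs ps by OR-ing the slot index back into the low
  // bits and recovers cs as ps + delta - 1.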
  struct CommitEntry64b {
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
            ToString(cs) + " prepare_seq is " + ToString(ps));
      }
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

   private:
    uint64_t rep_;
  };

  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;

  virtual const Snapshot* GetSnapshot() override;
  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);

 protected:
  virtual Status VerifyCFOptions(
      const ColumnFamilyOptions& cf_options) override;
  // Assign the min and max sequence numbers for reading from the db. A seq >
  // max is not valid, a seq < min is valid, and min <= seq < max requires
  // further checking. Normally max is defined by the snapshot and min is by
  // the minimum uncommitted seq.
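  // For example (illustrative numbers, not taken from the code): for a
  // snapshot at seq 100 whose min_uncommitted_ is 90, reads would get
  // min = 90 and max = 100; a key written at seq 105 is not visible, a key
  // at seq 80 is visible, and a key at seq 95 needs an IsInSnapshot check.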
  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
                                         SequenceNumber* min,
                                         SequenceNumber* max);
  // Validate whether a snapshot sequence number is still valid based on the
  // latest db status. backed_by_snapshot specifies whether the number is
  // backed by an actual snapshot object. order specifies the memory order with
  // which we load the atomic variables: relaxed is enough for the default
  // since we care about the last value seen by the same thread.
  inline bool ValidateSnapshot(
      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
      std::memory_order order = std::memory_order_relaxed);
  // Get a dummy snapshot that refers to kMaxSequenceNumber
  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }

 private:
  friend class AddPreparedCallback;
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
  friend class WritePreparedCommitEntryPreReleaseCallback;
  friend class WritePreparedTransactionTestBase;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  friend class WritePreparedTransactionTest_BasicRecovery_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
  friend class WritePreparedTransactionTest_CommitMap_Test;
  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_Rollback_Test;
  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
  friend class WriteUnpreparedTxn;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;

  void Init(const TransactionDBOptions& /* unused */);

  void WPRecordTick(uint32_t ticker_type) const {
    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  }

  // A heap with amortized O(1) complexity for erase. It uses one extra heap
  // to keep track of erased entries that are not yet on top of the main heap.
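  // For example (illustrative): after push(5), push(8), push(10), a call to
  // erase(8) only records 8 in erased_heap_ since it is not at the top; a
  // later pop() removes 5 and then also discharges 8 from erased_heap_ once
  // it reaches the front, so top() ends up returning 10.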
  class PreparedHeap {
    // The mutex is required for push and pop from PreparedHeap. ::erase will
    // use external synchronization via prepared_mutex_.
    port::Mutex push_pop_mutex_;
    std::deque<uint64_t> heap_;
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }

    inline bool empty() { return top() == kMaxSequenceNumber; }
    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
    inline void push(uint64_t v) {
      push_pop_mutex_.AssertHeld();
      if (heap_.empty()) {
        heap_top_.store(v, std::memory_order_release);
      } else {
        assert(heap_top_.load() < v);
      }
      heap_.push_back(v);
    }
    void pop(bool locked = false) {
      if (!locked) {
        push_pop_mutex()->Lock();
      }
      push_pop_mutex_.AssertHeld();
      heap_.pop_front();
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.top() > erased_heap_.top() could happen if we have erased
             // a non-existent entry. Ideally the user should not do that but we
             // should be resilient against it.
             heap_.front() >= erased_heap_.top()) {
        if (heap_.front() == erased_heap_.top()) {
          heap_.pop_front();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
                      std::memory_order_release);
      if (!locked) {
        push_pop_mutex()->Unlock();
      }
    }
    // Concurrent calls need external synchronization. It is safe to call this
    // concurrently with push and pop though.
    void erase(uint64_t seq) {
      if (!empty()) {
        auto top_seq = top();
        if (seq < top_seq) {
          // Already popped, ignore it.
        } else if (top_seq == seq) {
          pop();
#ifndef NDEBUG
          MutexLock ml(push_pop_mutex());
          assert(heap_.empty() || heap_.front() != seq);
#endif
        } else {  // top() > seq
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
  };

  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such an entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results in an
  // eviction, sets the evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches the
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots
  // at the time of updating the max. Thread-safety: this function can be
  // called concurrently. The concurrent invocations of this function are
  // equivalent to a serial invocation in which the last invocation is the one
  // with the largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);

  inline SequenceNumber SmallestUnCommittedSeq() {
    // Note: We have two lists to look into, but for performance reasons they
    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
    // that a prepared entry will not be missed, we look into them in the
    // opposite order: first read prepared_txns_ and then delayed_prepared_.

    // This must be called before calling ::top. This is because the concurrent
    // thread would call ::RemovePrepared before updating
    // GetLatestSequenceNumber(). Reading them in the opposite order here
    // guarantees that the ::top that we read would be lower than the ::top we
    // would have read had we otherwise updated/read them atomically.
    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
    auto min_prepare = prepared_txns_.top();
    // Since we always update the prepare_heap from the main write queue via
    // PreReleaseCallback, prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    if (!delayed_prepared_empty_.load()) {
      ReadLock rl(&prepared_mutex_);
      if (!delayed_prepared_.empty()) {
        return *delayed_prepared_.begin();
      }
    }
    bool empty = min_prepare == kMaxSequenceNumber;
    if (empty) {
      // Since GetLatestSequenceNumber is updated
      // after prepared_txns_ are, the value of GetLatestSequenceNumber would
      // reflect any uncommitted data that is not added to prepared_txns_ yet.
      // Otherwise, if there is no concurrent txn, this value simply reflects
      // the latest value in the memtable.
      return next_prepare;
    } else {
      return std::min(min_prepare, next_prepare);
    }
  }

  // Enhance the snapshot object by recording in it the smallest uncommitted
  // seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    assert(min_uncommitted <= snapshot->number_ + 1);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // The concurrent invocations of this function are equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of the
  // snapshots are released, and do the cleanup for the released snapshots.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or can be safely discarded (and hence assumed committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If it
  // is called concurrently with multiple UpdateSnapshots, the result is the
  // same as checking the intersection of the snapshot list before updates with
  // the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();

  // The list of live snapshots at the last time that max_evicted_seq_
  // advanced. The list is stored in two data structures: in snapshot_cache_,
  // which is efficient for concurrent reads, and in snapshots_ if the data
  // does not fit into snapshot_cache_. snapshots_total_ is the total number
  // of snapshots in the two lists.
  std::atomic<size_t> snapshots_total_ = {};
  // The list is sorted in ascending order. Thread-safety for writes is
  // provided with snapshots_mutex_ and concurrent reads are safe due to
  // std::atomic for each entry. On the x86_64 architecture such reads are
  // compiled to simple read instructions.
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list is sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
  // although redundant, simplifies the CleanupOldSnapshots implementation.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_all_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index
  // from a filled one. Thread-safety is provided with commit_cache_mutex_.
  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
  // GetSnapshotInternal guarantees that the snapshot seq is larger than
  // future_max_evicted_seq_, this guarantees that if a snapshot is not larger
  // than max it has already been looked at via GetSnapshotListFromDB(new_max).
  std::atomic<uint64_t> future_max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent advances we would have. We do not want
  // it to be too large either as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // the prepared sequence numbers of the entries evicted from commit_cache_
  // that overlap with that snapshot. These are the prepared sequence numbers
  // that the snapshot, to which they are mapped, cannot assume to be committed
  // just because they are no longer in the commit_cache_. The vector must be
  // sorted after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances their sequence number. This is expected to
  // be empty normally. Thread-safety is provided with prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Commit of a delayed prepared: 1) update the commit cache, 2) update
  // delayed_prepared_commits_, 3) publish the seq, 4) clean up
  // delayed_prepared_. delayed_prepared_commits_ helps us tell apart the txns
  // that are still uncommitted from the ones that are committed but not
  // cleaned up yet.
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  // Updated when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Updated when old_commit_map_.empty() changes. Expected to be true normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators
  // Thread safety: since it is a const it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles
  // Thread safety: since the handle is a read-only object it is safe to read
  // it concurrently
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
  // A dummy snapshot object that refers to kMaxSequenceNumber
  SnapshotImpl dummy_max_snapshot_;
};

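// A ReadCallback that filters out keys that are not visible to the snapshot,
// using WritePreparedTxnDB::IsInSnapshot. A rough usage sketch (illustrative
// only; wp_db and read_options are placeholder names, and the real call sites
// are in WritePreparedTxn and the Get/NewIterator implementations):
//   SequenceNumber min_uncommitted, snap_seq;
//   const SnapshotBackup backed =
//       wp_db->AssignMinMaxSeqs(read_options.snapshot, &min_uncommitted,
//                               &snap_seq);
//   WritePreparedTxnReadCallback callback(wp_db, snap_seq, min_uncommitted,
//                                         backed);
//   // ... perform the read with the callback ...
//   // For unbacked snapshots the caller must check callback.valid() (and
//   // typically ValidateSnapshot) before trusting the result.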
class WritePreparedTxnReadCallback : public ReadCallback {
 public:
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
      : ReadCallback(snapshot),
        db_(db),
        backed_by_snapshot_(kBackedByDBSnapshot) {}
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
                               SequenceNumber min_uncommitted,
                               SnapshotBackup backed_by_snapshot)
      : ReadCallback(snapshot, min_uncommitted),
        db_(db),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WritePreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  // Will be called to see if the seq number is visible; if not, it moves on to
  // the next seq number.
  inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
    auto snapshot = max_visible_seq_;
    bool snap_released = false;
    auto ret =
        db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
    assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
    snap_released_ |= snap_released;
    return ret;
  }

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
 private:
  WritePreparedTxnDB* db_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};

class AddPreparedCallback : public PreReleaseCallback {
 public:
  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
                      size_t sub_batch_cnt, bool two_write_queues,
                      bool first_prepare_batch)
      : db_(db),
        db_impl_(db_impl),
        sub_batch_cnt_(sub_batch_cnt),
        two_write_queues_(two_write_queues),
        first_prepare_batch_(first_prepare_batch) {
    (void)two_write_queues_;  // to silence unused private field warning
  }
  virtual Status Callback(SequenceNumber prepare_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t log_number, size_t index,
                          size_t total) override {
    assert(index < total);
    // To reduce the cost of lock acquisition competing with the concurrent
    // prepare requests, lock on the first callback and unlock on the last.
    const bool do_lock = !two_write_queues_ || index == 0;
    const bool do_unlock = !two_write_queues_ || index + 1 == total;
    // Always Prepare from the main queue
    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
    if (do_lock) {
      db_->prepared_txns_.push_pop_mutex()->Lock();
    }
    const bool kLocked = true;
    for (size_t i = 0; i < sub_batch_cnt_; i++) {
      db_->AddPrepared(prepare_seq + i, kLocked);
    }
    if (do_unlock) {
      db_->prepared_txns_.push_pop_mutex()->Unlock();
    }
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
    if (first_prepare_batch_) {
      assert(log_number != 0);
      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
          log_number);
    }
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  size_t sub_batch_cnt_;
  bool two_write_queues_;
  // It is 2PC and this is the first prepare batch. Always the case in 2PC
  // unless it is WriteUnPrepared.
  bool first_prepare_batch_;
};

class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WritePreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        aux_seq_(aux_seq),
        aux_batch_cnt_(aux_batch_cnt),
        includes_aux_batch_(aux_batch_cnt > 0) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(!db_impl_->immutable_db_options().two_write_queues ||
           is_mem_disabled);
    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
    // The data batch is what accompanies the commit marker and affects the
    // last seq in the commit batch.
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    if (prep_seq_ != kMaxSequenceNumber) {
      for (size_t i = 0; i < prep_batch_cnt_; i++) {
        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
      }
    }  // else there was no prepare phase
    if (includes_aux_batch_) {
      for (size_t i = 0; i < aux_batch_cnt_; i++) {
        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
      }
    }
    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that accompanies the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For the commit seq of each batch use the commit seq of the last
        // batch. This would make debugging easier by having all the batches
        // share the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // published sequence numbers will be in order, i.e., once a seq is
      // published all the seqs prior to it are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
      // Note RemovePrepared should be called after publishing the seq.
      // Otherwise the SmallestUnCommittedSeq optimization breaks.
      if (prep_seq_ != kMaxSequenceNumber) {
        db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
      }  // else there was no prepare phase
      if (includes_aux_batch_) {
        db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
      }
    }
    // else the SequenceNumber that is updated as part of the write already
    // does the publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Data here is the batch that is written with the commit marker, either
  // because it is commit without prepare or commit has a CommitTimeWriteBatch.
  bool includes_data_;
  // Auxiliary batch (if there is any) is a batch that is written before, but
  // gets the same commit seq as prepare batch or data batch. This is used in
  // two write queues where the CommitTimeWriteBatch becomes the aux batch and
  // we do a separate write to actually commit everything.
  SequenceNumber aux_seq_;
  size_t aux_batch_cnt_;
  bool includes_aux_batch_;
};

// For two_write_queues, commit both the aborted batch and the cleanup batch
// and then publish the seq
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
 public:
  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
                                          DBImpl* db_impl,
                                          SequenceNumber prep_seq,
                                          SequenceNumber rollback_seq,
                                          size_t prep_batch_cnt)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        rollback_seq_(rollback_seq),
        prep_batch_cnt_(prep_batch_cnt) {
    assert(prep_seq != kMaxSequenceNumber);
    assert(rollback_seq != kMaxSequenceNumber);
    assert(prep_batch_cnt_ > 0);
  }

  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
                  size_t /*index*/, size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(is_mem_disabled);  // implies the 2nd queue
    assert(db_impl_->immutable_db_options().two_write_queues);
#ifdef NDEBUG
    (void)is_mem_disabled;
#endif
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    for (size_t i = 0; i < prep_batch_cnt_; i++) {
      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  SequenceNumber prep_seq_;
  SequenceNumber rollback_seq_;
  size_t prep_batch_cnt_;
};

// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
      : comparators_(comparators), batches_(1) {}
  std::map<uint32_t, const Comparator*>& comparators_;
  using CFKeys = std::set<Slice, SetComparator>;
  std::map<uint32_t, CFKeys> keys_;
  size_t batches_;
  size_t BatchCount() { return batches_; }
  void AddKey(const uint32_t cf, const Slice& key);
  void InitWithComp(const uint32_t cf);
  Status MarkNoop(bool) override { return Status::OK(); }
  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  Status MarkCommit(const Slice&) override { return Status::OK(); }
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status DeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MarkBeginPrepare(bool) override { return Status::OK(); }
  Status MarkRollback(const Slice&) override { return Status::OK(); }
  bool WriteAfterCommit() const override { return false; }
};
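
// A rough usage sketch for SubBatchCounter (illustrative; wp_db and
// write_batch are placeholder names and the batch contents are made-up): a
// batch with Put("a"), Put("b"), Put("a") yields BatchCount() == 2, since the
// second write to "a" starts a new sub-batch, while a batch without duplicate
// keys yields BatchCount() == 1.
//   SubBatchCounter counter(*wp_db->GetCFComparatorMap());
//   Status s = write_batch->Iterate(&counter);
//   size_t sub_batches = counter.BatchCount();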

SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
                                                    SequenceNumber* min,
                                                    SequenceNumber* max) {
  if (snapshot != nullptr) {
    *min = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
               ->min_uncommitted_;
    *max = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
               ->number_;
    return kBackedByDBSnapshot;
  } else {
    *min = SmallestUnCommittedSeq();
    *max = 0;  // to be assigned later after sv is referenced.
    return kUnbackedByDBSnapshot;
  }
}

bool WritePreparedTxnDB::ValidateSnapshot(
    const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
    std::memory_order order) {
  if (backed_by_snapshot == kBackedByDBSnapshot) {
    return true;
  } else {
    SequenceNumber max = max_evicted_seq_.load(order);
    // Validate that max has not advanced past the snapshot seq that is not
    // backed by a real snapshot. This is a very rare case that should not
    // happen in real workloads.
    if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
      return false;
    }
  }
  return true;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE