1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 #ifndef ROCKSDB_LITE
8 
9 #include <cinttypes>
10 #include <mutex>
11 #include <queue>
12 #include <set>
13 #include <string>
14 #include <unordered_map>
15 #include <vector>
16 
17 #include "db/db_iter.h"
18 #include "db/pre_release_callback.h"
19 #include "db/read_callback.h"
20 #include "db/snapshot_checker.h"
21 #include "rocksdb/db.h"
22 #include "rocksdb/options.h"
23 #include "rocksdb/utilities/transaction_db.h"
24 #include "util/cast_util.h"
25 #include "util/set_comparator.h"
26 #include "util/string_util.h"
27 #include "utilities/transactions/pessimistic_transaction.h"
28 #include "utilities/transactions/pessimistic_transaction_db.h"
29 #include "utilities/transactions/transaction_lock_mgr.h"
30 #include "utilities/transactions/write_prepared_txn.h"
31 
32 namespace ROCKSDB_NAMESPACE {
// Tells whether a sequence number used for reading is backed by a DB snapshot
// object. The underlying type is bool, so kUnbackedByDBSnapshot is false and
// kBackedByDBSnapshot is true.
enum SnapshotBackup : bool {
  kUnbackedByDBSnapshot = false,
  kBackedByDBSnapshot = true
};
34 
35 // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
36 // In this way some data in the DB might not be committed. The DB provides
37 // mechanisms to tell such data apart from committed data.
38 class WritePreparedTxnDB : public PessimisticTransactionDB {
39  public:
WritePreparedTxnDB(DB * db,const TransactionDBOptions & txn_db_options)40   explicit WritePreparedTxnDB(DB* db,
41                               const TransactionDBOptions& txn_db_options)
42       : PessimisticTransactionDB(db, txn_db_options),
43         SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
44         SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
45         COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
46         COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
47         FORMAT(COMMIT_CACHE_BITS) {
48     Init(txn_db_options);
49   }
50 
WritePreparedTxnDB(StackableDB * db,const TransactionDBOptions & txn_db_options)51   explicit WritePreparedTxnDB(StackableDB* db,
52                               const TransactionDBOptions& txn_db_options)
53       : PessimisticTransactionDB(db, txn_db_options),
54         SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
55         SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
56         COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
57         COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
58         FORMAT(COMMIT_CACHE_BITS) {
59     Init(txn_db_options);
60   }
61 
62   virtual ~WritePreparedTxnDB();
63 
64   virtual Status Initialize(
65       const std::vector<size_t>& compaction_enabled_cf_indices,
66       const std::vector<ColumnFamilyHandle*>& handles) override;
67 
68   Transaction* BeginTransaction(const WriteOptions& write_options,
69                                 const TransactionOptions& txn_options,
70                                 Transaction* old_txn) override;
71 
72   using TransactionDB::Write;
73   Status Write(const WriteOptions& opts, WriteBatch* updates) override;
74 
75   // Optimized version of ::Write that receives more optimization request such
76   // as skip_concurrency_control.
77   using PessimisticTransactionDB::Write;
78   Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
79                WriteBatch* updates) override;
80 
81   // Write the batch to the underlying DB and mark it as committed. Could be
82   // used by both directly from TxnDB or through a transaction.
83   Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
84                        size_t batch_cnt, WritePreparedTxn* txn);
85 
86   using DB::Get;
87   virtual Status Get(const ReadOptions& options,
88                      ColumnFamilyHandle* column_family, const Slice& key,
89                      PinnableSlice* value) override;
90 
91   using DB::MultiGet;
92   virtual std::vector<Status> MultiGet(
93       const ReadOptions& options,
94       const std::vector<ColumnFamilyHandle*>& column_family,
95       const std::vector<Slice>& keys,
96       std::vector<std::string>* values) override;
97 
98   using DB::NewIterator;
99   virtual Iterator* NewIterator(const ReadOptions& options,
100                                 ColumnFamilyHandle* column_family) override;
101 
102   using DB::NewIterators;
103   virtual Status NewIterators(
104       const ReadOptions& options,
105       const std::vector<ColumnFamilyHandle*>& column_families,
106       std::vector<Iterator*>* iterators) override;
107 
108   // Check whether the transaction that wrote the value with sequence number seq
109   // is visible to the snapshot with sequence number snapshot_seq.
110   // Returns true if commit_seq <= snapshot_seq
111   // If the snapshot_seq is already released and snapshot_seq <= max, sets
112   // *snap_released to true and returns true as well.
113   inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
114                            uint64_t min_uncommitted = kMinUnCommittedSeq,
115                            bool* snap_released = nullptr) const {
116     ROCKS_LOG_DETAILS(info_log_,
117                       "IsInSnapshot %" PRIu64 " in %" PRIu64
118                       " min_uncommitted %" PRIu64,
119                       prep_seq, snapshot_seq, min_uncommitted);
120     assert(min_uncommitted >= kMinUnCommittedSeq);
121     // Caller is responsible to initialize snap_released.
122     assert(snap_released == nullptr || *snap_released == false);
123     // Here we try to infer the return value without looking into prepare list.
124     // This would help avoiding synchronization over a shared map.
125     // TODO(myabandeh): optimize this. This sequence of checks must be correct
126     // but not necessary efficient
127     if (prep_seq == 0) {
128       // Compaction will output keys to bottom-level with sequence number 0 if
129       // it is visible to the earliest snapshot.
130       ROCKS_LOG_DETAILS(
131           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
132           prep_seq, snapshot_seq, 1);
133       return true;
134     }
135     if (snapshot_seq < prep_seq) {
136       // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
137       ROCKS_LOG_DETAILS(
138           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
139           prep_seq, snapshot_seq, 0);
140       return false;
141     }
142     if (prep_seq < min_uncommitted) {
143       ROCKS_LOG_DETAILS(info_log_,
144                         "IsInSnapshot %" PRIu64 " in %" PRIu64
145                         " returns %" PRId32
146                         " because of min_uncommitted %" PRIu64,
147                         prep_seq, snapshot_seq, 1, min_uncommitted);
148       return true;
149     }
150     // Commit of delayed prepared has two non-atomic steps: add to commit cache,
151     // remove from delayed prepared. Our reads from these two is also
152     // non-atomic. By looking into commit cache first thus we might not find the
153     // prep_seq neither in commit cache not in delayed_prepared_. To fix that i)
154     // we check if there was any delayed prepared BEFORE looking into commit
155     // cache, ii) if there was, we complete the search steps to be these: i)
156     // commit cache, ii) delayed prepared, commit cache again. In this way if
157     // the first query to commit cache missed the commit, the 2nd will catch it.
158     bool was_empty;
159     SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
160     CommitEntry64b dont_care;
161     auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
162     size_t repeats = 0;
163     do {
164       repeats++;
165       assert(repeats < 100);
166       if (UNLIKELY(repeats >= 100)) {
167         throw std::runtime_error(
168             "The read was intrupted 100 times by update to max_evicted_seq_. "
169             "This is unexpected in all setups");
170       }
171       max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
172       TEST_SYNC_POINT(
173           "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
174       TEST_SYNC_POINT(
175           "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
176       was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
177       TEST_SYNC_POINT(
178           "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
179       TEST_SYNC_POINT(
180           "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
181       CommitEntry cached;
182       bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
183       TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
184       TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
185       if (exist && prep_seq == cached.prep_seq) {
186         // It is committed and also not evicted from commit cache
187         ROCKS_LOG_DETAILS(
188             info_log_,
189             "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
190             prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
191         return cached.commit_seq <= snapshot_seq;
192       }
193       // else it could be committed but not inserted in the map which could
194       // happen after recovery, or it could be committed and evicted by another
195       // commit, or never committed.
196 
197       // At this point we dont know if it was committed or it is still prepared
198       max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
199       if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
200         continue;
201       }
202       // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
203       if (max_evicted_seq_ub < prep_seq) {
204         // Not evicted from cache and also not present, so must be still
205         // prepared
206         ROCKS_LOG_DETAILS(info_log_,
207                           "IsInSnapshot %" PRIu64 " in %" PRIu64
208                           " returns %" PRId32,
209                           prep_seq, snapshot_seq, 0);
210         return false;
211       }
212       TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
213       TEST_SYNC_POINT(
214           "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
215       if (!was_empty) {
216         // We should not normally reach here
217         WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
218         ReadLock rl(&prepared_mutex_);
219         ROCKS_LOG_WARN(
220             info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
221             static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
222         if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
223           // This is the order: 1) delayed_prepared_commits_ update, 2) publish
224           // 3) delayed_prepared_ clean up. So check if it is the case of a late
225           // clenaup.
226           auto it = delayed_prepared_commits_.find(prep_seq);
227           if (it == delayed_prepared_commits_.end()) {
228             // Then it is not committed yet
229             ROCKS_LOG_DETAILS(info_log_,
230                               "IsInSnapshot %" PRIu64 " in %" PRIu64
231                               " returns %" PRId32,
232                               prep_seq, snapshot_seq, 0);
233             return false;
234           } else {
235             ROCKS_LOG_DETAILS(info_log_,
236                               "IsInSnapshot %" PRIu64 " in %" PRIu64
237                               " commit: %" PRIu64 " returns %" PRId32,
238                               prep_seq, snapshot_seq, it->second,
239                               snapshot_seq <= it->second);
240             return it->second <= snapshot_seq;
241           }
242         } else {
243           // 2nd query to commit cache. Refer to was_empty comment above.
244           exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
245           if (exist && prep_seq == cached.prep_seq) {
246             ROCKS_LOG_DETAILS(
247                 info_log_,
248                 "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
249                 prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
250             return cached.commit_seq <= snapshot_seq;
251           }
252           max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
253         }
254       }
255     } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
256     // When advancing max_evicted_seq_, we move older entires from prepared to
257     // delayed_prepared_. Also we move evicted entries from commit cache to
258     // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
259     // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
260     // old_commit_map_, iii) committed with no conflict with any snapshot. Case
261     // (i) delayed_prepared_ is checked above
262     if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
263       // only (iii) is the case: committed
264       // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
265       // snapshot_seq
266       ROCKS_LOG_DETAILS(
267           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
268           prep_seq, snapshot_seq, 1);
269       return true;
270     }
271     // else (ii) might be the case: check the commit data saved for this
272     // snapshot. If there was no overlapping commit entry, then it is committed
273     // with a commit_seq lower than any live snapshot, including snapshot_seq.
274     if (old_commit_map_empty_.load(std::memory_order_acquire)) {
275       ROCKS_LOG_DETAILS(info_log_,
276                         "IsInSnapshot %" PRIu64 " in %" PRIu64
277                         " returns %" PRId32 " released=1",
278                         prep_seq, snapshot_seq, 0);
279       assert(snap_released);
280       // This snapshot is not valid anymore. We cannot tell if prep_seq is
281       // committed before or after the snapshot. Return true but also set
282       // snap_released to true.
283       *snap_released = true;
284       return true;
285     }
286     {
287       // We should not normally reach here unless sapshot_seq is old. This is a
288       // rare case and it is ok to pay the cost of mutex ReadLock for such old,
289       // reading transactions.
290       WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
291       ReadLock rl(&old_commit_map_mutex_);
292       auto prep_set_entry = old_commit_map_.find(snapshot_seq);
293       bool found = prep_set_entry != old_commit_map_.end();
294       if (found) {
295         auto& vec = prep_set_entry->second;
296         found = std::binary_search(vec.begin(), vec.end(), prep_seq);
297       } else {
298         // coming from compaction
299         ROCKS_LOG_DETAILS(info_log_,
300                           "IsInSnapshot %" PRIu64 " in %" PRIu64
301                           " returns %" PRId32 " released=1",
302                           prep_seq, snapshot_seq, 0);
303         // This snapshot is not valid anymore. We cannot tell if prep_seq is
304         // committed before or after the snapshot. Return true but also set
305         // snap_released to true.
306         assert(snap_released);
307         *snap_released = true;
308         return true;
309       }
310 
311       if (!found) {
312         ROCKS_LOG_DETAILS(info_log_,
313                           "IsInSnapshot %" PRIu64 " in %" PRIu64
314                           " returns %" PRId32,
315                           prep_seq, snapshot_seq, 1);
316         return true;
317       }
318     }
319     // (ii) it the case: it is committed but after the snapshot_seq
320     ROCKS_LOG_DETAILS(
321         info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
322         prep_seq, snapshot_seq, 0);
323     return false;
324   }
325 
326   // Add the transaction with prepare sequence seq to the prepared list.
327   // Note: must be called serially with increasing seq on each call.
328   // locked is true if prepared_mutex_ is already locked.
329   void AddPrepared(uint64_t seq, bool locked = false);
330   // Check if any of the prepared txns are less than new max_evicted_seq_. Must
331   // be called with prepared_mutex_ write locked.
332   void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
333   // Remove the transaction with prepare sequence seq from the prepared list
334   void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
335   // Add the transaction with prepare sequence prepare_seq and commit sequence
336   // commit_seq to the commit map. loop_cnt is to detect infinite loops.
337   // Note: must be called serially.
338   void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
339                     uint8_t loop_cnt = 0);
340 
// A <prepare sequence, commit sequence> pair. A default-constructed entry has
// both sequence numbers set to 0.
struct CommitEntry {
  uint64_t prep_seq;
  uint64_t commit_seq;

  CommitEntry() : CommitEntry(0, 0) {}
  CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}

  // Two entries are equal only when both sequence numbers match.
  bool operator==(const CommitEntry& rhs) const {
    const bool differs =
        prep_seq != rhs.prep_seq || commit_seq != rhs.commit_seq;
    return !differs;
  }
};
350 
// Bit-layout parameters for packing a commit entry into 64 bits. Everything is
// derived from index_bits: PREP_BITS = 64 - PAD_BITS - INDEX_BITS and
// COMMIT_BITS = 64 - PREP_BITS. Members are initialized in declaration order,
// so each one may depend on those declared before it.
struct CommitEntry64bFormat {
  explicit CommitEntry64bFormat(size_t index_bits)
      : INDEX_BITS(index_bits),
        PREP_BITS(64 - PAD_BITS - INDEX_BITS),
        COMMIT_BITS(64 - PREP_BITS),
        COMMIT_FILTER((uint64_t{1} << COMMIT_BITS) - 1),
        DELTA_UPPERBOUND(uint64_t{1} << COMMIT_BITS) {}

  // Number of higher bits of a sequence number that is not used. They are
  // used to encode the value type, ...
  const size_t PAD_BITS = 8;
  // Number of lower bits from prepare seq that can be skipped as they are
  // implied by the index of the entry in the array
  const size_t INDEX_BITS;
  // Number of bits we use to encode the prepare seq
  const size_t PREP_BITS;
  // Number of bits we use to encode the commit seq.
  const size_t COMMIT_BITS;
  // Filter to encode/decode commit seq: the low COMMIT_BITS bits are set
  const uint64_t COMMIT_FILTER;
  // The value of commit_seq - prepare_seq + 1 must be less than this bound
  const uint64_t DELTA_UPPERBOUND;
};
373 
374   // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
375   // INDEX Delta Seq (64 bits)   = 0 0 0 0 0 0 0 0 0  0 0 0 DELTA DELTA ...
376   // DELTA DELTA Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA
377   // ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and
378   // hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the
379   // bits that do not have to be encoded (will be provided externally) DELTA:
380   // prep seq - commit seq + 1 Number of DELTA bits should be equal to number of
381   // index bits + PADs
382   struct CommitEntry64b {
CommitEntry64bCommitEntry64b383     constexpr CommitEntry64b() noexcept : rep_(0) {}
384 
CommitEntry64bCommitEntry64b385     CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
386         : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
387 
CommitEntry64bCommitEntry64b388     CommitEntry64b(const uint64_t ps, const uint64_t cs,
389                    const CommitEntry64bFormat& format) {
390       assert(ps < static_cast<uint64_t>(
391                       (1ull << (format.PREP_BITS + format.INDEX_BITS))));
392       assert(ps <= cs);
393       uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
394       // zero is reserved for uninitialized entries
395       assert(0 < delta);
396       assert(delta < format.DELTA_UPPERBOUND);
397       if (delta >= format.DELTA_UPPERBOUND) {
398         throw std::runtime_error(
399             "commit_seq >> prepare_seq. The allowed distance is " +
400             ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
401             ToString(cs) + " prepare_seq is " + ToString(ps));
402       }
403       rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
404       rep_ = rep_ | delta;
405     }
406 
407     // Return false if the entry is empty
ParseCommitEntry64b408     bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
409                const CommitEntry64bFormat& format) {
410       uint64_t delta = rep_ & format.COMMIT_FILTER;
411       // zero is reserved for uninitialized entries
412       assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
413       if (delta == 0) {
414         return false;  // initialized entry would have non-zero delta
415       }
416 
417       assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
418       uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
419       prep_up >>= format.PAD_BITS;
420       const uint64_t& prep_low = indexed_seq;
421       entry->prep_seq = prep_up | prep_low;
422 
423       entry->commit_seq = entry->prep_seq + delta - 1;
424       return true;
425     }
426 
427    private:
428     uint64_t rep_;
429   };
430 
431   // Struct to hold ownership of snapshot and read callback for cleanup.
432   struct IteratorState;
433 
GetCFComparatorMap()434   std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
435     return cf_map_;
436   }
GetCFHandleMap()437   std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
438     return handle_map_;
439   }
440   void UpdateCFComparatorMap(
441       const std::vector<ColumnFamilyHandle*>& handles) override;
442   void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
443 
444   virtual const Snapshot* GetSnapshot() override;
445   SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
446 
447  protected:
448   virtual Status VerifyCFOptions(
449       const ColumnFamilyOptions& cf_options) override;
450   // Assign the min and max sequence numbers for reading from the db. A seq >
451   // max is not valid, and a seq < min is valid, and a min <= seq < max requires
452   // further checking. Normally max is defined by the snapshot and min is by
453   // minimum uncommitted seq.
454   inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
455                                          SequenceNumber* min,
456                                          SequenceNumber* max);
457   // Validate is a snapshot sequence number is still valid based on the latest
458   // db status. backed_by_snapshot specifies if the number is baked by an actual
459   // snapshot object. order specified the memory order with which we load the
460   // atomic variables: relax is enough for the default since we care about last
461   // value seen by same thread.
462   inline bool ValidateSnapshot(
463       const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
464       std::memory_order order = std::memory_order_relaxed);
465   // Get a dummy snapshot that refers to kMaxSequenceNumber
GetMaxSnapshot()466   Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
467 
468  private:
469   friend class AddPreparedCallback;
470   friend class PreparedHeap_BasicsTest_Test;
471   friend class PreparedHeap_Concurrent_Test;
472   friend class PreparedHeap_EmptyAtTheEnd_Test;
473   friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
474   friend class WritePreparedCommitEntryPreReleaseCallback;
475   friend class WritePreparedTransactionTestBase;
476   friend class WritePreparedTxn;
477   friend class WritePreparedTxnDBMock;
478   friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
479   friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
480   friend class
481       WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
482   friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
483   friend class WritePreparedTransactionTest_BasicRecovery_Test;
484   friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
485   friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
486   friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
487   friend class WritePreparedTransactionTest_CommitMap_Test;
488   friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
489   friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
490   friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
491   friend class WritePreparedTransactionTest_IsInSnapshot_Test;
492   friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
493   friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
494   friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
495   friend class
496       WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
497   friend class
498       WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
499   friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
500   friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
501   friend class WritePreparedTransactionTest_Rollback_Test;
502   friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
503   friend class WriteUnpreparedTxn;
504   friend class WriteUnpreparedTxnDB;
505   friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
506 
507   void Init(const TransactionDBOptions& /* unused */);
508 
WPRecordTick(uint32_t ticker_type)509   void WPRecordTick(uint32_t ticker_type) const {
510     RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
511   }
512 
513   // A heap with the amortized O(1) complexity for erase. It uses one extra heap
514   // to keep track of erased entries that are not yet on top of the main heap.
515   class PreparedHeap {
516     // The mutex is required for push and pop from PreparedHeap. ::erase will
517     // use external synchronization via prepared_mutex_.
518     port::Mutex push_pop_mutex_;
519     std::deque<uint64_t> heap_;
520     std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
521         erased_heap_;
522     std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
523     // True when testing crash recovery
524     bool TEST_CRASH_ = false;
525     friend class WritePreparedTxnDB;
526 
527    public:
~PreparedHeap()528     ~PreparedHeap() {
529       if (!TEST_CRASH_) {
530         assert(heap_.empty());
531         assert(erased_heap_.empty());
532       }
533     }
push_pop_mutex()534     port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
535 
empty()536     inline bool empty() { return top() == kMaxSequenceNumber; }
537     // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
top()538     inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
push(uint64_t v)539     inline void push(uint64_t v) {
540       push_pop_mutex_.AssertHeld();
541       if (heap_.empty()) {
542         heap_top_.store(v, std::memory_order_release);
543       } else {
544         assert(heap_top_.load() < v);
545       }
546       heap_.push_back(v);
547     }
548     void pop(bool locked = false) {
549       if (!locked) {
550         push_pop_mutex()->Lock();
551       }
552       push_pop_mutex_.AssertHeld();
553       heap_.pop_front();
554       while (!heap_.empty() && !erased_heap_.empty() &&
555              // heap_.top() > erased_heap_.top() could happen if we have erased
556              // a non-existent entry. Ideally the user should not do that but we
557              // should be resilient against it.
558              heap_.front() >= erased_heap_.top()) {
559         if (heap_.front() == erased_heap_.top()) {
560           heap_.pop_front();
561         }
562         uint64_t erased __attribute__((__unused__));
563         erased = erased_heap_.top();
564         erased_heap_.pop();
565         // No duplicate prepare sequence numbers
566         assert(erased_heap_.empty() || erased_heap_.top() != erased);
567       }
568       while (heap_.empty() && !erased_heap_.empty()) {
569         erased_heap_.pop();
570       }
571       heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
572                       std::memory_order_release);
573       if (!locked) {
574         push_pop_mutex()->Unlock();
575       }
576     }
577     // Concurrrent calls needs external synchronization. It is safe to be called
578     // concurrent to push and pop though.
erase(uint64_t seq)579     void erase(uint64_t seq) {
580       if (!empty()) {
581         auto top_seq = top();
582         if (seq < top_seq) {
583           // Already popped, ignore it.
584         } else if (top_seq == seq) {
585           pop();
586 #ifndef NDEBUG
587           MutexLock ml(push_pop_mutex());
588           assert(heap_.empty() || heap_.front() != seq);
589 #endif
590         } else {  // top() > seq
591           // Down the heap, remember to pop it later
592           erased_heap_.push(seq);
593         }
594       }
595     }
596   };
597 
TEST_Crash()598   void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
599 
600   // Get the commit entry with index indexed_seq from the commit table. It
601   // returns true if such entry exists.
602   bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
603                       CommitEntry* entry) const;
604 
605   // Rewrite the entry with the index indexed_seq in the commit table with the
606   // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
607   // sets the evicted_entry and returns true.
608   bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
609                       CommitEntry* evicted_entry);
610 
611   // Rewrite the entry with the index indexed_seq in the commit table with the
612   // commit entry new_entry only if the existing entry matches the
613   // expected_entry. Returns false otherwise.
614   bool ExchangeCommitEntry(const uint64_t indexed_seq,
615                            CommitEntry64b& expected_entry,
616                            const CommitEntry& new_entry);
617 
618   // Increase max_evicted_seq_ from the previous value prev_max to the new
619   // value. This also involves taking care of prepared txns that are not
620   // committed before new_max, as well as updating the list of live snapshots at
621   // the time of updating the max. Thread-safety: this function can be called
622   // concurrently. The concurrent invocations of this function is equivalent to
623   // a serial invocation in which the last invocation is the one with the
624   // largest new_max value.
625   void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
626                             const SequenceNumber& new_max);
627 
SmallestUnCommittedSeq()628   inline SequenceNumber SmallestUnCommittedSeq() {
629     // Note: We have two lists to look into, but for performance reasons they
630     // are not read atomically. Since CheckPreparedAgainstMax copies the entry
631     // to delayed_prepared_ before removing it from prepared_txns_, to ensure
632     // that a prepared entry will not go unmissed, we look into them in opposite
633     // order: first read prepared_txns_ and then delayed_prepared_.
634 
635     // This must be called before calling ::top. This is because the concurrent
636     // thread would call ::RemovePrepared before updating
637     // GetLatestSequenceNumber(). Reading then in opposite order here guarantees
638     // that the ::top that we read would be lower the ::top if we had otherwise
639     // update/read them atomically.
640     auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
641     auto min_prepare = prepared_txns_.top();
642     // Since we update the prepare_heap always from the main write queue via
643     // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
644     // prepared data in 2pc transactions. For non-2pc transactions that are
645     // written in two steps, we also update prepared_txns_ at the first step
646     // (via the same mechanism) so that their uncommitted data is reflected in
647     // SmallestUnCommittedSeq.
648     if (!delayed_prepared_empty_.load()) {
649       ReadLock rl(&prepared_mutex_);
650       if (!delayed_prepared_.empty()) {
651         return *delayed_prepared_.begin();
652       }
653     }
654     bool empty = min_prepare == kMaxSequenceNumber;
655     if (empty) {
656       // Since GetLatestSequenceNumber is updated
657       // after prepared_txns_ are, the value of GetLatestSequenceNumber would
658       // reflect any uncommitted data that is not added to prepared_txns_ yet.
659       // Otherwise, if there is no concurrent txn, this value simply reflects
660       // that latest value in the memtable.
661       return next_prepare;
662     } else {
663       return std::min(min_prepare, next_prepare);
664     }
665   }
666 
667   // Enhance the snapshot object by recording in it the smallest uncommitted seq
EnhanceSnapshot(SnapshotImpl * snapshot,SequenceNumber min_uncommitted)668   inline void EnhanceSnapshot(SnapshotImpl* snapshot,
669                               SequenceNumber min_uncommitted) {
670     assert(snapshot);
671     assert(min_uncommitted <= snapshot->number_ + 1);
672     snapshot->min_uncommitted_ = min_uncommitted;
673   }
674 
  // Returns a list of snapshot sequence numbers obtained from the DB.
  // NOTE(review): the exact role of max (presumably an upper bound on the
  // returned sequence numbers) is not visible here; confirm against the
  // definition.
  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB.
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // Concurrent invocations of this function are equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of the
  // snapshots are released, and do the cleanup for the released snapshots.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or if it can be safely discarded (and hence assumed committed for
  // all snapshots). Thread-safety: this function can be called concurrently.
  // If it is called concurrently with multiple UpdateSnapshots, the result is
  // the same as checking the intersection of the snapshot list before updates
  // with the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value.
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();
715 
  // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  // The list is stored in two data structures: in snapshot_cache_, which is
  // efficient for concurrent reads, and in snapshots_ if the data does not fit
  // into snapshot_cache_.
  // snapshots_total_ is the total number of snapshots in the two lists.
  std::atomic<size_t> snapshots_total_ = {};
  // snapshot_cache_: the list sorted in ascending order. Thread-safety for
  // writes is provided with snapshots_mutex_ and concurrent reads are safe due
  // to std::atomic for each entry. In x86_64 architecture such reads are
  // compiled to simple read instructions.
  // SNAPSHOT_CACHE_BITS is initialized from
  // TransactionDBOptions::wp_snapshot_cache_bits and SNAPSHOT_CACHE_SIZE is
  // 2^SNAPSHOT_CACHE_BITS.
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list is sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
  // although redundant, simplifies the CleanupOldSnapshots implementation.
  // NOTE(review): no CleanupOldSnapshots is declared nearby; this likely
  // refers to CleanupReleasedSnapshots. Confirm.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_all_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  // COMMIT_CACHE_BITS is initialized from
  // TransactionDBOptions::wp_commit_cache_bits and COMMIT_CACHE_SIZE is
  // 2^COMMIT_CACHE_BITS.
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index from
  // a filled one. Thread-safety is provided with commit_cache_mutex_.
  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // with max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
  // GetSnapshotInternal guarantees that the snapshot seq is larger than
  // future_max_evicted_seq_, this guarantees that a snapshot that is not
  // larger than max has already been looked at via a
  // GetSnapshotListFromDB(new_max).
  std::atomic<uint64_t> future_max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent the advances would be. We do not want
  // it to be too large either, as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // the prepared sequence numbers of the entries evicted from commit_cache_
  // that overlap with such a snapshot. These are the prepared sequence numbers
  // that the snapshot, to which they are mapped, cannot assume to be committed
  // just because they are no longer in the commit_cache_. The vector must be
  // sorted after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances past their sequence number. This is
  // expected to be empty normally. Thread-safety is provided with
  // prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Commit of a delayed prepared: 1) update commit cache, 2) update
  // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
  // delayed_prepared_commits_ will help us tell apart the unprepared txns from
  // the ones that are committed but not cleaned up yet.
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  // Updated when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Updated when old_commit_map_.empty() changes. Expected to be true normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators.
  // Thread safety: since it is a const it is safe to read it concurrently.
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles.
  // Thread safety: since the handle is a read-only object (it is a const), it
  // is safe to read it concurrently.
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
  // A dummy snapshot object that refers to kMaxSequenceNumber.
  SnapshotImpl dummy_max_snapshot_;
799 };
800 
801 class WritePreparedTxnReadCallback : public ReadCallback {
802  public:
WritePreparedTxnReadCallback(WritePreparedTxnDB * db,SequenceNumber snapshot)803   WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
804       : ReadCallback(snapshot),
805         db_(db),
806         backed_by_snapshot_(kBackedByDBSnapshot) {}
WritePreparedTxnReadCallback(WritePreparedTxnDB * db,SequenceNumber snapshot,SequenceNumber min_uncommitted,SnapshotBackup backed_by_snapshot)807   WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
808                                SequenceNumber min_uncommitted,
809                                SnapshotBackup backed_by_snapshot)
810       : ReadCallback(snapshot, min_uncommitted),
811         db_(db),
812         backed_by_snapshot_(backed_by_snapshot) {
813     (void)backed_by_snapshot_;  // to silence unused private field warning
814   }
815 
~WritePreparedTxnReadCallback()816   virtual ~WritePreparedTxnReadCallback() {
817     // If it is not backed by snapshot, the caller must check validity
818     assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
819   }
820 
821   // Will be called to see if the seq number visible; if not it moves on to
822   // the next seq number.
IsVisibleFullCheck(SequenceNumber seq)823   inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
824     auto snapshot = max_visible_seq_;
825     bool snap_released = false;
826     auto ret =
827         db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
828     assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
829     snap_released_ |= snap_released;
830     return ret;
831   }
832 
valid()833   inline bool valid() {
834     valid_checked_ = true;
835     return snap_released_ == false;
836   }
837 
838   // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
839  private:
840   WritePreparedTxnDB* db_;
841   // Whether max_visible_seq_ is backed by a snapshot
842   const SnapshotBackup backed_by_snapshot_;
843   bool snap_released_ = false;
844   // Safety check to ensure that the caller has checked invalid statuses
845   bool valid_checked_ = false;
846 };
847 
848 class AddPreparedCallback : public PreReleaseCallback {
849  public:
AddPreparedCallback(WritePreparedTxnDB * db,DBImpl * db_impl,size_t sub_batch_cnt,bool two_write_queues,bool first_prepare_batch)850   AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
851                       size_t sub_batch_cnt, bool two_write_queues,
852                       bool first_prepare_batch)
853       : db_(db),
854         db_impl_(db_impl),
855         sub_batch_cnt_(sub_batch_cnt),
856         two_write_queues_(two_write_queues),
857         first_prepare_batch_(first_prepare_batch) {
858     (void)two_write_queues_;  // to silence unused private field warning
859   }
Callback(SequenceNumber prepare_seq,bool is_mem_disabled,uint64_t log_number,size_t index,size_t total)860   virtual Status Callback(SequenceNumber prepare_seq,
861                           bool is_mem_disabled __attribute__((__unused__)),
862                           uint64_t log_number, size_t index,
863                           size_t total) override {
864     assert(index < total);
865     // To reduce the cost of lock acquisition competing with the concurrent
866     // prepare requests, lock on the first callback and unlock on the last.
867     const bool do_lock = !two_write_queues_ || index == 0;
868     const bool do_unlock = !two_write_queues_ || index + 1 == total;
869     // Always Prepare from the main queue
870     assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
871     TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
872     TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
873     if (do_lock) {
874       db_->prepared_txns_.push_pop_mutex()->Lock();
875     }
876     const bool kLocked = true;
877     for (size_t i = 0; i < sub_batch_cnt_; i++) {
878       db_->AddPrepared(prepare_seq + i, kLocked);
879     }
880     if (do_unlock) {
881       db_->prepared_txns_.push_pop_mutex()->Unlock();
882     }
883     TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
884     if (first_prepare_batch_) {
885       assert(log_number != 0);
886       db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
887           log_number);
888     }
889     return Status::OK();
890   }
891 
892  private:
893   WritePreparedTxnDB* db_;
894   DBImpl* db_impl_;
895   size_t sub_batch_cnt_;
896   bool two_write_queues_;
897   // It is 2PC and this is the first prepare batch. Always the case in 2PC
898   // unless it is WriteUnPrepared.
899   bool first_prepare_batch_;
900 };
901 
902 class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
903  public:
904   // includes_data indicates that the commit also writes non-empty
905   // CommitTimeWriteBatch to memtable, which needs to be committed separately.
906   WritePreparedCommitEntryPreReleaseCallback(
907       WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
908       size_t prep_batch_cnt, size_t data_batch_cnt = 0,
909       SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
db_(db)910       : db_(db),
911         db_impl_(db_impl),
912         prep_seq_(prep_seq),
913         prep_batch_cnt_(prep_batch_cnt),
914         data_batch_cnt_(data_batch_cnt),
915         includes_data_(data_batch_cnt_ > 0),
916         aux_seq_(aux_seq),
917         aux_batch_cnt_(aux_batch_cnt),
918         includes_aux_batch_(aux_batch_cnt > 0) {
919     assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
920     assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
921     assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
922   }
923 
Callback(SequenceNumber commit_seq,bool is_mem_disabled,uint64_t,size_t,size_t)924   virtual Status Callback(SequenceNumber commit_seq,
925                           bool is_mem_disabled __attribute__((__unused__)),
926                           uint64_t, size_t /*index*/,
927                           size_t /*total*/) override {
928     // Always commit from the 2nd queue
929     assert(!db_impl_->immutable_db_options().two_write_queues ||
930            is_mem_disabled);
931     assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
932     // Data batch is what accompanied with the commit marker and affects the
933     // last seq in the commit batch.
934     const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
935                                          ? commit_seq
936                                          : commit_seq + data_batch_cnt_ - 1;
937     if (prep_seq_ != kMaxSequenceNumber) {
938       for (size_t i = 0; i < prep_batch_cnt_; i++) {
939         db_->AddCommitted(prep_seq_ + i, last_commit_seq);
940       }
941     }  // else there was no prepare phase
942     if (includes_aux_batch_) {
943       for (size_t i = 0; i < aux_batch_cnt_; i++) {
944         db_->AddCommitted(aux_seq_ + i, last_commit_seq);
945       }
946     }
947     if (includes_data_) {
948       assert(data_batch_cnt_);
949       // Commit the data that is accompanied with the commit request
950       for (size_t i = 0; i < data_batch_cnt_; i++) {
951         // For commit seq of each batch use the commit seq of the last batch.
952         // This would make debugging easier by having all the batches having
953         // the same sequence number.
954         db_->AddCommitted(commit_seq + i, last_commit_seq);
955       }
956     }
957     if (db_impl_->immutable_db_options().two_write_queues) {
958       assert(is_mem_disabled);  // implies the 2nd queue
959       // Publish the sequence number. We can do that here assuming the callback
960       // is invoked only from one write queue, which would guarantee that the
961       // publish sequence numbers will be in order, i.e., once a seq is
962       // published all the seq prior to that are also publishable.
963       db_impl_->SetLastPublishedSequence(last_commit_seq);
964       // Note RemovePrepared should be called after publishing the seq.
965       // Otherwise SmallestUnCommittedSeq optimization breaks.
966       if (prep_seq_ != kMaxSequenceNumber) {
967         db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
968       }  // else there was no prepare phase
969       if (includes_aux_batch_) {
970         db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
971       }
972     }
973     // else SequenceNumber that is updated as part of the write already does the
974     // publishing
975     return Status::OK();
976   }
977 
978  private:
979   WritePreparedTxnDB* db_;
980   DBImpl* db_impl_;
981   // kMaxSequenceNumber if there was no prepare phase
982   SequenceNumber prep_seq_;
983   size_t prep_batch_cnt_;
984   size_t data_batch_cnt_;
985   // Data here is the batch that is written with the commit marker, either
986   // because it is commit without prepare or commit has a CommitTimeWriteBatch.
987   bool includes_data_;
988   // Auxiliary batch (if there is any) is a batch that is written before, but
989   // gets the same commit seq as prepare batch or data batch. This is used in
990   // two write queues where the CommitTimeWriteBatch becomes the aux batch and
991   // we do a separate write to actually commit everything.
992   SequenceNumber aux_seq_;
993   size_t aux_batch_cnt_;
994   bool includes_aux_batch_;
995 };
996 
997 // For two_write_queues commit both the aborted batch and the cleanup batch and
998 // then published the seq
999 class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
1000  public:
WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB * db,DBImpl * db_impl,SequenceNumber prep_seq,SequenceNumber rollback_seq,size_t prep_batch_cnt)1001   WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
1002                                           DBImpl* db_impl,
1003                                           SequenceNumber prep_seq,
1004                                           SequenceNumber rollback_seq,
1005                                           size_t prep_batch_cnt)
1006       : db_(db),
1007         db_impl_(db_impl),
1008         prep_seq_(prep_seq),
1009         rollback_seq_(rollback_seq),
1010         prep_batch_cnt_(prep_batch_cnt) {
1011     assert(prep_seq != kMaxSequenceNumber);
1012     assert(rollback_seq != kMaxSequenceNumber);
1013     assert(prep_batch_cnt_ > 0);
1014   }
1015 
Callback(SequenceNumber commit_seq,bool is_mem_disabled,uint64_t,size_t,size_t)1016   Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
1017                   size_t /*index*/, size_t /*total*/) override {
1018     // Always commit from the 2nd queue
1019     assert(is_mem_disabled);  // implies the 2nd queue
1020     assert(db_impl_->immutable_db_options().two_write_queues);
1021 #ifdef NDEBUG
1022     (void)is_mem_disabled;
1023 #endif
1024     const uint64_t last_commit_seq = commit_seq;
1025     db_->AddCommitted(rollback_seq_, last_commit_seq);
1026     for (size_t i = 0; i < prep_batch_cnt_; i++) {
1027       db_->AddCommitted(prep_seq_ + i, last_commit_seq);
1028     }
1029     db_impl_->SetLastPublishedSequence(last_commit_seq);
1030     return Status::OK();
1031   }
1032 
1033  private:
1034   WritePreparedTxnDB* db_;
1035   DBImpl* db_impl_;
1036   SequenceNumber prep_seq_;
1037   SequenceNumber rollback_seq_;
1038   size_t prep_batch_cnt_;
1039 };
1040 
1041 // Count the number of sub-batches inside a batch. A sub-batch does not have
1042 // duplicate keys.
1043 struct SubBatchCounter : public WriteBatch::Handler {
SubBatchCounterSubBatchCounter1044   explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
1045       : comparators_(comparators), batches_(1) {}
1046   std::map<uint32_t, const Comparator*>& comparators_;
1047   using CFKeys = std::set<Slice, SetComparator>;
1048   std::map<uint32_t, CFKeys> keys_;
1049   size_t batches_;
BatchCountSubBatchCounter1050   size_t BatchCount() { return batches_; }
1051   void AddKey(const uint32_t cf, const Slice& key);
1052   void InitWithComp(const uint32_t cf);
MarkNoopSubBatchCounter1053   Status MarkNoop(bool) override { return Status::OK(); }
MarkEndPrepareSubBatchCounter1054   Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
MarkCommitSubBatchCounter1055   Status MarkCommit(const Slice&) override { return Status::OK(); }
PutCFSubBatchCounter1056   Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
1057     AddKey(cf, key);
1058     return Status::OK();
1059   }
DeleteCFSubBatchCounter1060   Status DeleteCF(uint32_t cf, const Slice& key) override {
1061     AddKey(cf, key);
1062     return Status::OK();
1063   }
SingleDeleteCFSubBatchCounter1064   Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
1065     AddKey(cf, key);
1066     return Status::OK();
1067   }
MergeCFSubBatchCounter1068   Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
1069     AddKey(cf, key);
1070     return Status::OK();
1071   }
MarkBeginPrepareSubBatchCounter1072   Status MarkBeginPrepare(bool) override { return Status::OK(); }
MarkRollbackSubBatchCounter1073   Status MarkRollback(const Slice&) override { return Status::OK(); }
WriteAfterCommitSubBatchCounter1074   bool WriteAfterCommit() const override { return false; }
1075 };
1076 
AssignMinMaxSeqs(const Snapshot * snapshot,SequenceNumber * min,SequenceNumber * max)1077 SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
1078                                                     SequenceNumber* min,
1079                                                     SequenceNumber* max) {
1080   if (snapshot != nullptr) {
1081     *min = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
1082                ->min_uncommitted_;
1083     *max = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
1084                ->number_;
1085     return kBackedByDBSnapshot;
1086   } else {
1087     *min = SmallestUnCommittedSeq();
1088     *max = 0;  // to be assigned later after sv is referenced.
1089     return kUnbackedByDBSnapshot;
1090   }
1091 }
1092 
ValidateSnapshot(const SequenceNumber snap_seq,const SnapshotBackup backed_by_snapshot,std::memory_order order)1093 bool WritePreparedTxnDB::ValidateSnapshot(
1094     const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
1095     std::memory_order order) {
1096   if (backed_by_snapshot == kBackedByDBSnapshot) {
1097     return true;
1098   } else {
1099     SequenceNumber max = max_evicted_seq_.load(order);
1100     // Validate that max has not advanced the snapshot seq that is not backed
1101     // by a real snapshot. This is a very rare case that should not happen in
1102     // real workloads.
1103     if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
1104       return false;
1105     }
1106   }
1107   return true;
1108 }
1109 
1110 }  // namespace ROCKSDB_NAMESPACE
1111 #endif  // ROCKSDB_LITE
1112