1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/write_prepared_txn_db.h"
9 
10 #include <algorithm>
11 #include <cinttypes>
12 #include <string>
13 #include <unordered_set>
14 #include <vector>
15 
16 #include "db/arena_wrapped_db_iter.h"
17 #include "db/db_impl/db_impl.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/utilities/transaction_db.h"
21 #include "test_util/sync_point.h"
22 #include "util/cast_util.h"
23 #include "util/mutexlock.h"
24 #include "util/string_util.h"
25 #include "utilities/transactions/pessimistic_transaction.h"
26 #include "utilities/transactions/transaction_db_mutex_impl.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 
Initialize(const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles)30 Status WritePreparedTxnDB::Initialize(
31     const std::vector<size_t>& compaction_enabled_cf_indices,
32     const std::vector<ColumnFamilyHandle*>& handles) {
33   auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
34   assert(dbimpl != nullptr);
35   auto rtxns = dbimpl->recovered_transactions();
36   std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
37   for (auto rtxn : rtxns) {
38     // There should only one batch for WritePrepared policy.
39     assert(rtxn.second->batches_.size() == 1);
40     const auto& seq = rtxn.second->batches_.begin()->first;
41     const auto& batch_info = rtxn.second->batches_.begin()->second;
42     auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
43     ordered_seq_cnt[seq] = cnt;
44   }
45   // AddPrepared must be called in order
46   for (auto seq_cnt : ordered_seq_cnt) {
47     auto seq = seq_cnt.first;
48     auto cnt = seq_cnt.second;
49     for (size_t i = 0; i < cnt; i++) {
50       AddPrepared(seq + i);
51     }
52   }
53   SequenceNumber prev_max = max_evicted_seq_;
54   SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
55   AdvanceMaxEvictedSeq(prev_max, last_seq);
56   // Create a gap between max and the next snapshot. This simplifies the logic
57   // in IsInSnapshot by not having to consider the special case of max ==
58   // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
59   if (last_seq) {
60     db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
61     db_impl_->versions_->SetLastSequence(last_seq + 1);
62     db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
63   }
64 
65   db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
66   // A callback to commit a single sub-batch
67   class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
68    public:
69     explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
70         : db_(db) {}
71     Status Callback(SequenceNumber commit_seq,
72                     bool is_mem_disabled __attribute__((__unused__)), uint64_t,
73                     size_t /*index*/, size_t /*total*/) override {
74       assert(!is_mem_disabled);
75       db_->AddCommitted(commit_seq, commit_seq);
76       return Status::OK();
77     }
78 
79    private:
80     WritePreparedTxnDB* db_;
81   };
82   db_impl_->SetRecoverableStatePreReleaseCallback(
83       new CommitSubBatchPreReleaseCallback(this));
84 
85   auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
86                                                 handles);
87   return s;
88 }
89 
VerifyCFOptions(const ColumnFamilyOptions & cf_options)90 Status WritePreparedTxnDB::VerifyCFOptions(
91     const ColumnFamilyOptions& cf_options) {
92   Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
93   if (!s.ok()) {
94     return s;
95   }
96   if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
97     return Status::InvalidArgument(
98         "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
99         "WritePrpeared transactions");
100   }
101   return Status::OK();
102 }
103 
BeginTransaction(const WriteOptions & write_options,const TransactionOptions & txn_options,Transaction * old_txn)104 Transaction* WritePreparedTxnDB::BeginTransaction(
105     const WriteOptions& write_options, const TransactionOptions& txn_options,
106     Transaction* old_txn) {
107   if (old_txn != nullptr) {
108     ReinitializeTransaction(old_txn, write_options, txn_options);
109     return old_txn;
110   } else {
111     return new WritePreparedTxn(this, write_options, txn_options);
112   }
113 }
114 
Write(const WriteOptions & opts,WriteBatch * updates)115 Status WritePreparedTxnDB::Write(const WriteOptions& opts,
116                                  WriteBatch* updates) {
117   if (txn_db_options_.skip_concurrency_control) {
118     // Skip locking the rows
119     const size_t UNKNOWN_BATCH_CNT = 0;
120     WritePreparedTxn* NO_TXN = nullptr;
121     return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
122   } else {
123     return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
124   }
125 }
126 
Write(const WriteOptions & opts,const TransactionDBWriteOptimizations & optimizations,WriteBatch * updates)127 Status WritePreparedTxnDB::Write(
128     const WriteOptions& opts,
129     const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
130   if (optimizations.skip_concurrency_control) {
131     // Skip locking the rows
132     const size_t UNKNOWN_BATCH_CNT = 0;
133     const size_t ONE_BATCH_CNT = 1;
134     const size_t batch_cnt = optimizations.skip_duplicate_key_check
135                                  ? ONE_BATCH_CNT
136                                  : UNKNOWN_BATCH_CNT;
137     WritePreparedTxn* NO_TXN = nullptr;
138     return WriteInternal(opts, updates, batch_cnt, NO_TXN);
139   } else {
140     // TODO(myabandeh): Make use of skip_duplicate_key_check hint
141     // Fall back to unoptimized version
142     return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
143   }
144 }
145 
WriteInternal(const WriteOptions & write_options_orig,WriteBatch * batch,size_t batch_cnt,WritePreparedTxn * txn)146 Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
147                                          WriteBatch* batch, size_t batch_cnt,
148                                          WritePreparedTxn* txn) {
149   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
150                     "CommitBatchInternal");
151   if (batch->Count() == 0) {
152     // Otherwise our 1 seq per batch logic will break since there is no seq
153     // increased for this batch.
154     return Status::OK();
155   }
156   if (batch_cnt == 0) {  // not provided, then compute it
157     // TODO(myabandeh): add an option to allow user skipping this cost
158     SubBatchCounter counter(*GetCFComparatorMap());
159     auto s = batch->Iterate(&counter);
160     if (!s.ok()) {
161       return s;
162     }
163     batch_cnt = counter.BatchCount();
164     WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
165     ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
166                       static_cast<uint64_t>(batch_cnt));
167   }
168   assert(batch_cnt);
169 
170   bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
171   WriteOptions write_options(write_options_orig);
172   // In the absence of Prepare markers, use Noop as a batch separator
173   auto s = WriteBatchInternal::InsertNoop(batch);
174   assert(s.ok());
175   const bool DISABLE_MEMTABLE = true;
176   const uint64_t no_log_ref = 0;
177   uint64_t seq_used = kMaxSequenceNumber;
178   const size_t ZERO_PREPARES = 0;
179   const bool kSeperatePrepareCommitBatches = true;
180   // Since this is not 2pc, there is no need for AddPrepared but having it in
181   // the PreReleaseCallback enables an optimization. Refer to
182   // SmallestUnCommittedSeq for more details.
183   AddPreparedCallback add_prepared_callback(
184       this, db_impl_, batch_cnt,
185       db_impl_->immutable_db_options().two_write_queues,
186       !kSeperatePrepareCommitBatches);
187   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
188       this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
189   PreReleaseCallback* pre_release_callback;
190   if (do_one_write) {
191     pre_release_callback = &update_commit_map;
192   } else {
193     pre_release_callback = &add_prepared_callback;
194   }
195   s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
196                           !DISABLE_MEMTABLE, &seq_used, batch_cnt,
197                           pre_release_callback);
198   assert(!s.ok() || seq_used != kMaxSequenceNumber);
199   uint64_t prepare_seq = seq_used;
200   if (txn != nullptr) {
201     txn->SetId(prepare_seq);
202   }
203   if (!s.ok()) {
204     return s;
205   }
206   if (do_one_write) {
207     return s;
208   }  // else do the 2nd write for commit
209   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
210                     "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
211                     prepare_seq);
212   // Commit the batch by writing an empty batch to the 2nd queue that will
213   // release the commit sequence number to readers.
214   const size_t ZERO_COMMITS = 0;
215   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
216       this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
217   WriteBatch empty_batch;
218   write_options.disableWAL = true;
219   write_options.sync = false;
220   const size_t ONE_BATCH = 1;  // Just to inc the seq
221   s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
222                           no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
223                           &update_commit_map_with_prepare);
224   assert(!s.ok() || seq_used != kMaxSequenceNumber);
225   // Note: RemovePrepared is called from within PreReleaseCallback
226   return s;
227 }
228 
Get(const ReadOptions & options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)229 Status WritePreparedTxnDB::Get(const ReadOptions& options,
230                                ColumnFamilyHandle* column_family,
231                                const Slice& key, PinnableSlice* value) {
232   SequenceNumber min_uncommitted, snap_seq;
233   const SnapshotBackup backed_by_snapshot =
234       AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
235   WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
236                                         backed_by_snapshot);
237   bool* dont_care = nullptr;
238   DBImpl::GetImplOptions get_impl_options;
239   get_impl_options.column_family = column_family;
240   get_impl_options.value = value;
241   get_impl_options.value_found = dont_care;
242   get_impl_options.callback = &callback;
243   auto res = db_impl_->GetImpl(options, key, get_impl_options);
244   if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
245                                                   backed_by_snapshot))) {
246     return res;
247   } else {
248     WPRecordTick(TXN_GET_TRY_AGAIN);
249     return Status::TryAgain();
250   }
251 }
252 
UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle * > & handles)253 void WritePreparedTxnDB::UpdateCFComparatorMap(
254     const std::vector<ColumnFamilyHandle*>& handles) {
255   auto cf_map = new std::map<uint32_t, const Comparator*>();
256   auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
257   for (auto h : handles) {
258     auto id = h->GetID();
259     const Comparator* comparator = h->GetComparator();
260     (*cf_map)[id] = comparator;
261     if (id != 0) {
262       (*handle_map)[id] = h;
263     } else {
264       // The pointer to the default cf handle in the handles will be deleted.
265       // Use the pointer maintained by the db instead.
266       (*handle_map)[id] = DefaultColumnFamily();
267     }
268   }
269   cf_map_.reset(cf_map);
270   handle_map_.reset(handle_map);
271 }
272 
UpdateCFComparatorMap(ColumnFamilyHandle * h)273 void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
274   auto old_cf_map_ptr = cf_map_.get();
275   assert(old_cf_map_ptr);
276   auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
277   auto old_handle_map_ptr = handle_map_.get();
278   assert(old_handle_map_ptr);
279   auto handle_map =
280       new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
281   auto id = h->GetID();
282   const Comparator* comparator = h->GetComparator();
283   (*cf_map)[id] = comparator;
284   (*handle_map)[id] = h;
285   cf_map_.reset(cf_map);
286   handle_map_.reset(handle_map);
287 }
288 
289 
MultiGet(const ReadOptions & options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)290 std::vector<Status> WritePreparedTxnDB::MultiGet(
291     const ReadOptions& options,
292     const std::vector<ColumnFamilyHandle*>& column_family,
293     const std::vector<Slice>& keys, std::vector<std::string>* values) {
294   assert(values);
295   size_t num_keys = keys.size();
296   values->resize(num_keys);
297 
298   std::vector<Status> stat_list(num_keys);
299   for (size_t i = 0; i < num_keys; ++i) {
300     stat_list[i] = this->Get(options, column_family[i], keys[i], &(*values)[i]);
301   }
302   return stat_list;
303 }
304 
305 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
306 struct WritePreparedTxnDB::IteratorState {
IteratorStateROCKSDB_NAMESPACE::WritePreparedTxnDB::IteratorState307   IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
308                 std::shared_ptr<ManagedSnapshot> s,
309                 SequenceNumber min_uncommitted)
310       : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
311         snapshot(s) {}
312 
313   WritePreparedTxnReadCallback callback;
314   std::shared_ptr<ManagedSnapshot> snapshot;
315 };
316 
317 namespace {
CleanupWritePreparedTxnDBIterator(void * arg1,void *)318 static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
319   delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
320 }
321 }  // anonymous namespace
322 
NewIterator(const ReadOptions & options,ColumnFamilyHandle * column_family)323 Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
324                                           ColumnFamilyHandle* column_family) {
325   constexpr bool expose_blob_index = false;
326   constexpr bool allow_refresh = false;
327   std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
328   SequenceNumber snapshot_seq = kMaxSequenceNumber;
329   SequenceNumber min_uncommitted = 0;
330   if (options.snapshot != nullptr) {
331     snapshot_seq = options.snapshot->GetSequenceNumber();
332     min_uncommitted =
333         static_cast_with_check<const SnapshotImpl>(options.snapshot)
334             ->min_uncommitted_;
335   } else {
336     auto* snapshot = GetSnapshot();
337     // We take a snapshot to make sure that the related data in the commit map
338     // are not deleted.
339     snapshot_seq = snapshot->GetSequenceNumber();
340     min_uncommitted =
341         static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
342     own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
343   }
344   assert(snapshot_seq != kMaxSequenceNumber);
345   auto* cfd =
346       static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
347   auto* state =
348       new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
349   auto* db_iter =
350       db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
351                                 expose_blob_index, allow_refresh);
352   db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
353   return db_iter;
354 }
355 
NewIterators(const ReadOptions & options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)356 Status WritePreparedTxnDB::NewIterators(
357     const ReadOptions& options,
358     const std::vector<ColumnFamilyHandle*>& column_families,
359     std::vector<Iterator*>* iterators) {
360   constexpr bool expose_blob_index = false;
361   constexpr bool allow_refresh = false;
362   std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
363   SequenceNumber snapshot_seq = kMaxSequenceNumber;
364   SequenceNumber min_uncommitted = 0;
365   if (options.snapshot != nullptr) {
366     snapshot_seq = options.snapshot->GetSequenceNumber();
367     min_uncommitted =
368         static_cast_with_check<const SnapshotImpl>(options.snapshot)
369             ->min_uncommitted_;
370   } else {
371     auto* snapshot = GetSnapshot();
372     // We take a snapshot to make sure that the related data in the commit map
373     // are not deleted.
374     snapshot_seq = snapshot->GetSequenceNumber();
375     own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
376     min_uncommitted =
377         static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
378   }
379   iterators->clear();
380   iterators->reserve(column_families.size());
381   for (auto* column_family : column_families) {
382     auto* cfd =
383         static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
384     auto* state =
385         new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
386     auto* db_iter =
387         db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
388                                   expose_blob_index, allow_refresh);
389     db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
390     iterators->push_back(db_iter);
391   }
392   return Status::OK();
393 }
394 
Init(const TransactionDBOptions &)395 void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
396   // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
397   // around.
398   INC_STEP_FOR_MAX_EVICTED =
399       std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
400   snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
401       new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
402   commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
403       new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
404   dummy_max_snapshot_.number_ = kMaxSequenceNumber;
405 }
406 
CheckPreparedAgainstMax(SequenceNumber new_max,bool locked)407 void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
408                                                  bool locked) {
409   // When max_evicted_seq_ advances, move older entries from prepared_txns_
410   // to delayed_prepared_. This guarantees that if a seq is lower than max,
411   // then it is not in prepared_txns_ and save an expensive, synchronized
412   // lookup from a shared set. delayed_prepared_ is expected to be empty in
413   // normal cases.
414   ROCKS_LOG_DETAILS(
415       info_log_,
416       "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
417       prepared_txns_.empty(),
418       prepared_txns_.empty() ? 0 : prepared_txns_.top());
419   const SequenceNumber prepared_top = prepared_txns_.top();
420   const bool empty = prepared_top == kMaxSequenceNumber;
421   // Preliminary check to avoid the synchronization cost
422   if (!empty && prepared_top <= new_max) {
423     if (locked) {
424       // Needed to avoid double locking in pop().
425       prepared_txns_.push_pop_mutex()->Unlock();
426     }
427     WriteLock wl(&prepared_mutex_);
428     // Need to fetch fresh values of ::top after mutex is acquired
429     while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
430       auto to_be_popped = prepared_txns_.top();
431       delayed_prepared_.insert(to_be_popped);
432       ROCKS_LOG_WARN(info_log_,
433                      "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
434                      " new_max=%" PRIu64,
435                      static_cast<uint64_t>(delayed_prepared_.size()),
436                      to_be_popped, new_max);
437       delayed_prepared_empty_.store(false, std::memory_order_release);
438       // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
439       // there will be a point in time that the entry is neither in
440       // prepared_txns_ nor in delayed_prepared_, which will not be checked if
441       // delayed_prepared_empty_ is false.
442       prepared_txns_.pop();
443     }
444     if (locked) {
445       prepared_txns_.push_pop_mutex()->Lock();
446     }
447   }
448 }
449 
AddPrepared(uint64_t seq,bool locked)450 void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
451   ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
452                     seq, max_evicted_seq_.load());
453   TEST_SYNC_POINT("AddPrepared::begin:pause");
454   TEST_SYNC_POINT("AddPrepared::begin:resume");
455   if (!locked) {
456     prepared_txns_.push_pop_mutex()->Lock();
457   }
458   prepared_txns_.push_pop_mutex()->AssertHeld();
459   prepared_txns_.push(seq);
460   auto new_max = future_max_evicted_seq_.load();
461   if (UNLIKELY(seq <= new_max)) {
462     // This should not happen in normal case
463     ROCKS_LOG_ERROR(
464         info_log_,
465         "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
466         " <= %" PRIu64,
467         seq, new_max);
468     CheckPreparedAgainstMax(new_max, true /*locked*/);
469   }
470   if (!locked) {
471     prepared_txns_.push_pop_mutex()->Unlock();
472   }
473   TEST_SYNC_POINT("AddPrepared::end");
474 }
475 
AddCommitted(uint64_t prepare_seq,uint64_t commit_seq,uint8_t loop_cnt)476 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
477                                       uint8_t loop_cnt) {
478   ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
479                     prepare_seq, commit_seq);
480   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
481   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
482   auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
483   CommitEntry64b evicted_64b;
484   CommitEntry evicted;
485   bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
486   if (LIKELY(to_be_evicted)) {
487     assert(evicted.prep_seq != prepare_seq);
488     auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
489     ROCKS_LOG_DETAILS(info_log_,
490                       "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
491                       evicted.prep_seq, evicted.commit_seq, prev_max);
492     if (prev_max < evicted.commit_seq) {
493       auto last = db_impl_->GetLastPublishedSequence();  // could be 0
494       SequenceNumber max_evicted_seq;
495       if (LIKELY(evicted.commit_seq < last)) {
496         assert(last > 0);
497         // Inc max in larger steps to avoid frequent updates
498         max_evicted_seq =
499             std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
500       } else {
501         // legit when a commit entry in a write batch overwrite the previous one
502         max_evicted_seq = evicted.commit_seq;
503       }
504       ROCKS_LOG_DETAILS(info_log_,
505                         "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
506                         " => %lu",
507                         prepare_seq, evicted.prep_seq, evicted.commit_seq,
508                         prev_max, max_evicted_seq);
509       AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
510     }
511     // After each eviction from commit cache, check if the commit entry should
512     // be kept around because it overlaps with a live snapshot.
513     CheckAgainstSnapshots(evicted);
514     if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
515       WriteLock wl(&prepared_mutex_);
516       for (auto dp : delayed_prepared_) {
517         if (dp == evicted.prep_seq) {
518           // This is a rare case that txn is committed but prepared_txns_ is not
519           // cleaned up yet. Refer to delayed_prepared_commits_ definition for
520           // why it should be kept updated.
521           delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
522           ROCKS_LOG_DEBUG(info_log_,
523                           "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
524                           evicted.prep_seq, evicted.commit_seq);
525           break;
526         }
527       }
528     }
529   }
530   bool succ =
531       ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
532   if (UNLIKELY(!succ)) {
533     ROCKS_LOG_ERROR(info_log_,
534                     "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
535                     ",%" PRIu64 " retrying...",
536                     indexed_seq, prepare_seq, commit_seq);
537     // A very rare event, in which the commit entry is updated before we do.
538     // Here we apply a very simple solution of retrying.
539     if (loop_cnt > 100) {
540       throw std::runtime_error("Infinite loop in AddCommitted!");
541     }
542     AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
543     return;
544   }
545   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
546   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
547 }
548 
RemovePrepared(const uint64_t prepare_seq,const size_t batch_cnt)549 void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
550                                         const size_t batch_cnt) {
551   TEST_SYNC_POINT_CALLBACK(
552       "RemovePrepared:Start",
553       const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
554   TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
555   TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
556   ROCKS_LOG_DETAILS(info_log_,
557                     "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
558                     prepare_seq, batch_cnt);
559   WriteLock wl(&prepared_mutex_);
560   for (size_t i = 0; i < batch_cnt; i++) {
561     prepared_txns_.erase(prepare_seq + i);
562     bool was_empty = delayed_prepared_.empty();
563     if (!was_empty) {
564       delayed_prepared_.erase(prepare_seq + i);
565       auto it = delayed_prepared_commits_.find(prepare_seq + i);
566       if (it != delayed_prepared_commits_.end()) {
567         ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
568                           prepare_seq + i);
569         delayed_prepared_commits_.erase(it);
570       }
571       bool is_empty = delayed_prepared_.empty();
572       if (was_empty != is_empty) {
573         delayed_prepared_empty_.store(is_empty, std::memory_order_release);
574       }
575     }
576   }
577 }
578 
GetCommitEntry(const uint64_t indexed_seq,CommitEntry64b * entry_64b,CommitEntry * entry) const579 bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
580                                         CommitEntry64b* entry_64b,
581                                         CommitEntry* entry) const {
582   *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(std::memory_order_acquire);
583   bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
584   return valid;
585 }
586 
AddCommitEntry(const uint64_t indexed_seq,const CommitEntry & new_entry,CommitEntry * evicted_entry)587 bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
588                                         const CommitEntry& new_entry,
589                                         CommitEntry* evicted_entry) {
590   CommitEntry64b new_entry_64b(new_entry, FORMAT);
591   CommitEntry64b evicted_entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
592       new_entry_64b, std::memory_order_acq_rel);
593   bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
594   return valid;
595 }
596 
ExchangeCommitEntry(const uint64_t indexed_seq,CommitEntry64b & expected_entry_64b,const CommitEntry & new_entry)597 bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
598                                              CommitEntry64b& expected_entry_64b,
599                                              const CommitEntry& new_entry) {
600   auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
601   CommitEntry64b new_entry_64b(new_entry, FORMAT);
602   bool succ = atomic_entry.compare_exchange_strong(
603       expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
604       std::memory_order_acquire);
605   return succ;
606 }
607 
AdvanceMaxEvictedSeq(const SequenceNumber & prev_max,const SequenceNumber & new_max)608 void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
609                                               const SequenceNumber& new_max) {
610   ROCKS_LOG_DETAILS(info_log_,
611                     "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
612                     prev_max, new_max);
613   // Declare the intention before getting snapshot from the DB. This helps a
614   // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
615   // it has not already. Otherwise the new snapshot is when we ask DB for
616   // snapshots smaller than future max.
617   auto updated_future_max = prev_max;
618   while (updated_future_max < new_max &&
619          !future_max_evicted_seq_.compare_exchange_weak(
620              updated_future_max, new_max, std::memory_order_acq_rel,
621              std::memory_order_relaxed)) {
622   };
623 
624   CheckPreparedAgainstMax(new_max, false /*locked*/);
625 
626   // With each change to max_evicted_seq_ fetch the live snapshots behind it.
627   // We use max as the version of snapshots to identify how fresh are the
628   // snapshot list. This works because the snapshots are between 0 and
629   // max, so the larger the max, the more complete they are.
630   SequenceNumber new_snapshots_version = new_max;
631   std::vector<SequenceNumber> snapshots;
632   bool update_snapshots = false;
633   if (new_snapshots_version > snapshots_version_) {
634     // This is to avoid updating the snapshots_ if it already updated
635     // with a more recent vesion by a concrrent thread
636     update_snapshots = true;
637     // We only care about snapshots lower then max
638     snapshots = GetSnapshotListFromDB(new_max);
639   }
640   if (update_snapshots) {
641     UpdateSnapshots(snapshots, new_snapshots_version);
642     if (!snapshots.empty()) {
643       WriteLock wl(&old_commit_map_mutex_);
644       for (auto snap : snapshots) {
645         // This allows IsInSnapshot to tell apart the reads from in valid
646         // snapshots from the reads from committed values in valid snapshots.
647         old_commit_map_[snap];
648       }
649       old_commit_map_empty_.store(false, std::memory_order_release);
650     }
651   }
652   auto updated_prev_max = prev_max;
653   TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
654   TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
655   while (updated_prev_max < new_max &&
656          !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
657                                                  std::memory_order_acq_rel,
658                                                  std::memory_order_relaxed)) {
659   };
660 }
661 
GetSnapshot()662 const Snapshot* WritePreparedTxnDB::GetSnapshot() {
663   const bool kForWWConflictCheck = true;
664   return GetSnapshotInternal(!kForWWConflictCheck);
665 }
666 
GetSnapshotInternal(bool for_ww_conflict_check)667 SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
668     bool for_ww_conflict_check) {
669   // Note: for this optimization setting the last sequence number and obtaining
670   // the smallest uncommitted seq should be done atomically. However to avoid
671   // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
672   // snapshot. Since we always updated the list of unprepared seq (via
673   // AddPrepared) AFTER the last sequence is updated, this guarantees that the
674   // smallest uncommitted seq that we pair with the snapshot is smaller or equal
675   // the value that would be obtained otherwise atomically. That is ok since
676   // this optimization works as long as min_uncommitted is less than or equal
677   // than the smallest uncommitted seq when the snapshot was taken.
678   auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
679   SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
680   TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
681   assert(snap_impl);
682   SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
683   // Note: Check against future_max_evicted_seq_ (in contrast with
684   // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
685   if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
686     // There is a very rare case in which the commit entry evicts another commit
687     // entry that is not published yet thus advancing max evicted seq beyond the
688     // last published seq. This case is not likely in real-world setup so we
689     // handle it with a few retries.
690     size_t retry = 0;
691     SequenceNumber max;
692     while ((max = future_max_evicted_seq_.load()) != 0 &&
693            snap_impl->GetSequenceNumber() <= max && retry < 100) {
694       ROCKS_LOG_WARN(info_log_,
695                      "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
696                      " retry %" ROCKSDB_PRIszt,
697                      snap_impl->GetSequenceNumber(), max, retry);
698       ReleaseSnapshot(snap_impl);
699       // Wait for last visible seq to catch up with max, and also go beyond it
700       // by one.
701       AdvanceSeqByOne();
702       snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
703       assert(snap_impl);
704       retry++;
705     }
706     assert(snap_impl->GetSequenceNumber() > max);
707     if (snap_impl->GetSequenceNumber() <= max) {
708       throw std::runtime_error(
709           "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
710           " after " + ToString(retry) +
711           " retries is still less than futre_max_evicted_seq_" + ToString(max));
712     }
713   }
714   EnhanceSnapshot(snap_impl, min_uncommitted);
715   ROCKS_LOG_DETAILS(
716       db_impl_->immutable_db_options().info_log,
717       "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
718       snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
719   TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
720   return snap_impl;
721 }
722 
AdvanceSeqByOne()723 void WritePreparedTxnDB::AdvanceSeqByOne() {
724   // Inserting an empty value will i) let the max evicted entry to be
725   // published, i.e., max == last_published, increase the last published to
726   // be one beyond max, i.e., max < last_published.
727   WriteOptions woptions;
728   TransactionOptions txn_options;
729   Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
730   std::hash<std::thread::id> hasher;
731   char name[64];
732   snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
733   assert(strlen(name) < 64 - 1);
734   Status s = txn0->SetName(name);
735   assert(s.ok());
736   if (s.ok()) {
737     // Without prepare it would simply skip the commit
738     s = txn0->Prepare();
739   }
740   assert(s.ok());
741   if (s.ok()) {
742     s = txn0->Commit();
743   }
744   assert(s.ok());
745   delete txn0;
746 }
747 
GetSnapshotListFromDB(SequenceNumber max)748 const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
749     SequenceNumber max) {
750   ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
751   InstrumentedMutexLock dblock(db_impl_->mutex());
752   db_impl_->mutex()->AssertHeld();
753   return db_impl_->snapshots().GetAll(nullptr, max);
754 }
755 
ReleaseSnapshotInternal(const SequenceNumber snap_seq)756 void WritePreparedTxnDB::ReleaseSnapshotInternal(
757     const SequenceNumber snap_seq) {
758   // TODO(myabandeh): relax should enough since the synchronizatin is already
759   // done by snapshots_mutex_ under which this function is called.
760   if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
761     // Then this is a rare case that transaction did not finish before max
762     // advances. It is expected for a few read-only backup snapshots. For such
763     // snapshots we might have kept around a couple of entries in the
764     // old_commit_map_. Check and do garbage collection if that is the case.
765     bool need_gc = false;
766     {
767       WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
768       ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
769                      snap_seq);
770       ReadLock rl(&old_commit_map_mutex_);
771       auto prep_set_entry = old_commit_map_.find(snap_seq);
772       need_gc = prep_set_entry != old_commit_map_.end();
773     }
774     if (need_gc) {
775       WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
776       ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
777                      snap_seq);
778       WriteLock wl(&old_commit_map_mutex_);
779       old_commit_map_.erase(snap_seq);
780       old_commit_map_empty_.store(old_commit_map_.empty(),
781                                   std::memory_order_release);
782     }
783   }
784 }
785 
CleanupReleasedSnapshots(const std::vector<SequenceNumber> & new_snapshots,const std::vector<SequenceNumber> & old_snapshots)786 void WritePreparedTxnDB::CleanupReleasedSnapshots(
787     const std::vector<SequenceNumber>& new_snapshots,
788     const std::vector<SequenceNumber>& old_snapshots) {
789   auto newi = new_snapshots.begin();
790   auto oldi = old_snapshots.begin();
791   for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
792     assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
793     if (*newi == *oldi) {    // still not released
794       auto value = *newi;
795       while (newi != new_snapshots.end() && *newi == value) {
796         newi++;
797       }
798       while (oldi != old_snapshots.end() && *oldi == value) {
799         oldi++;
800       }
801     } else {
802       assert(*newi > *oldi);  // *oldi is released
803       ReleaseSnapshotInternal(*oldi);
804       oldi++;
805     }
806   }
807   // Everything remained in old_snapshots is released and must be cleaned up
808   for (; oldi != old_snapshots.end(); oldi++) {
809     ReleaseSnapshotInternal(*oldi);
810   }
811 }
812 
UpdateSnapshots(const std::vector<SequenceNumber> & snapshots,const SequenceNumber & version)813 void WritePreparedTxnDB::UpdateSnapshots(
814     const std::vector<SequenceNumber>& snapshots,
815     const SequenceNumber& version) {
816   ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
817                     version);
818   TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
819   TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
820 #ifndef NDEBUG
821   size_t sync_i = 0;
822 #endif
823   ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
824   WriteLock wl(&snapshots_mutex_);
825   snapshots_version_ = version;
826   // We update the list concurrently with the readers.
827   // Both new and old lists are sorted and the new list is subset of the
828   // previous list plus some new items. Thus if a snapshot repeats in
829   // both new and old lists, it will appear upper in the new list. So if
830   // we simply insert the new snapshots in order, if an overwritten item
831   // is still valid in the new list is either written to the same place in
832   // the array or it is written in a higher palce before it gets
833   // overwritten by another item. This guarantess a reader that reads the
834   // list bottom-up will eventaully see a snapshot that repeats in the
835   // update, either before it gets overwritten by the writer or
836   // afterwards.
837   size_t i = 0;
838   auto it = snapshots.begin();
839   for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
840     snapshot_cache_[i].store(*it, std::memory_order_release);
841     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
842     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
843   }
844 #ifndef NDEBUG
845   // Release the remaining sync points since they are useless given that the
846   // reader would also use lock to access snapshots
847   for (++sync_i; sync_i <= 10; ++sync_i) {
848     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
849     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
850   }
851 #endif
852   snapshots_.clear();
853   for (; it != snapshots.end(); ++it) {
854     // Insert them to a vector that is less efficient to access
855     // concurrently
856     snapshots_.push_back(*it);
857   }
858   // Update the size at the end. Otherwise a parallel reader might read
859   // items that are not set yet.
860   snapshots_total_.store(snapshots.size(), std::memory_order_release);
861 
862   // Note: this must be done after the snapshots data structures are updated
863   // with the new list of snapshots.
864   CleanupReleasedSnapshots(snapshots, snapshots_all_);
865   snapshots_all_ = snapshots;
866 
867   TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
868   TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
869 }
870 
CheckAgainstSnapshots(const CommitEntry & evicted)871 void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
872   TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
873   TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
874 #ifndef NDEBUG
875   size_t sync_i = 0;
876 #endif
877   // First check the snapshot cache that is efficient for concurrent access
878   auto cnt = snapshots_total_.load(std::memory_order_acquire);
879   // The list might get updated concurrently as we are reading from it. The
880   // reader should be able to read all the snapshots that are still valid
881   // after the update. Since the survived snapshots are written in a higher
882   // place before gets overwritten the reader that reads bottom-up will
883   // eventully see it.
884   const bool next_is_larger = true;
885   // We will set to true if the border line snapshot suggests that.
886   bool search_larger_list = false;
887   size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
888   for (; 0 < ip1; ip1--) {
889     SequenceNumber snapshot_seq =
890         snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
891     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
892                         ++sync_i);
893     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
894     if (ip1 == SNAPSHOT_CACHE_SIZE) {  // border line snapshot
895       // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
896       // then later also continue the search to larger snapshots
897       search_larger_list = snapshot_seq < evicted.commit_seq;
898     }
899     if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
900                                  snapshot_seq, !next_is_larger)) {
901       break;
902     }
903   }
904 #ifndef NDEBUG
905   // Release the remaining sync points before accquiring the lock
906   for (++sync_i; sync_i <= 10; ++sync_i) {
907     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
908     TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
909   }
910 #endif
911   TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
912   TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
913   if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
914     // Then access the less efficient list of snapshots_
915     WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
916     ROCKS_LOG_WARN(info_log_,
917                    "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
918                    "> with %" ROCKSDB_PRIszt " snapshots",
919                    evicted.prep_seq, evicted.commit_seq, cnt);
920     ReadLock rl(&snapshots_mutex_);
921     // Items could have moved from the snapshots_ to snapshot_cache_ before
922     // accquiring the lock. To make sure that we do not miss a valid snapshot,
923     // read snapshot_cache_ again while holding the lock.
924     for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
925       SequenceNumber snapshot_seq =
926           snapshot_cache_[i].load(std::memory_order_acquire);
927       if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
928                                    snapshot_seq, next_is_larger)) {
929         break;
930       }
931     }
932     for (auto snapshot_seq_2 : snapshots_) {
933       if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
934                                    snapshot_seq_2, next_is_larger)) {
935         break;
936       }
937     }
938   }
939 }
940 
MaybeUpdateOldCommitMap(const uint64_t & prep_seq,const uint64_t & commit_seq,const uint64_t & snapshot_seq,const bool next_is_larger=true)941 bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
942     const uint64_t& prep_seq, const uint64_t& commit_seq,
943     const uint64_t& snapshot_seq, const bool next_is_larger = true) {
944   // If we do not store an entry in old_commit_map_ we assume it is committed in
945   // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
946   // the snapshot so we need not to keep the entry around for this snapshot.
947   if (commit_seq <= snapshot_seq) {
948     // continue the search if the next snapshot could be smaller than commit_seq
949     return !next_is_larger;
950   }
951   // then snapshot_seq < commit_seq
952   if (prep_seq <= snapshot_seq) {  // overlapping range
953     WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
954     ROCKS_LOG_WARN(info_log_,
955                    "old_commit_map_mutex_ overhead for %" PRIu64
956                    " commit entry: <%" PRIu64 ",%" PRIu64 ">",
957                    snapshot_seq, prep_seq, commit_seq);
958     WriteLock wl(&old_commit_map_mutex_);
959     old_commit_map_empty_.store(false, std::memory_order_release);
960     auto& vec = old_commit_map_[snapshot_seq];
961     vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
962     // We need to store it once for each overlapping snapshot. Returning true to
963     // continue the search if there is more overlapping snapshot.
964     return true;
965   }
966   // continue the search if the next snapshot could be larger than prep_seq
967   return next_is_larger;
968 }
969 
~WritePreparedTxnDB()970 WritePreparedTxnDB::~WritePreparedTxnDB() {
971   // At this point there could be running compaction/flush holding a
972   // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
973   // Make sure those jobs finished before destructing WritePreparedTxnDB.
974   if (!db_impl_->shutting_down_) {
975     db_impl_->CancelAllBackgroundWork(true /*wait*/);
976   }
977 }
978 
InitWithComp(const uint32_t cf)979 void SubBatchCounter::InitWithComp(const uint32_t cf) {
980   auto cmp = comparators_[cf];
981   keys_[cf] = CFKeys(SetComparator(cmp));
982 }
983 
AddKey(const uint32_t cf,const Slice & key)984 void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
985   CFKeys& cf_keys = keys_[cf];
986   if (cf_keys.size() == 0) {  // just inserted
987     InitWithComp(cf);
988   }
989   auto it = cf_keys.insert(key);
990   if (it.second == false) {  // second is false if a element already existed.
991     batches_++;
992     keys_.clear();
993     InitWithComp(cf);
994     keys_[cf].insert(key);
995   }
996 }
997 
998 }  // namespace ROCKSDB_NAMESPACE
999 #endif  // ROCKSDB_LITE
1000