// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn_db.h"

#include <algorithm>
#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl.h"
#include "logging/logging.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

namespace ROCKSDB_NAMESPACE {

Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    ordered_seq_cnt[seq] = cnt;
  }
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}

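// Illustrative only (not part of this file's logic): Initialize() above runs
// while opening a write-prepared TransactionDB. A minimal opening sketch,
// assuming default options otherwise:
//
//   Options options;
//   options.create_if_missing = true;
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
//   TransactionDB* txn_db = nullptr;
//   Status s = TransactionDB::Open(options, txn_db_options, "/tmp/db", &txn_db);
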
Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}

Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}

Status WritePreparedTxnDB::Write(const WriteOptions& opts,
                                 WriteBatch* updates) {
  if (txn_db_options_.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
  } else {
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  }
}

Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  }
}

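// Illustrative only: a caller that has already ensured exclusive access and a
// duplicate-free batch can pass hints to the overload above. A hedged sketch
// (txn_db and batch are the caller's objects):
//
//   TransactionDBWriteOptimizations optimizations;
//   optimizations.skip_concurrency_control = true;  // caller did its own locking
//   optimizations.skip_duplicate_key_check = true;  // batch has no duplicate keys
//   Status s = txn_db->Write(WriteOptions(), optimizations, &batch);
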
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since no seq would be
    // allocated for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    if (!s.ok()) {
      return s;
    }
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  // In the absence of Prepare markers, use Noop as a batch separator
  auto s = WriteBatchInternal::InsertNoop(batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  const bool kSeperatePrepareCommitBatches = true;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, db_impl_, batch_cnt,
      db_impl_->immutable_db_options().two_write_queues,
      !kSeperatePrepareCommitBatches);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
                          !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  write_options.disableWAL = true;
  write_options.sync = false;
  const size_t ONE_BATCH = 1;  // Just to inc the seq
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared is called from within PreReleaseCallback
  return s;
}

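// Get() below reads at a snapshot sequence (either from options.snapshot or
// from the last published sequence). If the read callback is invalidated or
// the snapshot validation fails, e.g. because max_evicted_seq_ advanced past
// the read sequence of a read that was not backed by a DB snapshot, the result
// cannot be trusted and Status::TryAgain() is returned for the caller to retry.
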
Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  bool* dont_care = nullptr;
  DBImpl::GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.value_found = dont_care;
  get_impl_options.callback = &callback;
  auto res = db_impl_->GetImpl(options, key, get_impl_options);
  if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
                                                  backed_by_snapshot))) {
    return res;
  } else {
    WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    stat_list[i] = this->Get(options, column_family[i], keys[i], &(*values)[i]);
  }
  return stat_list;
}

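// MultiGet above simply forwards each key to Get(), so every key goes through
// the same snapshot and commit-map handling, and each returned status can
// independently be Status::TryAgain().
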
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
        snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

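// IteratorState above ties the lifetime of the read callback (and, when the
// caller did not supply a snapshot, the internally acquired ManagedSnapshot)
// to the iterator: NewIterator/NewIterators below register
// CleanupWritePreparedTxnDBIterator so the state is freed when the iterator
// is deleted.
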
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                expose_blob_index, allow_refresh);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  expose_blob_index, allow_refresh);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}

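// Init() below sizes INC_STEP_FOR_MAX_EVICTED and allocates the two fixed-size
// arrays of atomics: snapshot_cache_ caches the smallest live snapshots and
// commit_cache_ maps prepare sequences (mod COMMIT_CACHE_SIZE) to packed
// CommitEntry64b values, so the hot read and commit paths can avoid locks.
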
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
  dummy_max_snapshot_.number_ = kMaxSequenceNumber;
}

void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
                                                 bool locked) {
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, saving an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  ROCKS_LOG_DETAILS(
      info_log_,
      "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
      prepared_txns_.empty(),
      prepared_txns_.empty() ? 0 : prepared_txns_.top());
  const SequenceNumber prepared_top = prepared_txns_.top();
  const bool empty = prepared_top == kMaxSequenceNumber;
  // Preliminary check to avoid the synchronization cost
  if (!empty && prepared_top <= new_max) {
    if (locked) {
      // Needed to avoid double locking in pop().
      prepared_txns_.push_pop_mutex()->Unlock();
    }
    WriteLock wl(&prepared_mutex_);
    // Need to fetch fresh values of ::top after mutex is acquired
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64,
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max);
      delayed_prepared_empty_.store(false, std::memory_order_release);
      // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
      // there will be a point in time that the entry is neither in
      // prepared_txns_ nor in delayed_prepared_, which will not be checked if
      // delayed_prepared_empty_ is false.
      prepared_txns_.pop();
    }
    if (locked) {
      prepared_txns_.push_pop_mutex()->Lock();
    }
  }
}

void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
                    seq, max_evicted_seq_.load());
  TEST_SYNC_POINT("AddPrepared::begin:pause");
  TEST_SYNC_POINT("AddPrepared::begin:resume");
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Lock();
  }
  prepared_txns_.push_pop_mutex()->AssertHeld();
  prepared_txns_.push(seq);
  auto new_max = future_max_evicted_seq_.load();
  if (UNLIKELY(seq <= new_max)) {
    // This should not happen in the normal case
    ROCKS_LOG_ERROR(
        info_log_,
        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
        " <= %" PRIu64,
        seq, new_max);
    CheckPreparedAgainstMax(new_max, true /*locked*/);
  }
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Unlock();
  }
  TEST_SYNC_POINT("AddPrepared::end");
}

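// AddCommitted below records <prepare_seq, commit_seq> in the fixed-size
// commit cache. Because the slot is chosen as prepare_seq % COMMIT_CACHE_SIZE,
// the insertion may evict an older entry; the evicted entry is what drives
// max_evicted_seq_ forward (via AdvanceMaxEvictedSeq) and, if it overlaps a
// live snapshot, is preserved in old_commit_map_ (via CheckAgainstSnapshots).
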
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      auto last = db_impl_->GetLastPublishedSequence();  // could be 0
      SequenceNumber max_evicted_seq;
      if (LIKELY(evicted.commit_seq < last)) {
        assert(last > 0);
        // Inc max in larger steps to avoid frequent updates
        max_evicted_seq =
            std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
      } else {
        // legit when a commit entry in a write batch overwrites the previous one
        max_evicted_seq = evicted.commit_seq;
      }
      ROCKS_LOG_DETAILS(info_log_,
                        "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
                        " => %lu",
                        prepare_seq, evicted.prep_seq, evicted.commit_seq,
                        prev_max, max_evicted_seq);
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
      WriteLock wl(&prepared_mutex_);
      auto dp_iter = delayed_prepared_.find(evicted.prep_seq);
      if (dp_iter != delayed_prepared_.end()) {
        // This is a rare case that txn is committed but prepared_txns_ is not
        // cleaned up yet. Refer to delayed_prepared_commits_ definition for
        // why it should be kept updated.
        delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
        ROCKS_LOG_DEBUG(info_log_,
                        "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
                        evicted.prep_seq, evicted.commit_seq);
      }
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  TEST_SYNC_POINT_CALLBACK(
      "RemovePrepared:Start",
      const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  ROCKS_LOG_DETAILS(info_log_,
                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                    prepare_seq, batch_cnt);
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      auto it = delayed_prepared_commits_.find(prepare_seq + i);
      if (it != delayed_prepared_commits_.end()) {
        ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
                          prepare_seq + i);
        delayed_prepared_commits_.erase(it);
      }
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}

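// The helpers below wrap access to commit_cache_ slots: an atomic load
// (GetCommitEntry), an unconditional exchange (AddCommitEntry), and a
// compare-and-swap (ExchangeCommitEntry). Each packs and unpacks the 64-bit
// CommitEntry64b representation using FORMAT.
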
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}

void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // Declare the intention before getting snapshot from the DB. This helps a
  // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
  // it has not already. Otherwise a new snapshot below the future max could be
  // taken while we are asking the DB for snapshots smaller than the future
  // max, and it would be missed.
  auto updated_future_max = prev_max;
  while (updated_future_max < new_max &&
         !future_max_evicted_seq_.compare_exchange_weak(
             updated_future_max, new_max, std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
  };

  CheckPreparedAgainstMax(new_max, false /*locked*/);

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of snapshots to identify how fresh the snapshot
  // list is. This works because the snapshots are between 0 and max, so the
  // larger the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it was already updated with a
    // more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
    if (!snapshots.empty()) {
      WriteLock wl(&old_commit_map_mutex_);
      for (auto snap : snapshots) {
        // This allows IsInSnapshot to tell apart reads from invalid snapshots
        // from reads of committed values in valid snapshots.
        old_commit_map_[snap];
      }
      old_commit_map_empty_.store(false, std::memory_order_release);
    }
  }
  auto updated_prev_max = prev_max;
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  const bool kForWWConflictCheck = true;
  return GetSnapshotInternal(!kForWWConflictCheck);
}

SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
    bool for_ww_conflict_check) {
  // Note: for this optimization setting the last sequence number and obtaining
  // the smallest uncommitted seq should be done atomically. However to avoid
  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
  // snapshot. Since we always update the list of unprepared seq (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would be obtained otherwise atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
  assert(snap_impl);
  SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  // Note: Check against future_max_evicted_seq_ (in contrast with
  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
    // There is a very rare case in which a commit entry evicts another commit
    // entry that is not published yet, thus advancing max evicted seq beyond
    // the last published seq. This case is not likely in a real-world setup so
    // we handle it with a few retries.
    size_t retry = 0;
    SequenceNumber max;
    while ((max = future_max_evicted_seq_.load()) != 0 &&
           snap_impl->GetSequenceNumber() <= max && retry < 100) {
      ROCKS_LOG_WARN(info_log_,
                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
                     " retry %" ROCKSDB_PRIszt,
                     snap_impl->GetSequenceNumber(), max, retry);
      ReleaseSnapshot(snap_impl);
      // Wait for last visible seq to catch up with max, and also go beyond it
      // by one.
      AdvanceSeqByOne();
      snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
      assert(snap_impl);
      retry++;
    }
    assert(snap_impl->GetSequenceNumber() > max);
    if (snap_impl->GetSequenceNumber() <= max) {
      throw std::runtime_error(
          "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
          " after " + ToString(retry) +
          " retries is still less than future_max_evicted_seq_" +
          ToString(max));
    }
  }
  EnhanceSnapshot(snap_impl, min_uncommitted);
  ROCKS_LOG_DETAILS(
      db_impl_->immutable_db_options().info_log,
      "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
  return snap_impl;
}

void WritePreparedTxnDB::AdvanceSeqByOne() {
  // Inserting an empty value will i) let the max evicted entry be published,
  // i.e., max == last_published, and ii) increase the last published to be one
  // beyond max, i.e., max < last_published.
  WriteOptions woptions;
  TransactionOptions txn_options;
  Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  std::hash<std::thread::id> hasher;
  char name[64];
  snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
  assert(strlen(name) < 64 - 1);
  Status s = txn0->SetName(name);
  assert(s.ok());
  if (s.ok()) {
    // Without prepare it would simply skip the commit
    s = txn0->Prepare();
  }
  assert(s.ok());
  if (s.ok()) {
    s = txn0->Commit();
  }
  assert(s.ok());
  delete txn0;
}

const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  db_impl_->mutex()->AssertHeld();
  return db_impl_->snapshots().GetAll(nullptr, max);
}

void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // TODO(myabandeh): relaxed ordering should be enough since the
  // synchronization is already done by snapshots_mutex_ under which this
  // function is called.
  if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
    // Then this is a rare case that the transaction did not finish before max
    // advances. It is expected for a few read-only backup snapshots. For such
    // snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}

void WritePreparedTxnDB::CleanupReleasedSnapshots(
    const std::vector<SequenceNumber>& new_snapshots,
    const std::vector<SequenceNumber>& old_snapshots) {
  auto newi = new_snapshots.begin();
  auto oldi = old_snapshots.begin();
  for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
    assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
    if (*newi == *oldi) {    // still not released
      auto value = *newi;
      while (newi != new_snapshots.end() && *newi == value) {
        newi++;
      }
      while (oldi != old_snapshots.end() && *oldi == value) {
        oldi++;
      }
    } else {
      assert(*newi > *oldi);  // *oldi is released
      ReleaseSnapshotInternal(*oldi);
      oldi++;
    }
  }
  // Everything remaining in old_snapshots has been released and must be
  // cleaned up
  for (; oldi != old_snapshots.end(); oldi++) {
    ReleaseSnapshotInternal(*oldi);
  }
}

void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear upper in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item that
  // is still valid in the new list is either written to the same place in
  // the array or to a higher place before it gets overwritten by another
  // item. This guarantees that a reader that reads the list bottom-up will
  // eventually see a snapshot that repeats in the update, either before it
  // gets overwritten by the writer or afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); ++it) {
    // Insert them to a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);

  // Note: this must be done after the snapshots data structures are updated
  // with the new list of snapshots.
  CleanupReleasedSnapshots(snapshots, snapshots_all_);
  snapshots_all_ = snapshots;

  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}

void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before they get overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  // We will set to true if the border line snapshot suggests that.
  bool search_larger_list = false;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    SequenceNumber snapshot_seq =
        snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (ip1 == SNAPSHOT_CACHE_SIZE) {  // border line snapshot
      // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
      // then later also continue the search to larger snapshots
      search_larger_list = snapshot_seq < evicted.commit_seq;
    }
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
                   "> with %" ROCKSDB_PRIszt " snapshots",
                   evicted.prep_seq, evicted.commit_seq, cnt);
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      SequenceNumber snapshot_seq =
          snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}

bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed in
  // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
  // the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "old_commit_map_mutex_ overhead for %" PRIu64
                   " commit entry: <%" PRIu64 ",%" PRIu64 ">",
                   snapshot_seq, prep_seq, commit_seq);
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search in case there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}

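// Worked example for MaybeUpdateOldCommitMap above (illustrative numbers): for
// an evicted entry <prep_seq=10, commit_seq=20> and a live snapshot at 15, we
// have prep_seq <= snapshot_seq < commit_seq, so 10 is recorded under
// old_commit_map_[15]; a reader at snapshot 15 must still treat sequence 10 as
// uncommitted even after the entry has left the commit cache.
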
WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  if (!db_impl_->shutting_down_) {
    db_impl_->CancelAllBackgroundWork(true /*wait*/);
  }
}

void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}

void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}

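// SubBatchCounter::AddKey above implements the sub-batch counting used by
// WriteInternal: a key repeated in the same column family starts a new
// sub-batch (batches_++), since, as the CanHandleDuplicatedKey check in
// VerifyCFOptions suggests, duplicates of a key within a single sub-batch are
// not supported by the memtable.
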
}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE