//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#include <cinttypes>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside the PessimisticTransaction constructor; otherwise
  // it would skip the overridden functions in WritePreparedTxn since they are
  // not yet defined when the PessimisticTransaction constructor runs.
  Initialize(txn_options);
}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}

void WritePreparedTxn::MultiGet(const ReadOptions& options,
                                ColumnFamilyHandle* column_family,
                                const size_t num_keys, const Slice* keys,
                                PinnableSlice* values, Status* statuses,
                                const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
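  // min_uncommitted is a lower bound on prepared-but-uncommitted sequence
  // numbers: the read callback can treat anything below it as committed
  // without consulting the commit cache. backed_by_snapshot records whether
  // snap_seq comes from a real db snapshot or just the latest sequence, which
  // determines whether the result has to be re-validated below.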
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
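  // Roughly: if the snapshot was not backed by a db snapshot and
  // max_evicted_seq_ may have advanced past it during the read, the
  // callback's visibility answers can no longer be trusted, so every key is
  // reported as TryAgain and the caller should retry the read.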
  if (UNLIKELY(!callback.valid() ||
               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WritePreparedTxn::Get(const ReadOptions& options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            pinnable_val, &callback);
  if (LIKELY(callback.valid() &&
             wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
                                       backed_by_snapshot))) {
    return res;
  } else {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}

Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT);
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
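  // A sub-batch boundary is needed whenever the same key appears more than
  // once in the batch, since the memtable cannot hold two entries with the
  // same key and sequence number; each sub-batch therefore consumes its own
  // sequence number when the batch is written below.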
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization.
  // Refer to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
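  // Unlike a write-committed txn, the prepared batch is written to both the
  // WAL and the memtable here (disable_memtable is not set); its visibility
  // to readers is deferred until commit through the commit cache consulted by
  // WritePreparedTxnReadCallback.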
  Status s = db_impl_->WriteImpl(
      write_options, GetWriteBatch()->GetWriteBatch(),
      /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
      &seq_used, prepare_batch_cnt_, &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}

Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}

Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
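  // Roughly: if the commit-time batch carries data that must reach the
  // memtable and two_write_queues is enabled, the commit is split into two
  // writes -- the first adds the commit-time batch as a prepared entry, and a
  // second write on the commit queue publishes the sequence and updates the
  // commit cache. Otherwise a single write, with update_commit_map as its
  // PreReleaseCallback, is enough.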
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
                 s.ok())) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }  // else RemovePrepared is called from within PreReleaseCallback
    if (UNLIKELY(!do_one_write)) {
      assert(!s.ok());
      // Cleanup the prepared entry we added with add_prepared_callback
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
    if (s.ok()) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
  }  // else RemovePrepared is called from within PreReleaseCallback
  return s;
}

Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // to prevent the callback's seq from being overridden inside DBImpl::Get
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
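  // The handler below visits every key written by this transaction and
  // appends the key's pre-transaction state to rollback_batch: a Put with the
  // last committed value if one is found, or a Delete if there was none.
  // Writing rollback_batch therefore cancels out the effect of the prepared
  // batch that is already in the memtable.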
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    ReadOptions roptions_;
    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands, ReadOptions _roptions)
        : db_(db),
          callback(wpt_db, snap_seq),  // disable min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands),
          roptions_(_roptions) {}

    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      if (it.second == false) {
        // second is false if an element with this key already existed; each
        // key only needs to be rolled back once.
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      DBImpl::GetImplOptions get_impl_options;
      get_impl_options.column_family = cf_handle;
      get_impl_options.value = &pinnable_val;
      get_impl_options.value_found = &not_used;
      get_impl_options.callback = &callback;
      s = db_->GetImpl(roptions_, key, get_impl_options);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before the txn. By adding a delete
        // we make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands,
                     roptions);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  const bool kFirstPrepareBatch = true;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to CommitCache lets us benefit from the existing mechanism in
  // CommitCache that keeps around an entry that was evicted due to max advance
  // yet still overlaps with a live snapshot, so that the live snapshot
  // properly skips the entry even if its prepare seq is lower than
  // max_evicted_seq_.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, ONE_BATCH,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    assert(!db_impl_->immutable_db_options().two_write_queues);
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t rollback_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
                    rollback_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal (status=%s) commit: %" PRIu64,
                    s.ToString().c_str(), GetId());
  // TODO(lth): For WriteUnprepared, where rollback is called frequently,
  // RemovePrepared could be moved to the callback to reduce lock contention.
  if (s.ok()) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }
  // Note: RemovePrepared for the prepared batch is called from within
  // PreReleaseCallback
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

  return s;
}

Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison here
  // as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

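  // Check for write-write conflicts: if another writer has a committed (or,
  // as seen through snap_checker, a prepared) entry for this key with a
  // sequence number newer than snap_seq, CheckKeyForConflicts is expected to
  // report a conflict so the caller can fail the write.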
  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
                                            kBackedByDBSnapshot);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}

void WritePreparedTxn::SetSnapshot() {
  const bool kForWWConflictCheck = true;
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  SetSnapshotInternal(snapshot);
}

Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE