//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace ROCKSDB_NAMESPACE {

bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
  // the prepare_batch_cnt seq nums after it.
  //
  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
  // large.
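  //
  // For example, with unprep_seqs_ = {{10, 3}}, sequence numbers 10, 11 and
  // 12 (the three sub-batches starting at prepare seq 10) are this
  // transaction's own writes and are therefore visible.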
  for (const auto& it : unprep_seqs_) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }

  bool snap_released = false;
  auto ret =
      db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
  assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  snap_released_ |= snap_released;
  return ret;
}

WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                       const WriteOptions& write_options,
                                       const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options),
      wupt_db_(txn_db),
      last_log_number_(0),
      recovered_txn_(false),
      largest_validated_seq_(0) {
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }
}

WriteUnpreparedTxn::~WriteUnpreparedTxn() {
  if (!unprep_seqs_.empty()) {
    assert(log_number_ > 0);
    assert(GetId() > 0);
    assert(!name_.empty());

    // We should roll back regardless of GetState, but some unit tests that
    // test crash recovery run the destructor assuming that rollback does not
    // happen, so that rollback during recovery can be exercised.
    if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
      auto s = RollbackInternal();
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(
            wupt_db_->info_log_,
            "Rollback of WriteUnprepared transaction failed in destructor: %s",
            s.ToString().c_str());
      }
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }
  }

  // Clear the tracked locks so that ~PessimisticTransaction does not
  // try to unlock keys for recovered transactions.
  if (recovered_txn_) {
    tracked_locks_->Clear();
  }
}

void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
}

Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
  Status s;
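  // Only consider flushing when no iterators are open on this transaction:
  // flushing resets write_batch_, which would invalidate any BaseDeltaIterator
  // currently layered on top of it.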
  if (active_iterators_.empty()) {
    s = MaybeFlushWriteBatchToDB();
    if (!s.ok()) {
      return s;
    }
  }
  s = do_write();
  if (s.ok()) {
    if (snapshot_) {
      largest_validated_seq_ =
          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
    } else {
      // TODO(lth): We should use the same number as tracked_at_seq in TryLock,
      // because what is actually being tracked is the sequence number at
      // which this key was locked.
      largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
    }
  }
  return s;
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const SliceParts& key, const SliceParts& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                 const Slice& key, const Slice& value,
                                 const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Merge(column_family, key, value,
                                      assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const Slice& key, const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key,
                                  const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const Slice& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const SliceParts& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is "retrack" the keys
// so that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an
// optimization, recovered transactions do not hold locks on their keys. This
// follows the implementation in PessimisticTransactionDB::Initialize where we
// set skip_concurrency_control to true.
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
  struct TrackKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                       false /* read_only */, true /* exclusive */);
      }
      return Status::OK();
    }

    // Recovered batches do not contain 2PC markers.
    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkNoop(bool) override { return Status::InvalidArgument(); }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  TrackKeyHandler handler(this,
                          wupt_db_->txn_db_options_.rollback_merge_operands);
  return wb->Iterate(&handler);
}

Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  const bool kPrepared = true;
  Status s;
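  // Flush only when a positive threshold is configured and the unflushed
  // batch has grown past it; a non-positive threshold disables automatic
  // flushing.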
  if (write_batch_flush_threshold_ > 0 &&
      write_batch_.GetWriteBatch()->Count() > 0 &&
      write_batch_.GetDataSize() >
          static_cast<size_t>(write_batch_flush_threshold_)) {
    assert(GetState() != PREPARED);
    s = FlushWriteBatchToDB(!kPrepared);
  }
  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
  // If the current write batch contains savepoints, then some special handling
  // is required so that RollbackToSavepoint can work.
  //
  // RollbackToSavepoint is not supported after Prepare() is called, so only do
  // this for unprepared batches.
  if (!prepared && unflushed_save_points_ != nullptr &&
      !unflushed_save_points_->empty()) {
    return FlushWriteBatchWithSavePointToDB();
  }

  return FlushWriteBatchToDBInternal(prepared);
}

Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
  if (name_.empty()) {
    assert(!prepared);
#ifndef NDEBUG
    static std::atomic_ullong autogen_id{0};
    // To avoid changing all tests to call SetName, just autogenerate one.
    if (wupt_db_->txn_db_options_.autogenerate_name) {
      auto s =
          SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
      assert(s.ok());
    } else
#endif
    {
      return Status::InvalidArgument("Cannot write to DB without SetName.");
    }
  }

  struct UntrackedKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status AddUntrackedKey(uint32_t cf, const Slice& key) {
      auto str = key.ToString();
      PointLockStatus lock_status =
          txn_->tracked_locks_->GetPointLockStatus(cf, str);
      if (!lock_status.locked) {
        txn_->untracked_keys_[cf].push_back(str);
      }
      return Status::OK();
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      return AddUntrackedKey(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        return AddUntrackedKey(cf, key);
      }
      return Status::OK();
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  UntrackedKeyHandler handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
  assert(s.ok());

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool first_prepare_batch = log_number_ == 0;
  // MarkEndPrepare will change the Noop marker to the appropriate marker.
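  // When flushing an unprepared batch (prepared == false), the last argument
  // (!prepared) marks the batch as an unprepared batch, so WAL replay treats
  // it as part of an ongoing transaction rather than a completed prepare
  // section.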
  s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                         name_, !WRITE_AFTER_COMMIT, !prepared);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch.
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq, with readers then
  // assuming the data is committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. This means that if log_number_ is set,
  // WriteImpl should not overwrite that value, so set log_used to nullptr if
  // log_number_ is already set.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &last_log_number_,
                          /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                          prepare_batch_cnt_, &add_prepared_callback);
  if (log_number_ == 0) {
    log_number_ = last_log_number_;
  }
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }
  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
  assert(unflushed_save_points_ != nullptr &&
         unflushed_save_points_->size() > 0);
  assert(save_points_ != nullptr && save_points_->size() > 0);
  assert(save_points_->size() >= unflushed_save_points_->size());

  // Handler class for creating an unprepared batch from a savepoint.
  struct SavePointBatchHandler : public WriteBatch::Handler {
    WriteBatchWithIndex* wb_;
    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;

    SavePointBatchHandler(
        WriteBatchWithIndex* wb,
        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
        : wb_(wb), handles_(handles) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Put(handles_.at(cf), key, value);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->Delete(handles_.at(cf), key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->SingleDelete(handles_.at(cf), key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Merge(handles_.at(cf), key, value);
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  // The comparator of the default cf is passed in, similar to the
  // initialization of TransactionBaseImpl::write_batch_. This comparator is
  // only used as a fallback when the write batch encounters an invalid cf id.
  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
                         true, 0);
  // Swap with write_batch_ so that wb contains the complete write batch. The
  // actual write batch that will be flushed to DB will be built in
  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
  std::swap(wb, write_batch_);
  TransactionBaseImpl::InitWriteBatch();

  size_t prev_boundary = WriteBatchInternal::kHeader;
  const bool kPrepared = true;
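  // For example, with unflushed save points at byte offsets [s1, s2], the
  // loop below flushes the sub-batches [kHeader, s1), [s1, s2) and the
  // trailing [s2, end of batch) as separate unprepared batches.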
  for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
    bool trailing_batch = i == unflushed_save_points_->size();
    SavePointBatchHandler sp_handler(&write_batch_,
                                     *wupt_db_->GetCFHandleMap().get());
    size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
                                          : (*unflushed_save_points_)[i];

    // Construct the partial write batch up to the savepoint.
    //
    // Theoretically, a memcpy between the write batches should be sufficient
    // since the rewriting into the batch should produce the exact same byte
    // representation. Rebuilding the WriteBatchWithIndex index is still
    // necessary, though, and would imply doing two passes over the batch.
    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
                                           prev_boundary, curr_boundary);
    if (!s.ok()) {
      return s;
    }

    if (write_batch_.GetWriteBatch()->Count() > 0) {
      // Flush the write batch.
      s = FlushWriteBatchToDBInternal(!kPrepared);
      if (!s.ok()) {
        return s;
      }
    }

    if (!trailing_batch) {
      if (flushed_save_points_ == nullptr) {
        flushed_save_points_.reset(
            new autovector<WriteUnpreparedTxn::SavePoint>());
      }
      flushed_save_points_->emplace_back(
          unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
    }

    prev_boundary = curr_boundary;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  unflushed_save_points_->clear();
  return Status::OK();
}

Status WriteUnpreparedTxn::PrepareInternal() {
  const bool kPrepared = true;
  return FlushWriteBatchToDB(kPrepared);
}

Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
  if (unprep_seqs_.empty()) {
    assert(log_number_ == 0);
    assert(GetId() == 0);
    return WritePreparedTxn::CommitWithoutPrepareInternal();
  }

  // TODO(lth): We should optimize commit without prepare to not perform
  // a prepare under the hood.
  auto s = PrepareInternal();
  if (!s.ok()) {
    return s;
  }
  return CommitInternal();
}

Status WriteUnpreparedTxn::CommitInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.

  // We take the commit-time batch and append the Commit marker. The memtable
  // will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  const bool includes_data = !empty && !for_recovery;
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
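  // Note: a single write should suffice when there is only one write queue,
  // or when the commit-time batch is empty (nothing goes to the memtable), as
  // the commit map can then be updated in the same write. Otherwise a second
  // write on the commit queue publishes the commit entry afterwards.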

  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is
  // already a connection between the memtable and its WAL, so there is no
  // need to redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      for (const auto& seq : unprep_seqs_) {
        wpt_db_->RemovePrepared(seq.first, seq.second);
      }
    }
    if (UNLIKELY(!do_one_write)) {
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write to publish seq

  // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
  // commit write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.

  // Update commit map only from the 2nd queue
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_commit_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  for (const auto& seq : unprep_seqs_) {
    wpt_db_->RemovePrepared(seq.first, seq.second);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

Status WriteUnpreparedTxn::WriteRollbackKeys(
    const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch,
    ReadCallback* callback, const ReadOptions& roptions) {
  // This assertion can be removed when range lock is supported.
  assert(lock_tracker.IsPointLockSupported());
  const auto& cf_map = *wupt_db_->GetCFHandleMap();
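  // For each key written by this transaction, read the value the key should
  // be restored to (visibility of this transaction's own writes is decided by
  // `callback`) and append a Put of that value to the rollback batch, or a
  // Delete if the key did not exist.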
  auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
    const auto& cf_handle = cf_map.at(cfid);
    PinnableSlice pinnable_val;
    bool not_used;
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = cf_handle;
    get_impl_options.value = &pinnable_val;
    get_impl_options.value_found = &not_used;
    get_impl_options.callback = callback;
    auto s = db_impl_->GetImpl(roptions, key, get_impl_options);

    if (s.ok()) {
      s = rollback_batch->Put(cf_handle, key, pinnable_val);
      assert(s.ok());
    } else if (s.IsNotFound()) {
      s = rollback_batch->Delete(cf_handle, key);
      assert(s.ok());
    } else {
      return s;
    }

    return Status::OK();
  };

  std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
      lock_tracker.GetColumnFamilyIterator());
  assert(cf_it != nullptr);
  while (cf_it->HasNext()) {
    ColumnFamilyId cf = cf_it->Next();
    std::unique_ptr<LockTracker::KeyIterator> key_it(
        lock_tracker.GetKeyIterator(cf));
    assert(key_it != nullptr);
    while (key_it->HasNext()) {
      const std::string& key = key_it->Next();
      auto s = WriteRollbackKey(key, cf);
      if (!s.ok()) {
        return s;
      }
    }
  }

  for (const auto& cfkey : untracked_keys_) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& key : keys) {
      auto s = WriteRollbackKey(key, cfid);
      if (!s.ok()) {
        return s;
      }
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  Status s;
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // To prevent the callback's seq from being overridden inside DBImpl::Get.
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
  // TODO(lth): We write the rollback keys all in a single batch here, but
  // this should be subdivided into multiple batches as well. In phase 2, when
  // key sets are read from WAL, this will happen naturally.
  s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  // The Rollback marker will be used as a batch separator
  s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // The rollback batch may contain duplicate keys, because tracked_keys_ is
  // not comparator aware.
  auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to CommitCache lets us benefit from the existing mechanism in
  // CommitCache that keeps an entry around if it was evicted due to max
  // advance and yet overlaps with a live snapshot, so that the live snapshot
  // properly skips the entry even if its prepare seq is lower than
  // max_evicted_seq_.
  //
  // TODO(lth): RollbackInternal is conceptually very similar to
  // CommitInternal, with the rollback batch simply taking on the role of
  // CommitTimeWriteBatch. We should be able to merge the two code paths.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
                          &seq_used, rollback_batch_cnt,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write for commit

  uint64_t& prepare_seq = seq_used;
  // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
  // rollback write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[prepare_seq] = rollback_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  WriteBatch empty_batch;
  const size_t ONE_BATCH = 1;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_rollback_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back.
  if (s.ok()) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

void WriteUnpreparedTxn::Clear() {
  if (!recovered_txn_) {
    txn_db_impl_->UnLock(this, *tracked_locks_);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  for (auto& it : active_iterators_) {
    auto bdit = static_cast<BaseDeltaIterator*>(it);
    bdit->Invalidate(Status::InvalidArgument(
        "Cannot use iterator after transaction has finished"));
  }
  active_iterators_.clear();
  untracked_keys_.clear();
  TransactionBaseImpl::Clear();
}

void WriteUnpreparedTxn::SetSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  PessimisticTransaction::SetSavePoint();
  if (unflushed_save_points_ == nullptr) {
    unflushed_save_points_.reset(new autovector<size_t>());
  }
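  // Record the current byte size of the unflushed write batch; this offset
  // marks the savepoint boundary later consumed by
  // FlushWriteBatchWithSavePointToDB.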
  unflushed_save_points_->push_back(write_batch_.GetDataSize());
}

Status WriteUnpreparedTxn::RollbackToSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::RollbackToSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    return RollbackToSavePointInternal();
  }

  return Status::NotFound();
}

Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
  Status s;

  const bool kClear = true;
  TransactionBaseImpl::InitWriteBatch(kClear);

  assert(flushed_save_points_->size() > 0);
  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();

  assert(save_points_ != nullptr && save_points_->size() > 0);
  const LockTracker& tracked_keys = *save_points_->top().new_locks_;

  ReadOptions roptions;
  roptions.snapshot = top.snapshot_->snapshot();
  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(roptions.snapshot)
          ->min_uncommitted_;
  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
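  // Note: the callback treats the seqs in top.unprep_seqs_ as visible, so the
  // rollback reads below should observe this transaction's own flushed writes
  // up to the savepoint, layered over the DB state at the savepoint's
  // snapshot.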
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          top.unprep_seqs_,
                                          kBackedByDBSnapshot);
  s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  const bool kPrepared = true;
  s = FlushWriteBatchToDBInternal(!kPrepared);
  if (!s.ok()) {
    return s;
  }

  // PessimisticTransaction::RollbackToSavePoint will also call
  // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and
  // has no savepoints because this savepoint has already been flushed. Work
  // around this by setting a fake savepoint.
  write_batch_.SetSavePoint();
  s = PessimisticTransaction::RollbackToSavePoint();
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  flushed_save_points_->pop_back();
  return s;
}

Status WriteUnpreparedTxn::PopSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
    // write_batch_. However, write_batch_ is empty and has no savepoints
    // because this savepoint has already been flushed. Work around this by
    // setting a fake savepoint.
    write_batch_.SetSavePoint();
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    flushed_save_points_->pop_back();
    return s;
  }

  return Status::NotFound();
}

void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
                                  ColumnFamilyHandle* column_family,
                                  const size_t num_keys, const Slice* keys,
                                  PinnableSlice* values, Status* statuses,
                                  const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
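  // If the callback saw its snapshot get released, or the snapshot sequence
  // no longer validates, the reads may have missed relevant entries, so all
  // keys are marked TryAgain for the caller to retry with a fresh snapshot.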
  if (UNLIKELY(!callback.valid() ||
               !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WriteUnpreparedTxn::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            value, &callback);
  if (LIKELY(callback.valid() &&
             wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    return res;
  } else {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

namespace {
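// Deregisters an iterator from its owning transaction when the iterator is
// destroyed, so that HandleWrite may flush the write batch again once no live
// iterators remain.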
static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
  auto txn = reinterpret_cast<WriteUnpreparedTxn*>(arg1);
  auto iter = reinterpret_cast<Iterator*>(arg2);
  txn->RemoveActiveIterator(iter);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
  return GetIterator(options, wupt_db_->DefaultColumnFamily());
}

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
  assert(db_iter);

  auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
  active_iterators_.push_back(iter);
  iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
  return iter;
}

Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                            const Slice& key,
                                            SequenceNumber* tracked_at_seq) {
  // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WriteUnpreparedTxnReadCallback snap_checker(
      wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}

const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
  return unprep_seqs_;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE