//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn_db.h"
#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Instead of reconstructing a Transaction object and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (e.g. updating the CommitMap, reconstructing the key set).
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
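  // These maps let the rollback handler below look up column family handles
  // and comparators by the column family IDs found in the recovered batches.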
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.
  //
  WriteOptions w_options;

  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as the snapshot is the seq right before we locked
      // and wrote to the key, so whatever is already there is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet).
    void Refresh(SequenceNumber) override {}
  };

  // Iterate starting with largest sequence number.
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
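    // it->first is the first sequence number of this unprepared batch, so
    // everything strictly below it is the pre-transaction state that the
    // rollback reads below should see as committed.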
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch;

    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // the entry was just default-constructed
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (!res.second) {  // this key has already been processed
          return s;
        }

        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // There was no readable value before the txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator
    s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
    if (!s.ok()) {
      return s;
    }

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
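    // Write the rollback batch through the normal write path. disable_memtable
    // is passed as false, so the batch is applied to both the WAL and the
    // memtable, and the sequence number it consumed is returned in seq_used.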
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only roll back transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations are required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt = static_cast_with_check<WriteUnpreparedTxn>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      return s;
    }
  }
  // AddPrepared must be called in order
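  // Each recovered batch occupies `cnt` consecutive sequence numbers (one per
  // sub-batch), so every one of them is marked as prepared.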
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  Status s;
  // Rollback unprepared transactions.
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
      continue;
    }
  }

  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();

    // Compaction should start only after max_evicted_seq_ is set AND recovered
    // transactions are either added to PrepareHeap or rolled back.
    s = EnableAutoCompaction(compaction_enabled_cf_handles);
  }

  return s;
}

Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if snapshot sequence is 3, and we have the following keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
  // which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then returning the first non-visible value
  // would be incorrect, as we should return v5, and not v3. The problem is that
  // there are committed values at snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be unprepared values. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  const Snapshot* snapshot = nullptr;
  if (options.snapshot == nullptr) {
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  } else {
    snapshot = options.snapshot;
  }

  snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <=
  // snapshot_seq. As explained above, the problem with Prev() only happens when
  // snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction, largest_validated_seq_
  // is meaningless, and Prev() should just work with the existing visibility
  // logic.
  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }
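  // min_uncommitted is the smallest sequence number that was still uncommitted
  // when the snapshot was taken; the read callback can treat anything below it
  // as committed without consulting the commit map.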
  min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;

  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  auto* db_iter = db_impl_->NewIteratorImpl(
      options, cfd, state->MaxVisibleSeq(), &state->callback, expose_blob_index,
      allow_refresh);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE