//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn_db.h"
#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Instead of reconstructing a Transaction object and calling rollback on it,
// RollbackRecoveredTransaction can be more efficient by skipping unnecessary
// steps (e.g. updating the CommitMap, reconstructing the keyset).
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
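  // These maps let the rollback handler below look up column family handles
  // and key comparators by column family ID while building the rollback batch.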
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.
  //
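  // Instead, the rollback is expressed below as a self-contained batch of
  // Put/Delete operations plus a rollback marker, written through the normal
  // write path with the WAL enabled.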
  WriteOptions w_options;

  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    explicit InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as the snapshot is the seq right before we locked
      // and wrote to the key, so whatever is already there is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet).
    void Refresh(SequenceNumber) override {}
  };

  // Iterate starting with the largest sequence number.
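  // Each rollback batch below is built against the state just before the
  // corresponding unprepared batch (it->first - 1), so processing batches from
  // newest to oldest ends with the oldest batch's rollback, which restores the
  // pre-transaction values.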
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch;

    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.empty()) {  // the entry was just default-inserted above
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (!res.second) {
          // The key already has a rollback entry in this batch; skip it.
          return s;
        }

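        // Read the latest value visible at last_visible_txn (the state just
        // before this unprepared batch); the callback treats everything at or
        // below that sequence as committed.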
        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // There was no readable value before the txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator.
    WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
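    // Write the rollback batch through the regular write path. disable_memtable
    // is false (!kDisableMemtable), so the batch is applied to the memtable as
    // well as logged to the WAL, and kOneBatch marks it as a single sub-batch.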
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues is enabled, we must manually release the sequence
    // number to readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch.
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify CF options.
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // Create 'real' transactions from recovered shell transactions.
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only roll back transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why two
    // passes over the recovered transactions are required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt =
        static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      return s;
    }
  }
  // AddPrepared must be called in order
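  // (ordered_seq_cnt is a std::map, so this visits prepare sequence numbers in
  // ascending order.)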
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  Status s;
  // Roll back unprepared transactions.
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
      continue;
    }
  }

  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();

    // Compaction should start only after max_evicted_seq_ is set AND recovered
    // transactions are either added to PrepareHeap or rolled back.
    s = EnableAutoCompaction(compaction_enabled_cf_handles);
  }

  return s;
}

Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if the snapshot sequence is 3, and we have the following keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
  // which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then stopping at the first non-visible value
  // (v4) and returning v3 would be incorrect; we should return v5 instead. The
  // problem is that there can be committed values at
  // snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be an unprepared value. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with a snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  const Snapshot* snapshot = nullptr;
  if (options.snapshot == nullptr) {
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  } else {
    snapshot = options.snapshot;
  }

  snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // commit_seq <= largest_validated_seq <= snapshot_seq, and therefore
  // commit_seq <= snapshot_seq. As explained above, the problem with Prev()
  // only happens when snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction, largest_validated_seq_
  // is meaningless, and Prev() should just work with the existing visibility
  // logic.
  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }
  min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
          ->min_uncommitted_;

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
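  // The iterator is created with blob support and Refresh() disabled
  // (!ALLOW_BLOB, !ALLOW_REFRESH); the IteratorState above owns the callback
  // and snapshot and is freed by the cleanup callback registered below.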
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
                                &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE