// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn_db.h"
#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Instead of reconstructing a Transaction object and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (e.g., updating CommitMap, reconstructing the keyset).
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.
  //
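  // Instead, the rollback batch built below is written through the normal
  // WAL-enabled write path, so a crash during recovery replays it like any
  // other write.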
  WriteOptions w_options;

  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as the snapshot is the seq right before we locked
      // and wrote to the key, so whatever is there is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet).
    void Refresh(SequenceNumber) override {}
  };

  // Iterate starting with the largest sequence number.
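  // Each iteration builds a rollback batch that restores every key touched by
  // the recovered batch to its pre-transaction value, then writes that batch
  // out before moving on to the next one.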
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch;

    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // just inserted
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (res.second == false) {  // second is false if an element
                                    // already existed.
          return s;
        }

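        // Read the value as of last_visible_txn; the callback above treats
        // everything at or below that seq as committed, so this returns the
        // latest pre-transaction value of the key.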
        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // No value was readable before the txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }
123
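      // Put, Delete, and SingleDelete all roll back the same way: only the
      // key matters, since Rollback() restores whatever value (or tombstone)
      // preceded the transaction.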
      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

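    // Replay the recovered write batch through the handler above to populate
    // rollback_batch with the pre-transaction image of every key the batch
    // touched.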
    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator.
    s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
    if (!s.ok()) {
      return s;
    }

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch.
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
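  // The callback marks each recoverable-state sub-batch as committed at the
  // seq it was written with (prepare_seq == commit_seq), so such writes
  // become visible as soon as their sequence number is released.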
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify the CF options.
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // Create 'real' transactions from recovered shell transactions.
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only roll back transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why two
    // iterations are required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

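    // Metadata for the reconstructed transaction (log number, id, and prepare
    // batch count) comes from the first recovered batch.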
    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt = static_cast_with_check<WriteUnpreparedTxn>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      return s;
    }
  }
  // AddPrepared must be called in ascending seq order; ordered_seq_cnt (a
  // std::map keyed by seq) provides that ordering.
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  Status s;
  // Roll back unprepared transactions.
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
      continue;
    }
  }

  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();

    // Compaction should start only after max_evicted_seq_ is set AND recovered
    // transactions are either added to PrepareHeap or rolled back.
    s = EnableAutoCompaction(compaction_enabled_cf_handles);
  }

  return s;
}

Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }
}
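
// A minimal usage sketch (illustrative only; txn_db stands for a hypothetical
// TransactionDB opened with write_policy = WRITE_UNPREPARED):
//
//   WriteOptions wopts;
//   TransactionOptions topts;
//   Transaction* txn = txn_db->BeginTransaction(wopts, topts, nullptr);
//   txn->SetName("xid1");
//   txn->Put("key", "value");
//   txn->Prepare();
//   txn->Commit();
//   delete txn;
//
// Passing a non-null old_txn instead reinitializes and reuses that object,
// avoiding a fresh allocation per transaction.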

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if the snapshot sequence is 3, and we have the following
  // keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then v1, v2, v3 will be visible, but v4 will be non-visible, so we return
  // v3, which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then returning the first non-visible value
  // would be incorrect, as we should return v5, and not v3. The problem is
  // that there are committed values at snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be an unprepared value. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with a snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
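
  // Acquire a snapshot if the caller did not provide one. Taking ownership
  // via ManagedSnapshot ensures the snapshot is released when the iterator is
  // cleaned up.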
  const Snapshot* snapshot = nullptr;
  if (options.snapshot == nullptr) {
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  } else {
    snapshot = options.snapshot;
  }

  snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // that commit_seq <= largest_validated_seq <= snapshot_seq, and hence
  // commit_seq <= snapshot_seq. As explained above, the problem with Prev()
  // only happens when snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction,
  // largest_validated_seq_ is meaningless, and Prev() should just work with
  // the existing visibility logic.
  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }
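  // min_uncommitted lets the read callback skip commit-map lookups for values
  // with seq < min_uncommitted, since such values were already committed as of
  // this snapshot.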
  min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;

  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  auto* db_iter = db_impl_->NewIteratorImpl(
      options, cfd, state->MaxVisibleSeq(), &state->callback, expose_blob_index,
      allow_refresh);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE