1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 #ifndef ROCKSDB_LITE 8 9 #include "utilities/transactions/write_prepared_txn_db.h" 10 #include "utilities/transactions/write_unprepared_txn.h" 11 12 namespace ROCKSDB_NAMESPACE { 13 14 class WriteUnpreparedTxn; 15 16 class WriteUnpreparedTxnDB : public WritePreparedTxnDB { 17 public: 18 using WritePreparedTxnDB::WritePreparedTxnDB; 19 20 Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices, 21 const std::vector<ColumnFamilyHandle*>& handles) override; 22 23 Transaction* BeginTransaction(const WriteOptions& write_options, 24 const TransactionOptions& txn_options, 25 Transaction* old_txn) override; 26 27 // Struct to hold ownership of snapshot and read callback for cleanup. 28 struct IteratorState; 29 30 using WritePreparedTxnDB::NewIterator; 31 Iterator* NewIterator(const ReadOptions& options, 32 ColumnFamilyHandle* column_family, 33 WriteUnpreparedTxn* txn); 34 35 private: 36 Status RollbackRecoveredTransaction(const DBImpl::RecoveredTransaction* rtxn); 37 }; 38 39 class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { 40 // TODO(lth): Reduce code duplication with 41 // WritePreparedCommitEntryPreReleaseCallback 42 public: 43 // includes_data indicates that the commit also writes non-empty 44 // CommitTimeWriteBatch to memtable, which needs to be committed separately. 45 WriteUnpreparedCommitEntryPreReleaseCallback( 46 WritePreparedTxnDB* db, DBImpl* db_impl, 47 const std::map<SequenceNumber, size_t>& unprep_seqs, 48 size_t data_batch_cnt = 0, bool publish_seq = true) db_(db)49 : db_(db), 50 db_impl_(db_impl), 51 unprep_seqs_(unprep_seqs), 52 data_batch_cnt_(data_batch_cnt), 53 includes_data_(data_batch_cnt_ > 0), 54 publish_seq_(publish_seq) { 55 assert(unprep_seqs.size() > 0); 56 } 57 Callback(SequenceNumber commit_seq,bool is_mem_disabled,uint64_t,size_t,size_t)58 virtual Status Callback(SequenceNumber commit_seq, 59 bool is_mem_disabled __attribute__((__unused__)), 60 uint64_t, size_t /*index*/, 61 size_t /*total*/) override { 62 const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) 63 ? commit_seq 64 : commit_seq + data_batch_cnt_ - 1; 65 // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt. 66 for (const auto& s : unprep_seqs_) { 67 for (size_t i = 0; i < s.second; i++) { 68 db_->AddCommitted(s.first + i, last_commit_seq); 69 } 70 } 71 72 if (includes_data_) { 73 assert(data_batch_cnt_); 74 // Commit the data that is accompanied with the commit request 75 for (size_t i = 0; i < data_batch_cnt_; i++) { 76 // For commit seq of each batch use the commit seq of the last batch. 77 // This would make debugging easier by having all the batches having 78 // the same sequence number. 79 db_->AddCommitted(commit_seq + i, last_commit_seq); 80 } 81 } 82 if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { 83 assert(is_mem_disabled); // implies the 2nd queue 84 // Publish the sequence number. We can do that here assuming the callback 85 // is invoked only from one write queue, which would guarantee that the 86 // publish sequence numbers will be in order, i.e., once a seq is 87 // published all the seq prior to that are also publishable. 88 db_impl_->SetLastPublishedSequence(last_commit_seq); 89 } 90 // else SequenceNumber that is updated as part of the write already does the 91 // publishing 92 return Status::OK(); 93 } 94 95 private: 96 WritePreparedTxnDB* db_; 97 DBImpl* db_impl_; 98 const std::map<SequenceNumber, size_t>& unprep_seqs_; 99 size_t data_batch_cnt_; 100 // Either because it is commit without prepare or it has a 101 // CommitTimeWriteBatch 102 bool includes_data_; 103 // Should the callback also publishes the commit seq number 104 bool publish_seq_; 105 }; 106 107 } // namespace ROCKSDB_NAMESPACE 108 #endif // ROCKSDB_LITE 109