1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 #ifndef ROCKSDB_LITE
8 
9 #include "utilities/transactions/write_prepared_txn_db.h"
10 #include "utilities/transactions/write_unprepared_txn.h"
11 
12 namespace ROCKSDB_NAMESPACE {
13 
14 class WriteUnpreparedTxn;
15 
16 class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
17  public:
18   using WritePreparedTxnDB::WritePreparedTxnDB;
19 
20   Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
21                     const std::vector<ColumnFamilyHandle*>& handles) override;
22 
23   Transaction* BeginTransaction(const WriteOptions& write_options,
24                                 const TransactionOptions& txn_options,
25                                 Transaction* old_txn) override;
26 
27   // Struct to hold ownership of snapshot and read callback for cleanup.
28   struct IteratorState;
29 
30   using WritePreparedTxnDB::NewIterator;
31   Iterator* NewIterator(const ReadOptions& options,
32                         ColumnFamilyHandle* column_family,
33                         WriteUnpreparedTxn* txn);
34 
35  private:
36   Status RollbackRecoveredTransaction(const DBImpl::RecoveredTransaction* rtxn);
37 };
38 
39 class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
40   // TODO(lth): Reduce code duplication with
41   // WritePreparedCommitEntryPreReleaseCallback
42  public:
43   // includes_data indicates that the commit also writes non-empty
44   // CommitTimeWriteBatch to memtable, which needs to be committed separately.
45   WriteUnpreparedCommitEntryPreReleaseCallback(
46       WritePreparedTxnDB* db, DBImpl* db_impl,
47       const std::map<SequenceNumber, size_t>& unprep_seqs,
48       size_t data_batch_cnt = 0, bool publish_seq = true)
db_(db)49       : db_(db),
50         db_impl_(db_impl),
51         unprep_seqs_(unprep_seqs),
52         data_batch_cnt_(data_batch_cnt),
53         includes_data_(data_batch_cnt_ > 0),
54         publish_seq_(publish_seq) {
55     assert(unprep_seqs.size() > 0);
56   }
57 
Callback(SequenceNumber commit_seq,bool is_mem_disabled,uint64_t,size_t,size_t)58   virtual Status Callback(SequenceNumber commit_seq,
59                           bool is_mem_disabled __attribute__((__unused__)),
60                           uint64_t, size_t /*index*/,
61                           size_t /*total*/) override {
62     const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
63                                          ? commit_seq
64                                          : commit_seq + data_batch_cnt_ - 1;
65     // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
66     for (const auto& s : unprep_seqs_) {
67       for (size_t i = 0; i < s.second; i++) {
68         db_->AddCommitted(s.first + i, last_commit_seq);
69       }
70     }
71 
72     if (includes_data_) {
73       assert(data_batch_cnt_);
74       // Commit the data that is accompanied with the commit request
75       for (size_t i = 0; i < data_batch_cnt_; i++) {
76         // For commit seq of each batch use the commit seq of the last batch.
77         // This would make debugging easier by having all the batches having
78         // the same sequence number.
79         db_->AddCommitted(commit_seq + i, last_commit_seq);
80       }
81     }
82     if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
83       assert(is_mem_disabled);  // implies the 2nd queue
84       // Publish the sequence number. We can do that here assuming the callback
85       // is invoked only from one write queue, which would guarantee that the
86       // publish sequence numbers will be in order, i.e., once a seq is
87       // published all the seq prior to that are also publishable.
88       db_impl_->SetLastPublishedSequence(last_commit_seq);
89     }
90     // else SequenceNumber that is updated as part of the write already does the
91     // publishing
92     return Status::OK();
93   }
94 
95  private:
96   WritePreparedTxnDB* db_;
97   DBImpl* db_impl_;
98   const std::map<SequenceNumber, size_t>& unprep_seqs_;
99   size_t data_batch_cnt_;
100   // Either because it is commit without prepare or it has a
101   // CommitTimeWriteBatch
102   bool includes_data_;
103   // Should the callback also publishes the commit seq number
104   bool publish_seq_;
105 };
106 
107 }  // namespace ROCKSDB_NAMESPACE
108 #endif  // ROCKSDB_LITE
109