// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#ifndef ROCKSDB_LITE

#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;
// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may already have been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex to determine whether a
// key should be visible or not, so we have to remember to check the DB for any
// uncommitted keys that should be visible to us. First, we will need to change
// the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq). Any key with a sequence
// number greater than max_visible_seq should not be visible, because it cannot
// have been unprepared by the current transaction and it is not in its
// snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
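//
// As a worked example (with hypothetical numbers): suppose the transaction
// took a snapshot at snap_seq = 10 and has one unprepared batch starting at
// seq 12 containing 3 writes, occupying seqs 12..14. Then
// max_visible_seq = max(10, 14) = 14. A key written by another transaction at
// seq 13 falls under case 2 or 3 and is filtered out by IsVisibleFullCheck,
// while this transaction's own unprepared write at seq 13 (case 1) remains
// visible.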
//
// Other notes:
// Note that max_visible_seq is only calculated once, at iterator construction
// time. This means that if the same transaction adds more unprep seqs through
// writes during iteration, these newer writes may not be visible. This is not
// a problem for MySQL, though, because it avoids modifying an index while
// scanning through it, in order to avoid the Halloween Problem. Instead, it
// scans the index once up front, and modifies it based on a temporary copy.
//
// In DBIter, there is a "reseek" optimization that kicks in if the iterator
// skips over too many keys. However, it assumes that the reseek lands exactly
// on the required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible
// key, so special precautions must be taken to avoid performing another
// reseek, which would lead to an infinite loop.
//
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class to
      // ensure that the parent will not prematurely filter out our own writes.
      // We will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  virtual bool IsVisibleFullCheck(SequenceNumber seq) override;

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

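  // CalcMaxVisibleSeq returns the last sequence number that may be visible:
  // either the final seq of the transaction's last unprepared batch, or the
  // snapshot itself, whichever is larger. As a hypothetical example,
  // unprep_seqs = {{10, 3}, {20, 2}} (batches occupying seqs 10..12 and
  // 20..21) with snapshot_seq = 15 yields max(20 + 2 - 1, 15) = 21.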
  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    SequenceNumber max_unprepared = 0;
    if (!unprep_seqs.empty()) {
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value,
                     const bool assume_tracked = false) override;
  virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value,
                     const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value,
                       const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                        const bool assume_tracked = false) override;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key,
                        const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key,
                              const bool assume_tracked = false) override;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key,
                              const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation logic.
  // Snapshot validation only checks the largest sequence number of a key to
  // see whether it was committed or not. However, an untracked unprepared
  // write will hide smaller committed sequence numbers.
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.
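  //
  // As a hypothetical illustration: suppose we took a snapshot at seq 3, and
  // another transaction later committed key k at seq 5, which should fail our
  // snapshot validation of k. If we had already written k untracked and
  // unprepared at seq 8, validation would inspect only seq 8 (our own write)
  // and never notice the conflicting committed write at seq 5.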

  virtual Status RebuildFromWriteBatch(WriteBatch*) override;

  virtual uint64_t GetLastLogNumber() const override {
    return last_log_number_;
  }

  void RemoveActiveIterator(Iterator* iter) {
    active_iterators_.erase(
        std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
        active_iterators_.end());
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback that
  // handles read-your-own-writes is used.
  using Transaction::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using Transaction::MultiGet;
  virtual void MultiGet(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        const size_t num_keys, const Slice* keys,
                        PinnableSlice* values, Status* statuses,
                        const bool sorted_input = false) override;

  using Transaction::GetIterator;
  virtual Iterator* GetIterator(const ReadOptions& options) override;
  virtual Iterator* GetIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
                                  const Slice& key,
                                  SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  Status WriteRollbackKeys(const LockTracker& tracked_keys,
                           WriteBatchWithIndex* rollback_batch,
                           ReadCallback* callback, const ReadOptions& roptions);

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append whether
  // write_batch_flush_threshold_ has been exceeded, and if so, call
  // FlushWriteBatchToDB. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB, sketched below.
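  //
  // A minimal sketch of that check (illustrative only; the actual
  // implementation lives in write_unprepared_txn.cc and may differ):
  //
  //   Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  //     if (write_batch_flush_threshold_ > 0 &&
  //         write_batch_.GetWriteBatch()->GetDataSize() >
  //             static_cast<size_t>(write_batch_flush_threshold_)) {
  //       return FlushWriteBatchToDB(/*prepared=*/false);
  //     }
  //     return Status::OK();
  //   }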
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, so this simplifies
  // the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not actually
  // locked for efficiency reasons. For recovered transactions, skip unlocking
  // keys when the transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was
  // set, then this is set to GetLastPublishedSequence. This value is useful
  // because it means that for keys that have unprepared seqnos, we can
  // guarantee that no keys committed by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a transaction,
  // but in some cases, we should be able to restore the previously largest
  // value when calling RollbackToSavepoint.
  SequenceNumber largest_validated_seq_;

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seqs is
    // used during RollbackToSavepoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing the set
    // difference. Investigate if this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have 3 data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_ and unprepared
  // batches that have already been written out. Its responsibility is just to
  // track which keys have been modified at every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about
  // savepoints set on unprepared batches that have already been flushed. It
  // holds the snapshot and unprep_seqs at that savepoint, so that the rollback
  // process can determine which keys were visible at that point in time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants:
  // size(unflushed_save_points_) = size(write_batch_.save_points_)
  // size(flushed_save_points_) + size(unflushed_save_points_)
  //   = size(save_points_)
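  //
  // As a hypothetical example: with five savepoints in total, two of which
  // were set before the write batch was last flushed, we would have
  // size(flushed_save_points_) = 2, size(unflushed_save_points_) = 3, and
  // size(save_points_) = 5.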
  //
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;

  // It is currently unsafe to flush a write batch if there are active
  // iterators created from this transaction. This is because we use
  // WriteBatchWithIndex to do merging reads from the DB and the write batch.
  // If we flush the write batch, it is possible that the delta iterator on the
  // iterator will point to invalid memory.
  std::vector<Iterator*> active_iterators_;

  // Untracked keys that we have to roll back.
  //
  // TODO(lth): Currently we do not record untracked keys per-savepoint.
  // This means that when rolling back to savepoints, we have to check all
  // keys in the current transaction for rollback. Note that this is merely
  // inefficient, not incorrect, because we take a snapshot at every savepoint,
  // and we will use that snapshot to construct the rollback batch. The
  // rollback batch will then contain a reissue of the same marker.
  //
  // A more optimal solution would be to only check keys changed since the
  // last savepoint. Also, it may make sense to merge this into tracked_keys_
  // and distinguish tracked-but-not-locked keys there, to avoid having two
  // very similar data structures.
  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
  KeySet untracked_keys_;
};

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE