// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#ifndef ROCKSDB_LITE

#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;

// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may already have been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex alone to determine
// whether a key should be visible, so we have to remember to check the DB for
// any uncommitted keys that should be visible to us. First, we need to change
// the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq). Any key with a sequence
// number greater than max_visible_seq should not be visible, because it
// cannot have been unprepared by the current transaction and it is not in the
// transaction's snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
//
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning that if the same transaction is adding more unprep seqs
// through writes during iteration, these newer writes may not be visible.
// This is not a problem for MySQL, though, because it avoids modifying the
// index while scanning through it in order to avoid the Halloween Problem.
// Instead, it scans the index once up front and applies modifications based
// on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible
// key, and special precautions must be taken to avoid performing another
// reseek, which would lead to an infinite loop.
//
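// A small worked example of the seek target (illustrative only; the sequence
// numbers below are hypothetical): suppose the transaction has written two
// unprepared batches recorded as unprep_seqs = {10 => 3, 20 => 2}, i.e.
// batches starting at sequences 10 and 20 containing 3 and 2 writes
// respectively, and the snapshot sequence is snap_seq = 15. Then, per
// CalcMaxVisibleSeq below,
//   max_unprep_seq  = 20 + 2 - 1 = 21
//   max_visible_seq = max(15, 21) = 21
// so the iterator seeks to sequence 21 and the transaction's own unprepared
// writes stay visible even though they lie above the snapshot.
//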
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class to
      // ensure that the parent will not prematurely filter out our own
      // writes. We will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot),
                     min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity.
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  virtual bool IsVisibleFullCheck(SequenceNumber seq) override;

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    SequenceNumber max_unprepared = 0;
    if (unprep_seqs.size()) {
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot.
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses.
  bool valid_checked_ = false;
};

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value,
                     const bool assume_tracked = false) override;
  virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value,
                     const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value,
                       const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                        const bool assume_tracked = false) override;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key,
                        const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key,
                              const bool assume_tracked = false) override;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key,
                              const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation
  // logic. Snapshot validation only checks the largest sequence number of a
  // key to see whether it was committed or not. However, an untracked
  // unprepared write will hide smaller committed sequence numbers.
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.
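  //
  // An illustrative scenario of the problem described above (the sequence
  // numbers are hypothetical): this transaction takes a snapshot at seq 50;
  // another transaction later commits key K at seq 60; this transaction then
  // performs an untracked, unprepared write to K that lands at seq 70. A
  // snapshot validation pass that only inspects the largest sequence number
  // of K finds our own write at 70 and never sees the conflicting commit at
  // 60 > 50, so the write-write conflict goes undetected.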

  virtual Status RebuildFromWriteBatch(WriteBatch*) override;

  virtual uint64_t GetLastLogNumber() const override {
    return last_log_number_;
  }

  void RemoveActiveIterator(Iterator* iter) {
    active_iterators_.erase(
        std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
        active_iterators_.end());
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback that
  // handles read-your-own-writes is used.
  using Transaction::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using Transaction::MultiGet;
  virtual void MultiGet(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        const size_t num_keys, const Slice* keys,
                        PinnableSlice* values, Status* statuses,
                        const bool sorted_input = false) override;

  using Transaction::GetIterator;
  virtual Iterator* GetIterator(const ReadOptions& options) override;
  virtual Iterator* GetIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
                                  const Slice& key,
                                  SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  Status WriteRollbackKeys(const LockTracker& tracked_keys,
                           WriteBatchWithIndex* rollback_batch,
                           ReadCallback* callback,
                           const ReadOptions& roptions);

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append whether
  // write_batch_flush_threshold_ has been exceeded, and call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB.
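  //
  // A rough sketch of the flow (illustrative; the exact call ordering lives
  // in the implementation): Put/Merge/Delete/SingleDelete funnel through
  // HandleWrite, which invokes MaybeFlushWriteBatchToDB. Once the in-memory
  // write batch grows past write_batch_flush_threshold_, FlushWriteBatchToDB
  // writes it out as an unprepared batch and records its starting sequence
  // number and batch count in unprep_seqs_ below.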
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, so it simplifies
  // the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not actually
  // locked for efficiency reasons. For recovered transactions, skip unlocking
  // keys when the transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was
  // set, then this is set to GetLastPublishedSequence. This value is useful
  // because it means that for keys that have unprepared seqnos, we can
  // guarantee that no committed keys by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a transaction,
  // but in some cases, we should be able to restore the previously largest
  // value when calling RollbackToSavepoint.
  SequenceNumber largest_validated_seq_;

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
    // used during RollbackToSavepoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing the set
    // difference. Investigate if this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have three data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_, as well as
  // unprepared batches that have been written out. Its responsibility is just
  // to track which keys have been modified in every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about
  // savepoints set on unprepared batches that have already been flushed. It
  // holds the snapshot and unprep_seqs at that savepoint, so that the
  // rollback process can determine which keys were visible at that point in
  // time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants:
  //   size(unflushed_save_points_) = size(write_batch_.save_points_)
  //   size(flushed_save_points_) + size(unflushed_save_points_)
  //     = size(save_points_)
  //
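  // A hypothetical example of these invariants (the scenario is illustrative,
  // not from the original comment): a transaction sets savepoint 1, its write
  // batch is then flushed to the DB as an unprepared batch, and it sets
  // savepoints 2, 3, and 4 on the new in-memory batch. At that point
  // save_points_ has 4 entries, flushed_save_points_ has 1 entry (the
  // savepoint whose batch has already been written out), and
  // unflushed_save_points_ has 3 entries, matching the 3 savepoints recorded
  // in write_batch_.save_points_.
  //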
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;

  // It is currently unsafe to flush a write batch if there are active
  // iterators created from this transaction. This is because we use
  // WriteBatchWithIndex to do merging reads from the DB and the write batch.
  // If we flush the write batch, it is possible that the delta iterator
  // inside such an iterator will point to invalid memory.
  std::vector<Iterator*> active_iterators_;

  // Untracked keys that we have to rollback.
  //
  // TODO(lth): Currently we do not record untracked keys per-savepoint. This
  // means that when rolling back to savepoints, we have to check all keys in
  // the current transaction for rollback. Note that this is merely
  // inefficient, not incorrect, because we take a snapshot at every
  // savepoint, and we will use that snapshot to construct the rollback batch.
  // The rollback batch will then contain a reissue of the same marker.
  //
  // A more optimal solution would be to only check keys changed since the
  // last savepoint. Also, it may make sense to merge this into tracked_keys_
  // and differentiate between tracked but not locked keys, to avoid having
  // two very similar data structures.
  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
  KeySet untracked_keys_;
};

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE