1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #ifndef ROCKSDB_LITE 7 8 #include "utilities/transactions/pessimistic_transaction.h" 9 10 #include <map> 11 #include <set> 12 #include <string> 13 #include <vector> 14 15 #include "db/column_family.h" 16 #include "db/db_impl/db_impl.h" 17 #include "rocksdb/comparator.h" 18 #include "rocksdb/db.h" 19 #include "rocksdb/snapshot.h" 20 #include "rocksdb/status.h" 21 #include "rocksdb/utilities/transaction_db.h" 22 #include "test_util/sync_point.h" 23 #include "util/cast_util.h" 24 #include "util/string_util.h" 25 #include "utilities/transactions/pessimistic_transaction_db.h" 26 #include "utilities/transactions/transaction_util.h" 27 28 namespace ROCKSDB_NAMESPACE { 29 30 struct WriteOptions; 31 32 std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1); 33 34 TransactionID PessimisticTransaction::GenTxnID() { 35 return txn_id_counter_.fetch_add(1); 36 } 37 38 PessimisticTransaction::PessimisticTransaction( 39 TransactionDB* txn_db, const WriteOptions& write_options, 40 const TransactionOptions& txn_options, const bool init) 41 : TransactionBaseImpl(txn_db->GetRootDB(), write_options), 42 txn_db_impl_(nullptr), 43 expiration_time_(0), 44 txn_id_(0), 45 waiting_cf_id_(0), 46 waiting_key_(nullptr), 47 lock_timeout_(0), 48 deadlock_detect_(false), 49 deadlock_detect_depth_(0), 50 skip_concurrency_control_(false) { 51 txn_db_impl_ = 52 static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db); 53 db_impl_ = static_cast_with_check<DBImpl, DB>(db_); 54 if (init) { 55 Initialize(txn_options); 56 } 57 } 58 59 void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { 60 txn_id_ = GenTxnID(); 61 62 txn_state_ = STARTED; 63 64 deadlock_detect_ = txn_options.deadlock_detect; 65 deadlock_detect_depth_ = txn_options.deadlock_detect_depth; 66 write_batch_.SetMaxBytes(txn_options.max_write_batch_size); 67 skip_concurrency_control_ = txn_options.skip_concurrency_control; 68 69 lock_timeout_ = txn_options.lock_timeout * 1000; 70 if (lock_timeout_ < 0) { 71 // Lock timeout not set, use default 72 lock_timeout_ = 73 txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000; 74 } 75 76 if (txn_options.expiration >= 0) { 77 expiration_time_ = start_time_ + txn_options.expiration * 1000; 78 } else { 79 expiration_time_ = 0; 80 } 81 82 if (txn_options.set_snapshot) { 83 SetSnapshot(); 84 } 85 86 if (expiration_time_ > 0) { 87 txn_db_impl_->InsertExpirableTransaction(txn_id_, this); 88 } 89 use_only_the_last_commit_time_batch_for_recovery_ = 90 txn_options.use_only_the_last_commit_time_batch_for_recovery; 91 } 92 93 PessimisticTransaction::~PessimisticTransaction() { 94 txn_db_impl_->UnLock(this, &GetTrackedKeys()); 95 if (expiration_time_ > 0) { 96 txn_db_impl_->RemoveExpirableTransaction(txn_id_); 97 } 98 if (!name_.empty() && txn_state_ != COMMITED) { 99 txn_db_impl_->UnregisterTransaction(this); 100 } 101 } 102 103 void PessimisticTransaction::Clear() { 104 txn_db_impl_->UnLock(this, &GetTrackedKeys()); 105 TransactionBaseImpl::Clear(); 106 } 107 108 void PessimisticTransaction::Reinitialize( 109 TransactionDB* txn_db, const WriteOptions& write_options, 110 const TransactionOptions& txn_options) { 111 if (!name_.empty() && txn_state_ != COMMITED) { 112 txn_db_impl_->UnregisterTransaction(this); 113 } 114 TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options); 115 Initialize(txn_options); 116 } 117 118 bool PessimisticTransaction::IsExpired() const { 119 if (expiration_time_ > 0) { 120 if (db_->GetEnv()->NowMicros() >= expiration_time_) { 121 // Transaction is expired. 122 return true; 123 } 124 } 125 126 return false; 127 } 128 129 WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, 130 const WriteOptions& write_options, 131 const TransactionOptions& txn_options) 132 : PessimisticTransaction(txn_db, write_options, txn_options){}; 133 134 Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { 135 TransactionKeyMap keys_to_unlock; 136 Status s = LockBatch(batch, &keys_to_unlock); 137 138 if (!s.ok()) { 139 return s; 140 } 141 142 bool can_commit = false; 143 144 if (IsExpired()) { 145 s = Status::Expired(); 146 } else if (expiration_time_ > 0) { 147 TransactionState expected = STARTED; 148 can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected, 149 AWAITING_COMMIT); 150 } else if (txn_state_ == STARTED) { 151 // lock stealing is not a concern 152 can_commit = true; 153 } 154 155 if (can_commit) { 156 txn_state_.store(AWAITING_COMMIT); 157 s = CommitBatchInternal(batch); 158 if (s.ok()) { 159 txn_state_.store(COMMITED); 160 } 161 } else if (txn_state_ == LOCKS_STOLEN) { 162 s = Status::Expired(); 163 } else { 164 s = Status::InvalidArgument("Transaction is not in state for commit."); 165 } 166 167 txn_db_impl_->UnLock(this, &keys_to_unlock); 168 169 return s; 170 } 171 172 Status PessimisticTransaction::Prepare() { 173 Status s; 174 175 if (name_.empty()) { 176 return Status::InvalidArgument( 177 "Cannot prepare a transaction that has not been named."); 178 } 179 180 if (IsExpired()) { 181 return Status::Expired(); 182 } 183 184 bool can_prepare = false; 185 186 if (expiration_time_ > 0) { 187 // must concern ourselves with expiraton and/or lock stealing 188 // need to compare/exchange bc locks could be stolen under us here 189 TransactionState expected = STARTED; 190 can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected, 191 AWAITING_PREPARE); 192 } else if (txn_state_ == STARTED) { 193 // expiration and lock stealing is not possible 194 can_prepare = true; 195 } 196 197 if (can_prepare) { 198 txn_state_.store(AWAITING_PREPARE); 199 // transaction can't expire after preparation 200 expiration_time_ = 0; 201 assert(log_number_ == 0 || 202 txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); 203 204 s = PrepareInternal(); 205 if (s.ok()) { 206 txn_state_.store(PREPARED); 207 } 208 } else if (txn_state_ == LOCKS_STOLEN) { 209 s = Status::Expired(); 210 } else if (txn_state_ == PREPARED) { 211 s = Status::InvalidArgument("Transaction has already been prepared."); 212 } else if (txn_state_ == COMMITED) { 213 s = Status::InvalidArgument("Transaction has already been committed."); 214 } else if (txn_state_ == ROLLEDBACK) { 215 s = Status::InvalidArgument("Transaction has already been rolledback."); 216 } else { 217 s = Status::InvalidArgument("Transaction is not in state for commit."); 218 } 219 220 return s; 221 } 222 223 Status WriteCommittedTxn::PrepareInternal() { 224 WriteOptions write_options = write_options_; 225 write_options.disableWAL = false; 226 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); 227 class MarkLogCallback : public PreReleaseCallback { 228 public: 229 MarkLogCallback(DBImpl* db, bool two_write_queues) 230 : db_(db), two_write_queues_(two_write_queues) { 231 (void)two_write_queues_; // to silence unused private field warning 232 } 233 virtual Status Callback(SequenceNumber, bool is_mem_disabled, 234 uint64_t log_number, size_t /*index*/, 235 size_t /*total*/) override { 236 #ifdef NDEBUG 237 (void)is_mem_disabled; 238 #endif 239 assert(log_number != 0); 240 assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue 241 db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number); 242 return Status::OK(); 243 } 244 245 private: 246 DBImpl* db_; 247 bool two_write_queues_; 248 } mark_log_callback(db_impl_, 249 db_impl_->immutable_db_options().two_write_queues); 250 251 WriteCallback* const kNoWriteCallback = nullptr; 252 const uint64_t kRefNoLog = 0; 253 const bool kDisableMemtable = true; 254 SequenceNumber* const KIgnoreSeqUsed = nullptr; 255 const size_t kNoBatchCount = 0; 256 Status s = db_impl_->WriteImpl( 257 write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback, 258 &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount, 259 &mark_log_callback); 260 return s; 261 } 262 263 Status PessimisticTransaction::Commit() { 264 Status s; 265 bool commit_without_prepare = false; 266 bool commit_prepared = false; 267 268 if (IsExpired()) { 269 return Status::Expired(); 270 } 271 272 if (expiration_time_ > 0) { 273 // we must atomicaly compare and exchange the state here because at 274 // this state in the transaction it is possible for another thread 275 // to change our state out from under us in the even that we expire and have 276 // our locks stolen. In this case the only valid state is STARTED because 277 // a state of PREPARED would have a cleared expiration_time_. 278 TransactionState expected = STARTED; 279 commit_without_prepare = std::atomic_compare_exchange_strong( 280 &txn_state_, &expected, AWAITING_COMMIT); 281 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); 282 } else if (txn_state_ == PREPARED) { 283 // expiration and lock stealing is not a concern 284 commit_prepared = true; 285 } else if (txn_state_ == STARTED) { 286 // expiration and lock stealing is not a concern 287 commit_without_prepare = true; 288 // TODO(myabandeh): what if the user mistakenly forgets prepare? We should 289 // add an option so that the user explictly express the intention of 290 // skipping the prepare phase. 291 } 292 293 if (commit_without_prepare) { 294 assert(!commit_prepared); 295 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { 296 s = Status::InvalidArgument( 297 "Commit-time batch contains values that will not be committed."); 298 } else { 299 txn_state_.store(AWAITING_COMMIT); 300 if (log_number_ > 0) { 301 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( 302 log_number_); 303 } 304 s = CommitWithoutPrepareInternal(); 305 if (!name_.empty()) { 306 txn_db_impl_->UnregisterTransaction(this); 307 } 308 Clear(); 309 if (s.ok()) { 310 txn_state_.store(COMMITED); 311 } 312 } 313 } else if (commit_prepared) { 314 txn_state_.store(AWAITING_COMMIT); 315 316 s = CommitInternal(); 317 318 if (!s.ok()) { 319 ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, 320 "Commit write failed"); 321 return s; 322 } 323 324 // FindObsoleteFiles must now look to the memtables 325 // to determine what prep logs must be kept around, 326 // not the prep section heap. 327 assert(log_number_ > 0); 328 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( 329 log_number_); 330 txn_db_impl_->UnregisterTransaction(this); 331 332 Clear(); 333 txn_state_.store(COMMITED); 334 } else if (txn_state_ == LOCKS_STOLEN) { 335 s = Status::Expired(); 336 } else if (txn_state_ == COMMITED) { 337 s = Status::InvalidArgument("Transaction has already been committed."); 338 } else if (txn_state_ == ROLLEDBACK) { 339 s = Status::InvalidArgument("Transaction has already been rolledback."); 340 } else { 341 s = Status::InvalidArgument("Transaction is not in state for commit."); 342 } 343 344 return s; 345 } 346 347 Status WriteCommittedTxn::CommitWithoutPrepareInternal() { 348 uint64_t seq_used = kMaxSequenceNumber; 349 auto s = 350 db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(), 351 /*callback*/ nullptr, /*log_used*/ nullptr, 352 /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used); 353 assert(!s.ok() || seq_used != kMaxSequenceNumber); 354 if (s.ok()) { 355 SetId(seq_used); 356 } 357 return s; 358 } 359 360 Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) { 361 uint64_t seq_used = kMaxSequenceNumber; 362 auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr, 363 /*log_used*/ nullptr, /*log_ref*/ 0, 364 /*disable_memtable*/ false, &seq_used); 365 assert(!s.ok() || seq_used != kMaxSequenceNumber); 366 if (s.ok()) { 367 SetId(seq_used); 368 } 369 return s; 370 } 371 372 Status WriteCommittedTxn::CommitInternal() { 373 // We take the commit-time batch and append the Commit marker. 374 // The Memtable will ignore the Commit marker in non-recovery mode 375 WriteBatch* working_batch = GetCommitTimeWriteBatch(); 376 WriteBatchInternal::MarkCommit(working_batch, name_); 377 378 // any operations appended to this working_batch will be ignored from WAL 379 working_batch->MarkWalTerminationPoint(); 380 381 // insert prepared batch into Memtable only skipping WAL. 382 // Memtable will ignore BeginPrepare/EndPrepare markers 383 // in non recovery mode and simply insert the values 384 WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch()); 385 386 uint64_t seq_used = kMaxSequenceNumber; 387 auto s = 388 db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, 389 /*log_used*/ nullptr, /*log_ref*/ log_number_, 390 /*disable_memtable*/ false, &seq_used); 391 assert(!s.ok() || seq_used != kMaxSequenceNumber); 392 if (s.ok()) { 393 SetId(seq_used); 394 } 395 return s; 396 } 397 398 Status PessimisticTransaction::Rollback() { 399 Status s; 400 if (txn_state_ == PREPARED) { 401 txn_state_.store(AWAITING_ROLLBACK); 402 403 s = RollbackInternal(); 404 405 if (s.ok()) { 406 // we do not need to keep our prepared section around 407 assert(log_number_ > 0); 408 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( 409 log_number_); 410 Clear(); 411 txn_state_.store(ROLLEDBACK); 412 } 413 } else if (txn_state_ == STARTED) { 414 if (log_number_ > 0) { 415 assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); 416 assert(GetId() > 0); 417 s = RollbackInternal(); 418 419 if (s.ok()) { 420 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( 421 log_number_); 422 } 423 } 424 // prepare couldn't have taken place 425 Clear(); 426 } else if (txn_state_ == COMMITED) { 427 s = Status::InvalidArgument("This transaction has already been committed."); 428 } else { 429 s = Status::InvalidArgument( 430 "Two phase transaction is not in state for rollback."); 431 } 432 433 return s; 434 } 435 436 Status WriteCommittedTxn::RollbackInternal() { 437 WriteBatch rollback_marker; 438 WriteBatchInternal::MarkRollback(&rollback_marker, name_); 439 auto s = db_impl_->WriteImpl(write_options_, &rollback_marker); 440 return s; 441 } 442 443 Status PessimisticTransaction::RollbackToSavePoint() { 444 if (txn_state_ != STARTED) { 445 return Status::InvalidArgument("Transaction is beyond state for rollback."); 446 } 447 448 // Unlock any keys locked since last transaction 449 const std::unique_ptr<TransactionKeyMap>& keys = 450 GetTrackedKeysSinceSavePoint(); 451 452 if (keys) { 453 txn_db_impl_->UnLock(this, keys.get()); 454 } 455 456 return TransactionBaseImpl::RollbackToSavePoint(); 457 } 458 459 // Lock all keys in this batch. 460 // On success, caller should unlock keys_to_unlock 461 Status PessimisticTransaction::LockBatch(WriteBatch* batch, 462 TransactionKeyMap* keys_to_unlock) { 463 class Handler : public WriteBatch::Handler { 464 public: 465 // Sorted map of column_family_id to sorted set of keys. 466 // Since LockBatch() always locks keys in sorted order, it cannot deadlock 467 // with itself. We're not using a comparator here since it doesn't matter 468 // what the sorting is as long as it's consistent. 469 std::map<uint32_t, std::set<std::string>> keys_; 470 471 Handler() {} 472 473 void RecordKey(uint32_t column_family_id, const Slice& key) { 474 std::string key_str = key.ToString(); 475 476 auto& cfh_keys = keys_[column_family_id]; 477 auto iter = cfh_keys.find(key_str); 478 if (iter == cfh_keys.end()) { 479 // key not yet seen, store it. 480 cfh_keys.insert({std::move(key_str)}); 481 } 482 } 483 484 Status PutCF(uint32_t column_family_id, const Slice& key, 485 const Slice& /* unused */) override { 486 RecordKey(column_family_id, key); 487 return Status::OK(); 488 } 489 Status MergeCF(uint32_t column_family_id, const Slice& key, 490 const Slice& /* unused */) override { 491 RecordKey(column_family_id, key); 492 return Status::OK(); 493 } 494 Status DeleteCF(uint32_t column_family_id, const Slice& key) override { 495 RecordKey(column_family_id, key); 496 return Status::OK(); 497 } 498 }; 499 500 // Iterating on this handler will add all keys in this batch into keys 501 Handler handler; 502 batch->Iterate(&handler); 503 504 Status s; 505 506 // Attempt to lock all keys 507 for (const auto& cf_iter : handler.keys_) { 508 uint32_t cfh_id = cf_iter.first; 509 auto& cfh_keys = cf_iter.second; 510 511 for (const auto& key_iter : cfh_keys) { 512 const std::string& key = key_iter; 513 514 s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */); 515 if (!s.ok()) { 516 break; 517 } 518 TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber, 519 false, true /* exclusive */); 520 } 521 522 if (!s.ok()) { 523 break; 524 } 525 } 526 527 if (!s.ok()) { 528 txn_db_impl_->UnLock(this, keys_to_unlock); 529 } 530 531 return s; 532 } 533 534 // Attempt to lock this key. 535 // Returns OK if the key has been successfully locked. Non-ok, otherwise. 536 // If check_shapshot is true and this transaction has a snapshot set, 537 // this key will only be locked if there have been no writes to this key since 538 // the snapshot time. 539 Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, 540 const Slice& key, bool read_only, 541 bool exclusive, const bool do_validate, 542 const bool assume_tracked) { 543 assert(!assume_tracked || !do_validate); 544 Status s; 545 if (UNLIKELY(skip_concurrency_control_)) { 546 return s; 547 } 548 uint32_t cfh_id = GetColumnFamilyID(column_family); 549 std::string key_str = key.ToString(); 550 bool previously_locked; 551 bool lock_upgrade = false; 552 553 // lock this key if this transactions hasn't already locked it 554 SequenceNumber tracked_at_seq = kMaxSequenceNumber; 555 556 const auto& tracked_keys = GetTrackedKeys(); 557 const auto tracked_keys_cf = tracked_keys.find(cfh_id); 558 if (tracked_keys_cf == tracked_keys.end()) { 559 previously_locked = false; 560 } else { 561 auto iter = tracked_keys_cf->second.find(key_str); 562 if (iter == tracked_keys_cf->second.end()) { 563 previously_locked = false; 564 } else { 565 if (!iter->second.exclusive && exclusive) { 566 lock_upgrade = true; 567 } 568 previously_locked = true; 569 tracked_at_seq = iter->second.seq; 570 } 571 } 572 573 // Lock this key if this transactions hasn't already locked it or we require 574 // an upgrade. 575 if (!previously_locked || lock_upgrade) { 576 s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); 577 } 578 579 SetSnapshotIfNeeded(); 580 581 // Even though we do not care about doing conflict checking for this write, 582 // we still need to take a lock to make sure we do not cause a conflict with 583 // some other write. However, we do not need to check if there have been 584 // any writes since this transaction's snapshot. 585 // TODO(agiardullo): could optimize by supporting shared txn locks in the 586 // future 587 if (!do_validate || snapshot_ == nullptr) { 588 if (assume_tracked && !previously_locked) { 589 s = Status::InvalidArgument( 590 "assume_tracked is set but it is not tracked yet"); 591 } 592 // Need to remember the earliest sequence number that we know that this 593 // key has not been modified after. This is useful if this same 594 // transaction 595 // later tries to lock this key again. 596 if (tracked_at_seq == kMaxSequenceNumber) { 597 // Since we haven't checked a snapshot, we only know this key has not 598 // been modified since after we locked it. 599 // Note: when last_seq_same_as_publish_seq_==false this is less than the 600 // latest allocated seq but it is ok since i) this is just a heuristic 601 // used only as a hint to avoid actual check for conflicts, ii) this would 602 // cause a false positive only if the snapthot is taken right after the 603 // lock, which would be an unusual sequence. 604 tracked_at_seq = db_->GetLatestSequenceNumber(); 605 } 606 } else { 607 // If a snapshot is set, we need to make sure the key hasn't been modified 608 // since the snapshot. This must be done after we locked the key. 609 // If we already have validated an earilier snapshot it must has been 610 // reflected in tracked_at_seq and ValidateSnapshot will return OK. 611 if (s.ok()) { 612 s = ValidateSnapshot(column_family, key, &tracked_at_seq); 613 614 if (!s.ok()) { 615 // Failed to validate key 616 if (!previously_locked) { 617 // Unlock key we just locked 618 if (lock_upgrade) { 619 s = txn_db_impl_->TryLock(this, cfh_id, key_str, 620 false /* exclusive */); 621 assert(s.ok()); 622 } else { 623 txn_db_impl_->UnLock(this, cfh_id, key.ToString()); 624 } 625 } 626 } 627 } 628 } 629 630 if (s.ok()) { 631 // We must track all the locked keys so that we can unlock them later. If 632 // the key is already locked, this func will update some stats on the 633 // tracked key. It could also update the tracked_at_seq if it is lower 634 // than the existing tracked key seq. These stats are necessary for 635 // RollbackToSavePoint to determine whether a key can be safely removed 636 // from tracked_keys_. Removal can only be done if a key was only locked 637 // during the current savepoint. 638 // 639 // Recall that if assume_tracked is true, we assume that TrackKey has been 640 // called previously since the last savepoint, with the same exclusive 641 // setting, and at a lower sequence number, so skipping here should be 642 // safe. 643 if (!assume_tracked) { 644 TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive); 645 } else { 646 #ifndef NDEBUG 647 assert(tracked_keys_cf->second.count(key_str) > 0); 648 const auto& info = tracked_keys_cf->second.find(key_str)->second; 649 assert(info.seq <= tracked_at_seq); 650 assert(info.exclusive == exclusive); 651 #endif 652 } 653 } 654 655 return s; 656 } 657 658 // Return OK() if this key has not been modified more recently than the 659 // transaction snapshot_. 660 // tracked_at_seq is the global seq at which we either locked the key or already 661 // have done ValidateSnapshot. 662 Status PessimisticTransaction::ValidateSnapshot( 663 ColumnFamilyHandle* column_family, const Slice& key, 664 SequenceNumber* tracked_at_seq) { 665 assert(snapshot_); 666 667 SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); 668 if (*tracked_at_seq <= snap_seq) { 669 // If the key has been previous validated (or locked) at a sequence number 670 // earlier than the current snapshot's sequence number, we already know it 671 // has not been modified aftter snap_seq either. 672 return Status::OK(); 673 } 674 // Otherwise we have either 675 // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key 676 // 2: snap_seq < tracked_at_seq: last time we lock the key was via 677 // do_validate=false which means we had skipped ValidateSnapshot. In both 678 // cases we should do ValidateSnapshot now. 679 680 *tracked_at_seq = snap_seq; 681 682 ColumnFamilyHandle* cfh = 683 column_family ? column_family : db_impl_->DefaultColumnFamily(); 684 685 return TransactionUtil::CheckKeyForConflicts( 686 db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */); 687 } 688 689 bool PessimisticTransaction::TryStealingLocks() { 690 assert(IsExpired()); 691 TransactionState expected = STARTED; 692 return std::atomic_compare_exchange_strong(&txn_state_, &expected, 693 LOCKS_STOLEN); 694 } 695 696 void PessimisticTransaction::UnlockGetForUpdate( 697 ColumnFamilyHandle* column_family, const Slice& key) { 698 txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); 699 } 700 701 Status PessimisticTransaction::SetName(const TransactionName& name) { 702 Status s; 703 if (txn_state_ == STARTED) { 704 if (name_.length()) { 705 s = Status::InvalidArgument("Transaction has already been named."); 706 } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) { 707 s = Status::InvalidArgument("Transaction name must be unique."); 708 } else if (name.length() < 1 || name.length() > 512) { 709 s = Status::InvalidArgument( 710 "Transaction name length must be between 1 and 512 chars."); 711 } else { 712 name_ = name; 713 txn_db_impl_->RegisterTransaction(this); 714 } 715 } else { 716 s = Status::InvalidArgument("Transaction is beyond state for naming."); 717 } 718 return s; 719 } 720 721 } // namespace ROCKSDB_NAMESPACE 722 723 #endif // ROCKSDB_LITE 724