1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/pessimistic_transaction.h"
9 
10 #include <map>
11 #include <set>
12 #include <string>
13 #include <vector>
14 
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/snapshot.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "test_util/sync_point.h"
23 #include "util/cast_util.h"
24 #include "util/string_util.h"
25 #include "utilities/transactions/pessimistic_transaction_db.h"
26 #include "utilities/transactions/transaction_util.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 
30 struct WriteOptions;
31 
32 std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
33 
GenTxnID()34 TransactionID PessimisticTransaction::GenTxnID() {
35   return txn_id_counter_.fetch_add(1);
36 }
37 
PessimisticTransaction(TransactionDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options,const bool init)38 PessimisticTransaction::PessimisticTransaction(
39     TransactionDB* txn_db, const WriteOptions& write_options,
40     const TransactionOptions& txn_options, const bool init)
41     : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
42       txn_db_impl_(nullptr),
43       expiration_time_(0),
44       txn_id_(0),
45       waiting_cf_id_(0),
46       waiting_key_(nullptr),
47       lock_timeout_(0),
48       deadlock_detect_(false),
49       deadlock_detect_depth_(0),
50       skip_concurrency_control_(false) {
51   txn_db_impl_ =
52       static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
53   db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
54   if (init) {
55     Initialize(txn_options);
56   }
57 }
58 
Initialize(const TransactionOptions & txn_options)59 void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
60   txn_id_ = GenTxnID();
61 
62   txn_state_ = STARTED;
63 
64   deadlock_detect_ = txn_options.deadlock_detect;
65   deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
66   write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
67   skip_concurrency_control_ = txn_options.skip_concurrency_control;
68 
69   lock_timeout_ = txn_options.lock_timeout * 1000;
70   if (lock_timeout_ < 0) {
71     // Lock timeout not set, use default
72     lock_timeout_ =
73         txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
74   }
75 
76   if (txn_options.expiration >= 0) {
77     expiration_time_ = start_time_ + txn_options.expiration * 1000;
78   } else {
79     expiration_time_ = 0;
80   }
81 
82   if (txn_options.set_snapshot) {
83     SetSnapshot();
84   }
85 
86   if (expiration_time_ > 0) {
87     txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
88   }
89   use_only_the_last_commit_time_batch_for_recovery_ =
90       txn_options.use_only_the_last_commit_time_batch_for_recovery;
91 }
92 
~PessimisticTransaction()93 PessimisticTransaction::~PessimisticTransaction() {
94   txn_db_impl_->UnLock(this, &GetTrackedKeys());
95   if (expiration_time_ > 0) {
96     txn_db_impl_->RemoveExpirableTransaction(txn_id_);
97   }
98   if (!name_.empty() && txn_state_ != COMMITED) {
99     txn_db_impl_->UnregisterTransaction(this);
100   }
101 }
102 
Clear()103 void PessimisticTransaction::Clear() {
104   txn_db_impl_->UnLock(this, &GetTrackedKeys());
105   TransactionBaseImpl::Clear();
106 }
107 
Reinitialize(TransactionDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options)108 void PessimisticTransaction::Reinitialize(
109     TransactionDB* txn_db, const WriteOptions& write_options,
110     const TransactionOptions& txn_options) {
111   if (!name_.empty() && txn_state_ != COMMITED) {
112     txn_db_impl_->UnregisterTransaction(this);
113   }
114   TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
115   Initialize(txn_options);
116 }
117 
IsExpired() const118 bool PessimisticTransaction::IsExpired() const {
119   if (expiration_time_ > 0) {
120     if (db_->GetEnv()->NowMicros() >= expiration_time_) {
121       // Transaction is expired.
122       return true;
123     }
124   }
125 
126   return false;
127 }
128 
WriteCommittedTxn(TransactionDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options)129 WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
130                                      const WriteOptions& write_options,
131                                      const TransactionOptions& txn_options)
132     : PessimisticTransaction(txn_db, write_options, txn_options){};
133 
CommitBatch(WriteBatch * batch)134 Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
135   TransactionKeyMap keys_to_unlock;
136   Status s = LockBatch(batch, &keys_to_unlock);
137 
138   if (!s.ok()) {
139     return s;
140   }
141 
142   bool can_commit = false;
143 
144   if (IsExpired()) {
145     s = Status::Expired();
146   } else if (expiration_time_ > 0) {
147     TransactionState expected = STARTED;
148     can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
149                                                      AWAITING_COMMIT);
150   } else if (txn_state_ == STARTED) {
151     // lock stealing is not a concern
152     can_commit = true;
153   }
154 
155   if (can_commit) {
156     txn_state_.store(AWAITING_COMMIT);
157     s = CommitBatchInternal(batch);
158     if (s.ok()) {
159       txn_state_.store(COMMITED);
160     }
161   } else if (txn_state_ == LOCKS_STOLEN) {
162     s = Status::Expired();
163   } else {
164     s = Status::InvalidArgument("Transaction is not in state for commit.");
165   }
166 
167   txn_db_impl_->UnLock(this, &keys_to_unlock);
168 
169   return s;
170 }
171 
Prepare()172 Status PessimisticTransaction::Prepare() {
173   Status s;
174 
175   if (name_.empty()) {
176     return Status::InvalidArgument(
177         "Cannot prepare a transaction that has not been named.");
178   }
179 
180   if (IsExpired()) {
181     return Status::Expired();
182   }
183 
184   bool can_prepare = false;
185 
186   if (expiration_time_ > 0) {
187     // must concern ourselves with expiraton and/or lock stealing
188     // need to compare/exchange bc locks could be stolen under us here
189     TransactionState expected = STARTED;
190     can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
191                                                       AWAITING_PREPARE);
192   } else if (txn_state_ == STARTED) {
193     // expiration and lock stealing is not possible
194     can_prepare = true;
195   }
196 
197   if (can_prepare) {
198     txn_state_.store(AWAITING_PREPARE);
199     // transaction can't expire after preparation
200     expiration_time_ = 0;
201     assert(log_number_ == 0 ||
202            txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
203 
204     s = PrepareInternal();
205     if (s.ok()) {
206       txn_state_.store(PREPARED);
207     }
208   } else if (txn_state_ == LOCKS_STOLEN) {
209     s = Status::Expired();
210   } else if (txn_state_ == PREPARED) {
211     s = Status::InvalidArgument("Transaction has already been prepared.");
212   } else if (txn_state_ == COMMITED) {
213     s = Status::InvalidArgument("Transaction has already been committed.");
214   } else if (txn_state_ == ROLLEDBACK) {
215     s = Status::InvalidArgument("Transaction has already been rolledback.");
216   } else {
217     s = Status::InvalidArgument("Transaction is not in state for commit.");
218   }
219 
220   return s;
221 }
222 
PrepareInternal()223 Status WriteCommittedTxn::PrepareInternal() {
224   WriteOptions write_options = write_options_;
225   write_options.disableWAL = false;
226   WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
227   class MarkLogCallback : public PreReleaseCallback {
228    public:
229     MarkLogCallback(DBImpl* db, bool two_write_queues)
230         : db_(db), two_write_queues_(two_write_queues) {
231       (void)two_write_queues_;  // to silence unused private field warning
232     }
233     virtual Status Callback(SequenceNumber, bool is_mem_disabled,
234                             uint64_t log_number, size_t /*index*/,
235                             size_t /*total*/) override {
236 #ifdef NDEBUG
237       (void)is_mem_disabled;
238 #endif
239       assert(log_number != 0);
240       assert(!two_write_queues_ || is_mem_disabled);  // implies the 2nd queue
241       db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
242       return Status::OK();
243     }
244 
245    private:
246     DBImpl* db_;
247     bool two_write_queues_;
248   } mark_log_callback(db_impl_,
249                       db_impl_->immutable_db_options().two_write_queues);
250 
251   WriteCallback* const kNoWriteCallback = nullptr;
252   const uint64_t kRefNoLog = 0;
253   const bool kDisableMemtable = true;
254   SequenceNumber* const KIgnoreSeqUsed = nullptr;
255   const size_t kNoBatchCount = 0;
256   Status s = db_impl_->WriteImpl(
257       write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
258       &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
259       &mark_log_callback);
260   return s;
261 }
262 
Commit()263 Status PessimisticTransaction::Commit() {
264   Status s;
265   bool commit_without_prepare = false;
266   bool commit_prepared = false;
267 
268   if (IsExpired()) {
269     return Status::Expired();
270   }
271 
272   if (expiration_time_ > 0) {
273     // we must atomicaly compare and exchange the state here because at
274     // this state in the transaction it is possible for another thread
275     // to change our state out from under us in the even that we expire and have
276     // our locks stolen. In this case the only valid state is STARTED because
277     // a state of PREPARED would have a cleared expiration_time_.
278     TransactionState expected = STARTED;
279     commit_without_prepare = std::atomic_compare_exchange_strong(
280         &txn_state_, &expected, AWAITING_COMMIT);
281     TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
282   } else if (txn_state_ == PREPARED) {
283     // expiration and lock stealing is not a concern
284     commit_prepared = true;
285   } else if (txn_state_ == STARTED) {
286     // expiration and lock stealing is not a concern
287     commit_without_prepare = true;
288     // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
289     // add an option so that the user explictly express the intention of
290     // skipping the prepare phase.
291   }
292 
293   if (commit_without_prepare) {
294     assert(!commit_prepared);
295     if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
296       s = Status::InvalidArgument(
297           "Commit-time batch contains values that will not be committed.");
298     } else {
299       txn_state_.store(AWAITING_COMMIT);
300       if (log_number_ > 0) {
301         dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
302             log_number_);
303       }
304       s = CommitWithoutPrepareInternal();
305       if (!name_.empty()) {
306         txn_db_impl_->UnregisterTransaction(this);
307       }
308       Clear();
309       if (s.ok()) {
310         txn_state_.store(COMMITED);
311       }
312     }
313   } else if (commit_prepared) {
314     txn_state_.store(AWAITING_COMMIT);
315 
316     s = CommitInternal();
317 
318     if (!s.ok()) {
319       ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
320                      "Commit write failed");
321       return s;
322     }
323 
324     // FindObsoleteFiles must now look to the memtables
325     // to determine what prep logs must be kept around,
326     // not the prep section heap.
327     assert(log_number_ > 0);
328     dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
329         log_number_);
330     txn_db_impl_->UnregisterTransaction(this);
331 
332     Clear();
333     txn_state_.store(COMMITED);
334   } else if (txn_state_ == LOCKS_STOLEN) {
335     s = Status::Expired();
336   } else if (txn_state_ == COMMITED) {
337     s = Status::InvalidArgument("Transaction has already been committed.");
338   } else if (txn_state_ == ROLLEDBACK) {
339     s = Status::InvalidArgument("Transaction has already been rolledback.");
340   } else {
341     s = Status::InvalidArgument("Transaction is not in state for commit.");
342   }
343 
344   return s;
345 }
346 
CommitWithoutPrepareInternal()347 Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
348   uint64_t seq_used = kMaxSequenceNumber;
349   auto s =
350       db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
351                           /*callback*/ nullptr, /*log_used*/ nullptr,
352                           /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
353   assert(!s.ok() || seq_used != kMaxSequenceNumber);
354   if (s.ok()) {
355     SetId(seq_used);
356   }
357   return s;
358 }
359 
CommitBatchInternal(WriteBatch * batch,size_t)360 Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
361   uint64_t seq_used = kMaxSequenceNumber;
362   auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
363                                /*log_used*/ nullptr, /*log_ref*/ 0,
364                                /*disable_memtable*/ false, &seq_used);
365   assert(!s.ok() || seq_used != kMaxSequenceNumber);
366   if (s.ok()) {
367     SetId(seq_used);
368   }
369   return s;
370 }
371 
CommitInternal()372 Status WriteCommittedTxn::CommitInternal() {
373   // We take the commit-time batch and append the Commit marker.
374   // The Memtable will ignore the Commit marker in non-recovery mode
375   WriteBatch* working_batch = GetCommitTimeWriteBatch();
376   WriteBatchInternal::MarkCommit(working_batch, name_);
377 
378   // any operations appended to this working_batch will be ignored from WAL
379   working_batch->MarkWalTerminationPoint();
380 
381   // insert prepared batch into Memtable only skipping WAL.
382   // Memtable will ignore BeginPrepare/EndPrepare markers
383   // in non recovery mode and simply insert the values
384   WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
385 
386   uint64_t seq_used = kMaxSequenceNumber;
387   auto s =
388       db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
389                           /*log_used*/ nullptr, /*log_ref*/ log_number_,
390                           /*disable_memtable*/ false, &seq_used);
391   assert(!s.ok() || seq_used != kMaxSequenceNumber);
392   if (s.ok()) {
393     SetId(seq_used);
394   }
395   return s;
396 }
397 
Rollback()398 Status PessimisticTransaction::Rollback() {
399   Status s;
400   if (txn_state_ == PREPARED) {
401     txn_state_.store(AWAITING_ROLLBACK);
402 
403     s = RollbackInternal();
404 
405     if (s.ok()) {
406       // we do not need to keep our prepared section around
407       assert(log_number_ > 0);
408       dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
409           log_number_);
410       Clear();
411       txn_state_.store(ROLLEDBACK);
412     }
413   } else if (txn_state_ == STARTED) {
414     if (log_number_ > 0) {
415       assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
416       assert(GetId() > 0);
417       s = RollbackInternal();
418 
419       if (s.ok()) {
420         dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
421             log_number_);
422       }
423     }
424     // prepare couldn't have taken place
425     Clear();
426   } else if (txn_state_ == COMMITED) {
427     s = Status::InvalidArgument("This transaction has already been committed.");
428   } else {
429     s = Status::InvalidArgument(
430         "Two phase transaction is not in state for rollback.");
431   }
432 
433   return s;
434 }
435 
RollbackInternal()436 Status WriteCommittedTxn::RollbackInternal() {
437   WriteBatch rollback_marker;
438   WriteBatchInternal::MarkRollback(&rollback_marker, name_);
439   auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);
440   return s;
441 }
442 
RollbackToSavePoint()443 Status PessimisticTransaction::RollbackToSavePoint() {
444   if (txn_state_ != STARTED) {
445     return Status::InvalidArgument("Transaction is beyond state for rollback.");
446   }
447 
448   // Unlock any keys locked since last transaction
449   const std::unique_ptr<TransactionKeyMap>& keys =
450       GetTrackedKeysSinceSavePoint();
451 
452   if (keys) {
453     txn_db_impl_->UnLock(this, keys.get());
454   }
455 
456   return TransactionBaseImpl::RollbackToSavePoint();
457 }
458 
459 // Lock all keys in this batch.
460 // On success, caller should unlock keys_to_unlock
LockBatch(WriteBatch * batch,TransactionKeyMap * keys_to_unlock)461 Status PessimisticTransaction::LockBatch(WriteBatch* batch,
462                                          TransactionKeyMap* keys_to_unlock) {
463   class Handler : public WriteBatch::Handler {
464    public:
465     // Sorted map of column_family_id to sorted set of keys.
466     // Since LockBatch() always locks keys in sorted order, it cannot deadlock
467     // with itself.  We're not using a comparator here since it doesn't matter
468     // what the sorting is as long as it's consistent.
469     std::map<uint32_t, std::set<std::string>> keys_;
470 
471     Handler() {}
472 
473     void RecordKey(uint32_t column_family_id, const Slice& key) {
474       std::string key_str = key.ToString();
475 
476       auto& cfh_keys = keys_[column_family_id];
477       auto iter = cfh_keys.find(key_str);
478       if (iter == cfh_keys.end()) {
479         // key not yet seen, store it.
480         cfh_keys.insert({std::move(key_str)});
481       }
482     }
483 
484     Status PutCF(uint32_t column_family_id, const Slice& key,
485                  const Slice& /* unused */) override {
486       RecordKey(column_family_id, key);
487       return Status::OK();
488     }
489     Status MergeCF(uint32_t column_family_id, const Slice& key,
490                    const Slice& /* unused */) override {
491       RecordKey(column_family_id, key);
492       return Status::OK();
493     }
494     Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
495       RecordKey(column_family_id, key);
496       return Status::OK();
497     }
498   };
499 
500   // Iterating on this handler will add all keys in this batch into keys
501   Handler handler;
502   batch->Iterate(&handler);
503 
504   Status s;
505 
506   // Attempt to lock all keys
507   for (const auto& cf_iter : handler.keys_) {
508     uint32_t cfh_id = cf_iter.first;
509     auto& cfh_keys = cf_iter.second;
510 
511     for (const auto& key_iter : cfh_keys) {
512       const std::string& key = key_iter;
513 
514       s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
515       if (!s.ok()) {
516         break;
517       }
518       TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
519                false, true /* exclusive */);
520     }
521 
522     if (!s.ok()) {
523       break;
524     }
525   }
526 
527   if (!s.ok()) {
528     txn_db_impl_->UnLock(this, keys_to_unlock);
529   }
530 
531   return s;
532 }
533 
534 // Attempt to lock this key.
535 // Returns OK if the key has been successfully locked.  Non-ok, otherwise.
536 // If check_shapshot is true and this transaction has a snapshot set,
537 // this key will only be locked if there have been no writes to this key since
538 // the snapshot time.
TryLock(ColumnFamilyHandle * column_family,const Slice & key,bool read_only,bool exclusive,const bool do_validate,const bool assume_tracked)539 Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
540                                        const Slice& key, bool read_only,
541                                        bool exclusive, const bool do_validate,
542                                        const bool assume_tracked) {
543   assert(!assume_tracked || !do_validate);
544   Status s;
545   if (UNLIKELY(skip_concurrency_control_)) {
546     return s;
547   }
548   uint32_t cfh_id = GetColumnFamilyID(column_family);
549   std::string key_str = key.ToString();
550   bool previously_locked;
551   bool lock_upgrade = false;
552 
553   // lock this key if this transactions hasn't already locked it
554   SequenceNumber tracked_at_seq = kMaxSequenceNumber;
555 
556   const auto& tracked_keys = GetTrackedKeys();
557   const auto tracked_keys_cf = tracked_keys.find(cfh_id);
558   if (tracked_keys_cf == tracked_keys.end()) {
559     previously_locked = false;
560   } else {
561     auto iter = tracked_keys_cf->second.find(key_str);
562     if (iter == tracked_keys_cf->second.end()) {
563       previously_locked = false;
564     } else {
565       if (!iter->second.exclusive && exclusive) {
566         lock_upgrade = true;
567       }
568       previously_locked = true;
569       tracked_at_seq = iter->second.seq;
570     }
571   }
572 
573   // Lock this key if this transactions hasn't already locked it or we require
574   // an upgrade.
575   if (!previously_locked || lock_upgrade) {
576     s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
577   }
578 
579   SetSnapshotIfNeeded();
580 
581   // Even though we do not care about doing conflict checking for this write,
582   // we still need to take a lock to make sure we do not cause a conflict with
583   // some other write.  However, we do not need to check if there have been
584   // any writes since this transaction's snapshot.
585   // TODO(agiardullo): could optimize by supporting shared txn locks in the
586   // future
587   if (!do_validate || snapshot_ == nullptr) {
588     if (assume_tracked && !previously_locked) {
589       s = Status::InvalidArgument(
590           "assume_tracked is set but it is not tracked yet");
591     }
592     // Need to remember the earliest sequence number that we know that this
593     // key has not been modified after.  This is useful if this same
594     // transaction
595     // later tries to lock this key again.
596     if (tracked_at_seq == kMaxSequenceNumber) {
597       // Since we haven't checked a snapshot, we only know this key has not
598       // been modified since after we locked it.
599       // Note: when last_seq_same_as_publish_seq_==false this is less than the
600       // latest allocated seq but it is ok since i) this is just a heuristic
601       // used only as a hint to avoid actual check for conflicts, ii) this would
602       // cause a false positive only if the snapthot is taken right after the
603       // lock, which would be an unusual sequence.
604       tracked_at_seq = db_->GetLatestSequenceNumber();
605     }
606   } else {
607     // If a snapshot is set, we need to make sure the key hasn't been modified
608     // since the snapshot.  This must be done after we locked the key.
609     // If we already have validated an earilier snapshot it must has been
610     // reflected in tracked_at_seq and ValidateSnapshot will return OK.
611     if (s.ok()) {
612       s = ValidateSnapshot(column_family, key, &tracked_at_seq);
613 
614       if (!s.ok()) {
615         // Failed to validate key
616         if (!previously_locked) {
617           // Unlock key we just locked
618           if (lock_upgrade) {
619             s = txn_db_impl_->TryLock(this, cfh_id, key_str,
620                                       false /* exclusive */);
621             assert(s.ok());
622           } else {
623             txn_db_impl_->UnLock(this, cfh_id, key.ToString());
624           }
625         }
626       }
627     }
628   }
629 
630   if (s.ok()) {
631     // We must track all the locked keys so that we can unlock them later. If
632     // the key is already locked, this func will update some stats on the
633     // tracked key. It could also update the tracked_at_seq if it is lower
634     // than the existing tracked key seq. These stats are necessary for
635     // RollbackToSavePoint to determine whether a key can be safely removed
636     // from tracked_keys_. Removal can only be done if a key was only locked
637     // during the current savepoint.
638     //
639     // Recall that if assume_tracked is true, we assume that TrackKey has been
640     // called previously since the last savepoint, with the same exclusive
641     // setting, and at a lower sequence number, so skipping here should be
642     // safe.
643     if (!assume_tracked) {
644       TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
645     } else {
646 #ifndef NDEBUG
647       assert(tracked_keys_cf->second.count(key_str) > 0);
648       const auto& info = tracked_keys_cf->second.find(key_str)->second;
649       assert(info.seq <= tracked_at_seq);
650       assert(info.exclusive == exclusive);
651 #endif
652     }
653   }
654 
655   return s;
656 }
657 
658 // Return OK() if this key has not been modified more recently than the
659 // transaction snapshot_.
660 // tracked_at_seq is the global seq at which we either locked the key or already
661 // have done ValidateSnapshot.
ValidateSnapshot(ColumnFamilyHandle * column_family,const Slice & key,SequenceNumber * tracked_at_seq)662 Status PessimisticTransaction::ValidateSnapshot(
663     ColumnFamilyHandle* column_family, const Slice& key,
664     SequenceNumber* tracked_at_seq) {
665   assert(snapshot_);
666 
667   SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
668   if (*tracked_at_seq <= snap_seq) {
669     // If the key has been previous validated (or locked) at a sequence number
670     // earlier than the current snapshot's sequence number, we already know it
671     // has not been modified aftter snap_seq either.
672     return Status::OK();
673   }
674   // Otherwise we have either
675   // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
676   // 2: snap_seq < tracked_at_seq: last time we lock the key was via
677   // do_validate=false which means we had skipped ValidateSnapshot. In both
678   // cases we should do ValidateSnapshot now.
679 
680   *tracked_at_seq = snap_seq;
681 
682   ColumnFamilyHandle* cfh =
683       column_family ? column_family : db_impl_->DefaultColumnFamily();
684 
685   return TransactionUtil::CheckKeyForConflicts(
686       db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
687 }
688 
TryStealingLocks()689 bool PessimisticTransaction::TryStealingLocks() {
690   assert(IsExpired());
691   TransactionState expected = STARTED;
692   return std::atomic_compare_exchange_strong(&txn_state_, &expected,
693                                              LOCKS_STOLEN);
694 }
695 
UnlockGetForUpdate(ColumnFamilyHandle * column_family,const Slice & key)696 void PessimisticTransaction::UnlockGetForUpdate(
697     ColumnFamilyHandle* column_family, const Slice& key) {
698   txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
699 }
700 
SetName(const TransactionName & name)701 Status PessimisticTransaction::SetName(const TransactionName& name) {
702   Status s;
703   if (txn_state_ == STARTED) {
704     if (name_.length()) {
705       s = Status::InvalidArgument("Transaction has already been named.");
706     } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
707       s = Status::InvalidArgument("Transaction name must be unique.");
708     } else if (name.length() < 1 || name.length() > 512) {
709       s = Status::InvalidArgument(
710           "Transaction name length must be between 1 and 512 chars.");
711     } else {
712       name_ = name;
713       txn_db_impl_->RegisterTransaction(this);
714     }
715   } else {
716     s = Status::InvalidArgument("Transaction is beyond state for naming.");
717   }
718   return s;
719 }
720 
721 }  // namespace ROCKSDB_NAMESPACE
722 
723 #endif  // ROCKSDB_LITE
724