1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/pessimistic_transaction.h"
9
10 #include <map>
11 #include <set>
12 #include <string>
13 #include <vector>
14
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/snapshot.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "test_util/sync_point.h"
23 #include "util/cast_util.h"
24 #include "util/string_util.h"
25 #include "utilities/transactions/pessimistic_transaction_db.h"
26 #include "utilities/transactions/transaction_util.h"
27
28 namespace ROCKSDB_NAMESPACE {
29
30 struct WriteOptions;
31
32 std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
33
GenTxnID()34 TransactionID PessimisticTransaction::GenTxnID() {
35 return txn_id_counter_.fetch_add(1);
36 }
37
PessimisticTransaction(TransactionDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options,const bool init)38 PessimisticTransaction::PessimisticTransaction(
39 TransactionDB* txn_db, const WriteOptions& write_options,
40 const TransactionOptions& txn_options, const bool init)
41 : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
42 txn_db_impl_(nullptr),
43 expiration_time_(0),
44 txn_id_(0),
45 waiting_cf_id_(0),
46 waiting_key_(nullptr),
47 lock_timeout_(0),
48 deadlock_detect_(false),
49 deadlock_detect_depth_(0),
50 skip_concurrency_control_(false) {
51 txn_db_impl_ =
52 static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
53 db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
54 if (init) {
55 Initialize(txn_options);
56 }
57 }
58
Initialize(const TransactionOptions & txn_options)59 void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
60 txn_id_ = GenTxnID();
61
62 txn_state_ = STARTED;
63
64 deadlock_detect_ = txn_options.deadlock_detect;
65 deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
66 write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
67 skip_concurrency_control_ = txn_options.skip_concurrency_control;
68
69 lock_timeout_ = txn_options.lock_timeout * 1000;
70 if (lock_timeout_ < 0) {
71 // Lock timeout not set, use default
72 lock_timeout_ =
73 txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
74 }
75
76 if (txn_options.expiration >= 0) {
77 expiration_time_ = start_time_ + txn_options.expiration * 1000;
78 } else {
79 expiration_time_ = 0;
80 }
81
82 if (txn_options.set_snapshot) {
83 SetSnapshot();
84 }
85
86 if (expiration_time_ > 0) {
87 txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
88 }
89 use_only_the_last_commit_time_batch_for_recovery_ =
90 txn_options.use_only_the_last_commit_time_batch_for_recovery;
91 }
92
~PessimisticTransaction()93 PessimisticTransaction::~PessimisticTransaction() {
94 txn_db_impl_->UnLock(this, &GetTrackedKeys());
95 if (expiration_time_ > 0) {
96 txn_db_impl_->RemoveExpirableTransaction(txn_id_);
97 }
98 if (!name_.empty() && txn_state_ != COMMITED) {
99 txn_db_impl_->UnregisterTransaction(this);
100 }
101 }
102
Clear()103 void PessimisticTransaction::Clear() {
104 txn_db_impl_->UnLock(this, &GetTrackedKeys());
105 TransactionBaseImpl::Clear();
106 }
107
Reinitialize(TransactionDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options)108 void PessimisticTransaction::Reinitialize(
109 TransactionDB* txn_db, const WriteOptions& write_options,
110 const TransactionOptions& txn_options) {
111 if (!name_.empty() && txn_state_ != COMMITED) {
112 txn_db_impl_->UnregisterTransaction(this);
113 }
114 TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
115 Initialize(txn_options);
116 }
117
IsExpired() const118 bool PessimisticTransaction::IsExpired() const {
119 if (expiration_time_ > 0) {
120 if (db_->GetEnv()->NowMicros() >= expiration_time_) {
121 // Transaction is expired.
122 return true;
123 }
124 }
125
126 return false;
127 }
128
WriteCommittedTxn(TransactionDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options)129 WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
130 const WriteOptions& write_options,
131 const TransactionOptions& txn_options)
132 : PessimisticTransaction(txn_db, write_options, txn_options){};
133
CommitBatch(WriteBatch * batch)134 Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
135 TransactionKeyMap keys_to_unlock;
136 Status s = LockBatch(batch, &keys_to_unlock);
137
138 if (!s.ok()) {
139 return s;
140 }
141
142 bool can_commit = false;
143
144 if (IsExpired()) {
145 s = Status::Expired();
146 } else if (expiration_time_ > 0) {
147 TransactionState expected = STARTED;
148 can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
149 AWAITING_COMMIT);
150 } else if (txn_state_ == STARTED) {
151 // lock stealing is not a concern
152 can_commit = true;
153 }
154
155 if (can_commit) {
156 txn_state_.store(AWAITING_COMMIT);
157 s = CommitBatchInternal(batch);
158 if (s.ok()) {
159 txn_state_.store(COMMITED);
160 }
161 } else if (txn_state_ == LOCKS_STOLEN) {
162 s = Status::Expired();
163 } else {
164 s = Status::InvalidArgument("Transaction is not in state for commit.");
165 }
166
167 txn_db_impl_->UnLock(this, &keys_to_unlock);
168
169 return s;
170 }
171
Prepare()172 Status PessimisticTransaction::Prepare() {
173 Status s;
174
175 if (name_.empty()) {
176 return Status::InvalidArgument(
177 "Cannot prepare a transaction that has not been named.");
178 }
179
180 if (IsExpired()) {
181 return Status::Expired();
182 }
183
184 bool can_prepare = false;
185
186 if (expiration_time_ > 0) {
187 // must concern ourselves with expiraton and/or lock stealing
188 // need to compare/exchange bc locks could be stolen under us here
189 TransactionState expected = STARTED;
190 can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
191 AWAITING_PREPARE);
192 } else if (txn_state_ == STARTED) {
193 // expiration and lock stealing is not possible
194 can_prepare = true;
195 }
196
197 if (can_prepare) {
198 txn_state_.store(AWAITING_PREPARE);
199 // transaction can't expire after preparation
200 expiration_time_ = 0;
201 assert(log_number_ == 0 ||
202 txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
203
204 s = PrepareInternal();
205 if (s.ok()) {
206 txn_state_.store(PREPARED);
207 }
208 } else if (txn_state_ == LOCKS_STOLEN) {
209 s = Status::Expired();
210 } else if (txn_state_ == PREPARED) {
211 s = Status::InvalidArgument("Transaction has already been prepared.");
212 } else if (txn_state_ == COMMITED) {
213 s = Status::InvalidArgument("Transaction has already been committed.");
214 } else if (txn_state_ == ROLLEDBACK) {
215 s = Status::InvalidArgument("Transaction has already been rolledback.");
216 } else {
217 s = Status::InvalidArgument("Transaction is not in state for commit.");
218 }
219
220 return s;
221 }
222
PrepareInternal()223 Status WriteCommittedTxn::PrepareInternal() {
224 WriteOptions write_options = write_options_;
225 write_options.disableWAL = false;
226 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
227 class MarkLogCallback : public PreReleaseCallback {
228 public:
229 MarkLogCallback(DBImpl* db, bool two_write_queues)
230 : db_(db), two_write_queues_(two_write_queues) {
231 (void)two_write_queues_; // to silence unused private field warning
232 }
233 virtual Status Callback(SequenceNumber, bool is_mem_disabled,
234 uint64_t log_number, size_t /*index*/,
235 size_t /*total*/) override {
236 #ifdef NDEBUG
237 (void)is_mem_disabled;
238 #endif
239 assert(log_number != 0);
240 assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue
241 db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
242 return Status::OK();
243 }
244
245 private:
246 DBImpl* db_;
247 bool two_write_queues_;
248 } mark_log_callback(db_impl_,
249 db_impl_->immutable_db_options().two_write_queues);
250
251 WriteCallback* const kNoWriteCallback = nullptr;
252 const uint64_t kRefNoLog = 0;
253 const bool kDisableMemtable = true;
254 SequenceNumber* const KIgnoreSeqUsed = nullptr;
255 const size_t kNoBatchCount = 0;
256 Status s = db_impl_->WriteImpl(
257 write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
258 &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
259 &mark_log_callback);
260 return s;
261 }
262
Commit()263 Status PessimisticTransaction::Commit() {
264 Status s;
265 bool commit_without_prepare = false;
266 bool commit_prepared = false;
267
268 if (IsExpired()) {
269 return Status::Expired();
270 }
271
272 if (expiration_time_ > 0) {
273 // we must atomicaly compare and exchange the state here because at
274 // this state in the transaction it is possible for another thread
275 // to change our state out from under us in the even that we expire and have
276 // our locks stolen. In this case the only valid state is STARTED because
277 // a state of PREPARED would have a cleared expiration_time_.
278 TransactionState expected = STARTED;
279 commit_without_prepare = std::atomic_compare_exchange_strong(
280 &txn_state_, &expected, AWAITING_COMMIT);
281 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
282 } else if (txn_state_ == PREPARED) {
283 // expiration and lock stealing is not a concern
284 commit_prepared = true;
285 } else if (txn_state_ == STARTED) {
286 // expiration and lock stealing is not a concern
287 commit_without_prepare = true;
288 // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
289 // add an option so that the user explictly express the intention of
290 // skipping the prepare phase.
291 }
292
293 if (commit_without_prepare) {
294 assert(!commit_prepared);
295 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
296 s = Status::InvalidArgument(
297 "Commit-time batch contains values that will not be committed.");
298 } else {
299 txn_state_.store(AWAITING_COMMIT);
300 if (log_number_ > 0) {
301 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
302 log_number_);
303 }
304 s = CommitWithoutPrepareInternal();
305 if (!name_.empty()) {
306 txn_db_impl_->UnregisterTransaction(this);
307 }
308 Clear();
309 if (s.ok()) {
310 txn_state_.store(COMMITED);
311 }
312 }
313 } else if (commit_prepared) {
314 txn_state_.store(AWAITING_COMMIT);
315
316 s = CommitInternal();
317
318 if (!s.ok()) {
319 ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
320 "Commit write failed");
321 return s;
322 }
323
324 // FindObsoleteFiles must now look to the memtables
325 // to determine what prep logs must be kept around,
326 // not the prep section heap.
327 assert(log_number_ > 0);
328 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
329 log_number_);
330 txn_db_impl_->UnregisterTransaction(this);
331
332 Clear();
333 txn_state_.store(COMMITED);
334 } else if (txn_state_ == LOCKS_STOLEN) {
335 s = Status::Expired();
336 } else if (txn_state_ == COMMITED) {
337 s = Status::InvalidArgument("Transaction has already been committed.");
338 } else if (txn_state_ == ROLLEDBACK) {
339 s = Status::InvalidArgument("Transaction has already been rolledback.");
340 } else {
341 s = Status::InvalidArgument("Transaction is not in state for commit.");
342 }
343
344 return s;
345 }
346
CommitWithoutPrepareInternal()347 Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
348 uint64_t seq_used = kMaxSequenceNumber;
349 auto s =
350 db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
351 /*callback*/ nullptr, /*log_used*/ nullptr,
352 /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
353 assert(!s.ok() || seq_used != kMaxSequenceNumber);
354 if (s.ok()) {
355 SetId(seq_used);
356 }
357 return s;
358 }
359
CommitBatchInternal(WriteBatch * batch,size_t)360 Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
361 uint64_t seq_used = kMaxSequenceNumber;
362 auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
363 /*log_used*/ nullptr, /*log_ref*/ 0,
364 /*disable_memtable*/ false, &seq_used);
365 assert(!s.ok() || seq_used != kMaxSequenceNumber);
366 if (s.ok()) {
367 SetId(seq_used);
368 }
369 return s;
370 }
371
CommitInternal()372 Status WriteCommittedTxn::CommitInternal() {
373 // We take the commit-time batch and append the Commit marker.
374 // The Memtable will ignore the Commit marker in non-recovery mode
375 WriteBatch* working_batch = GetCommitTimeWriteBatch();
376 WriteBatchInternal::MarkCommit(working_batch, name_);
377
378 // any operations appended to this working_batch will be ignored from WAL
379 working_batch->MarkWalTerminationPoint();
380
381 // insert prepared batch into Memtable only skipping WAL.
382 // Memtable will ignore BeginPrepare/EndPrepare markers
383 // in non recovery mode and simply insert the values
384 WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
385
386 uint64_t seq_used = kMaxSequenceNumber;
387 auto s =
388 db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
389 /*log_used*/ nullptr, /*log_ref*/ log_number_,
390 /*disable_memtable*/ false, &seq_used);
391 assert(!s.ok() || seq_used != kMaxSequenceNumber);
392 if (s.ok()) {
393 SetId(seq_used);
394 }
395 return s;
396 }
397
Rollback()398 Status PessimisticTransaction::Rollback() {
399 Status s;
400 if (txn_state_ == PREPARED) {
401 txn_state_.store(AWAITING_ROLLBACK);
402
403 s = RollbackInternal();
404
405 if (s.ok()) {
406 // we do not need to keep our prepared section around
407 assert(log_number_ > 0);
408 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
409 log_number_);
410 Clear();
411 txn_state_.store(ROLLEDBACK);
412 }
413 } else if (txn_state_ == STARTED) {
414 if (log_number_ > 0) {
415 assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
416 assert(GetId() > 0);
417 s = RollbackInternal();
418
419 if (s.ok()) {
420 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
421 log_number_);
422 }
423 }
424 // prepare couldn't have taken place
425 Clear();
426 } else if (txn_state_ == COMMITED) {
427 s = Status::InvalidArgument("This transaction has already been committed.");
428 } else {
429 s = Status::InvalidArgument(
430 "Two phase transaction is not in state for rollback.");
431 }
432
433 return s;
434 }
435
RollbackInternal()436 Status WriteCommittedTxn::RollbackInternal() {
437 WriteBatch rollback_marker;
438 WriteBatchInternal::MarkRollback(&rollback_marker, name_);
439 auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);
440 return s;
441 }
442
RollbackToSavePoint()443 Status PessimisticTransaction::RollbackToSavePoint() {
444 if (txn_state_ != STARTED) {
445 return Status::InvalidArgument("Transaction is beyond state for rollback.");
446 }
447
448 // Unlock any keys locked since last transaction
449 const std::unique_ptr<TransactionKeyMap>& keys =
450 GetTrackedKeysSinceSavePoint();
451
452 if (keys) {
453 txn_db_impl_->UnLock(this, keys.get());
454 }
455
456 return TransactionBaseImpl::RollbackToSavePoint();
457 }
458
459 // Lock all keys in this batch.
460 // On success, caller should unlock keys_to_unlock
LockBatch(WriteBatch * batch,TransactionKeyMap * keys_to_unlock)461 Status PessimisticTransaction::LockBatch(WriteBatch* batch,
462 TransactionKeyMap* keys_to_unlock) {
463 class Handler : public WriteBatch::Handler {
464 public:
465 // Sorted map of column_family_id to sorted set of keys.
466 // Since LockBatch() always locks keys in sorted order, it cannot deadlock
467 // with itself. We're not using a comparator here since it doesn't matter
468 // what the sorting is as long as it's consistent.
469 std::map<uint32_t, std::set<std::string>> keys_;
470
471 Handler() {}
472
473 void RecordKey(uint32_t column_family_id, const Slice& key) {
474 std::string key_str = key.ToString();
475
476 auto& cfh_keys = keys_[column_family_id];
477 auto iter = cfh_keys.find(key_str);
478 if (iter == cfh_keys.end()) {
479 // key not yet seen, store it.
480 cfh_keys.insert({std::move(key_str)});
481 }
482 }
483
484 Status PutCF(uint32_t column_family_id, const Slice& key,
485 const Slice& /* unused */) override {
486 RecordKey(column_family_id, key);
487 return Status::OK();
488 }
489 Status MergeCF(uint32_t column_family_id, const Slice& key,
490 const Slice& /* unused */) override {
491 RecordKey(column_family_id, key);
492 return Status::OK();
493 }
494 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
495 RecordKey(column_family_id, key);
496 return Status::OK();
497 }
498 };
499
500 // Iterating on this handler will add all keys in this batch into keys
501 Handler handler;
502 batch->Iterate(&handler);
503
504 Status s;
505
506 // Attempt to lock all keys
507 for (const auto& cf_iter : handler.keys_) {
508 uint32_t cfh_id = cf_iter.first;
509 auto& cfh_keys = cf_iter.second;
510
511 for (const auto& key_iter : cfh_keys) {
512 const std::string& key = key_iter;
513
514 s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
515 if (!s.ok()) {
516 break;
517 }
518 TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
519 false, true /* exclusive */);
520 }
521
522 if (!s.ok()) {
523 break;
524 }
525 }
526
527 if (!s.ok()) {
528 txn_db_impl_->UnLock(this, keys_to_unlock);
529 }
530
531 return s;
532 }
533
534 // Attempt to lock this key.
535 // Returns OK if the key has been successfully locked. Non-ok, otherwise.
536 // If check_shapshot is true and this transaction has a snapshot set,
537 // this key will only be locked if there have been no writes to this key since
538 // the snapshot time.
TryLock(ColumnFamilyHandle * column_family,const Slice & key,bool read_only,bool exclusive,const bool do_validate,const bool assume_tracked)539 Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
540 const Slice& key, bool read_only,
541 bool exclusive, const bool do_validate,
542 const bool assume_tracked) {
543 assert(!assume_tracked || !do_validate);
544 Status s;
545 if (UNLIKELY(skip_concurrency_control_)) {
546 return s;
547 }
548 uint32_t cfh_id = GetColumnFamilyID(column_family);
549 std::string key_str = key.ToString();
550 bool previously_locked;
551 bool lock_upgrade = false;
552
553 // lock this key if this transactions hasn't already locked it
554 SequenceNumber tracked_at_seq = kMaxSequenceNumber;
555
556 const auto& tracked_keys = GetTrackedKeys();
557 const auto tracked_keys_cf = tracked_keys.find(cfh_id);
558 if (tracked_keys_cf == tracked_keys.end()) {
559 previously_locked = false;
560 } else {
561 auto iter = tracked_keys_cf->second.find(key_str);
562 if (iter == tracked_keys_cf->second.end()) {
563 previously_locked = false;
564 } else {
565 if (!iter->second.exclusive && exclusive) {
566 lock_upgrade = true;
567 }
568 previously_locked = true;
569 tracked_at_seq = iter->second.seq;
570 }
571 }
572
573 // Lock this key if this transactions hasn't already locked it or we require
574 // an upgrade.
575 if (!previously_locked || lock_upgrade) {
576 s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
577 }
578
579 SetSnapshotIfNeeded();
580
581 // Even though we do not care about doing conflict checking for this write,
582 // we still need to take a lock to make sure we do not cause a conflict with
583 // some other write. However, we do not need to check if there have been
584 // any writes since this transaction's snapshot.
585 // TODO(agiardullo): could optimize by supporting shared txn locks in the
586 // future
587 if (!do_validate || snapshot_ == nullptr) {
588 if (assume_tracked && !previously_locked) {
589 s = Status::InvalidArgument(
590 "assume_tracked is set but it is not tracked yet");
591 }
592 // Need to remember the earliest sequence number that we know that this
593 // key has not been modified after. This is useful if this same
594 // transaction
595 // later tries to lock this key again.
596 if (tracked_at_seq == kMaxSequenceNumber) {
597 // Since we haven't checked a snapshot, we only know this key has not
598 // been modified since after we locked it.
599 // Note: when last_seq_same_as_publish_seq_==false this is less than the
600 // latest allocated seq but it is ok since i) this is just a heuristic
601 // used only as a hint to avoid actual check for conflicts, ii) this would
602 // cause a false positive only if the snapthot is taken right after the
603 // lock, which would be an unusual sequence.
604 tracked_at_seq = db_->GetLatestSequenceNumber();
605 }
606 } else {
607 // If a snapshot is set, we need to make sure the key hasn't been modified
608 // since the snapshot. This must be done after we locked the key.
609 // If we already have validated an earilier snapshot it must has been
610 // reflected in tracked_at_seq and ValidateSnapshot will return OK.
611 if (s.ok()) {
612 s = ValidateSnapshot(column_family, key, &tracked_at_seq);
613
614 if (!s.ok()) {
615 // Failed to validate key
616 if (!previously_locked) {
617 // Unlock key we just locked
618 if (lock_upgrade) {
619 s = txn_db_impl_->TryLock(this, cfh_id, key_str,
620 false /* exclusive */);
621 assert(s.ok());
622 } else {
623 txn_db_impl_->UnLock(this, cfh_id, key.ToString());
624 }
625 }
626 }
627 }
628 }
629
630 if (s.ok()) {
631 // We must track all the locked keys so that we can unlock them later. If
632 // the key is already locked, this func will update some stats on the
633 // tracked key. It could also update the tracked_at_seq if it is lower
634 // than the existing tracked key seq. These stats are necessary for
635 // RollbackToSavePoint to determine whether a key can be safely removed
636 // from tracked_keys_. Removal can only be done if a key was only locked
637 // during the current savepoint.
638 //
639 // Recall that if assume_tracked is true, we assume that TrackKey has been
640 // called previously since the last savepoint, with the same exclusive
641 // setting, and at a lower sequence number, so skipping here should be
642 // safe.
643 if (!assume_tracked) {
644 TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
645 } else {
646 #ifndef NDEBUG
647 assert(tracked_keys_cf->second.count(key_str) > 0);
648 const auto& info = tracked_keys_cf->second.find(key_str)->second;
649 assert(info.seq <= tracked_at_seq);
650 assert(info.exclusive == exclusive);
651 #endif
652 }
653 }
654
655 return s;
656 }
657
658 // Return OK() if this key has not been modified more recently than the
659 // transaction snapshot_.
660 // tracked_at_seq is the global seq at which we either locked the key or already
661 // have done ValidateSnapshot.
ValidateSnapshot(ColumnFamilyHandle * column_family,const Slice & key,SequenceNumber * tracked_at_seq)662 Status PessimisticTransaction::ValidateSnapshot(
663 ColumnFamilyHandle* column_family, const Slice& key,
664 SequenceNumber* tracked_at_seq) {
665 assert(snapshot_);
666
667 SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
668 if (*tracked_at_seq <= snap_seq) {
669 // If the key has been previous validated (or locked) at a sequence number
670 // earlier than the current snapshot's sequence number, we already know it
671 // has not been modified aftter snap_seq either.
672 return Status::OK();
673 }
674 // Otherwise we have either
675 // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
676 // 2: snap_seq < tracked_at_seq: last time we lock the key was via
677 // do_validate=false which means we had skipped ValidateSnapshot. In both
678 // cases we should do ValidateSnapshot now.
679
680 *tracked_at_seq = snap_seq;
681
682 ColumnFamilyHandle* cfh =
683 column_family ? column_family : db_impl_->DefaultColumnFamily();
684
685 return TransactionUtil::CheckKeyForConflicts(
686 db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
687 }
688
TryStealingLocks()689 bool PessimisticTransaction::TryStealingLocks() {
690 assert(IsExpired());
691 TransactionState expected = STARTED;
692 return std::atomic_compare_exchange_strong(&txn_state_, &expected,
693 LOCKS_STOLEN);
694 }
695
UnlockGetForUpdate(ColumnFamilyHandle * column_family,const Slice & key)696 void PessimisticTransaction::UnlockGetForUpdate(
697 ColumnFamilyHandle* column_family, const Slice& key) {
698 txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
699 }
700
SetName(const TransactionName & name)701 Status PessimisticTransaction::SetName(const TransactionName& name) {
702 Status s;
703 if (txn_state_ == STARTED) {
704 if (name_.length()) {
705 s = Status::InvalidArgument("Transaction has already been named.");
706 } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
707 s = Status::InvalidArgument("Transaction name must be unique.");
708 } else if (name.length() < 1 || name.length() > 512) {
709 s = Status::InvalidArgument(
710 "Transaction name length must be between 1 and 512 chars.");
711 } else {
712 name_ = name;
713 txn_db_impl_->RegisterTransaction(this);
714 }
715 } else {
716 s = Status::InvalidArgument("Transaction is beyond state for naming.");
717 }
718 return s;
719 }
720
721 } // namespace ROCKSDB_NAMESPACE
722
723 #endif // ROCKSDB_LITE
724