1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/transaction_base.h"
9 
10 #include <cinttypes>
11 
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/status.h"
17 #include "util/cast_util.h"
18 #include "util/string_util.h"
19 #include "utilities/transactions/lock/lock_tracker.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
TransactionBaseImpl(DB * db,const WriteOptions & write_options,const LockTrackerFactory & lock_tracker_factory)23 TransactionBaseImpl::TransactionBaseImpl(
24     DB* db, const WriteOptions& write_options,
25     const LockTrackerFactory& lock_tracker_factory)
26     : db_(db),
27       dbimpl_(static_cast_with_check<DBImpl>(db)),
28       write_options_(write_options),
29       cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
30       lock_tracker_factory_(lock_tracker_factory),
31       start_time_(dbimpl_->GetSystemClock()->NowMicros()),
32       write_batch_(cmp_, 0, true, 0),
33       tracked_locks_(lock_tracker_factory_.Create()),
34       indexing_enabled_(true) {
35   assert(dynamic_cast<DBImpl*>(db_) != nullptr);
36   log_number_ = 0;
37   if (dbimpl_->allow_2pc()) {
38     InitWriteBatch();
39   }
40 }
41 
~TransactionBaseImpl()42 TransactionBaseImpl::~TransactionBaseImpl() {
43   // Release snapshot if snapshot is set
44   SetSnapshotInternal(nullptr);
45 }
46 
Clear()47 void TransactionBaseImpl::Clear() {
48   save_points_.reset(nullptr);
49   write_batch_.Clear();
50   commit_time_batch_.Clear();
51   tracked_locks_->Clear();
52   num_puts_ = 0;
53   num_deletes_ = 0;
54   num_merges_ = 0;
55 
56   if (dbimpl_->allow_2pc()) {
57     InitWriteBatch();
58   }
59 }
60 
Reinitialize(DB * db,const WriteOptions & write_options)61 void TransactionBaseImpl::Reinitialize(DB* db,
62                                        const WriteOptions& write_options) {
63   Clear();
64   ClearSnapshot();
65   id_ = 0;
66   db_ = db;
67   name_.clear();
68   log_number_ = 0;
69   write_options_ = write_options;
70   start_time_ = dbimpl_->GetSystemClock()->NowMicros();
71   indexing_enabled_ = true;
72   cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
73 }
74 
SetSnapshot()75 void TransactionBaseImpl::SetSnapshot() {
76   const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary();
77   SetSnapshotInternal(snapshot);
78 }
79 
SetSnapshotInternal(const Snapshot * snapshot)80 void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) {
81   // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
82   // be released, not deleted when it is no longer referenced.
83   snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
84                                       this, std::placeholders::_1, db_));
85   snapshot_needed_ = false;
86   snapshot_notifier_ = nullptr;
87 }
88 
SetSnapshotOnNextOperation(std::shared_ptr<TransactionNotifier> notifier)89 void TransactionBaseImpl::SetSnapshotOnNextOperation(
90     std::shared_ptr<TransactionNotifier> notifier) {
91   snapshot_needed_ = true;
92   snapshot_notifier_ = notifier;
93 }
94 
SetSnapshotIfNeeded()95 void TransactionBaseImpl::SetSnapshotIfNeeded() {
96   if (snapshot_needed_) {
97     std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_;
98     SetSnapshot();
99     if (notifier != nullptr) {
100       notifier->SnapshotCreated(GetSnapshot());
101     }
102   }
103 }
104 
TryLock(ColumnFamilyHandle * column_family,const SliceParts & key,bool read_only,bool exclusive,const bool do_validate,const bool assume_tracked)105 Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
106                                     const SliceParts& key, bool read_only,
107                                     bool exclusive, const bool do_validate,
108                                     const bool assume_tracked) {
109   size_t key_size = 0;
110   for (int i = 0; i < key.num_parts; ++i) {
111     key_size += key.parts[i].size();
112   }
113 
114   std::string str;
115   str.reserve(key_size);
116 
117   for (int i = 0; i < key.num_parts; ++i) {
118     str.append(key.parts[i].data(), key.parts[i].size());
119   }
120 
121   return TryLock(column_family, str, read_only, exclusive, do_validate,
122                  assume_tracked);
123 }
124 
SetSavePoint()125 void TransactionBaseImpl::SetSavePoint() {
126   if (save_points_ == nullptr) {
127     save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
128   }
129   save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
130                         num_puts_, num_deletes_, num_merges_,
131                         lock_tracker_factory_);
132   write_batch_.SetSavePoint();
133 }
134 
RollbackToSavePoint()135 Status TransactionBaseImpl::RollbackToSavePoint() {
136   if (save_points_ != nullptr && save_points_->size() > 0) {
137     // Restore saved SavePoint
138     TransactionBaseImpl::SavePoint& save_point = save_points_->top();
139     snapshot_ = save_point.snapshot_;
140     snapshot_needed_ = save_point.snapshot_needed_;
141     snapshot_notifier_ = save_point.snapshot_notifier_;
142     num_puts_ = save_point.num_puts_;
143     num_deletes_ = save_point.num_deletes_;
144     num_merges_ = save_point.num_merges_;
145 
146     // Rollback batch
147     Status s = write_batch_.RollbackToSavePoint();
148     assert(s.ok());
149 
150     // Rollback any keys that were tracked since the last savepoint
151     tracked_locks_->Subtract(*save_point.new_locks_);
152 
153     save_points_->pop();
154 
155     return s;
156   } else {
157     assert(write_batch_.RollbackToSavePoint().IsNotFound());
158     return Status::NotFound();
159   }
160 }
161 
PopSavePoint()162 Status TransactionBaseImpl::PopSavePoint() {
163   if (save_points_ == nullptr ||
164       save_points_->empty()) {
165     // No SavePoint yet.
166     assert(write_batch_.PopSavePoint().IsNotFound());
167     return Status::NotFound();
168   }
169 
170   assert(!save_points_->empty());
171   // If there is another savepoint A below the current savepoint B, then A needs
172   // to inherit tracked_keys in B so that if we rollback to savepoint A, we
173   // remember to unlock keys in B. If there is no other savepoint below, then we
174   // can safely discard savepoint info.
175   if (save_points_->size() == 1) {
176     save_points_->pop();
177   } else {
178     TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
179     std::swap(top, save_points_->top());
180     save_points_->pop();
181 
182     save_points_->top().new_locks_->Merge(*top.new_locks_);
183   }
184 
185   return write_batch_.PopSavePoint();
186 }
187 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value)188 Status TransactionBaseImpl::Get(const ReadOptions& read_options,
189                                 ColumnFamilyHandle* column_family,
190                                 const Slice& key, std::string* value) {
191   assert(value != nullptr);
192   PinnableSlice pinnable_val(value);
193   assert(!pinnable_val.IsPinned());
194   auto s = Get(read_options, column_family, key, &pinnable_val);
195   if (s.ok() && pinnable_val.IsPinned()) {
196     value->assign(pinnable_val.data(), pinnable_val.size());
197   }  // else value is already assigned
198   return s;
199 }
200 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)201 Status TransactionBaseImpl::Get(const ReadOptions& read_options,
202                                 ColumnFamilyHandle* column_family,
203                                 const Slice& key, PinnableSlice* pinnable_val) {
204   return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
205                                         pinnable_val);
206 }
207 
GetForUpdate(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,bool exclusive,const bool do_validate)208 Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
209                                          ColumnFamilyHandle* column_family,
210                                          const Slice& key, std::string* value,
211                                          bool exclusive,
212                                          const bool do_validate) {
213   if (!do_validate && read_options.snapshot != nullptr) {
214     return Status::InvalidArgument(
215         "If do_validate is false then GetForUpdate with snapshot is not "
216         "defined.");
217   }
218   Status s =
219       TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
220 
221   if (s.ok() && value != nullptr) {
222     assert(value != nullptr);
223     PinnableSlice pinnable_val(value);
224     assert(!pinnable_val.IsPinned());
225     s = Get(read_options, column_family, key, &pinnable_val);
226     if (s.ok() && pinnable_val.IsPinned()) {
227       value->assign(pinnable_val.data(), pinnable_val.size());
228     }  // else value is already assigned
229   }
230   return s;
231 }
232 
GetForUpdate(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val,bool exclusive,const bool do_validate)233 Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
234                                          ColumnFamilyHandle* column_family,
235                                          const Slice& key,
236                                          PinnableSlice* pinnable_val,
237                                          bool exclusive,
238                                          const bool do_validate) {
239   if (!do_validate && read_options.snapshot != nullptr) {
240     return Status::InvalidArgument(
241         "If do_validate is false then GetForUpdate with snapshot is not "
242         "defined.");
243   }
244   Status s =
245       TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
246 
247   if (s.ok() && pinnable_val != nullptr) {
248     s = Get(read_options, column_family, key, pinnable_val);
249   }
250   return s;
251 }
252 
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)253 std::vector<Status> TransactionBaseImpl::MultiGet(
254     const ReadOptions& read_options,
255     const std::vector<ColumnFamilyHandle*>& column_family,
256     const std::vector<Slice>& keys, std::vector<std::string>* values) {
257   size_t num_keys = keys.size();
258   values->resize(num_keys);
259 
260   std::vector<Status> stat_list(num_keys);
261   for (size_t i = 0; i < num_keys; ++i) {
262     stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
263   }
264 
265   return stat_list;
266 }
267 
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)268 void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
269                                    ColumnFamilyHandle* column_family,
270                                    const size_t num_keys, const Slice* keys,
271                                    PinnableSlice* values, Status* statuses,
272                                    const bool sorted_input) {
273   write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
274                                       num_keys, keys, values, statuses,
275                                       sorted_input);
276 }
277 
MultiGetForUpdate(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)278 std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
279     const ReadOptions& read_options,
280     const std::vector<ColumnFamilyHandle*>& column_family,
281     const std::vector<Slice>& keys, std::vector<std::string>* values) {
282   // Regardless of whether the MultiGet succeeded, track these keys.
283   size_t num_keys = keys.size();
284   values->resize(num_keys);
285 
286   // Lock all keys
287   for (size_t i = 0; i < num_keys; ++i) {
288     Status s = TryLock(column_family[i], keys[i], true /* read_only */,
289                        true /* exclusive */);
290     if (!s.ok()) {
291       // Fail entire multiget if we cannot lock all keys
292       return std::vector<Status>(num_keys, s);
293     }
294   }
295 
296   // TODO(agiardullo): optimize multiget?
297   std::vector<Status> stat_list(num_keys);
298   for (size_t i = 0; i < num_keys; ++i) {
299     stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
300   }
301 
302   return stat_list;
303 }
304 
GetIterator(const ReadOptions & read_options)305 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
306   Iterator* db_iter = db_->NewIterator(read_options);
307   assert(db_iter);
308 
309   return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
310                                           &read_options);
311 }
312 
GetIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)313 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
314                                            ColumnFamilyHandle* column_family) {
315   Iterator* db_iter = db_->NewIterator(read_options, column_family);
316   assert(db_iter);
317 
318   return write_batch_.NewIteratorWithBase(column_family, db_iter,
319                                           &read_options);
320 }
321 
Put(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value,const bool assume_tracked)322 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
323                                 const Slice& key, const Slice& value,
324                                 const bool assume_tracked) {
325   const bool do_validate = !assume_tracked;
326   Status s = TryLock(column_family, key, false /* read_only */,
327                      true /* exclusive */, do_validate, assume_tracked);
328 
329   if (s.ok()) {
330     s = GetBatchForWrite()->Put(column_family, key, value);
331     if (s.ok()) {
332       num_puts_++;
333     }
334   }
335 
336   return s;
337 }
338 
Put(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value,const bool assume_tracked)339 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
340                                 const SliceParts& key, const SliceParts& value,
341                                 const bool assume_tracked) {
342   const bool do_validate = !assume_tracked;
343   Status s = TryLock(column_family, key, false /* read_only */,
344                      true /* exclusive */, do_validate, assume_tracked);
345 
346   if (s.ok()) {
347     s = GetBatchForWrite()->Put(column_family, key, value);
348     if (s.ok()) {
349       num_puts_++;
350     }
351   }
352 
353   return s;
354 }
355 
Merge(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value,const bool assume_tracked)356 Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
357                                   const Slice& key, const Slice& value,
358                                   const bool assume_tracked) {
359   const bool do_validate = !assume_tracked;
360   Status s = TryLock(column_family, key, false /* read_only */,
361                      true /* exclusive */, do_validate, assume_tracked);
362 
363   if (s.ok()) {
364     s = GetBatchForWrite()->Merge(column_family, key, value);
365     if (s.ok()) {
366       num_merges_++;
367     }
368   }
369 
370   return s;
371 }
372 
Delete(ColumnFamilyHandle * column_family,const Slice & key,const bool assume_tracked)373 Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
374                                    const Slice& key,
375                                    const bool assume_tracked) {
376   const bool do_validate = !assume_tracked;
377   Status s = TryLock(column_family, key, false /* read_only */,
378                      true /* exclusive */, do_validate, assume_tracked);
379 
380   if (s.ok()) {
381     s = GetBatchForWrite()->Delete(column_family, key);
382     if (s.ok()) {
383       num_deletes_++;
384     }
385   }
386 
387   return s;
388 }
389 
Delete(ColumnFamilyHandle * column_family,const SliceParts & key,const bool assume_tracked)390 Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
391                                    const SliceParts& key,
392                                    const bool assume_tracked) {
393   const bool do_validate = !assume_tracked;
394   Status s = TryLock(column_family, key, false /* read_only */,
395                      true /* exclusive */, do_validate, assume_tracked);
396 
397   if (s.ok()) {
398     s = GetBatchForWrite()->Delete(column_family, key);
399     if (s.ok()) {
400       num_deletes_++;
401     }
402   }
403 
404   return s;
405 }
406 
SingleDelete(ColumnFamilyHandle * column_family,const Slice & key,const bool assume_tracked)407 Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
408                                          const Slice& key,
409                                          const bool assume_tracked) {
410   const bool do_validate = !assume_tracked;
411   Status s = TryLock(column_family, key, false /* read_only */,
412                      true /* exclusive */, do_validate, assume_tracked);
413 
414   if (s.ok()) {
415     s = GetBatchForWrite()->SingleDelete(column_family, key);
416     if (s.ok()) {
417       num_deletes_++;
418     }
419   }
420 
421   return s;
422 }
423 
SingleDelete(ColumnFamilyHandle * column_family,const SliceParts & key,const bool assume_tracked)424 Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
425                                          const SliceParts& key,
426                                          const bool assume_tracked) {
427   const bool do_validate = !assume_tracked;
428   Status s = TryLock(column_family, key, false /* read_only */,
429                      true /* exclusive */, do_validate, assume_tracked);
430 
431   if (s.ok()) {
432     s = GetBatchForWrite()->SingleDelete(column_family, key);
433     if (s.ok()) {
434       num_deletes_++;
435     }
436   }
437 
438   return s;
439 }
440 
PutUntracked(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)441 Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
442                                          const Slice& key, const Slice& value) {
443   Status s = TryLock(column_family, key, false /* read_only */,
444                      true /* exclusive */, false /* do_validate */);
445 
446   if (s.ok()) {
447     s = GetBatchForWrite()->Put(column_family, key, value);
448     if (s.ok()) {
449       num_puts_++;
450     }
451   }
452 
453   return s;
454 }
455 
PutUntracked(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value)456 Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
457                                          const SliceParts& key,
458                                          const SliceParts& value) {
459   Status s = TryLock(column_family, key, false /* read_only */,
460                      true /* exclusive */, false /* do_validate */);
461 
462   if (s.ok()) {
463     s = GetBatchForWrite()->Put(column_family, key, value);
464     if (s.ok()) {
465       num_puts_++;
466     }
467   }
468 
469   return s;
470 }
471 
MergeUntracked(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)472 Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
473                                            const Slice& key,
474                                            const Slice& value) {
475   Status s = TryLock(column_family, key, false /* read_only */,
476                      true /* exclusive */, false /* do_validate */);
477 
478   if (s.ok()) {
479     s = GetBatchForWrite()->Merge(column_family, key, value);
480     if (s.ok()) {
481       num_merges_++;
482     }
483   }
484 
485   return s;
486 }
487 
DeleteUntracked(ColumnFamilyHandle * column_family,const Slice & key)488 Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
489                                             const Slice& key) {
490   Status s = TryLock(column_family, key, false /* read_only */,
491                      true /* exclusive */, false /* do_validate */);
492 
493   if (s.ok()) {
494     s = GetBatchForWrite()->Delete(column_family, key);
495     if (s.ok()) {
496       num_deletes_++;
497     }
498   }
499 
500   return s;
501 }
502 
DeleteUntracked(ColumnFamilyHandle * column_family,const SliceParts & key)503 Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
504                                             const SliceParts& key) {
505   Status s = TryLock(column_family, key, false /* read_only */,
506                      true /* exclusive */, false /* do_validate */);
507 
508   if (s.ok()) {
509     s = GetBatchForWrite()->Delete(column_family, key);
510     if (s.ok()) {
511       num_deletes_++;
512     }
513   }
514 
515   return s;
516 }
517 
SingleDeleteUntracked(ColumnFamilyHandle * column_family,const Slice & key)518 Status TransactionBaseImpl::SingleDeleteUntracked(
519     ColumnFamilyHandle* column_family, const Slice& key) {
520   Status s = TryLock(column_family, key, false /* read_only */,
521                      true /* exclusive */, false /* do_validate */);
522 
523   if (s.ok()) {
524     s = GetBatchForWrite()->SingleDelete(column_family, key);
525     if (s.ok()) {
526       num_deletes_++;
527     }
528   }
529 
530   return s;
531 }
532 
PutLogData(const Slice & blob)533 void TransactionBaseImpl::PutLogData(const Slice& blob) {
534   auto s = write_batch_.PutLogData(blob);
535   (void)s;
536   assert(s.ok());
537 }
538 
GetWriteBatch()539 WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
540   return &write_batch_;
541 }
542 
GetElapsedTime() const543 uint64_t TransactionBaseImpl::GetElapsedTime() const {
544   return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
545 }
546 
GetNumPuts() const547 uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
548 
GetNumDeletes() const549 uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
550 
GetNumMerges() const551 uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
552 
GetNumKeys() const553 uint64_t TransactionBaseImpl::GetNumKeys() const {
554   return tracked_locks_->GetNumPointLocks();
555 }
556 
TrackKey(uint32_t cfh_id,const std::string & key,SequenceNumber seq,bool read_only,bool exclusive)557 void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
558                                    SequenceNumber seq, bool read_only,
559                                    bool exclusive) {
560   PointLockRequest r;
561   r.column_family_id = cfh_id;
562   r.key = key;
563   r.seq = seq;
564   r.read_only = read_only;
565   r.exclusive = exclusive;
566 
567   // Update map of all tracked keys for this transaction
568   tracked_locks_->Track(r);
569 
570   if (save_points_ != nullptr && !save_points_->empty()) {
571     // Update map of tracked keys in this SavePoint
572     save_points_->top().new_locks_->Track(r);
573   }
574 }
575 
576 // Gets the write batch that should be used for Put/Merge/Deletes.
577 //
578 // Returns either a WriteBatch or WriteBatchWithIndex depending on whether
579 // DisableIndexing() has been called.
GetBatchForWrite()580 WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
581   if (indexing_enabled_) {
582     // Use WriteBatchWithIndex
583     return &write_batch_;
584   } else {
585     // Don't use WriteBatchWithIndex. Return base WriteBatch.
586     return write_batch_.GetWriteBatch();
587   }
588 }
589 
ReleaseSnapshot(const Snapshot * snapshot,DB * db)590 void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
591   if (snapshot != nullptr) {
592     ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
593                       "ReleaseSnapshot %" PRIu64 " Set",
594                       snapshot->GetSequenceNumber());
595     db->ReleaseSnapshot(snapshot);
596   }
597 }
598 
UndoGetForUpdate(ColumnFamilyHandle * column_family,const Slice & key)599 void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
600                                            const Slice& key) {
601   PointLockRequest r;
602   r.column_family_id = GetColumnFamilyID(column_family);
603   r.key = key.ToString();
604   r.read_only = true;
605 
606   bool can_untrack = false;
607   if (save_points_ != nullptr && !save_points_->empty()) {
608     // If there is no GetForUpdate of the key in this save point,
609     // then cannot untrack from the global lock tracker.
610     UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
611     can_untrack = (s != UntrackStatus::NOT_TRACKED);
612   } else {
613     // No save point, so can untrack from the global lock tracker.
614     can_untrack = true;
615   }
616 
617   if (can_untrack) {
618     // If erased from the global tracker, then can unlock the key.
619     UntrackStatus s = tracked_locks_->Untrack(r);
620     bool can_unlock = (s == UntrackStatus::REMOVED);
621     if (can_unlock) {
622       UnlockGetForUpdate(column_family, key);
623     }
624   }
625 }
626 
RebuildFromWriteBatch(WriteBatch * src_batch)627 Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
628   struct IndexedWriteBatchBuilder : public WriteBatch::Handler {
629     Transaction* txn_;
630     DBImpl* db_;
631     IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
632         : txn_(txn), db_(db) {
633       assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr);
634     }
635 
636     Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
637       return txn_->Put(db_->GetColumnFamilyHandle(cf), key, val);
638     }
639 
640     Status DeleteCF(uint32_t cf, const Slice& key) override {
641       return txn_->Delete(db_->GetColumnFamilyHandle(cf), key);
642     }
643 
644     Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
645       return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), key);
646     }
647 
648     Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
649       return txn_->Merge(db_->GetColumnFamilyHandle(cf), key, val);
650     }
651 
652     // this is used for reconstructing prepared transactions upon
653     // recovery. there should not be any meta markers in the batches
654     // we are processing.
655     Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
656 
657     Status MarkEndPrepare(const Slice&) override {
658       return Status::InvalidArgument();
659     }
660 
661     Status MarkCommit(const Slice&) override {
662       return Status::InvalidArgument();
663     }
664 
665     Status MarkRollback(const Slice&) override {
666       return Status::InvalidArgument();
667     }
668   };
669 
670   IndexedWriteBatchBuilder copycat(this, dbimpl_);
671   return src_batch->Iterate(&copycat);
672 }
673 
GetCommitTimeWriteBatch()674 WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
675   return &commit_time_batch_;
676 }
677 }  // namespace ROCKSDB_NAMESPACE
678 
679 #endif  // ROCKSDB_LITE
680