1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_base.h"
9
10 #include <cinttypes>
11
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/status.h"
17 #include "util/cast_util.h"
18 #include "util/string_util.h"
19 #include "utilities/transactions/lock/lock_tracker.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
TransactionBaseImpl(DB * db,const WriteOptions & write_options,const LockTrackerFactory & lock_tracker_factory)23 TransactionBaseImpl::TransactionBaseImpl(
24 DB* db, const WriteOptions& write_options,
25 const LockTrackerFactory& lock_tracker_factory)
26 : db_(db),
27 dbimpl_(static_cast_with_check<DBImpl>(db)),
28 write_options_(write_options),
29 cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
30 lock_tracker_factory_(lock_tracker_factory),
31 start_time_(dbimpl_->GetSystemClock()->NowMicros()),
32 write_batch_(cmp_, 0, true, 0),
33 tracked_locks_(lock_tracker_factory_.Create()),
34 indexing_enabled_(true) {
35 assert(dynamic_cast<DBImpl*>(db_) != nullptr);
36 log_number_ = 0;
37 if (dbimpl_->allow_2pc()) {
38 InitWriteBatch();
39 }
40 }
41
~TransactionBaseImpl()42 TransactionBaseImpl::~TransactionBaseImpl() {
43 // Release snapshot if snapshot is set
44 SetSnapshotInternal(nullptr);
45 }
46
Clear()47 void TransactionBaseImpl::Clear() {
48 save_points_.reset(nullptr);
49 write_batch_.Clear();
50 commit_time_batch_.Clear();
51 tracked_locks_->Clear();
52 num_puts_ = 0;
53 num_deletes_ = 0;
54 num_merges_ = 0;
55
56 if (dbimpl_->allow_2pc()) {
57 InitWriteBatch();
58 }
59 }
60
Reinitialize(DB * db,const WriteOptions & write_options)61 void TransactionBaseImpl::Reinitialize(DB* db,
62 const WriteOptions& write_options) {
63 Clear();
64 ClearSnapshot();
65 id_ = 0;
66 db_ = db;
67 name_.clear();
68 log_number_ = 0;
69 write_options_ = write_options;
70 start_time_ = dbimpl_->GetSystemClock()->NowMicros();
71 indexing_enabled_ = true;
72 cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
73 }
74
SetSnapshot()75 void TransactionBaseImpl::SetSnapshot() {
76 const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary();
77 SetSnapshotInternal(snapshot);
78 }
79
SetSnapshotInternal(const Snapshot * snapshot)80 void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) {
81 // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
82 // be released, not deleted when it is no longer referenced.
83 snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
84 this, std::placeholders::_1, db_));
85 snapshot_needed_ = false;
86 snapshot_notifier_ = nullptr;
87 }
88
SetSnapshotOnNextOperation(std::shared_ptr<TransactionNotifier> notifier)89 void TransactionBaseImpl::SetSnapshotOnNextOperation(
90 std::shared_ptr<TransactionNotifier> notifier) {
91 snapshot_needed_ = true;
92 snapshot_notifier_ = notifier;
93 }
94
SetSnapshotIfNeeded()95 void TransactionBaseImpl::SetSnapshotIfNeeded() {
96 if (snapshot_needed_) {
97 std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_;
98 SetSnapshot();
99 if (notifier != nullptr) {
100 notifier->SnapshotCreated(GetSnapshot());
101 }
102 }
103 }
104
TryLock(ColumnFamilyHandle * column_family,const SliceParts & key,bool read_only,bool exclusive,const bool do_validate,const bool assume_tracked)105 Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
106 const SliceParts& key, bool read_only,
107 bool exclusive, const bool do_validate,
108 const bool assume_tracked) {
109 size_t key_size = 0;
110 for (int i = 0; i < key.num_parts; ++i) {
111 key_size += key.parts[i].size();
112 }
113
114 std::string str;
115 str.reserve(key_size);
116
117 for (int i = 0; i < key.num_parts; ++i) {
118 str.append(key.parts[i].data(), key.parts[i].size());
119 }
120
121 return TryLock(column_family, str, read_only, exclusive, do_validate,
122 assume_tracked);
123 }
124
SetSavePoint()125 void TransactionBaseImpl::SetSavePoint() {
126 if (save_points_ == nullptr) {
127 save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
128 }
129 save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
130 num_puts_, num_deletes_, num_merges_,
131 lock_tracker_factory_);
132 write_batch_.SetSavePoint();
133 }
134
RollbackToSavePoint()135 Status TransactionBaseImpl::RollbackToSavePoint() {
136 if (save_points_ != nullptr && save_points_->size() > 0) {
137 // Restore saved SavePoint
138 TransactionBaseImpl::SavePoint& save_point = save_points_->top();
139 snapshot_ = save_point.snapshot_;
140 snapshot_needed_ = save_point.snapshot_needed_;
141 snapshot_notifier_ = save_point.snapshot_notifier_;
142 num_puts_ = save_point.num_puts_;
143 num_deletes_ = save_point.num_deletes_;
144 num_merges_ = save_point.num_merges_;
145
146 // Rollback batch
147 Status s = write_batch_.RollbackToSavePoint();
148 assert(s.ok());
149
150 // Rollback any keys that were tracked since the last savepoint
151 tracked_locks_->Subtract(*save_point.new_locks_);
152
153 save_points_->pop();
154
155 return s;
156 } else {
157 assert(write_batch_.RollbackToSavePoint().IsNotFound());
158 return Status::NotFound();
159 }
160 }
161
PopSavePoint()162 Status TransactionBaseImpl::PopSavePoint() {
163 if (save_points_ == nullptr ||
164 save_points_->empty()) {
165 // No SavePoint yet.
166 assert(write_batch_.PopSavePoint().IsNotFound());
167 return Status::NotFound();
168 }
169
170 assert(!save_points_->empty());
171 // If there is another savepoint A below the current savepoint B, then A needs
172 // to inherit tracked_keys in B so that if we rollback to savepoint A, we
173 // remember to unlock keys in B. If there is no other savepoint below, then we
174 // can safely discard savepoint info.
175 if (save_points_->size() == 1) {
176 save_points_->pop();
177 } else {
178 TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
179 std::swap(top, save_points_->top());
180 save_points_->pop();
181
182 save_points_->top().new_locks_->Merge(*top.new_locks_);
183 }
184
185 return write_batch_.PopSavePoint();
186 }
187
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value)188 Status TransactionBaseImpl::Get(const ReadOptions& read_options,
189 ColumnFamilyHandle* column_family,
190 const Slice& key, std::string* value) {
191 assert(value != nullptr);
192 PinnableSlice pinnable_val(value);
193 assert(!pinnable_val.IsPinned());
194 auto s = Get(read_options, column_family, key, &pinnable_val);
195 if (s.ok() && pinnable_val.IsPinned()) {
196 value->assign(pinnable_val.data(), pinnable_val.size());
197 } // else value is already assigned
198 return s;
199 }
200
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)201 Status TransactionBaseImpl::Get(const ReadOptions& read_options,
202 ColumnFamilyHandle* column_family,
203 const Slice& key, PinnableSlice* pinnable_val) {
204 return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
205 pinnable_val);
206 }
207
GetForUpdate(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,bool exclusive,const bool do_validate)208 Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
209 ColumnFamilyHandle* column_family,
210 const Slice& key, std::string* value,
211 bool exclusive,
212 const bool do_validate) {
213 if (!do_validate && read_options.snapshot != nullptr) {
214 return Status::InvalidArgument(
215 "If do_validate is false then GetForUpdate with snapshot is not "
216 "defined.");
217 }
218 Status s =
219 TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
220
221 if (s.ok() && value != nullptr) {
222 assert(value != nullptr);
223 PinnableSlice pinnable_val(value);
224 assert(!pinnable_val.IsPinned());
225 s = Get(read_options, column_family, key, &pinnable_val);
226 if (s.ok() && pinnable_val.IsPinned()) {
227 value->assign(pinnable_val.data(), pinnable_val.size());
228 } // else value is already assigned
229 }
230 return s;
231 }
232
GetForUpdate(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val,bool exclusive,const bool do_validate)233 Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
234 ColumnFamilyHandle* column_family,
235 const Slice& key,
236 PinnableSlice* pinnable_val,
237 bool exclusive,
238 const bool do_validate) {
239 if (!do_validate && read_options.snapshot != nullptr) {
240 return Status::InvalidArgument(
241 "If do_validate is false then GetForUpdate with snapshot is not "
242 "defined.");
243 }
244 Status s =
245 TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
246
247 if (s.ok() && pinnable_val != nullptr) {
248 s = Get(read_options, column_family, key, pinnable_val);
249 }
250 return s;
251 }
252
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)253 std::vector<Status> TransactionBaseImpl::MultiGet(
254 const ReadOptions& read_options,
255 const std::vector<ColumnFamilyHandle*>& column_family,
256 const std::vector<Slice>& keys, std::vector<std::string>* values) {
257 size_t num_keys = keys.size();
258 values->resize(num_keys);
259
260 std::vector<Status> stat_list(num_keys);
261 for (size_t i = 0; i < num_keys; ++i) {
262 stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
263 }
264
265 return stat_list;
266 }
267
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)268 void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
269 ColumnFamilyHandle* column_family,
270 const size_t num_keys, const Slice* keys,
271 PinnableSlice* values, Status* statuses,
272 const bool sorted_input) {
273 write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
274 num_keys, keys, values, statuses,
275 sorted_input);
276 }
277
MultiGetForUpdate(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)278 std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
279 const ReadOptions& read_options,
280 const std::vector<ColumnFamilyHandle*>& column_family,
281 const std::vector<Slice>& keys, std::vector<std::string>* values) {
282 // Regardless of whether the MultiGet succeeded, track these keys.
283 size_t num_keys = keys.size();
284 values->resize(num_keys);
285
286 // Lock all keys
287 for (size_t i = 0; i < num_keys; ++i) {
288 Status s = TryLock(column_family[i], keys[i], true /* read_only */,
289 true /* exclusive */);
290 if (!s.ok()) {
291 // Fail entire multiget if we cannot lock all keys
292 return std::vector<Status>(num_keys, s);
293 }
294 }
295
296 // TODO(agiardullo): optimize multiget?
297 std::vector<Status> stat_list(num_keys);
298 for (size_t i = 0; i < num_keys; ++i) {
299 stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
300 }
301
302 return stat_list;
303 }
304
GetIterator(const ReadOptions & read_options)305 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
306 Iterator* db_iter = db_->NewIterator(read_options);
307 assert(db_iter);
308
309 return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
310 &read_options);
311 }
312
GetIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)313 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
314 ColumnFamilyHandle* column_family) {
315 Iterator* db_iter = db_->NewIterator(read_options, column_family);
316 assert(db_iter);
317
318 return write_batch_.NewIteratorWithBase(column_family, db_iter,
319 &read_options);
320 }
321
Put(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value,const bool assume_tracked)322 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
323 const Slice& key, const Slice& value,
324 const bool assume_tracked) {
325 const bool do_validate = !assume_tracked;
326 Status s = TryLock(column_family, key, false /* read_only */,
327 true /* exclusive */, do_validate, assume_tracked);
328
329 if (s.ok()) {
330 s = GetBatchForWrite()->Put(column_family, key, value);
331 if (s.ok()) {
332 num_puts_++;
333 }
334 }
335
336 return s;
337 }
338
Put(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value,const bool assume_tracked)339 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
340 const SliceParts& key, const SliceParts& value,
341 const bool assume_tracked) {
342 const bool do_validate = !assume_tracked;
343 Status s = TryLock(column_family, key, false /* read_only */,
344 true /* exclusive */, do_validate, assume_tracked);
345
346 if (s.ok()) {
347 s = GetBatchForWrite()->Put(column_family, key, value);
348 if (s.ok()) {
349 num_puts_++;
350 }
351 }
352
353 return s;
354 }
355
Merge(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value,const bool assume_tracked)356 Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
357 const Slice& key, const Slice& value,
358 const bool assume_tracked) {
359 const bool do_validate = !assume_tracked;
360 Status s = TryLock(column_family, key, false /* read_only */,
361 true /* exclusive */, do_validate, assume_tracked);
362
363 if (s.ok()) {
364 s = GetBatchForWrite()->Merge(column_family, key, value);
365 if (s.ok()) {
366 num_merges_++;
367 }
368 }
369
370 return s;
371 }
372
Delete(ColumnFamilyHandle * column_family,const Slice & key,const bool assume_tracked)373 Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
374 const Slice& key,
375 const bool assume_tracked) {
376 const bool do_validate = !assume_tracked;
377 Status s = TryLock(column_family, key, false /* read_only */,
378 true /* exclusive */, do_validate, assume_tracked);
379
380 if (s.ok()) {
381 s = GetBatchForWrite()->Delete(column_family, key);
382 if (s.ok()) {
383 num_deletes_++;
384 }
385 }
386
387 return s;
388 }
389
Delete(ColumnFamilyHandle * column_family,const SliceParts & key,const bool assume_tracked)390 Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
391 const SliceParts& key,
392 const bool assume_tracked) {
393 const bool do_validate = !assume_tracked;
394 Status s = TryLock(column_family, key, false /* read_only */,
395 true /* exclusive */, do_validate, assume_tracked);
396
397 if (s.ok()) {
398 s = GetBatchForWrite()->Delete(column_family, key);
399 if (s.ok()) {
400 num_deletes_++;
401 }
402 }
403
404 return s;
405 }
406
SingleDelete(ColumnFamilyHandle * column_family,const Slice & key,const bool assume_tracked)407 Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
408 const Slice& key,
409 const bool assume_tracked) {
410 const bool do_validate = !assume_tracked;
411 Status s = TryLock(column_family, key, false /* read_only */,
412 true /* exclusive */, do_validate, assume_tracked);
413
414 if (s.ok()) {
415 s = GetBatchForWrite()->SingleDelete(column_family, key);
416 if (s.ok()) {
417 num_deletes_++;
418 }
419 }
420
421 return s;
422 }
423
SingleDelete(ColumnFamilyHandle * column_family,const SliceParts & key,const bool assume_tracked)424 Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
425 const SliceParts& key,
426 const bool assume_tracked) {
427 const bool do_validate = !assume_tracked;
428 Status s = TryLock(column_family, key, false /* read_only */,
429 true /* exclusive */, do_validate, assume_tracked);
430
431 if (s.ok()) {
432 s = GetBatchForWrite()->SingleDelete(column_family, key);
433 if (s.ok()) {
434 num_deletes_++;
435 }
436 }
437
438 return s;
439 }
440
PutUntracked(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)441 Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
442 const Slice& key, const Slice& value) {
443 Status s = TryLock(column_family, key, false /* read_only */,
444 true /* exclusive */, false /* do_validate */);
445
446 if (s.ok()) {
447 s = GetBatchForWrite()->Put(column_family, key, value);
448 if (s.ok()) {
449 num_puts_++;
450 }
451 }
452
453 return s;
454 }
455
PutUntracked(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value)456 Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
457 const SliceParts& key,
458 const SliceParts& value) {
459 Status s = TryLock(column_family, key, false /* read_only */,
460 true /* exclusive */, false /* do_validate */);
461
462 if (s.ok()) {
463 s = GetBatchForWrite()->Put(column_family, key, value);
464 if (s.ok()) {
465 num_puts_++;
466 }
467 }
468
469 return s;
470 }
471
MergeUntracked(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)472 Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
473 const Slice& key,
474 const Slice& value) {
475 Status s = TryLock(column_family, key, false /* read_only */,
476 true /* exclusive */, false /* do_validate */);
477
478 if (s.ok()) {
479 s = GetBatchForWrite()->Merge(column_family, key, value);
480 if (s.ok()) {
481 num_merges_++;
482 }
483 }
484
485 return s;
486 }
487
DeleteUntracked(ColumnFamilyHandle * column_family,const Slice & key)488 Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
489 const Slice& key) {
490 Status s = TryLock(column_family, key, false /* read_only */,
491 true /* exclusive */, false /* do_validate */);
492
493 if (s.ok()) {
494 s = GetBatchForWrite()->Delete(column_family, key);
495 if (s.ok()) {
496 num_deletes_++;
497 }
498 }
499
500 return s;
501 }
502
DeleteUntracked(ColumnFamilyHandle * column_family,const SliceParts & key)503 Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
504 const SliceParts& key) {
505 Status s = TryLock(column_family, key, false /* read_only */,
506 true /* exclusive */, false /* do_validate */);
507
508 if (s.ok()) {
509 s = GetBatchForWrite()->Delete(column_family, key);
510 if (s.ok()) {
511 num_deletes_++;
512 }
513 }
514
515 return s;
516 }
517
SingleDeleteUntracked(ColumnFamilyHandle * column_family,const Slice & key)518 Status TransactionBaseImpl::SingleDeleteUntracked(
519 ColumnFamilyHandle* column_family, const Slice& key) {
520 Status s = TryLock(column_family, key, false /* read_only */,
521 true /* exclusive */, false /* do_validate */);
522
523 if (s.ok()) {
524 s = GetBatchForWrite()->SingleDelete(column_family, key);
525 if (s.ok()) {
526 num_deletes_++;
527 }
528 }
529
530 return s;
531 }
532
PutLogData(const Slice & blob)533 void TransactionBaseImpl::PutLogData(const Slice& blob) {
534 auto s = write_batch_.PutLogData(blob);
535 (void)s;
536 assert(s.ok());
537 }
538
GetWriteBatch()539 WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
540 return &write_batch_;
541 }
542
GetElapsedTime() const543 uint64_t TransactionBaseImpl::GetElapsedTime() const {
544 return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
545 }
546
GetNumPuts() const547 uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
548
GetNumDeletes() const549 uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
550
GetNumMerges() const551 uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
552
GetNumKeys() const553 uint64_t TransactionBaseImpl::GetNumKeys() const {
554 return tracked_locks_->GetNumPointLocks();
555 }
556
TrackKey(uint32_t cfh_id,const std::string & key,SequenceNumber seq,bool read_only,bool exclusive)557 void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
558 SequenceNumber seq, bool read_only,
559 bool exclusive) {
560 PointLockRequest r;
561 r.column_family_id = cfh_id;
562 r.key = key;
563 r.seq = seq;
564 r.read_only = read_only;
565 r.exclusive = exclusive;
566
567 // Update map of all tracked keys for this transaction
568 tracked_locks_->Track(r);
569
570 if (save_points_ != nullptr && !save_points_->empty()) {
571 // Update map of tracked keys in this SavePoint
572 save_points_->top().new_locks_->Track(r);
573 }
574 }
575
576 // Gets the write batch that should be used for Put/Merge/Deletes.
577 //
578 // Returns either a WriteBatch or WriteBatchWithIndex depending on whether
579 // DisableIndexing() has been called.
GetBatchForWrite()580 WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
581 if (indexing_enabled_) {
582 // Use WriteBatchWithIndex
583 return &write_batch_;
584 } else {
585 // Don't use WriteBatchWithIndex. Return base WriteBatch.
586 return write_batch_.GetWriteBatch();
587 }
588 }
589
ReleaseSnapshot(const Snapshot * snapshot,DB * db)590 void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
591 if (snapshot != nullptr) {
592 ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
593 "ReleaseSnapshot %" PRIu64 " Set",
594 snapshot->GetSequenceNumber());
595 db->ReleaseSnapshot(snapshot);
596 }
597 }
598
UndoGetForUpdate(ColumnFamilyHandle * column_family,const Slice & key)599 void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
600 const Slice& key) {
601 PointLockRequest r;
602 r.column_family_id = GetColumnFamilyID(column_family);
603 r.key = key.ToString();
604 r.read_only = true;
605
606 bool can_untrack = false;
607 if (save_points_ != nullptr && !save_points_->empty()) {
608 // If there is no GetForUpdate of the key in this save point,
609 // then cannot untrack from the global lock tracker.
610 UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
611 can_untrack = (s != UntrackStatus::NOT_TRACKED);
612 } else {
613 // No save point, so can untrack from the global lock tracker.
614 can_untrack = true;
615 }
616
617 if (can_untrack) {
618 // If erased from the global tracker, then can unlock the key.
619 UntrackStatus s = tracked_locks_->Untrack(r);
620 bool can_unlock = (s == UntrackStatus::REMOVED);
621 if (can_unlock) {
622 UnlockGetForUpdate(column_family, key);
623 }
624 }
625 }
626
RebuildFromWriteBatch(WriteBatch * src_batch)627 Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
628 struct IndexedWriteBatchBuilder : public WriteBatch::Handler {
629 Transaction* txn_;
630 DBImpl* db_;
631 IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
632 : txn_(txn), db_(db) {
633 assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr);
634 }
635
636 Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
637 return txn_->Put(db_->GetColumnFamilyHandle(cf), key, val);
638 }
639
640 Status DeleteCF(uint32_t cf, const Slice& key) override {
641 return txn_->Delete(db_->GetColumnFamilyHandle(cf), key);
642 }
643
644 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
645 return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), key);
646 }
647
648 Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
649 return txn_->Merge(db_->GetColumnFamilyHandle(cf), key, val);
650 }
651
652 // this is used for reconstructing prepared transactions upon
653 // recovery. there should not be any meta markers in the batches
654 // we are processing.
655 Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
656
657 Status MarkEndPrepare(const Slice&) override {
658 return Status::InvalidArgument();
659 }
660
661 Status MarkCommit(const Slice&) override {
662 return Status::InvalidArgument();
663 }
664
665 Status MarkRollback(const Slice&) override {
666 return Status::InvalidArgument();
667 }
668 };
669
670 IndexedWriteBatchBuilder copycat(this, dbimpl_);
671 return src_batch->Iterate(©cat);
672 }
673
GetCommitTimeWriteBatch()674 WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
675 return &commit_time_batch_;
676 }
677 } // namespace ROCKSDB_NAMESPACE
678
679 #endif // ROCKSDB_LITE
680