1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/transaction_util.h"
9 
10 #include <cinttypes>
11 #include <string>
12 #include <vector>
13 
14 #include "db/db_impl/db_impl.h"
15 #include "rocksdb/status.h"
16 #include "rocksdb/utilities/write_batch_with_index.h"
17 #include "util/cast_util.h"
18 #include "util/string_util.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 
CheckKeyForConflicts(DBImpl * db_impl,ColumnFamilyHandle * column_family,const std::string & key,SequenceNumber snap_seq,bool cache_only,ReadCallback * snap_checker,SequenceNumber min_uncommitted)22 Status TransactionUtil::CheckKeyForConflicts(
23     DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
24     SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker,
25     SequenceNumber min_uncommitted) {
26   Status result;
27 
28   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
29   auto cfd = cfh->cfd();
30   SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd);
31 
32   if (sv == nullptr) {
33     result = Status::InvalidArgument("Could not access column family " +
34                                      cfh->GetName());
35   }
36 
37   if (result.ok()) {
38     SequenceNumber earliest_seq =
39         db_impl->GetEarliestMemTableSequenceNumber(sv, true);
40 
41     result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only,
42                       snap_checker, min_uncommitted);
43 
44     db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
45   }
46 
47   return result;
48 }
49 
CheckKey(DBImpl * db_impl,SuperVersion * sv,SequenceNumber earliest_seq,SequenceNumber snap_seq,const std::string & key,bool cache_only,ReadCallback * snap_checker,SequenceNumber min_uncommitted)50 Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
51                                  SequenceNumber earliest_seq,
52                                  SequenceNumber snap_seq,
53                                  const std::string& key, bool cache_only,
54                                  ReadCallback* snap_checker,
55                                  SequenceNumber min_uncommitted) {
56   // When `min_uncommitted` is provided, keys are not always committed
57   // in sequence number order, and `snap_checker` is used to check whether
58   // specific sequence number is in the database is visible to the transaction.
59   // So `snap_checker` must be provided.
60   assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr);
61 
62   Status result;
63   bool need_to_read_sst = false;
64 
65   // Since it would be too slow to check the SST files, we will only use
66   // the memtables to check whether there have been any recent writes
67   // to this key after it was accessed in this transaction.  But if the
68   // Memtables do not contain a long enough history, we must fail the
69   // transaction.
70   if (earliest_seq == kMaxSequenceNumber) {
71     // The age of this memtable is unknown.  Cannot rely on it to check
72     // for recent writes.  This error shouldn't happen often in practice as
73     // the Memtable should have a valid earliest sequence number except in some
74     // corner cases (such as error cases during recovery).
75     need_to_read_sst = true;
76 
77     if (cache_only) {
78       result = Status::TryAgain(
79           "Transaction could not check for conflicts as the MemTable does not "
80           "contain a long enough history to check write at SequenceNumber: ",
81           ToString(snap_seq));
82     }
83   } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) {
84     // Use <= for min_uncommitted since earliest_seq is actually the largest sec
85     // before this memtable was created
86     need_to_read_sst = true;
87 
88     if (cache_only) {
89       // The age of this memtable is too new to use to check for recent
90       // writes.
91       char msg[300];
92       snprintf(msg, sizeof(msg),
93                "Transaction could not check for conflicts for operation at "
94                "SequenceNumber %" PRIu64
95                " as the MemTable only contains changes newer than "
96                "SequenceNumber %" PRIu64
97                ".  Increasing the value of the "
98                "max_write_buffer_size_to_maintain option could reduce the "
99                "frequency "
100                "of this error.",
101                snap_seq, earliest_seq);
102       result = Status::TryAgain(msg);
103     }
104   }
105 
106   if (result.ok()) {
107     SequenceNumber seq = kMaxSequenceNumber;
108     bool found_record_for_key = false;
109 
110     // When min_uncommitted == kMaxSequenceNumber, writes are committed in
111     // sequence number order, so only keys larger than `snap_seq` can cause
112     // conflict.
113     // When min_uncommitted != kMaxSequenceNumber, keys lower than
114     // min_uncommitted will not triggered conflicts, while keys larger than
115     // min_uncommitted might create conflicts, so we need  to read them out
116     // from the DB, and call callback to snap_checker to determine. So only
117     // keys lower than min_uncommitted can be skipped.
118     SequenceNumber lower_bound_seq =
119         (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;
120     Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,
121                                                 lower_bound_seq, &seq,
122                                                 &found_record_for_key);
123 
124     if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
125       result = s;
126     } else if (found_record_for_key) {
127       bool write_conflict = snap_checker == nullptr
128                                 ? snap_seq < seq
129                                 : !snap_checker->IsVisible(seq);
130       if (write_conflict) {
131         result = Status::Busy();
132       }
133     }
134   }
135 
136   return result;
137 }
138 
CheckKeysForConflicts(DBImpl * db_impl,const LockTracker & tracker,bool cache_only)139 Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
140                                               const LockTracker& tracker,
141                                               bool cache_only) {
142   Status result;
143 
144   std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
145       tracker.GetColumnFamilyIterator());
146   assert(cf_it != nullptr);
147   while (cf_it->HasNext()) {
148     ColumnFamilyId cf = cf_it->Next();
149 
150     SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf);
151     if (sv == nullptr) {
152       result = Status::InvalidArgument("Could not access column family " +
153                                        ToString(cf));
154       break;
155     }
156 
157     SequenceNumber earliest_seq =
158         db_impl->GetEarliestMemTableSequenceNumber(sv, true);
159 
160     // For each of the keys in this transaction, check to see if someone has
161     // written to this key since the start of the transaction.
162     std::unique_ptr<LockTracker::KeyIterator> key_it(
163         tracker.GetKeyIterator(cf));
164     assert(key_it != nullptr);
165     while (key_it->HasNext()) {
166       const std::string& key = key_it->Next();
167       PointLockStatus status = tracker.GetPointLockStatus(cf, key);
168       const SequenceNumber key_seq = status.seq;
169 
170       result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
171       if (!result.ok()) {
172         break;
173       }
174     }
175 
176     db_impl->ReturnAndCleanupSuperVersion(cf, sv);
177 
178     if (!result.ok()) {
179       break;
180     }
181   }
182 
183   return result;
184 }
185 
186 }  // namespace ROCKSDB_NAMESPACE
187 
188 #endif  // ROCKSDB_LITE
189