1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/transaction_util.h"
9 
10 #include <cinttypes>
11 #include <string>
12 #include <vector>
13 
14 #include "db/db_impl/db_impl.h"
15 #include "rocksdb/status.h"
16 #include "rocksdb/utilities/write_batch_with_index.h"
17 #include "util/string_util.h"
18 
19 namespace ROCKSDB_NAMESPACE {
20 
CheckKeyForConflicts(DBImpl * db_impl,ColumnFamilyHandle * column_family,const std::string & key,SequenceNumber snap_seq,bool cache_only,ReadCallback * snap_checker,SequenceNumber min_uncommitted)21 Status TransactionUtil::CheckKeyForConflicts(
22     DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
23     SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker,
24     SequenceNumber min_uncommitted) {
25   Status result;
26 
27   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
28   auto cfd = cfh->cfd();
29   SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd);
30 
31   if (sv == nullptr) {
32     result = Status::InvalidArgument("Could not access column family " +
33                                      cfh->GetName());
34   }
35 
36   if (result.ok()) {
37     SequenceNumber earliest_seq =
38         db_impl->GetEarliestMemTableSequenceNumber(sv, true);
39 
40     result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only,
41                       snap_checker, min_uncommitted);
42 
43     db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
44   }
45 
46   return result;
47 }
48 
CheckKey(DBImpl * db_impl,SuperVersion * sv,SequenceNumber earliest_seq,SequenceNumber snap_seq,const std::string & key,bool cache_only,ReadCallback * snap_checker,SequenceNumber min_uncommitted)49 Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
50                                  SequenceNumber earliest_seq,
51                                  SequenceNumber snap_seq,
52                                  const std::string& key, bool cache_only,
53                                  ReadCallback* snap_checker,
54                                  SequenceNumber min_uncommitted) {
55   // When `min_uncommitted` is provided, keys are not always committed
56   // in sequence number order, and `snap_checker` is used to check whether
57   // specific sequence number is in the database is visible to the transaction.
58   // So `snap_checker` must be provided.
59   assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr);
60 
61   Status result;
62   bool need_to_read_sst = false;
63 
64   // Since it would be too slow to check the SST files, we will only use
65   // the memtables to check whether there have been any recent writes
66   // to this key after it was accessed in this transaction.  But if the
67   // Memtables do not contain a long enough history, we must fail the
68   // transaction.
69   if (earliest_seq == kMaxSequenceNumber) {
70     // The age of this memtable is unknown.  Cannot rely on it to check
71     // for recent writes.  This error shouldn't happen often in practice as
72     // the Memtable should have a valid earliest sequence number except in some
73     // corner cases (such as error cases during recovery).
74     need_to_read_sst = true;
75 
76     if (cache_only) {
77       result = Status::TryAgain(
78           "Transaction could not check for conflicts as the MemTable does not "
79           "contain a long enough history to check write at SequenceNumber: ",
80           ToString(snap_seq));
81     }
82   } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) {
83     // Use <= for min_uncommitted since earliest_seq is actually the largest sec
84     // before this memtable was created
85     need_to_read_sst = true;
86 
87     if (cache_only) {
88       // The age of this memtable is too new to use to check for recent
89       // writes.
90       char msg[300];
91       snprintf(msg, sizeof(msg),
92                "Transaction could not check for conflicts for operation at "
93                "SequenceNumber %" PRIu64
94                " as the MemTable only contains changes newer than "
95                "SequenceNumber %" PRIu64
96                ".  Increasing the value of the "
97                "max_write_buffer_size_to_maintain option could reduce the "
98                "frequency "
99                "of this error.",
100                snap_seq, earliest_seq);
101       result = Status::TryAgain(msg);
102     }
103   }
104 
105   if (result.ok()) {
106     SequenceNumber seq = kMaxSequenceNumber;
107     bool found_record_for_key = false;
108 
109     // When min_uncommitted == kMaxSequenceNumber, writes are committed in
110     // sequence number order, so only keys larger than `snap_seq` can cause
111     // conflict.
112     // When min_uncommitted != kMaxSequenceNumber, keys lower than
113     // min_uncommitted will not triggered conflicts, while keys larger than
114     // min_uncommitted might create conflicts, so we need  to read them out
115     // from the DB, and call callback to snap_checker to determine. So only
116     // keys lower than min_uncommitted can be skipped.
117     SequenceNumber lower_bound_seq =
118         (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;
119     Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,
120                                                 lower_bound_seq, &seq,
121                                                 &found_record_for_key);
122 
123     if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
124       result = s;
125     } else if (found_record_for_key) {
126       bool write_conflict = snap_checker == nullptr
127                                 ? snap_seq < seq
128                                 : !snap_checker->IsVisible(seq);
129       if (write_conflict) {
130         result = Status::Busy();
131       }
132     }
133   }
134 
135   return result;
136 }
137 
CheckKeysForConflicts(DBImpl * db_impl,const TransactionKeyMap & key_map,bool cache_only)138 Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
139                                               const TransactionKeyMap& key_map,
140                                               bool cache_only) {
141   Status result;
142 
143   for (auto& key_map_iter : key_map) {
144     uint32_t cf_id = key_map_iter.first;
145     const auto& keys = key_map_iter.second;
146 
147     SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf_id);
148     if (sv == nullptr) {
149       result = Status::InvalidArgument("Could not access column family " +
150                                        ToString(cf_id));
151       break;
152     }
153 
154     SequenceNumber earliest_seq =
155         db_impl->GetEarliestMemTableSequenceNumber(sv, true);
156 
157     // For each of the keys in this transaction, check to see if someone has
158     // written to this key since the start of the transaction.
159     for (const auto& key_iter : keys) {
160       const auto& key = key_iter.first;
161       const SequenceNumber key_seq = key_iter.second.seq;
162 
163       result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
164 
165       if (!result.ok()) {
166         break;
167       }
168     }
169 
170     db_impl->ReturnAndCleanupSuperVersion(cf_id, sv);
171 
172     if (!result.ok()) {
173       break;
174     }
175   }
176 
177   return result;
178 }
179 
180 }  // namespace ROCKSDB_NAMESPACE
181 
182 #endif  // ROCKSDB_LITE
183