//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/transaction_lock_mgr.h"

#include <cinttypes>

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

namespace ROCKSDB_NAMESPACE {

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;

  // Transaction locks are not valid after this time in microseconds
  uint64_t expiration_time;

  LockInfo(TransactionID id, uint64_t time, bool ex)
      : exclusive(ex), expiration_time(time) {
    txn_ids.push_back(id);
  }
  LockInfo(const LockInfo& lock_info)
      : exclusive(lock_info.exclusive),
        txn_ids(lock_info.txn_ids),
        expiration_time(lock_info.expiration_time) {}
};

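// A LockMapStripe is one shard of a LockMap.  Each stripe has its own mutex
// and condition variable, so lock operations on keys that hash to different
// stripes do not contend with each other.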
struct LockMapStripe {
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
    stripe_mutex = factory->AllocateMutex();
    stripe_cv = factory->AllocateCondVar();
    assert(stripe_mutex);
    assert(stripe_cv);
  }

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};

// Map of #num_stripes LockMapStripes
struct LockMap {
  explicit LockMap(size_t num_stripes,
                   std::shared_ptr<TransactionDBMutexFactory> factory)
      : num_stripes_(num_stripes) {
    lock_map_stripes_.reserve(num_stripes);
    for (size_t i = 0; i < num_stripes; i++) {
      LockMapStripe* stripe = new LockMapStripe(factory);
      lock_map_stripes_.push_back(stripe);
    }
  }

  ~LockMap() {
    for (auto stripe : lock_map_stripes_) {
      delete stripe;
    }
  }

  // Number of separate LockMapStripes to create, each with its own mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

  size_t GetStripe(const std::string& key) const;
};

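// DeadlockInfoBuffer is a fixed-size circular buffer holding the most
// recently detected deadlock paths.  Once full, AddNewPath overwrites the
// oldest entry; with a zero-sized buffer, new paths are silently dropped.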
void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  if (paths_buffer_.empty()) {
    return;
  }

  paths_buffer_[buffer_idx_] = std::move(path);
  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}

void DeadlockInfoBuffer::Resize(uint32_t target_size) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  paths_buffer_ = Normalize();

  // Drop the deadlocks that will no longer be needed after the normalize
  if (target_size < paths_buffer_.size()) {
    paths_buffer_.erase(
        paths_buffer_.begin(),
        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
    buffer_idx_ = 0;
  }
  // Resize the buffer to the target size and restore the buffer's idx
  else {
    auto prev_size = paths_buffer_.size();
    paths_buffer_.resize(target_size);
    // If the buffer is exactly full, the next write wraps around to slot 0;
    // otherwise it goes to the first slot past the preserved entries.
    // (An unconditional buffer_idx_ = prev_size would index out of bounds
    // on the next AddNewPath when target_size == prev_size.)
    buffer_idx_ =
        (prev_size < target_size) ? static_cast<uint32_t>(prev_size) : 0;
  }
}

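// Returns a copy of the buffer's contents ordered from oldest to newest,
// undoing the circular indexing so callers can treat it as a flat vector.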
std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
  auto working = paths_buffer_;

  if (working.empty()) {
    return working;
  }

  // Next write occurs at a nonexistent path's slot
  if (paths_buffer_[buffer_idx_].empty()) {
    working.resize(buffer_idx_);
  } else {
    std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
  }

  return working;
}

std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  // Reversing the normalized vector returns the latest deadlocks first
  auto working = Normalize();
  std::reverse(working.begin(), working.end());

  return working;
}

namespace {
void UnrefLockMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_maps_cache =
      static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
  delete lock_maps_cache;
}
}  // anonymous namespace

TransactionLockMgr::TransactionLockMgr(
    TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
    uint32_t max_num_deadlocks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : txn_db_impl_(nullptr),
      default_num_stripes_(default_num_stripes),
      max_num_locks_(max_num_locks),
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
      dlock_buffer_(max_num_deadlocks),
      mutex_factory_(mutex_factory) {
  assert(txn_db);
  txn_db_impl_ =
      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
}

TransactionLockMgr::~TransactionLockMgr() {}

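// Hash the key and scale the 64-bit hash into [0, num_stripes_) with
// fastrange64, which is cheaper than a modulo reduction.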
size_t LockMap::GetStripe(const std::string& key) const {
  assert(num_stripes_ > 0);
  return fastrange64(GetSliceNPHash64(key), num_stripes_);
}

void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
  InstrumentedMutexLock l(&lock_map_mutex_);

  if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
    lock_maps_.emplace(column_family_id,
                       std::make_shared<LockMap>(default_num_stripes_,
                                                 mutex_factory_));
  } else {
    // column_family already exists in lock map
    assert(false);
  }
}

void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
  // Remove lock_map for this column family.  Since the lock map is stored
  // as a shared ptr, concurrent transactions can still keep using it
  // until they release their references to it.
  {
    InstrumentedMutexLock l(&lock_map_mutex_);

    auto lock_maps_iter = lock_maps_.find(column_family_id);
    assert(lock_maps_iter != lock_maps_.end());

    lock_maps_.erase(lock_maps_iter);
  }  // lock_map_mutex_

  // Clear all thread-local caches
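  // so that no thread keeps serving the dropped column family's LockMap.
  // Transactions already holding the shared_ptr keep the map alive until
  // they release their references.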
  autovector<void*> local_caches;
  lock_maps_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockMaps*>(cache);
  }
}

// Look up the LockMap std::shared_ptr for a given column_family_id.
// Note:  The LockMap is only valid as long as the caller is still holding on
//   to the returned std::shared_ptr.
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
    uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {
    lock_maps_cache_->Reset(new LockMaps());
  }

  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());

  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }

  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);

  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {
    return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map.  Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});

    return lock_map;
  }
}

// Returns true if this lock has expired and can be acquired by another
// transaction.
// If false, sets *expire_time to the lock's expiration timestamp (as
// reported by Env::NowMicros()), or 0 if the lock has no expiration.
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
                                       const LockInfo& lock_info, Env* env,
                                       uint64_t* expire_time) {
  auto now = env->NowMicros();

  bool expired =
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);

  if (!expired && lock_info.expiration_time > 0) {
    // Return the absolute time at which the lock will expire
    *expire_time = lock_info.expiration_time;
  } else {
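    // The lock has expired.  Before this transaction can steal it, every
    // other holder's expired locks must be stolen as well; if stealing
    // fails for any holder, the lock cannot be taken.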
    for (auto id : lock_info.txn_ids) {
      if (txn_id == id) {
        continue;
      }

      bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
      if (!success) {
        expired = false;
        break;
      }
      *expire_time = 0;
    }
  }

  return expired;
}

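// Attempt to acquire a lock on the key on behalf of txn.  This is typically
// reached via PessimisticTransactionDB::TryLock() when a transaction writes
// a key or reads it for update.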
Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
                                   const std::string& key, Env* env,
                                   bool exclusive) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, std::move(lock_info));
}

// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    uint32_t column_family_id, const std::string& key, Env* env,
    int64_t timeout, LockInfo&& lock_info) {
  Status result;
  uint64_t end_time = 0;

  if (timeout > 0) {
    uint64_t start_time = env->NowMicros();
    end_time = start_time + timeout;
  }

  if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }

  if (!result.ok()) {
    // failed to acquire mutex
    return result;
  }

  // Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
                         &expire_time_hint, &wait_ids);

  if (!result.ok() && timeout != 0) {
    PERF_TIMER_GUARD(key_lock_wait_time);
    PERF_COUNTER_ADD(key_lock_wait_count, 1);
    // If we weren't able to acquire the lock, we will keep retrying as long
    // as the timeout allows.
    bool timed_out = false;
    do {
      // Decide how long to wait
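      // cv_end_time == -1 means there is no deadline and we wait on the
      // condition variable indefinitely.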
      int64_t cv_end_time = -1;

      // Check if held lock's expiration time is sooner than our timeout
      if (expire_time_hint > 0 &&
          (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
        // expiration time is sooner than our timeout
        cv_end_time = expire_time_hint;
      } else if (timeout >= 0) {
        cv_end_time = end_time;
      }

      assert(result.IsBusy() || wait_ids.size() != 0);

      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }

      TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
      } else {
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
        }
      }

      if (wait_ids.size() != 0) {
        txn->ClearWaitingTxn();
        if (txn->IsDeadlockDetect()) {
          DecrementWaiters(txn, wait_ids);
        }
      }

      if (result.IsTimedOut()) {
        timed_out = true;
        // Even though we timed out, we will still make one more attempt to
        // acquire lock below (it is possible the lock expired and we
        // were never signaled).
      }

      if (result.ok() || result.IsTimedOut()) {
        result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
                               &expire_time_hint, &wait_ids);
      }
    } while (!result.ok() && !timed_out);
  }

  stripe->stripe_mutex->UnLock();

  return result;
}

void TransactionLockMgr::DecrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  DecrementWaitersImpl(txn, wait_ids);
}

void TransactionLockMgr::DecrementWaitersImpl(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  auto id = txn->GetID();
  assert(wait_txn_map_.Contains(id));
  wait_txn_map_.Delete(id);

  for (auto wait_id : wait_ids) {
    rev_wait_txn_map_.Get(wait_id)--;
    if (rev_wait_txn_map_.Get(wait_id) == 0) {
      rev_wait_txn_map_.Delete(wait_id);
    }
  }
}

bool TransactionLockMgr::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive, Env* const env) {
  auto id = txn->GetID();
  std::vector<int> queue_parents(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::vector<TransactionID> queue_values(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));

  wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key});

  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }

  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }

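  // Breadth-first search of the wait-for graph, bounded by the deadlock
  // detect depth.  queue_values records the discovered transaction IDs and
  // queue_parents records each node's BFS parent, so that if the search
  // reaches this transaction again, the cycle can be unwound into a
  // DeadlockPath for the info buffer.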
  const auto* next_ids = &wait_ids;
  int parent = -1;
  int64_t deadlock_time = 0;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));

        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_exclusive,
                        extracted_info.m_waiting_key});
        head = queue_parents[head];
      }
      env->GetCurrentTime(&deadlock_time);
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
      deadlock_time = 0;
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

  // Wait cycle too big, just assume deadlock.
  env->GetCurrentTime(&deadlock_time);
  dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}

// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
//  or 0 if no expiration.
// REQUIRED:  Stripe mutex must be held.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
                                         LockMapStripe* stripe,
                                         const std::string& key, Env* env,
                                         LockInfo&& txn_lock_info,
                                         uint64_t* expire_time,
                                         autovector<TransactionID>* txn_ids) {
  assert(txn_lock_info.txn_ids.size() == 1);

  Status result;
  // Check if this key is already locked
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    // Lock already held
    LockInfo& lock_info = stripe_iter->second;
    assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);

    if (lock_info.exclusive || txn_lock_info.exclusive) {
      if (lock_info.txn_ids.size() == 1 &&
          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
        // The list contains one txn and we're it, so just take it.
        lock_info.exclusive = txn_lock_info.exclusive;
        lock_info.expiration_time = txn_lock_info.expiration_time;
      } else {
        // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
        // it's there for a shared lock with multiple holders which was not
        // caught in the first case.
        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
                          expire_time)) {
          // lock is expired, can steal it
          lock_info.txn_ids = txn_lock_info.txn_ids;
          lock_info.exclusive = txn_lock_info.exclusive;
          lock_info.expiration_time = txn_lock_info.expiration_time;
          // lock_cnt does not change
        } else {
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
          *txn_ids = lock_info.txn_ids;
        }
      }
    } else {
      // We are requesting shared access to a shared lock, so just grant it.
      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
      // Using std::max means that expiration time never goes down even when
      // a transaction is removed from the list. The correct solution would be
      // to track expiry for every transaction, but this would also work for
      // now.
      lock_info.expiration_time =
          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
    }
  } else {  // Lock not held.
    // Check lock limit
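    // (The load here and the increment below are guarded by different stripe
    // mutexes across stripes, so the limit is enforced best-effort and may
    // be transiently exceeded under concurrency.)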
    if (max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
      result = Status::Busy(Status::SubCode::kLockLimit);
    } else {
      // acquire lock
      stripe->keys.emplace(key, std::move(txn_lock_info));

      // Maintain lock count if there is a limit on the number of locks
      if (max_num_locks_) {
        lock_map->lock_cnt++;
      }
    }
  }

  return result;
}

void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
                                   const std::string& key,
                                   LockMapStripe* stripe, LockMap* lock_map,
                                   Env* env) {
#ifdef NDEBUG
  (void)env;
#endif
  TransactionID txn_id = txn->GetID();

  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& txns = stripe_iter->second.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    // Found the key we locked.  unlock it.
    if (txn_it != txns.end()) {
      if (txns.size() == 1) {
        stripe->keys.erase(stripe_iter);
      } else {
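        // Swap-and-pop: overwrite our slot with the last ID and shrink the
        // vector, avoiding an O(n) erase from the middle.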
        auto last_it = txns.end() - 1;
        if (txn_it != last_it) {
          *txn_it = *last_it;
        }
        txns.pop_back();
      }

      if (max_num_locks_ > 0) {
        // Maintain lock count if there is a limit on the number of locks.
        assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
        lock_map->lock_cnt--;
      }
    }
  } else {
    // This key is either not locked or locked by someone else.  This should
    // only happen if the unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}

void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }

  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env);
  stripe->stripe_mutex->UnLock();

  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
                                const TransactionKeyMap* key_map, Env* env) {
  for (auto& key_map_iter : *key_map) {
    uint32_t column_family_id = key_map_iter.first;
    auto& keys = key_map_iter.second;

    std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
    LockMap* lock_map = lock_map_ptr.get();

    if (lock_map == nullptr) {
      // Column Family must have been dropped.
      return;
    }

    // Bucket keys by lock_map_ stripe
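    // so that each stripe mutex only needs to be taken once per column
    // family rather than once per key.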
    std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
        std::max(keys.size(), lock_map->num_stripes_));

    for (auto& key_iter : keys) {
      const std::string& key = key_iter.first;

      size_t stripe_num = lock_map->GetStripe(key);
      keys_by_stripe[stripe_num].push_back(&key);
    }

    // For each stripe, grab the stripe mutex and unlock all keys in this stripe
    for (auto& stripe_iter : keys_by_stripe) {
      size_t stripe_num = stripe_iter.first;
      auto& stripe_keys = stripe_iter.second;

      assert(lock_map->lock_map_stripes_.size() > stripe_num);
      LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

      stripe->stripe_mutex->Lock();

      for (const std::string* key : stripe_keys) {
        UnLockKey(txn, *key, stripe, lock_map, env);
      }

      stripe->stripe_mutex->UnLock();

      // Signal waiting threads to retry locking
      stripe->stripe_cv->NotifyAll();
    }
  }
}

TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
  LockStatusData data;
  // Lock order here is important. The correct order is lock_map_mutex_, then
  // for every column family ID in ascending order lock every stripe in
  // ascending order.
  InstrumentedMutexLock l(&lock_map_mutex_);

  std::vector<uint32_t> cf_ids;
  for (const auto& map : lock_maps_) {
    cf_ids.push_back(map.first);
  }
  std::sort(cf_ids.begin(), cf_ids.end());

  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    // Iterate and lock all stripes in ascending order.
    for (const auto& j : stripes) {
      j->stripe_mutex->Lock();
      for (const auto& it : j->keys) {
        struct KeyLockInfo info;
        info.exclusive = it.second.exclusive;
        info.key = it.first;
        for (const auto& id : it.second.txn_ids) {
          info.ids.push_back(id);
        }
        data.insert({i, info});
      }
    }
  }

  // Unlock everything. Unlocking order is not important.
  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    for (const auto& j : stripes) {
      j->stripe_mutex->UnLock();
    }
  }

  return data;
}

std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
  return dlock_buffer_.PrepareBuffer();
}

void TransactionLockMgr::Resize(uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE