//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE
#ifndef OS_WIN

#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h"

#include <algorithm>
#include <cinttypes>
#include <mutex>

#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/thread_local.h"
#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

namespace ROCKSDB_NAMESPACE {

RangeLockManagerHandle* NewRangeLockManager(
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory) {
  std::shared_ptr<TransactionDBMutexFactory> use_factory;

  if (mutex_factory) {
    use_factory = mutex_factory;
  } else {
    use_factory.reset(new TransactionDBMutexFactoryImpl());
  }
  return new RangeTreeLockManager(use_factory);
}

static const char SUFFIX_INFIMUM = 0x0;
static const char SUFFIX_SUPREMUM = 0x1;
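
// Summary of the endpoint encoding implemented by serialize_endpoint() and
// CompareDbtEndpoints() below: an endpoint is stored as a one-byte marker
// followed by the user key. The marker says whether the endpoint behaves as
// the key followed by an infinitely small suffix (SUFFIX_INFIMUM, sorts
// before any longer key sharing the prefix) or an infinitely large suffix
// (SUFFIX_SUPREMUM, sorts after).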

// Convert an Endpoint into the internal format used for storing it in the
// locktree (the DBT structure is used for passing endpoints to the locktree
// and getting them back).
void serialize_endpoint(const Endpoint& endp, std::string* buf) {
  buf->push_back(endp.inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM);
  buf->append(endp.slice.data(), endp.slice.size());
}

// Decode an endpoint from the format it is stored in inside the locktree
// (DBT) into the one used outside (EndpointWithString).
void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) {
  assert(dbt->size >= 1);
  const char* dbt_data = (const char*)dbt->data;
  char suffix = dbt_data[0];
  assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM);
  endp->inf_suffix = (suffix == SUFFIX_SUPREMUM);
  endp->slice.assign(dbt_data + 1, dbt->size - 1);
}

// Get a range lock on the [start_key; end_key] range
Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
                                     uint32_t column_family_id,
                                     const Endpoint& start_endp,
                                     const Endpoint& end_endp, Env*,
                                     bool exclusive) {
  toku::lock_request request;
  request.create(mutex_factory_);
  DBT start_key_dbt, end_key_dbt;

  TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:enter");
  std::string start_key;
  std::string end_key;
  serialize_endpoint(start_endp, &start_key);
  serialize_endpoint(end_endp, &end_key);

  toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
  toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());

  auto lt = GetLockTreeForCF(column_family_id);

  // Put the key waited on into request's m_extra. See
  // wait_callback_for_locktree for details.
  std::string wait_key(start_endp.slice.data(), start_endp.slice.size());

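  // The locktree identifies lock owners and waiters by TXNID. We pass the
  // PessimisticTransaction's address as the TXNID; the callbacks
  // (wait_callback_for_locktree, on_escalate) cast it back to the
  // transaction object.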
  request.set(lt.get(), (TXNID)txn, &start_key_dbt, &end_key_dbt,
              exclusive ? toku::lock_request::WRITE : toku::lock_request::READ,
              false /* not a big txn */, &wait_key);

  // This is for the "periodically wake up and check if the wait is killed"
  // feature, which we are not using.
  uint64_t killed_time_msec = 0;
  uint64_t wait_time_msec = txn->GetLockTimeout();

  if (wait_time_msec == static_cast<uint64_t>(-1)) {
    // The transaction has no wait timeout. lock_request::wait doesn't support
    // this, it needs a number of milliseconds to wait. Pass it one year to
    // be safe.
    wait_time_msec = uint64_t(1000) * 60 * 60 * 24 * 365;
  } else {
    // convert microseconds to milliseconds
    wait_time_msec = (wait_time_msec + 500) / 1000;
  }

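  // If the locktree detects a deadlock, it reports the locks involved in the
  // wait-for cycle through this callback, one lock per invocation; we collect
  // the entries here and turn them into a RangeDeadlockPath below.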
  std::vector<RangeDeadlockInfo> di_path;
  request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive,
                              const DBT* start_dbt, const DBT* end_dbt) {
    EndpointWithString start;
    EndpointWithString end;
    deserialize_endpoint(start_dbt, &start);
    deserialize_endpoint(end_dbt, &end);

    di_path.push_back({((PessimisticTransaction*)txnid)->GetID(),
                       column_family_id, is_exclusive, std::move(start),
                       std::move(end)});
  };

  request.start();

  const int r = request.wait(wait_time_msec, killed_time_msec,
                             nullptr,  // killed_callback
                             wait_callback_for_locktree, nullptr);

  // Inform the txn that we are no longer waiting:
  txn->ClearWaitingTxn();

  request.destroy();
  switch (r) {
    case 0:
      break;  // lock acquired
    case DB_LOCK_NOTGRANTED:
      return Status::TimedOut(Status::SubCode::kLockTimeout);
    case TOKUDB_OUT_OF_LOCKS:
      return Status::Busy(Status::SubCode::kLockLimit);
    case DB_LOCK_DEADLOCK: {
      std::reverse(di_path.begin(), di_path.end());
      dlock_buffer_.AddNewPath(
          RangeDeadlockPath(di_path, request.get_start_time()));
      return Status::Busy(Status::SubCode::kDeadlock);
    }
    default:
      assert(0);
      return Status::Busy(Status::SubCode::kLockLimit);
  }

  return Status::OK();
}

// Wait callback that the locktree library will call to inform us about
// the lock waits that are in progress.
void wait_callback_for_locktree(void*, lock_wait_infos* infos) {
  for (auto wait_info : *infos) {
    auto txn = (PessimisticTransaction*)wait_info.waiter;
    auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid;

    autovector<TransactionID> waitee_ids;
    for (auto waitee : wait_info.waitees) {
      waitee_ids.push_back(((PessimisticTransaction*)waitee)->GetID());
    }
    txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra);
  }

  // Here we can assume that the locktree code will now wait for some lock
  TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:WaitingTxn");
}

void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
                                  ColumnFamilyId column_family_id,
                                  const std::string& key, Env*) {
  auto locktree = GetLockTreeForCF(column_family_id);
  std::string endp_image;
  serialize_endpoint({key.data(), key.size(), false}, &endp_image);

  DBT key_dbt;
  toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size());

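  // A point lock is released as the single-point range [key, key].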
  toku::range_buffer range_buf;
  range_buf.create();
  range_buf.append(&key_dbt, &key_dbt);

  locktree->release_locks((TXNID)txn, &range_buf);
  range_buf.destroy();

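  // The released range may unblock other waiters; retry all pending lock
  // requests on this locktree so they can be re-examined.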
  toku::lock_request::retry_all_lock_requests(
      locktree.get(), wait_callback_for_locktree, nullptr);
}

void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
                                  const LockTracker& tracker, Env*) {
  const RangeTreeLockTracker* range_tracker =
      static_cast<const RangeTreeLockTracker*>(&tracker);

  RangeTreeLockTracker* range_trx_tracker =
      static_cast<RangeTreeLockTracker*>(&txn->GetTrackedLocks());
  bool all_keys = (range_trx_tracker == range_tracker);
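  // If the caller passed the transaction's own tracker, we are releasing
  // every lock the transaction holds in one go.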

  // tracked_locks_->range_list may hold nullptr if the transaction has never
  // acquired any locks.
  ((RangeTreeLockTracker*)range_tracker)->ReleaseLocks(this, txn, all_keys);
}

int RangeTreeLockManager::CompareDbtEndpoints(void* arg, const DBT* a_key,
                                              const DBT* b_key) {
  const char* a = (const char*)a_key->data;
  const char* b = (const char*)b_key->data;

  size_t a_len = a_key->size;
  size_t b_len = b_key->size;

  size_t min_len = std::min(a_len, b_len);

  // Compare the user-key parts, skipping the first byte, which encodes the
  // endpoint type (SUFFIX_INFIMUM or SUFFIX_SUPREMUM).
  Comparator* cmp = (Comparator*)arg;
  int res = cmp->Compare(Slice(a + 1, min_len - 1), Slice(b + 1, min_len - 1));
  if (!res) {
    if (b_len > min_len) {
      // a is shorter
      if (a[0] == SUFFIX_INFIMUM) {
        return -1;  // "a" is smaller
      } else {
        // a is considered padded with 0xFF:FF:FF:FF...
        return 1;  // "a" is bigger
      }
    } else if (a_len > min_len) {
      // the opposite of the above: b is shorter
      if (b[0] == SUFFIX_INFIMUM) {
        return 1;  // "b" is smaller
      } else {
        // b is considered padded with 0xFF:FF:FF:FF...
        return -1;  // "b" is bigger
      }
    } else {
      // the lengths are equal (and the key values, too)
      if (a[0] < b[0]) {
        return -1;
      } else if (a[0] > b[0]) {
        return 1;
      } else {
        return 0;
      }
    }
  } else {
    return res;
  }
}

namespace {
void UnrefLockTreeMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_tree_map_cache = static_cast<
      std::unordered_map<ColumnFamilyId, std::shared_ptr<locktree>>*>(ptr);
  delete lock_tree_map_cache;
}
}  // anonymous namespace

RangeTreeLockManager::RangeTreeLockManager(
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : mutex_factory_(mutex_factory),
      ltree_lookup_cache_(new ThreadLocalPtr(&UnrefLockTreeMapsCache)),
      dlock_buffer_(10) {
  ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_);
}

void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(
    uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}

void RangeTreeLockManager::Resize(uint32_t target_size) {
  SetRangeDeadlockInfoBufferSize(target_size);
}

std::vector<RangeDeadlockPath>
RangeTreeLockManager::GetRangeDeadlockInfoBuffer() {
  return dlock_buffer_.PrepareBuffer();
}

std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() {
  std::vector<DeadlockPath> res;
  std::vector<RangeDeadlockPath> data = GetRangeDeadlockInfoBuffer();
  // report left endpoints
  for (auto it = data.begin(); it != data.end(); ++it) {
    std::vector<DeadlockInfo> path;

    for (auto it2 = it->path.begin(); it2 != it->path.end(); ++it2) {
      path.push_back(
          {it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, it2->m_start.slice});
    }
    res.push_back(DeadlockPath(path, it->deadlock_time));
  }
  return res;
}

// @brief  Lock Escalation Callback function
//
// @param txnid   Transaction whose locks got escalated
// @param lt      Lock Tree where escalation is happening
// @param buffer  Escalation result: list of locks that this transaction now
//                owns in this lock tree.
// @param void*   Callback context
void RangeTreeLockManager::on_escalate(TXNID txnid, const locktree* lt,
                                       const range_buffer& buffer, void*) {
  auto txn = (PessimisticTransaction*)txnid;
  ((RangeTreeLockTracker*)&txn->GetTrackedLocks())->ReplaceLocks(lt, buffer);
}

RangeTreeLockManager::~RangeTreeLockManager() {
  autovector<void*> local_caches;
  ltree_lookup_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockTreeMap*>(cache);
  }
  ltree_map_.clear();  // this will call release_lt() for all locktrees
  ltm_.destroy();
}

RangeLockManagerHandle::Counters RangeTreeLockManager::GetStatus() {
  LTM_STATUS_S ltm_status_test;
  ltm_.get_status(&ltm_status_test);
  Counters res;

  // Look up the status variables by their string names. This is how Toku's
  // unit tests do it (why didn't they make the LTM_ESCALATION_COUNT constant
  // visible?).
  for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) {
    TOKU_ENGINE_STATUS_ROW status = &ltm_status_test.status[i];
    if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) {
      res.escalation_count = status->value.num;
      continue;
    }
    if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) {
      res.current_lock_memory = status->value.num;
    }
  }
  return res;
}

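// Wrap a raw locktree pointer in a shared_ptr whose deleter hands the tree
// back to the locktree_manager (release_lt) when the last reference goes
// away. This lets RemoveColumnFamily() drop its reference while transactions
// that still hold locks keep the tree alive.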
std::shared_ptr<locktree> RangeTreeLockManager::MakeLockTreePtr(locktree* lt) {
  locktree_manager* ltm = &ltm_;
  return std::shared_ptr<locktree>(lt,
                                   [ltm](locktree* p) { ltm->release_lt(p); });
}

void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
  uint32_t column_family_id = cfh->GetID();

  InstrumentedMutexLock l(&ltree_map_mutex_);
  if (ltree_map_.find(column_family_id) == ltree_map_.end()) {
    DICTIONARY_ID dict_id = {.dictid = column_family_id};
    toku::comparator cmp;
    cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator());
    toku::locktree* ltree = ltm_.get_lt(dict_id, cmp,
                                        /* on_create_extra */ nullptr);
    // This is ok to do because get_lt has copied the comparator:
    cmp.destroy();

    ltree_map_.insert({column_family_id, MakeLockTreePtr(ltree)});
  }
}

void RangeTreeLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cfh) {
  uint32_t column_family_id = cfh->GetID();
  // Remove the lock map for this column family.  Since the lock map is stored
  // as a shared_ptr, concurrent transactions can still keep using it
  // until they release their references to it.

  // TODO: what if one drops a column family while transaction(s) still have
  // locks in it?
  // The locktree uses the column family's Comparator* as the criteria to do
  // tree ordering. If the comparator is gone, we won't even be able to remove
  // the elements from the locktree.
  // A possible solution might be to remove everything right now:
  //  - wait until everyone traversing the locktree is gone
  //  - remove everything from the locktree.
  //  - some transactions may have acquired locks in their LockTracker objects.
  //    Arrange something so we don't blow up when they try to release them.
  //  - ...
  // This use case (dropping a column family while somebody is using it)
  // doesn't seem to be a priority, though.

  {
    InstrumentedMutexLock l(&ltree_map_mutex_);

    auto lock_maps_iter = ltree_map_.find(column_family_id);
    assert(lock_maps_iter != ltree_map_.end());
    ltree_map_.erase(lock_maps_iter);
  }  // lock_map_mutex_

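  // Drop all thread-local lookup caches. Each cache holds shared_ptr
  // references to locktrees, so clearing them lets the removed tree be
  // released once in-flight transactions are done with it.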
  autovector<void*> local_caches;
  ltree_lookup_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockTreeMap*>(cache);
  }
}

std::shared_ptr<locktree> RangeTreeLockManager::GetLockTreeForCF(
    ColumnFamilyId column_family_id) {
  // First check the thread-local cache
  if (ltree_lookup_cache_->Get() == nullptr) {
    ltree_lookup_cache_->Reset(new LockTreeMap());
  }

  auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get());

  auto it = ltree_map_cache->find(column_family_id);
  if (it != ltree_map_cache->end()) {
    // Found the lock tree for this column family.
    return it->second;
  }

  // Not found in the local cache; grab the mutex and check the shared
  // ltree_map_.
  InstrumentedMutexLock l(&ltree_map_mutex_);

  it = ltree_map_.find(column_family_id);
  if (it == ltree_map_.end()) {
    return nullptr;
  } else {
    // Found the lock tree.  Store it in the thread-local cache and return it.
    ltree_map_cache->insert({column_family_id, it->second});
    return it->second;
  }
}

struct LOCK_PRINT_CONTEXT {
  RangeLockManagerHandle::RangeLockStatus* data;  // Save locks here
  uint32_t cfh_id;  // Column Family whose tree we are traversing
};

// Report left endpoints of the acquired locks
LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() {
  PointLockStatus res;
  LockManager::RangeLockStatus data = GetRangeLockStatus();
  // report left endpoints
  for (auto it = data.begin(); it != data.end(); ++it) {
    auto& val = it->second;
    res.insert({it->first, {val.start.slice, val.ids, val.exclusive}});
  }
  return res;
}

static void push_into_lock_status_data(void* param, const DBT* left,
                                       const DBT* right, TXNID txnid_arg,
                                       bool is_shared, TxnidVector* owners) {
  struct LOCK_PRINT_CONTEXT* ctx = (LOCK_PRINT_CONTEXT*)param;
  struct RangeLockInfo info;

  info.exclusive = !is_shared;

  deserialize_endpoint(left, &info.start);
  deserialize_endpoint(right, &info.end);

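  // A lock with txnid_arg == TXNID_SHARED is a shared lock that may have
  // multiple owners; report each owner's transaction id individually.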
  if (txnid_arg != TXNID_SHARED) {
    TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID();
    info.ids.push_back(txnid);
  } else {
    for (auto it : *owners) {
      TXNID real_id = ((PessimisticTransaction*)it)->GetID();
      info.ids.push_back(real_id);
    }
  }
  ctx->data->insert({ctx->cfh_id, info});
}

LockManager::RangeLockStatus RangeTreeLockManager::GetRangeLockStatus() {
  LockManager::RangeLockStatus data;
  {
    InstrumentedMutexLock l(&ltree_map_mutex_);
    for (auto it : ltree_map_) {
      LOCK_PRINT_CONTEXT ctx = {&data, it.first};
      it.second->dump_locks((void*)&ctx, push_into_lock_status_data);
    }
  }
  return data;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // OS_WIN
#endif  // ROCKSDB_LITE