1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7 #ifndef OS_WIN
8
9 #include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h"
10
11 #include <algorithm>
12 #include <cinttypes>
13 #include <mutex>
14
15 #include "monitoring/perf_context_imp.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/utilities/transaction_db_mutex.h"
18 #include "test_util/sync_point.h"
19 #include "util/cast_util.h"
20 #include "util/hash.h"
21 #include "util/thread_local.h"
22 #include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h"
23 #include "utilities/transactions/pessimistic_transaction_db.h"
24 #include "utilities/transactions/transaction_db_mutex_impl.h"
25
26 namespace ROCKSDB_NAMESPACE {
27
NewRangeLockManager(std::shared_ptr<TransactionDBMutexFactory> mutex_factory)28 RangeLockManagerHandle* NewRangeLockManager(
29 std::shared_ptr<TransactionDBMutexFactory> mutex_factory) {
30 std::shared_ptr<TransactionDBMutexFactory> use_factory;
31
32 if (mutex_factory) {
33 use_factory = mutex_factory;
34 } else {
35 use_factory.reset(new TransactionDBMutexFactoryImpl());
36 }
37 return new RangeTreeLockManager(use_factory);
38 }
39
40 static const char SUFFIX_INFIMUM = 0x0;
41 static const char SUFFIX_SUPREMUM = 0x1;
42
43 // Convert Endpoint into an internal format used for storing it in locktree
44 // (DBT structure is used for passing endpoints to locktree and getting back)
serialize_endpoint(const Endpoint & endp,std::string * buf)45 void serialize_endpoint(const Endpoint& endp, std::string* buf) {
46 buf->push_back(endp.inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM);
47 buf->append(endp.slice.data(), endp.slice.size());
48 }
49
50 // Decode the endpoint from the format it is stored in the locktree (DBT) to
51 // one used outside (EndpointWithString)
deserialize_endpoint(const DBT * dbt,EndpointWithString * endp)52 void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) {
53 assert(dbt->size >= 1);
54 const char* dbt_data = (const char*)dbt->data;
55 char suffix = dbt_data[0];
56 assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM);
57 endp->inf_suffix = (suffix == SUFFIX_SUPREMUM);
58 endp->slice.assign(dbt_data + 1, dbt->size - 1);
59 }
60
61 // Get a range lock on [start_key; end_key] range
TryLock(PessimisticTransaction * txn,uint32_t column_family_id,const Endpoint & start_endp,const Endpoint & end_endp,Env *,bool exclusive)62 Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
63 uint32_t column_family_id,
64 const Endpoint& start_endp,
65 const Endpoint& end_endp, Env*,
66 bool exclusive) {
67 toku::lock_request request;
68 request.create(mutex_factory_);
69 DBT start_key_dbt, end_key_dbt;
70
71 TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:enter");
72 std::string start_key;
73 std::string end_key;
74 serialize_endpoint(start_endp, &start_key);
75 serialize_endpoint(end_endp, &end_key);
76
77 toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
78 toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());
79
80 auto lt = GetLockTreeForCF(column_family_id);
81
82 // Put the key waited on into request's m_extra. See
83 // wait_callback_for_locktree for details.
84 std::string wait_key(start_endp.slice.data(), start_endp.slice.size());
85
86 request.set(lt.get(), (TXNID)txn, &start_key_dbt, &end_key_dbt,
87 exclusive ? toku::lock_request::WRITE : toku::lock_request::READ,
88 false /* not a big txn */, &wait_key);
89
90 // This is for "periodically wake up and check if the wait is killed" feature
91 // which we are not using.
92 uint64_t killed_time_msec = 0;
93 uint64_t wait_time_msec = txn->GetLockTimeout();
94
95 if (wait_time_msec == static_cast<uint64_t>(-1)) {
96 // The transaction has no wait timeout. lock_request::wait doesn't support
97 // this, it needs a number of milliseconds to wait. Pass it one year to
98 // be safe.
99 wait_time_msec = uint64_t(1000) * 60 * 60 * 24 * 365;
100 } else {
101 // convert microseconds to milliseconds
102 wait_time_msec = (wait_time_msec + 500) / 1000;
103 }
104
105 std::vector<RangeDeadlockInfo> di_path;
106 request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive,
107 const DBT* start_dbt, const DBT* end_dbt) {
108 EndpointWithString start;
109 EndpointWithString end;
110 deserialize_endpoint(start_dbt, &start);
111 deserialize_endpoint(end_dbt, &end);
112
113 di_path.push_back({((PessimisticTransaction*)txnid)->GetID(),
114 column_family_id, is_exclusive, std::move(start),
115 std::move(end)});
116 };
117
118 request.start();
119
120 const int r = request.wait(wait_time_msec, killed_time_msec,
121 nullptr, // killed_callback
122 wait_callback_for_locktree, nullptr);
123
124 // Inform the txn that we are no longer waiting:
125 txn->ClearWaitingTxn();
126
127 request.destroy();
128 switch (r) {
129 case 0:
130 break; // fall through
131 case DB_LOCK_NOTGRANTED:
132 return Status::TimedOut(Status::SubCode::kLockTimeout);
133 case TOKUDB_OUT_OF_LOCKS:
134 return Status::Busy(Status::SubCode::kLockLimit);
135 case DB_LOCK_DEADLOCK: {
136 std::reverse(di_path.begin(), di_path.end());
137 dlock_buffer_.AddNewPath(
138 RangeDeadlockPath(di_path, request.get_start_time()));
139 return Status::Busy(Status::SubCode::kDeadlock);
140 }
141 default:
142 assert(0);
143 return Status::Busy(Status::SubCode::kLockLimit);
144 }
145
146 return Status::OK();
147 }
148
149 // Wait callback that locktree library will call to inform us about
150 // the lock waits that are in progress.
wait_callback_for_locktree(void *,lock_wait_infos * infos)151 void wait_callback_for_locktree(void*, lock_wait_infos* infos) {
152 for (auto wait_info : *infos) {
153 auto txn = (PessimisticTransaction*)wait_info.waiter;
154 auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid;
155
156 autovector<TransactionID> waitee_ids;
157 for (auto waitee : wait_info.waitees) {
158 waitee_ids.push_back(((PessimisticTransaction*)waitee)->GetID());
159 }
160 txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra);
161 }
162
163 // Here we can assume that the locktree code will now wait for some lock
164 TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:WaitingTxn");
165 }
166
UnLock(PessimisticTransaction * txn,ColumnFamilyId column_family_id,const std::string & key,Env *)167 void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
168 ColumnFamilyId column_family_id,
169 const std::string& key, Env*) {
170 auto locktree = GetLockTreeForCF(column_family_id);
171 std::string endp_image;
172 serialize_endpoint({key.data(), key.size(), false}, &endp_image);
173
174 DBT key_dbt;
175 toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size());
176
177 toku::range_buffer range_buf;
178 range_buf.create();
179 range_buf.append(&key_dbt, &key_dbt);
180
181 locktree->release_locks((TXNID)txn, &range_buf);
182 range_buf.destroy();
183
184 toku::lock_request::retry_all_lock_requests(
185 locktree.get(), wait_callback_for_locktree, nullptr);
186 }
187
UnLock(PessimisticTransaction * txn,const LockTracker & tracker,Env *)188 void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
189 const LockTracker& tracker, Env*) {
190 const RangeTreeLockTracker* range_tracker =
191 static_cast<const RangeTreeLockTracker*>(&tracker);
192
193 RangeTreeLockTracker* range_trx_tracker =
194 static_cast<RangeTreeLockTracker*>(&txn->GetTrackedLocks());
195 bool all_keys = (range_trx_tracker == range_tracker);
196
197 // tracked_locks_->range_list may hold nullptr if the transaction has never
198 // acquired any locks.
199 ((RangeTreeLockTracker*)range_tracker)->ReleaseLocks(this, txn, all_keys);
200 }
201
CompareDbtEndpoints(void * arg,const DBT * a_key,const DBT * b_key)202 int RangeTreeLockManager::CompareDbtEndpoints(void* arg, const DBT* a_key,
203 const DBT* b_key) {
204 const char* a = (const char*)a_key->data;
205 const char* b = (const char*)b_key->data;
206
207 size_t a_len = a_key->size;
208 size_t b_len = b_key->size;
209
210 size_t min_len = std::min(a_len, b_len);
211
212 // Compare the values. The first byte encodes the endpoint type, its value
213 // is either SUFFIX_INFIMUM or SUFFIX_SUPREMUM.
214 Comparator* cmp = (Comparator*)arg;
215 int res = cmp->Compare(Slice(a + 1, min_len - 1), Slice(b + 1, min_len - 1));
216 if (!res) {
217 if (b_len > min_len) {
218 // a is shorter;
219 if (a[0] == SUFFIX_INFIMUM) {
220 return -1; //"a is smaller"
221 } else {
222 // a is considered padded with 0xFF:FF:FF:FF...
223 return 1; // "a" is bigger
224 }
225 } else if (a_len > min_len) {
226 // the opposite of the above: b is shorter.
227 if (b[0] == SUFFIX_INFIMUM) {
228 return 1; //"b is smaller"
229 } else {
230 // b is considered padded with 0xFF:FF:FF:FF...
231 return -1; // "b" is bigger
232 }
233 } else {
234 // the lengths are equal (and the key values, too)
235 if (a[0] < b[0]) {
236 return -1;
237 } else if (a[0] > b[0]) {
238 return 1;
239 } else {
240 return 0;
241 }
242 }
243 } else {
244 return res;
245 }
246 }
247
248 namespace {
UnrefLockTreeMapsCache(void * ptr)249 void UnrefLockTreeMapsCache(void* ptr) {
250 // Called when a thread exits or a ThreadLocalPtr gets destroyed.
251 auto lock_tree_map_cache = static_cast<
252 std::unordered_map<ColumnFamilyId, std::shared_ptr<locktree>>*>(ptr);
253 delete lock_tree_map_cache;
254 }
255 } // anonymous namespace
256
RangeTreeLockManager(std::shared_ptr<TransactionDBMutexFactory> mutex_factory)257 RangeTreeLockManager::RangeTreeLockManager(
258 std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
259 : mutex_factory_(mutex_factory),
260 ltree_lookup_cache_(new ThreadLocalPtr(&UnrefLockTreeMapsCache)),
261 dlock_buffer_(10) {
262 ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_);
263 }
264
SetRangeDeadlockInfoBufferSize(uint32_t target_size)265 void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(
266 uint32_t target_size) {
267 dlock_buffer_.Resize(target_size);
268 }
269
Resize(uint32_t target_size)270 void RangeTreeLockManager::Resize(uint32_t target_size) {
271 SetRangeDeadlockInfoBufferSize(target_size);
272 }
273
274 std::vector<RangeDeadlockPath>
GetRangeDeadlockInfoBuffer()275 RangeTreeLockManager::GetRangeDeadlockInfoBuffer() {
276 return dlock_buffer_.PrepareBuffer();
277 }
278
GetDeadlockInfoBuffer()279 std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() {
280 std::vector<DeadlockPath> res;
281 std::vector<RangeDeadlockPath> data = GetRangeDeadlockInfoBuffer();
282 // report left endpoints
283 for (auto it = data.begin(); it != data.end(); ++it) {
284 std::vector<DeadlockInfo> path;
285
286 for (auto it2 = it->path.begin(); it2 != it->path.end(); ++it2) {
287 path.push_back(
288 {it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, it2->m_start.slice});
289 }
290 res.push_back(DeadlockPath(path, it->deadlock_time));
291 }
292 return res;
293 }
294
295 // @brief Lock Escalation Callback function
296 //
297 // @param txnid Transaction whose locks got escalated
298 // @param lt Lock Tree where escalation is happening
299 // @param buffer Escalation result: list of locks that this transaction now
300 // owns in this lock tree.
301 // @param void* Callback context
on_escalate(TXNID txnid,const locktree * lt,const range_buffer & buffer,void *)302 void RangeTreeLockManager::on_escalate(TXNID txnid, const locktree* lt,
303 const range_buffer& buffer, void*) {
304 auto txn = (PessimisticTransaction*)txnid;
305 ((RangeTreeLockTracker*)&txn->GetTrackedLocks())->ReplaceLocks(lt, buffer);
306 }
307
~RangeTreeLockManager()308 RangeTreeLockManager::~RangeTreeLockManager() {
309 autovector<void*> local_caches;
310 ltree_lookup_cache_->Scrape(&local_caches, nullptr);
311 for (auto cache : local_caches) {
312 delete static_cast<LockTreeMap*>(cache);
313 }
314 ltree_map_.clear(); // this will call release_lt() for all locktrees
315 ltm_.destroy();
316 }
317
GetStatus()318 RangeLockManagerHandle::Counters RangeTreeLockManager::GetStatus() {
319 LTM_STATUS_S ltm_status_test;
320 ltm_.get_status(<m_status_test);
321 Counters res;
322
323 // Searching status variable by its string name is how Toku's unit tests
324 // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?)
325 // lookup keyname in status
326 for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) {
327 TOKU_ENGINE_STATUS_ROW status = <m_status_test.status[i];
328 if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) {
329 res.escalation_count = status->value.num;
330 continue;
331 }
332 if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) {
333 res.current_lock_memory = status->value.num;
334 }
335 }
336 return res;
337 }
338
MakeLockTreePtr(locktree * lt)339 std::shared_ptr<locktree> RangeTreeLockManager::MakeLockTreePtr(locktree* lt) {
340 locktree_manager* ltm = <m_;
341 return std::shared_ptr<locktree>(lt,
342 [ltm](locktree* p) { ltm->release_lt(p); });
343 }
344
AddColumnFamily(const ColumnFamilyHandle * cfh)345 void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
346 uint32_t column_family_id = cfh->GetID();
347
348 InstrumentedMutexLock l(<ree_map_mutex_);
349 if (ltree_map_.find(column_family_id) == ltree_map_.end()) {
350 DICTIONARY_ID dict_id = {.dictid = column_family_id};
351 toku::comparator cmp;
352 cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator());
353 toku::locktree* ltree = ltm_.get_lt(dict_id, cmp,
354 /* on_create_extra*/ nullptr);
355 // This is ok to because get_lt has copied the comparator:
356 cmp.destroy();
357
358 ltree_map_.insert({column_family_id, MakeLockTreePtr(ltree)});
359 }
360 }
361
RemoveColumnFamily(const ColumnFamilyHandle * cfh)362 void RangeTreeLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cfh) {
363 uint32_t column_family_id = cfh->GetID();
364 // Remove lock_map for this column family. Since the lock map is stored
365 // as a shared ptr, concurrent transactions can still keep using it
366 // until they release their references to it.
367
368 // TODO what if one drops a column family while transaction(s) still have
369 // locks in it?
370 // locktree uses column family'c Comparator* as the criteria to do tree
371 // ordering. If the comparator is gone, we won't even be able to remove the
372 // elements from the locktree.
373 // A possible solution might be to remove everything right now:
374 // - wait until everyone traversing the locktree are gone
375 // - remove everything from the locktree.
376 // - some transactions may have acquired locks in their LockTracker objects.
377 // Arrange something so we don't blow up when they try to release them.
378 // - ...
379 // This use case (drop column family while somebody is using it) doesn't seem
380 // the priority, though.
381
382 {
383 InstrumentedMutexLock l(<ree_map_mutex_);
384
385 auto lock_maps_iter = ltree_map_.find(column_family_id);
386 assert(lock_maps_iter != ltree_map_.end());
387 ltree_map_.erase(lock_maps_iter);
388 } // lock_map_mutex_
389
390 autovector<void*> local_caches;
391 ltree_lookup_cache_->Scrape(&local_caches, nullptr);
392 for (auto cache : local_caches) {
393 delete static_cast<LockTreeMap*>(cache);
394 }
395 }
396
GetLockTreeForCF(ColumnFamilyId column_family_id)397 std::shared_ptr<locktree> RangeTreeLockManager::GetLockTreeForCF(
398 ColumnFamilyId column_family_id) {
399 // First check thread-local cache
400 if (ltree_lookup_cache_->Get() == nullptr) {
401 ltree_lookup_cache_->Reset(new LockTreeMap());
402 }
403
404 auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get());
405
406 auto it = ltree_map_cache->find(column_family_id);
407 if (it != ltree_map_cache->end()) {
408 // Found lock map for this column family.
409 return it->second;
410 }
411
412 // Not found in local cache, grab mutex and check shared LockMaps
413 InstrumentedMutexLock l(<ree_map_mutex_);
414
415 it = ltree_map_.find(column_family_id);
416 if (it == ltree_map_.end()) {
417 return nullptr;
418 } else {
419 // Found lock map. Store in thread-local cache and return.
420 ltree_map_cache->insert({column_family_id, it->second});
421 return it->second;
422 }
423 }
424
425 struct LOCK_PRINT_CONTEXT {
426 RangeLockManagerHandle::RangeLockStatus* data; // Save locks here
427 uint32_t cfh_id; // Column Family whose tree we are traversing
428 };
429
430 // Report left endpoints of the acquired locks
GetPointLockStatus()431 LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() {
432 PointLockStatus res;
433 LockManager::RangeLockStatus data = GetRangeLockStatus();
434 // report left endpoints
435 for (auto it = data.begin(); it != data.end(); ++it) {
436 auto& val = it->second;
437 res.insert({it->first, {val.start.slice, val.ids, val.exclusive}});
438 }
439 return res;
440 }
441
push_into_lock_status_data(void * param,const DBT * left,const DBT * right,TXNID txnid_arg,bool is_shared,TxnidVector * owners)442 static void push_into_lock_status_data(void* param, const DBT* left,
443 const DBT* right, TXNID txnid_arg,
444 bool is_shared, TxnidVector* owners) {
445 struct LOCK_PRINT_CONTEXT* ctx = (LOCK_PRINT_CONTEXT*)param;
446 struct RangeLockInfo info;
447
448 info.exclusive = !is_shared;
449
450 deserialize_endpoint(left, &info.start);
451 deserialize_endpoint(right, &info.end);
452
453 if (txnid_arg != TXNID_SHARED) {
454 TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID();
455 info.ids.push_back(txnid);
456 } else {
457 for (auto it : *owners) {
458 TXNID real_id = ((PessimisticTransaction*)it)->GetID();
459 info.ids.push_back(real_id);
460 }
461 }
462 ctx->data->insert({ctx->cfh_id, info});
463 }
464
GetRangeLockStatus()465 LockManager::RangeLockStatus RangeTreeLockManager::GetRangeLockStatus() {
466 LockManager::RangeLockStatus data;
467 {
468 InstrumentedMutexLock l(<ree_map_mutex_);
469 for (auto it : ltree_map_) {
470 LOCK_PRINT_CONTEXT ctx = {&data, it.first};
471 it.second->dump_locks((void*)&ctx, push_into_lock_status_data);
472 }
473 }
474 return data;
475 }
476
477 } // namespace ROCKSDB_NAMESPACE
478 #endif // OS_WIN
479 #endif // ROCKSDB_LITE
480