/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
======= */

#ident \
    "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "locktree.h"

#include <memory.h>

#include "../portability/toku_pthread.h"
#include "../portability/toku_time.h"
#include "../util/growable_array.h"
#include "range_buffer.h"

// including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in
// the locktree source file instead of the header.
#include "concurrent_tree.h"

namespace toku {
// A locktree represents the set of row locks owned by all transactions
// over an open dictionary. Read and write ranges are represented as
// a left and right key which are compared with the given descriptor
// and comparison fn.
//
// Each locktree has a reference count which it manages
// but does nothing based on the value of the reference count - it is
// up to the user of the locktree to destroy it when it sees fit.

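// A rough sketch of the expected lifecycle (caller names are illustrative,
// not part of this file): the lock manager create()s a locktree per open
// dictionary, transactions take locks through acquire_read_lock() /
// acquire_write_lock(), give them back through release_locks() on commit or
// abort, and the manager destroy()s the tree once its reference count has
// dropped to zero.
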
void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
                      const comparator &cmp,
                      toku_external_mutex_factory_t mutex_factory) {
  m_mgr = mgr;
  m_dict_id = dict_id;

  m_cmp.create_from(cmp);
  m_reference_count = 1;
  m_userdata = nullptr;

  XCALLOC(m_rangetree);
  m_rangetree->create(&m_cmp);

  m_sto_txnid = TXNID_NONE;
  m_sto_buffer.create();
  m_sto_score = STO_SCORE_THRESHOLD;
  m_sto_end_early_count = 0;
  m_sto_end_early_time = 0;

  m_lock_request_info.init(mutex_factory);
}

void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) {
  pending_lock_requests.create();
  pending_is_empty = true;
  toku_external_mutex_init(mutex_factory, &mutex);
  retry_want = retry_done = 0;
  ZERO_STRUCT(counters);
  ZERO_STRUCT(retry_mutex);
  toku_mutex_init(locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
  toku_cond_init(locktree_request_info_retry_cv_key, &retry_cv, nullptr);
  running_retry = false;

  TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
                                    sizeof(pending_is_empty));
  TOKU_DRD_IGNORE_VAR(pending_is_empty);
}

void locktree::destroy(void) {
  invariant(m_reference_count == 0);
  invariant(m_lock_request_info.pending_lock_requests.size() == 0);
  m_cmp.destroy();
  m_rangetree->destroy();
  toku_free(m_rangetree);
  m_sto_buffer.destroy();
  m_lock_request_info.destroy();
}

void lt_lock_request_info::destroy(void) {
  pending_lock_requests.destroy();
  toku_external_mutex_destroy(&mutex);
  toku_mutex_destroy(&retry_mutex);
  toku_cond_destroy(&retry_cv);
}

void locktree::add_reference(void) {
  (void)toku_sync_add_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::release_reference(void) {
  return toku_sync_sub_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::get_reference_count(void) { return m_reference_count; }

// a container for a range/txnid pair
struct row_lock {
  keyrange range;
  TXNID txnid;
  bool is_shared;
  TxnidVector *owners;
};
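// Note on the fields above (as inferred from how they are used below): a lock
// held exclusively by one transaction stores that transaction in `txnid` and
// leaves `owners` null; a lock shared by several readers is stored with
// txnid == TXNID_SHARED and the individual holders listed in `owners`.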

// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the range inside the returned row locks,
// so remove from the tree with care using them as keys.
static void iterate_and_get_overlapping_row_locks(
    const concurrent_tree::locked_keyrange *lkr,
    GrowableArray<row_lock> *row_locks) {
  struct copy_fn_obj {
    GrowableArray<row_lock> *row_locks;
    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      row_lock lock = {.range = range,
                       .txnid = txnid,
                       .is_shared = is_shared,
                       .owners = owners};
      row_locks->push(lock);
      return true;
    }
  } copy_fn;
  copy_fn.row_locks = row_locks;
  lkr->iterate(&copy_fn);
}

// given a txnid and a set of overlapping row locks, determine
// which txnids are conflicting, and store them in the conflicts
// set, if given.
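// For example (illustrative values): if the overlapping locks are
// {[a,b] owned by txn 100} and {[c,d] shared by txns 100 and 200}, then for
// txnid == 100 this returns true and, when a conflicts set is supplied,
// populates it with {200} only, since a transaction never conflicts with
// itself.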
static bool determine_conflicting_txnids(
    const GrowableArray<row_lock> &row_locks, const TXNID &txnid,
    txnid_set *conflicts) {
  bool conflicts_exist = false;
  const size_t num_overlaps = row_locks.get_size();
  for (size_t i = 0; i < num_overlaps; i++) {
    const row_lock lock = row_locks.fetch_unchecked(i);
    const TXNID other_txnid = lock.txnid;
    if (other_txnid != txnid) {
      if (conflicts) {
        if (other_txnid == TXNID_SHARED) {
          // Add all shared lock owners, except this transaction.
          for (TXNID shared_id : *lock.owners) {
            if (shared_id != txnid)
              conflicts->add(shared_id);
          }
        } else {
          conflicts->add(other_txnid);
        }
      }
      conflicts_exist = true;
    }
  }
  return conflicts_exist;
}

// how much memory does a row lock take up in a concurrent tree?
static uint64_t row_lock_size_in_tree(const row_lock &lock) {
  const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
  return lock.range.get_memory_size() + overhead;
}

// remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock.
static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, TXNID txnid,
                                      locktree_manager *mgr) {
  const uint64_t mem_released = row_lock_size_in_tree(lock);
  lkr->remove(lock.range, txnid);
  if (mgr != nullptr) {
    mgr->note_mem_released(mem_released);
  }
}

// insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock.
static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock,
                                      locktree_manager *mgr) {
  uint64_t mem_used = row_lock_size_in_tree(lock);
  lkr->insert(lock.range, lock.txnid, lock.is_shared);
  if (mgr != nullptr) {
    mgr->note_mem_used(mem_used);
  }
}

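// Single txnid optimization (STO), summarized from the code that follows:
// while only one transaction is using this locktree and the rangetree is
// still empty, its locks are appended to m_sto_buffer instead of being
// inserted into the rangetree. The moment a second txnid shows up, or the
// buffer grows past STO_BUFFER_MAX_SIZE, the buffered ranges are migrated
// into the rangetree and the optimization is called off (sto_end_early).
// m_sto_score is bumped on lock release and, once it reaches
// STO_SCORE_THRESHOLD again, the optimization is retried.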
void locktree::sto_begin(TXNID txnid) {
  invariant(m_sto_txnid == TXNID_NONE);
  invariant(m_sto_buffer.is_empty());
  m_sto_txnid = txnid;
}

void locktree::sto_append(const DBT *left_key, const DBT *right_key,
                          bool is_write_request) {
  uint64_t buffer_mem, delta;

  // psergey: the below two lines do not make any sense
  // (and it's the same in upstream TokuDB)
  keyrange range;
  range.create(left_key, right_key);

  buffer_mem = m_sto_buffer.total_memory_size();
  m_sto_buffer.append(left_key, right_key, is_write_request);
  delta = m_sto_buffer.total_memory_size() - buffer_mem;
  if (m_mgr != nullptr) {
    m_mgr->note_mem_used(delta);
  }
}

void locktree::sto_end(void) {
  uint64_t mem_size = m_sto_buffer.total_memory_size();
  if (m_mgr != nullptr) {
    m_mgr->note_mem_released(mem_size);
  }
  m_sto_buffer.destroy();
  m_sto_buffer.create();
  m_sto_txnid = TXNID_NONE;
}

void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
  sto_migrate_buffer_ranges_to_tree(prepared_lkr);
  sto_end();
  toku_unsafe_set(m_sto_score, 0);
}

void locktree::sto_end_early(void *prepared_lkr) {
  m_sto_end_early_count++;

  tokutime_t t0 = toku_time_now();
  sto_end_early_no_accounting(prepared_lkr);
  tokutime_t t1 = toku_time_now();

  m_sto_end_early_time += (t1 - t0);
}

void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
  // There should be something to migrate, and nothing in the rangetree.
  invariant(!m_sto_buffer.is_empty());
  invariant(m_rangetree->is_empty());

  concurrent_tree sto_rangetree;
  concurrent_tree::locked_keyrange sto_lkr;
  sto_rangetree.create(&m_cmp);

  // insert all of the ranges from the single txnid buffer into a new rangetree
  range_buffer::iterator iter(&m_sto_buffer);
  range_buffer::iterator::record rec;
  while (iter.current(&rec)) {
    sto_lkr.prepare(&sto_rangetree);
    int r = acquire_lock_consolidated(&sto_lkr, m_sto_txnid, rec.get_left_key(),
                                      rec.get_right_key(),
                                      rec.get_exclusive_flag(), nullptr);
    invariant_zero(r);
    sto_lkr.release();
    iter.next();
  }

  // Iterate the newly created rangetree and insert each range into the
  // locktree's rangetree, on behalf of the old single txnid.
  struct migrate_fn_obj {
    concurrent_tree::locked_keyrange *dst_lkr;
    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      // There can't be multiple owners in STO mode
      invariant_zero(owners);
      dst_lkr->insert(range, txnid, is_shared);
      return true;
    }
  } migrate_fn;
  migrate_fn.dst_lkr =
      static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
  sto_lkr.prepare(&sto_rangetree);
  sto_lkr.iterate(&migrate_fn);
  sto_lkr.remove_all();
  sto_lkr.release();
  sto_rangetree.destroy();
  invariant(!m_rangetree->is_empty());
}

bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               bool is_write_request) {
  if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
      toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
    // We can do the optimization because the rangetree is empty, and
    // we know it's worth trying because the sto score is big enough.
    sto_begin(txnid);
  } else if (m_sto_txnid != TXNID_NONE) {
    // We are currently doing the optimization. Check if we need to cancel
    // it because a new txnid appeared, or if the current single txnid has
    // taken too many locks already.
    if (m_sto_txnid != txnid ||
        m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
      sto_end_early(prepared_lkr);
    }
  }

  // At this point the sto txnid is properly set. If it is valid, then
  // this txnid can append its lock to the sto buffer successfully.
  if (m_sto_txnid != TXNID_NONE) {
    invariant(m_sto_txnid == txnid);
    sto_append(left_key, right_key, is_write_request);
    return true;
  } else {
    invariant(m_sto_buffer.is_empty());
    return false;
  }
}

/*
  Do the same as iterate_and_get_overlapping_row_locks does, but also check
  for this:
    The set of overlapping row locks consists of just one read-only shared
    lock with the same endpoints as specified (in that case, we can just add
    ourselves into that list)

  @return true  - One compatible shared lock
          false - Otherwise
*/
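/*
  Illustrative example (values are made up): txn 200 already holds a shared
  lock on exactly [k1, k2]. If txn 300 now requests a read lock on [k1, k2],
  this iterator reports "one compatible shared lock" and the caller can simply
  add 300 to the existing lock's owner list instead of rebuilding the range.
*/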
static bool iterate_and_get_overlapping_row_locks2(
    const concurrent_tree::locked_keyrange *lkr, const DBT *left_key,
    const DBT *right_key, comparator *cmp, TXNID,
    GrowableArray<row_lock> *row_locks) {
  struct copy_fn_obj {
    GrowableArray<row_lock> *row_locks;
    bool first_call = true;
    bool matching_lock_found = false;
    const DBT *left_key, *right_key;
    comparator *cmp;

    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      if (first_call) {
        first_call = false;
        if (is_shared && !(*cmp)(left_key, range.get_left_key()) &&
            !(*cmp)(right_key, range.get_right_key())) {
          matching_lock_found = true;
        }
      } else {
        // if we see multiple matching locks, it doesn't matter whether
        // the first one was matching.
        matching_lock_found = false;
      }
      row_lock lock = {.range = range,
                       .txnid = txnid,
                       .is_shared = is_shared,
                       .owners = owners};
      row_locks->push(lock);
      return true;
    }
  } copy_fn;
  copy_fn.row_locks = row_locks;
  copy_fn.left_key = left_key;
  copy_fn.right_key = right_key;
  copy_fn.cmp = cmp;
  lkr->iterate(&copy_fn);
  return copy_fn.matching_lock_found;
}

// try to acquire a lock and consolidate it with existing locks if possible
// param: lkr, a prepared locked keyrange
// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
                                        const DBT *left_key,
                                        const DBT *right_key,
                                        bool is_write_request,
                                        txnid_set *conflicts) {
  int r = 0;
  concurrent_tree::locked_keyrange *lkr;

  keyrange requested_range;
  requested_range.create(left_key, right_key);
  lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
  lkr->acquire(requested_range);

  // copy out the set of overlapping row locks.
  GrowableArray<row_lock> overlapping_row_locks;
  overlapping_row_locks.init();
  bool matching_shared_lock_found = false;

  if (is_write_request)
    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
  else {
    matching_shared_lock_found = iterate_and_get_overlapping_row_locks2(
        lkr, left_key, right_key, &m_cmp, txnid, &overlapping_row_locks);
    // psergey-todo: what to do now? So, we have figured we have just one
    // shareable lock. Need to add us into it as an owner but the lock
    // pointer cannot be kept?
    // A: use find_node_with_overlapping_child(key_range, nullptr);
    // then, add ourselves to the owner list.
    // Don't forget to release the subtree after that.
  }

  if (matching_shared_lock_found) {
    // there is just one non-conflicting matching shared lock.
    // we are holding a lock on it (see the acquire() call above).
    // we need to modify it to indicate there is another locker...
    if (lkr->add_shared_owner(requested_range, txnid)) {
      // Account for the added owner as if it were a whole new lock, so the
      // memory tracker sees the same cost as a regular lock.
      row_lock new_lock = {.range = requested_range,
                           .txnid = txnid,
                           .is_shared = false,
                           .owners = nullptr};
      uint64_t mem_used = row_lock_size_in_tree(new_lock);
      if (m_mgr) {
        m_mgr->note_mem_used(mem_used);
      }
    }
    requested_range.destroy();
    overlapping_row_locks.deinit();
    return 0;
  }

  size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

  // if any overlapping row locks conflict with this request, bail out.

  bool conflicts_exist =
      determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
  if (!conflicts_exist) {
    // there are no conflicts, so all of the overlaps are for the requesting
    // txnid. so, we must consolidate all existing overlapping ranges and the
    // requested range into one dominating range. then we insert the dominating
    // range.
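    //
    // for illustration (keys are made up): if this txnid already holds
    // [1,2] and [4,5] and now requests [2,4], the three ranges are merged
    // and a single lock on [1,5] replaces them in the tree.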
    bool all_shared = !is_write_request;
    for (size_t i = 0; i < num_overlapping_row_locks; i++) {
      row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
      invariant(overlapping_lock.txnid == txnid);
      requested_range.extend(m_cmp, overlapping_lock.range);
      remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
      all_shared = all_shared && overlapping_lock.is_shared;
    }

    row_lock new_lock = {.range = requested_range,
                         .txnid = txnid,
                         .is_shared = all_shared,
                         .owners = nullptr};
    insert_row_lock_into_tree(lkr, new_lock, m_mgr);
  } else {
    r = DB_LOCK_NOTGRANTED;
  }

  requested_range.destroy();
  overlapping_row_locks.deinit();
  return r;
}

// acquire a lock in the given key range, inclusive. if successful,
// return 0. otherwise, populate the conflicts txnid_set with the set of
// transactions that conflict with this request.
int locktree::acquire_lock(bool is_write_request, TXNID txnid,
                           const DBT *left_key, const DBT *right_key,
                           txnid_set *conflicts) {
  int r = 0;

  // we are only supporting write locks for simplicity
  // invariant(is_write_request);

  // acquire and prepare a locked keyrange over the requested range.
  // prepare is a serialization point, so we take the opportunity to
  // try the single txnid optimization first.
  concurrent_tree::locked_keyrange lkr;
  lkr.prepare(m_rangetree);

  bool acquired =
      sto_try_acquire(&lkr, txnid, left_key, right_key, is_write_request);
  if (!acquired) {
    r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key,
                                  is_write_request, conflicts);
  }

  lkr.release();
  return r;
}

int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               txnid_set *conflicts, bool big_txn) {
  // All ranges in the locktree must have left endpoints <= right endpoints.
  // Range comparisons rely on this fact, so we make a paranoid invariant here.
  paranoid_invariant(m_cmp(left_key, right_key) <= 0);
  int r = m_mgr == nullptr ? 0 : m_mgr->check_current_lock_constraints(big_txn);
  if (r == 0) {
    r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
  }
  return r;
}

// acquire a read (shared, non-exclusive) lock on the given key range
int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key,
                                const DBT *right_key, txnid_set *conflicts,
                                bool big_txn) {
  return try_acquire_lock(false, txnid, left_key, right_key, conflicts,
                          big_txn);
}

int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key,
                                 const DBT *right_key, txnid_set *conflicts,
                                 bool big_txn) {
  return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
}

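// A rough usage sketch of the acquire/release API above (caller-side names
// are illustrative, not part of this file): a transaction calls
// acquire_write_lock(txn_id, &left, &right, &conflicts, false); a return
// value of 0 means the range is now locked, while DB_LOCK_NOTGRANTED means
// the conflicts set names the transactions currently in the way, so the
// caller can wait on them and retry. On commit or abort the caller hands the
// accumulated ranges back through release_locks().
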
// typedef void (*dump_callback)(void *cdata, const DBT *left, const DBT *right,
//                               TXNID txnid, bool is_shared,
//                               TxnidVector *owners);
void locktree::dump_locks(void *cdata, dump_callback cb) {
  concurrent_tree::locked_keyrange lkr;
  keyrange range;
  range.create(toku_dbt_negative_infinity(), toku_dbt_positive_infinity());

  lkr.prepare(m_rangetree);
  lkr.acquire(range);

  TXNID sto_txn;
  if ((sto_txn = toku_unsafe_fetch(m_sto_txnid)) != TXNID_NONE) {
    // dump the buffered ranges held by the single txnid optimization
    range_buffer::iterator iter(&m_sto_buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
      (*cb)(cdata, rec.get_left_key(), rec.get_right_key(), sto_txn,
            !rec.get_exclusive_flag(), nullptr);
      iter.next();
    }
  } else {
    GrowableArray<row_lock> all_locks;
    all_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &all_locks);

    const size_t n_locks = all_locks.get_size();
    for (size_t i = 0; i < n_locks; i++) {
      const row_lock lock = all_locks.fetch_unchecked(i);
      (*cb)(cdata, lock.range.get_left_key(), lock.range.get_right_key(),
            lock.txnid, lock.is_shared, lock.owners);
    }
    all_locks.deinit();
  }
  lkr.release();
  range.destroy();
}

void locktree::get_conflicts(bool is_write_request, TXNID txnid,
                             const DBT *left_key, const DBT *right_key,
                             txnid_set *conflicts) {
  // because we only support write locks, ignore this bit for now.
  (void)is_write_request;

  // prepare and acquire a locked keyrange over the range
  keyrange range;
  range.create(left_key, right_key);
  concurrent_tree::locked_keyrange lkr;
  lkr.prepare(m_rangetree);
  lkr.acquire(range);

  // copy out the set of overlapping row locks and determine the conflicts
  GrowableArray<row_lock> overlapping_row_locks;
  overlapping_row_locks.init();
  iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);

  // we don't care if conflicts exist. we just want the conflicts set populated.
  (void)determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);

  lkr.release();
  overlapping_row_locks.deinit();
  range.destroy();
}

// Effect:
//   For each range in the lock tree that overlaps the given range and has
//   the given txnid, remove it.
// Rationale:
//   In the common case, there is only the range [left_key, right_key] and
//   it is associated with txnid, so this is a single tree delete.
//
//   However, consolidation and escalation change the objects in the tree
//   without telling the txn anything. In this case, the txn may own a
//   large range lock that represents its ownership of many smaller range
//   locks. For example, the txn may think it owns point locks on keys 1,
//   2, and 3, but due to escalation, only the object [1,3] exists in the
//   tree.
//
//   The first call for a small lock will remove the large range lock, and
//   the rest of the calls should do nothing. After the first release,
//   another thread can acquire one of the locks that the txn thinks it
//   still owns. That's ok, because the txn doesn't want it anymore (it
//   unlocks everything at once), but it may find a lock that it does not
//   own.
//
//   In our example, the txn unlocks key 1, which actually removes the
//   whole lock [1,3]. Now, someone else can lock 2 before our txn gets
//   around to unlocking 2, so we should not remove that lock.
void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
                                                  const DBT *left_key,
                                                  const DBT *right_key) {
  keyrange release_range;
  release_range.create(left_key, right_key);

  // acquire and prepare a locked keyrange over the release range
  concurrent_tree::locked_keyrange lkr;
  lkr.prepare(m_rangetree);
  lkr.acquire(release_range);

  // copy out the set of overlapping row locks.
  GrowableArray<row_lock> overlapping_row_locks;
  overlapping_row_locks.init();
  iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
  size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

  for (size_t i = 0; i < num_overlapping_row_locks; i++) {
    row_lock lock = overlapping_row_locks.fetch_unchecked(i);
    // If this isn't our lock, that's ok, just don't remove it.
    // See rationale above.
    // psergey-todo: for shared locks, just remove ourselves from the
    // owners.
    if (lock.txnid == txnid || (lock.owners && lock.owners->contains(txnid))) {
      remove_row_lock_from_tree(&lkr, lock, txnid, m_mgr);
    }
  }

  lkr.release();
  overlapping_row_locks.deinit();
  release_range.destroy();
}

bool locktree::sto_txnid_is_valid_unsafe(void) const {
  return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
}

int locktree::sto_get_score_unsafe(void) const {
  return toku_unsafe_fetch(m_sto_score);
}

bool locktree::sto_try_release(TXNID txnid) {
  bool released = false;
  if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
    // check the bit again with a prepared locked keyrange,
    // which protects the optimization bits and rangetree data
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    if (m_sto_txnid != TXNID_NONE) {
      // this txnid better be the single txnid on this locktree,
      // or else we are in big trouble (meaning the logic is broken)
      invariant(m_sto_txnid == txnid);
      invariant(m_rangetree->is_empty());
      sto_end();
      released = true;
    }
    lkr.release();
  }
  return released;
}

// release all of the locks for a txnid whose endpoints are pairs
// in the given range buffer.
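// (Presumably the caller sets all_trx_locks_hint=true only when the buffer
// holds every lock the transaction owns, e.g. at commit or abort; only then
// may the single-txnid fast path below drop everything at once.)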
void locktree::release_locks(TXNID txnid, const range_buffer *ranges,
                             bool all_trx_locks_hint) {
  // try the single txn optimization. if it worked, then all of the
  // locks are already released, otherwise we need to do it here.
  bool released;
  if (all_trx_locks_hint) {
    // This will release all of the locks the transaction is holding
    released = sto_try_release(txnid);
  } else {
    /*
      psergey: we are asked to release *Some* of the locks the transaction
      is holding.
      We could try doing that without leaving the STO mode, but right now,
      the easiest way is to exit the STO mode and let the non-STO code path
      handle it.
    */
    if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
      // check the bit again with a prepared locked keyrange,
      // which protects the optimization bits and rangetree data
      concurrent_tree::locked_keyrange lkr;
      lkr.prepare(m_rangetree);
      if (m_sto_txnid != TXNID_NONE) {
        sto_end_early(&lkr);
      }
      lkr.release();
    }
    released = false;
  }
  if (!released) {
    range_buffer::iterator iter(ranges);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
      const DBT *left_key = rec.get_left_key();
      const DBT *right_key = rec.get_right_key();
      // All ranges in the locktree must have left endpoints <= right endpoints.
      // Range comparisons rely on this fact, so we make a paranoid invariant
      // here.
      paranoid_invariant(m_cmp(left_key, right_key) <= 0);
      remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
      iter.next();
    }
    // Increase the sto score slightly. Eventually it will hit
    // the threshold and we'll try the optimization again. This
    // is how a previously multithreaded system transitions into
    // a single threaded system that benefits from the optimization.
    if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
      toku_sync_fetch_and_add(&m_sto_score, 1);
    }
  }
}

// iterate over a locked keyrange and extract copies of the first N
// row locks, storing each one into the given array of size N,
// then removing each extracted lock from the locked keyrange.
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr, row_lock *row_locks,
                                     int num_to_extract) {
  struct extract_fn_obj {
    int num_extracted;
    int num_to_extract;
    row_lock *row_locks;
    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      if (num_extracted < num_to_extract) {
        row_lock lock;
        lock.range.create_copy(range);
        lock.txnid = txnid;
        lock.is_shared = is_shared;
        // deep-copy the set of owners:
        if (owners)
          lock.owners = new TxnidVector(*owners);
        else
          lock.owners = nullptr;
        row_locks[num_extracted++] = lock;
        return true;
      } else {
        return false;
      }
    }
  } extract_fn;

  extract_fn.row_locks = row_locks;
  extract_fn.num_to_extract = num_to_extract;
  extract_fn.num_extracted = 0;
  lkr->iterate(&extract_fn);

  // now that the ranges have been copied out, complete
  // the extraction by removing the ranges from the tree.
  // use remove_row_lock_from_tree() so we properly track the
  // amount of memory and number of locks freed.
  int num_extracted = extract_fn.num_extracted;
  invariant(num_extracted <= num_to_extract);
  for (int i = 0; i < num_extracted; i++) {
    remove_row_lock_from_tree(lkr, row_locks[i], TXNID_ANY, mgr);
  }

  return num_extracted;
}

// Store each newly escalated lock in a range buffer for appropriate txnid.
// We'll rebuild the locktree by iterating over these ranges, and then we
// can pass back each txnid/buffer pair individually through a callback
// to notify higher layers that locks have changed.
struct txnid_range_buffer {
  TXNID txnid;
  range_buffer buffer;

  static int find_by_txnid(struct txnid_range_buffer *const &other_buffer,
                           const TXNID &txnid) {
    if (txnid < other_buffer->txnid) {
      return -1;
    } else if (other_buffer->txnid == txnid) {
      return 0;
    } else {
      return 1;
    }
  }
};

// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
// if there's only one txnid in the locktree then this
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
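//
// for illustration (keys are made up): if txn 100 holds point locks on keys
// 1, 2 and 3 and they sit next to each other in the tree, escalation removes
// the three locks and re-inserts the single dominating lock [1,3], then
// reports the new range back to txn 100 through the callback.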
void locktree::escalate(lt_escalate_cb after_escalate_callback,
                        void *after_escalate_callback_extra) {
  omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
  range_buffers.create();

  // prepare and acquire a locked keyrange on the entire locktree
  concurrent_tree::locked_keyrange lkr;
  keyrange infinite_range = keyrange::get_infinite_range();
  lkr.prepare(m_rangetree);
  lkr.acquire(infinite_range);

  // if we're in the single txnid optimization, simply call it off.
  // if you have to run escalation, you probably don't care about
  // the optimization anyway, and this makes things easier.
  if (m_sto_txnid != TXNID_NONE) {
    // We are already accounting for this escalation time and
    // count, so don't do it for sto_end_early too.
    sto_end_early_no_accounting(&lkr);
  }

  // extract and remove batches of row locks from the locktree
  int num_extracted;
  const int num_row_locks_per_batch = 128;
  row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);

  // we always remove the "first" n because we are removing n
  // each time we do an extraction. so this loops until it's empty.
  while ((num_extracted = extract_first_n_row_locks(
              &lkr, m_mgr, extracted_buf, num_row_locks_per_batch)) > 0) {
    int current_index = 0;
    while (current_index < num_extracted) {
      // every batch of extracted locks is in range-sorted order. search
      // through them and merge adjacent locks with the same txnid into
      // one dominating lock and save it to a set of escalated locks.
      //
      // first, find the index of the next row lock that
      // - belongs to a different txnid, or
      // - belongs to several txnids, or
      // - is a shared lock (we could potentially merge those but
      //   currently we don't)
      int next_txnid_index = current_index + 1;

      while (next_txnid_index < num_extracted &&
             (extracted_buf[current_index].txnid ==
              extracted_buf[next_txnid_index].txnid) &&
             !extracted_buf[next_txnid_index].is_shared &&
             !extracted_buf[next_txnid_index].owners) {
        next_txnid_index++;
      }

      // Create an escalated range for the current txnid that dominates
      // each range between the current index and the next txnid's index.
      // const TXNID current_txnid = extracted_buf[current_index].txnid;
      const DBT *escalated_left_key =
          extracted_buf[current_index].range.get_left_key();
      const DBT *escalated_right_key =
          extracted_buf[next_txnid_index - 1].range.get_right_key();

      // Try to find a range buffer for the current txnid. Create one if it
      // doesn't exist. Then, append the new escalated range to the buffer.
      // (If a lock is shared by multiple txnids, append it to each txnid's
      // list.)
      TxnidVector *owners_ptr;
      TxnidVector singleton_owner;
      if (extracted_buf[current_index].owners)
        owners_ptr = extracted_buf[current_index].owners;
      else {
        singleton_owner.insert(extracted_buf[current_index].txnid);
        owners_ptr = &singleton_owner;
      }

      for (auto cur_txnid : *owners_ptr) {
        uint32_t idx;
        struct txnid_range_buffer *existing_range_buffer;
        int r =
            range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
                cur_txnid, &existing_range_buffer, &idx);
        if (r == DB_NOTFOUND) {
          struct txnid_range_buffer *XMALLOC(new_range_buffer);
          new_range_buffer->txnid = cur_txnid;
          new_range_buffer->buffer.create();
          new_range_buffer->buffer.append(
              escalated_left_key, escalated_right_key,
              !extracted_buf[current_index].is_shared);
          range_buffers.insert_at(new_range_buffer, idx);
        } else {
          invariant_zero(r);
          invariant(existing_range_buffer->txnid == cur_txnid);
          existing_range_buffer->buffer.append(
              escalated_left_key, escalated_right_key,
              !extracted_buf[current_index].is_shared);
        }
      }

      current_index = next_txnid_index;
    }

    // destroy the ranges copied during the extraction
    for (int i = 0; i < num_extracted; i++) {
      delete extracted_buf[i].owners;
      extracted_buf[i].range.destroy();
    }
  }
  toku_free(extracted_buf);

  // Rebuild the locktree from each range in each range buffer,
  // then notify higher layers that the txnid's locks have changed.
  //
  // (shared locks: if a lock was initially shared between transactions TRX1,
  // TRX2, etc, we will now try to acquire it acting on behalf of TRX1, then
  // TRX2, etc. This will succeed and an identical shared lock will be
  // constructed)

  invariant(m_rangetree->is_empty());
  const uint32_t num_range_buffers = range_buffers.size();
  for (uint32_t i = 0; i < num_range_buffers; i++) {
    struct txnid_range_buffer *current_range_buffer;
    int r = range_buffers.fetch(i, &current_range_buffer);
    invariant_zero(r);
    if (r == EINVAL)  // Shouldn't happen, avoid compiler warning
      continue;

    const TXNID current_txnid = current_range_buffer->txnid;
    range_buffer::iterator iter(&current_range_buffer->buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
      keyrange range;
      range.create(rec.get_left_key(), rec.get_right_key());
      row_lock lock = {.range = range,
                       .txnid = current_txnid,
                       .is_shared = !rec.get_exclusive_flag(),
                       .owners = nullptr};
      insert_row_lock_into_tree(&lkr, lock, m_mgr);
      iter.next();
    }

    // Notify higher layers that locks have changed for the current txnid
    if (after_escalate_callback) {
      after_escalate_callback(current_txnid, this, current_range_buffer->buffer,
                              after_escalate_callback_extra);
    }
    current_range_buffer->buffer.destroy();
  }

  while (range_buffers.size() > 0) {
    struct txnid_range_buffer *buffer;
    int r = range_buffers.fetch(0, &buffer);
    invariant_zero(r);
    r = range_buffers.delete_at(0);
    invariant_zero(r);
    toku_free(buffer);
  }
  range_buffers.destroy();

  lkr.release();
}

void *locktree::get_userdata(void) const { return m_userdata; }

void locktree::set_userdata(void *userdata) { m_userdata = userdata; }

struct lt_lock_request_info *locktree::get_lock_request_info(void) {
  return &m_lock_request_info;
}

void locktree::set_comparator(const comparator &cmp) { m_cmp.inherit(cmp); }

locktree_manager *locktree::get_manager(void) const { return m_mgr; }

int locktree::compare(const locktree *lt) const {
  if (m_dict_id.dictid < lt->m_dict_id.dictid) {
    return -1;
  } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
    return 0;
  } else {
    return 1;
  }
}

DICTIONARY_ID locktree::get_dict_id() const { return m_dict_id; }

} /* namespace toku */
#endif  // OS_WIN
#endif  // ROCKSDB_LITE