/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <memory.h>

#include <util/growable_array.h>

#include <portability/toku_pthread.h>
#include <portability/toku_time.h>

#include "locktree.h"
#include "range_buffer.h"

// including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in
// the locktree source file instead of the header.
#include "concurrent_tree.h"

namespace toku {

// A locktree represents the set of row locks owned by all transactions
// over an open dictionary. Read and write ranges are represented as
// a left and right key which are compared with the given descriptor
// and comparison fn.
//
// Each locktree maintains a reference count but takes no action based
// on its value - it is up to the user of the locktree to destroy it
// when it sees fit.
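//
// A minimal reference-counting sketch (hypothetical caller; in practice
// the locktree_manager coordinates this, and this assumes the locktree
// was heap-allocated):
//
//     lt->add_reference();
//     // ... use lt ...
//     if (lt->release_reference() == 0) {
//         lt->destroy();      // refcount is zero; destroy() asserts this
//         toku_free(lt);
//     }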

void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) {
    m_mgr = mgr;
    m_dict_id = dict_id;

    m_cmp.create_from(cmp);
    m_reference_count = 1;
    m_userdata = nullptr;

    XCALLOC(m_rangetree);
    m_rangetree->create(&m_cmp);

    m_sto_txnid = TXNID_NONE;
    m_sto_buffer.create();
    m_sto_score = STO_SCORE_THRESHOLD;
    m_sto_end_early_count = 0;
    m_sto_end_early_time = 0;

    m_lock_request_info.init();
}

void lt_lock_request_info::init(void) {
    pending_lock_requests.create();
    pending_is_empty = true;
    ZERO_STRUCT(mutex);
    toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr);
    retry_want = retry_done = 0;
    ZERO_STRUCT(counters);
    ZERO_STRUCT(retry_mutex);
    toku_mutex_init(
        *locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
    toku_cond_init(*locktree_request_info_retry_cv_key, &retry_cv, nullptr);
    running_retry = false;

    TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
                                      sizeof(pending_is_empty));
    TOKU_DRD_IGNORE_VAR(pending_is_empty);
}

void locktree::destroy(void) {
    invariant(m_reference_count == 0);
    invariant(m_lock_request_info.pending_lock_requests.size() == 0);
    m_cmp.destroy();
    m_rangetree->destroy();
    toku_free(m_rangetree);
    m_sto_buffer.destroy();
    m_lock_request_info.destroy();
}

void lt_lock_request_info::destroy(void) {
    pending_lock_requests.destroy();
    toku_mutex_destroy(&mutex);
    toku_mutex_destroy(&retry_mutex);
    toku_cond_destroy(&retry_cv);
}

void locktree::add_reference(void) {
    (void)toku_sync_add_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::release_reference(void) {
    return toku_sync_sub_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::get_reference_count(void) {
    return m_reference_count;
}

// a container for a range/txnid pair
struct row_lock {
    keyrange range;
    TXNID txnid;
};

// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the ranges inside the returned row locks,
// so care must be taken when using them as keys for removal.
static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_keyrange *lkr,
                                                  GrowableArray<row_lock> *row_locks) {
    struct copy_fn_obj {
        GrowableArray<row_lock> *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            row_lock lock = { .range = range, .txnid = txnid };
            row_locks->push(lock);
            return true;
        }
    } copy_fn;
    copy_fn.row_locks = row_locks;
    lkr->iterate(&copy_fn);
}

// given a txnid and a set of overlapping row locks, determine
// which txnids are conflicting, and store them in the conflicts
// set, if given.
static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_locks,
                                         const TXNID &txnid, txnid_set *conflicts) {
    bool conflicts_exist = false;
    const size_t num_overlaps = row_locks.get_size();
    for (size_t i = 0; i < num_overlaps; i++) {
        const row_lock lock = row_locks.fetch_unchecked(i);
        const TXNID other_txnid = lock.txnid;
        if (other_txnid != txnid) {
            if (conflicts) {
                conflicts->add(other_txnid);
            }
            conflicts_exist = true;
        }
    }
    return conflicts_exist;
}

// how much memory does a row lock take up in a concurrent tree?
static uint64_t row_lock_size_in_tree(const row_lock &lock) {
    const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
    return lock.range.get_memory_size() + overhead;
}

// remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock.
static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    const uint64_t mem_released = row_lock_size_in_tree(lock);
    lkr->remove(lock.range);
    if (mgr != nullptr) {
        mgr->note_mem_released(mem_released);
    }
}

// insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock.
static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    uint64_t mem_used = row_lock_size_in_tree(lock);
    lkr->insert(lock.range, lock.txnid);
    if (mgr != nullptr) {
        mgr->note_mem_used(mem_used);
    }
}
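
// Single txnid optimization (STO) overview, per the logic below: while
// every lock request comes from the same single txnid, its locks are
// appended to m_sto_buffer instead of being inserted into the rangetree.
// As soon as a second txnid shows up, or the buffer exceeds
// STO_BUFFER_MAX_SIZE ranges, sto_end_early() migrates the buffered
// ranges into the rangetree and the general locking path takes over.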
void locktree::sto_begin(TXNID txnid) {
    invariant(m_sto_txnid == TXNID_NONE);
    invariant(m_sto_buffer.is_empty());
    m_sto_txnid = txnid;
}

void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
    uint64_t buffer_mem, delta;
    keyrange range;
    range.create(left_key, right_key);

    buffer_mem = m_sto_buffer.total_memory_size();
    m_sto_buffer.append(left_key, right_key);
    delta = m_sto_buffer.total_memory_size() - buffer_mem;
    if (m_mgr != nullptr) {
        m_mgr->note_mem_used(delta);
    }
}

void locktree::sto_end(void) {
    uint64_t mem_size = m_sto_buffer.total_memory_size();
    if (m_mgr != nullptr) {
        m_mgr->note_mem_released(mem_size);
    }
    m_sto_buffer.destroy();
    m_sto_buffer.create();
    m_sto_txnid = TXNID_NONE;
}

void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
    sto_migrate_buffer_ranges_to_tree(prepared_lkr);
    sto_end();
    toku_unsafe_set(m_sto_score, 0);
}

void locktree::sto_end_early(void *prepared_lkr) {
    m_sto_end_early_count++;

    tokutime_t t0 = toku_time_now();
    sto_end_early_no_accounting(prepared_lkr);
    tokutime_t t1 = toku_time_now();

    m_sto_end_early_time += (t1 - t0);
}

void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
    // There should be something to migrate, and nothing in the rangetree.
    invariant(!m_sto_buffer.is_empty());
    invariant(m_rangetree->is_empty());

    concurrent_tree sto_rangetree;
    concurrent_tree::locked_keyrange sto_lkr;
    sto_rangetree.create(&m_cmp);

    // insert all of the ranges from the single txnid buffer into a new
    // rangetree, consolidating overlapping ranges along the way
    range_buffer::iterator iter(&m_sto_buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
        sto_lkr.prepare(&sto_rangetree);
        int r = acquire_lock_consolidated(&sto_lkr,
                m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr);
        invariant_zero(r);
        sto_lkr.release();
        iter.next();
    }

    // Iterate the newly created rangetree and insert each range into the
    // locktree's rangetree, on behalf of the old single txnid.
    struct migrate_fn_obj {
        concurrent_tree::locked_keyrange *dst_lkr;
        bool fn(const keyrange &range, TXNID txnid) {
            dst_lkr->insert(range, txnid);
            return true;
        }
    } migrate_fn;
    migrate_fn.dst_lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    sto_lkr.prepare(&sto_rangetree);
    sto_lkr.iterate(&migrate_fn);
    sto_lkr.remove_all();
    sto_lkr.release();
    sto_rangetree.destroy();
    invariant(!m_rangetree->is_empty());
}

bool locktree::sto_try_acquire(void *prepared_lkr,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key) {
    if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
        toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
        // We can do the optimization because the rangetree is empty, and
        // we know it's worth trying because the sto score is big enough.
        sto_begin(txnid);
    } else if (m_sto_txnid != TXNID_NONE) {
        // We are currently doing the optimization. Check if we need to cancel
        // it because a new txnid appeared, or because the current single txnid
        // has taken too many locks already.
        if (m_sto_txnid != txnid || m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
            sto_end_early(prepared_lkr);
        }
    }

    // At this point the sto txnid is properly set. If it is valid, then
    // this txnid can append its lock to the sto buffer successfully.
    if (m_sto_txnid != TXNID_NONE) {
        invariant(m_sto_txnid == txnid);
        sto_append(left_key, right_key);
        return true;
    } else {
        invariant(m_sto_buffer.is_empty());
        return false;
    }
}

// try to acquire a lock and consolidate it with existing locks if possible
// param: lkr, a prepared locked keyrange
// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
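//
// A worked example (hypothetical txnid T and integer point keys): if T
// already holds [1,2] and [5,6] and then requests [2,5], both existing
// locks overlap the request and belong to T, so they are removed and the
// single dominating range [1,6] is inserted in their place.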
int locktree::acquire_lock_consolidated(void *prepared_lkr,
                                        TXNID txnid,
                                        const DBT *left_key, const DBT *right_key,
                                        txnid_set *conflicts) {
    int r = 0;
    concurrent_tree::locked_keyrange *lkr;

    keyrange requested_range;
    requested_range.create(left_key, right_key);
    lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    lkr->acquire(requested_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    // if any overlapping row locks conflict with this request, bail out.
    bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks,
                                                        txnid, conflicts);
    if (!conflicts_exist) {
        // there are no conflicts, so all of the overlaps are for the requesting txnid.
        // so, we must consolidate all existing overlapping ranges and the requested
        // range into one dominating range. then we insert the dominating range.
        for (size_t i = 0; i < num_overlapping_row_locks; i++) {
            row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
            invariant(overlapping_lock.txnid == txnid);
            requested_range.extend(m_cmp, overlapping_lock.range);
            remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr);
        }

        row_lock new_lock = { .range = requested_range, .txnid = txnid };
        insert_row_lock_into_tree(lkr, new_lock, m_mgr);
    } else {
        r = DB_LOCK_NOTGRANTED;
    }

    requested_range.destroy();
    overlapping_row_locks.deinit();
    return r;
}

// acquire a lock in the given key range, inclusive. if successful,
// return 0. otherwise, populate the conflicts txnid_set with the set of
// transactions that conflict with this request.
int locktree::acquire_lock(bool is_write_request,
                           TXNID txnid,
                           const DBT *left_key, const DBT *right_key,
                           txnid_set *conflicts) {
    int r = 0;

    // we are only supporting write locks for simplicity
    invariant(is_write_request);

    // acquire and prepare a locked keyrange over the requested range.
    // prepare is a serialization point, so we take the opportunity to
    // try the single txnid optimization first.
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);

    bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key);
    if (!acquired) {
        r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts);
    }

    lkr.release();
    return r;
}

int locktree::try_acquire_lock(bool is_write_request,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               txnid_set *conflicts, bool big_txn) {
    // All ranges in the locktree must have left endpoints <= right endpoints.
    // Range comparisons rely on this fact, so we make a paranoid invariant here.
    paranoid_invariant(m_cmp(left_key, right_key) <= 0);
    int r = m_mgr == nullptr ? 0 :
        m_mgr->check_current_lock_constraints(big_txn);
    if (r == 0) {
        r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
    }
    return r;
}

// the locktree silently upgrades read locks to write locks for simplicity
int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                txnid_set *conflicts, bool big_txn) {
    return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn);
}

int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                 txnid_set *conflicts, bool big_txn) {
    return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
}
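
// A minimal caller-side sketch (hypothetical locktree pointer, txnid, and
// key; error handling elided). On DB_LOCK_NOTGRANTED the conflicts set
// names the blocking owners:
//
//     txnid_set conflicts;
//     conflicts.create();
//     int r = lt->acquire_write_lock(my_txnid, &key, &key, &conflicts, false);
//     if (r == DB_LOCK_NOTGRANTED) {
//         // conflicts now holds the txnids owning overlapping locks; the
//         // caller can wait for them to release and then retry.
//     }
//     conflicts.destroy();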

void locktree::get_conflicts(bool is_write_request,
                             TXNID txnid, const DBT *left_key, const DBT *right_key,
                             txnid_set *conflicts) {
    // because we only support write locks, ignore this bit for now.
    (void) is_write_request;

    // prepare and acquire a locked keyrange over the range
    keyrange range;
    range.create(left_key, right_key);
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(range);

    // copy out the set of overlapping row locks and determine the conflicts
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);

    // we don't care if conflicts exist. we just want the conflicts set populated.
    (void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);

    lkr.release();
    overlapping_row_locks.deinit();
    range.destroy();
}

// Effect:
//  For each range in the lock tree that overlaps the given range and has
//  the given txnid, remove it.
// Rationale:
//  In the common case, there is only the range [left_key, right_key] and
//  it is associated with txnid, so this is a single tree delete.
//
//  However, consolidation and escalation change the objects in the tree
//  without telling the txn anything. In this case, the txn may own a
//  large range lock that represents its ownership of many smaller range
//  locks. For example, the txn may think it owns point locks on keys 1,
//  2, and 3, but due to escalation, only the object [1,3] exists in the
//  tree.
//
//  The first call for a small lock will remove the large range lock, and
//  the rest of the calls should do nothing. After the first release,
//  another thread can acquire one of the locks that the txn thinks it
//  still owns. That's ok, because the txn doesn't want it anymore (it
//  unlocks everything at once), but it may find a lock that it does not
//  own.
//
//  In our example, the txn unlocks key 1, which actually removes the
//  whole lock [1,3]. Now, someone else can lock 2 before our txn gets
//  around to unlocking 2, so we should not remove that lock.
void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
                                                  const DBT *left_key,
                                                  const DBT *right_key) {
    keyrange release_range;
    release_range.create(left_key, right_key);

    // acquire and prepare a locked keyrange over the release range
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(release_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    for (size_t i = 0; i < num_overlapping_row_locks; i++) {
        row_lock lock = overlapping_row_locks.fetch_unchecked(i);
        // If this isn't our lock, that's ok, just don't remove it.
        // See rationale above.
        if (lock.txnid == txnid) {
            remove_row_lock_from_tree(&lkr, lock, m_mgr);
        }
    }

    lkr.release();
    overlapping_row_locks.deinit();
    release_range.destroy();
}

bool locktree::sto_txnid_is_valid_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
}

int locktree::sto_get_score_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_score);
}

bool locktree::sto_try_release(TXNID txnid) {
    bool released = false;
    if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
        // check the bit again with a prepared locked keyrange,
        // which protects the optimization bits and rangetree data
        concurrent_tree::locked_keyrange lkr;
        lkr.prepare(m_rangetree);
        if (m_sto_txnid != TXNID_NONE) {
            // this txnid better be the single txnid on this locktree,
            // or else we are in big trouble (meaning the logic is broken)
            invariant(m_sto_txnid == txnid);
            invariant(m_rangetree->is_empty());
            sto_end();
            released = true;
        }
        lkr.release();
    }
    return released;
}

// release all of the locks for a txnid whose endpoints are pairs
// in the given range buffer.
void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
    // try the single txn optimization. if it worked, then all of the
    // locks are already released, otherwise we need to do it here.
    bool released = sto_try_release(txnid);
    if (!released) {
        range_buffer::iterator iter(ranges);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            const DBT *left_key = rec.get_left_key();
            const DBT *right_key = rec.get_right_key();
            // All ranges in the locktree must have left endpoints <= right endpoints.
            // Range comparisons rely on this fact, so we make a paranoid invariant here.
            paranoid_invariant(m_cmp(left_key, right_key) <= 0);
            remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
            iter.next();
        }
        // Increase the sto score slightly. Eventually it will hit
        // the threshold and we'll try the optimization again. This
        // is how a previously multithreaded system transitions into
        // a single threaded system that benefits from the optimization.
        if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
            toku_sync_fetch_and_add(&m_sto_score, 1);
        }
    }
}
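
// A minimal release sketch (hypothetical txnid and keys; assumes the txn
// recorded the same endpoints it used when locking):
//
//     range_buffer buffer;
//     buffer.create();
//     buffer.append(&left_key, &right_key);   // one entry per acquired range
//     lt->release_locks(my_txnid, &buffer);
//     buffer.destroy();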

// iterate over a locked keyrange and extract copies of the first N
// row locks, storing each one into the given array of size N,
// then removing each extracted lock from the locked keyrange.
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr,
                                     row_lock *row_locks, int num_to_extract) {

    struct extract_fn_obj {
        int num_extracted;
        int num_to_extract;
        row_lock *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            if (num_extracted < num_to_extract) {
                row_lock lock;
                lock.range.create_copy(range);
                lock.txnid = txnid;
                row_locks[num_extracted++] = lock;
                return true;
            } else {
                return false;
            }
        }
    } extract_fn;

    extract_fn.row_locks = row_locks;
    extract_fn.num_to_extract = num_to_extract;
    extract_fn.num_extracted = 0;
    lkr->iterate(&extract_fn);

    // now that the ranges have been copied out, complete
    // the extraction by removing the ranges from the tree.
    // use remove_row_lock_from_tree() so we properly track the
    // amount of memory and number of locks freed.
    int num_extracted = extract_fn.num_extracted;
    invariant(num_extracted <= num_to_extract);
    for (int i = 0; i < num_extracted; i++) {
        remove_row_lock_from_tree(lkr, row_locks[i], mgr);
    }

    return num_extracted;
}

// Store each newly escalated lock in a range buffer for appropriate txnid.
// We'll rebuild the locktree by iterating over these ranges, and then we
// can pass back each txnid/buffer pair individually through a callback
// to notify higher layers that locks have changed.
struct txnid_range_buffer {
    TXNID txnid;
    range_buffer buffer;

    static int find_by_txnid(struct txnid_range_buffer *const &other_buffer, const TXNID &txnid) {
        if (txnid < other_buffer->txnid) {
            return -1;
        } else if (other_buffer->txnid == txnid) {
            return 0;
        } else {
            return 1;
        }
    }
};

// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
// if there's only one txnid in the locktree then this
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
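//
// a worked example (hypothetical txnids and integer point keys): if the
// tree holds, in sorted order,
//
//     [1,1]:A  [2,2]:A  [3,3]:A  [5,5]:B
//
// escalation rebuilds it as
//
//     [1,3]:A  [5,5]:B
//
// and the after-escalation callback fires once per txnid with its new
// set of ranges.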
void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
    omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
    range_buffers.create();

    // prepare and acquire a locked keyrange on the entire locktree
    concurrent_tree::locked_keyrange lkr;
    keyrange infinite_range = keyrange::get_infinite_range();
    lkr.prepare(m_rangetree);
    lkr.acquire(infinite_range);

    // if we're in the single txnid optimization, simply call it off.
    // if you have to run escalation, you probably don't care about
    // the optimization anyway, and this makes things easier.
    if (m_sto_txnid != TXNID_NONE) {
        // We are already accounting for this escalation time and
        // count, so don't do it for sto_end_early too.
        sto_end_early_no_accounting(&lkr);
    }

    // extract and remove batches of row locks from the locktree
    int num_extracted;
    const int num_row_locks_per_batch = 128;
    row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);

    // we always remove the "first" n because we are removing n
    // each time we do an extraction. so this loops until it's empty.
    while ((num_extracted =
                extract_first_n_row_locks(&lkr, m_mgr, extracted_buf,
                                          num_row_locks_per_batch)) > 0) {
        int current_index = 0;
        while (current_index < num_extracted) {
            // every batch of extracted locks is in range-sorted order. search
            // through them and merge adjacent locks with the same txnid into
            // one dominating lock and save it to a set of escalated locks.
            //
            // first, find the index of the next row lock with a different txnid
            int next_txnid_index = current_index + 1;
            while (next_txnid_index < num_extracted &&
                   extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) {
                next_txnid_index++;
            }

            // Create an escalated range for the current txnid that dominates
            // each range between the current index and the next txnid's index.
            const TXNID current_txnid = extracted_buf[current_index].txnid;
            const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key();
            const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key();

            // Try to find a range buffer for the current txnid. Create one if it doesn't exist.
            // Then, append the new escalated range to the buffer.
            uint32_t idx;
            struct txnid_range_buffer *existing_range_buffer;
            int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
                current_txnid,
                &existing_range_buffer,
                &idx
            );
            if (r == DB_NOTFOUND) {
                struct txnid_range_buffer *XMALLOC(new_range_buffer);
                new_range_buffer->txnid = current_txnid;
                new_range_buffer->buffer.create();
                new_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
                range_buffers.insert_at(new_range_buffer, idx);
            } else {
                invariant_zero(r);
                invariant(existing_range_buffer->txnid == current_txnid);
                existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
            }

            current_index = next_txnid_index;
        }

        // destroy the ranges copied during the extraction
        for (int i = 0; i < num_extracted; i++) {
            extracted_buf[i].range.destroy();
        }
    }
    toku_free(extracted_buf);

    // Rebuild the locktree from each range in each range buffer,
    // then notify higher layers that the txnid's locks have changed.
    invariant(m_rangetree->is_empty());
    const size_t num_range_buffers = range_buffers.size();
    for (size_t i = 0; i < num_range_buffers; i++) {
        struct txnid_range_buffer *current_range_buffer;
        int r = range_buffers.fetch(i, &current_range_buffer);
        invariant_zero(r);

        const TXNID current_txnid = current_range_buffer->txnid;
        range_buffer::iterator iter(&current_range_buffer->buffer);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            keyrange range;
            range.create(rec.get_left_key(), rec.get_right_key());
            row_lock lock = { .range = range, .txnid = current_txnid };
            insert_row_lock_into_tree(&lkr, lock, m_mgr);
            iter.next();
        }

        // Notify higher layers that locks have changed for the current txnid
        if (after_escalate_callback) {
            after_escalate_callback(current_txnid, this, current_range_buffer->buffer, after_escalate_callback_extra);
        }
        current_range_buffer->buffer.destroy();
    }

    while (range_buffers.size() > 0) {
        struct txnid_range_buffer *buffer;
        int r = range_buffers.fetch(0, &buffer);
        invariant_zero(r);
        r = range_buffers.delete_at(0);
        invariant_zero(r);
        toku_free(buffer);
    }
    range_buffers.destroy();

    lkr.release();
}

void *locktree::get_userdata(void) const {
    return m_userdata;
}

void locktree::set_userdata(void *userdata) {
    m_userdata = userdata;
}

struct lt_lock_request_info *locktree::get_lock_request_info(void) {
    return &m_lock_request_info;
}

void locktree::set_comparator(const comparator &cmp) {
    m_cmp.inherit(cmp);
}

locktree_manager *locktree::get_manager(void) const {
    return m_mgr;
}

int locktree::compare(const locktree *lt) const {
    if (m_dict_id.dictid < lt->m_dict_id.dictid) {
        return -1;
    } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
        return 0;
    } else {
        return 1;
    }
}

DICTIONARY_ID locktree::get_dict_id() const {
    return m_dict_id;
}

} /* namespace toku */