/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <memory.h>

#include <util/growable_array.h>

#include <portability/toku_pthread.h>
#include <portability/toku_time.h>

#include "locktree.h"
#include "range_buffer.h"

// including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in
// the locktree source file instead of the header.
#include "concurrent_tree.h"

namespace toku {

// A locktree represents the set of row locks owned by all transactions
// over an open dictionary. Read and write ranges are represented as
// a left and right key which are compared with the given descriptor
// and comparison fn.
//
// Each locktree has a reference count which it manages
// but does nothing based on the value of the reference count - it is
// up to the user of the locktree to destroy it when it sees fit.
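//
// An illustrative lifecycle sketch (hypothetical caller code, assuming a
// locktree_manager "mgr", a DICTIONARY_ID "dict_id", and a comparator "cmp"
// already exist):
//
//     locktree lt;
//     lt.create(&mgr, dict_id, cmp);       // reference count starts at 1
//     ...
//     if (lt.release_reference() == 0) {   // the locktree only counts refs;
//         lt.destroy();                    // the caller decides when to destroy
//     }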

void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) {
    m_mgr = mgr;
    m_dict_id = dict_id;

    m_cmp.create_from(cmp);
    m_reference_count = 1;
    m_userdata = nullptr;

    XCALLOC(m_rangetree);
    m_rangetree->create(&m_cmp);

    m_sto_txnid = TXNID_NONE;
    m_sto_buffer.create();
    m_sto_score = STO_SCORE_THRESHOLD;
    m_sto_end_early_count = 0;
    m_sto_end_early_time = 0;

    m_lock_request_info.init();
}

void lt_lock_request_info::init(void) {
    pending_lock_requests.create();
    pending_is_empty = true;
    ZERO_STRUCT(mutex);
    toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr);
    retry_want = retry_done = 0;
    ZERO_STRUCT(counters);
    ZERO_STRUCT(retry_mutex);
    toku_mutex_init(
        *locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
    toku_cond_init(*locktree_request_info_retry_cv_key, &retry_cv, nullptr);
    running_retry = false;

    TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
                                      sizeof(pending_is_empty));
    TOKU_DRD_IGNORE_VAR(pending_is_empty);
}

void locktree::destroy(void) {
    invariant(m_reference_count == 0);
    invariant(m_lock_request_info.pending_lock_requests.size() == 0);
    m_cmp.destroy();
    m_rangetree->destroy();
    toku_free(m_rangetree);
    m_sto_buffer.destroy();
    m_lock_request_info.destroy();
}

void lt_lock_request_info::destroy(void) {
    pending_lock_requests.destroy();
    toku_mutex_destroy(&mutex);
    toku_mutex_destroy(&retry_mutex);
    toku_cond_destroy(&retry_cv);
}

void locktree::add_reference(void) {
    (void)toku_sync_add_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::release_reference(void) {
    return toku_sync_sub_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::get_reference_count(void) {
    return m_reference_count;
}

// a container for a range/txnid pair
struct row_lock {
    keyrange range;
    TXNID txnid;
};

// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the ranges inside the returned row locks,
// so take care when using them as keys to remove locks from the tree.
static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_keyrange *lkr,
                                                  GrowableArray<row_lock> *row_locks) {
    struct copy_fn_obj {
        GrowableArray<row_lock> *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            row_lock lock = { .range = range, .txnid = txnid };
            row_locks->push(lock);
            return true;
        }
    } copy_fn;
    copy_fn.row_locks = row_locks;
    lkr->iterate(&copy_fn);
}

// given a txnid and a set of overlapping row locks, determine
// which txnids are conflicting, and store them in the conflicts
// set, if given.
static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_locks,
                                         const TXNID &txnid, txnid_set *conflicts) {
    bool conflicts_exist = false;
    const size_t num_overlaps = row_locks.get_size();
    for (size_t i = 0; i < num_overlaps; i++) {
        const row_lock lock = row_locks.fetch_unchecked(i);
        const TXNID other_txnid = lock.txnid;
        if (other_txnid != txnid) {
            if (conflicts) {
                conflicts->add(other_txnid);
            }
            conflicts_exist = true;
        }
    }
    return conflicts_exist;
}

// how much memory does a row lock take up in a concurrent tree?
static uint64_t row_lock_size_in_tree(const row_lock &lock) {
    const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
    return lock.range.get_memory_size() + overhead;
}

// remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock.
static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    const uint64_t mem_released = row_lock_size_in_tree(lock);
    lkr->remove(lock.range);
    if (mgr != nullptr) {
        mgr->note_mem_released(mem_released);
    }
}

// insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock.
static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    uint64_t mem_used = row_lock_size_in_tree(lock);
    lkr->insert(lock.range, lock.txnid);
    if (mgr != nullptr) {
        mgr->note_mem_used(mem_used);
    }
}

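// Single txnid optimization (STO): as long as exactly one txnid is taking
// locks and the rangetree is empty, new locks are simply appended to
// m_sto_buffer rather than inserted into the rangetree. The optimization is
// called off ("ended early") as soon as a second txnid shows up or the buffer
// holds more than STO_BUFFER_MAX_SIZE ranges; at that point the buffered
// ranges are migrated into the rangetree and m_sto_score is reset to zero.
// The score creeps back up on each lock release until it reaches
// STO_SCORE_THRESHOLD, which makes the optimization eligible again.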
void locktree::sto_begin(TXNID txnid) {
    invariant(m_sto_txnid == TXNID_NONE);
    invariant(m_sto_buffer.is_empty());
    m_sto_txnid = txnid;
}

void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
    uint64_t buffer_mem, delta;
    keyrange range;
    range.create(left_key, right_key);

    buffer_mem = m_sto_buffer.total_memory_size();
    m_sto_buffer.append(left_key, right_key);
    delta = m_sto_buffer.total_memory_size() - buffer_mem;
    if (m_mgr != nullptr) {
        m_mgr->note_mem_used(delta);
    }
}

void locktree::sto_end(void) {
    uint64_t mem_size = m_sto_buffer.total_memory_size();
    if (m_mgr != nullptr) {
        m_mgr->note_mem_released(mem_size);
    }
    m_sto_buffer.destroy();
    m_sto_buffer.create();
    m_sto_txnid = TXNID_NONE;
}

void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
    sto_migrate_buffer_ranges_to_tree(prepared_lkr);
    sto_end();
    toku_unsafe_set(m_sto_score, 0);
}

void locktree::sto_end_early(void *prepared_lkr) {
    m_sto_end_early_count++;

    tokutime_t t0 = toku_time_now();
    sto_end_early_no_accounting(prepared_lkr);
    tokutime_t t1 = toku_time_now();

    m_sto_end_early_time += (t1 - t0);
}

void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
    // There should be something to migrate, and nothing in the rangetree.
    invariant(!m_sto_buffer.is_empty());
    invariant(m_rangetree->is_empty());

    concurrent_tree sto_rangetree;
    concurrent_tree::locked_keyrange sto_lkr;
    sto_rangetree.create(&m_cmp);

    // insert all of the ranges from the single txnid buffer into a new rangetree
    range_buffer::iterator iter(&m_sto_buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
        sto_lkr.prepare(&sto_rangetree);
        int r = acquire_lock_consolidated(&sto_lkr,
                m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr);
        invariant_zero(r);
        sto_lkr.release();
        iter.next();
    }

    // Iterate the newly created rangetree and insert each range into the
    // locktree's rangetree, on behalf of the old single txnid.
    struct migrate_fn_obj {
        concurrent_tree::locked_keyrange *dst_lkr;
        bool fn(const keyrange &range, TXNID txnid) {
            dst_lkr->insert(range, txnid);
            return true;
        }
    } migrate_fn;
    migrate_fn.dst_lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    sto_lkr.prepare(&sto_rangetree);
    sto_lkr.iterate(&migrate_fn);
    sto_lkr.remove_all();
    sto_lkr.release();
    sto_rangetree.destroy();
    invariant(!m_rangetree->is_empty());
}

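// try to acquire a lock under the single txnid optimization. returns true
// if the lock was buffered (the optimization is, or just became, active for
// this txnid), false if the caller must insert the lock into the rangetree.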
bool locktree::sto_try_acquire(void *prepared_lkr,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key) {
    if (m_rangetree->is_empty() && m_sto_buffer.is_empty() && toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
        // We can do the optimization because the rangetree is empty, and
        // we know it's worth trying because the sto score is big enough.
        sto_begin(txnid);
    } else if (m_sto_txnid != TXNID_NONE) {
        // We are currently doing the optimization. Check if we need to cancel
        // it because a new txnid appeared, or if the current single txnid has
        // taken too many locks already.
        if (m_sto_txnid != txnid || m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
            sto_end_early(prepared_lkr);
        }
    }

    // At this point the sto txnid is properly set. If it is valid, then
    // this txnid can append its lock to the sto buffer successfully.
    if (m_sto_txnid != TXNID_NONE) {
        invariant(m_sto_txnid == txnid);
        sto_append(left_key, right_key);
        return true;
    } else {
        invariant(m_sto_buffer.is_empty());
        return false;
    }
}

// try to acquire a lock and consolidate it with existing locks if possible
// param: lkr, a prepared locked keyrange
// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
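// example: if txnid already holds [1,3] and [5,7] and now requests [2,6],
// both existing locks are removed and replaced by the single dominating
// lock [1,7].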
int locktree::acquire_lock_consolidated(void *prepared_lkr,
                                        TXNID txnid,
                                        const DBT *left_key, const DBT *right_key,
                                        txnid_set *conflicts) {
    int r = 0;
    concurrent_tree::locked_keyrange *lkr;

    keyrange requested_range;
    requested_range.create(left_key, right_key);
    lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    lkr->acquire(requested_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    // if any overlapping row locks conflict with this request, bail out.
    bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks,
                                                        txnid, conflicts);
    if (!conflicts_exist) {
        // there are no conflicts, so all of the overlaps are for the requesting txnid.
        // so, we must consolidate all existing overlapping ranges and the requested
        // range into one dominating range. then we insert the dominating range.
        for (size_t i = 0; i < num_overlapping_row_locks; i++) {
            row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
            invariant(overlapping_lock.txnid == txnid);
            requested_range.extend(m_cmp, overlapping_lock.range);
            remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr);
        }

        row_lock new_lock = { .range = requested_range, .txnid = txnid };
        insert_row_lock_into_tree(lkr, new_lock, m_mgr);
    } else {
        r = DB_LOCK_NOTGRANTED;
    }

    requested_range.destroy();
    overlapping_row_locks.deinit();
    return r;
}

// acquire a lock in the given key range, inclusive. if successful,
// return 0. otherwise, populate the conflicts txnid_set with the set of
// transactions that conflict with this request.
int locktree::acquire_lock(bool is_write_request,
                           TXNID txnid,
                           const DBT *left_key, const DBT *right_key,
                           txnid_set *conflicts) {
    int r = 0;

    // we are only supporting write locks for simplicity
    invariant(is_write_request);

    // acquire and prepare a locked keyrange over the requested range.
    // prepare is a serialization point, so we take the opportunity to
    // try the single txnid optimization first.
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);

    bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key);
    if (!acquired) {
        r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts);
    }

    lkr.release();
    return r;
}

int locktree::try_acquire_lock(bool is_write_request,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               txnid_set *conflicts, bool big_txn) {
    // All ranges in the locktree must have left endpoints <= right endpoints.
    // Range comparisons rely on this fact, so we make a paranoid invariant here.
    paranoid_invariant(m_cmp(left_key, right_key) <= 0);
    int r = m_mgr == nullptr ? 0 :
            m_mgr->check_current_lock_constraints(big_txn);
    if (r == 0) {
        r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
    }
    return r;
}

// the locktree silently upgrades read locks to write locks for simplicity
int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                txnid_set *conflicts, bool big_txn) {
    return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn);
}

int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                 txnid_set *conflicts, bool big_txn) {
    return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
}

void locktree::get_conflicts(bool is_write_request,
                             TXNID txnid, const DBT *left_key, const DBT *right_key,
                             txnid_set *conflicts) {
    // because we only support write locks, ignore this bit for now.
    (void) is_write_request;

    // prepare and acquire a locked keyrange over the range
    keyrange range;
    range.create(left_key, right_key);
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(range);

    // copy out the set of overlapping row locks and determine the conflicts
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);

    // we don't care if conflicts exist. we just want the conflicts set populated.
    (void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);

    lkr.release();
    overlapping_row_locks.deinit();
    range.destroy();
}

// Effect:
//  For each range in the lock tree that overlaps the given range and has
//  the given txnid, remove it.
// Rationale:
//  In the common case, there is only the range [left_key, right_key] and
//  it is associated with txnid, so this is a single tree delete.
//
//  However, consolidation and escalation change the objects in the tree
//  without telling the txn anything.  In this case, the txn may own a
//  large range lock that represents its ownership of many smaller range
//  locks.  For example, the txn may think it owns point locks on keys 1,
//  2, and 3, but due to escalation, only the object [1,3] exists in the
//  tree.
//
//  The first call for a small lock will remove the large range lock, and
//  the rest of the calls should do nothing.  After the first release,
//  another thread can acquire one of the locks that the txn thinks it
//  still owns.  That's ok, because the txn doesn't want it anymore (it
//  unlocks everything at once), but it may find a lock that it does not
//  own.
//
//  In our example, the txn unlocks key 1, which actually removes the
//  whole lock [1,3].  Now, someone else can lock 2 before our txn gets
//  around to unlocking 2, so we should not remove that lock.
void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
                                                  const DBT *left_key,
                                                  const DBT *right_key) {
    keyrange release_range;
    release_range.create(left_key, right_key);

    // acquire and prepare a locked keyrange over the release range
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(release_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    for (size_t i = 0; i < num_overlapping_row_locks; i++) {
        row_lock lock = overlapping_row_locks.fetch_unchecked(i);
        // If this isn't our lock, that's ok, just don't remove it.
        // See rationale above.
        if (lock.txnid == txnid) {
            remove_row_lock_from_tree(&lkr, lock, m_mgr);
        }
    }

    lkr.release();
    overlapping_row_locks.deinit();
    release_range.destroy();
}

bool locktree::sto_txnid_is_valid_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
}

int locktree::sto_get_score_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_score);
}

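// if the single txnid optimization is active for the given txnid, release
// all of its buffered locks and return true. otherwise return false, and
// the caller must release its locks from the rangetree.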
bool locktree::sto_try_release(TXNID txnid) {
    bool released = false;
    if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
        // check the bit again with a prepared locked keyrange,
        // which protects the optimization bits and rangetree data
        concurrent_tree::locked_keyrange lkr;
        lkr.prepare(m_rangetree);
        if (m_sto_txnid != TXNID_NONE) {
            // this txnid better be the single txnid on this locktree,
            // or else we are in big trouble (meaning the logic is broken)
            invariant(m_sto_txnid == txnid);
            invariant(m_rangetree->is_empty());
            sto_end();
            released = true;
        }
        lkr.release();
    }
    return released;
}

// release all of the locks for a txnid whose endpoints are pairs
// in the given range buffer.
void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
    // try the single txn optimization. if it worked, then all of the
    // locks are already released, otherwise we need to do it here.
    bool released = sto_try_release(txnid);
    if (!released) {
        range_buffer::iterator iter(ranges);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            const DBT *left_key = rec.get_left_key();
            const DBT *right_key = rec.get_right_key();
            // All ranges in the locktree must have left endpoints <= right endpoints.
            // Range comparisons rely on this fact, so we make a paranoid invariant here.
            paranoid_invariant(m_cmp(left_key, right_key) <= 0);
            remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
            iter.next();
        }
        // Increase the sto score slightly. Eventually it will hit
        // the threshold and we'll try the optimization again. This
        // is how a previously multithreaded system transitions into
        // a single threaded system that benefits from the optimization.
        if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
            toku_sync_fetch_and_add(&m_sto_score, 1);
        }
    }
}

// iterate over a locked keyrange and extract copies of the first N
// row locks, storing each one into the given array of size N,
// then removing each extracted lock from the locked keyrange.
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr,
                                     row_lock *row_locks, int num_to_extract) {

    struct extract_fn_obj {
        int num_extracted;
        int num_to_extract;
        row_lock *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            if (num_extracted < num_to_extract) {
                row_lock lock;
                lock.range.create_copy(range);
                lock.txnid = txnid;
                row_locks[num_extracted++] = lock;
                return true;
            } else {
                return false;
            }
        }
    } extract_fn;

    extract_fn.row_locks = row_locks;
    extract_fn.num_to_extract = num_to_extract;
    extract_fn.num_extracted = 0;
    lkr->iterate(&extract_fn);

    // now that the ranges have been copied out, complete
    // the extraction by removing the ranges from the tree.
    // use remove_row_lock_from_tree() so we properly track the
    // amount of memory and number of locks freed.
    int num_extracted = extract_fn.num_extracted;
    invariant(num_extracted <= num_to_extract);
    for (int i = 0; i < num_extracted; i++) {
        remove_row_lock_from_tree(lkr, row_locks[i], mgr);
    }

    return num_extracted;
}

// Store each newly escalated lock in a range buffer for the appropriate txnid.
// We'll rebuild the locktree by iterating over these ranges, and then we
// can pass back each txnid/buffer pair individually through a callback
// to notify higher layers that locks have changed.
struct txnid_range_buffer {
    TXNID txnid;
    range_buffer buffer;

    static int find_by_txnid(struct txnid_range_buffer *const &other_buffer, const TXNID &txnid) {
        if (txnid < other_buffer->txnid) {
            return -1;
        } else if (other_buffer->txnid == txnid) {
            return 0;
        } else {
            return 1;
        }
    }
};

// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
// if there's only one txnid in the locktree then this
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
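//
// for example, if txnid A holds point locks on keys 1, 2, and 3 while txnid B
// holds a point lock on key 4, escalation replaces A's three locks with the
// single range lock [1,3] and leaves B's lock on 4 alone.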
void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
    omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
    range_buffers.create();

    // prepare and acquire a locked keyrange on the entire locktree
    concurrent_tree::locked_keyrange lkr;
    keyrange infinite_range = keyrange::get_infinite_range();
    lkr.prepare(m_rangetree);
    lkr.acquire(infinite_range);

    // if we're in the single txnid optimization, simply call it off.
    // if you have to run escalation, you probably don't care about
    // the optimization anyway, and this makes things easier.
    if (m_sto_txnid != TXNID_NONE) {
        // We are already accounting for this escalation time and
        // count, so don't do it for sto_end_early too.
        sto_end_early_no_accounting(&lkr);
    }

    // extract and remove batches of row locks from the locktree
    int num_extracted;
    const int num_row_locks_per_batch = 128;
    row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);

    // we always remove the "first" n because we are removing n
    // each time we do an extraction. so this loops until it's empty.
    while ((num_extracted =
                extract_first_n_row_locks(&lkr, m_mgr, extracted_buf,
                                          num_row_locks_per_batch)) > 0) {
        int current_index = 0;
        while (current_index < num_extracted) {
            // every batch of extracted locks is in range-sorted order. search
            // through them and merge adjacent locks with the same txnid into
            // one dominating lock and save it to a set of escalated locks.
            //
            // first, find the index of the next row lock with a different txnid
            int next_txnid_index = current_index + 1;
            while (next_txnid_index < num_extracted &&
                    extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) {
                next_txnid_index++;
            }

            // Create an escalated range for the current txnid that dominates
            // each range between the current index and the next txnid's index.
            const TXNID current_txnid = extracted_buf[current_index].txnid;
            const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key();
            const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key();

            // Try to find a range buffer for the current txnid. Create one if it doesn't exist.
            // Then, append the new escalated range to the buffer.
            uint32_t idx;
            struct txnid_range_buffer *existing_range_buffer;
            int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
                    current_txnid,
                    &existing_range_buffer,
                    &idx
                    );
            if (r == DB_NOTFOUND) {
                struct txnid_range_buffer *XMALLOC(new_range_buffer);
                new_range_buffer->txnid = current_txnid;
                new_range_buffer->buffer.create();
                new_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
                range_buffers.insert_at(new_range_buffer, idx);
            } else {
                invariant_zero(r);
                invariant(existing_range_buffer->txnid == current_txnid);
                existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
            }

            current_index = next_txnid_index;
        }

        // destroy the ranges copied during the extraction
        for (int i = 0; i < num_extracted; i++) {
            extracted_buf[i].range.destroy();
        }
    }
    toku_free(extracted_buf);

    // Rebuild the locktree from each range in each range buffer,
    // then notify higher layers that the txnid's locks have changed.
    invariant(m_rangetree->is_empty());
    const size_t num_range_buffers = range_buffers.size();
    for (size_t i = 0; i < num_range_buffers; i++) {
        struct txnid_range_buffer *current_range_buffer;
        int r = range_buffers.fetch(i, &current_range_buffer);
        invariant_zero(r);

        const TXNID current_txnid = current_range_buffer->txnid;
        range_buffer::iterator iter(&current_range_buffer->buffer);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            keyrange range;
            range.create(rec.get_left_key(), rec.get_right_key());
            row_lock lock = { .range = range, .txnid = current_txnid };
            insert_row_lock_into_tree(&lkr, lock, m_mgr);
            iter.next();
        }

        // Notify higher layers that locks have changed for the current txnid
        if (after_escalate_callback) {
            after_escalate_callback(current_txnid, this, current_range_buffer->buffer, after_escalate_callback_extra);
        }
        current_range_buffer->buffer.destroy();
    }

    while (range_buffers.size() > 0) {
        struct txnid_range_buffer *buffer;
        int r = range_buffers.fetch(0, &buffer);
        invariant_zero(r);
        r = range_buffers.delete_at(0);
        invariant_zero(r);
        toku_free(buffer);
    }
    range_buffers.destroy();

    lkr.release();
}

void *locktree::get_userdata(void) const {
    return m_userdata;
}

void locktree::set_userdata(void *userdata) {
    m_userdata = userdata;
}

struct lt_lock_request_info *locktree::get_lock_request_info(void) {
    return &m_lock_request_info;
}

void locktree::set_comparator(const comparator &cmp) {
    m_cmp.inherit(cmp);
}

locktree_manager *locktree::get_manager(void) const {
    return m_mgr;
}

int locktree::compare(const locktree *lt) const {
    if (m_dict_id.dictid < lt->m_dict_id.dictid) {
        return -1;
    } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
        return 0;
    } else {
        return 1;
    }
}

DICTIONARY_ID locktree::get_dict_id() const {
    return m_dict_id;
}

} /* namespace toku */