/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
======= */

#ident \
    "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "locktree.h"

#include <memory.h>

#include "../portability/toku_pthread.h"
#include "../portability/toku_time.h"
#include "../util/growable_array.h"
#include "range_buffer.h"

// including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in
// the locktree source file instead of the header.
#include "concurrent_tree.h"

namespace toku {
// A locktree represents the set of row locks owned by all transactions
// over an open dictionary. Read and write ranges are represented as
// a left and right key which are compared with the given descriptor
// and comparison fn.
//
// Each locktree has a reference count which it manages
// but does nothing based on the value of the reference count - it is
// up to the user of the locktree to destroy it when it sees fit.
void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
                      const comparator &cmp,
                      toku_external_mutex_factory_t mutex_factory) {
  m_mgr = mgr;
  m_dict_id = dict_id;

  m_cmp.create_from(cmp);
  m_reference_count = 1;
  m_userdata = nullptr;

  XCALLOC(m_rangetree);
  m_rangetree->create(&m_cmp);

  m_sto_txnid = TXNID_NONE;
  m_sto_buffer.create();
  m_sto_score = STO_SCORE_THRESHOLD;
  m_sto_end_early_count = 0;
  m_sto_end_early_time = 0;

  m_lock_request_info.init(mutex_factory);
}

void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) {
  pending_lock_requests.create();
  pending_is_empty = true;
  toku_external_mutex_init(mutex_factory, &mutex);
  retry_want = retry_done = 0;
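  // Note (added, not in the upstream comment): retry_want/retry_done appear
  // to act as generation counters for the lock-request retry path, letting
  // concurrent retry requests coalesce into a single pass under retry_mutex
  // rather than each re-scanning the pending list.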
  ZERO_STRUCT(counters);
  ZERO_STRUCT(retry_mutex);
  toku_mutex_init(locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
  toku_cond_init(locktree_request_info_retry_cv_key, &retry_cv, nullptr);
  running_retry = false;

  TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
                                    sizeof(pending_is_empty));
  TOKU_DRD_IGNORE_VAR(pending_is_empty);
}

void locktree::destroy(void) {
  invariant(m_reference_count == 0);
  invariant(m_lock_request_info.pending_lock_requests.size() == 0);
  m_cmp.destroy();
  m_rangetree->destroy();
  toku_free(m_rangetree);
  m_sto_buffer.destroy();
  m_lock_request_info.destroy();
}

void lt_lock_request_info::destroy(void) {
  pending_lock_requests.destroy();
  toku_external_mutex_destroy(&mutex);
  toku_mutex_destroy(&retry_mutex);
  toku_cond_destroy(&retry_cv);
}

void locktree::add_reference(void) {
  (void)toku_sync_add_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::release_reference(void) {
  return toku_sync_sub_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::get_reference_count(void) { return m_reference_count; }

// a container for a range/txnid pair
struct row_lock {
  keyrange range;
  TXNID txnid;
  bool is_shared;
  TxnidVector *owners;
};
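// (Added note: when a range is held in shared mode by more than one
// transaction, the tree reports it with txnid == TXNID_SHARED and `owners`
// pointing at the set of owning txnids; otherwise `owners` is nullptr. The
// conflict and release logic below relies on this convention.)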

// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the range inside the returned row locks,
// so remove from the tree with care using them as keys.
static void iterate_and_get_overlapping_row_locks(
    const concurrent_tree::locked_keyrange *lkr,
    GrowableArray<row_lock> *row_locks) {
  struct copy_fn_obj {
    GrowableArray<row_lock> *row_locks;
    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      row_lock lock = {.range = range,
                       .txnid = txnid,
                       .is_shared = is_shared,
                       .owners = owners};
      row_locks->push(lock);
      return true;
    }
  } copy_fn;
  copy_fn.row_locks = row_locks;
  lkr->iterate(&copy_fn);
}

// given a txnid and a set of overlapping row locks, determine
// which txnids are conflicting, and store them in the conflicts
// set, if given.
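// For example (illustrative, not from the original comment): if the
// overlapping locks are [a..c] owned by txnid 10 and [b..d] shared by
// {10, 12}, then for txnid 10 the conflicts set becomes {12} and the
// function returns true; for txnid 12 it becomes {10}.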
static bool determine_conflicting_txnids(
    const GrowableArray<row_lock> &row_locks, const TXNID &txnid,
    txnid_set *conflicts) {
  bool conflicts_exist = false;
  const size_t num_overlaps = row_locks.get_size();
  for (size_t i = 0; i < num_overlaps; i++) {
    const row_lock lock = row_locks.fetch_unchecked(i);
    const TXNID other_txnid = lock.txnid;
    if (other_txnid != txnid) {
      if (conflicts) {
        if (other_txnid == TXNID_SHARED) {
          // Add all shared lock owners, except this transaction.
          for (TXNID shared_id : *lock.owners) {
            if (shared_id != txnid)
              conflicts->add(shared_id);
          }
        } else {
          conflicts->add(other_txnid);
        }
      }
      conflicts_exist = true;
    }
  }
  return conflicts_exist;
}

// how much memory does a row lock take up in a concurrent tree?
static uint64_t row_lock_size_in_tree(const row_lock &lock) {
  const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
  return lock.range.get_memory_size() + overhead;
}

// remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock.
static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, TXNID txnid,
                                      locktree_manager *mgr) {
  const uint64_t mem_released = row_lock_size_in_tree(lock);
  lkr->remove(lock.range, txnid);
  if (mgr != nullptr) {
    mgr->note_mem_released(mem_released);
  }
}

// insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock.
static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock,
                                      locktree_manager *mgr) {
  uint64_t mem_used = row_lock_size_in_tree(lock);
  lkr->insert(lock.range, lock.txnid, lock.is_shared);
  if (mgr != nullptr) {
    mgr->note_mem_used(mem_used);
  }
}

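// Single txnid optimization ("STO"): as long as exactly one transaction is
// taking locks in this locktree, its ranges are appended to m_sto_buffer
// instead of being inserted into the rangetree, which avoids tree node
// allocation and comparisons on the hot path. The optimization is abandoned
// (sto_end_early) as soon as a second txnid shows up or the buffer grows
// past STO_BUFFER_MAX_SIZE, at which point the buffered ranges are migrated
// into the rangetree. (This summary is derived from the code below.)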
void locktree::sto_begin(TXNID txnid) {
  invariant(m_sto_txnid == TXNID_NONE);
  invariant(m_sto_buffer.is_empty());
  m_sto_txnid = txnid;
}

void locktree::sto_append(const DBT *left_key, const DBT *right_key,
                          bool is_write_request) {
  uint64_t buffer_mem, delta;

  // psergey: the keyrange created by the next two lines is never used,
  // so they do not seem to make any sense (the same code exists in
  // upstream TokuDB).
  keyrange range;
  range.create(left_key, right_key);

  buffer_mem = m_sto_buffer.total_memory_size();
  m_sto_buffer.append(left_key, right_key, is_write_request);
  delta = m_sto_buffer.total_memory_size() - buffer_mem;
  if (m_mgr != nullptr) {
    m_mgr->note_mem_used(delta);
  }
}

void locktree::sto_end(void) {
  uint64_t mem_size = m_sto_buffer.total_memory_size();
  if (m_mgr != nullptr) {
    m_mgr->note_mem_released(mem_size);
  }
  m_sto_buffer.destroy();
  m_sto_buffer.create();
  m_sto_txnid = TXNID_NONE;
}

void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
  sto_migrate_buffer_ranges_to_tree(prepared_lkr);
  sto_end();
  toku_unsafe_set(m_sto_score, 0);
}

void locktree::sto_end_early(void *prepared_lkr) {
  m_sto_end_early_count++;

  tokutime_t t0 = toku_time_now();
  sto_end_early_no_accounting(prepared_lkr);
  tokutime_t t1 = toku_time_now();

  m_sto_end_early_time += (t1 - t0);
}

void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
  // There should be something to migrate, and nothing in the rangetree.
  invariant(!m_sto_buffer.is_empty());
  invariant(m_rangetree->is_empty());

  concurrent_tree sto_rangetree;
  concurrent_tree::locked_keyrange sto_lkr;
  sto_rangetree.create(&m_cmp);

  // insert all of the ranges from the single txnid buffer into a new rangetree
  range_buffer::iterator iter(&m_sto_buffer);
  range_buffer::iterator::record rec;
  while (iter.current(&rec)) {
    sto_lkr.prepare(&sto_rangetree);
    int r = acquire_lock_consolidated(&sto_lkr, m_sto_txnid, rec.get_left_key(),
                                      rec.get_right_key(),
                                      rec.get_exclusive_flag(), nullptr);
    invariant_zero(r);
    sto_lkr.release();
    iter.next();
  }

  // Iterate the newly created rangetree and insert each range into the
  // locktree's rangetree, on behalf of the old single txnid.
  struct migrate_fn_obj {
    concurrent_tree::locked_keyrange *dst_lkr;
    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      // There can't be multiple owners in STO mode
      invariant_zero(owners);
      dst_lkr->insert(range, txnid, is_shared);
      return true;
    }
  } migrate_fn;
  migrate_fn.dst_lkr =
      static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
  sto_lkr.prepare(&sto_rangetree);
  sto_lkr.iterate(&migrate_fn);
  sto_lkr.remove_all();
  sto_lkr.release();
  sto_rangetree.destroy();
  invariant(!m_rangetree->is_empty());
}

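// sto_try_acquire has three possible outcomes (summary added for clarity):
//  1. The rangetree and the sto buffer are empty and the sto score is high
//     enough: start (or continue) the optimization and buffer this range.
//  2. The optimization is active for this same txnid and the buffer is still
//     small: buffer this range.
//  3. Otherwise: end the optimization if it was active (migrating buffered
//     ranges into the rangetree) and return false so the caller takes the
//     regular acquire_lock_consolidated path.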
bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               bool is_write_request) {
  if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
      toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
    // We can do the optimization because the rangetree is empty, and
    // we know it's worth trying because the sto score is big enough.
    sto_begin(txnid);
  } else if (m_sto_txnid != TXNID_NONE) {
    // We are currently doing the optimization. Check if we need to cancel
    // it because a new txnid appeared, or if the current single txnid has
    // taken too many locks already.
    if (m_sto_txnid != txnid ||
        m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
      sto_end_early(prepared_lkr);
    }
  }

  // At this point the sto txnid is properly set. If it is valid, then
  // this txnid can append its lock to the sto buffer successfully.
  if (m_sto_txnid != TXNID_NONE) {
    invariant(m_sto_txnid == txnid);
    sto_append(left_key, right_key, is_write_request);
    return true;
  } else {
    invariant(m_sto_buffer.is_empty());
    return false;
  }
}

/*
  Do the same as iterate_and_get_overlapping_row_locks does, but also check for
  this:
    The set of overlapping row locks consists of just one read-only shared
    lock with the same endpoints as specified (in that case, we can just add
    ourselves into that list)

  @return true - One compatible shared lock
          false - Otherwise
*/
static bool iterate_and_get_overlapping_row_locks2(
    const concurrent_tree::locked_keyrange *lkr, const DBT *left_key,
    const DBT *right_key, comparator *cmp, TXNID,
    GrowableArray<row_lock> *row_locks) {
  struct copy_fn_obj {
    GrowableArray<row_lock> *row_locks;
    bool first_call = true;
    bool matching_lock_found = false;
    const DBT *left_key, *right_key;
    comparator *cmp;

    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      if (first_call) {
        first_call = false;
        if (is_shared && !(*cmp)(left_key, range.get_left_key()) &&
            !(*cmp)(right_key, range.get_right_key())) {
          matching_lock_found = true;
        }
      } else {
        // if we see more than one overlapping lock, it doesn't matter
        // whether the first one was matching.
        matching_lock_found = false;
      }
      row_lock lock = {.range = range,
                       .txnid = txnid,
                       .is_shared = is_shared,
                       .owners = owners};
      row_locks->push(lock);
      return true;
    }
  } copy_fn;
  copy_fn.row_locks = row_locks;
  copy_fn.left_key = left_key;
  copy_fn.right_key = right_key;
  copy_fn.cmp = cmp;
  lkr->iterate(&copy_fn);
  return copy_fn.matching_lock_found;
}

// try to acquire a lock and consolidate it with existing locks if possible
// param: lkr, a prepared locked keyrange
// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
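// Consolidation example (added for clarity): if txnid 10 already holds
// exclusive locks on [1..2] and [4..5] and now requests [2..4], there are no
// conflicts, so the three overlapping ranges are merged and re-inserted as a
// single exclusive lock [1..5] owned by txnid 10.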
int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
                                        const DBT *left_key,
                                        const DBT *right_key,
                                        bool is_write_request,
                                        txnid_set *conflicts) {
  int r = 0;
  concurrent_tree::locked_keyrange *lkr;

  keyrange requested_range;
  requested_range.create(left_key, right_key);
  lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
  lkr->acquire(requested_range);

  // copy out the set of overlapping row locks.
  GrowableArray<row_lock> overlapping_row_locks;
  overlapping_row_locks.init();
  bool matching_shared_lock_found = false;

  if (is_write_request)
    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
  else {
    matching_shared_lock_found = iterate_and_get_overlapping_row_locks2(
        lkr, left_key, right_key, &m_cmp, txnid, &overlapping_row_locks);
    // psergey-todo: what to do now? So, we have figured out there is just
    // one shareable lock. We need to add ourselves to it as an owner, but
    // the lock pointer cannot be kept?
    // A: use find_node_with_overlapping_child(key_range, nullptr);
    //  then, add ourselves to the owner list.
    // Don't forget to release the subtree after that.
  }

  if (matching_shared_lock_found) {
    // There is just one non-conflicting matching shared lock.
    // We are holding a lock on it (see the acquire() call above), and
    // we need to modify it to indicate there is another locker...
    if (lkr->add_shared_owner(requested_range, txnid)) {
      // For accounting purposes, pretend the shared lock uses as much
      // memory as a regular lock would.
      row_lock new_lock = {.range = requested_range,
                           .txnid = txnid,
                           .is_shared = false,
                           .owners = nullptr};
      uint64_t mem_used = row_lock_size_in_tree(new_lock);
      if (m_mgr) {
        m_mgr->note_mem_used(mem_used);
      }
    }
    requested_range.destroy();
    overlapping_row_locks.deinit();
    return 0;
  }

  size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

  // if any overlapping row locks conflict with this request, bail out.

  bool conflicts_exist =
      determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
  if (!conflicts_exist) {
    // there are no conflicts, so all of the overlaps are for the requesting
    // txnid. so, we must consolidate all existing overlapping ranges and the
    // requested range into one dominating range. then we insert the dominating
    // range.
    bool all_shared = !is_write_request;
    for (size_t i = 0; i < num_overlapping_row_locks; i++) {
      row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
      invariant(overlapping_lock.txnid == txnid);
      requested_range.extend(m_cmp, overlapping_lock.range);
      remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
      all_shared = all_shared && overlapping_lock.is_shared;
    }

    row_lock new_lock = {.range = requested_range,
                         .txnid = txnid,
                         .is_shared = all_shared,
                         .owners = nullptr};
    insert_row_lock_into_tree(lkr, new_lock, m_mgr);
  } else {
    r = DB_LOCK_NOTGRANTED;
  }

  requested_range.destroy();
  overlapping_row_locks.deinit();
  return r;
}

// acquire a lock in the given key range, inclusive. if successful,
// return 0. otherwise, populate the conflicts txnid_set with the set of
// transactions that conflict with this request.
int locktree::acquire_lock(bool is_write_request, TXNID txnid,
                           const DBT *left_key, const DBT *right_key,
                           txnid_set *conflicts) {
  int r = 0;

  // we are only supporting write locks for simplicity
  // invariant(is_write_request);

  // acquire and prepare a locked keyrange over the requested range.
  // prepare is a serialization point, so we take the opportunity to
  // try the single txnid optimization first.
  concurrent_tree::locked_keyrange lkr;
  lkr.prepare(m_rangetree);

  bool acquired =
      sto_try_acquire(&lkr, txnid, left_key, right_key, is_write_request);
  if (!acquired) {
    r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key,
                                  is_write_request, conflicts);
  }

  lkr.release();
  return r;
}

int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               txnid_set *conflicts, bool big_txn) {
  // All ranges in the locktree must have left endpoints <= right endpoints.
  // Range comparisons rely on this fact, so we make a paranoid invariant here.
  paranoid_invariant(m_cmp(left_key, right_key) <= 0);
  int r = m_mgr == nullptr ? 0 : m_mgr->check_current_lock_constraints(big_txn);
  if (r == 0) {
    r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
  }
  return r;
}

// Historically the locktree silently upgraded read locks to write locks for
// simplicity; with shared (read) lock support, acquire_read_lock now requests
// a genuine read lock.
int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key,
                                const DBT *right_key, txnid_set *conflicts,
                                bool big_txn) {
  return try_acquire_lock(false, txnid, left_key, right_key, conflicts,
                          big_txn);
}

int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key,
                                 const DBT *right_key, txnid_set *conflicts,
                                 bool big_txn) {
  return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
}

// typedef void (*dump_callback)(void *cdata, const DBT *left, const DBT *right,
//                               TXNID txnid, bool is_shared,
//                               TxnidVector *owners);
void locktree::dump_locks(void *cdata, dump_callback cb) {
  concurrent_tree::locked_keyrange lkr;
  keyrange range;
  range.create(toku_dbt_negative_infinity(), toku_dbt_positive_infinity());

  lkr.prepare(m_rangetree);
  lkr.acquire(range);

  TXNID sto_txn;
  if ((sto_txn = toku_unsafe_fetch(m_sto_txnid)) != TXNID_NONE) {
    // the locktree is in the single txnid optimization: dump the pending
    // ranges straight from the single txnid buffer
    range_buffer::iterator iter(&m_sto_buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
      (*cb)(cdata, rec.get_left_key(), rec.get_right_key(), sto_txn,
            !rec.get_exclusive_flag(), nullptr);
      iter.next();
    }
  } else {
    GrowableArray<row_lock> all_locks;
    all_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &all_locks);

    const size_t n_locks = all_locks.get_size();
    for (size_t i = 0; i < n_locks; i++) {
      const row_lock lock = all_locks.fetch_unchecked(i);
      (*cb)(cdata, lock.range.get_left_key(), lock.range.get_right_key(),
            lock.txnid, lock.is_shared, lock.owners);
    }
    all_locks.deinit();
  }
  lkr.release();
  range.destroy();
}

void locktree::get_conflicts(bool is_write_request, TXNID txnid,
                             const DBT *left_key, const DBT *right_key,
                             txnid_set *conflicts) {
  // because we only support write locks, ignore this bit for now.
  (void)is_write_request;

  // prepare and acquire a locked keyrange over the range
  keyrange range;
  range.create(left_key, right_key);
  concurrent_tree::locked_keyrange lkr;
  lkr.prepare(m_rangetree);
  lkr.acquire(range);

  // copy out the set of overlapping row locks and determine the conflicts
  GrowableArray<row_lock> overlapping_row_locks;
  overlapping_row_locks.init();
  iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);

  // we don't care if conflicts exist. we just want the conflicts set populated.
  (void)determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);

  lkr.release();
  overlapping_row_locks.deinit();
  range.destroy();
}

// Effect:
//  For each range in the lock tree that overlaps the given range and has
//  the given txnid, remove it.
// Rationale:
//  In the common case, there is only the range [left_key, right_key] and
//  it is associated with txnid, so this is a single tree delete.
//
//  However, consolidation and escalation change the objects in the tree
//  without telling the txn anything.  In this case, the txn may own a
//  large range lock that represents its ownership of many smaller range
//  locks.  For example, the txn may think it owns point locks on keys 1,
//  2, and 3, but due to escalation, only the object [1,3] exists in the
//  tree.
//
//  The first call for a small lock will remove the large range lock, and
//  the rest of the calls should do nothing.  After the first release,
//  another thread can acquire one of the locks that the txn thinks it
//  still owns.  That's ok, because the txn doesn't want it anymore (it
//  unlocks everything at once), but it may find a lock that it does not
//  own.
//
//  In our example, the txn unlocks key 1, which actually removes the
//  whole lock [1,3].  Now, someone else can lock 2 before our txn gets
//  around to unlocking 2, so we should not remove that lock.
void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
                                                  const DBT *left_key,
                                                  const DBT *right_key) {
  keyrange release_range;
  release_range.create(left_key, right_key);

  // acquire and prepare a locked keyrange over the release range
  concurrent_tree::locked_keyrange lkr;
  lkr.prepare(m_rangetree);
  lkr.acquire(release_range);

  // copy out the set of overlapping row locks.
  GrowableArray<row_lock> overlapping_row_locks;
  overlapping_row_locks.init();
  iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
  size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

  for (size_t i = 0; i < num_overlapping_row_locks; i++) {
    row_lock lock = overlapping_row_locks.fetch_unchecked(i);
    // If this isn't our lock, that's ok, just don't remove it.
    // See rationale above.
    // psergey-todo: for shared locks, just remove ourselves from the
    //               owners.
    if (lock.txnid == txnid || (lock.owners && lock.owners->contains(txnid))) {
      remove_row_lock_from_tree(&lkr, lock, txnid, m_mgr);
    }
  }

  lkr.release();
  overlapping_row_locks.deinit();
  release_range.destroy();
}

bool locktree::sto_txnid_is_valid_unsafe(void) const {
  return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
}

int locktree::sto_get_score_unsafe(void) const {
  return toku_unsafe_fetch(m_sto_score);
}

bool locktree::sto_try_release(TXNID txnid) {
  bool released = false;
  if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
    // check the bit again with a prepared locked keyrange,
    // which protects the optimization bits and rangetree data
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    if (m_sto_txnid != TXNID_NONE) {
      // this txnid better be the single txnid on this locktree,
      // or else we are in big trouble (meaning the logic is broken)
      invariant(m_sto_txnid == txnid);
      invariant(m_rangetree->is_empty());
      sto_end();
      released = true;
    }
    lkr.release();
  }
  return released;
}

// release all of the locks for a txnid whose endpoints are pairs
// in the given range buffer.
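// (Added note: all_trx_locks_hint == true means the caller promises that
// `ranges` covers every lock this transaction holds in this locktree, which
// is what makes the sto_try_release() fast path below safe to take.)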
void locktree::release_locks(TXNID txnid, const range_buffer *ranges,
                             bool all_trx_locks_hint) {
  // try the single txn optimization. if it worked, then all of the
  // locks are already released, otherwise we need to do it here.
  bool released;
  if (all_trx_locks_hint) {
    // This will release all of the locks the transaction is holding
    released = sto_try_release(txnid);
  } else {
    /*
      psergey: we are asked to release *some* of the locks the transaction
      is holding.
      We could try doing that without leaving the STO mode, but right now,
      the easiest way is to exit the STO mode and let the non-STO code path
      handle it.
    */
    if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
      // check the bit again with a prepared locked keyrange,
      // which protects the optimization bits and rangetree data
      concurrent_tree::locked_keyrange lkr;
      lkr.prepare(m_rangetree);
      if (m_sto_txnid != TXNID_NONE) {
        sto_end_early(&lkr);
      }
      lkr.release();
    }
    released = false;
  }
  if (!released) {
    range_buffer::iterator iter(ranges);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
      const DBT *left_key = rec.get_left_key();
      const DBT *right_key = rec.get_right_key();
      // All ranges in the locktree must have left endpoints <= right endpoints.
      // Range comparisons rely on this fact, so we make a paranoid invariant
      // here.
      paranoid_invariant(m_cmp(left_key, right_key) <= 0);
      remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
      iter.next();
    }
    // Increase the sto score slightly. Eventually it will hit
    // the threshold and we'll try the optimization again. This
    // is how a previously multithreaded system transitions into
    // a single threaded system that benefits from the optimization.
    if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
      toku_sync_fetch_and_add(&m_sto_score, 1);
    }
  }
}

// iterate over a locked keyrange and extract copies of the first N
// row locks, storing each one into the given array of size N,
// then removing each extracted lock from the locked keyrange.
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr, row_lock *row_locks,
                                     int num_to_extract) {
  struct extract_fn_obj {
    int num_extracted;
    int num_to_extract;
    row_lock *row_locks;
    bool fn(const keyrange &range, TXNID txnid, bool is_shared,
            TxnidVector *owners) {
      if (num_extracted < num_to_extract) {
        row_lock lock;
        lock.range.create_copy(range);
        lock.txnid = txnid;
        lock.is_shared = is_shared;
        // deep-copy the set of owners; the caller is responsible for
        // deleting it (escalate() does so after the ranges are merged)
        if (owners)
          lock.owners = new TxnidVector(*owners);
        else
          lock.owners = nullptr;
        row_locks[num_extracted++] = lock;
        return true;
      } else {
        return false;
      }
    }
  } extract_fn;

  extract_fn.row_locks = row_locks;
  extract_fn.num_to_extract = num_to_extract;
  extract_fn.num_extracted = 0;
  lkr->iterate(&extract_fn);

  // now that the ranges have been copied out, complete
  // the extraction by removing the ranges from the tree.
  // use remove_row_lock_from_tree() so we properly track the
  // amount of memory and number of locks freed.
  int num_extracted = extract_fn.num_extracted;
  invariant(num_extracted <= num_to_extract);
  for (int i = 0; i < num_extracted; i++) {
    remove_row_lock_from_tree(lkr, row_locks[i], TXNID_ANY, mgr);
  }

  return num_extracted;
}

// Store each newly escalated lock in a range buffer for the appropriate
// txnid. We'll rebuild the locktree by iterating over these ranges, and then
// we can pass back each txnid/buffer pair individually through a callback
// to notify higher layers that locks have changed.
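// (Added note: range_buffers below is kept sorted by txnid via
// find_by_txnid(), so omt::find_zero() can locate an existing buffer for a
// txnid, and insert_at() keeps the order when a new one is created.)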
struct txnid_range_buffer {
  TXNID txnid;
  range_buffer buffer;

  static int find_by_txnid(struct txnid_range_buffer *const &other_buffer,
                           const TXNID &txnid) {
    if (txnid < other_buffer->txnid) {
      return -1;
    } else if (other_buffer->txnid == txnid) {
      return 0;
    } else {
      return 1;
    }
  }
};

// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
// if there's only one txnid in the locktree then this
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
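//
// For example, if txnid A holds point locks on keys 1, 2 and 3 that sit next
// to each other in the tree, escalation replaces them with the single range
// lock [1, 3] owned by A, matching the escalation example described above
// remove_overlapping_locks_for_txnid().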
void locktree::escalate(lt_escalate_cb after_escalate_callback,
                        void *after_escalate_callback_extra) {
  omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
  range_buffers.create();

  // prepare and acquire a locked keyrange on the entire locktree
  concurrent_tree::locked_keyrange lkr;
  keyrange infinite_range = keyrange::get_infinite_range();
  lkr.prepare(m_rangetree);
  lkr.acquire(infinite_range);

  // if we're in the single txnid optimization, simply call it off.
  // if you have to run escalation, you probably don't care about
  // the optimization anyway, and this makes things easier.
  if (m_sto_txnid != TXNID_NONE) {
    // We are already accounting for this escalation time and
    // count, so don't do it for sto_end_early too.
    sto_end_early_no_accounting(&lkr);
  }

  // extract and remove batches of row locks from the locktree
  int num_extracted;
  const int num_row_locks_per_batch = 128;
  row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);

  // we always remove the "first" n because we are removing n
  // each time we do an extraction. so this loops until it's empty.
  while ((num_extracted = extract_first_n_row_locks(
              &lkr, m_mgr, extracted_buf, num_row_locks_per_batch)) > 0) {
    int current_index = 0;
    while (current_index < num_extracted) {
      // every batch of extracted locks is in range-sorted order. search
      // through them and merge adjacent locks with the same txnid into
      // one dominating lock and save it to a set of escalated locks.
      //
      // first, find the index of the next row lock that
      //  - belongs to a different txnid, or
      //  - belongs to several txnids, or
      //  - is a shared lock (we could potentially merge those but
      //    currently we don't)
      int next_txnid_index = current_index + 1;

      while (next_txnid_index < num_extracted &&
             (extracted_buf[current_index].txnid ==
              extracted_buf[next_txnid_index].txnid) &&
             !extracted_buf[next_txnid_index].is_shared &&
             !extracted_buf[next_txnid_index].owners) {
        next_txnid_index++;
      }

      // Create an escalated range for the current txnid that dominates
      // each range between the current index and the next txnid's index.
      // const TXNID current_txnid = extracted_buf[current_index].txnid;
      const DBT *escalated_left_key =
          extracted_buf[current_index].range.get_left_key();
      const DBT *escalated_right_key =
          extracted_buf[next_txnid_index - 1].range.get_right_key();

      // Try to find a range buffer for the current txnid. Create one if it
      // doesn't exist. Then, append the new escalated range to the buffer.
      // (If a lock is shared by multiple txnids, append it to each txnid's
      // buffer.)
      TxnidVector *owners_ptr;
      TxnidVector singleton_owner;
      if (extracted_buf[current_index].owners)
        owners_ptr = extracted_buf[current_index].owners;
      else {
        singleton_owner.insert(extracted_buf[current_index].txnid);
        owners_ptr = &singleton_owner;
      }

      for (auto cur_txnid : *owners_ptr) {
        uint32_t idx;
        struct txnid_range_buffer *existing_range_buffer;
        int r =
            range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
                cur_txnid, &existing_range_buffer, &idx);
        if (r == DB_NOTFOUND) {
          struct txnid_range_buffer *XMALLOC(new_range_buffer);
          new_range_buffer->txnid = cur_txnid;
          new_range_buffer->buffer.create();
          new_range_buffer->buffer.append(
              escalated_left_key, escalated_right_key,
              !extracted_buf[current_index].is_shared);
          range_buffers.insert_at(new_range_buffer, idx);
        } else {
          invariant_zero(r);
          invariant(existing_range_buffer->txnid == cur_txnid);
          existing_range_buffer->buffer.append(
              escalated_left_key, escalated_right_key,
              !extracted_buf[current_index].is_shared);
        }
      }

      current_index = next_txnid_index;
    }

    // destroy the ranges copied during the extraction
    for (int i = 0; i < num_extracted; i++) {
      delete extracted_buf[i].owners;
      extracted_buf[i].range.destroy();
    }
  }
  toku_free(extracted_buf);

  // Rebuild the locktree from each range in each range buffer,
  // then notify higher layers that the txnid's locks have changed.
  //
  // (shared locks: if a lock was initially shared between transactions TRX1,
  //  TRX2, etc, we will now try to acquire it acting on behalf of TRX1, then
  //  TRX2, etc.  This will succeed and an identical shared lock will be
  //  constructed)

  invariant(m_rangetree->is_empty());
  const uint32_t num_range_buffers = range_buffers.size();
  for (uint32_t i = 0; i < num_range_buffers; i++) {
    struct txnid_range_buffer *current_range_buffer;
    int r = range_buffers.fetch(i, &current_range_buffer);
    invariant_zero(r);
    if (r == EINVAL)  // Shouldn't happen, avoid compiler warning
      continue;

    const TXNID current_txnid = current_range_buffer->txnid;
    range_buffer::iterator iter(&current_range_buffer->buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
      keyrange range;
      range.create(rec.get_left_key(), rec.get_right_key());
      row_lock lock = {.range = range,
                       .txnid = current_txnid,
                       .is_shared = !rec.get_exclusive_flag(),
                       .owners = nullptr};
      insert_row_lock_into_tree(&lkr, lock, m_mgr);
      iter.next();
    }

    // Notify higher layers that locks have changed for the current txnid
    if (after_escalate_callback) {
      after_escalate_callback(current_txnid, this, current_range_buffer->buffer,
                              after_escalate_callback_extra);
    }
    current_range_buffer->buffer.destroy();
  }

  while (range_buffers.size() > 0) {
    struct txnid_range_buffer *buffer;
    int r = range_buffers.fetch(0, &buffer);
    invariant_zero(r);
    r = range_buffers.delete_at(0);
    invariant_zero(r);
    toku_free(buffer);
  }
  range_buffers.destroy();

  lkr.release();
}

void *locktree::get_userdata(void) const { return m_userdata; }

void locktree::set_userdata(void *userdata) { m_userdata = userdata; }

struct lt_lock_request_info *locktree::get_lock_request_info(void) {
  return &m_lock_request_info;
}

void locktree::set_comparator(const comparator &cmp) { m_cmp.inherit(cmp); }

locktree_manager *locktree::get_manager(void) const { return m_mgr; }

int locktree::compare(const locktree *lt) const {
  if (m_dict_id.dictid < lt->m_dict_id.dictid) {
    return -1;
  } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
    return 0;
  } else {
    return 1;
  }
}

DICTIONARY_ID locktree::get_dict_id() const { return m_dict_id; }

} /* namespace toku */
#endif  // OS_WIN
#endif  // ROCKSDB_LITE