/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <stdlib.h>
#include <string.h>
#include <portability/toku_pthread.h>

#include "locktree.h"
#include "lock_request.h"

#include <util/status.h>

namespace toku {

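// Initialize the manager: set the default lock memory limit, create the
// locktree map and the manager mutex, remember the lifecycle and escalation
// callbacks, and set up the escalator.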
void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
    m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
    m_current_lock_memory = 0;

    m_locktree_map.create();
    m_lt_create_callback = create_cb;
    m_lt_destroy_callback = destroy_cb;
    m_lt_escalate_callback = escalate_cb;
    m_lt_escalate_callback_extra = escalate_extra;
    ZERO_STRUCT(m_mutex);
    toku_mutex_init(*manager_mutex_key, &m_mutex, nullptr);

    ZERO_STRUCT(m_lt_counters);

    escalator_init();
}

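// Tear down the manager. By this point every locktree must have been
// released and all lock memory returned, which the invariants check.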
void locktree_manager::destroy(void) {
    escalator_destroy();
    invariant(m_current_lock_memory == 0);
    invariant(m_locktree_map.size() == 0);
    m_locktree_map.destroy();
    toku_mutex_destroy(&m_mutex);
}

void locktree_manager::mutex_lock(void) {
    toku_mutex_lock(&m_mutex);
}

void locktree_manager::mutex_unlock(void) {
    toku_mutex_unlock(&m_mutex);
}

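// Accessors for the lock memory limit. The limit cannot be set below the
// lock memory currently in use; such a request fails with EDOM.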
size_t locktree_manager::get_max_lock_memory(void) {
    return m_max_lock_memory;
}

int locktree_manager::set_max_lock_memory(size_t max_lock_memory) {
    int r = 0;
    mutex_lock();
    if (max_lock_memory < m_current_lock_memory) {
        r = EDOM;
    } else {
        m_max_lock_memory = max_lock_memory;
    }
    mutex_unlock();
    return r;
}

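// Comparison function that keeps the locktree map sorted by dictionary id.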
int locktree_manager::find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id) {
    if (lt->get_dict_id().dictid < dict_id.dictid) {
        return -1;
    } else if (lt->get_dict_id().dictid == dict_id.dictid) {
        return 0;
    } else {
        return 1;
    }
}

locktree *locktree_manager::locktree_map_find(const DICTIONARY_ID &dict_id) {
    locktree *lt;
    int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(dict_id, &lt, nullptr);
    return r == 0 ? lt : nullptr;
}

void locktree_manager::locktree_map_put(locktree *lt) {
    int r = m_locktree_map.insert<DICTIONARY_ID, find_by_dict_id>(lt, lt->get_dict_id(), nullptr);
    invariant_zero(r);
}

void locktree_manager::locktree_map_remove(locktree *lt) {
    uint32_t idx;
    locktree *found_lt;
    int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(
            lt->get_dict_id(), &found_lt, &idx);
    invariant_zero(r);
    invariant(found_lt == lt);
    r = m_locktree_map.delete_at(idx);
    invariant_zero(r);
}

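// Get the locktree for the given dictionary id, creating it (and invoking the
// on-create callback) if it does not already exist. The caller owns a
// reference on the returned locktree and must pair every successful get_lt()
// with a release_lt(). A minimal sketch of a hypothetical caller, for
// illustration only:
//
//     locktree *lt = mgr->get_lt(dict_id, cmp, nullptr);
//     if (lt != nullptr) {
//         // ... acquire and release row locks through lt ...
//         mgr->release_lt(lt);
//     }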
locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id,
                                   const comparator &cmp, void *on_create_extra) {

    // hold the mutex around searching and maybe
    // inserting into the locktree map
    mutex_lock();

    locktree *lt = locktree_map_find(dict_id);
    if (lt == nullptr) {
        XCALLOC(lt);
        lt->create(this, dict_id, cmp);

        // new locktree created - call the on_create callback
        // and put it in the locktree map
        if (m_lt_create_callback) {
            int r = m_lt_create_callback(lt, on_create_extra);
            if (r != 0) {
                lt->release_reference();
                lt->destroy();
                toku_free(lt);
                lt = nullptr;
            }
        }
        if (lt) {
            locktree_map_put(lt);
        }
    } else {
        reference_lt(lt);
    }

    mutex_unlock();

    return lt;
}

void locktree_manager::reference_lt(locktree *lt) {
    // increment using a sync fetch and add.
    // the caller guarantees that the lt won't be
    // destroyed while we increment the count here.
    //
    // the caller can do this by already having an lt
    // reference or by holding the manager mutex.
    //
    // if the manager's mutex is held, it is ok for the
    // reference count to transition from 0 to 1 (no race),
    // since we're serialized with other opens and closes.
    lt->add_reference();
}

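// Drop the caller's reference on lt. The thread that sees the count hit zero
// removes the locktree from the map and destroys it, as described below.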
void locktree_manager::release_lt(locktree *lt) {
    bool do_destroy = false;
    DICTIONARY_ID dict_id = lt->get_dict_id();

    // Release a reference on the locktree. If the count transitions to zero,
    // then we *may* need to do the cleanup.
    //
    // Grab the manager's mutex and look for a locktree with this locktree's
    // dictionary id. Since dictionary ids never get reused, any locktree
    // found must be the one we just released a reference on.
    //
    // At least two things could have happened since we got the mutex:
    // - Another thread gets a locktree with the same dict_id and increments
    // the reference count. In this case, we shouldn't destroy it.
    // - Another thread gets a locktree with the same dict_id and then
    // releases it quickly, transitioning the reference count from zero to
    // one and back to zero. In this case, only one of us should destroy it.
    // It doesn't matter which. We originally missed this case, see #5776.
    //
    // After 5776, the high level rule for release is described below.
    //
    // If a thread releases a locktree and notices the reference count transition
    // to zero, then that thread must immediately:
    // - assume the locktree object is invalid
    // - grab the manager's mutex
    // - search the locktree map for a locktree with the same dict_id and remove
    // it, if it exists. the destroy may be deferred.
    // - release the manager's mutex
    //
    // This way, if many threads transition the same locktree's reference count
    // from 1 to zero and wait behind the manager's mutex, only one of them will
    // do the actual destroy and the others will happily do nothing.
    uint32_t refs = lt->release_reference();
    if (refs == 0) {
        mutex_lock();
        // lt may or may not still be in the map (another thread may have
        // already destroyed it), so look it up.
        locktree *find_lt = locktree_map_find(dict_id);
        if (find_lt != nullptr) {
            // A locktree is still in the map with that dict_id, so it must be
            // equal to lt. This is true because dictionary ids are never reused.
            // If the reference count is zero, it's our responsibility to remove
            // it and do the destroy. Otherwise, someone still wants it.
            if (find_lt == lt) {
                if (lt->get_reference_count() == 0) {
                    locktree_map_remove(lt);
                    do_destroy = true;
                }
                m_lt_counters.add(lt->get_lock_request_info()->counters);
            }
        }
        mutex_unlock();
    }

    // if necessary, do the destroy without holding the mutex
    if (do_destroy) {
        if (m_lt_destroy_callback) {
            m_lt_destroy_callback(lt);
        }
        lt->destroy();
        toku_free(lt);
    }
}

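// Escalate all locktrees. Escalation runs through the escalator so that
// concurrent callers share a single escalation pass instead of each running
// their own.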
void locktree_manager::run_escalation(void) {
    struct escalation_fn {
        static void run(void *extra) {
            locktree_manager *mgr = (locktree_manager *) extra;
            mgr->escalate_all_locktrees();
        }
    };
    m_escalator.run(this, escalation_fn::run, this);
}

// test-only version of lock escalation
void locktree_manager::run_escalation_for_test(void) {
    run_escalation();
}

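// Snapshot all locktrees under the manager mutex, taking a reference on each
// so they stay alive, then escalate them outside the mutex and record how
// long the whole pass took.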
void locktree_manager::escalate_all_locktrees(void) {
    uint64_t t0 = toku_current_time_microsec();

    // get all locktrees
    mutex_lock();
    int num_locktrees = m_locktree_map.size();
    locktree **locktrees = new locktree *[num_locktrees];
    for (int i = 0; i < num_locktrees; i++) {
        int r = m_locktree_map.fetch(i, &locktrees[i]);
        invariant_zero(r);
        reference_lt(locktrees[i]);
    }
    mutex_unlock();

    // escalate them
    escalate_locktrees(locktrees, num_locktrees);

    delete [] locktrees;

    uint64_t t1 = toku_current_time_microsec();
    add_escalator_wait_time(t1 - t0);
}

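// Lock memory accounting. Locktrees call these as they allocate and free
// row-lock memory; the counters are updated with atomic fetch-and-add/sub,
// so no mutex is needed.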
void locktree_manager::note_mem_used(uint64_t mem_used) {
    (void) toku_sync_fetch_and_add(&m_current_lock_memory, mem_used);
}

void locktree_manager::note_mem_released(uint64_t mem_released) {
    uint64_t old_mem_used = toku_sync_fetch_and_sub(&m_current_lock_memory, mem_released);
    invariant(old_mem_used >= mem_released);
}

bool locktree_manager::out_of_locks(void) const {
    return m_current_lock_memory >= m_max_lock_memory;
}

bool locktree_manager::over_big_threshold(void) {
    return m_current_lock_memory >= m_max_lock_memory / 2;
}

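// Iterate over every pending lock request in every locktree, invoking the
// callback on each one. Iteration stops early if the callback returns
// nonzero; that value is returned to the caller.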
int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callback callback,
                                                    void *extra) {
    mutex_lock();
    int r = 0;
    size_t num_locktrees = m_locktree_map.size();
    for (size_t i = 0; i < num_locktrees && r == 0; i++) {
        locktree *lt;
        r = m_locktree_map.fetch(i, &lt);
        invariant_zero(r);

        struct lt_lock_request_info *info = lt->get_lock_request_info();
        toku_mutex_lock(&info->mutex);

        size_t num_requests = info->pending_lock_requests.size();
        for (size_t k = 0; k < num_requests && r == 0; k++) {
            lock_request *req;
            r = info->pending_lock_requests.fetch(k, &req);
            invariant_zero(r);
            r = callback(lt->get_dict_id(), req->get_txnid(),
                         req->get_left_key(), req->get_right_key(),
                         req->get_conflicting_txnid(), req->get_start_time(), extra);
        }

        toku_mutex_unlock(&info->mutex);
    }
    mutex_unlock();
    return r;
}

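// Called before taking new locks: if the transaction is "big" and lock memory
// is over half the limit, or if lock memory is over the limit entirely, try
// escalation first and return TOKUDB_OUT_OF_LOCKS only if escalation did not
// bring usage back under the relevant threshold.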
int locktree_manager::check_current_lock_constraints(bool big_txn) {
    int r = 0;
    if (big_txn && over_big_threshold()) {
        run_escalation();
        if (over_big_threshold()) {
            r = TOKUDB_OUT_OF_LOCKS;
        }
    }
    if (r == 0 && out_of_locks()) {
        run_escalation();
        if (out_of_locks()) {
            // return an error if we're still out of locks after escalation.
            r = TOKUDB_OUT_OF_LOCKS;
        }
    }
    return r;
}

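// Initialize the escalation statistics and the escalator itself.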
void locktree_manager::escalator_init(void) {
    ZERO_STRUCT(m_escalation_mutex);
    toku_mutex_init(
        *manager_escalation_mutex_key, &m_escalation_mutex, nullptr);
    m_escalation_count = 0;
    m_escalation_time = 0;
    m_wait_escalation_count = 0;
    m_wait_escalation_time = 0;
    m_long_wait_escalation_count = 0;
    m_long_wait_escalation_time = 0;
    m_escalation_latest_result = 0;
    m_escalator.create();
}

void locktree_manager::escalator_destroy(void) {
    m_escalator.destroy();
    toku_mutex_destroy(&m_escalation_mutex);
}

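// Record time spent waiting on escalation. Waits of at least one second
// (1000000 microseconds) are additionally counted as long waits.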
void locktree_manager::add_escalator_wait_time(uint64_t t) {
    toku_mutex_lock(&m_escalation_mutex);
    m_wait_escalation_count += 1;
    m_wait_escalation_time += t;
    if (t >= 1000000) {
        m_long_wait_escalation_count += 1;
        m_long_wait_escalation_time += t;
    }
    toku_mutex_unlock(&m_escalation_mutex);
}

void locktree_manager::escalate_locktrees(locktree **locktrees, int num_locktrees) {
    // there are too many row locks in the system and we need to tidy up.
    //
    // a simple implementation of escalation does not attempt
    // to reduce the memory footprint of each txn's range buffer.
    // doing so would require some layering hackery (or a callback)
    // and more complicated locking. for now, just escalate each
    // locktree individually, in-place.
    tokutime_t t0 = toku_time_now();
    for (int i = 0; i < num_locktrees; i++) {
        locktrees[i]->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
        release_lt(locktrees[i]);
    }
    tokutime_t t1 = toku_time_now();

    toku_mutex_lock(&m_escalation_mutex);
    m_escalation_count++;
    m_escalation_time += (t1 - t0);
    m_escalation_latest_result = m_current_lock_memory;
    toku_mutex_unlock(&m_escalation_mutex);
}

struct escalate_args {
    locktree_manager *mgr;
    locktree **locktrees;
    int num_locktrees;
};

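// The escalator serializes escalation across threads: the first caller runs
// the escalation function while any concurrent callers block on a condition
// variable until that pass finishes, then everyone proceeds.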
void locktree_manager::locktree_escalator::create(void) {
    ZERO_STRUCT(m_escalator_mutex);
    toku_mutex_init(*manager_escalator_mutex_key, &m_escalator_mutex, nullptr);
    toku_cond_init(*manager_m_escalator_done_key, &m_escalator_done, nullptr);
    m_escalator_running = false;
}

void locktree_manager::locktree_escalator::destroy(void) {
    toku_cond_destroy(&m_escalator_done);
    toku_mutex_destroy(&m_escalator_mutex);
}

void locktree_manager::locktree_escalator::run(locktree_manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra) {
    uint64_t t0 = toku_current_time_microsec();
    toku_mutex_lock(&m_escalator_mutex);
    if (!m_escalator_running) {
        // run escalation on this thread
        m_escalator_running = true;
        toku_mutex_unlock(&m_escalator_mutex);
        escalate_locktrees_fun(extra);
        toku_mutex_lock(&m_escalator_mutex);
        m_escalator_running = false;
        toku_cond_broadcast(&m_escalator_done);
    } else {
        toku_cond_wait(&m_escalator_done, &m_escalator_mutex);
    }
    toku_mutex_unlock(&m_escalator_mutex);
    uint64_t t1 = toku_current_time_microsec();
    mgr->add_escalator_wait_time(t1 - t0);
}

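// Fill in the LTM status rows. Trylock is used so that collecting status
// never blocks behind the manager or per-locktree mutexes; counters guarded
// by a mutex we fail to acquire are simply left out of this snapshot.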
void locktree_manager::get_status(LTM_STATUS statp) {
    ltm_status.init();
    LTM_STATUS_VAL(LTM_SIZE_CURRENT) = m_current_lock_memory;
    LTM_STATUS_VAL(LTM_SIZE_LIMIT) = m_max_lock_memory;
    LTM_STATUS_VAL(LTM_ESCALATION_COUNT) = m_escalation_count;
    LTM_STATUS_VAL(LTM_ESCALATION_TIME) = m_escalation_time;
    LTM_STATUS_VAL(LTM_ESCALATION_LATEST_RESULT) = m_escalation_latest_result;
    LTM_STATUS_VAL(LTM_WAIT_ESCALATION_COUNT) = m_wait_escalation_count;
    LTM_STATUS_VAL(LTM_WAIT_ESCALATION_TIME) = m_wait_escalation_time;
    LTM_STATUS_VAL(LTM_LONG_WAIT_ESCALATION_COUNT) = m_long_wait_escalation_count;
    LTM_STATUS_VAL(LTM_LONG_WAIT_ESCALATION_TIME) = m_long_wait_escalation_time;

    uint64_t lock_requests_pending = 0;
    uint64_t sto_num_eligible = 0;
    uint64_t sto_end_early_count = 0;
    tokutime_t sto_end_early_time = 0;
    size_t num_locktrees = 0;
    struct lt_counters lt_counters = {};

    if (toku_mutex_trylock(&m_mutex) == 0) {
        lt_counters = m_lt_counters;
        num_locktrees = m_locktree_map.size();
        for (size_t i = 0; i < num_locktrees; i++) {
            locktree *lt;
            int r = m_locktree_map.fetch(i, &lt);
            invariant_zero(r);
            if (toku_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) {
                lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size();
                lt_counters.add(lt->get_lock_request_info()->counters);
                toku_mutex_unlock(&lt->m_lock_request_info.mutex);
            }
            sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0;
            sto_end_early_count += lt->m_sto_end_early_count;
            sto_end_early_time += lt->m_sto_end_early_time;
        }
        mutex_unlock();
    }

    LTM_STATUS_VAL(LTM_NUM_LOCKTREES) = num_locktrees;
    LTM_STATUS_VAL(LTM_LOCK_REQUESTS_PENDING) = lock_requests_pending;
    LTM_STATUS_VAL(LTM_STO_NUM_ELIGIBLE) = sto_num_eligible;
    LTM_STATUS_VAL(LTM_STO_END_EARLY_COUNT) = sto_end_early_count;
    LTM_STATUS_VAL(LTM_STO_END_EARLY_TIME) = sto_end_early_time;
    LTM_STATUS_VAL(LTM_WAIT_COUNT) = lt_counters.wait_count;
    LTM_STATUS_VAL(LTM_WAIT_TIME) = lt_counters.wait_time;
    LTM_STATUS_VAL(LTM_LONG_WAIT_COUNT) = lt_counters.long_wait_count;
    LTM_STATUS_VAL(LTM_LONG_WAIT_TIME) = lt_counters.long_wait_time;
    LTM_STATUS_VAL(LTM_TIMEOUT_COUNT) = lt_counters.timeout_count;
    *statp = ltm_status;
}

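// Wake and cancel any pending lock request whose waiter matches extra, in
// every locktree.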
void locktree_manager::kill_waiter(void *extra) {
    mutex_lock();
    int r = 0;
    size_t num_locktrees = m_locktree_map.size();
    for (size_t i = 0; i < num_locktrees; i++) {
        locktree *lt;
        r = m_locktree_map.fetch(i, &lt);
        invariant_zero(r);
        lock_request::kill_waiter(lt, extra);
    }
    mutex_unlock();
}

} /* namespace toku */