1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35
36 ----------------------------------------
37
38 Licensed under the Apache License, Version 2.0 (the "License");
39 you may not use this file except in compliance with the License.
40 You may obtain a copy of the License at
41
42 http://www.apache.org/licenses/LICENSE-2.0
43
44 Unless required by applicable law or agreed to in writing, software
45 distributed under the License is distributed on an "AS IS" BASIS,
46 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
47 See the License for the specific language governing permissions and
48 limitations under the License.
49 ======= */
50
51 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
52
53 #include <stdlib.h>
54 #include <string.h>
55 #include <portability/toku_pthread.h>
56
57 #include "locktree.h"
58 #include "lock_request.h"
59
60 #include <util/status.h>
61
62 namespace toku {
63
create(lt_create_cb create_cb,lt_destroy_cb destroy_cb,lt_escalate_cb escalate_cb,void * escalate_extra)64 void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
65 m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
66 m_current_lock_memory = 0;
67
68 m_locktree_map.create();
69 m_lt_create_callback = create_cb;
70 m_lt_destroy_callback = destroy_cb;
71 m_lt_escalate_callback = escalate_cb;
72 m_lt_escalate_callback_extra = escalate_extra;
73 ZERO_STRUCT(m_mutex);
74 toku_mutex_init(*manager_mutex_key, &m_mutex, nullptr);
75
76 ZERO_STRUCT(m_lt_counters);
77
78 escalator_init();
79 }
80
destroy(void)81 void locktree_manager::destroy(void) {
82 escalator_destroy();
83 invariant(m_current_lock_memory == 0);
84 invariant(m_locktree_map.size() == 0);
85 m_locktree_map.destroy();
86 toku_mutex_destroy(&m_mutex);
87 }
88
mutex_lock(void)89 void locktree_manager::mutex_lock(void) {
90 toku_mutex_lock(&m_mutex);
91 }
92
mutex_unlock(void)93 void locktree_manager::mutex_unlock(void) {
94 toku_mutex_unlock(&m_mutex);
95 }
96
get_max_lock_memory(void)97 size_t locktree_manager::get_max_lock_memory(void) {
98 return m_max_lock_memory;
99 }
100
set_max_lock_memory(size_t max_lock_memory)101 int locktree_manager::set_max_lock_memory(size_t max_lock_memory) {
102 int r = 0;
103 mutex_lock();
104 if (max_lock_memory < m_current_lock_memory) {
105 r = EDOM;
106 } else {
107 m_max_lock_memory = max_lock_memory;
108 }
109 mutex_unlock();
110 return r;
111 }
112
find_by_dict_id(locktree * const & lt,const DICTIONARY_ID & dict_id)113 int locktree_manager::find_by_dict_id(locktree *const <, const DICTIONARY_ID &dict_id) {
114 if (lt->get_dict_id().dictid < dict_id.dictid) {
115 return -1;
116 } else if (lt->get_dict_id().dictid == dict_id.dictid) {
117 return 0;
118 } else {
119 return 1;
120 }
121 }
122
locktree_map_find(const DICTIONARY_ID & dict_id)123 locktree *locktree_manager::locktree_map_find(const DICTIONARY_ID &dict_id) {
124 locktree *lt;
125 int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(dict_id, <, nullptr);
126 return r == 0 ? lt : nullptr;
127 }
128
locktree_map_put(locktree * lt)129 void locktree_manager::locktree_map_put(locktree *lt) {
130 int r = m_locktree_map.insert<DICTIONARY_ID, find_by_dict_id>(lt, lt->get_dict_id(), nullptr);
131 invariant_zero(r);
132 }
133
locktree_map_remove(locktree * lt)134 void locktree_manager::locktree_map_remove(locktree *lt) {
135 uint32_t idx;
136 locktree *found_lt;
137 int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(
138 lt->get_dict_id(), &found_lt, &idx);
139 invariant_zero(r);
140 invariant(found_lt == lt);
141 r = m_locktree_map.delete_at(idx);
142 invariant_zero(r);
143 }
144
get_lt(DICTIONARY_ID dict_id,const comparator & cmp,void * on_create_extra)145 locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id,
146 const comparator &cmp, void *on_create_extra) {
147
148 // hold the mutex around searching and maybe
149 // inserting into the locktree map
150 mutex_lock();
151
152 locktree *lt = locktree_map_find(dict_id);
153 if (lt == nullptr) {
154 XCALLOC(lt);
155 lt->create(this, dict_id, cmp);
156
157 // new locktree created - call the on_create callback
158 // and put it in the locktree map
159 if (m_lt_create_callback) {
160 int r = m_lt_create_callback(lt, on_create_extra);
161 if (r != 0) {
162 lt->release_reference();
163 lt->destroy();
164 toku_free(lt);
165 lt = nullptr;
166 }
167 }
168 if (lt) {
169 locktree_map_put(lt);
170 }
171 } else {
172 reference_lt(lt);
173 }
174
175 mutex_unlock();
176
177 return lt;
178 }
179
reference_lt(locktree * lt)180 void locktree_manager::reference_lt(locktree *lt) {
181 // increment using a sync fetch and add.
182 // the caller guarantees that the lt won't be
183 // destroyed while we increment the count here.
184 //
185 // the caller can do this by already having an lt
186 // reference or by holding the manager mutex.
187 //
188 // if the manager's mutex is held, it is ok for the
189 // reference count to transition from 0 to 1 (no race),
190 // since we're serialized with other opens and closes.
191 lt->add_reference();
192 }
193
release_lt(locktree * lt)194 void locktree_manager::release_lt(locktree *lt) {
195 bool do_destroy = false;
196 DICTIONARY_ID dict_id = lt->get_dict_id();
197
198 // Release a reference on the locktree. If the count transitions to zero,
199 // then we *may* need to do the cleanup.
200 //
201 // Grab the manager's mutex and look for a locktree with this locktree's
202 // dictionary id. Since dictionary id's never get reused, any locktree
203 // found must be the one we just released a reference on.
204 //
205 // At least two things could have happened since we got the mutex:
206 // - Another thread gets a locktree with the same dict_id, increments
207 // the reference count. In this case, we shouldn't destroy it.
208 // - Another thread gets a locktree with the same dict_id and then
209 // releases it quickly, transitioning the reference count from zero to
210 // one and back to zero. In this case, only one of us should destroy it.
211 // It doesn't matter which. We originally missed this case, see #5776.
212 //
213 // After 5776, the high level rule for release is described below.
214 //
215 // If a thread releases a locktree and notices the reference count transition
216 // to zero, then that thread must immediately:
217 // - assume the locktree object is invalid
218 // - grab the manager's mutex
219 // - search the locktree map for a locktree with the same dict_id and remove
220 // it, if it exists. the destroy may be deferred.
221 // - release the manager's mutex
222 //
223 // This way, if many threads transition the same locktree's reference count
224 // from 1 to zero and wait behind the manager's mutex, only one of them will
225 // do the actual destroy and the others will happily do nothing.
226 uint32_t refs = lt->release_reference();
227 if (refs == 0) {
228 mutex_lock();
229 // lt may not have already been destroyed, so look it up.
230 locktree *find_lt = locktree_map_find(dict_id);
231 if (find_lt != nullptr) {
232 // A locktree is still in the map with that dict_id, so it must be
233 // equal to lt. This is true because dictionary ids are never reused.
234 // If the reference count is zero, it's our responsibility to remove
235 // it and do the destroy. Otherwise, someone still wants it.
236 // If the locktree is still valid then check if it should be deleted.
237 if (find_lt == lt) {
238 if (lt->get_reference_count() == 0) {
239 locktree_map_remove(lt);
240 do_destroy = true;
241 }
242 m_lt_counters.add(lt->get_lock_request_info()->counters);
243 }
244 }
245 mutex_unlock();
246 }
247
248 // if necessary, do the destroy without holding the mutex
249 if (do_destroy) {
250 if (m_lt_destroy_callback) {
251 m_lt_destroy_callback(lt);
252 }
253 lt->destroy();
254 toku_free(lt);
255 }
256 }
257
run_escalation(void)258 void locktree_manager::run_escalation(void) {
259 struct escalation_fn {
260 static void run(void *extra) {
261 locktree_manager *mgr = (locktree_manager *) extra;
262 mgr->escalate_all_locktrees();
263 };
264 };
265 m_escalator.run(this, escalation_fn::run, this);
266 }
267
268 // test-only version of lock escalation
run_escalation_for_test(void)269 void locktree_manager::run_escalation_for_test(void) {
270 run_escalation();
271 }
272
escalate_all_locktrees(void)273 void locktree_manager::escalate_all_locktrees(void) {
274 uint64_t t0 = toku_current_time_microsec();
275
276 // get all locktrees
277 mutex_lock();
278 int num_locktrees = m_locktree_map.size();
279 locktree **locktrees = new locktree *[num_locktrees];
280 for (int i = 0; i < num_locktrees; i++) {
281 int r = m_locktree_map.fetch(i, &locktrees[i]);
282 invariant_zero(r);
283 reference_lt(locktrees[i]);
284 }
285 mutex_unlock();
286
287 // escalate them
288 escalate_locktrees(locktrees, num_locktrees);
289
290 delete [] locktrees;
291
292 uint64_t t1 = toku_current_time_microsec();
293 add_escalator_wait_time(t1 - t0);
294 }
295
note_mem_used(uint64_t mem_used)296 void locktree_manager::note_mem_used(uint64_t mem_used) {
297 (void) toku_sync_fetch_and_add(&m_current_lock_memory, mem_used);
298 }
299
note_mem_released(uint64_t mem_released)300 void locktree_manager::note_mem_released(uint64_t mem_released) {
301 uint64_t old_mem_used = toku_sync_fetch_and_sub(&m_current_lock_memory, mem_released);
302 invariant(old_mem_used >= mem_released);
303 }
304
out_of_locks(void) const305 bool locktree_manager::out_of_locks(void) const {
306 return m_current_lock_memory >= m_max_lock_memory;
307 }
308
over_big_threshold(void)309 bool locktree_manager::over_big_threshold(void) {
310 return m_current_lock_memory >= m_max_lock_memory / 2;
311 }
312
iterate_pending_lock_requests(lock_request_iterate_callback callback,void * extra)313 int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callback callback,
314 void *extra) {
315 mutex_lock();
316 int r = 0;
317 size_t num_locktrees = m_locktree_map.size();
318 for (size_t i = 0; i < num_locktrees && r == 0; i++) {
319 locktree *lt;
320 r = m_locktree_map.fetch(i, <);
321 invariant_zero(r);
322
323 struct lt_lock_request_info *info = lt->get_lock_request_info();
324 toku_mutex_lock(&info->mutex);
325
326 size_t num_requests = info->pending_lock_requests.size();
327 for (size_t k = 0; k < num_requests && r == 0; k++) {
328 lock_request *req;
329 r = info->pending_lock_requests.fetch(k, &req);
330 invariant_zero(r);
331 r = callback(lt->get_dict_id(), req->get_txnid(),
332 req->get_left_key(), req->get_right_key(),
333 req->get_conflicting_txnid(), req->get_start_time(), extra);
334 }
335
336 toku_mutex_unlock(&info->mutex);
337 }
338 mutex_unlock();
339 return r;
340 }
341
check_current_lock_constraints(bool big_txn)342 int locktree_manager::check_current_lock_constraints(bool big_txn) {
343 int r = 0;
344 if (big_txn && over_big_threshold()) {
345 run_escalation();
346 if (over_big_threshold()) {
347 r = TOKUDB_OUT_OF_LOCKS;
348 }
349 }
350 if (r == 0 && out_of_locks()) {
351 run_escalation();
352 if (out_of_locks()) {
353 // return an error if we're still out of locks after escalation.
354 r = TOKUDB_OUT_OF_LOCKS;
355 }
356 }
357 return r;
358 }
359
escalator_init(void)360 void locktree_manager::escalator_init(void) {
361 ZERO_STRUCT(m_escalation_mutex);
362 toku_mutex_init(
363 *manager_escalation_mutex_key, &m_escalation_mutex, nullptr);
364 m_escalation_count = 0;
365 m_escalation_time = 0;
366 m_wait_escalation_count = 0;
367 m_wait_escalation_time = 0;
368 m_long_wait_escalation_count = 0;
369 m_long_wait_escalation_time = 0;
370 m_escalation_latest_result = 0;
371 m_escalator.create();
372 }
373
escalator_destroy(void)374 void locktree_manager::escalator_destroy(void) {
375 m_escalator.destroy();
376 toku_mutex_destroy(&m_escalation_mutex);
377 }
378
add_escalator_wait_time(uint64_t t)379 void locktree_manager::add_escalator_wait_time(uint64_t t) {
380 toku_mutex_lock(&m_escalation_mutex);
381 m_wait_escalation_count += 1;
382 m_wait_escalation_time += t;
383 if (t >= 1000000) {
384 m_long_wait_escalation_count += 1;
385 m_long_wait_escalation_time += t;
386 }
387 toku_mutex_unlock(&m_escalation_mutex);
388 }
389
escalate_locktrees(locktree ** locktrees,int num_locktrees)390 void locktree_manager::escalate_locktrees(locktree **locktrees, int num_locktrees) {
391 // there are too many row locks in the system and we need to tidy up.
392 //
393 // a simple implementation of escalation does not attempt
394 // to reduce the memory foot print of each txn's range buffer.
395 // doing so would require some layering hackery (or a callback)
396 // and more complicated locking. for now, just escalate each
397 // locktree individually, in-place.
398 tokutime_t t0 = toku_time_now();
399 for (int i = 0; i < num_locktrees; i++) {
400 locktrees[i]->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
401 release_lt(locktrees[i]);
402 }
403 tokutime_t t1 = toku_time_now();
404
405 toku_mutex_lock(&m_escalation_mutex);
406 m_escalation_count++;
407 m_escalation_time += (t1 - t0);
408 m_escalation_latest_result = m_current_lock_memory;
409 toku_mutex_unlock(&m_escalation_mutex);
410 }
411
412 struct escalate_args {
413 locktree_manager *mgr;
414 locktree **locktrees;
415 int num_locktrees;
416 };
417
create(void)418 void locktree_manager::locktree_escalator::create(void) {
419 ZERO_STRUCT(m_escalator_mutex);
420 toku_mutex_init(*manager_escalator_mutex_key, &m_escalator_mutex, nullptr);
421 toku_cond_init(*manager_m_escalator_done_key, &m_escalator_done, nullptr);
422 m_escalator_running = false;
423 }
424
destroy(void)425 void locktree_manager::locktree_escalator::destroy(void) {
426 toku_cond_destroy(&m_escalator_done);
427 toku_mutex_destroy(&m_escalator_mutex);
428 }
429
run(locktree_manager * mgr,void (* escalate_locktrees_fun)(void * extra),void * extra)430 void locktree_manager::locktree_escalator::run(locktree_manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra) {
431 uint64_t t0 = toku_current_time_microsec();
432 toku_mutex_lock(&m_escalator_mutex);
433 if (!m_escalator_running) {
434 // run escalation on this thread
435 m_escalator_running = true;
436 toku_mutex_unlock(&m_escalator_mutex);
437 escalate_locktrees_fun(extra);
438 toku_mutex_lock(&m_escalator_mutex);
439 m_escalator_running = false;
440 toku_cond_broadcast(&m_escalator_done);
441 } else {
442 toku_cond_wait(&m_escalator_done, &m_escalator_mutex);
443 }
444 toku_mutex_unlock(&m_escalator_mutex);
445 uint64_t t1 = toku_current_time_microsec();
446 mgr->add_escalator_wait_time(t1 - t0);
447 }
448
get_status(LTM_STATUS statp)449 void locktree_manager::get_status(LTM_STATUS statp) {
450 ltm_status.init();
451 LTM_STATUS_VAL(LTM_SIZE_CURRENT) = m_current_lock_memory;
452 LTM_STATUS_VAL(LTM_SIZE_LIMIT) = m_max_lock_memory;
453 LTM_STATUS_VAL(LTM_ESCALATION_COUNT) = m_escalation_count;
454 LTM_STATUS_VAL(LTM_ESCALATION_TIME) = m_escalation_time;
455 LTM_STATUS_VAL(LTM_ESCALATION_LATEST_RESULT) = m_escalation_latest_result;
456 LTM_STATUS_VAL(LTM_WAIT_ESCALATION_COUNT) = m_wait_escalation_count;
457 LTM_STATUS_VAL(LTM_WAIT_ESCALATION_TIME) = m_wait_escalation_time;
458 LTM_STATUS_VAL(LTM_LONG_WAIT_ESCALATION_COUNT) = m_long_wait_escalation_count;
459 LTM_STATUS_VAL(LTM_LONG_WAIT_ESCALATION_TIME) = m_long_wait_escalation_time;
460
461 uint64_t lock_requests_pending = 0;
462 uint64_t sto_num_eligible = 0;
463 uint64_t sto_end_early_count = 0;
464 tokutime_t sto_end_early_time = 0;
465 size_t num_locktrees = 0;
466 struct lt_counters lt_counters = {};
467
468 if (toku_mutex_trylock(&m_mutex) == 0) {
469 lt_counters = m_lt_counters;
470 num_locktrees = m_locktree_map.size();
471 for (size_t i = 0; i < num_locktrees; i++) {
472 locktree *lt;
473 int r = m_locktree_map.fetch(i, <);
474 invariant_zero(r);
475 if (toku_mutex_trylock(<->m_lock_request_info.mutex) == 0) {
476 lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size();
477 lt_counters.add(lt->get_lock_request_info()->counters);
478 toku_mutex_unlock(<->m_lock_request_info.mutex);
479 }
480 sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0;
481 sto_end_early_count += lt->m_sto_end_early_count;
482 sto_end_early_time += lt->m_sto_end_early_time;
483 }
484 mutex_unlock();
485 }
486
487 LTM_STATUS_VAL(LTM_NUM_LOCKTREES) = num_locktrees;
488 LTM_STATUS_VAL(LTM_LOCK_REQUESTS_PENDING) = lock_requests_pending;
489 LTM_STATUS_VAL(LTM_STO_NUM_ELIGIBLE) = sto_num_eligible;
490 LTM_STATUS_VAL(LTM_STO_END_EARLY_COUNT) = sto_end_early_count;
491 LTM_STATUS_VAL(LTM_STO_END_EARLY_TIME) = sto_end_early_time;
492 LTM_STATUS_VAL(LTM_WAIT_COUNT) = lt_counters.wait_count;
493 LTM_STATUS_VAL(LTM_WAIT_TIME) = lt_counters.wait_time;
494 LTM_STATUS_VAL(LTM_LONG_WAIT_COUNT) = lt_counters.long_wait_count;
495 LTM_STATUS_VAL(LTM_LONG_WAIT_TIME) = lt_counters.long_wait_time;
496 LTM_STATUS_VAL(LTM_TIMEOUT_COUNT) = lt_counters.timeout_count;
497 *statp = ltm_status;
498 }
499
kill_waiter(void * extra)500 void locktree_manager::kill_waiter(void *extra) {
501 mutex_lock();
502 int r = 0;
503 size_t num_locktrees = m_locktree_map.size();
504 for (size_t i = 0; i < num_locktrees; i++) {
505 locktree *lt;
506 r = m_locktree_map.fetch(i, <);
507 invariant_zero(r);
508 lock_request::kill_waiter(lt, extra);
509 }
510 mutex_unlock();
511 }
512
513 } /* namespace toku */
514