1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35
36 ----------------------------------------
37
38 Licensed under the Apache License, Version 2.0 (the "License");
39 you may not use this file except in compliance with the License.
40 You may obtain a copy of the License at
41
42 http://www.apache.org/licenses/LICENSE-2.0
43
44 Unless required by applicable law or agreed to in writing, software
45 distributed under the License is distributed on an "AS IS" BASIS,
46 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
47 See the License for the specific language governing permissions and
48 limitations under the License.
49 ======= */
50
51 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
52
53 #include "portability/toku_race_tools.h"
54
55 #include "ft/txn/txn.h"
56 #include "locktree/locktree.h"
57 #include "locktree/lock_request.h"
58 #include "util/dbt.h"
59
60 namespace toku {
61
// initialize a lock request's internals.
// resets every field to its "empty" value; the request must be
// populated with set() before start() may be called.
void lock_request::create(void) {
    m_txnid = TXNID_NONE;
    m_conflicting_txnid = TXNID_NONE;
    m_start_time = 0;
    m_left_key = nullptr;
    m_right_key = nullptr;
    toku_init_dbt(&m_left_key_copy);
    toku_init_dbt(&m_right_key_copy);

    m_type = type::UNKNOWN;
    m_lt = nullptr;

    m_complete_r = 0;
    m_state = state::UNINITIALIZED;
    m_info = nullptr;

    // condition variable used to wake this request's waiting thread when
    // the request is granted, killed, or times out.
    toku_cond_init(*lock_request_m_wait_cond_key, &m_wait_cond, nullptr);

    m_start_test_callback = nullptr;
    m_start_before_pending_test_callback = nullptr;
    m_retry_test_callback = nullptr;
}
85
86 // destroy a lock request.
destroy(void)87 void lock_request::destroy(void) {
88 invariant(m_state != state::PENDING);
89 invariant(m_state != state::DESTROYED);
90 m_state = state::DESTROYED;
91 toku_destroy_dbt(&m_left_key_copy);
92 toku_destroy_dbt(&m_right_key_copy);
93 toku_cond_destroy(&m_wait_cond);
94 }
95
clearmem(char c)96 void lock_request::clearmem(char c) {
97 memset(this, c, sizeof(* this));
98 }
99
// set the lock request parameters. this API allows a lock request to be reused.
// lt may be null (then m_info is null as well); the caller's keys are
// borrowed, not copied, until copy_keys() is called.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
    invariant(m_state != state::PENDING);
    m_lt = lt;
    m_txnid = txnid;
    m_left_key = left_key;
    m_right_key = right_key;
    // drop any key copies left over from a previous use of this request
    toku_destroy_dbt(&m_left_key_copy);
    toku_destroy_dbt(&m_right_key_copy);
    m_type = lock_type;
    m_state = state::INITIALIZED;
    m_info = lt ? lt->get_lock_request_info() : nullptr;
    m_big_txn = big_txn;
    m_extra = extra;
}
115
116 // get rid of any stored left and right key copies and
117 // replace them with copies of the given left and right key
copy_keys()118 void lock_request::copy_keys() {
119 if (!toku_dbt_is_infinite(m_left_key)) {
120 toku_clone_dbt(&m_left_key_copy, *m_left_key);
121 m_left_key = &m_left_key_copy;
122 }
123 if (!toku_dbt_is_infinite(m_right_key)) {
124 toku_clone_dbt(&m_right_key_copy, *m_right_key);
125 m_right_key = &m_right_key_copy;
126 }
127 }
128
129 // what are the conflicts for this pending lock request?
get_conflicts(txnid_set * conflicts)130 void lock_request::get_conflicts(txnid_set *conflicts) {
131 invariant(m_state == state::PENDING);
132 const bool is_write_request = m_type == type::WRITE;
133 m_lt->get_conflicts(is_write_request, m_txnid, m_left_key, m_right_key, conflicts);
134 }
135
// build a wait-for-graph for this lock request and the given conflict set
// for each transaction B that blocks A's lock request
//     if B is blocked then
//         add (A,B) to the WFG and if B is new, fill in the WFG from B
// must hold the lock request info mutex (uses find_lock_request).
void lock_request::build_wait_graph(wfg *wait_graph, const txnid_set &conflicts) {
    size_t num_conflicts = conflicts.size();
    for (size_t i = 0; i < num_conflicts; i++) {
        TXNID conflicting_txnid = conflicts.get(i);
        // only conflicting transactions that themselves have a pending
        // lock request (i.e. are blocked) can be part of a deadlock cycle
        lock_request *conflicting_request = find_lock_request(conflicting_txnid);
        invariant(conflicting_txnid != m_txnid);
        invariant(conflicting_request != this);
        if (conflicting_request) {
            bool already_exists = wait_graph->node_exists(conflicting_txnid);
            wait_graph->add_edge(m_txnid, conflicting_txnid);
            if (!already_exists) {
                // recursively build the wait for graph rooted at the conflicting
                // request, given its set of lock conflicts.
                txnid_set other_conflicts;
                other_conflicts.create();
                conflicting_request->get_conflicts(&other_conflicts);
                conflicting_request->build_wait_graph(wait_graph, other_conflicts);
                other_conflicts.destroy();
            }
        }
    }
}
162
163 // returns: true if the current set of lock requests contains
164 // a deadlock, false otherwise.
deadlock_exists(const txnid_set & conflicts)165 bool lock_request::deadlock_exists(const txnid_set &conflicts) {
166 wfg wait_graph;
167 wait_graph.create();
168
169 build_wait_graph(&wait_graph, conflicts);
170 bool deadlock = wait_graph.cycle_exists_from_txnid(m_txnid);
171
172 wait_graph.destroy();
173 return deadlock;
174 }
175
// try to acquire a lock described by this lock request.
// returns: 0 if granted, DB_LOCK_NOTGRANTED if the request is now
// pending (caller should wait()), or DB_LOCK_DEADLOCK if inserting
// this request would create a deadlock.
int lock_request::start(void) {
    int r;

    txnid_set conflicts;
    conflicts.create();
    if (m_type == type::WRITE) {
        r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    } else {
        invariant(m_type == type::READ);
        r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    }

    // if the lock is not granted, save it to the set of lock requests
    // and check for a deadlock. if there is one, complete it as failed
    if (r == DB_LOCK_NOTGRANTED) {
        // own copies of the keys, since the caller's keys may go away
        // while this request waits
        copy_keys();
        m_state = state::PENDING;
        m_start_time = toku_current_time_microsec() / 1000;
        // NOTE(review): assumes conflicts is non-empty whenever the lock
        // is not granted -- confirm against the locktree acquire paths.
        m_conflicting_txnid = conflicts.get(0);
        if (m_start_before_pending_test_callback)
            m_start_before_pending_test_callback();
        toku_mutex_lock(&m_info->mutex);
        insert_into_lock_requests();
        if (deadlock_exists(conflicts)) {
            remove_from_lock_requests();
            r = DB_LOCK_DEADLOCK;
        }
        toku_mutex_unlock(&m_info->mutex);
        if (m_start_test_callback)
            m_start_test_callback();  // test callback
    }

    if (r != DB_LOCK_NOTGRANTED) {
        complete(r);
    }

    conflicts.destroy();
    return r;
}
216
217 // sleep on the lock request until it becomes resolved or the wait time has elapsed.
wait(uint64_t wait_time_ms)218 int lock_request::wait(uint64_t wait_time_ms) {
219 return wait(wait_time_ms, 0, nullptr);
220 }
221
// sleep on the lock request until it becomes resolved or the wait time
// has elapsed. if killed_time_ms is nonzero, wake up about that often to
// poll killed_callback; a true return fails the request immediately.
// lock_wait_callback, if given, is used to report blocked/blocking
// transaction pairs discovered by the initial retry.
// returns the request's completion code (m_complete_r).
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
                       void (*lock_wait_callback)(void *, TXNID, TXNID)) {
    // all internal time bookkeeping is in microseconds
    uint64_t t_now = toku_current_time_microsec();
    uint64_t t_start = t_now;
    uint64_t t_end = t_start + wait_time_ms * 1000;

    toku_mutex_lock(&m_info->mutex);

    // check again, this time locking out other retry calls
    if (m_state == state::PENDING) {
        GrowableArray<TXNID> conflicts_collector;
        conflicts_collector.init();
        retry(&conflicts_collector);
        if (m_state == state::PENDING) {
            report_waits(&conflicts_collector, lock_wait_callback);
        }
        conflicts_collector.deinit();
    }

    while (m_state == state::PENDING) {
        // check if this thread is killed
        if (killed_callback && killed_callback()) {
            remove_from_lock_requests();
            // complete() flips m_state, so the loop condition ends the wait
            complete(DB_LOCK_NOTGRANTED);
            continue;
        }

        // compute next wait time: either the deadline, or the next
        // killed_callback polling point, whichever comes first
        uint64_t t_wait;
        if (killed_time_ms == 0) {
            t_wait = t_end;
        } else {
            t_wait = t_now + killed_time_ms * 1000;
            if (t_wait > t_end)
                t_wait = t_end;
        }
        // absolute deadline for the timed wait
        struct timespec ts = {};
        ts.tv_sec = t_wait / 1000000;
        ts.tv_nsec = (t_wait % 1000000) * 1000;
        int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts);
        invariant(r == 0 || r == ETIMEDOUT);

        t_now = toku_current_time_microsec();
        if (m_state == state::PENDING && (t_now >= t_end)) {
            m_info->counters.timeout_count += 1;

            // if we're still pending and we timed out, then remove our
            // request from the set of lock requests and fail.
            remove_from_lock_requests();

            // complete sets m_state to COMPLETE, breaking us out of the loop
            complete(DB_LOCK_NOTGRANTED);
        }
    }

    // account the total wait time; waits of one second or more are
    // additionally counted as "long" waits
    uint64_t t_real_end = toku_current_time_microsec();
    uint64_t duration = t_real_end - t_start;
    m_info->counters.wait_count += 1;
    m_info->counters.wait_time += duration;
    if (duration >= 1000000) {
        m_info->counters.long_wait_count += 1;
        m_info->counters.long_wait_time += duration;
    }
    toku_mutex_unlock(&m_info->mutex);

    invariant(m_state == state::COMPLETE);
    return m_complete_r;
}
290
291 // complete this lock request with the given return value
complete(int complete_r)292 void lock_request::complete(int complete_r) {
293 m_complete_r = complete_r;
294 m_state = state::COMPLETE;
295 }
296
// accessor: the left endpoint of the requested key range
const DBT *lock_request::get_left_key(void) const {
    return m_left_key;
}
300
// accessor: the right endpoint of the requested key range
const DBT *lock_request::get_right_key(void) const {
    return m_right_key;
}
304
// accessor: the transaction that issued this lock request
TXNID lock_request::get_txnid(void) const {
    return m_txnid;
}
308
// accessor: when the request became pending (milliseconds, see start())
uint64_t lock_request::get_start_time(void) const {
    return m_start_time;
}
312
// accessor: one transaction (the first found) blocking this request
TXNID lock_request::get_conflicting_txnid(void) const {
    return m_conflicting_txnid;
}
316
// retry a pending lock request. must hold the lock request info mutex.
// returns 0 if the lock was acquired, in which case the request is
// removed from the pending set and its waiter is signaled. otherwise
// the (blocked, blocking) conflict pairs are appended to the collector
// for later lock-wait reporting.
int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
    invariant(m_state == state::PENDING);
    int r;
    txnid_set conflicts;
    conflicts.create();

    if (m_type == type::WRITE) {
        r = m_lt->acquire_write_lock(
            m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    } else {
        r = m_lt->acquire_read_lock(
            m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    }

    // if the acquisition succeeded then remove ourselves from the
    // set of lock requests, complete, and signal the waiting thread.
    if (r == 0) {
        remove_from_lock_requests();
        complete(r);
        if (m_retry_test_callback)
            m_retry_test_callback();  // test callback
        toku_cond_broadcast(&m_wait_cond);
    } else {
        // still blocked: remember a conflicting txn and record the
        // conflicts for reporting
        m_conflicting_txnid = conflicts.get(0);
        add_conflicts_to_waits(&conflicts, conflicts_collector);
    }
    conflicts.destroy();

    return r;
}
347
// retry all pending lock requests on the given locktree, coalescing
// concurrent callers: each caller takes a "retry generation" ticket and
// only one thread per generation actually runs the retries, while the
// others wait for it (or skip entirely if a later generation already
// covered them).
void lock_request::retry_all_lock_requests(
    locktree *lt,
    void (*lock_wait_callback)(void *, TXNID, TXNID),
    void (*after_retry_all_test_callback)(void)) {
    lt_lock_request_info *info = lt->get_lock_request_info();

    // if there are no pending lock requests than there is nothing to do
    // the unlocked data race on pending_is_empty is OK since lock requests
    // are retried after added to the pending set.
    if (info->pending_is_empty)
        return;

    // get my retry generation (post increment of retry_want)
    unsigned long long my_retry_want = (info->retry_want += 1);

    toku_mutex_lock(&info->retry_mutex);

    GrowableArray<TXNID> conflicts_collector;
    conflicts_collector.init();

    // here is the group retry algorithm.
    // get the latest retry_want count and use it as the generation number of
    // this retry operation. if this retry generation is > the last retry
    // generation, then do the lock retries. otherwise, no lock retries
    // are needed.
    if ((my_retry_want - 1) == info->retry_done) {
        for (;;) {
            if (!info->running_retry) {
                info->running_retry = true;
                info->retry_done = info->retry_want;
                // drop retry_mutex while doing the actual retries; the
                // running_retry flag keeps other generations out
                toku_mutex_unlock(&info->retry_mutex);
                retry_all_lock_requests_info(info, &conflicts_collector);
                if (after_retry_all_test_callback)
                    after_retry_all_test_callback();
                toku_mutex_lock(&info->retry_mutex);
                info->running_retry = false;
                toku_cond_broadcast(&info->retry_cv);
                break;
            } else {
                // another thread is retrying; wait for it to finish
                toku_cond_wait(&info->retry_cv, &info->retry_mutex);
            }
        }
    }
    toku_mutex_unlock(&info->retry_mutex);

    // report any lock waits discovered during the retries
    report_waits(&conflicts_collector, lock_wait_callback);
    conflicts_collector.deinit();
}
396
// retry every pending lock request in 'info', accumulating lock-wait
// conflict pairs into 'collector'. takes and releases info->mutex.
void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, GrowableArray<TXNID> *collector) {
    toku_mutex_lock(&info->mutex);
    // retry all of the pending lock requests.
    for (size_t i = 0; i < info->pending_lock_requests.size();) {
        lock_request *request;
        int r = info->pending_lock_requests.fetch(i, &request);
        invariant_zero(r);

        // retry the lock request. if it didn't succeed,
        // move on to the next lock request. otherwise
        // the request is gone from the list so we may
        // read the i'th entry for the next one.
        r = request->retry(collector);
        if (r != 0) {
            i++;
        }
    }

    // future threads should only retry lock requests if some still exist
    // NOTE(review): this updates should_retry_lock_requests while the
    // empty-check elsewhere reads pending_is_empty (maintained by
    // insert/remove) -- confirm both flags are intentionally separate.
    info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
    toku_mutex_unlock(&info->mutex);
}
419
add_conflicts_to_waits(txnid_set * conflicts,GrowableArray<TXNID> * wait_conflicts)420 void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
421 GrowableArray<TXNID> *wait_conflicts) {
422 size_t num_conflicts = conflicts->size();
423 for (size_t i = 0; i < num_conflicts; i++) {
424 wait_conflicts->push(m_txnid);
425 wait_conflicts->push(conflicts->get(i));
426 }
427 }
428
report_waits(GrowableArray<TXNID> * wait_conflicts,void (* lock_wait_callback)(void *,TXNID,TXNID))429 void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
430 void (*lock_wait_callback)(void *, TXNID, TXNID)) {
431 if (!lock_wait_callback)
432 return;
433 size_t num_conflicts = wait_conflicts->get_size();
434 for (size_t i = 0; i < num_conflicts; i += 2) {
435 TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
436 TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
437 (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
438 }
439 }
440
// accessor: the caller-supplied opaque pointer given to set()
void *lock_request::get_extra(void) const {
    return m_extra;
}
444
// fail this pending request and wake its waiting thread.
// must hold the lock request info mutex.
void lock_request::kill_waiter(void) {
    remove_from_lock_requests();
    complete(DB_LOCK_NOTGRANTED);
    toku_cond_broadcast(&m_wait_cond);
}
450
kill_waiter(locktree * lt,void * extra)451 void lock_request::kill_waiter(locktree *lt, void *extra) {
452 lt_lock_request_info *info = lt->get_lock_request_info();
453 toku_mutex_lock(&info->mutex);
454 for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
455 lock_request *request;
456 int r = info->pending_lock_requests.fetch(i, &request);
457 if (r == 0 && request->get_extra() == extra) {
458 request->kill_waiter();
459 break;
460 }
461 }
462 toku_mutex_unlock(&info->mutex);
463 }
464
465 // find another lock request by txnid. must hold the mutex.
find_lock_request(const TXNID & txnid)466 lock_request *lock_request::find_lock_request(const TXNID &txnid) {
467 lock_request *request;
468 int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(txnid, &request, nullptr);
469 if (r != 0) {
470 request = nullptr;
471 }
472 return request;
473 }
474
// insert this lock request into the locktree's set. must hold the mutex.
void lock_request::insert_into_lock_requests(void) {
    uint32_t idx;
    lock_request *request;
    // a transaction may have at most one pending request per locktree
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
        m_txnid, &request, &idx);
    invariant(r == DB_NOTFOUND);
    r = m_info->pending_lock_requests.insert_at(this, idx);
    invariant_zero(r);
    m_info->pending_is_empty = false;
}
486
// remove this lock request from the locktree's set. must hold the mutex.
void lock_request::remove_from_lock_requests(void) {
    uint32_t idx;
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
        m_txnid, &request, &idx);
    invariant_zero(r);
    // the entry found by our txnid must be this request
    invariant(request == this);
    r = m_info->pending_lock_requests.delete_at(idx);
    invariant_zero(r);
    // keep the (racily read) empty flag in sync with the set
    if (m_info->pending_lock_requests.size() == 0)
        m_info->pending_is_empty = true;
}
500
find_by_txnid(lock_request * const & request,const TXNID & txnid)501 int lock_request::find_by_txnid(lock_request *const &request,
502 const TXNID &txnid) {
503 TXNID request_txnid = request->m_txnid;
504 if (request_txnid < txnid) {
505 return -1;
506 } else if (request_txnid == txnid) {
507 return 0;
508 } else {
509 return 1;
510 }
511 }
512
// test hook: called from start() after the request is inserted as pending
void lock_request::set_start_test_callback(void (*f)(void)) {
    m_start_test_callback = f;
}
516
// test hook: called from start() just before the request becomes pending
void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
    m_start_before_pending_test_callback = f;
}
520
// test hook: called from retry() after a lock is successfully acquired
void lock_request::set_retry_test_callback(void (*f)(void)) {
    m_retry_test_callback = f;
}
524
525 } /* namespace toku */
526