/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "portability/toku_race_tools.h"

#include "ft/txn/txn.h"
#include "locktree/locktree.h"
#include "locktree/lock_request.h"
#include "util/dbt.h"

namespace toku {

// initialize a lock request's internals
void lock_request::create(void) {
    m_txnid = TXNID_NONE;
    m_conflicting_txnid = TXNID_NONE;
    m_start_time = 0;
    m_left_key = nullptr;
    m_right_key = nullptr;
    toku_init_dbt(&m_left_key_copy);
    toku_init_dbt(&m_right_key_copy);

    m_type = type::UNKNOWN;
    m_lt = nullptr;

    m_complete_r = 0;
    m_state = state::UNINITIALIZED;
    m_info = nullptr;

    toku_cond_init(*lock_request_m_wait_cond_key, &m_wait_cond, nullptr);

    m_start_test_callback = nullptr;
    m_start_before_pending_test_callback = nullptr;
    m_retry_test_callback = nullptr;
}

// destroy a lock request.
void lock_request::destroy(void) {
    invariant(m_state != state::PENDING);
    invariant(m_state != state::DESTROYED);
    m_state = state::DESTROYED;
    toku_destroy_dbt(&m_left_key_copy);
    toku_destroy_dbt(&m_right_key_copy);
    toku_cond_destroy(&m_wait_cond);
}

void lock_request::clearmem(char c) {
    memset(this, c, sizeof(*this));
}

// set the lock request parameters. this API allows a lock request to be reused.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
    invariant(m_state != state::PENDING);
    m_lt = lt;
    m_txnid = txnid;
    m_left_key = left_key;
    m_right_key = right_key;
    toku_destroy_dbt(&m_left_key_copy);
    toku_destroy_dbt(&m_right_key_copy);
    m_type = lock_type;
    m_state = state::INITIALIZED;
    m_info = lt ? lt->get_lock_request_info() : nullptr;
    m_big_txn = big_txn;
    m_extra = extra;
}
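
// Typical caller lifecycle (illustrative sketch, not part of the original
// source; lt, txnid, left_key, right_key and wait_time_ms stand in for
// caller-supplied values):
//
//     lock_request request;
//     request.create();
//     request.set(lt, txnid, &left_key, &right_key, lock_request::type::WRITE,
//                 false /* big_txn */, nullptr /* extra */);
//     int r = request.start();
//     if (r == DB_LOCK_NOTGRANTED)
//         r = request.wait(wait_time_ms);
//     request.destroy();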

// get rid of any stored left and right key copies and
// replace them with copies of the given left and right key
void lock_request::copy_keys() {
    if (!toku_dbt_is_infinite(m_left_key)) {
        toku_clone_dbt(&m_left_key_copy, *m_left_key);
        m_left_key = &m_left_key_copy;
    }
    if (!toku_dbt_is_infinite(m_right_key)) {
        toku_clone_dbt(&m_right_key_copy, *m_right_key);
        m_right_key = &m_right_key_copy;
    }
}

// what are the conflicts for this pending lock request?
void lock_request::get_conflicts(txnid_set *conflicts) {
    invariant(m_state == state::PENDING);
    const bool is_write_request = m_type == type::WRITE;
    m_lt->get_conflicts(is_write_request, m_txnid, m_left_key, m_right_key, conflicts);
}

// build a wait-for-graph for this lock request and the given conflict set.
// for each transaction B that blocks A's lock request:
//     if B is blocked, then
//         add (A,B) to the WFG, and if B is new, fill in the WFG from B
void lock_request::build_wait_graph(wfg *wait_graph, const txnid_set &conflicts) {
    size_t num_conflicts = conflicts.size();
    for (size_t i = 0; i < num_conflicts; i++) {
        TXNID conflicting_txnid = conflicts.get(i);
        lock_request *conflicting_request = find_lock_request(conflicting_txnid);
        invariant(conflicting_txnid != m_txnid);
        invariant(conflicting_request != this);
        if (conflicting_request) {
            bool already_exists = wait_graph->node_exists(conflicting_txnid);
            wait_graph->add_edge(m_txnid, conflicting_txnid);
            if (!already_exists) {
                // recursively build the wait for graph rooted at the conflicting
                // request, given its set of lock conflicts.
                txnid_set other_conflicts;
                other_conflicts.create();
                conflicting_request->get_conflicts(&other_conflicts);
                conflicting_request->build_wait_graph(wait_graph, other_conflicts);
                other_conflicts.destroy();
            }
        }
    }
}
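
// Illustrative example (explanatory note, not part of the original source):
// if transaction A's request conflicts with B, and B's own pending request in
// turn conflicts with A, build_wait_graph() adds the edges A->B and B->A, and
// deadlock_exists() below finds a cycle reachable from A, i.e. a deadlock.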

// returns: true if the current set of lock requests contains
//          a deadlock, false otherwise.
bool lock_request::deadlock_exists(const txnid_set &conflicts) {
    wfg wait_graph;
    wait_graph.create();

    build_wait_graph(&wait_graph, conflicts);
    bool deadlock = wait_graph.cycle_exists_from_txnid(m_txnid);

    wait_graph.destroy();
    return deadlock;
}

// try to acquire a lock described by this lock request.
int lock_request::start(void) {
    int r;

    txnid_set conflicts;
    conflicts.create();
    if (m_type == type::WRITE) {
        r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    } else {
        invariant(m_type == type::READ);
        r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    }

    // if the lock is not granted, save it to the set of lock requests
    // and check for a deadlock. if there is one, complete it as failed
    if (r == DB_LOCK_NOTGRANTED) {
        copy_keys();
        m_state = state::PENDING;
        m_start_time = toku_current_time_microsec() / 1000;
        m_conflicting_txnid = conflicts.get(0);
        if (m_start_before_pending_test_callback)
            m_start_before_pending_test_callback();
        toku_mutex_lock(&m_info->mutex);
        insert_into_lock_requests();
        if (deadlock_exists(conflicts)) {
            remove_from_lock_requests();
            r = DB_LOCK_DEADLOCK;
        }
        toku_mutex_unlock(&m_info->mutex);
        if (m_start_test_callback)
            m_start_test_callback();  // test callback
    }

    if (r != DB_LOCK_NOTGRANTED) {
        complete(r);
    }

    conflicts.destroy();
    return r;
}
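
// Note (explanatory, not part of the original source): start() returns 0 when
// the lock is granted immediately and DB_LOCK_DEADLOCK when adding this
// request would create a cycle; both outcomes complete the request. Only
// DB_LOCK_NOTGRANTED leaves the request PENDING, in which case the caller is
// expected to call wait() below.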

// sleep on the lock request until it becomes resolved or the wait time has elapsed.
int lock_request::wait(uint64_t wait_time_ms) {
    return wait(wait_time_ms, 0, nullptr);
}

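// Note (explanatory, not part of the original source): wait_time_ms and
// killed_time_ms are milliseconds; they are converted below to the microsecond
// clock returned by toku_current_time_microsec(). A nonzero killed_time_ms
// bounds each timed wait so that killed_callback is polled at roughly that
// interval while the request is still pending.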
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
                       void (*lock_wait_callback)(void *, TXNID, TXNID)) {
    uint64_t t_now = toku_current_time_microsec();
    uint64_t t_start = t_now;
    uint64_t t_end = t_start + wait_time_ms * 1000;

    toku_mutex_lock(&m_info->mutex);

    // check again, this time locking out other retry calls
    if (m_state == state::PENDING) {
        GrowableArray<TXNID> conflicts_collector;
        conflicts_collector.init();
        retry(&conflicts_collector);
        if (m_state == state::PENDING) {
            report_waits(&conflicts_collector, lock_wait_callback);
        }
        conflicts_collector.deinit();
    }

    while (m_state == state::PENDING) {
        // check if this thread is killed
        if (killed_callback && killed_callback()) {
            remove_from_lock_requests();
            complete(DB_LOCK_NOTGRANTED);
            continue;
        }

        // compute next wait time
        uint64_t t_wait;
        if (killed_time_ms == 0) {
            t_wait = t_end;
        } else {
            t_wait = t_now + killed_time_ms * 1000;
            if (t_wait > t_end)
                t_wait = t_end;
        }
        struct timespec ts = {};
        ts.tv_sec = t_wait / 1000000;
        ts.tv_nsec = (t_wait % 1000000) * 1000;
        int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts);
        invariant(r == 0 || r == ETIMEDOUT);

        t_now = toku_current_time_microsec();
        if (m_state == state::PENDING && (t_now >= t_end)) {
            m_info->counters.timeout_count += 1;

            // if we're still pending and we timed out, then remove our
            // request from the set of lock requests and fail.
            remove_from_lock_requests();

            // complete sets m_state to COMPLETE, breaking us out of the loop
            complete(DB_LOCK_NOTGRANTED);
        }
    }

    uint64_t t_real_end = toku_current_time_microsec();
    uint64_t duration = t_real_end - t_start;
    m_info->counters.wait_count += 1;
    m_info->counters.wait_time += duration;
    if (duration >= 1000000) {
        m_info->counters.long_wait_count += 1;
        m_info->counters.long_wait_time += duration;
    }
    toku_mutex_unlock(&m_info->mutex);

    invariant(m_state == state::COMPLETE);
    return m_complete_r;
}

// complete this lock request with the given return value
void lock_request::complete(int complete_r) {
    m_complete_r = complete_r;
    m_state = state::COMPLETE;
}

const DBT *lock_request::get_left_key(void) const {
    return m_left_key;
}

const DBT *lock_request::get_right_key(void) const {
    return m_right_key;
}

TXNID lock_request::get_txnid(void) const {
    return m_txnid;
}

uint64_t lock_request::get_start_time(void) const {
    return m_start_time;
}

TXNID lock_request::get_conflicting_txnid(void) const {
    return m_conflicting_txnid;
}

// retry a single lock request: re-attempt the lock acquisition and, on
// success, remove the request from the pending set and wake its waiter.
// the locktree's lock request mutex must be held.
int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
    invariant(m_state == state::PENDING);
    int r;
    txnid_set conflicts;
    conflicts.create();

    if (m_type == type::WRITE) {
        r = m_lt->acquire_write_lock(
            m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    } else {
        r = m_lt->acquire_read_lock(
            m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    }

    // if the acquisition succeeded then remove ourselves from the
    // set of lock requests, complete, and signal the waiting thread.
    if (r == 0) {
        remove_from_lock_requests();
        complete(r);
        if (m_retry_test_callback)
            m_retry_test_callback();  // test callback
        toku_cond_broadcast(&m_wait_cond);
    } else {
        m_conflicting_txnid = conflicts.get(0);
        add_conflicts_to_waits(&conflicts, conflicts_collector);
    }
    conflicts.destroy();

    return r;
}

void lock_request::retry_all_lock_requests(
    locktree *lt,
    void (*lock_wait_callback)(void *, TXNID, TXNID),
    void (*after_retry_all_test_callback)(void)) {
    lt_lock_request_info *info = lt->get_lock_request_info();

    // if there are no pending lock requests then there is nothing to do.
    // the unlocked data race on pending_is_empty is OK since lock requests
    // are retried after being added to the pending set.
    if (info->pending_is_empty)
        return;

    // get my retry generation (post increment of retry_want)
    unsigned long long my_retry_want = (info->retry_want += 1);

    toku_mutex_lock(&info->retry_mutex);

    GrowableArray<TXNID> conflicts_collector;
    conflicts_collector.init();

    // here is the group retry algorithm.
    // get the latest retry_want count and use it as the generation number of
    // this retry operation. if this retry generation is > the last retry
    // generation, then do the lock retries.  otherwise, no lock retries
    // are needed.
    if ((my_retry_want - 1) == info->retry_done) {
        for (;;) {
            if (!info->running_retry) {
                info->running_retry = true;
                info->retry_done = info->retry_want;
                toku_mutex_unlock(&info->retry_mutex);
                retry_all_lock_requests_info(info, &conflicts_collector);
                if (after_retry_all_test_callback)
                    after_retry_all_test_callback();
                toku_mutex_lock(&info->retry_mutex);
                info->running_retry = false;
                toku_cond_broadcast(&info->retry_cv);
                break;
            } else {
                toku_cond_wait(&info->retry_cv, &info->retry_mutex);
            }
        }
    }
    toku_mutex_unlock(&info->retry_mutex);

    report_waits(&conflicts_collector, lock_wait_callback);
    conflicts_collector.deinit();
}
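
// Worked example of the group retry generations above (explanatory note, not
// part of the original source): suppose retry_done is 5 and three threads call
// retry_all_lock_requests() concurrently, taking retry_want values 6, 7 and 8.
// Only the caller holding generation 6 passes the (my_retry_want - 1) ==
// retry_done check; it sets retry_done to the current retry_want (8) and runs
// a single retry pass, which covers the work requested by the generation 7 and
// 8 callers, so those callers return without retrying again.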

void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, GrowableArray<TXNID> *collector) {
    toku_mutex_lock(&info->mutex);
    // retry all of the pending lock requests.
    for (size_t i = 0; i < info->pending_lock_requests.size();) {
        lock_request *request;
        int r = info->pending_lock_requests.fetch(i, &request);
        invariant_zero(r);

        // retry the lock request. if it didn't succeed,
        // move on to the next lock request. otherwise
        // the request is gone from the list so we may
        // read the i'th entry for the next one.
        r = request->retry(collector);
        if (r != 0) {
            i++;
        }
    }

    // future threads should only retry lock requests if some still exist
    info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
    toku_mutex_unlock(&info->mutex);
}

void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
                                          GrowableArray<TXNID> *wait_conflicts) {
    size_t num_conflicts = conflicts->size();
    for (size_t i = 0; i < num_conflicts; i++) {
        wait_conflicts->push(m_txnid);
        wait_conflicts->push(conflicts->get(i));
    }
}
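
// Note (explanatory, not part of the original source): the collector stores
// flattened (blocked txnid, blocking txnid) pairs at even/odd indices;
// report_waits() below walks the array two entries at a time and invokes the
// callback once per pair.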

void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
                                void (*lock_wait_callback)(void *, TXNID, TXNID)) {
    if (!lock_wait_callback)
        return;
    size_t num_conflicts = wait_conflicts->get_size();
    for (size_t i = 0; i < num_conflicts; i += 2) {
        TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
        TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
        (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
    }
}

void *lock_request::get_extra(void) const {
    return m_extra;
}

void lock_request::kill_waiter(void) {
    remove_from_lock_requests();
    complete(DB_LOCK_NOTGRANTED);
    toku_cond_broadcast(&m_wait_cond);
}

void lock_request::kill_waiter(locktree *lt, void *extra) {
    lt_lock_request_info *info = lt->get_lock_request_info();
    toku_mutex_lock(&info->mutex);
    for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
        lock_request *request;
        int r = info->pending_lock_requests.fetch(i, &request);
        if (r == 0 && request->get_extra() == extra) {
            request->kill_waiter();
            break;
        }
    }
    toku_mutex_unlock(&info->mutex);
}

// find another lock request by txnid. must hold the mutex.
lock_request *lock_request::find_lock_request(const TXNID &txnid) {
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(txnid, &request, nullptr);
    if (r != 0) {
        request = nullptr;
    }
    return request;
}

// insert this lock request into the locktree's set. must hold the mutex.
void lock_request::insert_into_lock_requests(void) {
    uint32_t idx;
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
        m_txnid, &request, &idx);
    invariant(r == DB_NOTFOUND);
    r = m_info->pending_lock_requests.insert_at(this, idx);
    invariant_zero(r);
    m_info->pending_is_empty = false;
}

// remove this lock request from the locktree's set. must hold the mutex.
void lock_request::remove_from_lock_requests(void) {
    uint32_t idx;
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
        m_txnid, &request, &idx);
    invariant_zero(r);
    invariant(request == this);
    r = m_info->pending_lock_requests.delete_at(idx);
    invariant_zero(r);
    if (m_info->pending_lock_requests.size() == 0)
        m_info->pending_is_empty = true;
}
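
// Note (explanatory, not part of the original source): pending_lock_requests
// is kept ordered by txnid via the find_by_txnid comparator below, so
// find_zero() can locate a request by binary search and insert_at() preserves
// the ordering. pending_is_empty mirrors whether the set is empty and is read
// without the mutex in retry_all_lock_requests() as a fast-path check.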

int lock_request::find_by_txnid(lock_request *const &request,
                                const TXNID &txnid) {
    TXNID request_txnid = request->m_txnid;
    if (request_txnid < txnid) {
        return -1;
    } else if (request_txnid == txnid) {
        return 0;
    } else {
        return 1;
    }
}

void lock_request::set_start_test_callback(void (*f)(void)) {
    m_start_test_callback = f;
}

void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
    m_start_before_pending_test_callback = f;
}

void lock_request::set_retry_test_callback(void (*f)(void)) {
    m_retry_test_callback = f;
}

} /* namespace toku */