1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
// Verify that two transactions, one walking the tree with cursor next
// operations and the other with cursor prev operations, detect a deadlock
// when the cursor operations take write locks (DB_RMW).
41 
42 #include "test.h"
43 #include "toku_pthread.h"
44 
get_key(DBT * key)45 static uint64_t get_key(DBT *key) {
46     uint64_t k = 0;
47     assert(key->size == sizeof k);
48     memcpy(&k, key->data, key->size);
49     return htonl(k);
50 }
51 
populate(DB_ENV * db_env,DB * db,uint64_t nrows)52 static void populate(DB_ENV *db_env, DB *db, uint64_t nrows) {
53     int r;
54 
55     DB_TXN *txn = NULL;
56     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
57 
58     for (uint64_t i = 0; i < nrows; i++) {
59 
60         uint64_t k = htonl(i);
61         uint64_t v = i;
62         DBT key = { .data = &k, .size = sizeof k };
63         DBT val = { .data = &v, .size = sizeof v };
64         r = db->put(db, txn, &key, &val, 0); assert(r == 0);
65     }
66 
67     r = txn->commit(txn, 0); assert(r == 0);
68 }
69 
70 struct my_callback_context {
71     DBT key;
72     DBT val;
73 };
74 
blocking_next_callback(DBT const * a UU (),DBT const * b UU (),void * e UU ())75 static int blocking_next_callback(DBT const *a UU(), DBT const *b UU(), void *e UU()) {
76     DBT const *found_key = a;
77     DBT const *found_val = b;
78     struct my_callback_context *context = (struct my_callback_context *) e;
79     copy_dbt(&context->key, found_key);
80     copy_dbt(&context->val, found_val);
81     return 0;
82 }
83 
blocking_next(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)84 static void blocking_next(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
85     int r;
86 
87     struct my_callback_context context;
88     dbt_init_realloc(&context.key);
89     dbt_init_realloc(&context.val);
90 
91     DB_TXN *txn = NULL;
92     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
93 
94     DBC *cursor = NULL;
95     r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
96 
97     uint64_t i;
98     for (i = 0; ; i++) {
99         r = cursor->c_getf_next(cursor, DB_RMW, blocking_next_callback, &context);
100         if (r != 0)
101             break;
102         if (verbose)
103             printf("%lu next %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
104         usleep(sleeptime);
105     }
106 
107     if (verbose)
108         printf("%lu next=%d\n", (unsigned long) toku_pthread_self(), r);
109     assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK);
110 
111     int rr = cursor->c_close(cursor); assert(rr == 0);
112 
113     if (r == DB_NOTFOUND) {
114         if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
115         r = txn->commit(txn, 0);
116     } else {
117         if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
118         r = txn->abort(txn);
119     }
120     assert(r == 0);
121 
122     toku_free(context.key.data);
123     toku_free(context.val.data);
124 }
125 
blocking_prev(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)126 static void blocking_prev(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
127     int r;
128 
129     struct my_callback_context context;
130     dbt_init_realloc(&context.key);
131     dbt_init_realloc(&context.val);
132 
133     DB_TXN *txn = NULL;
134     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
135 
136     DBC *cursor = NULL;
137     r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
138 
139     uint64_t i;
140     for (i = 0; ; i++) {
141         r = cursor->c_getf_prev(cursor, DB_RMW, blocking_next_callback, &context);
142         if (r != 0)
143             break;
144         if (verbose)
145             printf("%lu prev %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
146         usleep(sleeptime);
147     }
148 
149     if (verbose)
150         printf("%lu prev=%d\n", (unsigned long) toku_pthread_self(), r);
151     assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK);
152 
153     int rr = cursor->c_close(cursor); assert(rr == 0);
154 
155     if (r == DB_NOTFOUND) {
156         if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
157         r = txn->commit(txn, 0);
158     } else {
159         if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
160         r = txn->abort(txn);
161     }
162     assert(r == 0);
163 
164     toku_free(context.key.data);
165     toku_free(context.val.data);
166 }
167 
168 struct blocking_next_args {
169     DB_ENV *db_env;
170     DB *db;
171     uint64_t nrows;
172     long sleeptime;
173 };
174 
blocking_next_thread(void * arg)175 static void *blocking_next_thread(void *arg) {
176     struct blocking_next_args *a = (struct blocking_next_args *) arg;
177     blocking_next(a->db_env, a->db, a->nrows, a->sleeptime);
178     return arg;
179 }
180 
run_test(DB_ENV * db_env,DB * db,int nthreads,uint64_t nrows,long sleeptime)181 static void run_test(DB_ENV *db_env, DB *db, int nthreads, uint64_t nrows, long sleeptime) {
182     int r;
183     toku_pthread_t tids[nthreads];
184     struct blocking_next_args a = {db_env, db, nrows, sleeptime};
185     for (int i = 0; i < nthreads - 1; i++) {
186         r = toku_pthread_create(
187             toku_uninstrumented, &tids[i], nullptr, blocking_next_thread, &a);
188         assert(r == 0);
189     }
190     blocking_prev(db_env, db, nrows, sleeptime);
191     for (int i = 0; i < nthreads - 1; i++) {
192         void *ret;
193         r = toku_pthread_join(tids[i], &ret); assert(r == 0);
194     }
195 }
196 
test_main(int argc,char * const argv[])197 int test_main(int argc, char * const argv[]) {
198     uint64_t cachesize = 0;
199     uint32_t pagesize = 0;
200     uint64_t nrows = 10;
201     int nthreads = 2;
202     long sleeptime = 100000;
203     const char *db_env_dir = TOKU_TEST_FILENAME;
204     const char *db_filename = "test.db";
205     int db_env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_THREAD;
206 
207     // parse_args(argc, argv);
208     for (int i = 1; i < argc; i++) {
209         if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
210             verbose++;
211             continue;
212         }
213         if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
214             if (verbose > 0)
215                 verbose--;
216             continue;
217         }
218         if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
219             nrows = atoll(argv[++i]);
220             continue;
221         }
222         if (strcmp(argv[i], "--nthreads") == 0 && i+1 < argc) {
223             nthreads = atoi(argv[++i]);
224             continue;
225         }
226         if (strcmp(argv[i], "--sleeptime") == 0 && i+1 < argc) {
227             sleeptime = atol(argv[++i]);
228             continue;
229         }
230         assert(0);
231     }
232 
233     // setup env
234     int r;
235     char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
236     snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
237     r = system(rm_cmd); assert(r == 0);
238 
239     r = toku_os_mkdir(db_env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
240 
241     DB_ENV *db_env = NULL;
242     r = db_env_create(&db_env, 0); assert(r == 0);
243     if (cachesize) {
244         const uint64_t gig = 1 << 30;
245         r = db_env->set_cachesize(db_env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
246     }
247     r = db_env->open(db_env, db_env_dir, db_env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
248     r = db_env->set_lock_timeout(db_env, 30 * 1000, nullptr); assert(r == 0);
249 
250     // create the db
251     DB *db = NULL;
252     r = db_create(&db, db_env, 0); assert(r == 0);
253     if (pagesize) {
254         r = db->set_pagesize(db, pagesize); assert(r == 0);
255     }
256     r = db->open(db, NULL, db_filename, NULL, DB_BTREE, DB_CREATE|DB_AUTO_COMMIT|DB_THREAD, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
257 
258     // populate the db
259     populate(db_env, db, nrows);
260 
261     run_test(db_env, db, nthreads, nrows, sleeptime);
262 
263     // close env
264     r = db->close(db, 0); assert(r == 0); db = NULL;
265     r = db_env->close(db_env, 0); assert(r == 0); db_env = NULL;
266 
267     return 0;
268 }
269