1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 // verify that two transactions doing cursor next and prev operations detect a deadlock when
40 // using write locking cursor operations.
41
42 #include "test.h"
43 #include "toku_pthread.h"
44
get_key(DBT * key)45 static uint64_t get_key(DBT *key) {
46 uint64_t k = 0;
47 assert(key->size == sizeof k);
48 memcpy(&k, key->data, key->size);
49 return htonl(k);
50 }
51
populate(DB_ENV * db_env,DB * db,uint64_t nrows)52 static void populate(DB_ENV *db_env, DB *db, uint64_t nrows) {
53 int r;
54
55 DB_TXN *txn = NULL;
56 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
57
58 for (uint64_t i = 0; i < nrows; i++) {
59
60 uint64_t k = htonl(i);
61 uint64_t v = i;
62 DBT key = { .data = &k, .size = sizeof k };
63 DBT val = { .data = &v, .size = sizeof v };
64 r = db->put(db, txn, &key, &val, 0); assert(r == 0);
65 }
66
67 r = txn->commit(txn, 0); assert(r == 0);
68 }
69
70 struct my_callback_context {
71 DBT key;
72 DBT val;
73 };
74
blocking_next_callback(DBT const * a UU (),DBT const * b UU (),void * e UU ())75 static int blocking_next_callback(DBT const *a UU(), DBT const *b UU(), void *e UU()) {
76 DBT const *found_key = a;
77 DBT const *found_val = b;
78 struct my_callback_context *context = (struct my_callback_context *) e;
79 copy_dbt(&context->key, found_key);
80 copy_dbt(&context->val, found_val);
81 return 0;
82 }
83
blocking_next(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)84 static void blocking_next(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
85 int r;
86
87 struct my_callback_context context;
88 dbt_init_realloc(&context.key);
89 dbt_init_realloc(&context.val);
90
91 DB_TXN *txn = NULL;
92 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
93
94 DBC *cursor = NULL;
95 r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
96
97 uint64_t i;
98 for (i = 0; ; i++) {
99 r = cursor->c_getf_next(cursor, DB_RMW, blocking_next_callback, &context);
100 if (r != 0)
101 break;
102 if (verbose)
103 printf("%lu next %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
104 usleep(sleeptime);
105 }
106
107 if (verbose)
108 printf("%lu next=%d\n", (unsigned long) toku_pthread_self(), r);
109 assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK);
110
111 int rr = cursor->c_close(cursor); assert(rr == 0);
112
113 if (r == DB_NOTFOUND) {
114 if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
115 r = txn->commit(txn, 0);
116 } else {
117 if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
118 r = txn->abort(txn);
119 }
120 assert(r == 0);
121
122 toku_free(context.key.data);
123 toku_free(context.val.data);
124 }
125
blocking_prev(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)126 static void blocking_prev(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
127 int r;
128
129 struct my_callback_context context;
130 dbt_init_realloc(&context.key);
131 dbt_init_realloc(&context.val);
132
133 DB_TXN *txn = NULL;
134 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
135
136 DBC *cursor = NULL;
137 r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
138
139 uint64_t i;
140 for (i = 0; ; i++) {
141 r = cursor->c_getf_prev(cursor, DB_RMW, blocking_next_callback, &context);
142 if (r != 0)
143 break;
144 if (verbose)
145 printf("%lu prev %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
146 usleep(sleeptime);
147 }
148
149 if (verbose)
150 printf("%lu prev=%d\n", (unsigned long) toku_pthread_self(), r);
151 assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK);
152
153 int rr = cursor->c_close(cursor); assert(rr == 0);
154
155 if (r == DB_NOTFOUND) {
156 if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
157 r = txn->commit(txn, 0);
158 } else {
159 if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
160 r = txn->abort(txn);
161 }
162 assert(r == 0);
163
164 toku_free(context.key.data);
165 toku_free(context.val.data);
166 }
167
168 struct blocking_next_args {
169 DB_ENV *db_env;
170 DB *db;
171 uint64_t nrows;
172 long sleeptime;
173 };
174
blocking_next_thread(void * arg)175 static void *blocking_next_thread(void *arg) {
176 struct blocking_next_args *a = (struct blocking_next_args *) arg;
177 blocking_next(a->db_env, a->db, a->nrows, a->sleeptime);
178 return arg;
179 }
180
run_test(DB_ENV * db_env,DB * db,int nthreads,uint64_t nrows,long sleeptime)181 static void run_test(DB_ENV *db_env, DB *db, int nthreads, uint64_t nrows, long sleeptime) {
182 int r;
183 toku_pthread_t tids[nthreads];
184 struct blocking_next_args a = {db_env, db, nrows, sleeptime};
185 for (int i = 0; i < nthreads - 1; i++) {
186 r = toku_pthread_create(
187 toku_uninstrumented, &tids[i], nullptr, blocking_next_thread, &a);
188 assert(r == 0);
189 }
190 blocking_prev(db_env, db, nrows, sleeptime);
191 for (int i = 0; i < nthreads - 1; i++) {
192 void *ret;
193 r = toku_pthread_join(tids[i], &ret); assert(r == 0);
194 }
195 }
196
test_main(int argc,char * const argv[])197 int test_main(int argc, char * const argv[]) {
198 uint64_t cachesize = 0;
199 uint32_t pagesize = 0;
200 uint64_t nrows = 10;
201 int nthreads = 2;
202 long sleeptime = 100000;
203 const char *db_env_dir = TOKU_TEST_FILENAME;
204 const char *db_filename = "test.db";
205 int db_env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_THREAD;
206
207 // parse_args(argc, argv);
208 for (int i = 1; i < argc; i++) {
209 if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
210 verbose++;
211 continue;
212 }
213 if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
214 if (verbose > 0)
215 verbose--;
216 continue;
217 }
218 if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
219 nrows = atoll(argv[++i]);
220 continue;
221 }
222 if (strcmp(argv[i], "--nthreads") == 0 && i+1 < argc) {
223 nthreads = atoi(argv[++i]);
224 continue;
225 }
226 if (strcmp(argv[i], "--sleeptime") == 0 && i+1 < argc) {
227 sleeptime = atol(argv[++i]);
228 continue;
229 }
230 assert(0);
231 }
232
233 // setup env
234 int r;
235 char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
236 snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
237 r = system(rm_cmd); assert(r == 0);
238
239 r = toku_os_mkdir(db_env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
240
241 DB_ENV *db_env = NULL;
242 r = db_env_create(&db_env, 0); assert(r == 0);
243 if (cachesize) {
244 const uint64_t gig = 1 << 30;
245 r = db_env->set_cachesize(db_env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
246 }
247 r = db_env->open(db_env, db_env_dir, db_env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
248 r = db_env->set_lock_timeout(db_env, 30 * 1000, nullptr); assert(r == 0);
249
250 // create the db
251 DB *db = NULL;
252 r = db_create(&db, db_env, 0); assert(r == 0);
253 if (pagesize) {
254 r = db->set_pagesize(db, pagesize); assert(r == 0);
255 }
256 r = db->open(db, NULL, db_filename, NULL, DB_BTREE, DB_CREATE|DB_AUTO_COMMIT|DB_THREAD, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
257
258 // populate the db
259 populate(db_env, db, nrows);
260
261 run_test(db_env, db, nthreads, nrows, sleeptime);
262
263 // close env
264 r = db->close(db, 0); assert(r == 0); db = NULL;
265 r = db_env->close(db_env, 0); assert(r == 0); db_env = NULL;
266
267 return 0;
268 }
269