1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 // verify that two transactions doing cursor next and prev operations on a tree do not conflict.
40
41 #include "test.h"
42 #include "toku_pthread.h"
43
get_key(DBT * key)44 static uint64_t get_key(DBT *key) {
45 uint64_t k = 0;
46 assert(key->size == sizeof k);
47 memcpy(&k, key->data, key->size);
48 return htonl(k);
49 }
50
populate(DB_ENV * db_env,DB * db,uint64_t nrows)51 static void populate(DB_ENV *db_env, DB *db, uint64_t nrows) {
52 int r;
53
54 DB_TXN *txn = NULL;
55 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
56
57 for (uint64_t i = 0; i < nrows; i++) {
58
59 uint64_t k = htonl(i);
60 uint64_t v = i;
61 DBT key = { .data = &k, .size = sizeof k };
62 DBT val = { .data = &v, .size = sizeof v };
63 r = db->put(db, txn, &key, &val, 0); assert(r == 0);
64 }
65
66 r = txn->commit(txn, 0); assert(r == 0);
67 }
68
69 struct my_callback_context {
70 DBT key;
71 DBT val;
72 };
73
blocking_next_callback(DBT const * a UU (),DBT const * b UU (),void * e UU ())74 static int blocking_next_callback(DBT const *a UU(), DBT const *b UU(), void *e UU()) {
75 DBT const *found_key = a;
76 DBT const *found_val = b;
77 struct my_callback_context *context = (struct my_callback_context *) e;
78 copy_dbt(&context->key, found_key);
79 copy_dbt(&context->val, found_val);
80 return 0;
81 }
82
blocking_next(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)83 static void blocking_next(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
84 int r;
85
86 struct my_callback_context context;
87 dbt_init_realloc(&context.key);
88 dbt_init_realloc(&context.val);
89
90 DB_TXN *txn = NULL;
91 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
92
93 DBC *cursor = NULL;
94 r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
95
96 uint64_t i;
97 for (i = 0; ; i++) {
98 r = cursor->c_getf_next(cursor, 0, blocking_next_callback, &context);
99 if (r != 0)
100 break;
101 if (verbose)
102 printf("%lu next %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
103 usleep(sleeptime);
104 }
105
106 if (verbose)
107 printf("%lu next=%d\n", (unsigned long) toku_pthread_self(), r);
108 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED
109 assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK || r == DB_LOCK_NOTGRANTED);
110 #else
111 assert(r == DB_NOTFOUND);
112 #endif
113
114 int rr = cursor->c_close(cursor); assert(rr == 0);
115
116 if (r == DB_NOTFOUND) {
117 if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
118 r = txn->commit(txn, 0);
119 } else {
120 if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
121 r = txn->abort(txn);
122 }
123 assert(r == 0);
124
125 toku_free(context.key.data);
126 toku_free(context.val.data);
127 }
128
blocking_prev(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)129 static void blocking_prev(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
130 int r;
131
132 struct my_callback_context context;
133 dbt_init_realloc(&context.key);
134 dbt_init_realloc(&context.val);
135
136 DB_TXN *txn = NULL;
137 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
138
139 DBC *cursor = NULL;
140 r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
141
142 uint64_t i;
143 for (i = 0; ; i++) {
144 r = cursor->c_getf_prev(cursor, 0, blocking_next_callback, &context);
145 if (r != 0)
146 break;
147 if (verbose)
148 printf("%lu prev %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
149 usleep(sleeptime);
150 }
151
152 if (verbose)
153 printf("%lu prev=%d\n", (unsigned long) toku_pthread_self(), r);
154 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED
155 assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK || r == DB_LOCK_NOTGRANTED);
156 #else
157 assert(r == DB_NOTFOUND);
158 #endif
159
160 int rr = cursor->c_close(cursor); assert(rr == 0);
161
162 if (r == DB_NOTFOUND) {
163 if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
164 r = txn->commit(txn, 0);
165 } else {
166 if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
167 r = txn->abort(txn);
168 }
169 assert(r == 0);
170
171 toku_free(context.key.data);
172 toku_free(context.val.data);
173 }
174
175 struct blocking_next_args {
176 DB_ENV *db_env;
177 DB *db;
178 uint64_t nrows;
179 long sleeptime;
180 };
181
blocking_next_thread(void * arg)182 static void *blocking_next_thread(void *arg) {
183 struct blocking_next_args *a = (struct blocking_next_args *) arg;
184 blocking_next(a->db_env, a->db, a->nrows, a->sleeptime);
185 return arg;
186 }
187
run_test(DB_ENV * db_env,DB * db,int nthreads,uint64_t nrows,long sleeptime)188 static void run_test(DB_ENV *db_env, DB *db, int nthreads, uint64_t nrows, long sleeptime) {
189 int r;
190 toku_pthread_t tids[nthreads];
191 struct blocking_next_args a = {db_env, db, nrows, sleeptime};
192 for (int i = 0; i < nthreads - 1; i++) {
193 r = toku_pthread_create(
194 toku_uninstrumented, &tids[i], nullptr, blocking_next_thread, &a);
195 assert(r == 0);
196 }
197 blocking_prev(db_env, db, nrows, sleeptime);
198 for (int i = 0; i < nthreads - 1; i++) {
199 void *ret;
200 r = toku_pthread_join(tids[i], &ret); assert(r == 0);
201 }
202 }
203
test_main(int argc,char * const argv[])204 int test_main(int argc, char * const argv[]) {
205 uint64_t cachesize = 0;
206 uint32_t pagesize = 0;
207 uint64_t nrows = 10;
208 int nthreads = 2;
209 long sleeptime = 100000;
210 const char *db_env_dir = TOKU_TEST_FILENAME;
211 const char *db_filename = "test.db";
212 int db_env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_THREAD;
213
214 // parse_args(argc, argv);
215 for (int i = 1; i < argc; i++) {
216 if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
217 verbose++;
218 continue;
219 }
220 if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
221 if (verbose > 0)
222 verbose--;
223 continue;
224 }
225 if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
226 nrows = atoll(argv[++i]);
227 continue;
228 }
229 if (strcmp(argv[i], "--nthreads") == 0 && i+1 < argc) {
230 nthreads = atoi(argv[++i]);
231 continue;
232 }
233 if (strcmp(argv[i], "--sleeptime") == 0 && i+1 < argc) {
234 sleeptime = atol(argv[++i]);
235 continue;
236 }
237 assert(0);
238 }
239
240 // setup env
241 int r;
242 char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
243 snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
244 r = system(rm_cmd); assert(r == 0);
245
246 r = toku_os_mkdir(db_env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
247
248 DB_ENV *db_env = NULL;
249 r = db_env_create(&db_env, 0); assert(r == 0);
250 if (cachesize) {
251 const uint64_t gig = 1 << 30;
252 r = db_env->set_cachesize(db_env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
253 }
254 r = db_env->open(db_env, db_env_dir, db_env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
255
256 // create the db
257 DB *db = NULL;
258 r = db_create(&db, db_env, 0); assert(r == 0);
259 if (pagesize) {
260 r = db->set_pagesize(db, pagesize); assert(r == 0);
261 }
262 r = db->open(db, NULL, db_filename, NULL, DB_BTREE, DB_CREATE|DB_AUTO_COMMIT|DB_THREAD, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
263
264 // populate the db
265 populate(db_env, db, nrows);
266
267 run_test(db_env, db, nthreads, nrows, sleeptime);
268
269 // close env
270 r = db->close(db, 0); assert(r == 0); db = NULL;
271 r = db_env->close(db_env, 0); assert(r == 0); db_env = NULL;
272
273 return 0;
274 }
275