1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 // verify that two transactions doing cursor next and prev operations on a tree do not conflict.
40 
41 #include "test.h"
42 #include "toku_pthread.h"
43 
get_key(DBT * key)44 static uint64_t get_key(DBT *key) {
45     uint64_t k = 0;
46     assert(key->size == sizeof k);
47     memcpy(&k, key->data, key->size);
48     return htonl(k);
49 }
50 
populate(DB_ENV * db_env,DB * db,uint64_t nrows)51 static void populate(DB_ENV *db_env, DB *db, uint64_t nrows) {
52     int r;
53 
54     DB_TXN *txn = NULL;
55     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
56 
57     for (uint64_t i = 0; i < nrows; i++) {
58 
59         uint64_t k = htonl(i);
60         uint64_t v = i;
61         DBT key = { .data = &k, .size = sizeof k };
62         DBT val = { .data = &v, .size = sizeof v };
63         r = db->put(db, txn, &key, &val, 0); assert(r == 0);
64     }
65 
66     r = txn->commit(txn, 0); assert(r == 0);
67 }
68 
69 struct my_callback_context {
70     DBT key;
71     DBT val;
72 };
73 
blocking_next_callback(DBT const * a UU (),DBT const * b UU (),void * e UU ())74 static int blocking_next_callback(DBT const *a UU(), DBT const *b UU(), void *e UU()) {
75     DBT const *found_key = a;
76     DBT const *found_val = b;
77     struct my_callback_context *context = (struct my_callback_context *) e;
78     copy_dbt(&context->key, found_key);
79     copy_dbt(&context->val, found_val);
80     return 0;
81 }
82 
blocking_next(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)83 static void blocking_next(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
84     int r;
85 
86     struct my_callback_context context;
87     dbt_init_realloc(&context.key);
88     dbt_init_realloc(&context.val);
89 
90     DB_TXN *txn = NULL;
91     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
92 
93     DBC *cursor = NULL;
94     r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
95 
96     uint64_t i;
97     for (i = 0; ; i++) {
98         r = cursor->c_getf_next(cursor, 0, blocking_next_callback, &context);
99         if (r != 0)
100             break;
101         if (verbose)
102             printf("%lu next %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
103         usleep(sleeptime);
104     }
105 
106     if (verbose)
107         printf("%lu next=%d\n", (unsigned long) toku_pthread_self(), r);
108 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED
109     assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK || r == DB_LOCK_NOTGRANTED);
110 #else
111     assert(r == DB_NOTFOUND);
112 #endif
113 
114     int rr = cursor->c_close(cursor); assert(rr == 0);
115 
116     if (r == DB_NOTFOUND) {
117         if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
118         r = txn->commit(txn, 0);
119     } else {
120         if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
121         r = txn->abort(txn);
122     }
123     assert(r == 0);
124 
125     toku_free(context.key.data);
126     toku_free(context.val.data);
127 }
128 
blocking_prev(DB_ENV * db_env,DB * db,uint64_t nrows UU (),long sleeptime)129 static void blocking_prev(DB_ENV *db_env, DB *db, uint64_t nrows UU(), long sleeptime) {
130     int r;
131 
132     struct my_callback_context context;
133     dbt_init_realloc(&context.key);
134     dbt_init_realloc(&context.val);
135 
136     DB_TXN *txn = NULL;
137     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
138 
139     DBC *cursor = NULL;
140     r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
141 
142     uint64_t i;
143     for (i = 0; ; i++) {
144         r = cursor->c_getf_prev(cursor, 0, blocking_next_callback, &context);
145         if (r != 0)
146             break;
147         if (verbose)
148             printf("%lu prev %" PRIu64 "\n", (unsigned long) toku_pthread_self(), get_key(&context.key));
149         usleep(sleeptime);
150     }
151 
152     if (verbose)
153         printf("%lu prev=%d\n", (unsigned long) toku_pthread_self(), r);
154 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED
155     assert(r == DB_NOTFOUND || r == DB_LOCK_DEADLOCK || r == DB_LOCK_NOTGRANTED);
156 #else
157     assert(r == DB_NOTFOUND);
158 #endif
159 
160     int rr = cursor->c_close(cursor); assert(rr == 0);
161 
162     if (r == DB_NOTFOUND) {
163         if (verbose) printf("%lu commit\n", (unsigned long) toku_pthread_self());
164         r = txn->commit(txn, 0);
165     } else {
166         if (verbose) printf("%lu abort\n", (unsigned long) toku_pthread_self());
167         r = txn->abort(txn);
168     }
169     assert(r == 0);
170 
171     toku_free(context.key.data);
172     toku_free(context.val.data);
173 }
174 
175 struct blocking_next_args {
176     DB_ENV *db_env;
177     DB *db;
178     uint64_t nrows;
179     long sleeptime;
180 };
181 
blocking_next_thread(void * arg)182 static void *blocking_next_thread(void *arg) {
183     struct blocking_next_args *a = (struct blocking_next_args *) arg;
184     blocking_next(a->db_env, a->db, a->nrows, a->sleeptime);
185     return arg;
186 }
187 
run_test(DB_ENV * db_env,DB * db,int nthreads,uint64_t nrows,long sleeptime)188 static void run_test(DB_ENV *db_env, DB *db, int nthreads, uint64_t nrows, long sleeptime) {
189     int r;
190     toku_pthread_t tids[nthreads];
191     struct blocking_next_args a = {db_env, db, nrows, sleeptime};
192     for (int i = 0; i < nthreads - 1; i++) {
193         r = toku_pthread_create(
194             toku_uninstrumented, &tids[i], nullptr, blocking_next_thread, &a);
195         assert(r == 0);
196     }
197     blocking_prev(db_env, db, nrows, sleeptime);
198     for (int i = 0; i < nthreads - 1; i++) {
199         void *ret;
200         r = toku_pthread_join(tids[i], &ret); assert(r == 0);
201     }
202 }
203 
test_main(int argc,char * const argv[])204 int test_main(int argc, char * const argv[]) {
205     uint64_t cachesize = 0;
206     uint32_t pagesize = 0;
207     uint64_t nrows = 10;
208     int nthreads = 2;
209     long sleeptime = 100000;
210     const char *db_env_dir = TOKU_TEST_FILENAME;
211     const char *db_filename = "test.db";
212     int db_env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_THREAD;
213 
214     // parse_args(argc, argv);
215     for (int i = 1; i < argc; i++) {
216         if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
217             verbose++;
218             continue;
219         }
220         if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
221             if (verbose > 0)
222                 verbose--;
223             continue;
224         }
225         if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
226             nrows = atoll(argv[++i]);
227             continue;
228         }
229         if (strcmp(argv[i], "--nthreads") == 0 && i+1 < argc) {
230             nthreads = atoi(argv[++i]);
231             continue;
232         }
233         if (strcmp(argv[i], "--sleeptime") == 0 && i+1 < argc) {
234             sleeptime = atol(argv[++i]);
235             continue;
236         }
237         assert(0);
238     }
239 
240     // setup env
241     int r;
242     char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
243     snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
244     r = system(rm_cmd); assert(r == 0);
245 
246     r = toku_os_mkdir(db_env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
247 
248     DB_ENV *db_env = NULL;
249     r = db_env_create(&db_env, 0); assert(r == 0);
250     if (cachesize) {
251         const uint64_t gig = 1 << 30;
252         r = db_env->set_cachesize(db_env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
253     }
254     r = db_env->open(db_env, db_env_dir, db_env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
255 
256     // create the db
257     DB *db = NULL;
258     r = db_create(&db, db_env, 0); assert(r == 0);
259     if (pagesize) {
260         r = db->set_pagesize(db, pagesize); assert(r == 0);
261     }
262     r = db->open(db, NULL, db_filename, NULL, DB_BTREE, DB_CREATE|DB_AUTO_COMMIT|DB_THREAD, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
263 
264     // populate the db
265     populate(db_env, db, nrows);
266 
267     run_test(db_env, db, nthreads, nrows, sleeptime);
268 
269     // close env
270     r = db->close(db, 0); assert(r == 0); db = NULL;
271     r = db_env->close(db_env, 0); assert(r == 0); db_env = NULL;
272 
273     return 0;
274 }
275