1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
// Verify that a cursor "first" operation that takes a write lock (DB_RMW) suspends the conflicting threads.
40 
41 #include "test.h"
42 #include "toku_pthread.h"
43 
populate(DB_ENV * db_env,DB * db,uint64_t nrows)44 static void populate(DB_ENV *db_env, DB *db, uint64_t nrows) {
45     int r;
46 
47     DB_TXN *txn = NULL;
48     r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
49 
50     for (uint64_t i = 0; i < nrows; i++) {
51 
52         uint64_t k = htonl(i);
53         uint64_t v = i;
54         DBT key = { .data = &k, .size = sizeof k };
55         DBT val = { .data = &v, .size = sizeof v };
56         r = db->put(db, txn, &key, &val, 0); assert(r == 0);
57     }
58 
59     r = txn->commit(txn, 0); assert(r == 0);
60 }
61 
62 struct my_callback_context {
63     DBT key;
64     DBT val;
65 };
66 
blocking_first_callback(DBT const * a UU (),DBT const * b UU (),void * e UU ())67 static int blocking_first_callback(DBT const *a UU(), DBT const *b UU(), void *e UU()) {
68     DBT const *found_key = a;
69     DBT const *found_val = b;
70     struct my_callback_context *context = (struct my_callback_context *) e;
71     copy_dbt(&context->key, found_key);
72     copy_dbt(&context->val, found_val);
73     return 0;
74 }
75 
blocking_first(DB_ENV * db_env,DB * db,uint64_t nrows,long sleeptime)76 static void blocking_first(DB_ENV *db_env, DB *db, uint64_t nrows, long sleeptime) {
77     int r;
78 
79     struct my_callback_context context;
80     dbt_init_realloc(&context.key);
81     dbt_init_realloc(&context.val);
82 
83     for (uint64_t i = 0; i < nrows; i++) {
84         DB_TXN *txn = NULL;
85         r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
86 
87         DBC *cursor = NULL;
88         r = db->cursor(db, txn, &cursor, 0); assert(r == 0); // get a write lock on -inf ... 0
89         r = cursor->c_getf_first(cursor, DB_RMW, blocking_first_callback, &context); assert(r == 0);
90         usleep(sleeptime);
91 
92         r = cursor->c_close(cursor); assert(r == 0);
93 
94         r = txn->commit(txn, 0); assert(r == 0);
95         if (verbose)
96             printf("%lu %" PRIu64 "\n", (unsigned long) toku_pthread_self(), i);
97     }
98 
99     toku_free(context.key.data);
100     toku_free(context.val.data);
101 }
102 
103 struct blocking_first_args {
104     DB_ENV *db_env;
105     DB *db;
106     uint64_t nrows;
107     long sleeptime;
108 };
109 
blocking_first_thread(void * arg)110 static void *blocking_first_thread(void *arg) {
111     struct blocking_first_args *a = (struct blocking_first_args *) arg;
112     blocking_first(a->db_env, a->db, a->nrows, a->sleeptime);
113     return arg;
114 }
115 
run_test(DB_ENV * db_env,DB * db,int nthreads,uint64_t nrows,long sleeptime)116 static void run_test(DB_ENV *db_env, DB *db, int nthreads, uint64_t nrows, long sleeptime) {
117     int r;
118     toku_pthread_t tids[nthreads];
119     struct blocking_first_args a = {db_env, db, nrows, sleeptime};
120     for (int i = 0; i < nthreads - 1; i++) {
121         r = toku_pthread_create(
122             toku_uninstrumented, &tids[i], nullptr, blocking_first_thread, &a);
123         assert(r == 0);
124     }
125     blocking_first(db_env, db, nrows, sleeptime);
126     for (int i = 0; i < nthreads - 1; i++) {
127         void *ret;
128         r = toku_pthread_join(tids[i], &ret); assert(r == 0);
129     }
130 }
131 
test_main(int argc,char * const argv[])132 int test_main(int argc, char * const argv[]) {
133     uint64_t cachesize = 0;
134     uint32_t pagesize = 0;
135     uint64_t nrows = 10;
136     int nthreads = 2;
137     long sleeptime = 100000;
138     const char *db_env_dir = TOKU_TEST_FILENAME;
139     const char *db_filename = "test.db";
140     int db_env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_THREAD;
141 
142     // parse_args(argc, argv);
143     for (int i = 1; i < argc; i++) {
144         if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
145             verbose++;
146             continue;
147         }
148         if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
149             if (verbose > 0)
150                 verbose--;
151             continue;
152         }
153         if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
154             nrows = atoll(argv[++i]);
155             continue;
156         }
157         if (strcmp(argv[i], "--nthreads") == 0 && i+1 < argc) {
158             nthreads = atoi(argv[++i]);
159             continue;
160         }
161         if (strcmp(argv[i], "--sleeptime") == 0 && i+1 < argc) {
162             sleeptime = atol(argv[++i]);
163             continue;
164         }
165         assert(0);
166     }
167 
168     // setup env
169     int r;
170     char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
171     snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
172     r = system(rm_cmd); assert(r == 0);
173 
174     r = toku_os_mkdir(db_env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
175 
176     DB_ENV *db_env = NULL;
177     r = db_env_create(&db_env, 0); assert(r == 0);
178     if (cachesize) {
179         const uint64_t gig = 1 << 30;
180         r = db_env->set_cachesize(db_env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
181     }
182     r = db_env->open(db_env, db_env_dir, db_env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
183     r = db_env->set_lock_timeout(db_env, 30 * 1000, nullptr); assert(r == 0);
184 
185     // create the db
186     DB *db = NULL;
187     r = db_create(&db, db_env, 0); assert(r == 0);
188     if (pagesize) {
189         r = db->set_pagesize(db, pagesize); assert(r == 0);
190     }
191     r = db->open(db, NULL, db_filename, NULL, DB_BTREE, DB_CREATE|DB_AUTO_COMMIT|DB_THREAD, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
192 
193     // populate the db
194     populate(db_env, db, nrows);
195 
196     run_test(db_env, db, nthreads, nrows, sleeptime);
197 
198     // close env
199     r = db->close(db, 0); assert(r == 0); db = NULL;
200     r = db_env->close(db_env, 0); assert(r == 0); db_env = NULL;
201 
202     return 0;
203 }
204