1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
// Verify that a cursor "first" operation that takes a write lock (DB_RMW) suspends the conflicting threads.
40
41 #include "test.h"
42 #include "toku_pthread.h"
43
populate(DB_ENV * db_env,DB * db,uint64_t nrows)44 static void populate(DB_ENV *db_env, DB *db, uint64_t nrows) {
45 int r;
46
47 DB_TXN *txn = NULL;
48 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
49
50 for (uint64_t i = 0; i < nrows; i++) {
51
52 uint64_t k = htonl(i);
53 uint64_t v = i;
54 DBT key = { .data = &k, .size = sizeof k };
55 DBT val = { .data = &v, .size = sizeof v };
56 r = db->put(db, txn, &key, &val, 0); assert(r == 0);
57 }
58
59 r = txn->commit(txn, 0); assert(r == 0);
60 }
61
62 struct my_callback_context {
63 DBT key;
64 DBT val;
65 };
66
blocking_first_callback(DBT const * a UU (),DBT const * b UU (),void * e UU ())67 static int blocking_first_callback(DBT const *a UU(), DBT const *b UU(), void *e UU()) {
68 DBT const *found_key = a;
69 DBT const *found_val = b;
70 struct my_callback_context *context = (struct my_callback_context *) e;
71 copy_dbt(&context->key, found_key);
72 copy_dbt(&context->val, found_val);
73 return 0;
74 }
75
blocking_first(DB_ENV * db_env,DB * db,uint64_t nrows,long sleeptime)76 static void blocking_first(DB_ENV *db_env, DB *db, uint64_t nrows, long sleeptime) {
77 int r;
78
79 struct my_callback_context context;
80 dbt_init_realloc(&context.key);
81 dbt_init_realloc(&context.val);
82
83 for (uint64_t i = 0; i < nrows; i++) {
84 DB_TXN *txn = NULL;
85 r = db_env->txn_begin(db_env, NULL, &txn, 0); assert(r == 0);
86
87 DBC *cursor = NULL;
88 r = db->cursor(db, txn, &cursor, 0); assert(r == 0); // get a write lock on -inf ... 0
89 r = cursor->c_getf_first(cursor, DB_RMW, blocking_first_callback, &context); assert(r == 0);
90 usleep(sleeptime);
91
92 r = cursor->c_close(cursor); assert(r == 0);
93
94 r = txn->commit(txn, 0); assert(r == 0);
95 if (verbose)
96 printf("%lu %" PRIu64 "\n", (unsigned long) toku_pthread_self(), i);
97 }
98
99 toku_free(context.key.data);
100 toku_free(context.val.data);
101 }
102
103 struct blocking_first_args {
104 DB_ENV *db_env;
105 DB *db;
106 uint64_t nrows;
107 long sleeptime;
108 };
109
blocking_first_thread(void * arg)110 static void *blocking_first_thread(void *arg) {
111 struct blocking_first_args *a = (struct blocking_first_args *) arg;
112 blocking_first(a->db_env, a->db, a->nrows, a->sleeptime);
113 return arg;
114 }
115
run_test(DB_ENV * db_env,DB * db,int nthreads,uint64_t nrows,long sleeptime)116 static void run_test(DB_ENV *db_env, DB *db, int nthreads, uint64_t nrows, long sleeptime) {
117 int r;
118 toku_pthread_t tids[nthreads];
119 struct blocking_first_args a = {db_env, db, nrows, sleeptime};
120 for (int i = 0; i < nthreads - 1; i++) {
121 r = toku_pthread_create(
122 toku_uninstrumented, &tids[i], nullptr, blocking_first_thread, &a);
123 assert(r == 0);
124 }
125 blocking_first(db_env, db, nrows, sleeptime);
126 for (int i = 0; i < nthreads - 1; i++) {
127 void *ret;
128 r = toku_pthread_join(tids[i], &ret); assert(r == 0);
129 }
130 }
131
test_main(int argc,char * const argv[])132 int test_main(int argc, char * const argv[]) {
133 uint64_t cachesize = 0;
134 uint32_t pagesize = 0;
135 uint64_t nrows = 10;
136 int nthreads = 2;
137 long sleeptime = 100000;
138 const char *db_env_dir = TOKU_TEST_FILENAME;
139 const char *db_filename = "test.db";
140 int db_env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_THREAD;
141
142 // parse_args(argc, argv);
143 for (int i = 1; i < argc; i++) {
144 if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
145 verbose++;
146 continue;
147 }
148 if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
149 if (verbose > 0)
150 verbose--;
151 continue;
152 }
153 if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
154 nrows = atoll(argv[++i]);
155 continue;
156 }
157 if (strcmp(argv[i], "--nthreads") == 0 && i+1 < argc) {
158 nthreads = atoi(argv[++i]);
159 continue;
160 }
161 if (strcmp(argv[i], "--sleeptime") == 0 && i+1 < argc) {
162 sleeptime = atol(argv[++i]);
163 continue;
164 }
165 assert(0);
166 }
167
168 // setup env
169 int r;
170 char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
171 snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
172 r = system(rm_cmd); assert(r == 0);
173
174 r = toku_os_mkdir(db_env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
175
176 DB_ENV *db_env = NULL;
177 r = db_env_create(&db_env, 0); assert(r == 0);
178 if (cachesize) {
179 const uint64_t gig = 1 << 30;
180 r = db_env->set_cachesize(db_env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
181 }
182 r = db_env->open(db_env, db_env_dir, db_env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
183 r = db_env->set_lock_timeout(db_env, 30 * 1000, nullptr); assert(r == 0);
184
185 // create the db
186 DB *db = NULL;
187 r = db_create(&db, db_env, 0); assert(r == 0);
188 if (pagesize) {
189 r = db->set_pagesize(db, pagesize); assert(r == 0);
190 }
191 r = db->open(db, NULL, db_filename, NULL, DB_BTREE, DB_CREATE|DB_AUTO_COMMIT|DB_THREAD, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
192
193 // populate the db
194 populate(db_env, db, nrows);
195
196 run_test(db_env, db, nthreads, nrows, sleeptime);
197
198 // close env
199 r = db->close(db, 0); assert(r == 0); db = NULL;
200 r = db_env->close(db_env, 0); assert(r == 0); db_env = NULL;
201
202 return 0;
203 }
204