1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40
41 // verify that serializable cursor locks deleted keys so that another transaction can not insert into the range being scanned by the cursor
42 // we create 2 level tree that looks like
43 // root node with pivot key 2
44 // left leaf contains keys 0, 1, and 2
45 // right leaf contains keys 3 and 4
46 // we delete key 2 while a snapshot txn exist so that garbage collection does not occur.
47 // txn_a walks a cursor through the deleted keys.
48 // when txn_a finishes reading the deleted keys, txn_b tries to insert key 2 and should get lock not granted.
49
50 #include <db.h>
51 #include <unistd.h>
52 #include <sys/stat.h>
53 #include <pthread.h>
54
55 static DB_ENV *env = NULL;
56 static DB_TXN *txn_a = NULL;
57 static DB_TXN *txn_b = NULL;
58 static DB *db = NULL;
59 static uint32_t db_page_size = 4096;
60 // static uint32_t db_basement_size = 4096;
61 static const char *envdir = TOKU_TEST_FILENAME;
62
63 static int
my_compare(DB * this_db UU (),const DBT * a UU (),const DBT * b UU ())64 my_compare(DB *this_db UU(), const DBT *a UU(), const DBT *b UU()) {
65 assert(a->size == b->size);
66 return memcmp(a->data, b->data, a->size);
67 }
68
69 static int
my_generate_row(DB * dest_db UU (),DB * src_db UU (),DBT_ARRAY * dest_key_arrays UU (),DBT_ARRAY * dest_val_arrays UU (),const DBT * src_key UU (),const DBT * src_val UU ())70 my_generate_row(DB *dest_db UU(), DB *src_db UU(), DBT_ARRAY *dest_key_arrays UU(), DBT_ARRAY *dest_val_arrays UU(), const DBT *src_key UU(), const DBT *src_val UU()) {
71 toku_dbt_array_resize(dest_key_arrays, 1);
72 toku_dbt_array_resize(dest_val_arrays, 1);
73 DBT *dest_key = &dest_key_arrays->dbts[0];
74 DBT *dest_val = &dest_val_arrays->dbts[0];
75 assert(dest_key->flags == DB_DBT_REALLOC);
76 dest_key->data = toku_realloc(dest_key->data, src_key->size);
77 memcpy(dest_key->data, src_key->data, src_key->size);
78 dest_key->size = src_key->size;
79 assert(dest_val->flags == DB_DBT_REALLOC);
80 dest_val->data = toku_realloc(dest_val->data, src_val->size);
81 memcpy(dest_val->data, src_val->data, src_val->size);
82 dest_val->size = src_val->size;
83 return 0;
84 }
85
86 static int
next_do_nothing(DBT const * UU (a),DBT const * UU (b),void * UU (c))87 next_do_nothing(DBT const *UU(a), DBT const *UU(b), void *UU(c)) {
88 return 0;
89 }
90
91 static void *
do_insert_2(void * arg)92 do_insert_2(void *arg) {
93 int r;
94 uint64_t key = 2;
95 char val[800]; memset(val, 0, sizeof val);
96 DBT k,v;
97 r = db->put(db, txn_b, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val), 0);
98 assert(r == DB_LOCK_NOTGRANTED);
99 return arg;
100 }
101
102 static ssize_t
my_pread(int fd,void * buf,size_t count,off_t offset)103 my_pread (int fd, void *buf, size_t count, off_t offset) {
104 static int my_pread_count = 0;
105 if (++my_pread_count == 5) {
106 pthread_t id;
107 pthread_create(&id, NULL, do_insert_2, NULL);
108 void *ret;
109 pthread_join(id, &ret);
110 }
111 return pread(fd, buf, count, offset);
112 }
113
114 static void
run_test(void)115 run_test(void) {
116 int r;
117 r = db_env_create(&env, 0); CKERR(r);
118 env->set_errfile(env, stderr);
119 r = env->set_redzone(env, 0); CKERR(r);
120 r = env->set_generate_row_callback_for_put(env, my_generate_row); CKERR(r);
121 r = env->set_default_bt_compare(env, my_compare); CKERR(r);
122 r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
123
124 r = db_create(&db, env, 0); CKERR(r);
125 r = db->set_pagesize(db, db_page_size);
126 DB_TXN *txn = NULL;
127 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
128 r = db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
129 r = txn->commit(txn, 0); CKERR(r);
130
131 // build a tree with 2 leaf nodes
132 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
133 DB_LOADER *loader = NULL;
134 r = env->create_loader(env, txn, &loader, db, 1, &db, NULL, NULL, 0); CKERR(r);
135 for (uint64_t i = 0; i < 5; i++) {
136 uint64_t key = i;
137 char val[800]; memset(val, 0, sizeof val);
138 DBT k,v;
139 r = loader->put(loader, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val)); CKERR(r);
140 }
141 r = loader->close(loader); CKERR(r);
142 r = txn->commit(txn, 0); CKERR(r);
143
144 // delete key 2
145 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
146 for (uint64_t i = 2; i < 3; i++) {
147 uint64_t key = i;
148 DBT k;
149 r = db->del(db, txn, dbt_init(&k, &key, sizeof key), 0); CKERR(r);
150 }
151 r = txn->commit(txn, 0); CKERR(r);
152
153 // close and reopen
154 r = db->close(db, 0); CKERR(r);
155 r = db_create(&db, env, 0); CKERR(r);
156 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
157 r = db->open(db, txn, "foo.db", 0, DB_BTREE, 0, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
158 r = txn->commit(txn, 0); CKERR(r);
159
160 // create a txn that will try to insert key 2 while the serializable cursor is walking through the tree
161 r = env->txn_begin(env, 0, &txn_b, 0); CKERR(r);
162
163 // walk a serializable cursor through the tree
164 r = env->txn_begin(env, 0, &txn_a, 0); CKERR(r);
165 DBC *cursor = NULL;
166 r = db->cursor(db, txn_a, &cursor, 0); CKERR(r);
167 db_env_set_func_pread(my_pread);
168 while (1) {
169 r = cursor->c_getf_next(cursor, 0, next_do_nothing, NULL);
170 if (r != 0)
171 break;
172 }
173 db_env_set_func_pread(NULL);
174 r = cursor->c_close(cursor); CKERR(r);
175 r = txn_a->commit(txn_a, 0); CKERR(r);
176
177 r = txn_b->commit(txn_b, 0); CKERR(r);
178
179 r = db->close(db, 0); CKERR(r);
180 r = env->close(env, 0); CKERR(r);
181 }
182
183 static int
usage(void)184 usage(void) {
185 fprintf(stderr, "-v (verbose)\n");
186 fprintf(stderr, "-q (quiet)\n");
187 fprintf(stderr, "--envdir %s\n", envdir);
188 return 1;
189 }
190
191 int
test_main(int argc,char * const argv[])192 test_main (int argc , char * const argv[]) {
193 for (int i = 1 ; i < argc; i++) {
194 if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
195 verbose++;
196 continue;
197 }
198 if (strcmp(argv[i], "-q") == 0) {
199 if (verbose > 0)
200 verbose--;
201 continue;
202 }
203 if (strcmp(argv[i], "--envdir") == 0 && i+1 < argc) {
204 envdir = argv[++i];
205 continue;
206 }
207 return usage();
208 }
209
210 char rmcmd[32 + strlen(envdir)];
211 snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
212 int r;
213 r = system(rmcmd); CKERR(r);
214 r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
215
216 run_test();
217
218 return 0;
219 }
220