1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 // verify that serializable cursor locks deleted keys so that  another transaction can not insert into the range being scanned by the cursor
42 // we create 2 level tree that looks like
43 // root node with pivot key 2
44 // left leaf contains keys 0, 1, and 2
45 // right leaf contains keys 3 and 4
46 // we delete key 2 while a snapshot txn exist so that garbage collection does not occur.
47 // txn_a walks a cursor through the deleted keys.
48 // when txn_a finishes reading the deleted keys, txn_b tries to insert key 2 and should get lock not granted.
49 
50 #include <db.h>
51 #include <unistd.h>
52 #include <sys/stat.h>
53 #include <pthread.h>
54 
55 static DB_ENV *env = NULL;
56 static DB_TXN *txn_a = NULL;
57 static DB_TXN *txn_b = NULL;
58 static DB *db = NULL;
59 static uint32_t db_page_size = 4096;
60 // static uint32_t db_basement_size = 4096;
61 static const char *envdir = TOKU_TEST_FILENAME;
62 
63 static int
my_compare(DB * this_db UU (),const DBT * a UU (),const DBT * b UU ())64 my_compare(DB *this_db UU(), const DBT *a UU(), const DBT *b UU()) {
65     assert(a->size == b->size);
66     return memcmp(a->data, b->data, a->size);
67 }
68 
69 static int
my_generate_row(DB * dest_db UU (),DB * src_db UU (),DBT_ARRAY * dest_key_arrays UU (),DBT_ARRAY * dest_val_arrays UU (),const DBT * src_key UU (),const DBT * src_val UU ())70 my_generate_row(DB *dest_db UU(), DB *src_db UU(), DBT_ARRAY *dest_key_arrays UU(), DBT_ARRAY *dest_val_arrays UU(), const DBT *src_key UU(), const DBT *src_val UU()) {
71     toku_dbt_array_resize(dest_key_arrays, 1);
72     toku_dbt_array_resize(dest_val_arrays, 1);
73     DBT *dest_key = &dest_key_arrays->dbts[0];
74     DBT *dest_val = &dest_val_arrays->dbts[0];
75     assert(dest_key->flags == DB_DBT_REALLOC);
76     dest_key->data = toku_realloc(dest_key->data, src_key->size);
77     memcpy(dest_key->data, src_key->data, src_key->size);
78     dest_key->size = src_key->size;
79     assert(dest_val->flags == DB_DBT_REALLOC);
80     dest_val->data = toku_realloc(dest_val->data, src_val->size);
81     memcpy(dest_val->data, src_val->data, src_val->size);
82     dest_val->size = src_val->size;
83     return 0;
84 }
85 
86 static int
next_do_nothing(DBT const * UU (a),DBT const * UU (b),void * UU (c))87 next_do_nothing(DBT const *UU(a), DBT  const *UU(b), void *UU(c)) {
88     return 0;
89 }
90 
91 static void *
do_insert_2(void * arg)92 do_insert_2(void *arg) {
93     int r;
94     uint64_t key = 2;
95     char val[800]; memset(val, 0, sizeof val);
96     DBT k,v;
97     r = db->put(db, txn_b, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val), 0);
98     assert(r == DB_LOCK_NOTGRANTED);
99     return arg;
100 }
101 
102 static ssize_t
my_pread(int fd,void * buf,size_t count,off_t offset)103 my_pread (int fd, void *buf, size_t count, off_t offset) {
104     static int my_pread_count = 0;
105     if (++my_pread_count == 5) {
106         pthread_t id;
107         pthread_create(&id, NULL, do_insert_2, NULL);
108         void *ret;
109         pthread_join(id, &ret);
110     }
111     return pread(fd, buf, count, offset);
112 }
113 
114 static void
run_test(void)115 run_test(void) {
116     int r;
117     r = db_env_create(&env, 0); CKERR(r);
118     env->set_errfile(env, stderr);
119     r = env->set_redzone(env, 0); CKERR(r);
120     r = env->set_generate_row_callback_for_put(env, my_generate_row); CKERR(r);
121     r = env->set_default_bt_compare(env, my_compare); CKERR(r);
122     r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
123 
124     r = db_create(&db, env, 0); CKERR(r);
125     r = db->set_pagesize(db, db_page_size);
126     DB_TXN *txn = NULL;
127     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
128     r = db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
129     r = txn->commit(txn, 0);    CKERR(r);
130 
131     // build a tree with 2 leaf nodes
132     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
133     DB_LOADER *loader = NULL;
134     r = env->create_loader(env, txn, &loader, db, 1, &db, NULL, NULL, 0); CKERR(r);
135     for (uint64_t i = 0; i < 5; i++) {
136         uint64_t key = i;
137         char val[800]; memset(val, 0, sizeof val);
138         DBT k,v;
139         r = loader->put(loader, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val)); CKERR(r);
140     }
141     r = loader->close(loader); CKERR(r);
142     r = txn->commit(txn, 0);    CKERR(r);
143 
144     // delete key 2
145     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
146     for (uint64_t i = 2; i < 3; i++) {
147         uint64_t key = i;
148         DBT k;
149         r = db->del(db, txn, dbt_init(&k, &key, sizeof key), 0); CKERR(r);
150     }
151     r = txn->commit(txn, 0);    CKERR(r);
152 
153     // close and reopen
154     r = db->close(db, 0);     CKERR(r);
155     r = db_create(&db, env, 0); CKERR(r);
156     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
157     r = db->open(db, txn, "foo.db", 0, DB_BTREE, 0, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
158     r = txn->commit(txn, 0);    CKERR(r);
159 
160     // create a txn that will try to insert key 2 while the serializable cursor is walking through the tree
161     r = env->txn_begin(env, 0, &txn_b, 0); CKERR(r);
162 
163     // walk a serializable cursor through the tree
164     r = env->txn_begin(env, 0, &txn_a, 0); CKERR(r);
165     DBC *cursor = NULL;
166     r = db->cursor(db, txn_a, &cursor, 0); CKERR(r);
167     db_env_set_func_pread(my_pread);
168     while (1) {
169         r = cursor->c_getf_next(cursor, 0, next_do_nothing, NULL);
170         if (r != 0)
171             break;
172     }
173     db_env_set_func_pread(NULL);
174     r = cursor->c_close(cursor); CKERR(r);
175     r = txn_a->commit(txn_a, 0);    CKERR(r);
176 
177     r = txn_b->commit(txn_b, 0); CKERR(r);
178 
179     r = db->close(db, 0);     CKERR(r);
180     r = env->close(env, 0);   CKERR(r);
181 }
182 
183 static int
usage(void)184 usage(void) {
185     fprintf(stderr, "-v (verbose)\n");
186     fprintf(stderr, "-q (quiet)\n");
187     fprintf(stderr, "--envdir %s\n", envdir);
188     return 1;
189 }
190 
191 int
test_main(int argc,char * const argv[])192 test_main (int argc , char * const argv[]) {
193     for (int i = 1 ; i < argc; i++) {
194         if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
195             verbose++;
196             continue;
197         }
198         if (strcmp(argv[i], "-q") == 0) {
199             if (verbose > 0)
200                 verbose--;
201             continue;
202         }
203         if (strcmp(argv[i], "--envdir") == 0 && i+1 < argc) {
204             envdir = argv[++i];
205             continue;
206         }
207         return usage();
208     }
209 
210     char rmcmd[32 + strlen(envdir)];
211     snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
212     int r;
213     r = system(rmcmd); CKERR(r);
214     r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO);       CKERR(r);
215 
216     run_test();
217 
218     return 0;
219 }
220