1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
// Verify that a serializable cursor locks deleted keys so that another transaction cannot insert into the range being scanned by the cursor.
// We create a 2-level tree that looks like:
//   root node with pivot key 2
//   left leaf contains keys 0, 1, and 2
//   right leaf contains keys 3 and 4
// We delete keys 0, 1, and 2 while a snapshot txn exists so that garbage collection does not occur.
// txn_a walks a cursor through the deleted keys.
// When txn_a finishes reading the deleted keys, txn_b tries to get a table lock.
// The table lock should fail since txn_a holds a read lock on the deleted key range.
50 
51 #include <db.h>
52 #include <unistd.h>
53 #include <sys/stat.h>
54 
// Environment handle created and closed by run_test.
static DB_ENV *env = NULL;
// txn_a owns the cursor that run_test walks through the tree.
static DB_TXN *txn_a = NULL;
// txn_b is the transaction my_pread uses for its table lock attempt.
static DB_TXN *txn_b = NULL;
// Database handle; file scope so that my_pread can reach it.
static DB *db = NULL;
// Page size passed to db->set_pagesize before the database is opened.
static uint32_t db_page_size = 4096;
// static uint32_t db_basement_size = 4096;
// Environment directory; defaults to TOKU_TEST_FILENAME and may be replaced
// by the --envdir command line option.
static const char *envdir = TOKU_TEST_FILENAME;
62 
63 static int
my_compare(DB * this_db UU (),const DBT * a UU (),const DBT * b UU ())64 my_compare(DB *this_db UU(), const DBT *a UU(), const DBT *b UU()) {
65     assert(a->size == b->size);
66     return memcmp(a->data, b->data, a->size);
67 }
68 
69 static int
my_generate_row(DB * dest_db UU (),DB * src_db UU (),DBT_ARRAY * dest_key_arrays UU (),DBT_ARRAY * dest_val_arrays UU (),const DBT * src_key UU (),const DBT * src_val UU ())70 my_generate_row(DB *dest_db UU(), DB *src_db UU(), DBT_ARRAY *dest_key_arrays UU(), DBT_ARRAY *dest_val_arrays UU(), const DBT *src_key UU(), const DBT *src_val UU()) {
71     toku_dbt_array_resize(dest_key_arrays, 1);
72     toku_dbt_array_resize(dest_val_arrays, 1);
73     DBT *dest_key = &dest_key_arrays->dbts[0];
74     DBT *dest_val = &dest_val_arrays->dbts[0];
75     assert(dest_key->flags == DB_DBT_REALLOC);
76     dest_key->data = toku_realloc(dest_key->data, src_key->size);
77     memcpy(dest_key->data, src_key->data, src_key->size);
78     dest_key->size = src_key->size;
79     assert(dest_val->flags == DB_DBT_REALLOC);
80     dest_val->data = toku_realloc(dest_val->data, src_val->size);
81     memcpy(dest_val->data, src_val->data, src_val->size);
82     dest_val->size = src_val->size;
83     return 0;
84 }
85 
86 static int
next_do_nothing(DBT const * UU (a),DBT const * UU (b),void * UU (c))87 next_do_nothing(DBT const *UU(a), DBT  const *UU(b), void *UU(c)) {
88     return 0;
89 }
90 
91 static ssize_t
my_pread(int fd,void * buf,size_t count,off_t offset)92 my_pread (int fd, void *buf, size_t count, off_t offset) {
93     static int my_pread_count = 0;
94     if (++my_pread_count == 5) {
95         // try to acquire a table lock, should fail
96         int r = db->pre_acquire_table_lock(db, txn_b);
97         assert(r == DB_LOCK_NOTGRANTED);
98     }
99     return pread(fd, buf, count, offset);
100 }
101 
102 static void
run_test(void)103 run_test(void) {
104     int r;
105     r = db_env_create(&env, 0); CKERR(r);
106     env->set_errfile(env, stderr);
107     r = env->set_redzone(env, 0); CKERR(r);
108     r = env->set_generate_row_callback_for_put(env, my_generate_row); CKERR(r);
109     r = env->set_default_bt_compare(env, my_compare); CKERR(r);
110     r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
111 
112     r = db_create(&db, env, 0); CKERR(r);
113     r = db->set_pagesize(db, db_page_size);
114     DB_TXN *txn = NULL;
115     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
116     r = db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
117     r = txn->commit(txn, 0);    CKERR(r);
118 
119     // build a tree with 2 leaf nodes
120     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
121     DB_LOADER *loader = NULL;
122     r = env->create_loader(env, txn, &loader, db, 1, &db, NULL, NULL, 0); CKERR(r);
123     for (uint64_t i = 0; i < 5; i++) {
124         uint64_t key = i;
125         char val[800]; memset(val, 0, sizeof val);
126         DBT k,v;
127         r = loader->put(loader, dbt_init(&k, &key, sizeof key), dbt_init(&v, val, sizeof val)); CKERR(r);
128     }
129     r = loader->close(loader); CKERR(r);
130     r = txn->commit(txn, 0);    CKERR(r);
131 
132     // this transaction ensure that garbage collection does not occur when deleting
133     DB_TXN *bogus_txn = NULL;
134     r = env->txn_begin(env, 0, &bogus_txn, DB_TXN_SNAPSHOT); CKERR(r);
135 
136     // delete the keys in the first leaf node
137     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
138     for (uint64_t i = 0; i < 3; i++) {
139         uint64_t key = i;
140         DBT k;
141         r = db->del(db, txn, dbt_init(&k, &key, sizeof key), 0); CKERR(r);
142     }
143     r = txn->commit(txn, 0);    CKERR(r);
144     r = bogus_txn->commit(bogus_txn, 0); CKERR(r);
145 
146     // close and reopen
147     r = db->close(db, 0);     CKERR(r);
148     r = db_create(&db, env, 0); CKERR(r);
149     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
150     r = db->open(db, txn, "foo.db", 0, DB_BTREE, 0, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
151     r = txn->commit(txn, 0);    CKERR(r);
152 
153     // create a txn that will try to acquire a write lock on key 0 in the pread callback
154     r = env->txn_begin(env, 0, &txn_b, 0); CKERR(r);
155 
156     // walk a serializable cursor through the tree
157     r = env->txn_begin(env, 0, &txn_a, 0); CKERR(r);
158     DBC *cursor = NULL;
159     r = db->cursor(db, txn_a, &cursor, 0); CKERR(r);
160     db_env_set_func_pread(my_pread);
161     while (1) {
162         r = cursor->c_getf_next(cursor, 0, next_do_nothing, NULL);
163         if (r != 0)
164             break;
165     }
166     db_env_set_func_pread(NULL);
167     r = cursor->c_close(cursor); CKERR(r);
168     r = txn_a->commit(txn_a, 0);    CKERR(r);
169 
170     r = txn_b->commit(txn_b, 0); CKERR(r);
171 
172     r = db->close(db, 0);     CKERR(r);
173     r = env->close(env, 0);   CKERR(r);
174 }
175 
176 static int
usage(void)177 usage(void) {
178     fprintf(stderr, "-v (verbose)\n");
179     fprintf(stderr, "-q (quiet)\n");
180     fprintf(stderr, "--envdir %s\n", envdir);
181     return 1;
182 }
183 
184 int
test_main(int argc,char * const argv[])185 test_main (int argc , char * const argv[]) {
186     for (int i = 1 ; i < argc; i++) {
187         if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
188             verbose++;
189             continue;
190         }
191         if (strcmp(argv[i], "-q") == 0) {
192             if (verbose > 0)
193                 verbose--;
194             continue;
195         }
196         if (strcmp(argv[i], "--envdir") == 0 && i+1 < argc) {
197             envdir = argv[++i];
198             continue;
199         }
200         return usage();
201     }
202 
203     char rmcmd[32 + strlen(envdir)];
204     snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
205     int r;
206     r = system(rmcmd); CKERR(r);
207     r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO);       CKERR(r);
208 
209     run_test();
210 
211     return 0;
212 }
213