1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 // verify that del_multiple deletes the correct key from N dictionaries
42 // verify that del_multiple locks the correct key for N dictionaries
43 
44 static int
get_key(int i,int dbnum)45 get_key(int i, int dbnum) {
46     return htonl(i + dbnum);
47 }
48 
49 static void
get_data(int * v,int i,int ndbs)50 get_data(int *v, int i, int ndbs) {
51     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
52         v[dbnum] = get_key(i, dbnum);
53     }
54 }
55 
56 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,const DBT * src_key,const DBT * src_data)57 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data) {
58     toku_dbt_array_resize(dest_keys, 1);
59     DBT *dest_key = &dest_keys->dbts[0];
60     (void) dest_db; (void) src_db; (void) dest_keys; (void) src_key; (void) src_data;
61     assert(src_db == NULL);
62 
63     unsigned int dbnum;
64     assert(dest_db->descriptor->dbt.size == sizeof dbnum);
65     memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
66     assert(dbnum < src_data->size / sizeof (int));
67 
68     int *pri_data = (int *) src_data->data;
69 
70     assert(dest_key->flags == 0);
71     dest_key->size = sizeof (int);
72     dest_key->data = &pri_data[dbnum];
73 
74     return 0;
75 }
76 
77 static void
verify_locked(DB_ENV * env,DB * db,int k)78 verify_locked(DB_ENV *env, DB *db, int k) {
79     int r;
80     DB_TXN *txn = NULL;
81     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
82     DBT key; dbt_init(&key, &k, sizeof k);
83     r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
84     r = txn->abort(txn); assert_zero(r);
85 }
86 
87 static void
verify_empty(DB_ENV * env,DB * db)88 verify_empty(DB_ENV *env, DB *db) {
89     int r;
90     DB_TXN *txn = NULL;
91     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
92 
93     DBC *cursor = NULL;
94     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
95     int i;
96     for (i = 0; ; i++) {
97         DBT key; memset(&key, 0, sizeof key);
98         DBT val; memset(&val, 0, sizeof val);
99         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
100         if (r != 0)
101             break;
102     }
103     assert_zero(i);
104     r = cursor->c_close(cursor); assert_zero(r);
105     r = txn->commit(txn, 0); assert_zero(r);
106 }
107 
108 static void
verify_del_multiple(DB_ENV * env,DB * db[],int ndbs,int nrows)109 verify_del_multiple(DB_ENV *env, DB *db[], int ndbs, int nrows) {
110     int r;
111     DB_TXN *deltxn = NULL;
112     r = env->txn_begin(env, NULL, &deltxn, 0); assert_zero(r);
113     for (int i = 0; i < nrows; i++) {
114         int k = get_key(i, 0);
115         DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
116         int v[ndbs]; get_data(v, i, ndbs);
117         DBT pri_data; dbt_init(&pri_data, &v[0], sizeof v);
118         DBT keys[ndbs]; memset(keys, 0, sizeof keys);
119         uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
120         r = env_del_multiple_test_no_array(env, NULL, deltxn, &pri_key, &pri_data, ndbs, db, keys, flags); assert_zero(r);
121         for (int dbnum = 0; dbnum < ndbs; dbnum++)
122             verify_locked(env, db[dbnum], get_key(i, dbnum));
123     }
124     r = deltxn->commit(deltxn, 0); assert_zero(r);
125     for (int dbnum = 0; dbnum < ndbs; dbnum++)
126         verify_empty(env, db[dbnum]);
127 }
128 
129 static void
populate_primary(DB_ENV * env,DB * db,int ndbs,int nrows)130 populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
131     int r;
132     DB_TXN *txn = NULL;
133     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
134 
135     // populate
136     for (int i = 0; i < nrows; i++) {
137         int k = get_key(i, 0);
138         int v[ndbs]; get_data(v, i, ndbs);
139         DBT key; dbt_init(&key, &k, sizeof k);
140         DBT val; dbt_init(&val, &v[0], sizeof v);
141         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
142     }
143 
144     r = txn->commit(txn, 0); assert_zero(r);
145 }
146 
147 static void
populate_secondary(DB_ENV * env,DB * db,int dbnum,int nrows)148 populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
149     int r;
150     DB_TXN *txn = NULL;
151     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
152 
153     // populate
154     for (int i = 0; i < nrows; i++) {
155         int k = get_key(i, dbnum);
156         DBT key; dbt_init(&key, &k, sizeof k);
157         DBT val; dbt_init(&val, NULL, 0);
158         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
159     }
160 
161     r = txn->commit(txn, 0); assert_zero(r);
162 }
163 
164 static void
run_test(int ndbs,int nrows)165 run_test(int ndbs, int nrows) {
166     int r;
167     DB_ENV *env = NULL;
168     r = db_env_create(&env, 0); assert_zero(r);
169 
170     r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
171 
172     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
173 
174     DB *db[ndbs];
175     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
176         r = db_create(&db[dbnum], env, 0); assert_zero(r);
177 
178         char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
179         r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
180 
181         DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
182         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
183                 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
184         });
185     }
186 
187     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
188         if (dbnum == 0)
189             populate_primary(env, db[dbnum], ndbs, nrows);
190         else
191             populate_secondary(env, db[dbnum], dbnum, nrows);
192     }
193 
194     verify_del_multiple(env, db, ndbs, nrows);
195 
196     for (int dbnum = 0; dbnum < ndbs; dbnum++)
197         r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
198 
199     r = env->close(env, 0); assert_zero(r);
200 }
201 
202 int
test_main(int argc,char * const argv[])203 test_main(int argc, char * const argv[]) {
204     int r;
205     int ndbs = 2;
206     int nrows = 2;
207 
208     // parse_args(argc, argv);
209     for (int i = 1; i < argc; i++) {
210         char * const arg = argv[i];
211         if (strcmp(arg, "-v") == 0) {
212             verbose++;
213             continue;
214         }
215         if (strcmp(arg, "-q") == 0) {
216             verbose = 0;
217             continue;
218         }
219         if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
220             ndbs = atoi(argv[++i]);
221             continue;
222         }
223         if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
224             nrows = atoi(argv[++i]);
225             continue;
226         }
227     }
228 
229     toku_os_recursive_delete(TOKU_TEST_FILENAME);
230     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
231 
232     run_test(ndbs, nrows);
233 
234     return 0;
235 }
236 
237