1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 // verify that put_multiple inserts the correct rows into N dictionaries
// verify that put_multiple locks the correct keys for N dictionaries
43 
// Upper bound on the number of secondary keys one primary row generates for
// a single dictionary; per-row counts cycle through 0..max_rows_per_primary.
const int max_rows_per_primary = 9;

// Returns how many rows one secondary dictionary holds after num_primary
// primary rows have been inserted.
// num_primary must be a multiple of (max_rows_per_primary + 1): every full
// cycle of that many primary rows contributes 0 + 1 + ... + max_rows_per_primary
// secondary rows.
static uint32_t
get_total_secondary_rows(uint32_t num_primary) {
    const uint32_t cycle_len = max_rows_per_primary + 1;
    assert((num_primary % cycle_len) == 0);

    // Sum of 0..max_rows_per_primary.
    const uint32_t rows_per_cycle = max_rows_per_primary * cycle_len / 2;
    const uint32_t num_cycles = num_primary / cycle_len;
    return num_cycles * rows_per_cycle;
}
52 
53 static uint8_t
get_num_keys(uint16_t i,uint8_t dbnum)54 get_num_keys(uint16_t i, uint8_t dbnum) {
55     return (i+dbnum) % (max_rows_per_primary + 1);  // 0..9.. 10 choices
56 }
57 
58 static uint16_t
get_total_num_keys(uint16_t i,uint8_t num_dbs)59 get_total_num_keys(uint16_t i, uint8_t num_dbs) {
60     uint16_t sum = 0;
61     for (uint8_t db = 0; db < num_dbs; ++db) {
62         sum += get_num_keys(i, db);
63     }
64     return sum;
65 }
66 
// Packs (row i, dictionary dbnum, key index which) into one 32-bit key:
//   bits 24..31 : dbnum
//   bits  8..15 : which
//   bits  0..15 : i
// The fields are OR'd together, so `which` and the high byte of `i` share
// bits 8..15; the encoding is only unambiguous while i < 256.
static uint32_t
get_key(uint16_t i, uint8_t dbnum, uint8_t which) {
    const uint32_t db_bits = ((uint32_t) dbnum) << 24;
    const uint32_t which_bits = ((uint32_t) which) << 8;
    const uint32_t row_bits = i;
    return db_bits | row_bits | which_bits;
}
75 
76 static void
get_data(uint32_t * v,uint8_t i,uint8_t ndbs)77 get_data(uint32_t *v, uint8_t i, uint8_t ndbs) {
78     int index = 0;
79     for (uint8_t dbnum = 0; dbnum < ndbs; dbnum++) {
80         for (uint8_t which = 0; which < get_num_keys(i, dbnum); ++which) {
81             v[index++] = get_key(i, dbnum, which);
82         }
83     }
84 }
85 
86 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)87 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
88     (void) src_val;
89     uint8_t dbnum;
90     assert(dest_db->descriptor->dbt.size == sizeof dbnum);
91     memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
92 
93     assert(dbnum > 0); // Does not get called for primary.
94     assert(dest_db != src_db);
95 
96     assert(src_key->size == 2);
97     uint16_t i = *(uint16_t*)src_key->data;
98     uint8_t num_keys = get_num_keys(i, dbnum);
99 
100     toku_dbt_array_resize(dest_keys, num_keys);
101     if (dest_vals) {
102         toku_dbt_array_resize(dest_vals, num_keys);
103     }
104 
105     for (uint8_t which = 0; which < num_keys; ++which) {
106         DBT *dest_key = &dest_keys->dbts[which];
107 
108         assert(dest_key->flags == DB_DBT_REALLOC);
109         {
110             // Memory management
111             if (dest_key->ulen < sizeof(uint32_t)) {
112                 dest_key->data = toku_xrealloc(dest_key->data, sizeof(uint32_t));
113                 dest_key->ulen = sizeof(uint32_t);
114             }
115             dest_key->size = sizeof(uint32_t);
116         }
117         *(uint32_t*)dest_key->data = get_key(i, dbnum, which);
118 
119         if (dest_vals) {
120             DBT *dest_val = &dest_vals->dbts[which];
121             dest_val->flags = 0;
122             dest_val->data = nullptr;
123             dest_val->size = 0;
124         }
125     }
126     return 0;
127 }
128 
129 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,const DBT * src_key,const DBT * src_data)130 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data) {
131     return put_callback(dest_db, src_db, dest_keys, NULL, src_key, src_data);
132 }
133 
134 static void
verify_locked(DB_ENV * env,DB * db,uint8_t dbnum,uint16_t i)135 verify_locked(DB_ENV *env, DB *db, uint8_t dbnum, uint16_t i) {
136     int r;
137     DB_TXN *txn = NULL;
138     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
139     if (dbnum == 0) {
140         DBT key; dbt_init(&key, &i, sizeof i);
141         r = db->del(db, txn, &key, DB_DELETE_ANY); CKERR2(r, DB_LOCK_NOTGRANTED);
142     } else {
143         for (uint8_t which = 0; which < get_num_keys(i, dbnum); ++which) {
144             uint32_t k = get_key(i, dbnum, which);
145             DBT key; dbt_init(&key, &k, sizeof k);
146             r = db->del(db, txn, &key, DB_DELETE_ANY); CKERR2(r, DB_LOCK_NOTGRANTED);
147         }
148     }
149     r = txn->abort(txn); assert_zero(r);
150 }
151 
152 static void
verify_seq_primary(DB_ENV * env,DB * db,int dbnum,int ndbs,int nrows)153 verify_seq_primary(DB_ENV *env, DB *db, int dbnum, int ndbs, int nrows) {
154     assert(dbnum==0);
155     int r;
156     DB_TXN *txn = NULL;
157     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
158 
159     DBC *cursor = NULL;
160     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
161     int i;
162     for (i = 0; ; i++) {
163         DBT key; memset(&key, 0, sizeof key);
164         DBT val; memset(&val, 0, sizeof val);
165         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
166         if (r != 0)
167             break;
168         uint16_t k;
169         assert(key.size == sizeof k);
170         memcpy(&k, key.data, key.size);
171         assert(k == i);
172 
173         uint32_t total_rows = get_total_num_keys(i, ndbs);
174         assert(val.size == total_rows * sizeof (uint32_t));
175         uint32_t v[total_rows]; get_data(v, i, ndbs);
176         assert(memcmp(val.data, v, val.size) == 0);
177     }
178     assert(i == nrows);
179     r = cursor->c_close(cursor); assert_zero(r);
180     r = txn->commit(txn, 0); assert_zero(r);
181 }
182 
183 static void
verify_seq(DB_ENV * env,DB * db,uint8_t dbnum,uint8_t ndbs,uint16_t nrows_primary)184 verify_seq(DB_ENV *env, DB *db, uint8_t dbnum, uint8_t ndbs, uint16_t nrows_primary) {
185     assert(dbnum > 0);
186     assert(dbnum < ndbs);
187     uint32_t nrows = get_total_secondary_rows(nrows_primary);
188     int r;
189     DB_TXN *txn = NULL;
190     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
191 
192     DBC *cursor = NULL;
193     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
194     uint16_t rows_found = 0;
195     uint16_t source_i = 0;
196     DBT key; memset(&key, 0, sizeof key);
197     DBT val; memset(&val, 0, sizeof val);
198     for (source_i = 0; source_i < nrows_primary; ++source_i) {
199         uint8_t num_keys = get_num_keys(source_i, dbnum);
200         for (uint8_t which = 0; which < num_keys; ++which) {
201             r = cursor->c_get(cursor, &key, &val, DB_NEXT);
202             CKERR(r);
203             uint32_t k;
204             assert(key.size == sizeof k);
205             memcpy(&k, key.data, key.size);
206             assert(k == get_key(source_i, dbnum, which));
207             assert(val.size == 0);
208             rows_found++;
209         }
210     }
211     r = cursor->c_get(cursor, &key, &val, DB_NEXT);
212     CKERR2(r, DB_NOTFOUND);
213     assert(rows_found == nrows);
214     r = cursor->c_close(cursor); assert_zero(r);
215     r = txn->commit(txn, 0); assert_zero(r);
216 }
217 
218 static void
verify(DB_ENV * env,DB * db[],int ndbs,int nrows)219 verify(DB_ENV *env, DB *db[], int ndbs, int nrows) {
220     verify_seq_primary(env, db[0], 0, ndbs, nrows);
221     for (int dbnum = 1; dbnum < ndbs; dbnum++)
222         verify_seq(env, db[dbnum], dbnum, ndbs, nrows);
223 }
224 
225 static void
verify_empty(DB_ENV * env,DB * db)226 verify_empty(DB_ENV *env, DB *db) {
227     int r;
228     DB_TXN *txn = NULL;
229     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
230 
231     DBC *cursor = NULL;
232     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
233     int i;
234     for (i = 0; ; i++) {
235         DBT key; memset(&key, 0, sizeof key);
236         DBT val; memset(&val, 0, sizeof val);
237         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
238         if (r != 0)
239             break;
240     }
241     assert_zero(i);
242     r = cursor->c_close(cursor); assert_zero(r);
243     r = txn->commit(txn, 0); assert_zero(r);
244 }
245 
246 static void
verify_del(DB_ENV * env,DB * db[],int ndbs)247 verify_del(DB_ENV *env, DB *db[], int ndbs) {
248     for (int dbnum = 0; dbnum < ndbs; dbnum++)
249         verify_empty(env, db[dbnum]);
250 }
251 
252 static void
populate(DB_ENV * env,DB * db[],uint8_t ndbs,uint16_t nrows,bool del)253 populate(DB_ENV *env, DB *db[], uint8_t ndbs, uint16_t nrows, bool del) {
254     int r;
255     DB_TXN *txn = NULL;
256     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
257 
258     DBT_ARRAY key_arrays[ndbs];
259     DBT_ARRAY val_arrays[ndbs];
260     for (uint8_t i = 0; i < ndbs; ++i) {
261         toku_dbt_array_init(&key_arrays[i], 1);
262         toku_dbt_array_init(&val_arrays[i], 1);
263     }
264     // populate
265     for (uint16_t i = 0; i < nrows; i++) {
266         uint32_t total_rows = get_total_num_keys(i, ndbs);
267         uint16_t k = i;
268         uint32_t v[total_rows]; get_data(v, i, ndbs);
269         DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
270         DBT pri_val; dbt_init(&pri_val, &v[0], sizeof v);
271         uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
272         if (del) {
273             r = env->del_multiple(env, db[0], txn, &pri_key, &pri_val, ndbs, db, key_arrays, flags);
274         } else {
275             r = env->put_multiple(env, db[0], txn, &pri_key, &pri_val, ndbs, db, key_arrays, val_arrays, flags);
276         }
277         assert_zero(r);
278         for (int dbnum = 0; dbnum < ndbs; dbnum++)
279             verify_locked(env, db[dbnum], dbnum, i);
280     }
281     for (uint8_t i = 0; i < ndbs; ++i) {
282         toku_dbt_array_destroy(&key_arrays[i]);
283         toku_dbt_array_destroy(&val_arrays[i]);
284     }
285 
286     r = txn->commit(txn, 0); assert_zero(r);
287 }
288 
289 static void
run_test(int ndbs,int nrows)290 run_test(int ndbs, int nrows) {
291     int r;
292     DB_ENV *env = NULL;
293     r = db_env_create(&env, 0); assert_zero(r);
294 
295     r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
296     r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
297 
298     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
299 
300     DB *db[ndbs];
301     for (uint8_t dbnum = 0; dbnum < ndbs; dbnum++) {
302         r = db_create(&db[dbnum], env, 0); assert_zero(r);
303 
304         DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
305 
306         char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
307         r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
308         assert_zero(r);
309 
310         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
311                 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
312         });
313     }
314 
315     populate(env, db, ndbs, nrows, false);
316 
317     verify(env, db, ndbs, nrows);
318 
319     populate(env, db, ndbs, nrows, true);
320 
321     verify_del(env, db, ndbs);
322 
323     for (int dbnum = 0; dbnum < ndbs; dbnum++)
324         r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
325 
326     r = env->close(env, 0); assert_zero(r);
327 }
328 
329 int
test_main(int argc,char * const argv[])330 test_main(int argc, char * const argv[]) {
331     int r;
332     int ndbs = 16;
333     int nrows = 100;
334 
335     // parse_args(argc, argv);
336     for (int i = 1; i < argc; i++) {
337         char * const arg = argv[i];
338         if (strcmp(arg, "-v") == 0) {
339             verbose++;
340             continue;
341         }
342         if (strcmp(arg, "-q") == 0) {
343             verbose = 0;
344             continue;
345         }
346         if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
347             ndbs = atoi(argv[++i]);
348             continue;
349         }
350         if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
351             nrows = atoi(argv[++i]);
352             continue;
353         }
354     }
355     //rows should be divisible by max_rows + 1 (so that we have an equal number of each type and we know the total)
356     if (nrows % (max_rows_per_primary+1) != 0) {
357         nrows += (max_rows_per_primary+1) - (nrows % (max_rows_per_primary+1));
358     }
359     assert(ndbs >= 0);
360     assert(ndbs < (1<<8) - 1);
361     assert(nrows >= 0);
362     assert(nrows < (1<<15));  // Leave plenty of room
363 
364     toku_os_recursive_delete(TOKU_TEST_FILENAME);
365     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
366 
367     run_test(ndbs, nrows);
368 
369     return 0;
370 }
371 
372