1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 // verify that update_multiple where we change the data in row[i] col[j] from x to x+1
42 
43 static int
get_key(int i,int dbnum)44 get_key(int i, int dbnum) {
45     return htonl(2*(i + dbnum));
46 }
47 
48 static int
get_new_key(int i,int dbnum)49 get_new_key(int i, int dbnum) {
50     return htonl(2*(i + dbnum) + 1);
51 }
52 
53 static void
get_data(int * v,int i,int ndbs)54 get_data(int *v, int i, int ndbs) {
55     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
56         v[dbnum] = get_key(i, dbnum);
57     }
58 }
59 
60 static void
get_new_data(int * v,int i,int ndbs)61 get_new_data(int *v, int i, int ndbs) {
62     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
63         if ((i % ndbs) == dbnum)
64             v[dbnum] = get_new_key(i, dbnum);
65         else
66             v[dbnum] = get_key(i, dbnum);
67     }
68 }
69 
70 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,DBT_ARRAY * dest_val_arrays,const DBT * src_key,const DBT * src_val)71 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, DBT_ARRAY *dest_val_arrays, const DBT *src_key, const DBT *src_val) {
72     toku_dbt_array_resize(dest_key_arrays, 1);
73     DBT *dest_key = &dest_key_arrays->dbts[0];
74     DBT *dest_val = NULL;
75     if (dest_val_arrays) {
76         toku_dbt_array_resize(dest_val_arrays, 1);
77         dest_val = &dest_val_arrays->dbts[0];
78     }
79 
80     (void) dest_db; (void) src_db; (void) dest_key; (void) dest_val; (void) src_key; (void) src_val;
81 
82     unsigned int dbnum;
83     assert(dest_db->descriptor->dbt.size == sizeof dbnum);
84     memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
85     assert(dbnum < src_val->size / sizeof (int));
86 
87     int *pri_key = (int *) src_key->data;
88     int *pri_data = (int *) src_val->data;
89 
90     switch (dest_key->flags) {
91     case 0:
92         dest_key->size = sizeof (int);
93         dest_key->data = dbnum == 0 ? &pri_key[dbnum] : &pri_data[dbnum];
94         break;
95     case DB_DBT_REALLOC:
96         dest_key->size = sizeof (int);
97         dest_key->data = toku_realloc(dest_key->data, dest_key->size);
98         memcpy(dest_key->data, dbnum == 0 ? &pri_key[dbnum] : &pri_data[dbnum], dest_key->size);
99         break;
100     default:
101         assert(0);
102     }
103 
104     if (dest_val) {
105         switch (dest_val->flags) {
106         case 0:
107             if (dbnum == 0) {
108                 dest_val->size = src_val->size;
109                 dest_val->data = src_val->data;
110             } else
111                 dest_val->size = 0;
112             break;
113         case DB_DBT_REALLOC:
114             if (dbnum == 0) {
115                 dest_val->size = src_val->size;
116                 dest_val->data = toku_realloc(dest_val->data, dest_val->size);
117                 memcpy(dest_val->data, src_val->data, dest_val->size);
118             } else
119                 dest_val->size = 0;
120             break;
121         default:
122             assert(0);
123         }
124     }
125 
126     return 0;
127 }
128 
129 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,const DBT * src_key,const DBT * src_data)130 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, const DBT *src_key, const DBT *src_data) {
131     return put_callback(dest_db, src_db, dest_key_arrays, NULL, src_key, src_data);
132 }
133 
134 #if 0
135 static void
136 verify_locked(DB_ENV *env, DB *db, int k) {
137     int r;
138     DB_TXN *txn = NULL;
139     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
140     DBT key; dbt_init(&key, &k, sizeof k);
141     r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
142     r = txn->abort(txn); assert_zero(r);
143 }
144 
145 static void
146 verify_empty(DB_ENV *env, DB *db) {
147     int r;
148     DB_TXN *txn = NULL;
149     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
150 
151     DBC *cursor = NULL;
152     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
153     int i;
154     for (i = 0; ; i++) {
155         DBT key; memset(&key, 0, sizeof key);
156         DBT val; memset(&val, 0, sizeof val);
157         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
158         if (r != 0)
159             break;
160     }
161     assert_zero(i);
162     r = cursor->c_close(cursor); assert_zero(r);
163     r = txn->commit(txn, 0); assert_zero(r);
164 }
165 #endif
166 
167 static void
verify_seq(DB_ENV * env,DB * db,int dbnum,int ndbs,int nrows)168 verify_seq(DB_ENV *env, DB *db, int dbnum, int ndbs, int nrows) {
169     int r;
170     DB_TXN *txn = NULL;
171     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
172 
173     DBC *cursor = NULL;
174     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
175     int i;
176     for (i = 0; ; i++) {
177         DBT key; memset(&key, 0, sizeof key);
178         DBT val; memset(&val, 0, sizeof val);
179         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
180         if (r != 0)
181             break;
182         int k;
183         int expectk;
184         if (dbnum == 0 || (i % ndbs) != dbnum)
185             expectk = get_key(i, dbnum);
186         else
187             expectk = get_new_key(i, dbnum);
188 
189         assert(key.size == sizeof k);
190         memcpy(&k, key.data, key.size);
191         assert(k == expectk);
192 
193         if (dbnum == 0) {
194             assert(val.size == ndbs * sizeof (int));
195             int v[ndbs]; get_new_data(v, i, ndbs);
196             assert(memcmp(val.data, v, val.size) == 0);
197         } else
198             assert(val.size == 0);
199     }
200     assert(i == nrows); // if (i != nrows) printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, i, nrows); // assert(i == nrows);
201     r = cursor->c_close(cursor); assert_zero(r);
202     r = txn->commit(txn, 0); assert_zero(r);
203 }
204 
205 static void
update_diagonal(DB_ENV * env,DB * db[],int ndbs,int nrows)206 update_diagonal(DB_ENV *env, DB *db[], int ndbs, int nrows) {
207     assert(ndbs > 0);
208     int r;
209     DB_TXN *txn = NULL;
210     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
211     for (int i = 0; i < nrows; i++) {
212 
213         // update the data i % ndbs col from x to x+1
214 
215         int k = get_key(i, 0);
216         DBT old_key; dbt_init(&old_key, &k, sizeof k);
217         DBT new_key = old_key;
218 
219         int v[ndbs]; get_data(v, i, ndbs);
220         DBT old_data; dbt_init(&old_data, &v[0], sizeof v);
221 
222         int newv[ndbs]; get_new_data(newv, i, ndbs);
223         DBT new_data; dbt_init(&new_data, &newv[0], sizeof newv);
224 
225         int ndbts = 2 * ndbs;
226         DBT keys[ndbts]; memset(keys, 0, sizeof keys);
227         DBT vals[ndbts]; memset(vals, 0, sizeof vals);
228         uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
229 
230         r = env_update_multiple_test_no_array(env, ndbs > 0 ? db[0] : NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
231         assert_zero(r);
232     }
233     r = txn->commit(txn, 0); assert_zero(r);
234 }
235 
236 static void
populate_primary(DB_ENV * env,DB * db,int ndbs,int nrows)237 populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
238     int r;
239     DB_TXN *txn = NULL;
240     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
241 
242     // populate
243     for (int i = 0; i < nrows; i++) {
244         int k = get_key(i, 0);
245         int v[ndbs]; get_data(v, i, ndbs);
246         DBT key; dbt_init(&key, &k, sizeof k);
247         DBT val; dbt_init(&val, &v[0], sizeof v);
248         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
249     }
250 
251     r = txn->commit(txn, 0); assert_zero(r);
252 }
253 
254 static void
populate_secondary(DB_ENV * env,DB * db,int dbnum,int nrows)255 populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
256     int r;
257     DB_TXN *txn = NULL;
258     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
259 
260     // populate
261     for (int i = 0; i < nrows; i++) {
262         int k = get_key(i, dbnum);
263         DBT key; dbt_init(&key, &k, sizeof k);
264         DBT val; dbt_init(&val, NULL, 0);
265         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
266     }
267 
268     r = txn->commit(txn, 0); assert_zero(r);
269 }
270 
271 static void
run_test(int ndbs,int nrows)272 run_test(int ndbs, int nrows) {
273     int r;
274     DB_ENV *env = NULL;
275     r = db_env_create(&env, 0); assert_zero(r);
276 
277     r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
278     r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
279 
280     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
281 
282     DB *db[ndbs];
283     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
284         r = db_create(&db[dbnum], env, 0); assert_zero(r);
285 
286         DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
287 
288         char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
289         r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
290         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
291                 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
292         });
293     }
294 
295     for (int dbnum = 0; dbnum < ndbs-1; dbnum++) {
296         if (dbnum == 0)
297             populate_primary(env, db[dbnum], ndbs, nrows);
298         else
299             populate_secondary(env, db[dbnum], dbnum, nrows);
300     }
301 
302     DB_TXN *indexer_txn = NULL;
303     r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
304 
305     DB_INDEXER *indexer = NULL;
306     uint32_t db_flags = 0;
307     r = env->create_indexer(env, indexer_txn, &indexer, db[0], 1, &db[ndbs-1], &db_flags, 0); assert_zero(r);
308 
309     update_diagonal(env, db, ndbs, nrows);
310 
311     r = indexer->build(indexer); assert_zero(r);
312     r = indexer->close(indexer); assert_zero(r);
313 
314     r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
315 
316     for (int dbnum = 0; dbnum < ndbs; dbnum++)
317         verify_seq(env, db[dbnum], dbnum, ndbs, nrows);
318     for (int dbnum = 0; dbnum < ndbs; dbnum++)
319         r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
320 
321     r = env->close(env, 0); assert_zero(r);
322 }
323 
324 int
test_main(int argc,char * const argv[])325 test_main(int argc, char * const argv[]) {
326     int r;
327     int ndbs = 2;
328     int nrows = 2;
329 
330     // parse_args(argc, argv);
331     for (int i = 1; i < argc; i++) {
332         char * const arg = argv[i];
333         if (strcmp(arg, "-v") == 0) {
334             verbose++;
335             continue;
336         }
337         if (strcmp(arg, "-q") == 0) {
338             verbose = 0;
339             continue;
340         }
341         if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
342             ndbs = atoi(argv[++i]);
343             continue;
344         }
345         if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
346             nrows = atoi(argv[++i]);
347             continue;
348         }
349     }
350 
351     toku_os_recursive_delete(TOKU_TEST_FILENAME);
352     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
353 
354     run_test(ndbs, nrows);
355 
356     return 0;
357 }
358 
359