1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 // verify that update_multiple where we change the data in row[i] col[j] from x to x+1
42 
43 static int
get_key(int i,int dbnum)44 get_key(int i, int dbnum) {
45     return htonl(2*(i + dbnum));
46 }
47 
48 static int
get_new_key(int i,int dbnum)49 get_new_key(int i, int dbnum) {
50     return htonl(2*(i + dbnum) + 1);
51 }
52 
53 static void
get_data(int * v,int i,int ndbs)54 get_data(int *v, int i, int ndbs) {
55     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
56         v[dbnum] = get_key(i, dbnum);
57     }
58 }
59 
60 static void
get_new_data(int * v,int i,int ndbs)61 get_new_data(int *v, int i, int ndbs) {
62     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
63         if ((i % ndbs) == dbnum)
64             v[dbnum] = get_new_key(i, dbnum);
65         else
66             v[dbnum] = get_key(i, dbnum);
67     }
68 }
69 
70 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,DBT_ARRAY * dest_val_arrays,const DBT * src_key,const DBT * src_val)71 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, DBT_ARRAY *dest_val_arrays, const DBT *src_key, const DBT *src_val) {
72     toku_dbt_array_resize(dest_key_arrays, 1);
73     DBT *dest_key = &dest_key_arrays->dbts[0];
74     DBT *dest_val = NULL;
75     if (dest_val_arrays) {
76         toku_dbt_array_resize(dest_val_arrays, 1);
77         dest_val = &dest_val_arrays->dbts[0];
78     }
79     (void) dest_db; (void) src_db; (void) dest_key; (void) dest_val; (void) src_key; (void) src_val;
80 
81     unsigned int dbnum;
82     assert(dest_db->descriptor->dbt.size == sizeof dbnum);
83     memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
84     assert(dbnum < src_val->size / sizeof (int));
85 
86     int *pri_key = (int *) src_key->data;
87     int *pri_data = (int *) src_val->data;
88 
89     switch (dest_key->flags) {
90     case 0:
91         dest_key->size = sizeof (int);
92         dest_key->data = dbnum == 0 ? &pri_key[dbnum] : &pri_data[dbnum];
93         break;
94     case DB_DBT_REALLOC:
95         dest_key->size = sizeof (int);
96         dest_key->data = toku_realloc(dest_key->data, dest_key->size);
97         memcpy(dest_key->data, dbnum == 0 ? &pri_key[dbnum] : &pri_data[dbnum], dest_key->size);
98         break;
99     default:
100         assert(0);
101     }
102 
103     if (dest_val) {
104         switch (dest_val->flags) {
105         case 0:
106             if (dbnum == 0) {
107                 dest_val->size = src_val->size;
108                 dest_val->data = src_val->data;
109             } else
110                 dest_val->size = 0;
111             break;
112         case DB_DBT_REALLOC:
113             if (dbnum == 0) {
114                 dest_val->size = src_val->size;
115                 dest_val->data = toku_realloc(dest_val->data, dest_val->size);
116                 memcpy(dest_val->data, src_val->data, dest_val->size);
117             } else
118                 dest_val->size = 0;
119             break;
120         default:
121             assert(0);
122         }
123     }
124 
125     return 0;
126 }
127 
128 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,const DBT * src_key,const DBT * src_data)129 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, const DBT *src_key, const DBT *src_data) {
130     return put_callback(dest_db, src_db, dest_key_arrays, NULL, src_key, src_data);
131 }
132 
133 #if 0
134 static void
135 verify_locked(DB_ENV *env, DB *db, int k) {
136     int r;
137     DB_TXN *txn = NULL;
138     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
139     DBT key; dbt_init(&key, &k, sizeof k);
140     r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
141     r = txn->abort(txn); assert_zero(r);
142 }
143 
144 static void
145 verify_empty(DB_ENV *env, DB *db) {
146     int r;
147     DB_TXN *txn = NULL;
148     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
149 
150     DBC *cursor = NULL;
151     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
152     int i;
153     for (i = 0; ; i++) {
154         DBT key; memset(&key, 0, sizeof key);
155         DBT val; memset(&val, 0, sizeof val);
156         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
157         if (r != 0)
158             break;
159     }
160     assert_zero(i);
161     r = cursor->c_close(cursor); assert_zero(r);
162     r = txn->commit(txn, 0); assert_zero(r);
163 }
164 #endif
165 
166 static void
verify_seq(DB_ENV * env,DB * db,int dbnum,int ndbs,int nrows)167 verify_seq(DB_ENV *env, DB *db, int dbnum, int ndbs, int nrows) {
168     int r;
169     DB_TXN *txn = NULL;
170     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
171 
172     DBC *cursor = NULL;
173     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
174     int i;
175     for (i = 0; ; i++) {
176         DBT key; memset(&key, 0, sizeof key);
177         DBT val; memset(&val, 0, sizeof val);
178         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
179         if (r != 0)
180             break;
181         int k;
182         int expectk;
183         if (dbnum == 0 || (i % ndbs) != dbnum)
184             expectk = get_key(i, dbnum);
185         else
186             expectk = get_new_key(i, dbnum);
187 
188         assert(key.size == sizeof k);
189         memcpy(&k, key.data, key.size);
190         assert(k == expectk);
191 
192         if (dbnum == 0) {
193             assert(val.size == ndbs * sizeof (int));
194             int v[ndbs]; get_new_data(v, i, ndbs);
195             assert(memcmp(val.data, v, val.size) == 0);
196         } else
197             assert(val.size == 0);
198     }
199     assert(i == nrows); // if (i != nrows) printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, i, nrows); // assert(i == nrows);
200     r = cursor->c_close(cursor); assert_zero(r);
201     r = txn->commit(txn, 0); assert_zero(r);
202 }
203 
204 static void
update_diagonal(DB_ENV * env,DB * db[],int ndbs,int nrows)205 update_diagonal(DB_ENV *env, DB *db[], int ndbs, int nrows) {
206     assert(ndbs > 0);
207     int r;
208     DB_TXN *txn = NULL;
209     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
210     for (int i = 0; i < nrows; i++) {
211 
212         // update the data i % ndbs col from x to x+1
213 
214         int k = get_key(i, 0);
215         DBT old_key; dbt_init(&old_key, &k, sizeof k);
216         DBT new_key = old_key;
217 
218         int v[ndbs]; get_data(v, i, ndbs);
219         DBT old_data; dbt_init(&old_data, &v[0], sizeof v);
220 
221         int newv[ndbs]; get_new_data(newv, i, ndbs);
222         DBT new_data; dbt_init(&new_data, &newv[0], sizeof newv);
223 
224         int ndbts = 2 * ndbs;
225         DBT keys[ndbts]; memset(keys, 0, sizeof keys);
226         DBT vals[ndbts]; memset(vals, 0, sizeof vals);
227         uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
228 
229         r = env_update_multiple_test_no_array(env, ndbs > 0 ? db[0] : NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals);
230         assert_zero(r);
231     }
232     r = txn->commit(txn, 0); assert_zero(r);
233 }
234 
235 static void
populate_primary(DB_ENV * env,DB * db,int ndbs,int nrows)236 populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
237     int r;
238     DB_TXN *txn = NULL;
239     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
240 
241     // populate
242     for (int i = 0; i < nrows; i++) {
243         int k = get_key(i, 0);
244         int v[ndbs]; get_data(v, i, ndbs);
245         DBT key; dbt_init(&key, &k, sizeof k);
246         DBT val; dbt_init(&val, &v[0], sizeof v);
247         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
248     }
249 
250     r = txn->commit(txn, 0); assert_zero(r);
251 }
252 
253 static void
populate_secondary(DB_ENV * env,DB * db,int dbnum,int nrows)254 populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
255     int r;
256     DB_TXN *txn = NULL;
257     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
258 
259     // populate
260     for (int i = 0; i < nrows; i++) {
261         int k = get_key(i, dbnum);
262         DBT key; dbt_init(&key, &k, sizeof k);
263         DBT val; dbt_init(&val, NULL, 0);
264         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
265     }
266 
267     r = txn->commit(txn, 0); assert_zero(r);
268 }
269 
270 static void
run_test(int ndbs,int nrows)271 run_test(int ndbs, int nrows) {
272     int r;
273     DB_ENV *env = NULL;
274     r = db_env_create(&env, 0); assert_zero(r);
275 
276     r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
277     r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
278 
279     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
280 
281     DB *db[ndbs];
282     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
283         r = db_create(&db[dbnum], env, 0); assert_zero(r);
284 
285         DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
286         char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
287         r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
288         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
289                 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
290         });
291     }
292 
293     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
294         if (dbnum == 0)
295             populate_primary(env, db[dbnum], ndbs, nrows);
296         else
297             populate_secondary(env, db[dbnum], dbnum, nrows);
298     }
299 
300     update_diagonal(env, db, ndbs, nrows);
301     for (int dbnum = 0; dbnum < ndbs; dbnum++)
302         verify_seq(env, db[dbnum], dbnum, ndbs, nrows);
303     for (int dbnum = 0; dbnum < ndbs; dbnum++)
304         r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
305 
306     r = env->close(env, 0); assert_zero(r);
307 }
308 
309 int
test_main(int argc,char * const argv[])310 test_main(int argc, char * const argv[]) {
311     int r;
312     int ndbs = 2;
313     int nrows = 2;
314 
315     // parse_args(argc, argv);
316     for (int i = 1; i < argc; i++) {
317         char * const arg = argv[i];
318         if (strcmp(arg, "-v") == 0) {
319             verbose++;
320             continue;
321         }
322         if (strcmp(arg, "-q") == 0) {
323             verbose = 0;
324             continue;
325         }
326         if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
327             ndbs = atoi(argv[++i]);
328             continue;
329         }
330         if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
331             nrows = atoi(argv[++i]);
332             continue;
333         }
334     }
335 
336     toku_os_recursive_delete(TOKU_TEST_FILENAME);
337     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
338 
339     run_test(ndbs, nrows);
340 
341     return 0;
342 }
343 
344