1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 // verify that update_multiple where we change the data in row[i] col[j] from x to x+1
42 
43 static const int MAX_KEYS = 3;
44 
45 static int
array_size(int ndbs)46 array_size(int ndbs) {
47     return +
48            1 + // 0 for old 1 for new
49            1 + // ndbs
50            2 * MAX_KEYS * (ndbs-1);
51 }
52 static int
get_num_new_keys(int i,int dbnum)53 get_num_new_keys(int i, int dbnum) {
54     if (dbnum == 0) return 1;
55     if (i & (1<<4)) {
56         dbnum++;  // Shift every once in a while.
57     }
58     return (i + dbnum) % MAX_KEYS;  // 0, 1, or 2
59 }
60 
61 static int
get_old_num_keys(int i,int dbnum)62 get_old_num_keys(int i, int dbnum) {
63     if (dbnum == 0) return 1;
64     return (i + dbnum) % MAX_KEYS;  // 0, 1, or 2
65 }
66 
67 static int
get_total_secondary_rows(int num_primary)68 get_total_secondary_rows(int num_primary) {
69     assert(num_primary % MAX_KEYS == 0);
70     return num_primary / MAX_KEYS * (0 + 1 + 2);
71 }
72 
73 static int
get_old_key(int i,int dbnum,int which)74 get_old_key(int i, int dbnum, int which) {
75     assert(i <  INT16_MAX / 2);
76     assert(which >= 0);
77     assert(which < 4);
78     assert(dbnum < 16);
79     if (dbnum == 0) {
80         assert(which == 0);
81         return htonl(2*i);
82     }
83     if (which >= get_old_num_keys(i, dbnum)) {
84         return htonl(-1);
85     }
86     return htonl(((2*i+0) << 16) + (dbnum<<8) + (which<<1));
87 }
88 
89 static int
get_new_key(int i,int dbnum,int which)90 get_new_key(int i, int dbnum, int which) {
91     assert(which >= 0);
92     assert(which < 4);
93     assert(dbnum < 16);
94 
95     if (dbnum == 0) {
96         assert(which == 0);
97         return htonl(2*i);
98     }
99     if (which >= get_num_new_keys(i, dbnum)) {
100         return htonl(-1);
101     }
102     if ((i+dbnum+which) & (1<<5)) {
103         return htonl(((2*i+0) << 16) + (dbnum<<8) + (which<<1));  // no change from original
104     }
105     return htonl(((2*i+0) << 16) + (dbnum<<8) + (which<<1) + 1);
106 }
107 
108 static void
fill_data_2_and_later(int * v,int i,int ndbs)109 fill_data_2_and_later(int *v, int i, int ndbs) {
110     int index = 2;
111     for (int dbnum = 1; dbnum < ndbs; dbnum++) {
112         for (int which = 0; which < MAX_KEYS; ++which) {
113             v[index++] = get_old_key(i, dbnum, which);
114         }
115     }
116     for (int dbnum = 1; dbnum < ndbs; dbnum++) {
117         for (int which = 0; which < MAX_KEYS; ++which) {
118             v[index++] = get_new_key(i, dbnum, which);
119         }
120     }
121 }
122 
123 
124 static void
fill_old_data(int * v,int i,int ndbs)125 fill_old_data(int *v, int i, int ndbs) {
126     v[0] = 0;
127     v[1] = ndbs;
128     fill_data_2_and_later(v, i, ndbs);
129 }
130 
131 static void
fill_new_data(int * v,int i,int ndbs)132 fill_new_data(int *v, int i, int ndbs) {
133     v[0] = 1;
134     v[1] = ndbs;
135     fill_data_2_and_later(v, i, ndbs);
136 }
137 
138 
139 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,DBT_ARRAY * dest_val_arrays,const DBT * src_key,const DBT * src_val)140 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, DBT_ARRAY *dest_val_arrays, const DBT *src_key, const DBT *src_val) {
141     (void)src_val;
142     assert(src_db != dest_db);
143     assert(src_db);
144     int dbnum;
145     assert(dest_db->descriptor->dbt.size == sizeof dbnum);
146     memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
147     assert(dbnum > 0);
148 
149     int pri_key = *(int *) src_key->data;
150     int* pri_val = (int*) src_val->data;
151 
152     bool is_new = pri_val[0] == 1;
153     int i = (ntohl(pri_key)) / 2;
154 
155     int num_keys = is_new ? get_num_new_keys(i, dbnum) : get_old_num_keys(i, dbnum);
156 
157     toku_dbt_array_resize(dest_key_arrays, num_keys);
158 
159     if (dest_val_arrays) {
160         toku_dbt_array_resize(dest_val_arrays, num_keys);
161     }
162 
163     int ndbs = pri_val[1];
164     int index = 2 + (dbnum-1)*MAX_KEYS;
165     if (is_new) {
166         index += MAX_KEYS*(ndbs-1);
167     }
168 
169     assert(src_val->size % sizeof(int) == 0);
170     assert((int)src_val->size / 4 >= index + num_keys);
171 
172 
173     for (int which = 0; which < num_keys; which++) {
174         DBT *dest_key = &dest_key_arrays->dbts[which];
175         DBT *dest_val = NULL;
176 
177         assert(dest_key->flags == DB_DBT_REALLOC);
178         if (dest_key->ulen < sizeof(int)) {
179             dest_key->data = toku_xrealloc(dest_key->data, sizeof(int));
180             dest_key->ulen = sizeof(int);
181         }
182         dest_key->size = sizeof(int);
183         if (dest_val_arrays) {
184             dest_val = &dest_val_arrays->dbts[which];
185             assert(dest_val->flags == DB_DBT_REALLOC);
186             dest_val->size = 0;
187         }
188         int new_key = is_new ? get_new_key(i, dbnum, which) : get_old_key(i, dbnum, which);
189         assert(new_key == pri_val[index + which]);
190         *(int*)dest_key->data = new_key;
191     }
192     return 0;
193 }
194 
195 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,const DBT * src_key,const DBT * src_data)196 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, const DBT *src_key, const DBT *src_data) {
197     return put_callback(dest_db, src_db, dest_key_arrays, NULL, src_key, src_data);
198 }
199 
200 static void
do_updates(DB_ENV * env,DB * db[],int ndbs,int nrows)201 do_updates(DB_ENV *env, DB *db[], int ndbs, int nrows) {
202     assert(ndbs > 0);
203     int r;
204     DB_TXN *txn = NULL;
205     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
206     int narrays = 2 * ndbs;
207     DBT_ARRAY keys[narrays];
208     DBT_ARRAY vals[narrays];
209     for (int i = 0; i < narrays; i++) {
210         toku_dbt_array_init(&keys[i], 1);
211         toku_dbt_array_init(&vals[i], 1);
212     }
213 
214     for (int i = 0; i < nrows; i++) {
215 
216         // update the data i % ndbs col from x to x+1
217 
218         int old_k = get_old_key(i, 0, 0);
219         DBT old_key; dbt_init(&old_key, &old_k, sizeof old_k);
220         int new_k = get_new_key(i, 0, 0);
221         DBT new_key; dbt_init(&new_key, &new_k, sizeof new_k);
222 
223         int v[array_size(ndbs)]; fill_old_data(v, i, ndbs);
224         DBT old_data; dbt_init(&old_data, &v[0], sizeof v);
225 
226         int newv[array_size(ndbs)]; fill_new_data(newv, i, ndbs);
227         DBT new_data; dbt_init(&new_data, &newv[0], sizeof newv);
228 
229         uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
230 
231         r = env->update_multiple(env, db[0], txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, narrays, keys, narrays, vals);
232         assert_zero(r);
233     }
234     for (int i = 0; i < narrays; i++) {
235         toku_dbt_array_destroy(&keys[i]);
236         toku_dbt_array_destroy(&vals[i]);
237     }
238     r = txn->commit(txn, 0); assert_zero(r);
239 }
240 
241 static void
populate_primary(DB_ENV * env,DB * db,int ndbs,int nrows)242 populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
243     int r;
244     DB_TXN *txn = NULL;
245     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
246 
247     // populate
248     for (int i = 0; i < nrows; i++) {
249         int k = get_old_key(i, 0, 0);
250         int v[array_size(ndbs)];
251         fill_old_data(v, i, ndbs);
252         DBT key; dbt_init(&key, &k, sizeof k);
253         DBT val; dbt_init(&val, &v[0], sizeof v);
254         r = db->put(db, txn, &key, &val, 0); assert_zero(r);
255     }
256 
257     r = txn->commit(txn, 0); assert_zero(r);
258 }
259 
260 static void
populate_secondary(DB_ENV * env,DB * db,int dbnum,int nrows)261 populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
262     int r;
263     DB_TXN *txn = NULL;
264     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
265 
266     // populate
267     for (int i = 0; i < nrows; i++) {
268         for (int which = 0; which < MAX_KEYS; which++) {
269             int k = get_old_key(i, dbnum, which);
270             if (k >= 0) {
271                 DBT key; dbt_init(&key, &k, sizeof k);
272                 DBT val; dbt_init(&val, NULL, 0);
273                 r = db->put(db, txn, &key, &val, 0); assert_zero(r);
274             }
275         }
276     }
277 
278     r = txn->commit(txn, 0); assert_zero(r);
279 }
280 
281 static void
verify_pri_seq(DB_ENV * env,DB * db,int ndbs,int nrows)282 verify_pri_seq(DB_ENV *env, DB *db, int ndbs, int nrows) {
283     const int dbnum = 0;
284     int r;
285     DB_TXN *txn = NULL;
286     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
287 
288     DBC *cursor = NULL;
289     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
290     int i;
291     for (i = 0; ; i++) {
292         DBT key; memset(&key, 0, sizeof key);
293         DBT val; memset(&val, 0, sizeof val);
294         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
295         if (r != 0)
296             break;
297         int k;
298         int expectk = get_new_key(i, dbnum, 0);
299 
300         assert(key.size == sizeof k);
301         memcpy(&k, key.data, key.size);
302         assert(k == expectk);
303 
304         int num_keys = array_size(ndbs);
305         assert(val.size == num_keys*sizeof(int));
306         int v[num_keys]; fill_new_data(v, i, ndbs);
307         assert(memcmp(val.data, v, val.size) == 0);
308     }
309     assert(i == nrows); // if (i != nrows) printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, i, nrows); // assert(i == nrows);
310     r = cursor->c_close(cursor); assert_zero(r);
311     r = txn->commit(txn, 0); assert_zero(r);
312 }
313 
314 static void
verify_sec_seq(DB_ENV * env,DB * db,int dbnum,int nrows)315 verify_sec_seq(DB_ENV *env, DB *db, int dbnum, int nrows) {
316     assert(dbnum > 0);
317     int r;
318     DB_TXN *txn = NULL;
319     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
320 
321     DBC *cursor = NULL;
322     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
323     int i;
324     int rows_found = 0;
325 
326     for (i = 0; ; i++) {
327         int num_keys = get_num_new_keys(i, dbnum);
328         for (int which = 0; which < num_keys; ++which) {
329             DBT key; memset(&key, 0, sizeof key);
330             DBT val; memset(&val, 0, sizeof val);
331             r = cursor->c_get(cursor, &key, &val, DB_NEXT);
332             if (r != 0) {
333                 CKERR2(r, DB_NOTFOUND);
334                 goto done;
335             }
336             rows_found++;
337             int k;
338             int expectk = get_new_key(i, dbnum, which);
339 
340             assert(key.size == sizeof k);
341             memcpy(&k, key.data, key.size);
342             int got_i = (ntohl(k) >> 16) / 2;
343             if (got_i < i) {
344                 // Will fail.  Too many old i's
345                 assert(k == expectk);
346             } else if (got_i > i) {
347                 // Will fail.  Too few in previous i.
348                 assert(k == expectk);
349             }
350 
351             if (k != expectk && which < get_old_num_keys(i, dbnum) && k == get_old_key(i, dbnum, which)) {
352                 // Will fail, never got updated.
353                 assert(k == expectk);
354             }
355             assert(k == expectk);
356             assert(val.size == 0);
357         }
358     }
359 done:
360     assert(rows_found == get_total_secondary_rows(nrows));
361     r = cursor->c_close(cursor); assert_zero(r);
362     r = txn->commit(txn, 0); assert_zero(r);
363 }
364 
365 static void
run_test(int ndbs,int nrows)366 run_test(int ndbs, int nrows) {
367     int r;
368     DB_ENV *env = NULL;
369     r = db_env_create(&env, 0); assert_zero(r);
370 
371     r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
372     r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
373 
374     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
375 
376     DB *db[ndbs];
377     for (int dbnum = 0; dbnum < ndbs; dbnum++) {
378         r = db_create(&db[dbnum], env, 0); assert_zero(r);
379 
380         DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
381 
382         char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
383         r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
384         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
385                 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
386         });
387     }
388 
389     populate_primary(env, db[0], ndbs, nrows);
390     for (int dbnum = 1; dbnum < ndbs-1; dbnum++) {
391         populate_secondary(env, db[dbnum], dbnum, nrows);
392     }
393 
394     DB_TXN *indexer_txn = NULL;
395     r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
396 
397     DB_INDEXER *indexer = NULL;
398     uint32_t db_flags = 0;
399     assert(ndbs > 2);
400     r = env->create_indexer(env, indexer_txn, &indexer, db[0], 1, &db[ndbs-1], &db_flags, 0); assert_zero(r);
401 
402     do_updates(env, db, ndbs, nrows);
403 
404     r = indexer->build(indexer); assert_zero(r);
405     r = indexer->close(indexer); assert_zero(r);
406 
407     r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
408 
409     verify_pri_seq(env, db[0], ndbs, nrows);
410     for (int dbnum = 1; dbnum < ndbs; dbnum++)
411         verify_sec_seq(env, db[dbnum], dbnum, nrows);
412     for (int dbnum = 0; dbnum < ndbs; dbnum++)
413         r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
414 
415     r = env->close(env, 0); assert_zero(r);
416 }
417 
418 int
test_main(int argc,char * const argv[])419 test_main(int argc, char * const argv[]) {
420     int r;
421     int ndbs = 10;
422     int nrows = MAX_KEYS*(1<<5)*4;
423 
424     // parse_args(argc, argv);
425     for (int i = 1; i < argc; i++) {
426         char * const arg = argv[i];
427         if (strcmp(arg, "-v") == 0) {
428             verbose++;
429             continue;
430         }
431         if (strcmp(arg, "-q") == 0) {
432             verbose = 0;
433             continue;
434         }
435         if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
436             ndbs = atoi(argv[++i]);
437             continue;
438         }
439         if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
440             nrows = atoi(argv[++i]);
441             continue;
442         }
443     }
444     while (nrows % (MAX_KEYS*(1<<5)) != 0) {
445         nrows++;
446     }
447     //Need at least one to update, and one to index
448     while (ndbs < 3) {
449         ndbs++;
450     }
451 
452     toku_os_recursive_delete(TOKU_TEST_FILENAME);
453     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
454 
455     run_test(ndbs, nrows);
456 
457     return 0;
458 }
459 
460