1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 
// Environment handle; created, opened and closed by run_test().
DB_ENV *env;
// run_test() sizes its dictionary-name buffer as MAX_NAME*2 bytes.
enum {MAX_NAME=128};
// Upper bound accepted for -d; also the number of permute tables kept.
enum {MAX_DBS=256};
// Number of dictionaries to create and load (-d).
int NUM_DBS=5;
// Number of distinct rows fed to the loader (-r).  When -r is not given,
// test_main() overrides this with each entry of its built-in size list.
int NUM_ROWS=100000;
// Non-zero (-c) makes test_loader() walk every dictionary with check_results().
int CHECK_RESULTS=0;
// 0, or LOADER_DISALLOW_PUTS when -p is given; OR'ed into the loader flags.
int DISALLOW_PUTS=0;
// 0, or LOADER_COMPRESS_INTERMEDIATES when -z is given; OR'ed into the loader flags.
int COMPRESS=0;
// Offset added to a key by generate_val() and removed again by pkey_for_val().
enum {MAGIC=311};

// false: the duplicate row is put before the regular rows.  true (-E): it is put after them.
bool dup_row_at_end = false;
// Row id to duplicate (-D).  0 selects the default: row 1 when the duplicate goes
// at the end, row NUM_ROWS when it goes at the beginning.
int  dup_row_id     = 0;
56 
//
//   Functions to create unique key/value pairs, row generators, checkers, ... for each of NUM_DBS
//

// a[db] is the bit-wise permute table for DB number db: twiddle32() moves
// bit i of its input to bit position a[db][i] of its output.
// inv[db] is the inverse permutation (inv[db][a[db][i]] == i), so
// inv_twiddle32() turns a twiddled value back into the original.
// Both tables are filled by generate_permute_tables().
int   a[MAX_DBS][32];
int inv[MAX_DBS][32];
65 
66 
67 // rotate right and left functions
// Rotate the 32-bit value x right by num bit positions; num is reduced modulo 32.
// The complementary left-shift count is reduced modulo 32 as well, so a rotation
// by 0 (or by any multiple of 32) never shifts by the full width of the operand,
// which would be undefined behavior.
static inline unsigned int rotr32(const unsigned int x, const unsigned int num) {
    const unsigned int n = num % 32;
    return (x >> n) | (x << ((32 - n) % 32));
}
// Rotate the 32-bit value x left by num bit positions; num is reduced modulo 32.
// The complementary right-shift count is reduced modulo 32 as well, so a rotation
// by 0 (or by any multiple of 32) never shifts by the full width of the operand,
// which would be undefined behavior.
static inline unsigned int rotl32(const unsigned int x, const unsigned int num) {
    const unsigned int n = num % 32;
    return (x << n) | (x >> ((32 - n) % 32));
}
76 
generate_permute_tables(void)77 static void generate_permute_tables(void) {
78     int i, j, tmp;
79     for(int db=0;db<MAX_DBS;db++) {
80         for(i=0;i<32;i++) {
81             a[db][i] = i;
82         }
83         for(i=0;i<32;i++) {
84             j = random() % (i + 1);
85             tmp = a[db][j];
86             a[db][j] = a[db][i];
87             a[db][i] = tmp;
88         }
89 //        if(db < NUM_DBS){ printf("a[%d] = ", db); for(i=0;i<32;i++) { printf("%2d ", a[db][i]); } printf("\n");}
90         for(i=0;i<32;i++) {
91             inv[db][a[db][i]] = i;
92         }
93     }
94 }
95 
96 // permute bits of x based on permute table bitmap
twiddle32(unsigned int x,int db)97 static unsigned int twiddle32(unsigned int x, int db)
98 {
99     unsigned int b = 0;
100     for(int i=0;i<32;i++) {
101         b |= (( x >> i ) & 1) << a[db][i];
102     }
103     return b;
104 }
105 
106 // permute bits of x based on inverse permute table bitmap
inv_twiddle32(unsigned int x,int db)107 static unsigned int inv_twiddle32(unsigned int x, int db)
108 {
109     unsigned int b = 0;
110     for(int i=0;i<32;i++) {
111         b |= (( x >> i ) & 1) << inv[db][i];
112     }
113     return b;
114 }
115 
116 // generate val from key, index
generate_val(int key,int i)117 static unsigned int generate_val(int key, int i) {
118     return rotl32((key + MAGIC), i);
119 }
pkey_for_val(int key,int i)120 static unsigned int pkey_for_val(int key, int i) {
121     return rotr32(key, i) - MAGIC;
122 }
123 
124 // There is no handlerton in this test, so this function is a local replacement
125 // for the handlerton's generate_row_for_put().
put_multiple_generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)126 static int put_multiple_generate(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
127     toku_dbt_array_resize(dest_keys, 1);
128     toku_dbt_array_resize(dest_vals, 1);
129     DBT *dest_key = &dest_keys->dbts[0];
130     DBT *dest_val = &dest_vals->dbts[0];
131 
132     (void) src_db;
133 
134     uint32_t which = *(uint32_t*)dest_db->app_private;
135 
136     if ( which == 0 ) {
137         if (dest_key->flags==DB_DBT_REALLOC) {
138             if (dest_key->data) toku_free(dest_key->data);
139             dest_key->flags = 0;
140             dest_key->ulen  = 0;
141         }
142         if (dest_val->flags==DB_DBT_REALLOC) {
143             if (dest_val->data) toku_free(dest_val->data);
144             dest_val->flags = 0;
145             dest_val->ulen  = 0;
146         }
147         dbt_init(dest_key, src_key->data, src_key->size);
148         dbt_init(dest_val, src_val->data, src_val->size);
149     }
150     else {
151         assert(dest_key->flags==DB_DBT_REALLOC);
152         if (dest_key->ulen < sizeof(unsigned int)) {
153             dest_key->data = toku_xrealloc(dest_key->data, sizeof(unsigned int));
154             dest_key->ulen = sizeof(unsigned int);
155         }
156         assert(dest_val->flags==DB_DBT_REALLOC);
157         if (dest_val->ulen < sizeof(unsigned int)) {
158             dest_val->data = toku_xrealloc(dest_val->data, sizeof(unsigned int));
159             dest_val->ulen = sizeof(unsigned int);
160         }
161         unsigned int *new_key = (unsigned int *)dest_key->data;
162         unsigned int *new_val = (unsigned int *)dest_val->data;
163 
164         *new_key = twiddle32(*(unsigned int*)src_key->data, which);
165         *new_val = generate_val(*(unsigned int*)src_key->data, which);
166 
167         dest_key->size = sizeof(unsigned int);
168         dest_val->size = sizeof(unsigned int);
169         //data is already set above
170     }
171 
172 //    printf("dest_key.data = %d\n", *(int*)dest_key->data);
173 //    printf("dest_val.data = %d\n", *(int*)dest_val->data);
174 
175     return 0;
176 }
177 
178 
check_results(DB ** dbs)179 static void check_results(DB **dbs)
180 {
181     for(int j=0;j<NUM_DBS;j++){
182         DBT key, val;
183         unsigned int k=0, v=0;
184         dbt_init(&key, &k, sizeof(unsigned int));
185         dbt_init(&val, &v, sizeof(unsigned int));
186         int r;
187         unsigned int pkey_for_db_key;
188 
189         DB_TXN *txn;
190         r = env->txn_begin(env, NULL, &txn, 0);
191         CKERR(r);
192 
193         DBC *cursor;
194         r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
195         CKERR(r);
196         for(int i=0;i<NUM_ROWS;i++) {
197             r = cursor->c_get(cursor, &key, &val, DB_NEXT);
198             if (DISALLOW_PUTS) {
199                 CKERR2(r, EINVAL);
200             } else {
201                 CKERR(r);
202                 k = *(unsigned int*)key.data;
203                 pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
204                 v = *(unsigned int*)val.data;
205                 // test that we have the expected keys and values
206                 assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
207 //            printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
208             }
209         }
210         {printf("."); fflush(stdout);}
211         r = cursor->c_close(cursor);
212         CKERR(r);
213         r = txn->commit(txn, 0);
214         CKERR(r);
215     }
216     printf("\nCheck OK\n");
217 }
218 
// State handed to error_callback() through its `extra` pointer by test_loader().
struct error_extra {
    int bad_i;        // row id test_loader() put twice; the callback asserts the reported key equals it
    int error_count;  // the callback asserts this is 0 on entry and then increments it
};
223 
error_callback(DB * db,int which_db,int err,DBT * key,DBT * val,void * extra)224 static void error_callback (DB *db, int which_db, int err, DBT *key, DBT *val, void *extra) {
225     assert(db);
226     assert(extra);
227     assert(err==DB_KEYEXIST);
228     assert(which_db>=0);
229     assert(key->size==4);
230     assert(which_db==0);
231     struct error_extra *e =(struct error_extra *)extra;
232     assert(e->bad_i == *(int*)key->data);
233     (void)val;
234     assert(e->error_count==0);
235     e->error_count++;
236 }
237 
test_loader(DB ** dbs)238 static void test_loader(DB **dbs)
239 {
240     int r;
241     DB_TXN    *txn;
242     DB_LOADER *loader;
243     uint32_t db_flags[MAX_DBS];
244     uint32_t dbt_flags[MAX_DBS];
245     for(int i=0;i<MAX_DBS;i++) {
246         db_flags[i] = DB_NOOVERWRITE;
247         dbt_flags[i] = 0;
248     }
249     uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
250 
251     // create and initialize loader
252     r = env->txn_begin(env, NULL, &txn, 0);
253     CKERR(r);
254     r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
255     CKERR(r);
256     struct error_extra error_extra = {.bad_i = 0, .error_count=0};
257     r = loader->set_error_callback(loader, error_callback, (void*)&error_extra);
258     CKERR(r);
259     r = loader->set_poll_function(loader, NULL, NULL);
260     CKERR(r);
261 
262     // using loader->put, put values into DB
263     DBT key, val;
264     unsigned int k, v;
265     if (!dup_row_at_end) {
266 	// put a duplicate row in.
267 	int i = dup_row_id==0 ? NUM_ROWS : dup_row_id;
268         k = i;
269         v = generate_val(i, 0);
270         dbt_init(&key, &k, sizeof(unsigned int));
271         dbt_init(&val, &v, sizeof(unsigned int));
272         r = loader->put(loader, &key, &val);
273         CKERR(r);
274         if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
275 	error_extra.bad_i = i;
276     }
277     for(int i=1;i<=NUM_ROWS;i++) {
278         k = i;
279         v = generate_val(i, 0);
280         dbt_init(&key, &k, sizeof(unsigned int));
281         dbt_init(&val, &v, sizeof(unsigned int));
282         r = loader->put(loader, &key, &val);
283         if (DISALLOW_PUTS) {
284             CKERR2(r, EINVAL);
285         } else {
286             CKERR(r);
287         }
288         if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
289     }
290     if (dup_row_at_end) {
291 	// put a duplicate row in.
292 	int i = dup_row_id==0 ? 1 : dup_row_id;
293         k = i;
294         v = generate_val(i, 0);
295         dbt_init(&key, &k, sizeof(unsigned int));
296         dbt_init(&val, &v, sizeof(unsigned int));
297         r = loader->put(loader, &key, &val);
298         CKERR(r);
299         if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
300 	error_extra.bad_i = i;
301     }
302 
303     if( CHECK_RESULTS || verbose ) {printf("\n"); fflush(stdout);}
304 
305     // close the loader
306     if (verbose) { printf("closing"); fflush(stdout); }
307     r = loader->close(loader);
308     if (verbose) {  printf(" done\n"); }
309     if (NUM_ROWS > 0) {
310         assert(r==DB_KEYEXIST);
311         assert(error_extra.error_count==1);
312     }
313 
314     r = txn->commit(txn, 0);
315     CKERR(r);
316 
317     // verify the DBs
318     if ( CHECK_RESULTS ) {
319         check_results(dbs);
320     }
321 }
322 
// Released by test_main() before it returns if non-NULL; no code visible in
// this file assigns it.
char *free_me = NULL;
// Directory that run_test() deletes, recreates and opens the environment in.
const char *env_dir = TOKU_TEST_FILENAME; // the default env_dir
325 
run_test(void)326 static void run_test(void)
327 {
328     int r;
329     toku_os_recursive_delete(env_dir);
330     r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO);                                                       CKERR(r);
331 
332     r = db_env_create(&env, 0);                                                                               CKERR(r);
333     r = env->set_default_bt_compare(env, uint_dbt_cmp);                                                       CKERR(r);
334     r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
335     CKERR(r);
336     int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
337     r = env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO);                                            CKERR(r);
338     env->set_errfile(env, stderr);
339     //Disable auto-checkpointing
340     r = env->checkpointing_set_period(env, 0);                                                                CKERR(r);
341 
342     DBT desc;
343     dbt_init(&desc, "foo", sizeof("foo"));
344     char name[MAX_NAME*2];
345 
346     DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
347     assert(dbs != NULL);
348     int idx[MAX_DBS];
349     for(int i=0;i<NUM_DBS;i++) {
350         idx[i] = i;
351         r = db_create(&dbs[i], env, 0);                                                                       CKERR(r);
352         dbs[i]->app_private = &idx[i];
353         snprintf(name, sizeof(name), "db_%04x", i);
354         r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666);                                CKERR(r);
355         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
356                 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
357         });
358     }
359 
360     generate_permute_tables();
361 
362     if (verbose) printf("running test_loader()\n");
363     // -------------------------- //
364     test_loader(dbs);
365     // -------------------------- //
366     if (verbose) printf("done    test_loader()\n");
367 
368     for(int i=0;i<NUM_DBS;i++) {
369         dbs[i]->close(dbs[i], 0);                                                                             CKERR(r);
370         dbs[i] = NULL;
371     }
372     r = env->close(env, 0);                                                                                   CKERR(r);
373     toku_free(dbs);
374 }
375 
// ------------ infrastructure ----------
// Command-line parser; declared here because test_main() calls it before its definition.
static void do_args(int argc, char * const argv[]);

// Set to true by do_args() when -r is given; test_main() then runs a single
// pass with that row count instead of its built-in list of sizes.
int num_rows_set = false;
380 
test_main(int argc,char * const * argv)381 int test_main(int argc, char * const *argv) {
382     do_args(argc, argv);
383     if (num_rows_set)
384 	run_test();
385     else {
386 	int sizes[]={1,4000000,-1};
387         //Make PUT loader take about the same amount of time:
388         if (DISALLOW_PUTS) sizes[1] /= 25;
389 	for (int i=0; sizes[i]>=0; i++) {
390 	    if (verbose) printf("Doing %d\n", sizes[i]);
391 	    NUM_ROWS = sizes[i];
392 	    run_test();
393 	}
394     }
395     if (free_me) toku_free(free_me);
396     return 0;
397 }
398 
do_args(int argc,char * const argv[])399 static void do_args(int argc, char * const argv[]) {
400     int resultcode;
401     char *cmd = argv[0];
402     argc--; argv++;
403     while (argc>0) {
404         if (strcmp(argv[0], "-h")==0) {
405             resultcode=0;
406 	do_usage:
407 	    fprintf(stderr, "Usage: %s -h -c -d %d -r %d\n", cmd, NUM_DBS, NUM_ROWS);
408 	    fprintf(stderr, " where -e <env>         uses <env> to construct the directory (so that different tests can run concurrently)\n");
409 	    fprintf(stderr, "       -s               use size factor of 1 (makes internal loader buffers small so certain cases are easier to test)\n");
410 	    fprintf(stderr, "       -E               duplicate the first row at the end (not the beginning).\n");
411 	    fprintf(stderr, "       -D <rid>         use row id <rid> when duplicating.  (Default is 1 if inserting at end, <numrows> if inserting at beginning\n");
412 	    exit(resultcode);
413 	} else if (strcmp(argv[0], "-v")==0) {
414 	    verbose++;
415 	} else if (strcmp(argv[0],"-q")==0) {
416 	    verbose--;
417 	    if (verbose<0) verbose=0;
418         } else if (strcmp(argv[0], "-d")==0) {
419             argc--; argv++;
420             NUM_DBS = atoi(argv[0]);
421             if ( NUM_DBS > MAX_DBS ) {
422                 fprintf(stderr, "max value for -d field is %d\n", MAX_DBS);
423                 resultcode=1;
424                 goto do_usage;
425             }
426         } else if (strcmp(argv[0], "-r")==0) {
427             argc--; argv++;
428             NUM_ROWS = atoi(argv[0]);
429 	    num_rows_set = true;
430         } else if (strcmp(argv[0], "-c")==0) {
431             CHECK_RESULTS = 1;
432         } else if (strcmp(argv[0], "-z")==0) {
433             COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
434         } else if (strcmp(argv[0], "-p")==0) {
435             DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
436         } else if (strcmp(argv[0], "-s")==0) {
437 	    db_env_set_loader_size_factor(1);
438 	} else if (strcmp(argv[0], "-E")==0) {
439 	    dup_row_at_end = true;
440 	} else if (strcmp(argv[0], "-D")==0) {
441             argc--; argv++;
442 	    dup_row_id = atoi(argv[0]);
443 	} else {
444 	    fprintf(stderr, "Unknown arg: %s\n", argv[0]);
445 	    resultcode=1;
446 	    goto do_usage;
447 	}
448 	argc--;
449 	argv++;
450     }
451     assert(0<=dup_row_id && dup_row_id<=NUM_ROWS);
452 }
453