1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43
// Environment handle shared by run_test(), test_loader() and check_results().
DB_ENV *env;
enum {MAX_NAME=128};   // run_test() sizes its database-name buffer as MAX_NAME*2
enum {MAX_DBS=256};    // upper bound accepted for NUM_DBS; also sizes the permute tables and per-DB flag arrays
int NUM_DBS=5;         // number of dictionaries handed to the loader (set with -d)
int NUM_ROWS=100000;   // rows 1..NUM_ROWS are put into the loader (set with -r)
int CHECK_RESULTS=0;   // nonzero: test_loader() calls check_results() after the load (set with -c)
int DISALLOW_PUTS=0;   // 0 or LOADER_DISALLOW_PUTS (set with -p); OR-ed into the loader flags
int COMPRESS=0;        // 0 or LOADER_COMPRESS_INTERMEDIATES (set with -z); OR-ed into the loader flags
enum {MAGIC=311};      // offset added to a key by generate_val() and subtracted again by pkey_for_val()

bool dup_row_at_end = false; // false: the duplicate row is put before rows 1..NUM_ROWS. true: it is put after them. Which row gets duplicated is chosen by dup_row_id.
int dup_row_id = 0; // 0 means to use row 1 if inserting at the end, row NUM_ROWS if inserting at the beginning. Otherwise insert the row specified here.
56
57 //
58 // Functions to create unique key/value pairs, row generators, checkers, ... for each of NUM_DBS
59 //
60
// a is the bit-wise permute table. For DB[i], twiddle32() moves the bits of a value as described in a[i].
// inv is the inverse bit-wise permute of a[]. inv_twiddle32() uses inv[i] to get the original value back from a twiddled value.
// Both tables are filled in by generate_permute_tables().
int a[MAX_DBS][32];
int inv[MAX_DBS][32];
65
66
67 // rotate right and left functions
// Rotate x right by num bit positions, treating x as a 32-bit quantity.
// num is reduced modulo 32, so rotating by 0 or by any multiple of 32
// returns x unchanged.
static inline unsigned int rotr32(const unsigned int x, const unsigned int num) {
    const unsigned int n = num % 32;
    // Reduce the complementary count modulo 32 as well: for n == 0 a plain
    // (32 - n) would shift by the full width, which is undefined behavior.
    return (x >> n) | (x << ((32 - n) % 32));
}
// Rotate x left by num bit positions, treating x as a 32-bit quantity.
// num is reduced modulo 32, so rotating by 0 or by any multiple of 32
// returns x unchanged.
static inline unsigned int rotl32(const unsigned int x, const unsigned int num) {
    const unsigned int n = num % 32;
    // Reduce the complementary count modulo 32 as well: for n == 0 a plain
    // (32 - n) would shift by the full width, which is undefined behavior.
    return (x << n) | (x >> ((32 - n) % 32));
}
76
generate_permute_tables(void)77 static void generate_permute_tables(void) {
78 int i, j, tmp;
79 for(int db=0;db<MAX_DBS;db++) {
80 for(i=0;i<32;i++) {
81 a[db][i] = i;
82 }
83 for(i=0;i<32;i++) {
84 j = random() % (i + 1);
85 tmp = a[db][j];
86 a[db][j] = a[db][i];
87 a[db][i] = tmp;
88 }
89 // if(db < NUM_DBS){ printf("a[%d] = ", db); for(i=0;i<32;i++) { printf("%2d ", a[db][i]); } printf("\n");}
90 for(i=0;i<32;i++) {
91 inv[db][a[db][i]] = i;
92 }
93 }
94 }
95
96 // permute bits of x based on permute table bitmap
twiddle32(unsigned int x,int db)97 static unsigned int twiddle32(unsigned int x, int db)
98 {
99 unsigned int b = 0;
100 for(int i=0;i<32;i++) {
101 b |= (( x >> i ) & 1) << a[db][i];
102 }
103 return b;
104 }
105
106 // permute bits of x based on inverse permute table bitmap
inv_twiddle32(unsigned int x,int db)107 static unsigned int inv_twiddle32(unsigned int x, int db)
108 {
109 unsigned int b = 0;
110 for(int i=0;i<32;i++) {
111 b |= (( x >> i ) & 1) << inv[db][i];
112 }
113 return b;
114 }
115
116 // generate val from key, index
generate_val(int key,int i)117 static unsigned int generate_val(int key, int i) {
118 return rotl32((key + MAGIC), i);
119 }
pkey_for_val(int key,int i)120 static unsigned int pkey_for_val(int key, int i) {
121 return rotr32(key, i) - MAGIC;
122 }
123
124 // There is no handlerton in this test, so this function is a local replacement
125 // for the handlerton's generate_row_for_put().
put_multiple_generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)126 static int put_multiple_generate(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
127 toku_dbt_array_resize(dest_keys, 1);
128 toku_dbt_array_resize(dest_vals, 1);
129 DBT *dest_key = &dest_keys->dbts[0];
130 DBT *dest_val = &dest_vals->dbts[0];
131
132 (void) src_db;
133
134 uint32_t which = *(uint32_t*)dest_db->app_private;
135
136 if ( which == 0 ) {
137 if (dest_key->flags==DB_DBT_REALLOC) {
138 if (dest_key->data) toku_free(dest_key->data);
139 dest_key->flags = 0;
140 dest_key->ulen = 0;
141 }
142 if (dest_val->flags==DB_DBT_REALLOC) {
143 if (dest_val->data) toku_free(dest_val->data);
144 dest_val->flags = 0;
145 dest_val->ulen = 0;
146 }
147 dbt_init(dest_key, src_key->data, src_key->size);
148 dbt_init(dest_val, src_val->data, src_val->size);
149 }
150 else {
151 assert(dest_key->flags==DB_DBT_REALLOC);
152 if (dest_key->ulen < sizeof(unsigned int)) {
153 dest_key->data = toku_xrealloc(dest_key->data, sizeof(unsigned int));
154 dest_key->ulen = sizeof(unsigned int);
155 }
156 assert(dest_val->flags==DB_DBT_REALLOC);
157 if (dest_val->ulen < sizeof(unsigned int)) {
158 dest_val->data = toku_xrealloc(dest_val->data, sizeof(unsigned int));
159 dest_val->ulen = sizeof(unsigned int);
160 }
161 unsigned int *new_key = (unsigned int *)dest_key->data;
162 unsigned int *new_val = (unsigned int *)dest_val->data;
163
164 *new_key = twiddle32(*(unsigned int*)src_key->data, which);
165 *new_val = generate_val(*(unsigned int*)src_key->data, which);
166
167 dest_key->size = sizeof(unsigned int);
168 dest_val->size = sizeof(unsigned int);
169 //data is already set above
170 }
171
172 // printf("dest_key.data = %d\n", *(int*)dest_key->data);
173 // printf("dest_val.data = %d\n", *(int*)dest_val->data);
174
175 return 0;
176 }
177
178
check_results(DB ** dbs)179 static void check_results(DB **dbs)
180 {
181 for(int j=0;j<NUM_DBS;j++){
182 DBT key, val;
183 unsigned int k=0, v=0;
184 dbt_init(&key, &k, sizeof(unsigned int));
185 dbt_init(&val, &v, sizeof(unsigned int));
186 int r;
187 unsigned int pkey_for_db_key;
188
189 DB_TXN *txn;
190 r = env->txn_begin(env, NULL, &txn, 0);
191 CKERR(r);
192
193 DBC *cursor;
194 r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
195 CKERR(r);
196 for(int i=0;i<NUM_ROWS;i++) {
197 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
198 if (DISALLOW_PUTS) {
199 CKERR2(r, EINVAL);
200 } else {
201 CKERR(r);
202 k = *(unsigned int*)key.data;
203 pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
204 v = *(unsigned int*)val.data;
205 // test that we have the expected keys and values
206 assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
207 // printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
208 }
209 }
210 {printf("."); fflush(stdout);}
211 r = cursor->c_close(cursor);
212 CKERR(r);
213 r = txn->commit(txn, 0);
214 CKERR(r);
215 }
216 printf("\nCheck OK\n");
217 }
218
// State passed as the `extra` pointer from test_loader() to error_callback().
struct error_extra {
    int bad_i;        // key value that error_callback() asserts is the one reported
    int error_count;  // incremented by error_callback(); the callback asserts it is 0 on entry
};
223
error_callback(DB * db,int which_db,int err,DBT * key,DBT * val,void * extra)224 static void error_callback (DB *db, int which_db, int err, DBT *key, DBT *val, void *extra) {
225 assert(db);
226 assert(extra);
227 assert(err==DB_KEYEXIST);
228 assert(which_db>=0);
229 assert(key->size==4);
230 assert(which_db==0);
231 struct error_extra *e =(struct error_extra *)extra;
232 assert(e->bad_i == *(int*)key->data);
233 (void)val;
234 assert(e->error_count==0);
235 e->error_count++;
236 }
237
test_loader(DB ** dbs)238 static void test_loader(DB **dbs)
239 {
240 int r;
241 DB_TXN *txn;
242 DB_LOADER *loader;
243 uint32_t db_flags[MAX_DBS];
244 uint32_t dbt_flags[MAX_DBS];
245 for(int i=0;i<MAX_DBS;i++) {
246 db_flags[i] = DB_NOOVERWRITE;
247 dbt_flags[i] = 0;
248 }
249 uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
250
251 // create and initialize loader
252 r = env->txn_begin(env, NULL, &txn, 0);
253 CKERR(r);
254 r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
255 CKERR(r);
256 struct error_extra error_extra = {.bad_i = 0, .error_count=0};
257 r = loader->set_error_callback(loader, error_callback, (void*)&error_extra);
258 CKERR(r);
259 r = loader->set_poll_function(loader, NULL, NULL);
260 CKERR(r);
261
262 // using loader->put, put values into DB
263 DBT key, val;
264 unsigned int k, v;
265 if (!dup_row_at_end) {
266 // put a duplicate row in.
267 int i = dup_row_id==0 ? NUM_ROWS : dup_row_id;
268 k = i;
269 v = generate_val(i, 0);
270 dbt_init(&key, &k, sizeof(unsigned int));
271 dbt_init(&val, &v, sizeof(unsigned int));
272 r = loader->put(loader, &key, &val);
273 CKERR(r);
274 if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
275 error_extra.bad_i = i;
276 }
277 for(int i=1;i<=NUM_ROWS;i++) {
278 k = i;
279 v = generate_val(i, 0);
280 dbt_init(&key, &k, sizeof(unsigned int));
281 dbt_init(&val, &v, sizeof(unsigned int));
282 r = loader->put(loader, &key, &val);
283 if (DISALLOW_PUTS) {
284 CKERR2(r, EINVAL);
285 } else {
286 CKERR(r);
287 }
288 if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
289 }
290 if (dup_row_at_end) {
291 // put a duplicate row in.
292 int i = dup_row_id==0 ? 1 : dup_row_id;
293 k = i;
294 v = generate_val(i, 0);
295 dbt_init(&key, &k, sizeof(unsigned int));
296 dbt_init(&val, &v, sizeof(unsigned int));
297 r = loader->put(loader, &key, &val);
298 CKERR(r);
299 if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
300 error_extra.bad_i = i;
301 }
302
303 if( CHECK_RESULTS || verbose ) {printf("\n"); fflush(stdout);}
304
305 // close the loader
306 if (verbose) { printf("closing"); fflush(stdout); }
307 r = loader->close(loader);
308 if (verbose) { printf(" done\n"); }
309 if (NUM_ROWS > 0) {
310 assert(r==DB_KEYEXIST);
311 assert(error_extra.error_count==1);
312 }
313
314 r = txn->commit(txn, 0);
315 CKERR(r);
316
317 // verify the DBs
318 if ( CHECK_RESULTS ) {
319 check_results(dbs);
320 }
321 }
322
// Buffer released with toku_free() at the end of test_main() when non-NULL.
// NOTE(review): nothing in the code visible here ever assigns it.
char *free_me = NULL;
// Directory that run_test() wipes, recreates and opens the environment in.
const char *env_dir = TOKU_TEST_FILENAME; // the default env_dir
325
run_test(void)326 static void run_test(void)
327 {
328 int r;
329 toku_os_recursive_delete(env_dir);
330 r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
331
332 r = db_env_create(&env, 0); CKERR(r);
333 r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r);
334 r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
335 CKERR(r);
336 int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
337 r = env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
338 env->set_errfile(env, stderr);
339 //Disable auto-checkpointing
340 r = env->checkpointing_set_period(env, 0); CKERR(r);
341
342 DBT desc;
343 dbt_init(&desc, "foo", sizeof("foo"));
344 char name[MAX_NAME*2];
345
346 DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
347 assert(dbs != NULL);
348 int idx[MAX_DBS];
349 for(int i=0;i<NUM_DBS;i++) {
350 idx[i] = i;
351 r = db_create(&dbs[i], env, 0); CKERR(r);
352 dbs[i]->app_private = &idx[i];
353 snprintf(name, sizeof(name), "db_%04x", i);
354 r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
355 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
356 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
357 });
358 }
359
360 generate_permute_tables();
361
362 if (verbose) printf("running test_loader()\n");
363 // -------------------------- //
364 test_loader(dbs);
365 // -------------------------- //
366 if (verbose) printf("done test_loader()\n");
367
368 for(int i=0;i<NUM_DBS;i++) {
369 dbs[i]->close(dbs[i], 0); CKERR(r);
370 dbs[i] = NULL;
371 }
372 r = env->close(env, 0); CKERR(r);
373 toku_free(dbs);
374 }
375
// ------------ infrastructure ----------
// Parses the command line into the global test settings; defined after test_main().
static void do_args(int argc, char * const argv[]);

// Set to true by do_args() when -r is given; test_main() then runs once with
// that row count instead of looping over its built-in sizes.
int num_rows_set = false;
380
test_main(int argc,char * const * argv)381 int test_main(int argc, char * const *argv) {
382 do_args(argc, argv);
383 if (num_rows_set)
384 run_test();
385 else {
386 int sizes[]={1,4000000,-1};
387 //Make PUT loader take about the same amount of time:
388 if (DISALLOW_PUTS) sizes[1] /= 25;
389 for (int i=0; sizes[i]>=0; i++) {
390 if (verbose) printf("Doing %d\n", sizes[i]);
391 NUM_ROWS = sizes[i];
392 run_test();
393 }
394 }
395 if (free_me) toku_free(free_me);
396 return 0;
397 }
398
do_args(int argc,char * const argv[])399 static void do_args(int argc, char * const argv[]) {
400 int resultcode;
401 char *cmd = argv[0];
402 argc--; argv++;
403 while (argc>0) {
404 if (strcmp(argv[0], "-h")==0) {
405 resultcode=0;
406 do_usage:
407 fprintf(stderr, "Usage: %s -h -c -d %d -r %d\n", cmd, NUM_DBS, NUM_ROWS);
408 fprintf(stderr, " where -e <env> uses <env> to construct the directory (so that different tests can run concurrently)\n");
409 fprintf(stderr, " -s use size factor of 1 (makes internal loader buffers small so certain cases are easier to test)\n");
410 fprintf(stderr, " -E duplicate the first row at the end (not the beginning).\n");
411 fprintf(stderr, " -D <rid> use row id <rid> when duplicating. (Default is 1 if inserting at end, <numrows> if inserting at beginning\n");
412 exit(resultcode);
413 } else if (strcmp(argv[0], "-v")==0) {
414 verbose++;
415 } else if (strcmp(argv[0],"-q")==0) {
416 verbose--;
417 if (verbose<0) verbose=0;
418 } else if (strcmp(argv[0], "-d")==0) {
419 argc--; argv++;
420 NUM_DBS = atoi(argv[0]);
421 if ( NUM_DBS > MAX_DBS ) {
422 fprintf(stderr, "max value for -d field is %d\n", MAX_DBS);
423 resultcode=1;
424 goto do_usage;
425 }
426 } else if (strcmp(argv[0], "-r")==0) {
427 argc--; argv++;
428 NUM_ROWS = atoi(argv[0]);
429 num_rows_set = true;
430 } else if (strcmp(argv[0], "-c")==0) {
431 CHECK_RESULTS = 1;
432 } else if (strcmp(argv[0], "-z")==0) {
433 COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
434 } else if (strcmp(argv[0], "-p")==0) {
435 DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
436 } else if (strcmp(argv[0], "-s")==0) {
437 db_env_set_loader_size_factor(1);
438 } else if (strcmp(argv[0], "-E")==0) {
439 dup_row_at_end = true;
440 } else if (strcmp(argv[0], "-D")==0) {
441 argc--; argv++;
442 dup_row_id = atoi(argv[0]);
443 } else {
444 fprintf(stderr, "Unknown arg: %s\n", argv[0]);
445 resultcode=1;
446 goto do_usage;
447 }
448 argc--;
449 argv++;
450 }
451 assert(0<=dup_row_id && dup_row_id<=NUM_ROWS);
452 }
453