1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 
44 DB_ENV *env;
45 enum {MAX_NAME=128};
46 enum {MAX_DBS=16};
47 enum {MAX_ROW_LEN=1024};
48 static int NUM_DBS=10;
49 static int DISALLOW_PUTS=0;
50 static int COMPRESS=0;
51 static int USE_REGION=0;
52 static const char *envdir = TOKU_TEST_FILENAME;
53 
54 static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
55 static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
56 
// Key stored in every destination DB.
// (linenumber, orderkey) together form a unique, primary key;
// `key` is a secondary key that may repeat across rows.
struct tpch_key {
    uint32_t linenumber;  // first half of the unique pair
    uint32_t orderkey;    // second half of the unique pair
    uint32_t key;         // potentially duplicate secondary key
};
64 
65 static __attribute__((__unused__)) int
tpch_dbt_cmp(DB * db,const DBT * a,const DBT * b)66 tpch_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
67     assert(db && a && b);
68     assert(a->size == sizeof(struct tpch_key));
69     assert(b->size == sizeof(struct tpch_key));
70 
71     unsigned int xl = (*((struct tpch_key *) a->data)).linenumber;
72     unsigned int xo = (*((struct tpch_key *) a->data)).orderkey;
73     unsigned int xk = (*((struct tpch_key *) a->data)).key;
74 
75     unsigned int yl = (*((struct tpch_key *) b->data)).linenumber;
76     unsigned int yo = (*((struct tpch_key *) b->data)).orderkey;
77     unsigned int yk = (*((struct tpch_key *) b->data)).key;
78 
79 //    printf("tpch_dbt_cmp xl:%d, yl:%d, xo:%d, yo:%d, xk:%d, yk:%d\n", xl, yl, xo, yo, xk, yk);
80 
81     if (xk<yk) return -1;
82     if (xk>yk) return 1;
83 
84     if (xl<yl) return -1;
85     if (xl>yl) return 1;
86 
87     if (xo>yo) return -1;
88     if (xo<yo) return 1;
89     return 0;
90 }
91 
92 
93 static int lineno = 0;
tpch_read_row(FILE * fp,int * key,char * val)94 static char *tpch_read_row(FILE *fp, int *key, char *val)
95 {
96     *key = lineno++;
97     return fgets(val, MAX_ROW_LEN , fp);
98 }
99 
100 
/*
 *   split '|' separated fields into fields array
 *
 *   row      - NUL-terminated input in which every field, including the last,
 *              is terminated by '|'
 *   fields   - fields_N destination buffers; each receives one NUL-terminated
 *              field.  The capacity of the individual buffers is not known
 *              here, so the caller must size them for the longest field.
 *   fields_N - number of fields the row must contain
 *
 *   Asserts if the row holds more or fewer than fields_N fields, or if any
 *   text follows the final '|'.
 */
static void tpch_parse_row(char *row, char *fields[], int fields_N)
{
    int field = 0;
    int i = 0;

    for (int p = 0; row[p] != '\0'; p++) {
        // Validate the slot before touching it: a surplus '|' or trailing
        // text would otherwise write through fields[fields_N].
        assert(field < fields_N);
        if (row[p] == '|') {
            fields[field][i] = '\0';
            field++;
            i = 0;
        } else {
            fields[field][i++] = row[p];
        }
    }
    assert(field == fields_N);
}
125 
126 /*
127  *     region table
128  */
129 
generate_rows_for_region(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)130 static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
131 {
132     toku_dbt_array_resize(dest_keys, 1);
133     toku_dbt_array_resize(dest_vals, 1);
134     DBT *dest_key = &dest_keys->dbts[0];
135     DBT *dest_val = &dest_vals->dbts[0];
136 
137     // not used
138     (void) src_db;
139     (void) src_key;
140     assert(*(uint32_t*)dest_db->app_private == 0);
141 
142     // region fields
143     char regionkey[8];
144     char name[32];
145     char comment[160];
146     char row[8+32+160+8];
147     sprintf(row, "%s", (char*)src_val->data);
148 
149     const uint32_t fields_N = 3;
150     char *fields[3] = {regionkey, name, comment};
151     tpch_parse_row(row, fields, fields_N);
152 
153     if (dest_key->flags==DB_DBT_REALLOC) {
154         if (dest_key->data) toku_free(dest_key->data);
155         dest_key->flags = 0;
156         dest_key->ulen  = 0;
157     }
158     if (dest_val->flags==DB_DBT_REALLOC) {
159         if (dest_val->data) toku_free(dest_val->data);
160         dest_val->flags = 0;
161         dest_val->ulen  = 0;
162     }
163 
164     struct tpch_key *XMALLOC(key);
165     key->orderkey   = atoi(regionkey);
166     key->linenumber = atoi(regionkey);
167     key->key        = atoi(regionkey);
168 
169     char *XMALLOC_N(sizeof(row), val);
170     sprintf(val, "%s|%s", name, comment);
171 
172     dbt_init(dest_key, key, sizeof(struct tpch_key));
173     dest_key->flags = DB_DBT_REALLOC;
174 
175     dbt_init(dest_val, val, strlen(val)+1);
176     dest_val->flags = DB_DBT_REALLOC;
177 
178     return 0;
179 }
180 
181 /*
182  *      lineitem table
183  */
184 
185 
generate_rows_for_lineitem(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)186 static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
187 {
188     toku_dbt_array_resize(dest_keys, 1);
189     toku_dbt_array_resize(dest_vals, 1);
190     DBT *dest_key = &dest_keys->dbts[0];
191     DBT *dest_val = &dest_vals->dbts[0];
192     // not used
193     (void) src_db;
194     (void) src_key;
195 
196     // lineitem fields
197     char orderkey[16];
198     char partkey[16];
199     char suppkey[16];
200     char linenumber[8];
201     char quantity[8];
202     char extendedprice[16];
203     char discount[8];
204     char tax[8];
205     char returnflag[8];
206     char linestatus[8];
207     char shipdate[16];
208     char commitdate[16];
209     char receiptdate[16];
210     char shipinstruct[32];
211     char shipmode[16];
212     char comment[48];
213     char row[16+16+16+8+8+16+8+8+8+8+16+16+16+32+16+48 + 8];
214     sprintf(row, "%s", (char*)src_val->data);
215 
216     const uint32_t fields_N = 16;
217     char *fields[16] = {orderkey,
218                         partkey,
219                         suppkey,
220                         linenumber,
221                         quantity,
222                         extendedprice,
223                         discount,
224                         tax,
225                         returnflag,
226                         linestatus,
227                         shipdate,
228                         commitdate,
229                         receiptdate,
230                         shipinstruct,
231                         shipmode,
232                         comment};
233     tpch_parse_row(row, fields, fields_N);
234 
235     if (dest_key->flags==DB_DBT_REALLOC) {
236         if (dest_key->data) toku_free(dest_key->data);
237         dest_key->flags = 0;
238         dest_key->ulen  = 0;
239     }
240     if (dest_val->flags==DB_DBT_REALLOC) {
241         if (dest_val->data) toku_free(dest_val->data);
242         dest_val->flags = 0;
243         dest_val->ulen  = 0;
244     }
245 
246     struct tpch_key *XMALLOC(key);
247     key->orderkey   = atoi(linenumber);
248     key->linenumber = atoi(orderkey);
249 
250     char *val;
251     uint32_t which = *(uint32_t*)dest_db->app_private;
252 
253     if ( which == 0 ) {
254         val = toku_xstrdup(row);
255     }
256     else {
257         val = toku_xstrdup(orderkey);
258     }
259 
260     switch(which) {
261     case 0:
262         key->key = atoi(linenumber);
263         break;
264     case 1:
265         // lineitem_fk1
266         key->key = atoi(orderkey);
267         break;
268     case 2:
269         // lineitem_fk2
270         key->key = atoi(suppkey);
271         break;
272     case 3:
273         // lineitem_fk3
274         key->key = atoi(partkey);// not really, ...
275         break;
276     case 4:
277         // lineitem_fk4
278         key->key = atoi(partkey);
279         break;
280     case 5:
281         // li_shp_dt_idx
282         key->key = atoi(linenumber) + atoi(suppkey); // not really ...
283         break;
284     case 6:
285         key->key = atoi(linenumber) +atoi(partkey); // not really ...
286         break;
287     case 7:
288         // li_rcpt_dt_idx
289         key->key = atoi(suppkey) + atoi(partkey); // not really ...
290         break;
291     default:
292         assert(0);
293     }
294 
295     dbt_init(dest_key, key, sizeof(struct tpch_key));
296     dest_key->flags = DB_DBT_REALLOC;
297 
298     dbt_init(dest_val, val, strlen(val)+1);
299     dest_val->flags = DB_DBT_REALLOC;
300 
301     return 0;
302 }
303 
304 
// Self-referential sentinel: the `extra` pointer the poll callback must receive.
static void *expect_poll_void = &expect_poll_void;
// Number of poll_function() invocations since the counter was last reset.
static int poll_count=0;

/*
 * Loader progress callback.
 *
 *   extra    - must equal expect_poll_void
 *   progress - must lie in [0.0, 1.0]
 *
 * Counts the invocation in poll_count and returns 0.
 */
static int poll_function (void *extra, float progress) {
    enum { TRACE_PROGRESS = 0 };  // set to 1 to print elapsed seconds and percent done
    if (TRACE_PROGRESS) {
        static int have_start = 0;
        static struct timeval start;
        struct timeval now;
        gettimeofday(&now, 0);
        if (!have_start) {
            start = now;
            have_start = 1;
        }
        printf("%6.6f %5.1f%%\n", now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec), progress*100);
    }
    assert(extra==expect_poll_void);
    assert(0.0<=progress && progress<=1.0);
    poll_count += 1;
    return 0;
}
324 
test_loader(DB ** dbs)325 static int test_loader(DB **dbs)
326 {
327     int r;
328     DB_TXN    *txn;
329     DB_LOADER *loader;
330     uint32_t db_flags[MAX_DBS];
331     uint32_t dbt_flags[MAX_DBS];
332     for(int i=0;i<MAX_DBS;i++) {
333         db_flags[i] = DB_NOOVERWRITE;
334         dbt_flags[i] = 0;
335     }
336     uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
337 
338     FILE *fp;
339     // select which table to loader
340     if ( USE_REGION ) {
341         fp = fopen("./region.tbl", "r");
342         if (fp == NULL) {
343             fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
344             return 1;
345         }
346         assert(fp != NULL);
347     } else {
348         fp = fopen("./lineitem.tbl", "r");
349         if (fp == NULL) {
350             fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
351             return 1;
352         }
353         assert(fp != NULL);
354     }
355 
356     // create and initialize loader
357 
358     r = env->txn_begin(env, NULL, &txn, 0);
359     CKERR(r);
360     r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
361     CKERR(r);
362     r = loader->set_error_callback(loader, NULL, NULL);
363     CKERR(r);
364     r = loader->set_poll_function(loader, poll_function, expect_poll_void);
365     CKERR(r);
366 
367     // using loader->put, put values into DB
368     printf("puts "); fflush(stdout);
369     DBT key, val;
370     int k;
371     char v[MAX_ROW_LEN];
372     char *c;
373     c = tpch_read_row(fp, &k, v);
374     int i = 1;
375     while ( c != NULL ) {
376         v[strlen(v)-1] = '\0';  // remove trailing \n
377         dbt_init(&key, &k, sizeof(int));
378         dbt_init(&val, v, strlen(v)+1);
379         r = loader->put(loader, &key, &val);
380         if (DISALLOW_PUTS) {
381             CKERR2(r, EINVAL);
382         } else {
383             CKERR(r);
384         }
385         if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} }
386         c = tpch_read_row(fp, &k, v);
387     }
388     if(verbose) {printf("\n"); fflush(stdout);}
389     fclose(fp);
390 
391     poll_count=0;
392 
393     // close the loader
394     printf("closing"); fflush(stdout);
395     r = loader->close(loader);
396     printf(" done\n");
397     CKERR(r);
398 
399     if ( DISALLOW_PUTS == 0 ) assert(poll_count>0);
400 
401     r = txn->commit(txn, 0);
402     CKERR(r);
403 
404     return 0;
405 }
406 
run_test(void)407 static int run_test(void)
408 {
409     int r;
410     char rmcmd[32 + strlen(envdir)];
411     snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
412     r = system(rmcmd);                                                                             CKERR(r);
413     r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO);                                                       CKERR(r);
414 
415     r = db_env_create(&env, 0);                                                                               CKERR(r);
416     db_env_enable_engine_status(0);  // disable engine status on crash because test is expected to fail
417     r = env->set_default_bt_compare(env, tpch_dbt_cmp);                                                       CKERR(r);
418     // select which TPC-H table to load
419     if ( USE_REGION ) {
420         r = env->set_generate_row_callback_for_put(env, generate_rows_for_region);                            CKERR(r);
421         NUM_DBS=1;
422     }
423     else {
424         r = env->set_generate_row_callback_for_put(env, generate_rows_for_lineitem);                          CKERR(r);
425         NUM_DBS=8;
426     }
427 
428     int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
429     r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO);                                            CKERR(r);
430     env->set_errfile(env, stderr);
431     //Disable auto-checkpointing
432     r = env->checkpointing_set_period(env, 0);                                                                CKERR(r);
433 
434     DBT desc;
435     dbt_init(&desc, "foo", sizeof("foo"));
436     char name[MAX_NAME*2];
437 
438     DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
439     assert(dbs != NULL);
440     int idx[MAX_DBS];
441     for(int i=0;i<NUM_DBS;i++) {
442         idx[i] = i;
443         r = db_create(&dbs[i], env, 0);                                                                       CKERR(r);
444         dbs[i]->app_private = &idx[i];
445         snprintf(name, sizeof(name), "db_%04x", i);
446         r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666);                                CKERR(r);
447         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
448                 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
449         });
450     }
451 
452     // -------------------------- //
453     int testr = test_loader(dbs);
454     // -------------------------- //
455 
456     for(int i=0;i<NUM_DBS;i++) {
457         dbs[i]->close(dbs[i], 0);                                                                             CKERR(r);
458         dbs[i] = NULL;
459     }
460     r = env->close(env, 0);                                                                                   CKERR(r);
461     toku_free(dbs);
462 
463     return testr;
464 }
465 
466 // ------------ infrastructure ----------
467 static void do_args(int argc, char * const argv[]);
468 
test_main(int argc,char * const * argv)469 int test_main(int argc, char * const *argv) {
470     do_args(argc, argv);
471     int r = run_test();
472     return r;
473 }
474 
do_args(int argc,char * const argv[])475 static void do_args(int argc, char * const argv[]) {
476     int resultcode;
477     char *cmd = argv[0];
478     argc--; argv++;
479     while (argc>0) {
480 	if (strcmp(argv[0], "-v")==0) {
481 	    verbose++;
482 	} else if (strcmp(argv[0],"-q")==0) {
483 	    verbose--;
484 	    if (verbose<0) verbose=0;
485         } else if (strcmp(argv[0], "-h")==0) {
486 	    resultcode=0;
487 	do_usage:
488 	    fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd);
489 	    exit(resultcode);
490         } else if (strcmp(argv[0], "-p")==0) {
491             DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
492         } else if (strcmp(argv[0], "-z")==0) {
493             COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
494         } else if (strcmp(argv[0], "-g")==0) {
495             USE_REGION = 1;
496         } else if (strcmp(argv[0], "-e") == 0) {
497             argc--; argv++;
498             if (argc > 0)
499                 envdir = argv[0];
500 	} else {
501 	    fprintf(stderr, "Unknown arg: %s\n", argv[0]);
502 	    resultcode=1;
503 	    goto do_usage;
504 	}
505 	argc--;
506 	argv++;
507     }
508 }
509