1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43
44 DB_ENV *env;
45 enum {MAX_NAME=128};
46 enum {MAX_DBS=16};
47 enum {MAX_ROW_LEN=1024};
48 static int NUM_DBS=10;
49 static int DISALLOW_PUTS=0;
50 static int COMPRESS=0;
51 static int USE_REGION=0;
52 static const char *envdir = TOKU_TEST_FILENAME;
53
54 static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
55 static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
56
// Key layout written to every destination DB.
// The pair (linenumber, orderkey) forms a unique primary key, whereas
// key is a secondary key whose value may repeat across rows.
struct tpch_key {
    uint32_t linenumber;  // first half of the primary key
    uint32_t orderkey;    // second half of the primary key
    uint32_t key;         // secondary key, possibly duplicated
};
64
65 static __attribute__((__unused__)) int
tpch_dbt_cmp(DB * db,const DBT * a,const DBT * b)66 tpch_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
67 assert(db && a && b);
68 assert(a->size == sizeof(struct tpch_key));
69 assert(b->size == sizeof(struct tpch_key));
70
71 unsigned int xl = (*((struct tpch_key *) a->data)).linenumber;
72 unsigned int xo = (*((struct tpch_key *) a->data)).orderkey;
73 unsigned int xk = (*((struct tpch_key *) a->data)).key;
74
75 unsigned int yl = (*((struct tpch_key *) b->data)).linenumber;
76 unsigned int yo = (*((struct tpch_key *) b->data)).orderkey;
77 unsigned int yk = (*((struct tpch_key *) b->data)).key;
78
79 // printf("tpch_dbt_cmp xl:%d, yl:%d, xo:%d, yo:%d, xk:%d, yk:%d\n", xl, yl, xo, yo, xk, yk);
80
81 if (xk<yk) return -1;
82 if (xk>yk) return 1;
83
84 if (xl<yl) return -1;
85 if (xl>yl) return 1;
86
87 if (xo>yo) return -1;
88 if (xo<yo) return 1;
89 return 0;
90 }
91
92
93 static int lineno = 0;
tpch_read_row(FILE * fp,int * key,char * val)94 static char *tpch_read_row(FILE *fp, int *key, char *val)
95 {
96 *key = lineno++;
97 return fgets(val, MAX_ROW_LEN , fp);
98 }
99
100
/*
 * Split a '|'-separated row into caller-supplied field buffers.
 *
 *   row       NUL-terminated input; every field, including the last one,
 *             must be followed by a '|'
 *   fields    array of fields_N writable buffers; field i of the row is
 *             copied into fields[i] and NUL-terminated
 *   fields_N  number of buffers in fields
 *
 * The number of separators found must equal fields_N; this is asserted.
 * Nothing is ever stored beyond fields[fields_N - 1]: text that follows the
 * last expected separator is ignored, and surplus separators are merely
 * counted so that the final assertion rejects the row.
 *
 * The size of the individual buffers is not known here, so the caller must
 * make each one large enough for its field.
 */
static void tpch_parse_row(char *row, char *fields[], int fields_N)
{
    int field = 0;   // index of the field currently being filled
    int len = 0;     // characters stored so far in that field

    for (const char *p = row; *p != '\0'; p++) {
        if (field >= fields_N) {
            // No buffer exists for this position: keep counting separators
            // for the final check, but do not write anywhere.
            if (*p == '|') {
                field++;
            }
            continue;
        }
        if (*p == '|') {
            fields[field][len] = '\0';
            field++;
            len = 0;
        } else {
            fields[field][len++] = *p;
        }
    }
    assert(field == fields_N);
}
125
126 /*
127 * region table
128 */
129
generate_rows_for_region(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)130 static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
131 {
132 toku_dbt_array_resize(dest_keys, 1);
133 toku_dbt_array_resize(dest_vals, 1);
134 DBT *dest_key = &dest_keys->dbts[0];
135 DBT *dest_val = &dest_vals->dbts[0];
136
137 // not used
138 (void) src_db;
139 (void) src_key;
140 assert(*(uint32_t*)dest_db->app_private == 0);
141
142 // region fields
143 char regionkey[8];
144 char name[32];
145 char comment[160];
146 char row[8+32+160+8];
147 sprintf(row, "%s", (char*)src_val->data);
148
149 const uint32_t fields_N = 3;
150 char *fields[3] = {regionkey, name, comment};
151 tpch_parse_row(row, fields, fields_N);
152
153 if (dest_key->flags==DB_DBT_REALLOC) {
154 if (dest_key->data) toku_free(dest_key->data);
155 dest_key->flags = 0;
156 dest_key->ulen = 0;
157 }
158 if (dest_val->flags==DB_DBT_REALLOC) {
159 if (dest_val->data) toku_free(dest_val->data);
160 dest_val->flags = 0;
161 dest_val->ulen = 0;
162 }
163
164 struct tpch_key *XMALLOC(key);
165 key->orderkey = atoi(regionkey);
166 key->linenumber = atoi(regionkey);
167 key->key = atoi(regionkey);
168
169 char *XMALLOC_N(sizeof(row), val);
170 sprintf(val, "%s|%s", name, comment);
171
172 dbt_init(dest_key, key, sizeof(struct tpch_key));
173 dest_key->flags = DB_DBT_REALLOC;
174
175 dbt_init(dest_val, val, strlen(val)+1);
176 dest_val->flags = DB_DBT_REALLOC;
177
178 return 0;
179 }
180
181 /*
182 * lineitem table
183 */
184
185
generate_rows_for_lineitem(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)186 static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
187 {
188 toku_dbt_array_resize(dest_keys, 1);
189 toku_dbt_array_resize(dest_vals, 1);
190 DBT *dest_key = &dest_keys->dbts[0];
191 DBT *dest_val = &dest_vals->dbts[0];
192 // not used
193 (void) src_db;
194 (void) src_key;
195
196 // lineitem fields
197 char orderkey[16];
198 char partkey[16];
199 char suppkey[16];
200 char linenumber[8];
201 char quantity[8];
202 char extendedprice[16];
203 char discount[8];
204 char tax[8];
205 char returnflag[8];
206 char linestatus[8];
207 char shipdate[16];
208 char commitdate[16];
209 char receiptdate[16];
210 char shipinstruct[32];
211 char shipmode[16];
212 char comment[48];
213 char row[16+16+16+8+8+16+8+8+8+8+16+16+16+32+16+48 + 8];
214 sprintf(row, "%s", (char*)src_val->data);
215
216 const uint32_t fields_N = 16;
217 char *fields[16] = {orderkey,
218 partkey,
219 suppkey,
220 linenumber,
221 quantity,
222 extendedprice,
223 discount,
224 tax,
225 returnflag,
226 linestatus,
227 shipdate,
228 commitdate,
229 receiptdate,
230 shipinstruct,
231 shipmode,
232 comment};
233 tpch_parse_row(row, fields, fields_N);
234
235 if (dest_key->flags==DB_DBT_REALLOC) {
236 if (dest_key->data) toku_free(dest_key->data);
237 dest_key->flags = 0;
238 dest_key->ulen = 0;
239 }
240 if (dest_val->flags==DB_DBT_REALLOC) {
241 if (dest_val->data) toku_free(dest_val->data);
242 dest_val->flags = 0;
243 dest_val->ulen = 0;
244 }
245
246 struct tpch_key *XMALLOC(key);
247 key->orderkey = atoi(linenumber);
248 key->linenumber = atoi(orderkey);
249
250 char *val;
251 uint32_t which = *(uint32_t*)dest_db->app_private;
252
253 if ( which == 0 ) {
254 val = toku_xstrdup(row);
255 }
256 else {
257 val = toku_xstrdup(orderkey);
258 }
259
260 switch(which) {
261 case 0:
262 key->key = atoi(linenumber);
263 break;
264 case 1:
265 // lineitem_fk1
266 key->key = atoi(orderkey);
267 break;
268 case 2:
269 // lineitem_fk2
270 key->key = atoi(suppkey);
271 break;
272 case 3:
273 // lineitem_fk3
274 key->key = atoi(partkey);// not really, ...
275 break;
276 case 4:
277 // lineitem_fk4
278 key->key = atoi(partkey);
279 break;
280 case 5:
281 // li_shp_dt_idx
282 key->key = atoi(linenumber) + atoi(suppkey); // not really ...
283 break;
284 case 6:
285 key->key = atoi(linenumber) +atoi(partkey); // not really ...
286 break;
287 case 7:
288 // li_rcpt_dt_idx
289 key->key = atoi(suppkey) + atoi(partkey); // not really ...
290 break;
291 default:
292 assert(0);
293 }
294
295 dbt_init(dest_key, key, sizeof(struct tpch_key));
296 dest_key->flags = DB_DBT_REALLOC;
297
298 dbt_init(dest_val, val, strlen(val)+1);
299 dest_val->flags = DB_DBT_REALLOC;
300
301 return 0;
302 }
303
304
// Cookie handed to the loader as the poll callback's extra argument; it simply
// points at itself so that poll_function() can recognise it.
static void *expect_poll_void = &expect_poll_void;
// Number of poll_function() invocations since the counter was last reset.
static int poll_count=0;

// Loader progress callback.
//
//   extra     must be expect_poll_void (asserted)
//   progress  must lie within [0.0, 1.0] (asserted)
//
// Counts the call in poll_count and returns 0.
static int poll_function (void *extra, float progress) {
    if (0) {
        // Compiled-out tracing: elapsed seconds since the first call and the
        // progress as a percentage.
        static int have_start=0;
        static struct timeval first;
        struct timeval current;
        gettimeofday(&current, 0);
        if (!have_start) {
            first=current;
            have_start=1;
        }
        printf("%6.6f %5.1f%%\n",
               current.tv_sec - first.tv_sec + 1e-6*(current.tv_usec - first.tv_usec),
               progress*100);
    }
    assert(extra == expect_poll_void);
    assert(progress >= 0.0 && progress <= 1.0);
    poll_count += 1;
    return 0;
}
324
test_loader(DB ** dbs)325 static int test_loader(DB **dbs)
326 {
327 int r;
328 DB_TXN *txn;
329 DB_LOADER *loader;
330 uint32_t db_flags[MAX_DBS];
331 uint32_t dbt_flags[MAX_DBS];
332 for(int i=0;i<MAX_DBS;i++) {
333 db_flags[i] = DB_NOOVERWRITE;
334 dbt_flags[i] = 0;
335 }
336 uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
337
338 FILE *fp;
339 // select which table to loader
340 if ( USE_REGION ) {
341 fp = fopen("./region.tbl", "r");
342 if (fp == NULL) {
343 fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
344 return 1;
345 }
346 assert(fp != NULL);
347 } else {
348 fp = fopen("./lineitem.tbl", "r");
349 if (fp == NULL) {
350 fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
351 return 1;
352 }
353 assert(fp != NULL);
354 }
355
356 // create and initialize loader
357
358 r = env->txn_begin(env, NULL, &txn, 0);
359 CKERR(r);
360 r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
361 CKERR(r);
362 r = loader->set_error_callback(loader, NULL, NULL);
363 CKERR(r);
364 r = loader->set_poll_function(loader, poll_function, expect_poll_void);
365 CKERR(r);
366
367 // using loader->put, put values into DB
368 printf("puts "); fflush(stdout);
369 DBT key, val;
370 int k;
371 char v[MAX_ROW_LEN];
372 char *c;
373 c = tpch_read_row(fp, &k, v);
374 int i = 1;
375 while ( c != NULL ) {
376 v[strlen(v)-1] = '\0'; // remove trailing \n
377 dbt_init(&key, &k, sizeof(int));
378 dbt_init(&val, v, strlen(v)+1);
379 r = loader->put(loader, &key, &val);
380 if (DISALLOW_PUTS) {
381 CKERR2(r, EINVAL);
382 } else {
383 CKERR(r);
384 }
385 if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} }
386 c = tpch_read_row(fp, &k, v);
387 }
388 if(verbose) {printf("\n"); fflush(stdout);}
389 fclose(fp);
390
391 poll_count=0;
392
393 // close the loader
394 printf("closing"); fflush(stdout);
395 r = loader->close(loader);
396 printf(" done\n");
397 CKERR(r);
398
399 if ( DISALLOW_PUTS == 0 ) assert(poll_count>0);
400
401 r = txn->commit(txn, 0);
402 CKERR(r);
403
404 return 0;
405 }
406
run_test(void)407 static int run_test(void)
408 {
409 int r;
410 char rmcmd[32 + strlen(envdir)];
411 snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
412 r = system(rmcmd); CKERR(r);
413 r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
414
415 r = db_env_create(&env, 0); CKERR(r);
416 db_env_enable_engine_status(0); // disable engine status on crash because test is expected to fail
417 r = env->set_default_bt_compare(env, tpch_dbt_cmp); CKERR(r);
418 // select which TPC-H table to load
419 if ( USE_REGION ) {
420 r = env->set_generate_row_callback_for_put(env, generate_rows_for_region); CKERR(r);
421 NUM_DBS=1;
422 }
423 else {
424 r = env->set_generate_row_callback_for_put(env, generate_rows_for_lineitem); CKERR(r);
425 NUM_DBS=8;
426 }
427
428 int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
429 r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
430 env->set_errfile(env, stderr);
431 //Disable auto-checkpointing
432 r = env->checkpointing_set_period(env, 0); CKERR(r);
433
434 DBT desc;
435 dbt_init(&desc, "foo", sizeof("foo"));
436 char name[MAX_NAME*2];
437
438 DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
439 assert(dbs != NULL);
440 int idx[MAX_DBS];
441 for(int i=0;i<NUM_DBS;i++) {
442 idx[i] = i;
443 r = db_create(&dbs[i], env, 0); CKERR(r);
444 dbs[i]->app_private = &idx[i];
445 snprintf(name, sizeof(name), "db_%04x", i);
446 r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
447 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
448 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
449 });
450 }
451
452 // -------------------------- //
453 int testr = test_loader(dbs);
454 // -------------------------- //
455
456 for(int i=0;i<NUM_DBS;i++) {
457 dbs[i]->close(dbs[i], 0); CKERR(r);
458 dbs[i] = NULL;
459 }
460 r = env->close(env, 0); CKERR(r);
461 toku_free(dbs);
462
463 return testr;
464 }
465
466 // ------------ infrastructure ----------
467 static void do_args(int argc, char * const argv[]);
468
test_main(int argc,char * const * argv)469 int test_main(int argc, char * const *argv) {
470 do_args(argc, argv);
471 int r = run_test();
472 return r;
473 }
474
do_args(int argc,char * const argv[])475 static void do_args(int argc, char * const argv[]) {
476 int resultcode;
477 char *cmd = argv[0];
478 argc--; argv++;
479 while (argc>0) {
480 if (strcmp(argv[0], "-v")==0) {
481 verbose++;
482 } else if (strcmp(argv[0],"-q")==0) {
483 verbose--;
484 if (verbose<0) verbose=0;
485 } else if (strcmp(argv[0], "-h")==0) {
486 resultcode=0;
487 do_usage:
488 fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd);
489 exit(resultcode);
490 } else if (strcmp(argv[0], "-p")==0) {
491 DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
492 } else if (strcmp(argv[0], "-z")==0) {
493 COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
494 } else if (strcmp(argv[0], "-g")==0) {
495 USE_REGION = 1;
496 } else if (strcmp(argv[0], "-e") == 0) {
497 argc--; argv++;
498 if (argc > 0)
499 envdir = argv[0];
500 } else {
501 fprintf(stderr, "Unknown arg: %s\n", argv[0]);
502 resultcode=1;
503 goto do_usage;
504 }
505 argc--;
506 argv++;
507 }
508 }
509