1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 #include "toku_random.h"
44 
45 bool fast = false;
46 
47 DB_ENV *env;
48 enum {NUM_DBS=2};
49 uint32_t USE_COMPRESS=0;
50 
51 bool do_check = false;
52 uint32_t num_rows = 1;
53 uint32_t which_db_to_fail = (uint32_t) -1;
54 uint32_t which_row_to_fail = (uint32_t) -1;
55 enum how_to_fail { FAIL_NONE, FAIL_KSIZE, FAIL_VSIZE } how_to_fail = FAIL_NONE;
56 
57 static struct random_data random_data[NUM_DBS];
58 char random_buf[NUM_DBS][8];
59 
put_multiple_generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)60 static int put_multiple_generate(DB *dest_db,
61 				 DB *src_db __attribute__((__unused__)),
62 				 DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals,
63 				 const DBT *src_key, const DBT *src_val __attribute__((__unused__))) {
64     toku_dbt_array_resize(dest_keys, 1);
65     toku_dbt_array_resize(dest_vals, 1);
66     DBT *dest_key = &dest_keys->dbts[0];
67     DBT *dest_val = &dest_vals->dbts[0];
68 
69     uint32_t which = *(uint32_t*)dest_db->app_private;
70     assert(src_key->size==4);
71     uint32_t rownum = *(uint32_t*)src_key->data;
72 
73     uint32_t ksize, vsize;
74     const uint32_t kmax=32*1024, vmax=32*1024*1024;
75     if (which==which_db_to_fail && rownum==which_row_to_fail) {
76 	switch (how_to_fail) {
77 	case FAIL_NONE:  ksize=kmax;   vsize=vmax;   goto gotsize;
78 	case FAIL_KSIZE: ksize=kmax+1; vsize=vmax;   goto gotsize;
79 	case FAIL_VSIZE: ksize=kmax;   vsize=vmax+1; goto gotsize;
80 	}
81 	assert(0);
82     gotsize:;
83     } else {
84 	ksize=4; vsize=100;
85     }
86     assert(dest_key->flags==DB_DBT_REALLOC);
87     if (dest_key->ulen < ksize) {
88 	dest_key->data = toku_xrealloc(dest_key->data, ksize);
89 	dest_key->ulen = ksize;
90     }
91     assert(dest_val->flags==DB_DBT_REALLOC);
92     if (dest_val->ulen < vsize) {
93 	dest_val->data = toku_xrealloc(dest_val->data, vsize);
94 	dest_val->ulen = vsize;
95     }
96     assert(ksize>=sizeof(uint32_t));
97     for (uint32_t i=0; i<ksize; i++) ((char*)dest_key->data)[i] = myrandom_r(&random_data[which]);
98     for (uint32_t i=0; i<vsize; i++) ((char*)dest_val->data)[i] = myrandom_r(&random_data[which]);
99     *(uint32_t*)dest_key->data = rownum;
100     dest_key->size = ksize;
101     dest_val->size = vsize;
102 
103     return 0;
104 }
105 
106 struct error_extra {
107     int bad_i;
108     int error_count;
109 };
110 
error_callback(DB * db,int which_db,int err,DBT * key,DBT * val,void * extra)111 static void error_callback (DB *db __attribute__((__unused__)), int which_db, int err, DBT *key __attribute__((__unused__)), DBT *val __attribute__((__unused__)), void *extra) {
112     struct error_extra *e =(struct error_extra *)extra;
113     assert(which_db==(int)which_db_to_fail);
114     assert(err==EINVAL);
115     assert(e->error_count==0);
116     e->error_count++;
117 }
118 
reset_random(void)119 static void reset_random(void) {
120     int r;
121 
122     for (int i = 0; i < NUM_DBS; i++) {
123         ZERO_STRUCT(random_data[i]);
124         ZERO_ARRAY(random_buf[i]);
125         r = myinitstate_r(i, random_buf[i], 8, &random_data[i]);
126         assert(r==0);
127     }
128 }
129 
test_loader_maxsize(DB ** dbs,DB ** check_dbs)130 static void test_loader_maxsize(DB **dbs, DB **check_dbs)
131 {
132     int r;
133     DB_TXN    *txn;
134     DB_LOADER *loader;
135     uint32_t db_flags[NUM_DBS];
136     uint32_t dbt_flags[NUM_DBS];
137     for(int i=0;i<NUM_DBS;i++) {
138         db_flags[i] = DB_NOOVERWRITE;
139         dbt_flags[i] = 0;
140     }
141     uint32_t loader_flags = USE_COMPRESS; // set with -p option
142 
143     // create and initialize loader
144     r = env->txn_begin(env, NULL, &txn, 0);
145     CKERR(r);
146     r = env->create_loader(env, txn, &loader, nullptr, NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
147     assert(which_db_to_fail != 0);
148     CKERR(r);
149     struct error_extra error_extra = {.bad_i=0,.error_count=0};
150     r = loader->set_error_callback(loader, error_callback, (void*)&error_extra);
151     CKERR(r);
152     r = loader->set_poll_function(loader, NULL, NULL);
153     CKERR(r);
154 
155     reset_random();
156     // using loader->put, put values into DB
157     DBT key, val;
158     unsigned int k, v;
159     for(uint32_t i=0;i<num_rows;i++) {
160         k = i;
161         v = i;
162         dbt_init(&key, &k, sizeof(unsigned int));
163         dbt_init(&val, &v, sizeof(unsigned int));
164         r = loader->put(loader, &key, &val);
165         CKERR(r);
166     }
167 
168     // close the loader
169     if (verbose) { printf("closing"); fflush(stdout); }
170     r = loader->close(loader);
171     if (verbose) {  printf(" done\n"); }
172     switch(how_to_fail) {
173     case FAIL_NONE:  assert(r==0);      assert(error_extra.error_count==0); goto checked;
174     case FAIL_KSIZE: assert(r==EINVAL); assert(error_extra.error_count==1); goto checked;
175     case FAIL_VSIZE: assert(r==EINVAL); assert(error_extra.error_count==1); goto checked;
176     }
177     assert(0);
178  checked:
179     r = txn->commit(txn, 0);
180     CKERR(r);
181 
182     if (do_check && how_to_fail==FAIL_NONE) {
183         r = env->txn_begin(env, NULL, &txn, 0);
184         CKERR(r);
185         reset_random();
186         DBT keys[NUM_DBS];
187         DBT vals[NUM_DBS];
188         uint32_t flags[NUM_DBS];
189         for (int i = 0; i < NUM_DBS; i++) {
190             dbt_init_realloc(&keys[i]);
191             dbt_init_realloc(&vals[i]);
192             flags[i] = 0;
193         }
194 
195         for(uint32_t i=0;i<num_rows;i++) {
196             k = i;
197             v = i;
198             dbt_init(&key, &k, sizeof(unsigned int));
199             dbt_init(&val, &v, sizeof(unsigned int));
200             r = env_put_multiple_test_no_array(env, nullptr, txn, &key, &val, NUM_DBS, check_dbs, keys, vals, flags);
201             CKERR(r);
202         }
203         r = txn->commit(txn, 0);
204         CKERR(r);
205         r = env->txn_begin(env, NULL, &txn, 0);
206         CKERR(r);
207 
208         for (int i = 0; i < NUM_DBS; i++) {
209             DBC *loader_cursor;
210             DBC *check_cursor;
211             r = dbs[i]->cursor(dbs[i], txn, &loader_cursor, 0);
212             CKERR(r);
213             r = dbs[i]->cursor(check_dbs[i], txn, &check_cursor, 0);
214             CKERR(r);
215             DBT loader_key;
216             DBT loader_val;
217             DBT check_key;
218             DBT check_val;
219             dbt_init_realloc(&loader_key);
220             dbt_init_realloc(&loader_val);
221             dbt_init_realloc(&check_key);
222             dbt_init_realloc(&check_val);
223             for (uint32_t x = 0; x <= num_rows; x++) {
224                 int r_loader = loader_cursor->c_get(loader_cursor, &loader_key, &loader_val, DB_NEXT);
225                 int r_check = check_cursor->c_get(check_cursor, &check_key, &check_val, DB_NEXT);
226                 assert(r_loader == r_check);
227                 if (x == num_rows) {
228                     CKERR2(r_loader, DB_NOTFOUND);
229                     CKERR2(r_check, DB_NOTFOUND);
230                 } else {
231                     CKERR(r_loader);
232                     CKERR(r_check);
233                 }
234                 assert(loader_key.size == check_key.size);
235                 assert(loader_val.size == check_val.size);
236                 assert(memcmp(loader_key.data, check_key.data, loader_key.size) == 0);
237                 assert(memcmp(loader_val.data, check_val.data, loader_val.size) == 0);
238             }
239             toku_free(loader_key.data);
240             toku_free(loader_val.data);
241             toku_free(check_key.data);
242             toku_free(check_val.data);
243             loader_cursor->c_close(loader_cursor);
244             check_cursor->c_close(check_cursor);
245         }
246 
247         for (int i = 0; i < NUM_DBS; i++) {
248             toku_free(keys[i].data);
249             toku_free(vals[i].data);
250             dbt_init_realloc(&keys[i]);
251             dbt_init_realloc(&vals[i]);
252         }
253         r = txn->commit(txn, 0);
254         CKERR(r);
255     }
256 
257 
258 }
259 
260 char *free_me = NULL;
261 const char *env_dir = TOKU_TEST_FILENAME; // the default env_dir
262 
create_and_open_dbs(DB ** dbs,const char * suffix,int * idx)263 static void create_and_open_dbs(DB **dbs, const char *suffix, int *idx) {
264     int r;
265     DBT desc;
266     dbt_init(&desc, "foo", sizeof("foo"));
267     enum {MAX_NAME=128};
268     char name[MAX_NAME*2];
269 
270     for(int i=0;i<NUM_DBS;i++) {
271         idx[i] = i;
272         r = db_create(&dbs[i], env, 0);                                                                       CKERR(r);
273         dbs[i]->app_private = &idx[i];
274         snprintf(name, sizeof(name), "db_%04x_%s", i, suffix);
275         r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666);                                CKERR(r);
276         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
277                 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
278         });
279     }
280 }
281 
282 static int
uint_or_size_dbt_cmp(DB * db,const DBT * a,const DBT * b)283 uint_or_size_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
284   assert(db && a && b);
285   if (a->size == sizeof(unsigned int) && b->size == sizeof(unsigned int)) {
286       return uint_dbt_cmp(db, a, b);
287   }
288   return a->size - b->size;
289 }
290 
run_test(uint32_t nr,uint32_t wdb,uint32_t wrow,enum how_to_fail htf)291 static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) {
292     num_rows = nr; which_db_to_fail = wdb; which_row_to_fail = wrow; how_to_fail = htf;
293 
294     int r;
295     toku_os_recursive_delete(env_dir);
296     r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO);                                                       CKERR(r);
297 
298     r = db_env_create(&env, 0);                                                                               CKERR(r);
299     r = env->set_default_bt_compare(env, uint_or_size_dbt_cmp);                                                       CKERR(r);
300     r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
301     CKERR(r);
302     int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
303     r = env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO);                                            CKERR(r);
304     env->set_errfile(env, stderr);
305     //Disable auto-checkpointing
306     r = env->checkpointing_set_period(env, 0);                                                                CKERR(r);
307 
308     DB **XMALLOC_N(NUM_DBS, dbs);
309     DB **XMALLOC_N(NUM_DBS, check_dbs);
310     int idx[NUM_DBS];
311 
312     create_and_open_dbs(dbs, "loader", &idx[0]);
313     if (do_check && how_to_fail==FAIL_NONE) {
314         create_and_open_dbs(check_dbs, "check", &idx[0]);
315     }
316 
317     if (verbose) printf("running test_loader()\n");
318     // -------------------------- //
319     test_loader_maxsize(dbs, check_dbs);
320     // -------------------------- //
321     if (verbose) printf("done    test_loader()\n");
322 
323     for(int i=0;i<NUM_DBS;i++) {
324         dbs[i]->close(dbs[i], 0);                                                                             CKERR(r);
325         dbs[i] = NULL;
326         if (do_check && how_to_fail==FAIL_NONE) {
327             check_dbs[i]->close(check_dbs[i], 0);                                                                 CKERR(r);
328             check_dbs[i] = NULL;
329         }
330     }
331     r = env->close(env, 0);                                                                                   CKERR(r);
332     toku_free(dbs);
333     toku_free(check_dbs);
334 }
335 
336 // ------------ infrastructure ----------
// Command-line parser; defined at the end of the file.
static void do_args(int argc, char * const argv[]);

// Not referenced anywhere else in this file.
int num_rows_set = 0;
340 
test_main(int argc,char * const * argv)341 int test_main(int argc, char * const *argv) {
342     do_args(argc, argv);
343 
344     run_test(1, (uint32_t) -1, (uint32_t) -1, FAIL_NONE);
345     run_test(1,  1,  0, FAIL_NONE);
346     run_test(1,  1,  0, FAIL_KSIZE);
347     run_test(1,  1,  0, FAIL_VSIZE);
348     if (!fast) {
349 	run_test(1000000, 1, 500000, FAIL_KSIZE);
350 	run_test(1000000, 1, 500000, FAIL_VSIZE);
351     }
352     toku_free(free_me);
353     return 0;
354 }
355 
do_args(int argc,char * const argv[])356 static void do_args(int argc, char * const argv[]) {
357     int resultcode;
358     char *cmd = argv[0];
359     argc--; argv++;
360     while (argc>0) {
361         if (strcmp(argv[0], "-h")==0) {
362             resultcode=0;
363 	do_usage:
364 	    fprintf(stderr, "Usage: %s [-h] [-v] [-q] [-p] [-f]\n", cmd);
365 	    fprintf(stderr, " where -e <env>         uses <env> to construct the directory (so that different tests can run concurrently)\n");
366 	    fprintf(stderr, "       -h               help\n");
367 	    fprintf(stderr, "       -v               verbose\n");
368 	    fprintf(stderr, "       -q               quiet\n");
369 	    fprintf(stderr, "       -z               compress intermediates\n");
370 	    fprintf(stderr, "       -c               compare with regular dbs\n");
371 	    fprintf(stderr, "       -f               fast (suitable for vgrind)\n");
372 	    exit(resultcode);
373 	} else if (strcmp(argv[0], "-c")==0) {
374 	    do_check = true;
375 	} else if (strcmp(argv[0], "-v")==0) {
376 	    verbose++;
377 	} else if (strcmp(argv[0],"-q")==0) {
378 	    verbose--;
379 	    if (verbose<0) verbose=0;
380         } else if (strcmp(argv[0], "-z")==0) {
381             USE_COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
382         } else if (strcmp(argv[0], "-f")==0) {
383 	    fast     = true;
384 	} else {
385 	    fprintf(stderr, "Unknown arg: %s\n", argv[0]);
386 	    resultcode=1;
387 	    goto do_usage;
388 	}
389 	argc--;
390 	argv++;
391     }
392 }
393