1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 #include "toku_random.h"
44
45 bool fast = false;
46
47 DB_ENV *env;
48 enum {NUM_DBS=2};
49 uint32_t USE_COMPRESS=0;
50
51 bool do_check = false;
52 uint32_t num_rows = 1;
53 uint32_t which_db_to_fail = (uint32_t) -1;
54 uint32_t which_row_to_fail = (uint32_t) -1;
55 enum how_to_fail { FAIL_NONE, FAIL_KSIZE, FAIL_VSIZE } how_to_fail = FAIL_NONE;
56
57 static struct random_data random_data[NUM_DBS];
58 char random_buf[NUM_DBS][8];
59
put_multiple_generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)60 static int put_multiple_generate(DB *dest_db,
61 DB *src_db __attribute__((__unused__)),
62 DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals,
63 const DBT *src_key, const DBT *src_val __attribute__((__unused__))) {
64 toku_dbt_array_resize(dest_keys, 1);
65 toku_dbt_array_resize(dest_vals, 1);
66 DBT *dest_key = &dest_keys->dbts[0];
67 DBT *dest_val = &dest_vals->dbts[0];
68
69 uint32_t which = *(uint32_t*)dest_db->app_private;
70 assert(src_key->size==4);
71 uint32_t rownum = *(uint32_t*)src_key->data;
72
73 uint32_t ksize, vsize;
74 const uint32_t kmax=32*1024, vmax=32*1024*1024;
75 if (which==which_db_to_fail && rownum==which_row_to_fail) {
76 switch (how_to_fail) {
77 case FAIL_NONE: ksize=kmax; vsize=vmax; goto gotsize;
78 case FAIL_KSIZE: ksize=kmax+1; vsize=vmax; goto gotsize;
79 case FAIL_VSIZE: ksize=kmax; vsize=vmax+1; goto gotsize;
80 }
81 assert(0);
82 gotsize:;
83 } else {
84 ksize=4; vsize=100;
85 }
86 assert(dest_key->flags==DB_DBT_REALLOC);
87 if (dest_key->ulen < ksize) {
88 dest_key->data = toku_xrealloc(dest_key->data, ksize);
89 dest_key->ulen = ksize;
90 }
91 assert(dest_val->flags==DB_DBT_REALLOC);
92 if (dest_val->ulen < vsize) {
93 dest_val->data = toku_xrealloc(dest_val->data, vsize);
94 dest_val->ulen = vsize;
95 }
96 assert(ksize>=sizeof(uint32_t));
97 for (uint32_t i=0; i<ksize; i++) ((char*)dest_key->data)[i] = myrandom_r(&random_data[which]);
98 for (uint32_t i=0; i<vsize; i++) ((char*)dest_val->data)[i] = myrandom_r(&random_data[which]);
99 *(uint32_t*)dest_key->data = rownum;
100 dest_key->size = ksize;
101 dest_val->size = vsize;
102
103 return 0;
104 }
105
// State shared between test_loader_maxsize() and error_callback() through the
// loader's "extra" pointer.
struct error_extra {
    // Initialised to 0 by test_loader_maxsize(); not read in the visible code.
    int bad_i;
    // Number of times error_callback() has been invoked for this loader.
    int error_count;
};
110
error_callback(DB * db,int which_db,int err,DBT * key,DBT * val,void * extra)111 static void error_callback (DB *db __attribute__((__unused__)), int which_db, int err, DBT *key __attribute__((__unused__)), DBT *val __attribute__((__unused__)), void *extra) {
112 struct error_extra *e =(struct error_extra *)extra;
113 assert(which_db==(int)which_db_to_fail);
114 assert(err==EINVAL);
115 assert(e->error_count==0);
116 e->error_count++;
117 }
118
reset_random(void)119 static void reset_random(void) {
120 int r;
121
122 for (int i = 0; i < NUM_DBS; i++) {
123 ZERO_STRUCT(random_data[i]);
124 ZERO_ARRAY(random_buf[i]);
125 r = myinitstate_r(i, random_buf[i], 8, &random_data[i]);
126 assert(r==0);
127 }
128 }
129
test_loader_maxsize(DB ** dbs,DB ** check_dbs)130 static void test_loader_maxsize(DB **dbs, DB **check_dbs)
131 {
132 int r;
133 DB_TXN *txn;
134 DB_LOADER *loader;
135 uint32_t db_flags[NUM_DBS];
136 uint32_t dbt_flags[NUM_DBS];
137 for(int i=0;i<NUM_DBS;i++) {
138 db_flags[i] = DB_NOOVERWRITE;
139 dbt_flags[i] = 0;
140 }
141 uint32_t loader_flags = USE_COMPRESS; // set with -p option
142
143 // create and initialize loader
144 r = env->txn_begin(env, NULL, &txn, 0);
145 CKERR(r);
146 r = env->create_loader(env, txn, &loader, nullptr, NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
147 assert(which_db_to_fail != 0);
148 CKERR(r);
149 struct error_extra error_extra = {.bad_i=0,.error_count=0};
150 r = loader->set_error_callback(loader, error_callback, (void*)&error_extra);
151 CKERR(r);
152 r = loader->set_poll_function(loader, NULL, NULL);
153 CKERR(r);
154
155 reset_random();
156 // using loader->put, put values into DB
157 DBT key, val;
158 unsigned int k, v;
159 for(uint32_t i=0;i<num_rows;i++) {
160 k = i;
161 v = i;
162 dbt_init(&key, &k, sizeof(unsigned int));
163 dbt_init(&val, &v, sizeof(unsigned int));
164 r = loader->put(loader, &key, &val);
165 CKERR(r);
166 }
167
168 // close the loader
169 if (verbose) { printf("closing"); fflush(stdout); }
170 r = loader->close(loader);
171 if (verbose) { printf(" done\n"); }
172 switch(how_to_fail) {
173 case FAIL_NONE: assert(r==0); assert(error_extra.error_count==0); goto checked;
174 case FAIL_KSIZE: assert(r==EINVAL); assert(error_extra.error_count==1); goto checked;
175 case FAIL_VSIZE: assert(r==EINVAL); assert(error_extra.error_count==1); goto checked;
176 }
177 assert(0);
178 checked:
179 r = txn->commit(txn, 0);
180 CKERR(r);
181
182 if (do_check && how_to_fail==FAIL_NONE) {
183 r = env->txn_begin(env, NULL, &txn, 0);
184 CKERR(r);
185 reset_random();
186 DBT keys[NUM_DBS];
187 DBT vals[NUM_DBS];
188 uint32_t flags[NUM_DBS];
189 for (int i = 0; i < NUM_DBS; i++) {
190 dbt_init_realloc(&keys[i]);
191 dbt_init_realloc(&vals[i]);
192 flags[i] = 0;
193 }
194
195 for(uint32_t i=0;i<num_rows;i++) {
196 k = i;
197 v = i;
198 dbt_init(&key, &k, sizeof(unsigned int));
199 dbt_init(&val, &v, sizeof(unsigned int));
200 r = env_put_multiple_test_no_array(env, nullptr, txn, &key, &val, NUM_DBS, check_dbs, keys, vals, flags);
201 CKERR(r);
202 }
203 r = txn->commit(txn, 0);
204 CKERR(r);
205 r = env->txn_begin(env, NULL, &txn, 0);
206 CKERR(r);
207
208 for (int i = 0; i < NUM_DBS; i++) {
209 DBC *loader_cursor;
210 DBC *check_cursor;
211 r = dbs[i]->cursor(dbs[i], txn, &loader_cursor, 0);
212 CKERR(r);
213 r = dbs[i]->cursor(check_dbs[i], txn, &check_cursor, 0);
214 CKERR(r);
215 DBT loader_key;
216 DBT loader_val;
217 DBT check_key;
218 DBT check_val;
219 dbt_init_realloc(&loader_key);
220 dbt_init_realloc(&loader_val);
221 dbt_init_realloc(&check_key);
222 dbt_init_realloc(&check_val);
223 for (uint32_t x = 0; x <= num_rows; x++) {
224 int r_loader = loader_cursor->c_get(loader_cursor, &loader_key, &loader_val, DB_NEXT);
225 int r_check = check_cursor->c_get(check_cursor, &check_key, &check_val, DB_NEXT);
226 assert(r_loader == r_check);
227 if (x == num_rows) {
228 CKERR2(r_loader, DB_NOTFOUND);
229 CKERR2(r_check, DB_NOTFOUND);
230 } else {
231 CKERR(r_loader);
232 CKERR(r_check);
233 }
234 assert(loader_key.size == check_key.size);
235 assert(loader_val.size == check_val.size);
236 assert(memcmp(loader_key.data, check_key.data, loader_key.size) == 0);
237 assert(memcmp(loader_val.data, check_val.data, loader_val.size) == 0);
238 }
239 toku_free(loader_key.data);
240 toku_free(loader_val.data);
241 toku_free(check_key.data);
242 toku_free(check_val.data);
243 loader_cursor->c_close(loader_cursor);
244 check_cursor->c_close(check_cursor);
245 }
246
247 for (int i = 0; i < NUM_DBS; i++) {
248 toku_free(keys[i].data);
249 toku_free(vals[i].data);
250 dbt_init_realloc(&keys[i]);
251 dbt_init_realloc(&vals[i]);
252 }
253 r = txn->commit(txn, 0);
254 CKERR(r);
255 }
256
257
258 }
259
260 char *free_me = NULL;
261 const char *env_dir = TOKU_TEST_FILENAME; // the default env_dir
262
create_and_open_dbs(DB ** dbs,const char * suffix,int * idx)263 static void create_and_open_dbs(DB **dbs, const char *suffix, int *idx) {
264 int r;
265 DBT desc;
266 dbt_init(&desc, "foo", sizeof("foo"));
267 enum {MAX_NAME=128};
268 char name[MAX_NAME*2];
269
270 for(int i=0;i<NUM_DBS;i++) {
271 idx[i] = i;
272 r = db_create(&dbs[i], env, 0); CKERR(r);
273 dbs[i]->app_private = &idx[i];
274 snprintf(name, sizeof(name), "db_%04x_%s", i, suffix);
275 r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
276 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
277 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
278 });
279 }
280 }
281
282 static int
uint_or_size_dbt_cmp(DB * db,const DBT * a,const DBT * b)283 uint_or_size_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
284 assert(db && a && b);
285 if (a->size == sizeof(unsigned int) && b->size == sizeof(unsigned int)) {
286 return uint_dbt_cmp(db, a, b);
287 }
288 return a->size - b->size;
289 }
290
run_test(uint32_t nr,uint32_t wdb,uint32_t wrow,enum how_to_fail htf)291 static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) {
292 num_rows = nr; which_db_to_fail = wdb; which_row_to_fail = wrow; how_to_fail = htf;
293
294 int r;
295 toku_os_recursive_delete(env_dir);
296 r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
297
298 r = db_env_create(&env, 0); CKERR(r);
299 r = env->set_default_bt_compare(env, uint_or_size_dbt_cmp); CKERR(r);
300 r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
301 CKERR(r);
302 int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
303 r = env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
304 env->set_errfile(env, stderr);
305 //Disable auto-checkpointing
306 r = env->checkpointing_set_period(env, 0); CKERR(r);
307
308 DB **XMALLOC_N(NUM_DBS, dbs);
309 DB **XMALLOC_N(NUM_DBS, check_dbs);
310 int idx[NUM_DBS];
311
312 create_and_open_dbs(dbs, "loader", &idx[0]);
313 if (do_check && how_to_fail==FAIL_NONE) {
314 create_and_open_dbs(check_dbs, "check", &idx[0]);
315 }
316
317 if (verbose) printf("running test_loader()\n");
318 // -------------------------- //
319 test_loader_maxsize(dbs, check_dbs);
320 // -------------------------- //
321 if (verbose) printf("done test_loader()\n");
322
323 for(int i=0;i<NUM_DBS;i++) {
324 dbs[i]->close(dbs[i], 0); CKERR(r);
325 dbs[i] = NULL;
326 if (do_check && how_to_fail==FAIL_NONE) {
327 check_dbs[i]->close(check_dbs[i], 0); CKERR(r);
328 check_dbs[i] = NULL;
329 }
330 }
331 r = env->close(env, 0); CKERR(r);
332 toku_free(dbs);
333 toku_free(check_dbs);
334 }
335
336 // ------------ infrastructure ----------
// Command-line parser; defined after test_main().
static void do_args(int argc, char * const argv[]);

// Not referenced anywhere in the visible code.
int num_rows_set = false;
340
test_main(int argc,char * const * argv)341 int test_main(int argc, char * const *argv) {
342 do_args(argc, argv);
343
344 run_test(1, (uint32_t) -1, (uint32_t) -1, FAIL_NONE);
345 run_test(1, 1, 0, FAIL_NONE);
346 run_test(1, 1, 0, FAIL_KSIZE);
347 run_test(1, 1, 0, FAIL_VSIZE);
348 if (!fast) {
349 run_test(1000000, 1, 500000, FAIL_KSIZE);
350 run_test(1000000, 1, 500000, FAIL_VSIZE);
351 }
352 toku_free(free_me);
353 return 0;
354 }
355
do_args(int argc,char * const argv[])356 static void do_args(int argc, char * const argv[]) {
357 int resultcode;
358 char *cmd = argv[0];
359 argc--; argv++;
360 while (argc>0) {
361 if (strcmp(argv[0], "-h")==0) {
362 resultcode=0;
363 do_usage:
364 fprintf(stderr, "Usage: %s [-h] [-v] [-q] [-p] [-f]\n", cmd);
365 fprintf(stderr, " where -e <env> uses <env> to construct the directory (so that different tests can run concurrently)\n");
366 fprintf(stderr, " -h help\n");
367 fprintf(stderr, " -v verbose\n");
368 fprintf(stderr, " -q quiet\n");
369 fprintf(stderr, " -z compress intermediates\n");
370 fprintf(stderr, " -c compare with regular dbs\n");
371 fprintf(stderr, " -f fast (suitable for vgrind)\n");
372 exit(resultcode);
373 } else if (strcmp(argv[0], "-c")==0) {
374 do_check = true;
375 } else if (strcmp(argv[0], "-v")==0) {
376 verbose++;
377 } else if (strcmp(argv[0],"-q")==0) {
378 verbose--;
379 if (verbose<0) verbose=0;
380 } else if (strcmp(argv[0], "-z")==0) {
381 USE_COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
382 } else if (strcmp(argv[0], "-f")==0) {
383 fast = true;
384 } else {
385 fprintf(stderr, "Unknown arg: %s\n", argv[0]);
386 resultcode=1;
387 goto do_usage;
388 }
389 argc--;
390 argv++;
391 }
392 }
393