1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
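// Test the loader's dbufio-based file merge (toku_merge_some_files_using_dbufio)
// while injecting I/O, allocation, and poll-callback failures one event at a time.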
40
41 #define DONT_DEPRECATE_WRITES
42 #define DONT_DEPRECATE_MALLOC
43
44 #include "test.h"
45 #include "loader/loader-internal.h"
46 #include <portability/toku_path.h>
47
/* event_count: number of injectable events (hooked I/O, allocation and poll calls)
 * counted so far in the current run.
 * event_count_trigger: event number at which a hook injects a failure.  Every hook
 * increments event_count before comparing, so a trigger of 0 never matches. */
static int event_count, event_count_trigger;
49
my_assert_hook(void)50 static void my_assert_hook (void) {
51 fprintf(stderr, "event_count=%d\n", event_count);
52 }
53
reset_event_counts(void)54 static void reset_event_counts(void) {
55 event_count = event_count_trigger = 0;
56 }
57
/* Intentionally empty.  Most of the fault-injection hooks in this file call it at the
 * moment they inject a failure (bad_fclose does not).
 * NOTE(review): presumably exists as a single place to set a debugger breakpoint — confirm. */
static void event_hit(void) {
}
60
loader_poll_callback(void * UU (extra),float UU (progress))61 static int loader_poll_callback(void *UU(extra), float UU(progress)) {
62 int r;
63 event_count++;
64 if (event_count_trigger == event_count) {
65 event_hit();
66 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
67 r = TOKUDB_CANCELED;
68 } else {
69 r = 0;
70 }
71 return r;
72 }
73
bad_fwrite(const void * ptr,size_t size,size_t nmemb,FILE * stream)74 static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
75 event_count++;
76 size_t r;
77 if (event_count_trigger == event_count) {
78 event_hit();
79 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
80 errno = ENOSPC;
81 r = (size_t) -1;
82 } else {
83 r = fwrite(ptr, size, nmemb, stream);
84 if (r!=nmemb) {
85 errno = ferror(stream);
86 }
87 }
88 return r;
89 }
90
bad_write(int fd,const void * bp,size_t len)91 static ssize_t bad_write(int fd, const void * bp, size_t len) {
92 ssize_t r;
93 event_count++;
94 if (event_count_trigger == event_count) {
95 event_hit();
96 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
97 errno = ENOSPC;
98 r = -1;
99 } else {
100 r = write(fd, bp, len);
101 }
102 return r;
103 }
104
bad_pwrite(int fd,const void * bp,size_t len,toku_off_t off)105 static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
106 ssize_t r;
107 event_count++;
108 if (event_count_trigger == event_count) {
109 event_hit();
110 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
111 errno = ENOSPC;
112 r = -1;
113 } else {
114 r = pwrite(fd, bp, len, off);
115 }
116 return r;
117 }
118
119 static FILE *
bad_fdopen(int fd,const char * mode)120 bad_fdopen(int fd, const char * mode) {
121 FILE * rval;
122 event_count++;
123 if (event_count_trigger == event_count) {
124 event_hit();
125 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
126 errno = EINVAL;
127 rval = NULL;
128 } else {
129 rval = fdopen(fd, mode);
130 }
131 return rval;
132 }
133
134 static FILE *
bad_fopen(const char * filename,const char * mode)135 bad_fopen(const char *filename, const char *mode) {
136 FILE * rval;
137 event_count++;
138 if (event_count_trigger == event_count) {
139 event_hit();
140 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
141 errno = EINVAL;
142 rval = NULL;
143 } else {
144 rval = fopen(filename, mode);
145 }
146 return rval;
147 }
148
149
150 static int
bad_open(const char * path,int oflag,int mode)151 bad_open(const char *path, int oflag, int mode) {
152 int rval;
153 event_count++;
154 if (event_count_trigger == event_count) {
155 event_hit();
156 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
157 errno = EINVAL;
158 rval = -1;
159 } else {
160 rval = open(path, oflag, mode);
161 }
162 return rval;
163 }
164
165
166
167 static int
bad_fclose(FILE * stream)168 bad_fclose(FILE * stream) {
169 int rval;
170 event_count++;
171 // Must close the stream even in the "error case" because otherwise there is no way to get the memory back.
172 rval = fclose(stream);
173 if (rval==0) {
174 if (event_count_trigger == event_count) {
175 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
176 errno = ENOSPC;
177 rval = -1;
178 }
179 }
180 return rval;
181 }
182
/* errno value that bad_read() reports when it injects a failure.  Set from the
 * -bad_read_errno command-line option; while it is 0, test() does not install the
 * bad_read hook at all. */
int bad_read_errno = 0;
184
185 static ssize_t
bad_read(int fd,void * buf,size_t count)186 bad_read(int fd, void *buf, size_t count) {
187 ssize_t rval;
188 event_count++;
189 if (event_count_trigger == event_count) {
190 event_hit();
191 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
192 errno = bad_read_errno;
193 rval = -1;
194 } else
195 rval = read(fd, buf, count);
196 return rval;
197 }
198
/* Nonzero makes my_malloc() count "big" allocations as injectable events. */
static int my_malloc_event = 1;
/* Calls seen by my_malloc(): all of them, and those with n >= min_malloc_error_size. */
static int my_malloc_count = 0, my_big_malloc_count = 0;
/* Calls seen by my_realloc(): all of them, and those with n >= min_malloc_error_size. */
static int my_realloc_count = 0, my_big_realloc_count = 0;
202
reset_my_malloc_counts(void)203 static void reset_my_malloc_counts(void) {
204 my_malloc_count = my_big_malloc_count = 0;
205 my_realloc_count = my_big_realloc_count = 0;
206 }
207
/* Smallest request size (in bytes) that my_malloc()/my_realloc() treat as "big" and
 * therefore eligible for failure injection.  The default of 0 makes every request eligible. */
size_t min_malloc_error_size = 0;
209
my_malloc(size_t n)210 static void *my_malloc(size_t n) {
211 my_malloc_count++;
212 if (n >= min_malloc_error_size) {
213 my_big_malloc_count++;
214 if (my_malloc_event) {
215 event_count++;
216 if (event_count == event_count_trigger) {
217 event_hit();
218 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
219 errno = ENOMEM;
220 return NULL;
221 }
222 }
223 }
224 return os_malloc(n);
225 }
226
/* Nonzero makes my_realloc() count "big" reallocations as injectable events. */
static int do_realloc_errors = 1;
228
my_realloc(void * p,size_t n)229 static void *my_realloc(void *p, size_t n) {
230 my_realloc_count++;
231 if (n >= min_malloc_error_size) {
232 my_big_realloc_count++;
233 if (do_realloc_errors) {
234 event_count++;
235 if (event_count == event_count_trigger) {
236 event_hit();
237 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
238 errno = ENOMEM;
239 return NULL;
240 }
241 }
242 }
243 return os_realloc(p, n);
244 }
245
246
/* Three-way comparison of the two ints that a and b point to, in the shape qsort()
 * expects: returns -1 if *a < *b, +1 if *a > *b, and 0 if they are equal. */
static int qsort_compare_ints (const void *a, const void *b) {
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;
    // Each comparison yields 0 or 1, so the difference is exactly -1, 0 or +1
    // and cannot overflow the way (lhs - rhs) could.
    return (lhs > rhs) - (lhs < rhs);
}
255
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)256 static int compare_ints (DB* UU(desc), const DBT *akey, const DBT *bkey) {
257 assert(akey->size==sizeof(int));
258 assert(bkey->size==sizeof(int));
259 return qsort_compare_ints(akey->data, bkey->data);
260 }
261
errorstr_static(int err)262 static char *errorstr_static (int err) {
263 static char errorstr[100];
264 toku_ft_strerror_r(err, errorstr, sizeof(errorstr));
265 return errorstr;
266 }
267
268
err_cb(DB * db UU (),int dbn,int err,DBT * key UU (),DBT * val UU (),void * extra UU ())269 static void err_cb(DB *db UU(), int dbn, int err, DBT *key UU(), DBT *val UU(), void *extra UU()) {
270 fprintf(stderr, "error in test dbn=%d err=%d (%s)\n", dbn, err, errorstr_static(err));
271 abort();
272 }
273
/* N_SOURCES: number of temporary input files that test() creates and merges.
 * N_DEST_DBS: number of destination dictionaries handed to the loader. */
enum { N_SOURCES = 2, N_DEST_DBS=1 };

/* Total number of records spread across the source files; set with -r. */
int N_RECORDS = 10;
277
make_fname(const char * directory,const char * fname,int idx)278 static char *make_fname(const char *directory, const char *fname, int idx) {
279 int len = strlen(directory)+strlen(fname)+20;
280 char *XMALLOC_N(len, result);
281 int r = snprintf(result, len, "%s/%s%d", directory, fname, idx);
282 assert(r<len);
283 return result; // don't care that it's a little too long.
284 }
285
286
/* Argument block for consumer_thread().  test() initializes it positionally
 * ({q, 0}), so the field order matters. */
struct consumer_thunk {
    QUEUE q;         // queue the consumer thread dequeues rowsets from
    int64_t n_read;  // running total of rows_n across all rowsets dequeued so far
};
291
consumer_thread(void * ctv)292 static void *consumer_thread (void *ctv) {
293 struct consumer_thunk *cthunk = (struct consumer_thunk *)ctv;
294 while (1) {
295 void *item;
296 int r = toku_queue_deq(cthunk->q, &item, NULL, NULL);
297 if (r==EOF) return NULL;
298 assert(r==0);
299 struct rowset *rowset = (struct rowset *)item;
300 cthunk->n_read += rowset->n_rows;
301 destroy_rowset(rowset);
302 toku_free(rowset);
303 }
304 }
305
306
/* Run one merge of N_SOURCES temporary files through toku_merge_some_files_using_dbufio()
 * with the fault-injection hooks installed.
 *
 * directory: existing directory in which the temporary source files are created.
 * is_error:  when true, a nonzero result from the merge is tolerated (a failure was
 *            expected to be injected); when false, the merge must succeed.
 *
 * Which event fails is decided by the file-level event_count_trigger; this function
 * does not reset event_count itself.  If the merge succeeded, the consumer thread
 * must have seen exactly N_RECORDS rows. */
static void test (const char *directory, bool is_error) {

    int *XMALLOC_N(N_SOURCES, fds);

    // Create the source files and remember how many records go into each one.
    char **XMALLOC_N(N_SOURCES, fnames);
    int *XMALLOC_N(N_SOURCES, n_records_in_fd);
    for (int i=0; i<N_SOURCES; i++) {
        fnames[i] = make_fname(directory, "temp", i);
        fds[i] = open(fnames[i], O_CREAT|O_RDWR, S_IRWXU);
        assert(fds[i]>=0);
        n_records_in_fd[i] = 0;
    }
    // Scatter the records over the files at random.  Each record is two
    // length-prefixed 4-byte fields, both holding i (presumably key then value).
    for (int i=0; i<N_RECORDS; i++) {
        int size=4;
        int fdi = random()%N_SOURCES;
        int fd = fds[fdi];
        { int r = write(fd, &size, 4); assert(r==4); }
        { int r = write(fd, &i, 4); assert(r==4); }
        { int r = write(fd, &size, 4); assert(r==4); }
        { int r = write(fd, &i, 4); assert(r==4); }
        n_records_in_fd[fdi]++;
    }
    // Rewind every file so the merge reads from the beginning.
    for (int i=0; i<N_SOURCES; i++) {
        toku_off_t r = lseek(fds[i], 0, SEEK_SET);
        assert(r==0);
    }

    // Set up a loader with N_DEST_DBS destinations and an int comparator.
    FTLOADER bl;
    FT_HANDLE *XCALLOC_N(N_DEST_DBS, fts);
    DB* *XCALLOC_N(N_DEST_DBS, dbs);
    const char **XMALLOC_N(N_DEST_DBS, new_fnames_in_env);
    for (int i=0; i<N_DEST_DBS; i++) {
        char s[100];
        snprintf(s, sizeof(s), "db%d.db", i);
        new_fnames_in_env[i] = toku_strdup(s);
        assert(new_fnames_in_env[i]);
    }
    ft_compare_func *XMALLOC_N(N_DEST_DBS, bt_compare_functions);
    bt_compare_functions[0] = compare_ints;
    CACHETABLE ct;
    enum {CACHETABLE_SIZE = 64*1024};
    {
        toku_cachetable_create(&ct, CACHETABLE_SIZE, (LSN){1}, NULL);
    }
    // NOTE(review): *lsnp is passed by value below without being assigned after
    // XMALLOC — confirm the allocation is zero-filled or that the value is unused.
    LSN *XMALLOC(lsnp);
    {
        int r = toku_ft_loader_internal_init (&bl,
                                              ct,
                                              (generate_row_for_put_func)NULL,
                                              (DB*)NULL,
                                              N_DEST_DBS, fts, dbs,
                                              new_fnames_in_env,
                                              bt_compare_functions,
                                              "tempxxxxxx",
                                              *lsnp,
                                              nullptr, true, 0, false, true);
        assert(r==0);
    }

    // Errors reported through the loader's error callback abort the test (err_cb);
    // the poll callback is one of the fault-injection points.
    ft_loader_init_error_callback(&bl->error_callback);
    ft_loader_set_error_function(&bl->error_callback, err_cb, NULL);
    ft_loader_init_poll_callback(&bl->poll_callback);
    ft_loader_set_poll_function(&bl->poll_callback, loader_poll_callback, NULL);
    ft_loader_set_fractal_workers_count_from_c(bl);

    QUEUE q;
    { int r = toku_queue_create(&q, 1000); assert(r==0); }
    DBUFIO_FILESET bfs;
    const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
    { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); }
    // Hand-build the loader's file_infos table so it describes the source files.
    FIDX *XMALLOC_N(N_SOURCES, src_fidxs);
    assert(bl->file_infos.n_files==0);
    bl->file_infos.n_files = N_SOURCES;
    bl->file_infos.n_files_limit = N_SOURCES;
    bl->file_infos.n_files_open = 0;
    bl->file_infos.n_files_extant = 0;
    XREALLOC_N(bl->file_infos.n_files_limit, bl->file_infos.file_infos);
    for (int i=0; i<N_SOURCES; i++) {
        // All we really need is the number of records in the file.  The rest of the file_info is unused by the dbufio code.
        bl->file_infos.file_infos[i].n_rows = n_records_in_fd[i];
        // However we need these for the destroy method to work right.
        bl->file_infos.file_infos[i].is_extant = false;
        bl->file_infos.file_infos[i].is_open = false;
        bl->file_infos.file_infos[i].buffer = NULL;
        src_fidxs[i].idx = i;
    }
    // Start the thread that drains the merge output queue and counts rows.
    toku_pthread_t consumer;
    struct consumer_thunk cthunk = {q, 0};
    {
        int r = toku_pthread_create(toku_uninstrumented,
                                    &consumer,
                                    nullptr,
                                    consumer_thread,
                                    static_cast<void *>(&cthunk));
        assert(r == 0);
    }

    // Install the fault-injection hooks.  This happens only after all of the setup
    // above, so setup itself runs against the real functions.
    toku_set_func_malloc_only(my_malloc);
    toku_set_func_realloc_only(my_realloc);
    toku_set_func_fwrite(bad_fwrite);
    toku_set_func_write(bad_write);
    toku_set_func_pwrite(bad_pwrite);
    toku_set_func_fdopen(bad_fdopen);
    toku_set_func_fopen(bad_fopen);
    toku_set_func_open(bad_open);
    toku_set_func_fclose(bad_fclose);
    // The read hook is installed only when a read errno was requested on the command line.
    if (bad_read_errno) toku_set_func_read(bad_read);

    int result = 0;
    {
        int r = toku_merge_some_files_using_dbufio(true, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000);
        if (is_error && r!=0) {
            // An injected failure surfaced as an error, which is acceptable in error mode.
            result = r;
        } else {
            if (r!=0) printf("%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, errorstr_static(r));
            assert(r==0);
        }
        if (r)
            panic_dbufio_fileset(bfs, r);
    }
    // Signal end-of-input so the consumer thread's dequeue returns EOF and it exits.
    {
        int r = toku_queue_eof(q);
        assert(r==0);
    }

    // Restore the real functions before teardown.
    toku_set_func_malloc(NULL);
    toku_set_func_realloc(NULL);
    toku_set_func_fwrite(nullptr);
    toku_set_func_write(NULL);
    toku_set_func_pwrite(NULL);
    toku_set_func_fdopen(NULL);
    toku_set_func_fopen(NULL);
    toku_set_func_open(NULL);
    toku_set_func_fclose(NULL);
    toku_set_func_read(NULL);
    // From here on a failed assertion also prints the current event count.
    do_assert_hook = my_assert_hook;

    {
        void *vresult;
        int r = toku_pthread_join(consumer, &vresult);
        assert(r==0);
        assert(vresult==NULL);
        //printf("n_read = %ld, N_SOURCES=%d N_RECORDS=%d\n", cthunk.n_read, N_SOURCES, N_RECORDS);
        // Only a successful merge is required to have delivered every record.
        if (result==0) {
            assert(cthunk.n_read == N_RECORDS);
        }
    }
    {
        int r = toku_queue_destroy(q);
        assert(r==0);
    }
    toku_ft_loader_internal_destroy(bl, false);
    {
        toku_cachetable_close(&ct);
    }
    for (int i=0; i<N_DEST_DBS; i++) {
        toku_free((void*)new_fnames_in_env[i]);
    }
    for (int i=0; i<N_SOURCES; i++) {
        toku_free(fnames[i]);
    }
    destroy_dbufio_fileset(bfs);
    // NOTE(review): the descriptors in fds[] are never close()d in this function —
    // confirm whether the dbufio fileset or the loader teardown closes them.
    toku_free(fnames);
    toku_free(fds);
    toku_free(fts);
    toku_free(dbs);
    toku_free(new_fnames_in_env);
    toku_free(bt_compare_functions);
    toku_free(lsnp);
    toku_free(src_fidxs);
    toku_free(n_records_in_fd);
}
479
480
/* Print the command-line help to stderr.
 *
 * progname: program name shown in the synopsis.
 * n:        current/default record count, shown next to -r.
 *
 * Always returns 1, so callers can write `return usage(...)` to exit with failure.
 *
 * The synopsis now lists every option test_main() accepts; -h, -tstart and
 * -bad_read_errno were previously parsed but missing from it, and -tstart had no
 * description at all. */
static int usage(const char *progname, int n) {
    fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-r %d] [-s] [-m] [-tstart NEVENTS] [-tend NEVENTS] [-bad_read_errno ERRNO] directory\n", progname, n);
    fprintf(stderr, "[-h] print this help\n");
    fprintf(stderr, "[-v] turn on verbose\n");
    fprintf(stderr, "[-q] turn off verbose\n");
    fprintf(stderr, "[-r %d] set the number of rows\n", n);
    fprintf(stderr, "[-s] set the small loader size factor\n");
    fprintf(stderr, "[-m] inject big malloc failures\n");
    fprintf(stderr, "[-tstart NEVENTS] start injecting failures after the first N events\n");
    fprintf(stderr, "[-tend NEVENTS] stop testing after N events\n");
    fprintf(stderr, "[-bad_read_errno ERRNO] inject read failures that set errno to ERRNO\n");
    return 1;
}
492
test_main(int argc,const char * argv[])493 int test_main (int argc, const char *argv[]) {
494 int tstart = 0;
495 int tend = -1;
496 const char *progname=argv[0];
497 argc--; argv++;
498 while (argc>0) {
499 if (strcmp(argv[0],"-h")==0) {
500 return usage(progname, N_RECORDS);
501 } else if (strcmp(argv[0],"-v")==0) {
502 verbose=1;
503 } else if (strcmp(argv[0],"-q")==0) {
504 verbose=0;
505 } else if (strcmp(argv[0],"-r") == 0) {
506 argc--; argv++;
507 N_RECORDS = atoi(argv[0]);
508 } else if (strcmp(argv[0],"-s") == 0) {
509 toku_ft_loader_set_size_factor(1);
510 } else if (strcmp(argv[0],"-m") == 0) {
511 my_malloc_event = 1;
512 } else if (strcmp(argv[0],"-tend") == 0 && argc > 1) {
513 argc--; argv++;
514 tend = atoi(argv[0]);
515 } else if (strcmp(argv[0],"-tstart") == 0 && argc > 1) {
516 argc--; argv++;
517 tstart = atoi(argv[0]);
518 } else if (strcmp(argv[0], "-bad_read_errno") == 0 && argc > 1) {
519 argc--; argv++;
520 bad_read_errno = atoi(argv[0]);
521 } else if (argc!=1) {
522 return usage(progname, N_RECORDS);
523 }
524 else {
525 break;
526 }
527 argc--; argv++;
528 }
529 const char* directory = TOKU_TEST_FILENAME;
530 char unlink_all[strlen(directory)+20];
531 snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
532
533 int templen = strlen(directory)+15;
534 char tf_template[templen];
535 int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
536 assert (tlen>0 && tlen<templen);
537
538 char output_name[templen];
539 int olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
540 assert (olen>0 && olen<templen);
541
542 // callibrate
543 int r;
544 r = system(unlink_all); CKERR(r);
545 r = toku_os_mkdir(directory, 0755); CKERR(r);
546 test(directory, false);
547
548 if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
549
550 {
551 int event_limit = event_count;
552 if (tend>=0 && tend<event_limit) event_limit=tend;
553 if (verbose) printf("event_limit=%d\n", event_limit);
554
555 for (int i = tstart+1; i <= event_limit; i++) {
556 reset_event_counts();
557 reset_my_malloc_counts();
558 event_count_trigger = i;
559 r = system(unlink_all); CKERR(r);
560 r = toku_os_mkdir(directory, 0755); CKERR(r);
561 if (verbose) printf("event=%d\n", i);
562 test(directory, true);
563 }
564 r = system(unlink_all); CKERR(r);
565 }
566
567 return 0;
568 }
569