1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
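// Test the loader's dbufio-based file merge (toku_merge_some_files_using_dbufio)
// while injecting failures into malloc/realloc, file I/O and the poll callback.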
40 
41 #define DONT_DEPRECATE_WRITES
42 #define DONT_DEPRECATE_MALLOC
43 
44 #include "test.h"
45 #include "loader/loader-internal.h"
46 #include <portability/toku_path.h>
47 
// event_count: how many injectable events have been seen in the current run.
// event_count_trigger: ordinal of the event that must fail.  Every hook
// increments event_count before comparing, so a trigger of 0 never fires.
static int event_count;
static int event_count_trigger;

// Reports the current event count on stderr; test() installs this as
// do_assert_hook.
static void my_assert_hook(void) {
    fprintf(stderr, "event_count=%d\n", event_count);
}
53 
reset_event_counts(void)54 static void reset_event_counts(void) {
55     event_count = event_count_trigger = 0;
56 }
57 
// Called by the fault injectors at the moment an injected failure fires.
// The body is intentionally empty (presumably a convenient place to set a
// debugger breakpoint).
static void event_hit(void) {
    // nothing to do
}
60 
loader_poll_callback(void * UU (extra),float UU (progress))61 static int loader_poll_callback(void *UU(extra), float UU(progress)) {
62     int r;
63     event_count++;
64     if (event_count_trigger == event_count) {
65         event_hit();
66         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
67         r = TOKUDB_CANCELED;
68     } else {
69         r = 0;
70     }
71     return r;
72 }
73 
bad_fwrite(const void * ptr,size_t size,size_t nmemb,FILE * stream)74 static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
75     event_count++;
76     size_t r;
77     if (event_count_trigger == event_count) {
78         event_hit();
79         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
80 	errno = ENOSPC;
81 	r = (size_t) -1;
82     } else {
83 	r = fwrite(ptr, size, nmemb, stream);
84 	if (r!=nmemb) {
85 	    errno = ferror(stream);
86 	}
87     }
88     return r;
89 }
90 
bad_write(int fd,const void * bp,size_t len)91 static ssize_t bad_write(int fd, const void * bp, size_t len) {
92     ssize_t r;
93     event_count++;
94     if (event_count_trigger == event_count) {
95         event_hit();
96         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
97 	errno = ENOSPC;
98 	r = -1;
99     } else {
100 	r = write(fd, bp, len);
101     }
102     return r;
103 }
104 
bad_pwrite(int fd,const void * bp,size_t len,toku_off_t off)105 static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
106     ssize_t r;
107     event_count++;
108     if (event_count_trigger == event_count) {
109         event_hit();
110         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
111 	errno = ENOSPC;
112 	r = -1;
113     } else {
114 	r = pwrite(fd, bp, len, off);
115     }
116     return r;
117 }
118 
119 static FILE *
bad_fdopen(int fd,const char * mode)120 bad_fdopen(int fd, const char * mode) {
121     FILE * rval;
122     event_count++;
123     if (event_count_trigger == event_count) {
124 	event_hit();
125         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
126 	errno = EINVAL;
127 	rval  = NULL;
128     } else {
129 	rval = fdopen(fd, mode);
130     }
131     return rval;
132 }
133 
134 static FILE *
bad_fopen(const char * filename,const char * mode)135 bad_fopen(const char *filename, const char *mode) {
136     FILE * rval;
137     event_count++;
138     if (event_count_trigger == event_count) {
139 	event_hit();
140         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
141 	errno = EINVAL;
142 	rval  = NULL;
143     } else {
144 	rval = fopen(filename, mode);
145     }
146     return rval;
147 }
148 
149 
150 static int
bad_open(const char * path,int oflag,int mode)151 bad_open(const char *path, int oflag, int mode) {
152     int rval;
153     event_count++;
154     if (event_count_trigger == event_count) {
155 	event_hit();
156         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
157 	errno = EINVAL;
158 	rval = -1;
159     } else {
160 	rval = open(path, oflag, mode);
161     }
162     return rval;
163 }
164 
165 
166 
167 static int
bad_fclose(FILE * stream)168 bad_fclose(FILE * stream) {
169     int rval;
170     event_count++;
171     // Must close the stream even in the "error case" because otherwise there is no way to get the memory back.
172     rval = fclose(stream);
173     if (rval==0) {
174 	if (event_count_trigger == event_count) {
175             if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
176 	    errno = ENOSPC;
177 	    rval = -1;
178 	}
179     }
180     return rval;
181 }
182 
183 int bad_read_errno = 0;
184 
185 static ssize_t
bad_read(int fd,void * buf,size_t count)186 bad_read(int fd, void *buf, size_t count) {
187     ssize_t rval;
188     event_count++;
189     if (event_count_trigger == event_count) {
190         event_hit();
191         if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
192         errno = bad_read_errno;
193         rval = -1;
194     } else
195         rval = read(fd, buf, count);
196     return rval;
197 }
198 
// When non-zero, my_malloc treats each "big" allocation as an injectable event.
static int my_malloc_event = 1;

// Call counters maintained by my_malloc / my_realloc.  The "big" variants
// count only requests of at least min_malloc_error_size bytes.
static int my_malloc_count = 0;
static int my_big_malloc_count = 0;
static int my_realloc_count = 0;
static int my_big_realloc_count = 0;

// Zero all four allocation counters (my_malloc_event is left untouched).
static void reset_my_malloc_counts(void) {
    my_big_malloc_count = 0;
    my_malloc_count = 0;
    my_big_realloc_count = 0;
    my_realloc_count = 0;
}
207 
208 size_t min_malloc_error_size = 0;
209 
my_malloc(size_t n)210 static void *my_malloc(size_t n) {
211     my_malloc_count++;
212     if (n >= min_malloc_error_size) {
213         my_big_malloc_count++;
214         if (my_malloc_event) {
215             event_count++;
216             if (event_count == event_count_trigger) {
217                 event_hit();
218                 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
219                 errno = ENOMEM;
220                 return NULL;
221             }
222         }
223     }
224     return os_malloc(n);
225 }
226 
227 static int do_realloc_errors = 1;
228 
my_realloc(void * p,size_t n)229 static void *my_realloc(void *p, size_t n) {
230     my_realloc_count++;
231     if (n >= min_malloc_error_size) {
232         my_big_realloc_count++;
233         if (do_realloc_errors) {
234             event_count++;
235             if (event_count == event_count_trigger) {
236                 event_hit();
237                 if (verbose) printf("%s %d\n", __FUNCTION__, event_count);
238                 errno = ENOMEM;
239                 return NULL;
240             }
241         }
242     }
243     return os_realloc(p, n);
244 }
245 
246 
// Three-way comparison of the ints that a and b point to.
// Returns -1 if *a < *b, +1 if *a > *b, and 0 if they are equal.
static int qsort_compare_ints (const void *a, const void *b) {
    const int lhs = *(const int *) a;
    const int rhs = *(const int *) b;
    // Each comparison yields 0 or 1, so the difference is -1, 0 or +1 and
    // cannot overflow the way "lhs - rhs" could.
    return (lhs > rhs) - (lhs < rhs);
}
255 
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)256 static int compare_ints (DB* UU(desc), const DBT *akey, const DBT *bkey) {
257     assert(akey->size==sizeof(int));
258     assert(bkey->size==sizeof(int));
259     return qsort_compare_ints(akey->data, bkey->data);
260 }
261 
errorstr_static(int err)262 static char *errorstr_static (int err) {
263     static char errorstr[100];
264     toku_ft_strerror_r(err, errorstr, sizeof(errorstr));
265     return errorstr;
266 }
267 
268 
err_cb(DB * db UU (),int dbn,int err,DBT * key UU (),DBT * val UU (),void * extra UU ())269 static void err_cb(DB *db UU(), int dbn, int err, DBT *key UU(), DBT *val UU(), void *extra UU()) {
270     fprintf(stderr, "error in test dbn=%d err=%d (%s)\n", dbn, err, errorstr_static(err));
271     abort();
272 }
273 
// Fixed shape of the test: how many temporary source files are merged and
// how many destination dictionaries the loader is initialised with.
enum {
    N_SOURCES  = 2,
    N_DEST_DBS = 1
};

// Number of rows spread across the source files; test_main's -r overrides it.
int N_RECORDS = 10;
277 
make_fname(const char * directory,const char * fname,int idx)278 static char *make_fname(const char *directory, const char *fname, int idx) {
279     int len = strlen(directory)+strlen(fname)+20;
280     char *XMALLOC_N(len, result);
281     int r = snprintf(result, len, "%s/%s%d", directory, fname, idx);
282     assert(r<len);
283     return result; // don't care that it's a little too long.
284 }
285 
286 
287 struct consumer_thunk {
288     QUEUE q;
289     int64_t n_read;
290 };
291 
consumer_thread(void * ctv)292 static void *consumer_thread (void *ctv) {
293     struct consumer_thunk *cthunk = (struct consumer_thunk *)ctv;
294     while (1) {
295 	void *item;
296 	int r = toku_queue_deq(cthunk->q, &item, NULL, NULL);
297 	if (r==EOF) return NULL;
298 	assert(r==0);
299 	struct rowset *rowset = (struct rowset *)item;
300 	cthunk->n_read += rowset->n_rows;
301 	destroy_rowset(rowset);
302 	toku_free(rowset);
303     }
304 }
305 
306 
test(const char * directory,bool is_error)307 static void test (const char *directory, bool is_error) {
308 
309     int *XMALLOC_N(N_SOURCES, fds);
310 
311     char **XMALLOC_N(N_SOURCES, fnames);
312     int *XMALLOC_N(N_SOURCES, n_records_in_fd);
313     for (int i=0; i<N_SOURCES; i++) {
314 	fnames[i] = make_fname(directory, "temp", i);
315 	fds[i] = open(fnames[i], O_CREAT|O_RDWR, S_IRWXU);
316 	assert(fds[i]>=0);
317 	n_records_in_fd[i] = 0;
318     }
319     for (int i=0; i<N_RECORDS; i++) {
320 	int size=4;
321 	int fdi = random()%N_SOURCES;
322 	int fd  = fds[fdi];
323 	{ int r = write(fd, &size, 4);  assert(r==4); }
324 	{ int r = write(fd, &i,    4);  assert(r==4); }
325 	{ int r = write(fd, &size, 4);  assert(r==4); }
326 	{ int r = write(fd, &i,    4);  assert(r==4); }
327 	n_records_in_fd[fdi]++;
328     }
329     for (int i=0; i<N_SOURCES; i++) {
330 	toku_off_t r = lseek(fds[i], 0, SEEK_SET);
331 	assert(r==0);
332     }
333 
334     FTLOADER bl;
335     FT_HANDLE *XCALLOC_N(N_DEST_DBS, fts);
336     DB* *XCALLOC_N(N_DEST_DBS, dbs);
337     const char **XMALLOC_N(N_DEST_DBS, new_fnames_in_env);
338     for (int i=0; i<N_DEST_DBS; i++) {
339 	char s[100];
340 	snprintf(s, sizeof(s), "db%d.db", i);
341 	new_fnames_in_env[i] = toku_strdup(s);
342 	assert(new_fnames_in_env[i]);
343     }
344     ft_compare_func *XMALLOC_N(N_DEST_DBS, bt_compare_functions);
345     bt_compare_functions[0] = compare_ints;
346     CACHETABLE ct;
347     enum {CACHETABLE_SIZE = 64*1024};
348     {
349 	toku_cachetable_create(&ct, CACHETABLE_SIZE, (LSN){1}, NULL);
350     }
351     LSN *XMALLOC(lsnp);
352     {
353 	int r = toku_ft_loader_internal_init (&bl,
354 					       ct,
355 					       (generate_row_for_put_func)NULL,
356 					       (DB*)NULL,
357 					       N_DEST_DBS, fts, dbs,
358 					       new_fnames_in_env,
359 					       bt_compare_functions,
360 					       "tempxxxxxx",
361 					       *lsnp,
362                                                nullptr, true, 0, false, true);
363 	assert(r==0);
364     }
365 
366     ft_loader_init_error_callback(&bl->error_callback);
367     ft_loader_set_error_function(&bl->error_callback, err_cb, NULL);
368     ft_loader_init_poll_callback(&bl->poll_callback);
369     ft_loader_set_poll_function(&bl->poll_callback, loader_poll_callback, NULL);
370     ft_loader_set_fractal_workers_count_from_c(bl);
371 
372     QUEUE q;
373     { int r = toku_queue_create(&q, 1000); assert(r==0); }
374     DBUFIO_FILESET bfs;
375     const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
376     { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false);  assert(r==0); }
377     FIDX *XMALLOC_N(N_SOURCES, src_fidxs);
378     assert(bl->file_infos.n_files==0);
379     bl->file_infos.n_files = N_SOURCES;
380     bl->file_infos.n_files_limit = N_SOURCES;
381     bl->file_infos.n_files_open  = 0;
382     bl->file_infos.n_files_extant = 0;
383     XREALLOC_N(bl->file_infos.n_files_limit, bl->file_infos.file_infos);
384     for (int i=0; i<N_SOURCES; i++) {
385 	// all we really need is the number of records in the file.  The rest of the file_info is unused by the dbufio code.n
386 	bl->file_infos.file_infos[i].n_rows = n_records_in_fd[i];
387 	// However we need these for the destroy method to work right.
388 	bl->file_infos.file_infos[i].is_extant = false;
389 	bl->file_infos.file_infos[i].is_open   = false;
390 	bl->file_infos.file_infos[i].buffer    = NULL;
391 	src_fidxs[i].idx = i;
392     }
393     toku_pthread_t consumer;
394     struct consumer_thunk cthunk = {q, 0};
395     {
396         int r = toku_pthread_create(toku_uninstrumented,
397                                     &consumer,
398                                     nullptr,
399                                     consumer_thread,
400                                     static_cast<void *>(&cthunk));
401         assert(r == 0);
402     }
403 
404     toku_set_func_malloc_only(my_malloc);
405     toku_set_func_realloc_only(my_realloc);
406     toku_set_func_fwrite(bad_fwrite);
407     toku_set_func_write(bad_write);
408     toku_set_func_pwrite(bad_pwrite);
409     toku_set_func_fdopen(bad_fdopen);
410     toku_set_func_fopen(bad_fopen);
411     toku_set_func_open(bad_open);
412     toku_set_func_fclose(bad_fclose);
413     if (bad_read_errno) toku_set_func_read(bad_read);
414 
415     int result = 0;
416     {
417 	int r = toku_merge_some_files_using_dbufio(true, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000);
418 	if (is_error && r!=0) {
419 	    result = r;
420 	} else {
421 	    if (r!=0) printf("%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, errorstr_static(r));
422 	    assert(r==0);
423 	}
424         if (r)
425             panic_dbufio_fileset(bfs, r);
426     }
427     {
428 	int r = toku_queue_eof(q);
429 	assert(r==0);
430     }
431 
432     toku_set_func_malloc(NULL);
433     toku_set_func_realloc(NULL);
434     toku_set_func_fwrite(nullptr);
435     toku_set_func_write(NULL);
436     toku_set_func_pwrite(NULL);
437     toku_set_func_fdopen(NULL);
438     toku_set_func_fopen(NULL);
439     toku_set_func_open(NULL);
440     toku_set_func_fclose(NULL);
441     toku_set_func_read(NULL);
442     do_assert_hook = my_assert_hook;
443 
444     {
445 	void *vresult;
446 	int r = toku_pthread_join(consumer, &vresult);
447 	assert(r==0);
448 	assert(vresult==NULL);
449 	//printf("n_read = %ld, N_SOURCES=%d N_RECORDS=%d\n", cthunk.n_read, N_SOURCES, N_RECORDS);
450 	if (result==0) {
451 	    assert(cthunk.n_read == N_RECORDS);
452 	}
453     }
454     {
455 	int r = toku_queue_destroy(q);
456 	assert(r==0);
457     }
458     toku_ft_loader_internal_destroy(bl, false);
459     {
460 	toku_cachetable_close(&ct);
461     }
462     for (int i=0; i<N_DEST_DBS; i++) {
463 	toku_free((void*)new_fnames_in_env[i]);
464     }
465     for (int i=0; i<N_SOURCES; i++) {
466 	toku_free(fnames[i]);
467     }
468     destroy_dbufio_fileset(bfs);
469     toku_free(fnames);
470     toku_free(fds);
471     toku_free(fts);
472     toku_free(dbs);
473     toku_free(new_fnames_in_env);
474     toku_free(bt_compare_functions);
475     toku_free(lsnp);
476     toku_free(src_fidxs);
477     toku_free(n_records_in_fd);
478 }
479 
480 
// Prints the command-line help to stderr and returns 1, so that callers can
// write "return usage(...)".
//
//   progname - program name shown in the synopsis line.
//   n        - current row count, shown as the example value for -r.
//
// The text lists every option test_main accepts, including -h and -tstart.
static int usage(const char *progname, int n) {
    fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-r %d] [-s] [-m] [-tstart NEVENTS] [-tend NEVENTS] [-bad_read_errno ERRNO] directory\n", progname, n);
    fprintf(stderr, "[-h] print this help\n");
    fprintf(stderr, "[-v] turn on verbose\n");
    fprintf(stderr, "[-q] turn off verbose\n");
    fprintf(stderr, "[-r %d] set the number of rows\n", n);
    fprintf(stderr, "[-s] set the small loader size factor\n");
    fprintf(stderr, "[-m] inject big malloc failures\n");
    fprintf(stderr, "[-tstart NEVENTS] skip injecting the first N events\n");
    fprintf(stderr, "[-tend NEVENTS] stop testing after N events\n");
    fprintf(stderr, "[-bad_read_errno ERRNO] errno for injected read failures (0 disables them)\n");
    return 1;
}
492 
test_main(int argc,const char * argv[])493 int test_main (int argc, const char *argv[]) {
494     int tstart = 0;
495     int tend = -1;
496     const char *progname=argv[0];
497     argc--; argv++;
498     while (argc>0) {
499         if (strcmp(argv[0],"-h")==0) {
500             return usage(progname, N_RECORDS);
501 	} else if (strcmp(argv[0],"-v")==0) {
502 	    verbose=1;
503 	} else if (strcmp(argv[0],"-q")==0) {
504 	    verbose=0;
505         } else if (strcmp(argv[0],"-r") == 0) {
506             argc--; argv++;
507             N_RECORDS = atoi(argv[0]);
508         } else if (strcmp(argv[0],"-s") == 0) {
509             toku_ft_loader_set_size_factor(1);
510         } else if (strcmp(argv[0],"-m") == 0) {
511             my_malloc_event = 1;
512 	} else if (strcmp(argv[0],"-tend") == 0 && argc > 1) {
513             argc--; argv++;
514 	    tend = atoi(argv[0]);
515 	} else if (strcmp(argv[0],"-tstart") == 0 && argc > 1) {
516             argc--; argv++;
517 	    tstart = atoi(argv[0]);
518         } else if (strcmp(argv[0], "-bad_read_errno") == 0 && argc > 1) {
519             argc--; argv++;
520             bad_read_errno = atoi(argv[0]);
521 	} else if (argc!=1) {
522             return usage(progname, N_RECORDS);
523 	}
524         else {
525             break;
526         }
527 	argc--; argv++;
528     }
529     const char* directory = TOKU_TEST_FILENAME;
530     char unlink_all[strlen(directory)+20];
531     snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
532 
533     int  templen = strlen(directory)+15;
534     char tf_template[templen];
535     int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
536     assert (tlen>0 && tlen<templen);
537 
538     char output_name[templen];
539     int  olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
540     assert (olen>0 && olen<templen);
541 
542     // callibrate
543     int r;
544     r = system(unlink_all); CKERR(r);
545     r = toku_os_mkdir(directory, 0755); CKERR(r);
546     test(directory, false);
547 
548     if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
549 
550     {
551 	int event_limit = event_count;
552 	if (tend>=0 && tend<event_limit) event_limit=tend;
553 	if (verbose) printf("event_limit=%d\n", event_limit);
554 
555 	for (int i = tstart+1; i <= event_limit; i++) {
556 	    reset_event_counts();
557 	    reset_my_malloc_counts();
558 	    event_count_trigger = i;
559 	    r = system(unlink_all); CKERR(r);
560 	    r = toku_os_mkdir(directory, 0755); CKERR(r);
561 	    if (verbose) printf("event=%d\n", i);
562 	    test(directory, true);
563 	}
564 	r = system(unlink_all); CKERR(r);
565     }
566 
567     return 0;
568 }
569