1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 // test the loader write dbfile function
40 
41 #define DONT_DEPRECATE_WRITES
42 #define DONT_DEPRECATE_MALLOC
43 
44 #include "test.h"
45 #include "loader/loader-internal.h"
46 #include "ftloader-error-injector.h"
47 #include <portability/toku_path.h>
48 
49 
qsort_compare_ints(const void * a,const void * b)50 static int qsort_compare_ints (const void *a, const void *b) {
51     int avalue = *(int*)a;
52     int bvalue = *(int*)b;
53     if (avalue<bvalue) return -1;
54     if (avalue>bvalue) return +1;
55     return 0;
56 
57 }
58 
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)59 static int compare_ints (DB *UU(desc), const DBT *akey, const DBT *bkey) {
60     assert(akey->size==sizeof(int));
61     assert(bkey->size==sizeof(int));
62     return qsort_compare_ints(akey->data, bkey->data);
63 }
64 
err_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())65 static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
66     fprintf(stderr, "error in test");
67     abort();
68 }
69 
write_dbfile(char * tf_template,int n,char * output_name,bool expect_error,int testno)70 static int write_dbfile (char *tf_template, int n, char *output_name, bool expect_error, int testno) {
71     if (verbose) printf("test start %d %d testno=%d\n", n, expect_error, testno);
72 
73     int result = 0;
74     DB *dest_db = NULL;
75     struct ft_loader_s bl;
76     ZERO_STRUCT(bl);
77     bl.temp_file_template = tf_template;
78     bl.reserved_memory = 512*1024*1024;
79     int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
80     ft_loader_lock_init(&bl);
81     ft_loader_set_fractal_workers_count_from_c(&bl);
82 
83     struct merge_fileset fs;
84     init_merge_fileset(&fs);
85 
86     // put rows in the row set
87     struct rowset aset;
88     uint64_t size_est = 0;
89     init_rowset(&aset, toku_ft_loader_get_rowset_budget_for_testing());
90     for (int i=0; i<n; i++) {
91 	DBT key;
92         toku_fill_dbt(&key, &i, sizeof i);
93 	DBT val;
94         toku_fill_dbt(&val, &i, sizeof i);
95 	add_row(&aset, &key, &val);
96 	size_est += ft_loader_leafentry_size(key.size, val.size, TXNID_NONE);
97     }
98 
99     toku_ft_loader_set_n_rows(&bl, n);
100 
101     ft_loader_init_error_callback(&bl.error_callback);
102     ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
103     ft_loader_init_poll_callback(&bl.poll_callback);
104     r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints);  CKERR(r);
105 
106     ft_loader_fi_close_all(&bl.file_infos);
107 
108     QUEUE q;
109     r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
110     assert(r==0);
111     r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
112     assert(fs.n_temp_files==0);
113 
114     QUEUE q2;
115     r = toku_queue_create(&q2, 0xFFFFFFFF); // infinite queue.
116     assert(r==0);
117 
118     size_t num_found = 0;
119     size_t found_size_est = 0;
120     while (1) {
121 	void *v;
122 	r = toku_queue_deq(q, &v, NULL, NULL);
123 	if (r==EOF) break;
124 	struct rowset *rs = (struct rowset *)v;
125 	if (verbose) printf("v=%p\n", v);
126 
127 	for (size_t i=0; i<rs->n_rows; i++) {
128 	    struct row *row = &rs->rows[i];
129 	    assert(row->klen==sizeof(int));
130 	    assert(row->vlen==sizeof(int));
131 	    assert((int)(num_found+i)==*(int*)(rs->data+row->off));
132 	    found_size_est += ft_loader_leafentry_size(row->klen, row->vlen, TXNID_NONE);
133 	}
134 
135 	num_found += rs->n_rows;
136 
137 	r = toku_queue_enq(q2, v, 0, NULL);
138 	assert(r==0);
139     }
140     assert((int)num_found == n);
141     if (!expect_error) assert(found_size_est == size_est);
142 
143     r = toku_queue_eof(q2);
144     assert(r==0);
145 
146     r = toku_queue_destroy(q);
147     assert(r==0);
148 
149     DESCRIPTOR_S desc;
150     toku_fill_dbt(&desc.dbt, "abcd", 4);
151 
152     int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
153     assert(fd>=0);
154 
155     toku_set_func_malloc_only(my_malloc);
156     toku_set_func_realloc_only(my_realloc);
157     toku_set_func_fwrite(bad_fwrite);
158     toku_set_func_write(bad_write);
159     toku_set_func_pwrite(bad_pwrite);
160     ft_loader_set_error_function(&bl.error_callback, NULL, NULL);
161     ft_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
162 
163     result = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
164 
165     toku_set_func_malloc_only(NULL);
166     toku_set_func_realloc_only(NULL);
167     toku_set_func_fwrite(nullptr);
168     toku_set_func_write(NULL);
169     toku_set_func_pwrite(NULL);
170 
171     ft_loader_destroy_error_callback(&bl.error_callback);
172     ft_loader_destroy_poll_callback(&bl.poll_callback);
173     ft_loader_lock_destroy(&bl);
174 
175     r = toku_queue_destroy(q2);
176     assert(r==0);
177 
178     destroy_merge_fileset(&fs);
179     ft_loader_fi_destroy(&bl.file_infos, expect_error);
180 
181     return result;
182 }
183 
usage(const char * progname,int n)184 static int usage(const char *progname, int n) {
185     fprintf(stderr, "Usage: %s [options] directory\n", progname);
186     fprintf(stderr, "[-v] turn on verbose\n");
187     fprintf(stderr, "[-q] turn off verbose\n");
188     fprintf(stderr, "[-r %d] set the number of rows\n", n);
189     fprintf(stderr, "[-s] set the small loader size factor\n");
190     fprintf(stderr, "[-m] inject big malloc and realloc errors\n");
191     fprintf(stderr, "[--malloc_limit %u] set the threshold for failing malloc and realloc\n", (unsigned) my_big_malloc_limit);
192     fprintf(stderr, "[--realloc_errors] inject realloc errors\n");
193     fprintf(stderr, "[-w] inject write errors\n");
194     fprintf(stderr, "[-u] inject user errors\n");
195     return 1;
196 }
197 
test_main(int argc,const char * argv[])198 int test_main (int argc, const char *argv[]) {
199     const char *progname=argv[0];
200     int n = 1;
201     argc--; argv++;
202     while (argc>0) {
203         if (strcmp(argv[0],"-h")==0) {
204             return usage(progname, n);
205 	} else if (strcmp(argv[0],"-v")==0) {
206 	    verbose=1;
207 	} else if (strcmp(argv[0],"-q")==0) {
208 	    verbose=0;
209         } else if (strcmp(argv[0],"-r") == 0) {
210             argc--; argv++;
211             n = atoi(argv[0]);
212         } else if (strcmp(argv[0],"-s") == 0) {
213             toku_ft_loader_set_size_factor(1);
214         } else if (strcmp(argv[0],"-w") == 0) {
215             do_write_errors = 1;
216         } else if (strcmp(argv[0],"-m") == 0) {
217             do_malloc_errors = 1;
218             do_realloc_errors = 1;
219         } else if (strcmp(argv[0],"-u") == 0) {
220             do_user_errors = 1;
221         } else if (strcmp(argv[0],"--realloc_errors") == 0) {
222             do_realloc_errors = 1;
223         } else if (strcmp(argv[0],"--malloc_limit") == 0 && argc > 1) {
224             argc--; argv++;
225             my_big_malloc_limit = atoi(argv[0]);
226 	} else if (argc!=1) {
227             return usage(progname, n);
228 	}
229         else {
230             break;
231         }
232 	argc--; argv++;
233     }
234     const char* directory = TOKU_TEST_FILENAME;
235     char unlink_all[strlen(directory)+20];
236     snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
237 
238     int  templen = strlen(directory)+15;
239     char tf_template[templen];
240     int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
241     assert (tlen>0 && tlen<templen);
242 
243     char output_name[templen];
244     int  olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
245     assert (olen>0 && olen<templen);
246 
247     // callibrate
248     int r;
249     r = system(unlink_all); CKERR(r);
250     r = toku_os_mkdir(directory, 0755); CKERR(r);
251     r = write_dbfile(tf_template, n, output_name, false, 0); CKERR(r);
252 
253     if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
254     if (verbose) printf("my_realloc_count=%d big_count=%d\n", my_realloc_count, my_big_realloc_count);
255 
256     int event_limit = event_count;
257     if (verbose) printf("event_limit=%d\n", event_limit);
258 
259     // we computed an upper bound on the number of events.  since the loader continues to malloc after a
260     // malloc failure, the actual number of events that can induce a failed load is less than the upper
261     // bound.
262     for (int i = 1; i <= event_limit; i++) {
263         reset_event_counts();
264         reset_my_malloc_counts();
265         event_count_trigger = i;
266         r = system(unlink_all); CKERR(r);
267         r = toku_os_mkdir(directory, 0755); CKERR(r);
268         r = write_dbfile(tf_template, n, output_name, true, i);
269         if (verbose) printf("event_count=%d\n", event_count);
270         if (r == 0)
271             break;
272     }
273 
274     r = system(unlink_all); CKERR(r);
275 
276     return 0;
277 }
278