1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 // test the loader write dbfile function
40
41 #define DONT_DEPRECATE_WRITES
42 #define DONT_DEPRECATE_MALLOC
43
44 #include "test.h"
45 #include "loader/loader-internal.h"
46 #include "ftloader-error-injector.h"
47 #include <portability/toku_path.h>
48
49
// Three-way comparison of two ints, with the signature qsort() expects.
// a and b must each point to an int.
// Returns -1 if *a < *b, +1 if *a > *b, and 0 if they are equal.
static int qsort_compare_ints (const void *a, const void *b) {
    // Read through pointers-to-const: the comparator never modifies its operands,
    // so there is no reason to cast the qualifier away.
    int avalue = *(const int *)a;
    int bvalue = *(const int *)b;
    if (avalue<bvalue) return -1;
    if (avalue>bvalue) return +1;
    return 0;
}
58
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)59 static int compare_ints (DB *UU(desc), const DBT *akey, const DBT *bkey) {
60 assert(akey->size==sizeof(int));
61 assert(bkey->size==sizeof(int));
62 return qsort_compare_ints(akey->data, bkey->data);
63 }
64
err_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())65 static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
66 fprintf(stderr, "error in test");
67 abort();
68 }
69
write_dbfile(char * tf_template,int n,char * output_name,bool expect_error,int testno)70 static int write_dbfile (char *tf_template, int n, char *output_name, bool expect_error, int testno) {
71 if (verbose) printf("test start %d %d testno=%d\n", n, expect_error, testno);
72
73 int result = 0;
74 DB *dest_db = NULL;
75 struct ft_loader_s bl;
76 ZERO_STRUCT(bl);
77 bl.temp_file_template = tf_template;
78 bl.reserved_memory = 512*1024*1024;
79 int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
80 ft_loader_lock_init(&bl);
81 ft_loader_set_fractal_workers_count_from_c(&bl);
82
83 struct merge_fileset fs;
84 init_merge_fileset(&fs);
85
86 // put rows in the row set
87 struct rowset aset;
88 uint64_t size_est = 0;
89 init_rowset(&aset, toku_ft_loader_get_rowset_budget_for_testing());
90 for (int i=0; i<n; i++) {
91 DBT key;
92 toku_fill_dbt(&key, &i, sizeof i);
93 DBT val;
94 toku_fill_dbt(&val, &i, sizeof i);
95 add_row(&aset, &key, &val);
96 size_est += ft_loader_leafentry_size(key.size, val.size, TXNID_NONE);
97 }
98
99 toku_ft_loader_set_n_rows(&bl, n);
100
101 ft_loader_init_error_callback(&bl.error_callback);
102 ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
103 ft_loader_init_poll_callback(&bl.poll_callback);
104 r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
105
106 ft_loader_fi_close_all(&bl.file_infos);
107
108 QUEUE q;
109 r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
110 assert(r==0);
111 r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
112 assert(fs.n_temp_files==0);
113
114 QUEUE q2;
115 r = toku_queue_create(&q2, 0xFFFFFFFF); // infinite queue.
116 assert(r==0);
117
118 size_t num_found = 0;
119 size_t found_size_est = 0;
120 while (1) {
121 void *v;
122 r = toku_queue_deq(q, &v, NULL, NULL);
123 if (r==EOF) break;
124 struct rowset *rs = (struct rowset *)v;
125 if (verbose) printf("v=%p\n", v);
126
127 for (size_t i=0; i<rs->n_rows; i++) {
128 struct row *row = &rs->rows[i];
129 assert(row->klen==sizeof(int));
130 assert(row->vlen==sizeof(int));
131 assert((int)(num_found+i)==*(int*)(rs->data+row->off));
132 found_size_est += ft_loader_leafentry_size(row->klen, row->vlen, TXNID_NONE);
133 }
134
135 num_found += rs->n_rows;
136
137 r = toku_queue_enq(q2, v, 0, NULL);
138 assert(r==0);
139 }
140 assert((int)num_found == n);
141 if (!expect_error) assert(found_size_est == size_est);
142
143 r = toku_queue_eof(q2);
144 assert(r==0);
145
146 r = toku_queue_destroy(q);
147 assert(r==0);
148
149 DESCRIPTOR_S desc;
150 toku_fill_dbt(&desc.dbt, "abcd", 4);
151
152 int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
153 assert(fd>=0);
154
155 toku_set_func_malloc_only(my_malloc);
156 toku_set_func_realloc_only(my_realloc);
157 toku_set_func_fwrite(bad_fwrite);
158 toku_set_func_write(bad_write);
159 toku_set_func_pwrite(bad_pwrite);
160 ft_loader_set_error_function(&bl.error_callback, NULL, NULL);
161 ft_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
162
163 result = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
164
165 toku_set_func_malloc_only(NULL);
166 toku_set_func_realloc_only(NULL);
167 toku_set_func_fwrite(nullptr);
168 toku_set_func_write(NULL);
169 toku_set_func_pwrite(NULL);
170
171 ft_loader_destroy_error_callback(&bl.error_callback);
172 ft_loader_destroy_poll_callback(&bl.poll_callback);
173 ft_loader_lock_destroy(&bl);
174
175 r = toku_queue_destroy(q2);
176 assert(r==0);
177
178 destroy_merge_fileset(&fs);
179 ft_loader_fi_destroy(&bl.file_infos, expect_error);
180
181 return result;
182 }
183
usage(const char * progname,int n)184 static int usage(const char *progname, int n) {
185 fprintf(stderr, "Usage: %s [options] directory\n", progname);
186 fprintf(stderr, "[-v] turn on verbose\n");
187 fprintf(stderr, "[-q] turn off verbose\n");
188 fprintf(stderr, "[-r %d] set the number of rows\n", n);
189 fprintf(stderr, "[-s] set the small loader size factor\n");
190 fprintf(stderr, "[-m] inject big malloc and realloc errors\n");
191 fprintf(stderr, "[--malloc_limit %u] set the threshold for failing malloc and realloc\n", (unsigned) my_big_malloc_limit);
192 fprintf(stderr, "[--realloc_errors] inject realloc errors\n");
193 fprintf(stderr, "[-w] inject write errors\n");
194 fprintf(stderr, "[-u] inject user errors\n");
195 return 1;
196 }
197
test_main(int argc,const char * argv[])198 int test_main (int argc, const char *argv[]) {
199 const char *progname=argv[0];
200 int n = 1;
201 argc--; argv++;
202 while (argc>0) {
203 if (strcmp(argv[0],"-h")==0) {
204 return usage(progname, n);
205 } else if (strcmp(argv[0],"-v")==0) {
206 verbose=1;
207 } else if (strcmp(argv[0],"-q")==0) {
208 verbose=0;
209 } else if (strcmp(argv[0],"-r") == 0) {
210 argc--; argv++;
211 n = atoi(argv[0]);
212 } else if (strcmp(argv[0],"-s") == 0) {
213 toku_ft_loader_set_size_factor(1);
214 } else if (strcmp(argv[0],"-w") == 0) {
215 do_write_errors = 1;
216 } else if (strcmp(argv[0],"-m") == 0) {
217 do_malloc_errors = 1;
218 do_realloc_errors = 1;
219 } else if (strcmp(argv[0],"-u") == 0) {
220 do_user_errors = 1;
221 } else if (strcmp(argv[0],"--realloc_errors") == 0) {
222 do_realloc_errors = 1;
223 } else if (strcmp(argv[0],"--malloc_limit") == 0 && argc > 1) {
224 argc--; argv++;
225 my_big_malloc_limit = atoi(argv[0]);
226 } else if (argc!=1) {
227 return usage(progname, n);
228 }
229 else {
230 break;
231 }
232 argc--; argv++;
233 }
234 const char* directory = TOKU_TEST_FILENAME;
235 char unlink_all[strlen(directory)+20];
236 snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
237
238 int templen = strlen(directory)+15;
239 char tf_template[templen];
240 int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
241 assert (tlen>0 && tlen<templen);
242
243 char output_name[templen];
244 int olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
245 assert (olen>0 && olen<templen);
246
247 // callibrate
248 int r;
249 r = system(unlink_all); CKERR(r);
250 r = toku_os_mkdir(directory, 0755); CKERR(r);
251 r = write_dbfile(tf_template, n, output_name, false, 0); CKERR(r);
252
253 if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
254 if (verbose) printf("my_realloc_count=%d big_count=%d\n", my_realloc_count, my_big_realloc_count);
255
256 int event_limit = event_count;
257 if (verbose) printf("event_limit=%d\n", event_limit);
258
259 // we computed an upper bound on the number of events. since the loader continues to malloc after a
260 // malloc failure, the actual number of events that can induce a failed load is less than the upper
261 // bound.
262 for (int i = 1; i <= event_limit; i++) {
263 reset_event_counts();
264 reset_my_malloc_counts();
265 event_count_trigger = i;
266 r = system(unlink_all); CKERR(r);
267 r = toku_os_mkdir(directory, 0755); CKERR(r);
268 r = write_dbfile(tf_template, n, output_name, true, i);
269 if (verbose) printf("event_count=%d\n", event_count);
270 if (r == 0)
271 break;
272 }
273
274 r = system(unlink_all); CKERR(r);
275
276 return 0;
277 }
278