1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include <toku_assert.h>
41 #include <string.h>
42 #include <stdio.h>
43 #include <unistd.h>
44 #include "loader/loader-internal.h"
45 #include "memory.h"
46 #include <portability/toku_path.h>
47 
48 
qsort_compare_ints(const void * a,const void * b)49 static int qsort_compare_ints (const void *a, const void *b) {
50     int avalue = *(int*)a;
51     int bvalue = *(int*)b;
52     if (avalue<bvalue) return -1;
53     if (avalue>bvalue) return +1;
54     return 0;
55 
56 }
57 
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)58 static int compare_ints (DB* UU(desc), const DBT *akey, const DBT *bkey) {
59     assert(akey->size==sizeof(int));
60     assert(bkey->size==sizeof(int));
61     return qsort_compare_ints(akey->data, bkey->data);
62 }
63 
err_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())64 static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
65     fprintf(stderr, "error in test");
66     abort();
67 }
68 
69 bool founddup;
expect_dups_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())70 static void expect_dups_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
71     founddup=true;
72 }
73 
test_merge_internal(int a[],int na,int b[],int nb,bool dups)74 static void test_merge_internal (int a[], int na, int b[], int nb, bool dups) {
75     int *MALLOC_N(na+nb, ab); // the combined array a and b
76     for (int i=0; i<na; i++) {
77 	ab[i]=a[i];
78     }
79     for (int i=0; i<nb; i++) {
80 	ab[na+i] = b[i];
81     }
82     struct row *MALLOC_N(na, ar);
83     struct row *MALLOC_N(nb, br);
84     for (int i=0; i<na; i++) {
85 	ar[i].off = i*sizeof(a[0]);
86 	ar[i].klen = sizeof(a[i]);
87 	ar[i].vlen = 0;
88     }
89     for (int i=0; i<nb; i++) {
90 	br[i].off  = (na+i)*sizeof(b[0]);
91 	br[i].klen = sizeof(b[i]);
92 	br[i].vlen = 0;
93     }
94     struct row *MALLOC_N(na+nb, cr);
95     DB *dest_db = NULL;
96     struct ft_loader_s bl;
97     ft_loader_init_error_callback(&bl.error_callback);
98     ft_loader_set_error_function(&bl.error_callback, dups ? expect_dups_cb : err_cb, NULL);
99     struct rowset rs = { .memory_budget = 0, .n_rows = 0, .n_rows_limit = 0, .rows = NULL, .n_bytes = 0, .n_bytes_limit = 0,
100                          .data=(char*)ab};
101     merge_row_arrays_base(cr, ar, na, br, nb, 0, dest_db, compare_ints, &bl, &rs);
102     ft_loader_call_error_function(&bl.error_callback);
103     if (dups) {
104 	assert(founddup);
105     } else {
106 	// verify the merge
107 	int i=0;
108 	int j=0;
109 	for (int k=0; k<na+nb; k++) {
110 	    int voff = cr[k].off;
111 	    int vc   = *(int*)(((char*)ab)+voff);
112 	    if (i<na && j<nb) {
113 		if (vc==a[i]) {
114 		    assert(a[i]<=b[j]);
115 		    i++;
116 		} else if (vc==b[j]) {
117 		    assert(a[i]>b[j]);
118 		    j++;
119 		} else {
120 		    assert(0);
121 		}
122 	    }
123 	}
124     }
125     toku_free(cr);
126     toku_free(ar);
127     toku_free(br);
128     toku_free(ab);
129     ft_loader_destroy_error_callback(&bl.error_callback);
130 }
131 
132 /* Test the basic merger. */
test_merge(void)133 static void test_merge (void) {
134     {
135 	int avals[]={1,2,3,4,5};
136 	int *bvals = NULL;
137 	test_merge_internal(avals, 5, bvals, 0, false);
138 	test_merge_internal(bvals, 0, avals, 5, false);
139     }
140     {
141 	int avals[]={1,3,5,7};
142 	int bvals[]={2,4};
143 	test_merge_internal(avals, 4, bvals, 2, false);
144 	test_merge_internal(bvals, 2, avals, 4, false);
145     }
146     {
147 	int avals[]={1,2,3,5,6,7};
148 	int bvals[]={2,4,5,6,8};
149 	test_merge_internal(avals, 6, bvals, 5, true);
150 	test_merge_internal(bvals, 5, avals, 6, true);
151     }
152 }
153 
test_internal_mergesort_row_array(int a[],int n)154 static void test_internal_mergesort_row_array (int a[], int n) {
155     struct row *MALLOC_N(n, ar);
156     for (int i=0; i<n; i++) {
157 	ar[i].off  = i*sizeof(a[0]);
158 	ar[i].klen = sizeof(a[i]);
159 	ar[i].vlen = 0;
160     }
161     struct rowset rs = { .memory_budget = 0, .n_rows = 0, .n_rows_limit = 0, .rows = NULL, .n_bytes = 0, .n_bytes_limit = 0,
162                          .data=(char*)a};
163     ft_loader_mergesort_row_array (ar, n, 0, NULL, compare_ints, NULL, &rs);
164     int *MALLOC_N(n, tmp);
165     for (int i=0; i<n; i++) {
166 	tmp[i]=a[i];
167     }
168     qsort(tmp, n, sizeof(a[0]), qsort_compare_ints);
169     for (int i=0; i<n; i++) {
170 	int voff = ar[i].off;
171 	int v    = *(int*)(((char*)a)+voff);
172 	assert(tmp[i]==v);
173     }
174     toku_free(ar);
175     toku_free(tmp);
176 }
177 
test_mergesort_row_array(void)178 static void test_mergesort_row_array (void) {
179     {
180 	int avals[]={5,2,1,7};
181 	for (int i=0; i<=4; i++)
182 	    test_internal_mergesort_row_array(avals, i);
183     }
184     const int MAX_LEN = 100;
185     enum { MAX_VAL = 1000 };
186     for (int i=0; i<MAX_LEN; i++) {
187 	bool used[MAX_VAL];
188 	for (int j=0; j<MAX_VAL; j++) used[j]=false;
189 	int len=1+random()%MAX_LEN;
190 	int avals[len];
191 	for (int j=0; j<len; j++) {
192 	    int v;
193 	    do {
194 		v = random()%MAX_VAL;
195 	    } while (used[v]);
196 	    avals[j] = v;
197 	    used[v] = true;
198 	}
199 	test_internal_mergesort_row_array(avals, len);
200     }
201 }
202 
test_read_write_rows(char * tf_template)203 static void test_read_write_rows (char *tf_template) {
204     struct ft_loader_s bl;
205     ZERO_STRUCT(bl);
206     bl.temp_file_template = tf_template;
207     int r = ft_loader_init_file_infos(&bl.file_infos);
208     CKERR(r);
209     FIDX file;
210     r = ft_loader_open_temp_file(&bl, &file);
211     CKERR(r);
212 
213     uint64_t dataoff=0;
214 
215     const char *keystrings[] = {"abc", "b", "cefgh"};
216     const char *valstrings[] = {"defg", "", "xyz"};
217     uint64_t actual_size=0;
218     for (int i=0; i<3; i++) {
219 	DBT key;
220         toku_fill_dbt(&key, keystrings[i], strlen(keystrings[i]));
221 	DBT val;
222         toku_fill_dbt(&val, valstrings[i], strlen(valstrings[i]));
223 	r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, nullptr, &bl);
224 	CKERR(r);
225 	actual_size+=key.size + val.size + 8;
226     }
227     if (actual_size != dataoff) fprintf(stderr, "actual_size=%" PRIu64 ", dataoff=%" PRIu64 "\n", actual_size, dataoff);
228     assert(actual_size == dataoff);
229 
230     r = ft_loader_fi_close(&bl.file_infos, file, true);
231     CKERR(r);
232 
233     r = ft_loader_fi_reopen(&bl.file_infos, file, "r");
234     CKERR(r);
235 
236     {
237 	int n_read=0;
238 	DBT key, val;
239         toku_init_dbt(&key);
240         toku_init_dbt(&val);
241 	while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val)) {
242 	    assert(strlen(keystrings[n_read])==key.size);
243 	    assert(strlen(valstrings[n_read])==val.size);
244 	    assert(0==memcmp(keystrings[n_read], key.data, key.size));
245 	    assert(0==memcmp(valstrings[n_read], val.data, val.size));
246 	    assert(key.size<=key.ulen);
247 	    assert(val.size<=val.ulen);
248 	    n_read++;
249 	}
250 	assert(n_read==3);
251 	toku_free(key.data);
252 	toku_free(val.data);
253     }
254     r = ft_loader_fi_close(&bl.file_infos, file, true);
255     CKERR(r);
256 
257     r = ft_loader_fi_unlink(&bl.file_infos, file);
258     CKERR(r);
259 
260     assert(bl.file_infos.n_files_open==0);
261     assert(bl.file_infos.n_files_extant==0);
262 
263     ft_loader_fi_destroy(&bl.file_infos, false);
264 }
265 
fill_rowset(struct rowset * rows,int keys[],const char * vals[],int n,uint64_t * size_est)266 static void fill_rowset (struct rowset *rows,
267 			 int keys[],
268 			 const char *vals[],
269 			 int n,
270 			 uint64_t *size_est) {
271     init_rowset(rows, toku_ft_loader_get_rowset_budget_for_testing());
272     for (int i=0; i<n; i++) {
273 	DBT key;
274         toku_fill_dbt(&key, &keys[i], sizeof keys[i]);
275 	DBT val;
276         toku_fill_dbt(&val, vals[i], strlen(vals[i]));
277 	add_row(rows, &key, &val);
278 	*size_est += ft_loader_leafentry_size(key.size, val.size, TXNID_NONE);
279     }
280 }
281 
verify_dbfile(int n,int sorted_keys[],const char * sorted_vals[],const char * name)282 static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], const char *name) {
283     int r;
284 
285     CACHETABLE ct;
286     toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
287 
288     TOKUTXN const null_txn = NULL;
289     FT_HANDLE t = NULL;
290     toku_ft_handle_create(&t);
291     toku_ft_set_bt_compare(t, compare_ints);
292     r = toku_ft_handle_open(t, name, 0, 0, ct, null_txn); assert(r==0);
293 
294     FT_CURSOR cursor = NULL;
295     r = toku_ft_cursor(t, &cursor, NULL, false, false); assert(r == 0);
296 
297     size_t userdata = 0;
298     int i;
299     for (i=0; i<n; i++) {
300 	struct check_pair pair = {sizeof sorted_keys[i], &sorted_keys[i], (uint32_t) strlen(sorted_vals[i]), sorted_vals[i], 0};
301         r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
302         if (r != 0) {
303 	    assert(pair.call_count ==0);
304 	    break;
305 	}
306 	assert(pair.call_count==1);
307         userdata += pair.keylen + pair.vallen;
308     }
309 
310     struct check_pair pair; memset(&pair, 0, sizeof pair);
311     r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
312     assert(r != 0);
313 
314     toku_ft_cursor_close(cursor);
315 
316     struct ftstat64_s s;
317     toku_ft_handle_stat64(t, NULL, &s);
318     assert(s.nkeys == (uint64_t) n && s.ndata == (uint64_t) n && s.dsize == userdata);
319 
320     r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
321     toku_cachetable_close(&ct);
322 }
323 
test_merge_files(const char * tf_template,const char * output_name)324 static void test_merge_files (const char *tf_template, const char *output_name) {
325     DB *dest_db = NULL;
326     struct ft_loader_s bl;
327     ZERO_STRUCT(bl);
328     bl.temp_file_template = tf_template;
329     bl.reserved_memory = 512*1024*1024;
330     int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
331     ft_loader_lock_init(&bl);
332     ft_loader_init_error_callback(&bl.error_callback);
333     ft_loader_set_fractal_workers_count_from_c(&bl);
334 
335     struct merge_fileset fs;
336     init_merge_fileset(&fs);
337 
338     int a_keys[] = {   1,    3,    5,    7, 8, 9};
339     int b_keys[] = { 0,   2,    4,    6         };
340     const char *a_vals[] = {"a", "c", "e", "g", "h", "i"};
341     const char *b_vals[] = {"0", "b", "d", "f"};
342     int sorted_keys[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
343     const char *sorted_vals[] = { "0", "a", "b", "c", "d", "e", "f", "g", "h", "i" };
344     struct rowset aset, bset;
345     uint64_t size_est = 0;
346     fill_rowset(&aset, a_keys, a_vals, 6, &size_est);
347     fill_rowset(&bset, b_keys, b_vals, 4, &size_est);
348 
349     toku_ft_loader_set_n_rows(&bl, 6+4);
350 
351     ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
352     r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints);  CKERR(r);
353     r = ft_loader_sort_and_write_rows(&bset, &fs, &bl, 0, dest_db, compare_ints);  CKERR(r);
354     assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
355     // destroy_rowset(&aset);
356     // destroy_rowset(&bset);
357     for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1);
358 
359     ft_loader_fi_close_all(&bl.file_infos);
360 
361     QUEUE q;
362     r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
363     assert(r==0);
364 
365     r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
366 
367     assert(fs.n_temp_files==0);
368 
369     DESCRIPTOR_S desc;
370     toku_fill_dbt(&desc.dbt, "abcd", 4);
371 
372     int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
373     assert(fd>=0);
374 
375     r = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
376     assert(r==0);
377 
378     destroy_merge_fileset(&fs);
379     ft_loader_fi_destroy(&bl.file_infos, false);
380     ft_loader_destroy_error_callback(&bl.error_callback);
381     ft_loader_lock_destroy(&bl);
382 
383     // verify the dbfile
384     verify_dbfile(10, sorted_keys, sorted_vals, output_name);
385 
386     r = toku_queue_destroy(q);
387     assert(r==0);
388 }
389 
390 /* Test to see if we can open temporary files. */
test_main(int argc,const char * argv[])391 int test_main (int argc, const char *argv[]) {
392     argc--; argv++;
393     while (argc>0) {
394 	if (strcmp(argv[0],"-v")==0) {
395 	    verbose=1;
396 	} else if (strcmp(argv[0],"-q")==0) {
397 	    verbose=0;
398 	}
399         else {
400             break;
401         }
402 	argc--; argv++;
403     }
404     const char* directory = TOKU_TEST_FILENAME;
405     int r = toku_os_mkdir(directory, 0755);
406     if (r!=0) CKERR2(errno, EEXIST);
407 
408     int  templen = strlen(directory)+15;
409     char tf_template[templen];
410     {
411 	int n = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
412 	assert (n>0 && n<templen);
413     }
414     char output_name[templen];
415     {
416 	int n = snprintf(output_name, templen, "%s/data.tokudb", directory);
417 	assert (n>0 && n<templen);
418     }
419     test_read_write_rows(tf_template);
420     test_merge();
421     test_mergesort_row_array();
422     test_merge_files(tf_template, output_name);
423 
424     {
425 	char deletecmd[templen];
426 	int n =  snprintf(deletecmd, templen, "rm -rf %s", directory);
427 	assert(n>0 && n<templen);
428 	r = system(deletecmd);
429         CKERR(r);
430     }
431 
432     return 0;
433 }
434 
435