1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 // test the loader write dbfile function
40 
41 
42 #include "test.h"
43 #include "loader/loader-internal.h"
44 #include <inttypes.h>
45 #include <portability/toku_path.h>
46 
47 
// Print the current wall-clock time followed by the message s on stdout,
// then flush so the line shows up immediately.
static void traceit(const char *s) {
    const time_t now = time(NULL);
    const char *stamp = ctime(&now);
    // "%.24s" prints only the first 24 characters of the ctime() string,
    // i.e. everything before its trailing newline.
    printf("%.24s %s\n", stamp, s);
    fflush(stdout);
}
53 
// Three-way comparison of the ints that a and b point to, in the shape
// qsort() expects: returns -1 if *a < *b, +1 if *a > *b, and 0 if equal.
static int qsort_compare_ints (const void *a, const void *b) {
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;
    // Each comparison yields 0 or 1, so the difference is exactly -1, 0 or +1.
    return (lhs > rhs) - (lhs < rhs);
}
61 
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)62 static int compare_ints (DB *UU(desc), const DBT *akey, const DBT *bkey) {
63     assert(akey->size==sizeof(int));
64     assert(bkey->size==sizeof(int));
65     return qsort_compare_ints(akey->data, bkey->data);
66 }
67 
err_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())68 static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
69     fprintf(stderr, "error in test");
70     abort();
71 }
72 
verify_dbfile(int n,const char * name)73 static void verify_dbfile(int n, const char *name) {
74     if (verbose) traceit("verify");
75 
76     int r;
77 
78     CACHETABLE ct;
79     toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
80 
81     TOKUTXN const null_txn = NULL;
82     FT_HANDLE t = NULL;
83     toku_ft_handle_create(&t);
84     toku_ft_set_bt_compare(t, compare_ints);
85     r = toku_ft_handle_open(t, name, 0, 0, ct, null_txn); assert(r==0);
86 
87     if (verbose) traceit("Verifying ft internals");
88     r = toku_verify_ft(t);
89     if (verbose) traceit("Verified ft internals");
90 
91     FT_CURSOR cursor = NULL;
92     r = toku_ft_cursor(t, &cursor, NULL, false, false); assert(r == 0);
93 
94     size_t userdata = 0;
95     int i;
96     for (i=0; ; i++) {
97 	int kk = i;
98 	int vv = i;
99 	struct check_pair pair = {sizeof kk, &kk, sizeof vv, &vv, 0};
100         r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
101         if (r != 0) {
102 	    assert(pair.call_count ==0);
103 	    break;
104 	}
105 	assert(pair.call_count==1);
106         userdata += pair.keylen + pair.vallen;
107     }
108 
109     assert(i == n);
110 
111     toku_ft_cursor_close(cursor);
112 
113     struct ftstat64_s s;
114     toku_ft_handle_stat64(t, NULL, &s);
115     assert(s.nkeys == (uint64_t)n && s.ndata == (uint64_t)n && s.dsize == userdata);
116 
117     r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
118     toku_cachetable_close(&ct);
119     if (verbose) traceit("verify done");
120 }
121 
test_write_dbfile(char * tf_template,int n,char * output_name,TXNID xid)122 static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNID xid) {
123     if (verbose) traceit("test start");
124 
125     DB *dest_db = NULL;
126     struct ft_loader_s bl;
127     ZERO_STRUCT(bl);
128     bl.temp_file_template = tf_template;
129     bl.reserved_memory = 512*1024*1024;
130     bl.load_root_xid = xid;
131     if (xid) {
132         XCALLOC_N(1, bl.root_xids_that_created);
133         bl.root_xids_that_created[0] = 0;
134     }
135     int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
136     ft_loader_lock_init(&bl);
137     ft_loader_set_fractal_workers_count_from_c(&bl);
138 
139     struct merge_fileset fs;
140     init_merge_fileset(&fs);
141 
142     // put rows in the row set
143     struct rowset aset;
144     uint64_t size_est = 0;
145     init_rowset(&aset, toku_ft_loader_get_rowset_budget_for_testing());
146     for (int i=0; i<n; i++) {
147 	DBT key;
148         toku_fill_dbt(&key, &i, sizeof i);
149 	DBT val;
150         toku_fill_dbt(&val, &i, sizeof i);
151 	add_row(&aset, &key, &val);
152 	size_est += ft_loader_leafentry_size(key.size, val.size, xid);
153      }
154 
155     toku_ft_loader_set_n_rows(&bl, n);
156 
157     ft_loader_init_error_callback(&bl.error_callback);
158     ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
159     r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints);  CKERR(r);
160     // destroy_rowset(&aset);
161 
162     ft_loader_fi_close_all(&bl.file_infos);
163 
164     QUEUE q;
165     r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
166     assert(r==0);
167     r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
168     assert(fs.n_temp_files==0);
169 
170     QUEUE q2;
171     r = toku_queue_create(&q2, 0xFFFFFFFF); // infinite queue.
172     assert(r==0);
173 
174     size_t num_found = 0;
175     size_t found_size_est = 0;
176     while (1) {
177 	void *v;
178 	r = toku_queue_deq(q, &v, NULL, NULL);
179 	if (r==EOF) break;
180 	struct rowset *rs = (struct rowset *)v;
181 	if (verbose) printf("v=%p\n", v);
182 
183 	for (size_t i=0; i<rs->n_rows; i++) {
184 	    struct row *row = &rs->rows[i];
185 	    assert(row->klen==sizeof(int));
186 	    assert(row->vlen==sizeof(int));
187 	    assert((int)(num_found+i)==*(int*)(rs->data+row->off));
188 	    found_size_est += ft_loader_leafentry_size(row->klen, row->vlen, xid);
189  	}
190 
191 	num_found += rs->n_rows;
192 
193 	r = toku_queue_enq(q2, v, 0, NULL);
194 	assert(r==0);
195     }
196     assert((int)num_found == n);
197     assert(found_size_est == size_est);
198 
199     r = toku_queue_eof(q2);
200     assert(r==0);
201 
202     r = toku_queue_destroy(q);
203     assert(r==0);
204 
205     DESCRIPTOR_S desc;
206     toku_fill_dbt(&desc.dbt, "abcd", 4);
207 
208     int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
209     assert(fd>=0);
210 
211     if (verbose) traceit("write to file");
212     r = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
213     assert(r==0);
214 
215     r = toku_queue_destroy(q2);
216     assert_zero(r);
217 
218     destroy_merge_fileset(&fs);
219     ft_loader_fi_destroy(&bl.file_infos, false);
220 
221     // walk a cursor through the dbfile and verify the rows
222     verify_dbfile(n, output_name);
223 
224     ft_loader_destroy_error_callback(&bl.error_callback);
225     ft_loader_lock_destroy(&bl);
226 
227     toku_free(bl.root_xids_that_created);
228 }
229 
230 static int nrows = 1;
231 static TXNID xid = 0;
232 
usage(const char * progname)233 static int usage(const char *progname) {
234     fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-r %d] [-x %" PRIu64 "] [-s] directory\n", progname, nrows, xid);
235     return 1;
236 }
237 
test_main(int argc,const char * argv[])238 int test_main (int argc, const char *argv[]) {
239     const char *progname=argv[0];
240     argc--; argv++;
241     while (argc>0) {
242 	if (strcmp(argv[0], "-h") == 0 || strcmp(argv[0], "--help") == 0) {
243 	    return usage(progname);
244         } else if (strcmp(argv[0], "-v") == 0 || strcmp(argv[0], "--verbose") == 0) {
245 	    verbose=1;
246 	} else if (strcmp(argv[0], "-q") == 0) {
247 	    verbose=0;
248         } else if (strcmp(argv[0], "-r") == 0) {
249             argc--; argv++;
250             nrows = atoi(argv[0]);
251         } else if (strcmp(argv[0], "-x") == 0) {
252             argc--; argv++;
253             xid = atol(argv[0]);
254         } else if (strcmp(argv[0], "-s") == 0) {
255             toku_ft_loader_set_size_factor(1);
256 	} else if (argv[0][0] == '-' || argc != 1) {
257             return usage(progname);
258 	} else {
259             break;
260         }
261 	argc--; argv++;
262     }
263     const char* directory = TOKU_TEST_FILENAME;
264     char unlink_all[strlen(directory)+20];
265     snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
266     int r;
267     r = system(unlink_all);
268     CKERR(r);
269     r = toku_os_mkdir(directory, 0755);
270     CKERR(r);
271 
272     int  templen = strlen(directory)+15;
273     char tf_template[templen];
274     int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
275     assert (tlen>0 && tlen<templen);
276 
277     char output_name[templen];
278     int  olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
279     assert (olen>0 && olen<templen);
280 
281     test_write_dbfile(tf_template, nrows, output_name, xid);
282 
283 #if 0
284     r = system(unlink_all);
285     CKERR(r);
286 #endif
287     return 0;
288 }
289 
290