1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 // test the loader write dbfile function
40
41
42 #include "test.h"
43 #include "loader/loader-internal.h"
44 #include <inttypes.h>
45 #include <portability/toku_path.h>
46
47
traceit(const char * s)48 static void traceit(const char *s) {
49 time_t t = time(NULL);
50 printf("%.24s %s\n", ctime(&t), s);
51 fflush(stdout);
52 }
53
qsort_compare_ints(const void * a,const void * b)54 static int qsort_compare_ints (const void *a, const void *b) {
55 int avalue = *(int*)a;
56 int bvalue = *(int*)b;
57 if (avalue<bvalue) return -1;
58 if (avalue>bvalue) return +1;
59 return 0;
60 }
61
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)62 static int compare_ints (DB *UU(desc), const DBT *akey, const DBT *bkey) {
63 assert(akey->size==sizeof(int));
64 assert(bkey->size==sizeof(int));
65 return qsort_compare_ints(akey->data, bkey->data);
66 }
67
err_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())68 static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
69 fprintf(stderr, "error in test");
70 abort();
71 }
72
verify_dbfile(int n,const char * name)73 static void verify_dbfile(int n, const char *name) {
74 if (verbose) traceit("verify");
75
76 int r;
77
78 CACHETABLE ct;
79 toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
80
81 TOKUTXN const null_txn = NULL;
82 FT_HANDLE t = NULL;
83 toku_ft_handle_create(&t);
84 toku_ft_set_bt_compare(t, compare_ints);
85 r = toku_ft_handle_open(t, name, 0, 0, ct, null_txn); assert(r==0);
86
87 if (verbose) traceit("Verifying ft internals");
88 r = toku_verify_ft(t);
89 if (verbose) traceit("Verified ft internals");
90
91 FT_CURSOR cursor = NULL;
92 r = toku_ft_cursor(t, &cursor, NULL, false, false); assert(r == 0);
93
94 size_t userdata = 0;
95 int i;
96 for (i=0; ; i++) {
97 int kk = i;
98 int vv = i;
99 struct check_pair pair = {sizeof kk, &kk, sizeof vv, &vv, 0};
100 r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
101 if (r != 0) {
102 assert(pair.call_count ==0);
103 break;
104 }
105 assert(pair.call_count==1);
106 userdata += pair.keylen + pair.vallen;
107 }
108
109 assert(i == n);
110
111 toku_ft_cursor_close(cursor);
112
113 struct ftstat64_s s;
114 toku_ft_handle_stat64(t, NULL, &s);
115 assert(s.nkeys == (uint64_t)n && s.ndata == (uint64_t)n && s.dsize == userdata);
116
117 r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
118 toku_cachetable_close(&ct);
119 if (verbose) traceit("verify done");
120 }
121
test_write_dbfile(char * tf_template,int n,char * output_name,TXNID xid)122 static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNID xid) {
123 if (verbose) traceit("test start");
124
125 DB *dest_db = NULL;
126 struct ft_loader_s bl;
127 ZERO_STRUCT(bl);
128 bl.temp_file_template = tf_template;
129 bl.reserved_memory = 512*1024*1024;
130 bl.load_root_xid = xid;
131 if (xid) {
132 XCALLOC_N(1, bl.root_xids_that_created);
133 bl.root_xids_that_created[0] = 0;
134 }
135 int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
136 ft_loader_lock_init(&bl);
137 ft_loader_set_fractal_workers_count_from_c(&bl);
138
139 struct merge_fileset fs;
140 init_merge_fileset(&fs);
141
142 // put rows in the row set
143 struct rowset aset;
144 uint64_t size_est = 0;
145 init_rowset(&aset, toku_ft_loader_get_rowset_budget_for_testing());
146 for (int i=0; i<n; i++) {
147 DBT key;
148 toku_fill_dbt(&key, &i, sizeof i);
149 DBT val;
150 toku_fill_dbt(&val, &i, sizeof i);
151 add_row(&aset, &key, &val);
152 size_est += ft_loader_leafentry_size(key.size, val.size, xid);
153 }
154
155 toku_ft_loader_set_n_rows(&bl, n);
156
157 ft_loader_init_error_callback(&bl.error_callback);
158 ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
159 r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
160 // destroy_rowset(&aset);
161
162 ft_loader_fi_close_all(&bl.file_infos);
163
164 QUEUE q;
165 r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
166 assert(r==0);
167 r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
168 assert(fs.n_temp_files==0);
169
170 QUEUE q2;
171 r = toku_queue_create(&q2, 0xFFFFFFFF); // infinite queue.
172 assert(r==0);
173
174 size_t num_found = 0;
175 size_t found_size_est = 0;
176 while (1) {
177 void *v;
178 r = toku_queue_deq(q, &v, NULL, NULL);
179 if (r==EOF) break;
180 struct rowset *rs = (struct rowset *)v;
181 if (verbose) printf("v=%p\n", v);
182
183 for (size_t i=0; i<rs->n_rows; i++) {
184 struct row *row = &rs->rows[i];
185 assert(row->klen==sizeof(int));
186 assert(row->vlen==sizeof(int));
187 assert((int)(num_found+i)==*(int*)(rs->data+row->off));
188 found_size_est += ft_loader_leafentry_size(row->klen, row->vlen, xid);
189 }
190
191 num_found += rs->n_rows;
192
193 r = toku_queue_enq(q2, v, 0, NULL);
194 assert(r==0);
195 }
196 assert((int)num_found == n);
197 assert(found_size_est == size_est);
198
199 r = toku_queue_eof(q2);
200 assert(r==0);
201
202 r = toku_queue_destroy(q);
203 assert(r==0);
204
205 DESCRIPTOR_S desc;
206 toku_fill_dbt(&desc.dbt, "abcd", 4);
207
208 int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
209 assert(fd>=0);
210
211 if (verbose) traceit("write to file");
212 r = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
213 assert(r==0);
214
215 r = toku_queue_destroy(q2);
216 assert_zero(r);
217
218 destroy_merge_fileset(&fs);
219 ft_loader_fi_destroy(&bl.file_infos, false);
220
221 // walk a cursor through the dbfile and verify the rows
222 verify_dbfile(n, output_name);
223
224 ft_loader_destroy_error_callback(&bl.error_callback);
225 ft_loader_lock_destroy(&bl);
226
227 toku_free(bl.root_xids_that_created);
228 }
229
230 static int nrows = 1;
231 static TXNID xid = 0;
232
usage(const char * progname)233 static int usage(const char *progname) {
234 fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-r %d] [-x %" PRIu64 "] [-s] directory\n", progname, nrows, xid);
235 return 1;
236 }
237
test_main(int argc,const char * argv[])238 int test_main (int argc, const char *argv[]) {
239 const char *progname=argv[0];
240 argc--; argv++;
241 while (argc>0) {
242 if (strcmp(argv[0], "-h") == 0 || strcmp(argv[0], "--help") == 0) {
243 return usage(progname);
244 } else if (strcmp(argv[0], "-v") == 0 || strcmp(argv[0], "--verbose") == 0) {
245 verbose=1;
246 } else if (strcmp(argv[0], "-q") == 0) {
247 verbose=0;
248 } else if (strcmp(argv[0], "-r") == 0) {
249 argc--; argv++;
250 nrows = atoi(argv[0]);
251 } else if (strcmp(argv[0], "-x") == 0) {
252 argc--; argv++;
253 xid = atol(argv[0]);
254 } else if (strcmp(argv[0], "-s") == 0) {
255 toku_ft_loader_set_size_factor(1);
256 } else if (argv[0][0] == '-' || argc != 1) {
257 return usage(progname);
258 } else {
259 break;
260 }
261 argc--; argv++;
262 }
263 const char* directory = TOKU_TEST_FILENAME;
264 char unlink_all[strlen(directory)+20];
265 snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
266 int r;
267 r = system(unlink_all);
268 CKERR(r);
269 r = toku_os_mkdir(directory, 0755);
270 CKERR(r);
271
272 int templen = strlen(directory)+15;
273 char tf_template[templen];
274 int tlen = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
275 assert (tlen>0 && tlen<templen);
276
277 char output_name[templen];
278 int olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
279 assert (olen>0 && olen<templen);
280
281 test_write_dbfile(tf_template, nrows, output_name, xid);
282
283 #if 0
284 r = system(unlink_all);
285 CKERR(r);
286 #endif
287 return 0;
288 }
289
290