1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40 #include <toku_assert.h>
41 #include <string.h>
42 #include <stdio.h>
43 #include <unistd.h>
44 #include "loader/loader-internal.h"
45 #include "memory.h"
46 #include <portability/toku_path.h>
47
48
// Three-way comparison of two ints, usable as a qsort() comparator.
//   a, b  pointers to the ints being compared
// Returns -1 when *a < *b, +1 when *a > *b, and 0 when they are equal.
static int qsort_compare_ints (const void *a, const void *b) {
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;
    // Each relational result is 0 or 1, so the difference is -1, 0 or +1
    // without any risk of overflow from subtracting the values themselves.
    return (lhs > rhs) - (lhs < rhs);
}
57
compare_ints(DB * UU (desc),const DBT * akey,const DBT * bkey)58 static int compare_ints (DB* UU(desc), const DBT *akey, const DBT *bkey) {
59 assert(akey->size==sizeof(int));
60 assert(bkey->size==sizeof(int));
61 return qsort_compare_ints(akey->data, bkey->data);
62 }
63
err_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())64 static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
65 fprintf(stderr, "error in test");
66 abort();
67 }
68
69 bool founddup;
expect_dups_cb(DB * db UU (),int dbn UU (),int err UU (),DBT * key UU (),DBT * val UU (),void * extra UU ())70 static void expect_dups_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
71 founddup=true;
72 }
73
test_merge_internal(int a[],int na,int b[],int nb,bool dups)74 static void test_merge_internal (int a[], int na, int b[], int nb, bool dups) {
75 int *MALLOC_N(na+nb, ab); // the combined array a and b
76 for (int i=0; i<na; i++) {
77 ab[i]=a[i];
78 }
79 for (int i=0; i<nb; i++) {
80 ab[na+i] = b[i];
81 }
82 struct row *MALLOC_N(na, ar);
83 struct row *MALLOC_N(nb, br);
84 for (int i=0; i<na; i++) {
85 ar[i].off = i*sizeof(a[0]);
86 ar[i].klen = sizeof(a[i]);
87 ar[i].vlen = 0;
88 }
89 for (int i=0; i<nb; i++) {
90 br[i].off = (na+i)*sizeof(b[0]);
91 br[i].klen = sizeof(b[i]);
92 br[i].vlen = 0;
93 }
94 struct row *MALLOC_N(na+nb, cr);
95 DB *dest_db = NULL;
96 struct ft_loader_s bl;
97 ft_loader_init_error_callback(&bl.error_callback);
98 ft_loader_set_error_function(&bl.error_callback, dups ? expect_dups_cb : err_cb, NULL);
99 struct rowset rs = { .memory_budget = 0, .n_rows = 0, .n_rows_limit = 0, .rows = NULL, .n_bytes = 0, .n_bytes_limit = 0,
100 .data=(char*)ab};
101 merge_row_arrays_base(cr, ar, na, br, nb, 0, dest_db, compare_ints, &bl, &rs);
102 ft_loader_call_error_function(&bl.error_callback);
103 if (dups) {
104 assert(founddup);
105 } else {
106 // verify the merge
107 int i=0;
108 int j=0;
109 for (int k=0; k<na+nb; k++) {
110 int voff = cr[k].off;
111 int vc = *(int*)(((char*)ab)+voff);
112 if (i<na && j<nb) {
113 if (vc==a[i]) {
114 assert(a[i]<=b[j]);
115 i++;
116 } else if (vc==b[j]) {
117 assert(a[i]>b[j]);
118 j++;
119 } else {
120 assert(0);
121 }
122 }
123 }
124 }
125 toku_free(cr);
126 toku_free(ar);
127 toku_free(br);
128 toku_free(ab);
129 ft_loader_destroy_error_callback(&bl.error_callback);
130 }
131
132 /* Test the basic merger. */
test_merge(void)133 static void test_merge (void) {
134 {
135 int avals[]={1,2,3,4,5};
136 int *bvals = NULL;
137 test_merge_internal(avals, 5, bvals, 0, false);
138 test_merge_internal(bvals, 0, avals, 5, false);
139 }
140 {
141 int avals[]={1,3,5,7};
142 int bvals[]={2,4};
143 test_merge_internal(avals, 4, bvals, 2, false);
144 test_merge_internal(bvals, 2, avals, 4, false);
145 }
146 {
147 int avals[]={1,2,3,5,6,7};
148 int bvals[]={2,4,5,6,8};
149 test_merge_internal(avals, 6, bvals, 5, true);
150 test_merge_internal(bvals, 5, avals, 6, true);
151 }
152 }
153
test_internal_mergesort_row_array(int a[],int n)154 static void test_internal_mergesort_row_array (int a[], int n) {
155 struct row *MALLOC_N(n, ar);
156 for (int i=0; i<n; i++) {
157 ar[i].off = i*sizeof(a[0]);
158 ar[i].klen = sizeof(a[i]);
159 ar[i].vlen = 0;
160 }
161 struct rowset rs = { .memory_budget = 0, .n_rows = 0, .n_rows_limit = 0, .rows = NULL, .n_bytes = 0, .n_bytes_limit = 0,
162 .data=(char*)a};
163 ft_loader_mergesort_row_array (ar, n, 0, NULL, compare_ints, NULL, &rs);
164 int *MALLOC_N(n, tmp);
165 for (int i=0; i<n; i++) {
166 tmp[i]=a[i];
167 }
168 qsort(tmp, n, sizeof(a[0]), qsort_compare_ints);
169 for (int i=0; i<n; i++) {
170 int voff = ar[i].off;
171 int v = *(int*)(((char*)a)+voff);
172 assert(tmp[i]==v);
173 }
174 toku_free(ar);
175 toku_free(tmp);
176 }
177
test_mergesort_row_array(void)178 static void test_mergesort_row_array (void) {
179 {
180 int avals[]={5,2,1,7};
181 for (int i=0; i<=4; i++)
182 test_internal_mergesort_row_array(avals, i);
183 }
184 const int MAX_LEN = 100;
185 enum { MAX_VAL = 1000 };
186 for (int i=0; i<MAX_LEN; i++) {
187 bool used[MAX_VAL];
188 for (int j=0; j<MAX_VAL; j++) used[j]=false;
189 int len=1+random()%MAX_LEN;
190 int avals[len];
191 for (int j=0; j<len; j++) {
192 int v;
193 do {
194 v = random()%MAX_VAL;
195 } while (used[v]);
196 avals[j] = v;
197 used[v] = true;
198 }
199 test_internal_mergesort_row_array(avals, len);
200 }
201 }
202
test_read_write_rows(char * tf_template)203 static void test_read_write_rows (char *tf_template) {
204 struct ft_loader_s bl;
205 ZERO_STRUCT(bl);
206 bl.temp_file_template = tf_template;
207 int r = ft_loader_init_file_infos(&bl.file_infos);
208 CKERR(r);
209 FIDX file;
210 r = ft_loader_open_temp_file(&bl, &file);
211 CKERR(r);
212
213 uint64_t dataoff=0;
214
215 const char *keystrings[] = {"abc", "b", "cefgh"};
216 const char *valstrings[] = {"defg", "", "xyz"};
217 uint64_t actual_size=0;
218 for (int i=0; i<3; i++) {
219 DBT key;
220 toku_fill_dbt(&key, keystrings[i], strlen(keystrings[i]));
221 DBT val;
222 toku_fill_dbt(&val, valstrings[i], strlen(valstrings[i]));
223 r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, nullptr, &bl);
224 CKERR(r);
225 actual_size+=key.size + val.size + 8;
226 }
227 if (actual_size != dataoff) fprintf(stderr, "actual_size=%" PRIu64 ", dataoff=%" PRIu64 "\n", actual_size, dataoff);
228 assert(actual_size == dataoff);
229
230 r = ft_loader_fi_close(&bl.file_infos, file, true);
231 CKERR(r);
232
233 r = ft_loader_fi_reopen(&bl.file_infos, file, "r");
234 CKERR(r);
235
236 {
237 int n_read=0;
238 DBT key, val;
239 toku_init_dbt(&key);
240 toku_init_dbt(&val);
241 while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val)) {
242 assert(strlen(keystrings[n_read])==key.size);
243 assert(strlen(valstrings[n_read])==val.size);
244 assert(0==memcmp(keystrings[n_read], key.data, key.size));
245 assert(0==memcmp(valstrings[n_read], val.data, val.size));
246 assert(key.size<=key.ulen);
247 assert(val.size<=val.ulen);
248 n_read++;
249 }
250 assert(n_read==3);
251 toku_free(key.data);
252 toku_free(val.data);
253 }
254 r = ft_loader_fi_close(&bl.file_infos, file, true);
255 CKERR(r);
256
257 r = ft_loader_fi_unlink(&bl.file_infos, file);
258 CKERR(r);
259
260 assert(bl.file_infos.n_files_open==0);
261 assert(bl.file_infos.n_files_extant==0);
262
263 ft_loader_fi_destroy(&bl.file_infos, false);
264 }
265
fill_rowset(struct rowset * rows,int keys[],const char * vals[],int n,uint64_t * size_est)266 static void fill_rowset (struct rowset *rows,
267 int keys[],
268 const char *vals[],
269 int n,
270 uint64_t *size_est) {
271 init_rowset(rows, toku_ft_loader_get_rowset_budget_for_testing());
272 for (int i=0; i<n; i++) {
273 DBT key;
274 toku_fill_dbt(&key, &keys[i], sizeof keys[i]);
275 DBT val;
276 toku_fill_dbt(&val, vals[i], strlen(vals[i]));
277 add_row(rows, &key, &val);
278 *size_est += ft_loader_leafentry_size(key.size, val.size, TXNID_NONE);
279 }
280 }
281
verify_dbfile(int n,int sorted_keys[],const char * sorted_vals[],const char * name)282 static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], const char *name) {
283 int r;
284
285 CACHETABLE ct;
286 toku_cachetable_create(&ct, 0, ZERO_LSN, nullptr);
287
288 TOKUTXN const null_txn = NULL;
289 FT_HANDLE t = NULL;
290 toku_ft_handle_create(&t);
291 toku_ft_set_bt_compare(t, compare_ints);
292 r = toku_ft_handle_open(t, name, 0, 0, ct, null_txn); assert(r==0);
293
294 FT_CURSOR cursor = NULL;
295 r = toku_ft_cursor(t, &cursor, NULL, false, false); assert(r == 0);
296
297 size_t userdata = 0;
298 int i;
299 for (i=0; i<n; i++) {
300 struct check_pair pair = {sizeof sorted_keys[i], &sorted_keys[i], (uint32_t) strlen(sorted_vals[i]), sorted_vals[i], 0};
301 r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
302 if (r != 0) {
303 assert(pair.call_count ==0);
304 break;
305 }
306 assert(pair.call_count==1);
307 userdata += pair.keylen + pair.vallen;
308 }
309
310 struct check_pair pair; memset(&pair, 0, sizeof pair);
311 r = toku_ft_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_NEXT);
312 assert(r != 0);
313
314 toku_ft_cursor_close(cursor);
315
316 struct ftstat64_s s;
317 toku_ft_handle_stat64(t, NULL, &s);
318 assert(s.nkeys == (uint64_t) n && s.ndata == (uint64_t) n && s.dsize == userdata);
319
320 r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
321 toku_cachetable_close(&ct);
322 }
323
test_merge_files(const char * tf_template,const char * output_name)324 static void test_merge_files (const char *tf_template, const char *output_name) {
325 DB *dest_db = NULL;
326 struct ft_loader_s bl;
327 ZERO_STRUCT(bl);
328 bl.temp_file_template = tf_template;
329 bl.reserved_memory = 512*1024*1024;
330 int r = ft_loader_init_file_infos(&bl.file_infos); CKERR(r);
331 ft_loader_lock_init(&bl);
332 ft_loader_init_error_callback(&bl.error_callback);
333 ft_loader_set_fractal_workers_count_from_c(&bl);
334
335 struct merge_fileset fs;
336 init_merge_fileset(&fs);
337
338 int a_keys[] = { 1, 3, 5, 7, 8, 9};
339 int b_keys[] = { 0, 2, 4, 6 };
340 const char *a_vals[] = {"a", "c", "e", "g", "h", "i"};
341 const char *b_vals[] = {"0", "b", "d", "f"};
342 int sorted_keys[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
343 const char *sorted_vals[] = { "0", "a", "b", "c", "d", "e", "f", "g", "h", "i" };
344 struct rowset aset, bset;
345 uint64_t size_est = 0;
346 fill_rowset(&aset, a_keys, a_vals, 6, &size_est);
347 fill_rowset(&bset, b_keys, b_vals, 4, &size_est);
348
349 toku_ft_loader_set_n_rows(&bl, 6+4);
350
351 ft_loader_set_error_function(&bl.error_callback, err_cb, NULL);
352 r = ft_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
353 r = ft_loader_sort_and_write_rows(&bset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
354 assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
355 // destroy_rowset(&aset);
356 // destroy_rowset(&bset);
357 for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1);
358
359 ft_loader_fi_close_all(&bl.file_infos);
360
361 QUEUE q;
362 r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
363 assert(r==0);
364
365 r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
366
367 assert(fs.n_temp_files==0);
368
369 DESCRIPTOR_S desc;
370 toku_fill_dbt(&desc.dbt, "abcd", 4);
371
372 int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
373 assert(fd>=0);
374
375 r = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
376 assert(r==0);
377
378 destroy_merge_fileset(&fs);
379 ft_loader_fi_destroy(&bl.file_infos, false);
380 ft_loader_destroy_error_callback(&bl.error_callback);
381 ft_loader_lock_destroy(&bl);
382
383 // verify the dbfile
384 verify_dbfile(10, sorted_keys, sorted_vals, output_name);
385
386 r = toku_queue_destroy(q);
387 assert(r==0);
388 }
389
390 /* Test to see if we can open temporary files. */
test_main(int argc,const char * argv[])391 int test_main (int argc, const char *argv[]) {
392 argc--; argv++;
393 while (argc>0) {
394 if (strcmp(argv[0],"-v")==0) {
395 verbose=1;
396 } else if (strcmp(argv[0],"-q")==0) {
397 verbose=0;
398 }
399 else {
400 break;
401 }
402 argc--; argv++;
403 }
404 const char* directory = TOKU_TEST_FILENAME;
405 int r = toku_os_mkdir(directory, 0755);
406 if (r!=0) CKERR2(errno, EEXIST);
407
408 int templen = strlen(directory)+15;
409 char tf_template[templen];
410 {
411 int n = snprintf(tf_template, templen, "%s/tempXXXXXX", directory);
412 assert (n>0 && n<templen);
413 }
414 char output_name[templen];
415 {
416 int n = snprintf(output_name, templen, "%s/data.tokudb", directory);
417 assert (n>0 && n<templen);
418 }
419 test_read_write_rows(tf_template);
420 test_merge();
421 test_mergesort_row_array();
422 test_merge_files(tf_template, output_name);
423
424 {
425 char deletecmd[templen];
426 int n = snprintf(deletecmd, templen, "rm -rf %s", directory);
427 assert(n>0 && n<templen);
428 r = system(deletecmd);
429 CKERR(r);
430 }
431
432 return 0;
433 }
434
435