1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 // The purpose of this test is to test the extractor component of the ft loader.  We insert rowsets into the extractor queue and verify temp files
40 // after the extractor is finished.
41 
42 #define DONT_DEPRECATE_MALLOC
43 #define DONT_DEPRECATE_WRITES
44 #include "test.h"
45 #include "loader/loader.h"
46 #include "loader/loader-internal.h"
47 #include "memory.h"
48 #include <portability/toku_path.h>
49 
50 
// Three-way comparison of the ints that a and b point at.
// Returns -1 if *a < *b, +1 if *a > *b and 0 when they are equal, which is
// the contract qsort() expects from its comparator.
static int qsort_compare_ints (const void *a, const void *b) {
    const int lhs = *(int*)a;
    const int rhs = *(int*)b;
    // each comparison yields 0 or 1, so the difference is exactly -1, 0 or +1
    return (lhs > rhs) - (lhs < rhs);
}
58 
compare_int(DB * desc,const DBT * akey,const DBT * bkey)59 static int compare_int(DB *desc, const DBT *akey, const DBT *bkey) {
60     assert(desc == NULL);
61     assert(akey->size == sizeof (int));
62     assert(bkey->size == sizeof (int));
63     return qsort_compare_ints(akey->data, bkey->data);
64 }
65 
get_temp_files(const char * testdir)66 static char **get_temp_files(const char *testdir) {
67     int ntemp = 0;
68     int maxtemp = 32;
69     char **XMALLOC_N(maxtemp, tempfiles);
70 
71     DIR *d = opendir(testdir);
72     if (d) {
73         struct dirent *de;
74         while ((de = readdir(d)) != NULL) {
75             if (strncmp(de->d_name, "temp", 4) == 0) {
76                 if (ntemp >= maxtemp-1) {
77                     maxtemp = 2*maxtemp;
78                     XREALLOC_N(2*maxtemp, tempfiles);
79                 }
80                 tempfiles[ntemp++] = toku_strdup(de->d_name);
81             }
82         }
83         closedir(d);
84     }
85     tempfiles[ntemp] = NULL;
86     return tempfiles;
87 }
88 
free_temp_files(char ** tempfiles)89 static void free_temp_files(char **tempfiles) {
90     for (int i = 0; tempfiles[i] != NULL; i++)
91         toku_free(tempfiles[i]);
92     toku_free(tempfiles);
93 }
94 
read_row(FILE * f,DBT * key,DBT * val)95 static int read_row(FILE *f, DBT *key, DBT *val) {
96     size_t r;
97     int len;
98     r = fread(&len, sizeof len, 1, f);
99     if (r != 1)
100         return EOF;
101     assert(key->flags == DB_DBT_REALLOC);
102     key->data = toku_realloc(key->data, len); key->size = len;
103     r = fread(key->data, len, 1, f);
104     if (r != 1)
105         return EOF;
106     r = fread(&len, sizeof len, 1, f);
107     if (r != 1)
108         return EOF;
109     assert(val->flags == DB_DBT_REALLOC);
110     val->data = toku_realloc(val->data, len); val->size = len;
111     r = fread(val->data, len, 1, f);
112     if (r != 1)
113         return EOF;
114     return 0;
115 }
116 
write_row(FILE * f,DBT * key,DBT * val)117 static void write_row(FILE *f, DBT *key, DBT *val) {
118     size_t r;
119     int len = key->size;
120     r = fwrite(&len, sizeof len, 1, f);
121     assert(r == 1);
122     r = fwrite(key->data, len, 1, f);
123     assert(r == 1);
124     len = val->size;
125     r = fwrite(&len, sizeof len, 1, f);
126     assert(r == 1);
127     r = fwrite(val->data, len, 1, f);
128     assert(r == 1);
129 }
130 
read_tempfile(const char * testdir,const char * tempfile,int ** tempkeys,int * ntempkeys)131 static void read_tempfile(const char *testdir, const char *tempfile, int **tempkeys, int *ntempkeys) {
132     int maxkeys = 32;
133     int nkeys = 0;
134     int *XCALLOC_N(maxkeys, keys);
135 
136     char fname[strlen(testdir) + 1 + strlen(tempfile) + 1];
137     sprintf(fname, "%s/%s", testdir, tempfile);
138     FILE *f = fopen(fname, "r");
139     if (f) {
140         DBT key;
141         toku_init_dbt_flags(&key, DB_DBT_REALLOC);
142         DBT val;
143         toku_init_dbt_flags(&val, DB_DBT_REALLOC);
144         while (read_row(f, &key, &val) == 0) {
145             if (nkeys >= maxkeys) {
146                 maxkeys *= 2;
147                 XREALLOC_N(maxkeys, keys);
148             }
149             assert(key.size == sizeof (int));
150             memcpy(&keys[nkeys], key.data, key.size);
151             nkeys++;
152         }
153         toku_free(key.data);
154         toku_free(val.data);
155         fclose(f);
156     }
157 
158     *tempkeys = keys;
159     *ntempkeys = nkeys;
160 }
161 
// Assert that the first n elements of a are in non-decreasing order.
// Arrays with fewer than two elements pass trivially.
static void verify_sorted(int a[], int n) {
    int i = 1;
    while (i < n) {
        assert(a[i-1] <= a[i]);
        i++;
    }
}
166 
167 struct merge_file {
168     FILE *f;
169     DBT key, val;
170     bool row_valid;
171 };
172 
// Never written to, so it keeps the all-zero value static storage starts with;
// merge_file_init() copies it to reset a DBT before setting its flags.
static DBT zero_dbt;
174 
merge_file_init(struct merge_file * mf)175 static void merge_file_init(struct merge_file *mf) {
176     mf->f = NULL;
177     mf->key = zero_dbt; mf->key.flags = DB_DBT_REALLOC;
178     mf->val = zero_dbt; mf->val.flags = DB_DBT_REALLOC;
179     mf->row_valid = false;
180 }
181 
merge_file_destroy(struct merge_file * mf)182 static void merge_file_destroy(struct merge_file *mf) {
183     if (mf->f) {
184         fclose(mf->f);
185         mf->f = NULL;
186     }
187     toku_free(mf->key.data);
188     toku_free(mf->val.data);
189 }
190 
merge(char ** tempfiles,int ntempfiles,const char * testdir)191 static char *merge(char **tempfiles, int ntempfiles, const char *testdir) {
192     char fname[strlen(testdir) + 1 + strlen("result") + 1];
193     sprintf(fname, "%s/%s", testdir, "result");
194     FILE *mergef = fopen(fname, "w"); assert(mergef != NULL);
195 
196     struct merge_file f[ntempfiles];
197     for (int i = 0; i < ntempfiles; i++) {
198         merge_file_init(&f[i]);
199         char tname[strlen(testdir) + 1 + strlen(tempfiles[i]) + 1];
200         sprintf(tname, "%s/%s", testdir, tempfiles[i]);
201         f[i].f = fopen(tname, "r");
202 	if (f[i].f == NULL) {
203 	    int error = errno;
204 	    fprintf(stderr, "%s:%d errno=%d %s\n", __FILE__, __LINE__, error, strerror(error));
205 	    if (error == EMFILE)
206 		fprintf(stderr, "may need to increase the nofile ulimit\n");
207 	}
208 	assert(f[i].f != NULL);
209         if (read_row(f[i].f, &f[i].key, &f[i].val) == 0)
210             f[i].row_valid = true;
211     }
212 
213     while (1) {
214         // get min
215         int mini = -1;
216         for (int i = 0; i < ntempfiles; i++) {
217             if (f[i].row_valid) {
218                 if (mini == -1) {
219                     mini = i;
220                 } else {
221                     int r = compare_int(NULL, &f[mini].key, &f[i].key);
222                     assert(r != 0);
223                     if (r > 0)
224                         mini = i;
225                 }
226             }
227         }
228         if (mini == -1)
229             break;
230 
231         // write min
232         write_row(mergef, &f[mini].key, &f[mini].val);
233 
234         // refresh mini
235         if (read_row(f[mini].f, &f[mini].key, &f[mini].val) != 0)
236             f[mini].row_valid = false;
237     }
238 
239     for (int i = 0; i < ntempfiles; i++) {
240         merge_file_destroy(&f[i]);
241     }
242 
243     fclose(mergef);
244     return toku_strdup("result");
245 }
246 
verify(int inkey[],int nkeys,const char * testdir)247 static void verify(int inkey[], int nkeys, const char *testdir) {
248     // find the temp files
249     char **tempfiles = get_temp_files(testdir);
250     int ntempfiles = 0;
251     for (int i = 0; tempfiles[i] != NULL; i++) {
252         if (verbose) printf("%s\n", tempfiles[i]);
253         ntempfiles++;
254     }
255 
256     // verify each is sorted
257     for (int i = 0; i < ntempfiles; i++) {
258         int *tempkeys; int ntempkeys;
259         read_tempfile(testdir, tempfiles[i], &tempkeys, &ntempkeys);
260         verify_sorted(tempkeys, ntempkeys);
261         toku_free(tempkeys);
262     }
263 
264     // merge
265     char *result_file = merge(tempfiles, ntempfiles, testdir);
266     assert(result_file);
267 
268     int *result_keys; int n_result_keys;
269     read_tempfile(testdir, result_file, &result_keys, &n_result_keys);
270     toku_free(result_file);
271 
272     // compare
273     assert(nkeys == n_result_keys);
274     for (int i = 0; i < nkeys; i++)
275         assert(inkey[i] == result_keys[i]);
276 
277     toku_free(result_keys);
278     free_temp_files(tempfiles);
279 }
280 
generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)281 static int generate(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
282     toku_dbt_array_resize(dest_keys, 1);
283     toku_dbt_array_resize(dest_vals, 1);
284     DBT *dest_key = &dest_keys->dbts[0];
285     DBT *dest_val = &dest_vals->dbts[0];
286     assert(dest_db == NULL); assert(src_db == NULL);
287 
288     copy_dbt(dest_key, src_key);
289     copy_dbt(dest_val, src_val);
290 
291     return 0;
292 }
293 
populate_rowset(struct rowset * rowset,int seq,int nrows,int keys[])294 static void populate_rowset(struct rowset *rowset, int seq, int nrows, int keys[]) {
295     for (int i = 0; i < nrows; i++) {
296         int k = keys[i];
297         int v = seq * nrows + i;
298         DBT key;
299         toku_fill_dbt(&key, &k, sizeof k);
300         DBT val;
301         toku_fill_dbt(&val, &v, sizeof v);
302         add_row(rowset, &key, &val);
303     }
304 }
305 
// Permute the first n elements of a in place: each position in turn is
// swapped with a position drawn from random() % n.
static void shuffle(int a[], int n) {
    int i = 0;
    while (i < n) {
        const int j = random() % n;
        const int held = a[i];
        a[i] = a[j];
        a[j] = held;
        i++;
    }
}
312 
// Key-pattern switches, set from the command line in test_main().
// nonzero: test_extractor() generates key i as 2*i; otherwise as nkeys - i
static int ascending_keys = 0;
// nonzero: the first key of the last rowset is overwritten with (last key of the first rowset) - 1
static int ascending_keys_poison = 0;
// set by --dsc; in this file it is only read when deciding whether to default to ascending keys
static int descending_keys = 0;
// nonzero: the generated keys are shuffled before being split into rowsets
static int random_keys = 0;
317 
// Run the loader's extractor over nrowsets rowsets of nrows rows each and
// check the temp files it leaves in testdir.
//
// The keys follow the pattern selected by ascending_keys,
// ascending_keys_poison and random_keys.  After the extractor finishes, the
// input keys are sorted and compared against the merged contents of the temp
// files (see verify()).  Any failure trips an assert.
static void test_extractor(int nrows, int nrowsets, const char *testdir) {
    if (verbose) printf("%s %d %d %s\n", __FUNCTION__, nrows, nrowsets, testdir);

    int r;

    // generate nrows*nrowsets distinct keys: 2*i when ascending, nkeys-i otherwise
    int nkeys = nrows * nrowsets;
    int *XCALLOC_N(nkeys, keys);
    for (int i = 0; i < nkeys; i++)
        keys[i] = ascending_keys ? 2*i : nkeys - i;
    if (ascending_keys_poison) {
        // replace the first key of the last rowset with a value just below the
        // last key of the first rowset, so the input is no longer ascending
        // across rowsets
        if (verbose)
            printf("poison %d %d %d\n", nrows*(nrowsets-1), keys[nrows*(nrowsets-1)], keys[nrows-1] -1);
        keys[nrows*(nrowsets-1)] = keys[nrows-1] - 1;
    }
    if (random_keys)
        shuffle(keys, nkeys);

    // open the ft_loader. this runs the extractor.
    // a single destination with no FT handle, no DB and an empty file name
    const int N = 1;
    FT_HANDLE fts[N];
    DB* dbs[N];
    const char *fnames[N];
    ft_compare_func compares[N];
    for (int i = 0; i < N; i++) {
        fts[i] = NULL;
        dbs[i] = NULL;
        fnames[i] = "";
        compares[i] = compare_int;
    }

    // temp file name template inside testdir; get_temp_files() later looks for
    // entries starting with "temp"
    char temp[strlen(testdir) + 1 + strlen("tempXXXXXX") + 1];
    sprintf(temp, "%s/%s", testdir, "tempXXXXXX");

    FTLOADER loader;
    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, fts, dbs, fnames, compares, temp, ZERO_LSN, nullptr, true, 0, false, true);
    assert(r == 0);

    // build all rowsets up front; rowset i holds keys[i*nrows .. i*nrows + nrows - 1]
    struct rowset *rowset[nrowsets];
    for (int i = 0 ; i < nrowsets; i++) {
        rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset));
        assert(rowset[i]);
        init_rowset(rowset[i], toku_ft_loader_get_rowset_budget_for_testing());
        populate_rowset(rowset[i], i, nrows, &keys[i*nrows]);
    }

    // feed rowsets to the extractor
    // NOTE(review): the rowsets are not freed in this function; presumably the
    // extractor takes ownership once they are enqueued -- confirm.
    for (int i = 0; i < nrowsets; i++) {
        r = toku_queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
        assert(r == 0);
    }
    r = toku_ft_loader_finish_extractor(loader);
    assert(r == 0);

    // the loader must not have recorded an error
    int error;
    r = toku_ft_loader_get_error(loader, &error);
    assert(r == 0);
    assert(error == 0);

    // sort the input keys
    qsort(keys, nkeys, sizeof (int), qsort_compare_ints);

    // verify the temp files
    verify(keys, nkeys, testdir);

    // abort the ft_loader.  this ends the test
    r = toku_ft_loader_abort(loader, true);
    assert(r == 0);

    toku_free(keys);
}
388 
// Test size; test_main() overrides these from -r and --rowsets, and usage()
// prints their current values.
static int nrows = 1;     // rows per rowset
static int nrowsets = 2;  // number of rowsets fed to the extractor
391 
usage(const char * progname)392 static int usage(const char *progname) {
393     fprintf(stderr, "Usage: %s [options] directory\n", progname);
394     fprintf(stderr, "[-v] turn on verbose\n");
395     fprintf(stderr, "[-q] turn off verbose\n");
396     fprintf(stderr, "[-r %d] set the number of rows\n", nrows);
397     fprintf(stderr, "[--rowsets %d] set the number of rowsets\n", nrowsets);
398     fprintf(stderr, "[-s] set the small loader size factor\n");
399     fprintf(stderr, "[--asc] [--dsc] [--random]\n");
400     return 1;
401 }
402 
test_main(int argc,const char * argv[])403 int test_main (int argc, const char *argv[]) {
404     const char *progname=argv[0];
405     argc--; argv++;
406     while (argc>0) {
407         if (strcmp(argv[0],"-h")==0) {
408             return usage(progname);
409         } else if (strcmp(argv[0],"-v")==0) {
410 	    verbose=1;
411 	} else if (strcmp(argv[0],"-q")==0) {
412 	    verbose=0;
413         } else if (strcmp(argv[0],"-r") == 0 && argc >= 1) {
414             argc--; argv++;
415             nrows = atoi(argv[0]);
416         } else if (strcmp(argv[0],"--rowsets") == 0 && argc >= 1) {
417             argc--; argv++;
418             nrowsets = atoi(argv[0]);
419         } else if (strcmp(argv[0],"-s") == 0) {
420             toku_ft_loader_set_size_factor(1);
421         } else if (strcmp(argv[0],"--asc") == 0) {
422             ascending_keys = 1;
423         } else if (strcmp(argv[0],"--dsc") == 0) {
424             descending_keys = 1;
425         } else if (strcmp(argv[0],"--random") == 0) {
426             random_keys = 1;
427         } else if (strcmp(argv[0], "--asc-poison") == 0) {
428             ascending_keys = 1;
429             ascending_keys_poison = 1;
430 	} else if (argc!=1) {
431             return usage(progname);
432 	    exit(1);
433 	}
434         else {
435             break;
436         }
437 	argc--; argv++;
438     }
439 
440     const char *testdir = TOKU_TEST_FILENAME;
441     char unlink_all[strlen(testdir)+20];
442     snprintf(unlink_all, strlen(testdir)+20, "rm -rf %s", testdir);
443     int r;
444     r = system(unlink_all); CKERR(r);
445     r = toku_os_mkdir(testdir, 0755); CKERR(r);
446 
447     if (ascending_keys + descending_keys + random_keys == 0)
448         ascending_keys = 1;
449 
450     // run test
451     test_extractor(nrows, nrowsets, testdir);
452 
453     r = system(unlink_all); CKERR(r);
454 
455     return 0;
456 }
457