1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 // The purpose of this test is to test the extractor component of the ft loader. We insert rowsets into the extractor queue and verify temp files
40 // after the extractor is finished.
41
42 #define DONT_DEPRECATE_MALLOC
43 #define DONT_DEPRECATE_WRITES
44 #include "test.h"
45 #include "loader/loader.h"
46 #include "loader/loader-internal.h"
47 #include "memory.h"
48 #include <portability/toku_path.h>
49
50
// qsort(3)-style three-way comparator for two ints.
// Returns -1, 0 or +1 as *a is less than, equal to, or greater than *b.
static int qsort_compare_ints (const void *a, const void *b) {
    const int lhs = *(const int *) a;
    const int rhs = *(const int *) b;
    // Each relational operator yields 0 or 1, so the difference is exactly
    // -1, 0 or +1. This also avoids the signed overflow of "lhs - rhs".
    return (lhs > rhs) - (lhs < rhs);
}
58
compare_int(DB * desc,const DBT * akey,const DBT * bkey)59 static int compare_int(DB *desc, const DBT *akey, const DBT *bkey) {
60 assert(desc == NULL);
61 assert(akey->size == sizeof (int));
62 assert(bkey->size == sizeof (int));
63 return qsort_compare_ints(akey->data, bkey->data);
64 }
65
get_temp_files(const char * testdir)66 static char **get_temp_files(const char *testdir) {
67 int ntemp = 0;
68 int maxtemp = 32;
69 char **XMALLOC_N(maxtemp, tempfiles);
70
71 DIR *d = opendir(testdir);
72 if (d) {
73 struct dirent *de;
74 while ((de = readdir(d)) != NULL) {
75 if (strncmp(de->d_name, "temp", 4) == 0) {
76 if (ntemp >= maxtemp-1) {
77 maxtemp = 2*maxtemp;
78 XREALLOC_N(2*maxtemp, tempfiles);
79 }
80 tempfiles[ntemp++] = toku_strdup(de->d_name);
81 }
82 }
83 closedir(d);
84 }
85 tempfiles[ntemp] = NULL;
86 return tempfiles;
87 }
88
free_temp_files(char ** tempfiles)89 static void free_temp_files(char **tempfiles) {
90 for (int i = 0; tempfiles[i] != NULL; i++)
91 toku_free(tempfiles[i]);
92 toku_free(tempfiles);
93 }
94
read_row(FILE * f,DBT * key,DBT * val)95 static int read_row(FILE *f, DBT *key, DBT *val) {
96 size_t r;
97 int len;
98 r = fread(&len, sizeof len, 1, f);
99 if (r != 1)
100 return EOF;
101 assert(key->flags == DB_DBT_REALLOC);
102 key->data = toku_realloc(key->data, len); key->size = len;
103 r = fread(key->data, len, 1, f);
104 if (r != 1)
105 return EOF;
106 r = fread(&len, sizeof len, 1, f);
107 if (r != 1)
108 return EOF;
109 assert(val->flags == DB_DBT_REALLOC);
110 val->data = toku_realloc(val->data, len); val->size = len;
111 r = fread(val->data, len, 1, f);
112 if (r != 1)
113 return EOF;
114 return 0;
115 }
116
write_row(FILE * f,DBT * key,DBT * val)117 static void write_row(FILE *f, DBT *key, DBT *val) {
118 size_t r;
119 int len = key->size;
120 r = fwrite(&len, sizeof len, 1, f);
121 assert(r == 1);
122 r = fwrite(key->data, len, 1, f);
123 assert(r == 1);
124 len = val->size;
125 r = fwrite(&len, sizeof len, 1, f);
126 assert(r == 1);
127 r = fwrite(val->data, len, 1, f);
128 assert(r == 1);
129 }
130
read_tempfile(const char * testdir,const char * tempfile,int ** tempkeys,int * ntempkeys)131 static void read_tempfile(const char *testdir, const char *tempfile, int **tempkeys, int *ntempkeys) {
132 int maxkeys = 32;
133 int nkeys = 0;
134 int *XCALLOC_N(maxkeys, keys);
135
136 char fname[strlen(testdir) + 1 + strlen(tempfile) + 1];
137 sprintf(fname, "%s/%s", testdir, tempfile);
138 FILE *f = fopen(fname, "r");
139 if (f) {
140 DBT key;
141 toku_init_dbt_flags(&key, DB_DBT_REALLOC);
142 DBT val;
143 toku_init_dbt_flags(&val, DB_DBT_REALLOC);
144 while (read_row(f, &key, &val) == 0) {
145 if (nkeys >= maxkeys) {
146 maxkeys *= 2;
147 XREALLOC_N(maxkeys, keys);
148 }
149 assert(key.size == sizeof (int));
150 memcpy(&keys[nkeys], key.data, key.size);
151 nkeys++;
152 }
153 toku_free(key.data);
154 toku_free(val.data);
155 fclose(f);
156 }
157
158 *tempkeys = keys;
159 *ntempkeys = nkeys;
160 }
161
// Assert that a[0..n) is in non-decreasing order. Arrays with n <= 1 are
// trivially sorted.
static void verify_sorted(int a[], int n) {
    for (int i = 0; i + 1 < n; i++)
        assert(!(a[i] > a[i + 1]));
}
166
167 struct merge_file {
168 FILE *f;
169 DBT key, val;
170 bool row_valid;
171 };
172
// All-zero DBT used by merge_file_init() as a template. Being a static
// object, it is zero-initialized.
static DBT zero_dbt;
174
merge_file_init(struct merge_file * mf)175 static void merge_file_init(struct merge_file *mf) {
176 mf->f = NULL;
177 mf->key = zero_dbt; mf->key.flags = DB_DBT_REALLOC;
178 mf->val = zero_dbt; mf->val.flags = DB_DBT_REALLOC;
179 mf->row_valid = false;
180 }
181
merge_file_destroy(struct merge_file * mf)182 static void merge_file_destroy(struct merge_file *mf) {
183 if (mf->f) {
184 fclose(mf->f);
185 mf->f = NULL;
186 }
187 toku_free(mf->key.data);
188 toku_free(mf->val.data);
189 }
190
merge(char ** tempfiles,int ntempfiles,const char * testdir)191 static char *merge(char **tempfiles, int ntempfiles, const char *testdir) {
192 char fname[strlen(testdir) + 1 + strlen("result") + 1];
193 sprintf(fname, "%s/%s", testdir, "result");
194 FILE *mergef = fopen(fname, "w"); assert(mergef != NULL);
195
196 struct merge_file f[ntempfiles];
197 for (int i = 0; i < ntempfiles; i++) {
198 merge_file_init(&f[i]);
199 char tname[strlen(testdir) + 1 + strlen(tempfiles[i]) + 1];
200 sprintf(tname, "%s/%s", testdir, tempfiles[i]);
201 f[i].f = fopen(tname, "r");
202 if (f[i].f == NULL) {
203 int error = errno;
204 fprintf(stderr, "%s:%d errno=%d %s\n", __FILE__, __LINE__, error, strerror(error));
205 if (error == EMFILE)
206 fprintf(stderr, "may need to increase the nofile ulimit\n");
207 }
208 assert(f[i].f != NULL);
209 if (read_row(f[i].f, &f[i].key, &f[i].val) == 0)
210 f[i].row_valid = true;
211 }
212
213 while (1) {
214 // get min
215 int mini = -1;
216 for (int i = 0; i < ntempfiles; i++) {
217 if (f[i].row_valid) {
218 if (mini == -1) {
219 mini = i;
220 } else {
221 int r = compare_int(NULL, &f[mini].key, &f[i].key);
222 assert(r != 0);
223 if (r > 0)
224 mini = i;
225 }
226 }
227 }
228 if (mini == -1)
229 break;
230
231 // write min
232 write_row(mergef, &f[mini].key, &f[mini].val);
233
234 // refresh mini
235 if (read_row(f[mini].f, &f[mini].key, &f[mini].val) != 0)
236 f[mini].row_valid = false;
237 }
238
239 for (int i = 0; i < ntempfiles; i++) {
240 merge_file_destroy(&f[i]);
241 }
242
243 fclose(mergef);
244 return toku_strdup("result");
245 }
246
verify(int inkey[],int nkeys,const char * testdir)247 static void verify(int inkey[], int nkeys, const char *testdir) {
248 // find the temp files
249 char **tempfiles = get_temp_files(testdir);
250 int ntempfiles = 0;
251 for (int i = 0; tempfiles[i] != NULL; i++) {
252 if (verbose) printf("%s\n", tempfiles[i]);
253 ntempfiles++;
254 }
255
256 // verify each is sorted
257 for (int i = 0; i < ntempfiles; i++) {
258 int *tempkeys; int ntempkeys;
259 read_tempfile(testdir, tempfiles[i], &tempkeys, &ntempkeys);
260 verify_sorted(tempkeys, ntempkeys);
261 toku_free(tempkeys);
262 }
263
264 // merge
265 char *result_file = merge(tempfiles, ntempfiles, testdir);
266 assert(result_file);
267
268 int *result_keys; int n_result_keys;
269 read_tempfile(testdir, result_file, &result_keys, &n_result_keys);
270 toku_free(result_file);
271
272 // compare
273 assert(nkeys == n_result_keys);
274 for (int i = 0; i < nkeys; i++)
275 assert(inkey[i] == result_keys[i]);
276
277 toku_free(result_keys);
278 free_temp_files(tempfiles);
279 }
280
generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)281 static int generate(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
282 toku_dbt_array_resize(dest_keys, 1);
283 toku_dbt_array_resize(dest_vals, 1);
284 DBT *dest_key = &dest_keys->dbts[0];
285 DBT *dest_val = &dest_vals->dbts[0];
286 assert(dest_db == NULL); assert(src_db == NULL);
287
288 copy_dbt(dest_key, src_key);
289 copy_dbt(dest_val, src_val);
290
291 return 0;
292 }
293
populate_rowset(struct rowset * rowset,int seq,int nrows,int keys[])294 static void populate_rowset(struct rowset *rowset, int seq, int nrows, int keys[]) {
295 for (int i = 0; i < nrows; i++) {
296 int k = keys[i];
297 int v = seq * nrows + i;
298 DBT key;
299 toku_fill_dbt(&key, &k, sizeof k);
300 DBT val;
301 toku_fill_dbt(&val, &v, sizeof v);
302 add_row(rowset, &key, &val);
303 }
304 }
305
// Randomly permute a[0..n) in place, drawing randomness from random().
// This is the Fisher-Yates algorithm: position i is filled with an element
// chosen uniformly from the not-yet-placed prefix a[0..i]. Apart from the
// small modulo bias of random() % k, every permutation is equally likely,
// which the "swap each element with any index" scheme does not guarantee.
// n <= 1 leaves the array unchanged and never calls random().
static void shuffle(int a[], int n) {
    for (int i = n - 1; i > 0; i--) {
        int j = (int) (random() % (i + 1));
        int t = a[i]; a[i] = a[j]; a[j] = t;
    }
}
312
// Key-order selection for test_extractor, set from the command line.
// ascending_keys: keys are 0, 2, 4, ...; otherwise they are nkeys, nkeys-1, ..., 1.
static int ascending_keys = 0;
// ascending_keys_poison: overwrite the first key of the last rowset with one
// less than the last key of the first rowset.
static int ascending_keys_poison = 0;
// descending_keys: only read by test_main, to decide whether to default to
// ascending. test_extractor produces descending keys whenever ascending_keys is 0.
static int descending_keys = 0;
// random_keys: shuffle the generated keys before loading them.
static int random_keys = 0;
317
test_extractor(int nrows,int nrowsets,const char * testdir)318 static void test_extractor(int nrows, int nrowsets, const char *testdir) {
319 if (verbose) printf("%s %d %d %s\n", __FUNCTION__, nrows, nrowsets, testdir);
320
321 int r;
322
323 int nkeys = nrows * nrowsets;
324 int *XCALLOC_N(nkeys, keys);
325 for (int i = 0; i < nkeys; i++)
326 keys[i] = ascending_keys ? 2*i : nkeys - i;
327 if (ascending_keys_poison) {
328 if (verbose)
329 printf("poison %d %d %d\n", nrows*(nrowsets-1), keys[nrows*(nrowsets-1)], keys[nrows-1] -1);
330 keys[nrows*(nrowsets-1)] = keys[nrows-1] - 1;
331 }
332 if (random_keys)
333 shuffle(keys, nkeys);
334
335 // open the ft_loader. this runs the extractor.
336 const int N = 1;
337 FT_HANDLE fts[N];
338 DB* dbs[N];
339 const char *fnames[N];
340 ft_compare_func compares[N];
341 for (int i = 0; i < N; i++) {
342 fts[i] = NULL;
343 dbs[i] = NULL;
344 fnames[i] = "";
345 compares[i] = compare_int;
346 }
347
348 char temp[strlen(testdir) + 1 + strlen("tempXXXXXX") + 1];
349 sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
350
351 FTLOADER loader;
352 r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, fts, dbs, fnames, compares, temp, ZERO_LSN, nullptr, true, 0, false, true);
353 assert(r == 0);
354
355 struct rowset *rowset[nrowsets];
356 for (int i = 0 ; i < nrowsets; i++) {
357 rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset));
358 assert(rowset[i]);
359 init_rowset(rowset[i], toku_ft_loader_get_rowset_budget_for_testing());
360 populate_rowset(rowset[i], i, nrows, &keys[i*nrows]);
361 }
362
363 // feed rowsets to the extractor
364 for (int i = 0; i < nrowsets; i++) {
365 r = toku_queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
366 assert(r == 0);
367 }
368 r = toku_ft_loader_finish_extractor(loader);
369 assert(r == 0);
370
371 int error;
372 r = toku_ft_loader_get_error(loader, &error);
373 assert(r == 0);
374 assert(error == 0);
375
376 // sort the input keys
377 qsort(keys, nkeys, sizeof (int), qsort_compare_ints);
378
379 // verify the temp files
380 verify(keys, nkeys, testdir);
381
382 // abort the ft_loader. this ends the test
383 r = toku_ft_loader_abort(loader, true);
384 assert(r == 0);
385
386 toku_free(keys);
387 }
388
// Test dimensions, overridable from the command line (-r and --rowsets):
// the number of rows per rowset and the number of rowsets fed to the extractor.
static int nrows = 1;
static int nrowsets = 2;
391
usage(const char * progname)392 static int usage(const char *progname) {
393 fprintf(stderr, "Usage: %s [options] directory\n", progname);
394 fprintf(stderr, "[-v] turn on verbose\n");
395 fprintf(stderr, "[-q] turn off verbose\n");
396 fprintf(stderr, "[-r %d] set the number of rows\n", nrows);
397 fprintf(stderr, "[--rowsets %d] set the number of rowsets\n", nrowsets);
398 fprintf(stderr, "[-s] set the small loader size factor\n");
399 fprintf(stderr, "[--asc] [--dsc] [--random]\n");
400 return 1;
401 }
402
test_main(int argc,const char * argv[])403 int test_main (int argc, const char *argv[]) {
404 const char *progname=argv[0];
405 argc--; argv++;
406 while (argc>0) {
407 if (strcmp(argv[0],"-h")==0) {
408 return usage(progname);
409 } else if (strcmp(argv[0],"-v")==0) {
410 verbose=1;
411 } else if (strcmp(argv[0],"-q")==0) {
412 verbose=0;
413 } else if (strcmp(argv[0],"-r") == 0 && argc >= 1) {
414 argc--; argv++;
415 nrows = atoi(argv[0]);
416 } else if (strcmp(argv[0],"--rowsets") == 0 && argc >= 1) {
417 argc--; argv++;
418 nrowsets = atoi(argv[0]);
419 } else if (strcmp(argv[0],"-s") == 0) {
420 toku_ft_loader_set_size_factor(1);
421 } else if (strcmp(argv[0],"--asc") == 0) {
422 ascending_keys = 1;
423 } else if (strcmp(argv[0],"--dsc") == 0) {
424 descending_keys = 1;
425 } else if (strcmp(argv[0],"--random") == 0) {
426 random_keys = 1;
427 } else if (strcmp(argv[0], "--asc-poison") == 0) {
428 ascending_keys = 1;
429 ascending_keys_poison = 1;
430 } else if (argc!=1) {
431 return usage(progname);
432 exit(1);
433 }
434 else {
435 break;
436 }
437 argc--; argv++;
438 }
439
440 const char *testdir = TOKU_TEST_FILENAME;
441 char unlink_all[strlen(testdir)+20];
442 snprintf(unlink_all, strlen(testdir)+20, "rm -rf %s", testdir);
443 int r;
444 r = system(unlink_all); CKERR(r);
445 r = toku_os_mkdir(testdir, 0755); CKERR(r);
446
447 if (ascending_keys + descending_keys + random_keys == 0)
448 ascending_keys = 1;
449
450 // run test
451 test_extractor(nrows, nrowsets, testdir);
452
453 r = system(unlink_all); CKERR(r);
454
455 return 0;
456 }
457