1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "cachetable-test.h"
41 
42 //
43 // This test ensures that get_and_pin with dependent nodes works
44 // as intended with checkpoints, by having multiple threads changing
45 // values on elements in data, and ensure that checkpoints always get snapshots
46 // such that the sum of all the elements in data are 0.
47 //
48 
49 // The arrays
50 
51 // must be power of 2 minus 1
52 #define NUM_ELEMENTS 127
53 // must be (NUM_ELEMENTS +1)/2 - 1
54 #define NUM_INTERNAL 63
55 #define NUM_MOVER_THREADS 4
56 
57 int64_t data[NUM_ELEMENTS];
58 int64_t checkpointed_data[NUM_ELEMENTS];
59 PAIR data_pair[NUM_ELEMENTS];
60 
61 uint32_t time_of_test;
62 bool run_test;
63 
64 static void
put_callback_pair(CACHEKEY key,void * UU (v),PAIR p)65 put_callback_pair(
66     CACHEKEY key,
67     void *UU(v),
68     PAIR p)
69 {
70     int64_t data_index = key.b;
71     data_pair[data_index] = p;
72 }
73 
74 static void
clone_callback(void * value_data,void ** cloned_value_data,long * clone_size,PAIR_ATTR * new_attr,bool UU (for_checkpoint),void * UU (write_extraargs))75 clone_callback(
76     void* value_data,
77     void** cloned_value_data,
78     long* clone_size,
79     PAIR_ATTR* new_attr,
80     bool UU(for_checkpoint),
81     void* UU(write_extraargs)
82     )
83 {
84     new_attr->is_valid = false;
85     int64_t* XMALLOC(data_val);
86     *data_val = *(int64_t *)value_data;
87     *cloned_value_data = data_val;
88     *new_attr = make_pair_attr(8);
89     *clone_size = 8;
90 }
91 
92 static void
flush(CACHEFILE f,int UU (fd),CACHEKEY k,void * v,void ** UU (dd),void * e,PAIR_ATTR s,PAIR_ATTR * new_size,bool write_me,bool keep_me,bool checkpoint_me,bool UU (is_clone))93 flush (CACHEFILE f __attribute__((__unused__)),
94        int UU(fd),
95        CACHEKEY k  __attribute__((__unused__)),
96        void *v     __attribute__((__unused__)),
97        void** UU(dd),
98        void *e     __attribute__((__unused__)),
99        PAIR_ATTR s      __attribute__((__unused__)),
100        PAIR_ATTR* new_size,
101        bool write_me,
102        bool keep_me,
103        bool checkpoint_me,
104         bool UU(is_clone)
105        ) {
106     int64_t val_to_write = *(int64_t *)v;
107     size_t data_index = (size_t)k.b;
108     if (write_me) {
109         usleep(10);
110         *new_size = make_pair_attr(8);
111         data[data_index] = val_to_write;
112         if (checkpoint_me) checkpointed_data[data_index] = val_to_write;
113     }
114     if (!keep_me) {
115         toku_free(v);
116     }
117 }
118 
119 static int
fetch(CACHEFILE f,PAIR p,int UU (fd),CACHEKEY k,uint32_t fullhash,void ** value,void ** UU (dd),PAIR_ATTR * sizep,int * dirtyp,void * extraargs)120 fetch (CACHEFILE f        __attribute__((__unused__)),
121        PAIR p,
122        int UU(fd),
123        CACHEKEY k,
124        uint32_t fullhash __attribute__((__unused__)),
125        void **value,
126        void** UU(dd),
127        PAIR_ATTR *sizep,
128        int  *dirtyp,
129        void *extraargs    __attribute__((__unused__))
130        ) {
131     *dirtyp = 0;
132     size_t data_index = (size_t)k.b;
133     // assert that data_index is valid
134     // if it is INT64_MAX, then that means
135     // the block is not supposed to be in the cachetable
136     assert(data[data_index] != INT64_MAX);
137 
138     int64_t* XMALLOC(data_val);
139     usleep(10);
140     *data_val = data[data_index];
141     data_pair[data_index] = p;
142     *value = data_val;
143     *sizep = make_pair_attr(8);
144     return 0;
145 }
146 
test_time(void * arg)147 static void *test_time(void *arg) {
148     //
149     // if num_Seconds is set to 0, run indefinitely
150     //
151     if (time_of_test != 0) {
152         usleep(time_of_test*1000*1000);
153         if (verbose) printf("should now end test\n");
154         run_test = false;
155     }
156     if (verbose) printf("should be ending test now\n");
157     return arg;
158 }
159 
160 CACHETABLE ct;
161 CACHEFILE f1;
162 
move_number_to_child(int parent,int64_t * parent_val,enum cachetable_dirty parent_dirty)163 static void move_number_to_child(
164     int parent,
165     int64_t* parent_val,
166     enum cachetable_dirty parent_dirty
167     )
168 {
169     int child = 0;
170     int r;
171     child = ((random() % 2) == 0) ? (2*parent + 1) : (2*parent + 2);
172 
173     void* v1;
174     CACHEKEY parent_key;
175     parent_key.b = parent;
176     uint32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
177 
178 
179     CACHEKEY child_key;
180     child_key.b = child;
181     uint32_t child_fullhash = toku_cachetable_hash(f1, child_key);
182     CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
183     wc.flush_callback = flush;
184     wc.clone_callback = clone_callback;
185     PAIR dep_pair = data_pair[parent];
186     r = toku_cachetable_get_and_pin_with_dep_pairs(
187         f1,
188         child_key,
189         child_fullhash,
190         &v1,
191         wc, fetch, def_pf_req_callback, def_pf_callback,
192         PL_WRITE_CHEAP,
193         NULL,
194         1, //num_dependent_pairs
195         &dep_pair,
196         &parent_dirty
197         );
198     assert(r==0);
199 
200     int64_t* child_val = (int64_t *)v1;
201     assert(child_val != parent_val); // sanity check that we are messing with different vals
202     assert(*parent_val != INT64_MAX);
203     assert(*child_val != INT64_MAX);
204     usleep(10);
205     (*parent_val)++;
206     (*child_val)--;
207     r = toku_test_cachetable_unpin(f1, parent_key, parent_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
208     assert_zero(r);
209     if (child < NUM_INTERNAL) {
210         move_number_to_child(child, child_val, CACHETABLE_DIRTY);
211     }
212     else {
213         r = toku_test_cachetable_unpin(f1, child_key, child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
214         assert_zero(r);
215     }
216 }
217 
move_numbers(void * arg)218 static void *move_numbers(void *arg) {
219     while (run_test) {
220         int parent = 0;
221         int r;
222         void* v1;
223         CACHEKEY parent_key;
224         parent_key.b = parent;
225         uint32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
226         CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
227         wc.flush_callback = flush;
228         wc.clone_callback = clone_callback;
229         r = toku_cachetable_get_and_pin_with_dep_pairs(
230             f1,
231             parent_key,
232             parent_fullhash,
233             &v1,
234             wc, fetch, def_pf_req_callback, def_pf_callback,
235             PL_WRITE_CHEAP,
236             NULL,
237             0, //num_dependent_pairs
238             NULL,
239             NULL
240             );
241         assert(r==0);
242         int64_t* parent_val = (int64_t *)v1;
243         move_number_to_child(parent, parent_val, CACHETABLE_CLEAN);
244     }
245     return arg;
246 }
247 
remove_data(CACHEKEY * cachekey,bool for_checkpoint,void * UU (extra))248 static void remove_data(CACHEKEY* cachekey, bool for_checkpoint, void* UU(extra)) {
249     assert(cachekey->b < NUM_ELEMENTS);
250     data[cachekey->b] = INT64_MAX;
251     if (for_checkpoint) {
252         checkpointed_data[cachekey->b] = INT64_MAX;
253     }
254 }
255 
256 
get_data(CACHEKEY * cachekey,uint32_t * fullhash,void * extra)257 static void get_data(CACHEKEY* cachekey, uint32_t* fullhash, void* extra) {
258     int* CAST_FROM_VOIDP(key, extra);
259     cachekey->b = *key;
260     *fullhash = toku_cachetable_hash(f1, *cachekey);
261     data[*key] = INT64_MAX - 1;
262 }
263 
merge_and_split_child(int parent,int64_t * parent_val,enum cachetable_dirty parent_dirty)264 static void merge_and_split_child(
265     int parent,
266     int64_t* parent_val,
267     enum cachetable_dirty parent_dirty
268     )
269 {
270     int child = 0;
271     int other_child = 0;
272     int r;
273     bool even = (random() % 2) == 0;
274     child = (even) ? (2*parent + 1) : (2*parent + 2);
275     other_child = (!even) ? (2*parent + 1) : (2*parent + 2);
276     assert(child != other_child);
277 
278     void* v1;
279 
280     CACHEKEY parent_key;
281     parent_key.b = parent;
282     uint32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
283 
284     CACHEKEY child_key;
285     child_key.b = child;
286     uint32_t child_fullhash = toku_cachetable_hash(f1, child_key);
287     enum cachetable_dirty child_dirty = CACHETABLE_CLEAN;
288     CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
289     wc.flush_callback = flush;
290     wc.clone_callback = clone_callback;
291     PAIR dep_pair = data_pair[parent];
292     r = toku_cachetable_get_and_pin_with_dep_pairs(
293         f1,
294         child_key,
295         child_fullhash,
296         &v1,
297         wc, fetch, def_pf_req_callback, def_pf_callback,
298         PL_WRITE_CHEAP,
299         NULL,
300         1, //num_dependent_pairs
301         &dep_pair,
302         &parent_dirty
303         );
304     assert(r==0);
305     int64_t* child_val = (int64_t *)v1;
306 
307     CACHEKEY other_child_key;
308     other_child_key.b = other_child;
309     uint32_t other_child_fullhash = toku_cachetable_hash(f1, other_child_key);
310     enum cachetable_dirty dirties[2];
311     dirties[0] = parent_dirty;
312     dirties[1] = child_dirty;
313     PAIR dep_pairs[2];
314     dep_pairs[0] = data_pair[parent];
315     dep_pairs[1] = data_pair[child];
316 
317     r = toku_cachetable_get_and_pin_with_dep_pairs(
318         f1,
319         other_child_key,
320         other_child_fullhash,
321         &v1,
322         wc, fetch, def_pf_req_callback, def_pf_callback,
323         PL_WRITE_CHEAP,
324         NULL,
325         2, //num_dependent_pairs
326         dep_pairs,
327         dirties
328         );
329     assert(r==0);
330     int64_t* other_child_val = (int64_t *)v1;
331     assert(*parent_val != INT64_MAX);
332     assert(*child_val != INT64_MAX);
333     assert(*other_child_val != INT64_MAX);
334 
335     // lets get rid of other_child_val with a merge
336     *child_val += *other_child_val;
337     *other_child_val = INT64_MAX;
338     toku_test_cachetable_unpin_and_remove(f1, other_child_key, remove_data, NULL);
339     dirties[1] = CACHETABLE_DIRTY;
340     child_dirty = CACHETABLE_DIRTY;
341 
342     // now do a split
343     CACHEKEY new_key;
344     uint32_t new_fullhash;
345     int64_t* XMALLOC(data_val);
346     toku_cachetable_put_with_dep_pairs(
347           f1,
348           get_data,
349           data_val,
350           make_pair_attr(8),
351           wc,
352           &other_child,
353           2, // number of dependent pairs that we may need to checkpoint
354           dep_pairs,
355           dirties,
356           &new_key,
357           &new_fullhash,
358           put_callback_pair
359           );
360     assert(new_key.b == other_child);
361     assert(new_fullhash == other_child_fullhash);
362     *data_val = 5000;
363     *child_val -= 5000;
364 
365     r = toku_test_cachetable_unpin(f1, parent_key, parent_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
366     assert_zero(r);
367     r = toku_test_cachetable_unpin(f1, other_child_key, other_child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
368     assert_zero(r);
369     if (child < NUM_INTERNAL) {
370         merge_and_split_child(child, child_val, CACHETABLE_DIRTY);
371     }
372     else {
373         r = toku_test_cachetable_unpin(f1, child_key, child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
374         assert_zero(r);
375     }
376 }
377 
merge_and_split(void * arg)378 static void *merge_and_split(void *arg) {
379     while (run_test) {
380         int parent = 0;
381         int r;
382         void* v1;
383         CACHEKEY parent_key;
384         parent_key.b = parent;
385         uint32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
386         CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
387         wc.flush_callback = flush;
388         wc.clone_callback = clone_callback;
389         r = toku_cachetable_get_and_pin_with_dep_pairs(
390             f1,
391             parent_key,
392             parent_fullhash,
393             &v1,
394             wc, fetch, def_pf_req_callback, def_pf_callback,
395             PL_WRITE_CHEAP,
396             NULL,
397             0, //num_dependent_pairs
398             NULL,
399             NULL
400             );
401         assert(r==0);
402         int64_t* parent_val = (int64_t *)v1;
403         merge_and_split_child(parent, parent_val, CACHETABLE_CLEAN);
404     }
405     return arg;
406 }
407 
408 static int num_checkpoints = 0;
checkpoints(void * arg)409 static void *checkpoints(void *arg) {
410     // first verify that checkpointed_data is correct;
411     while(run_test) {
412         int64_t sum = 0;
413         for (int i = 0; i < NUM_ELEMENTS; i++) {
414             if (checkpointed_data[i] != INT64_MAX) {
415                 sum += checkpointed_data[i];
416             }
417         }
418         assert (sum==0);
419 
420         //
421         // now run a checkpoint
422         //
423         CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
424         toku_cachetable_begin_checkpoint(cp, NULL);
425         toku_cachetable_end_checkpoint(
426             cp,
427             NULL,
428             NULL,
429             NULL
430             );
431         assert (sum==0);
432         for (int i = 0; i < NUM_ELEMENTS; i++) {
433             if (checkpointed_data[i] != INT64_MAX) {
434                 sum += checkpointed_data[i];
435             }
436         }
437         assert (sum==0);
438         num_checkpoints++;
439     }
440     return arg;
441 }
442 
443 static void
test_begin_checkpoint(LSN UU (checkpoint_lsn),void * UU (header_v))444 test_begin_checkpoint (
445     LSN UU(checkpoint_lsn),
446     void* UU(header_v))
447 {
448     memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS);
449 }
450 
sum_vals(void)451 static void sum_vals(void) {
452     int64_t sum = 0;
453     for (int i = 0; i < NUM_ELEMENTS; i++) {
454         //printf("actual: i %d val %" PRId64 " \n", i, data[i]);
455         if (data[i] != INT64_MAX) {
456             sum += data[i];
457         }
458     }
459     if (verbose) printf("actual sum %" PRId64 " \n", sum);
460     assert(sum == 0);
461     sum = 0;
462     for (int i = 0; i < NUM_ELEMENTS; i++) {
463         //printf("checkpointed: i %d val %" PRId64 " \n", i, checkpointed_data[i]);
464         if (checkpointed_data[i] != INT64_MAX) {
465             sum += checkpointed_data[i];
466         }
467     }
468     if (verbose) printf("checkpointed sum %" PRId64 " \n", sum);
469     assert(sum == 0);
470 }
471 
472 static void
cachetable_test(void)473 cachetable_test (void) {
474     const int test_limit = NUM_ELEMENTS;
475 
476     //
477     // let's set up the data
478     //
479     for (int64_t i = 0; i < NUM_ELEMENTS; i++) {
480         data[i] = 0;
481         checkpointed_data[i] = 0;
482     }
483     time_of_test = 60;
484 
485     int r;
486 
487     toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
488     const char *fname1 = TOKU_TEST_FILENAME;
489     unlink(fname1);
490     r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
491 
492     toku_cachefile_set_userdata(
493         f1,
494         NULL,
495         &dummy_log_fassociate,
496         &dummy_close_usr,
497         &dummy_free_usr,
498         &dummy_chckpnt_usr,
499         test_begin_checkpoint, // called in begin_checkpoint
500         &dummy_end,
501         &dummy_note_pin,
502         &dummy_note_unpin
503         );
504 
505     toku_pthread_t time_tid;
506     toku_pthread_t checkpoint_tid;
507     toku_pthread_t move_tid[NUM_MOVER_THREADS];
508     toku_pthread_t merge_and_split_tid[NUM_MOVER_THREADS];
509     run_test = true;
510 
511     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
512         r = toku_pthread_create(toku_uninstrumented,
513                                 &move_tid[i],
514                                 nullptr,
515                                 move_numbers,
516                                 nullptr);
517         assert_zero(r);
518     }
519     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
520         r = toku_pthread_create(toku_uninstrumented,
521                                 &merge_and_split_tid[i],
522                                 nullptr,
523                                 merge_and_split,
524                                 nullptr);
525         assert_zero(r);
526     }
527     r = toku_pthread_create(
528         toku_uninstrumented, &checkpoint_tid, nullptr, checkpoints, nullptr);
529     assert_zero(r);
530     r = toku_pthread_create(
531         toku_uninstrumented, &time_tid, nullptr, test_time, nullptr);
532     assert_zero(r);
533 
534     void *ret;
535     r = toku_pthread_join(time_tid, &ret);
536     assert_zero(r);
537     r = toku_pthread_join(checkpoint_tid, &ret);
538     assert_zero(r);
539     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
540         r = toku_pthread_join(merge_and_split_tid[i], &ret);
541         assert_zero(r);
542     }
543     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
544         r = toku_pthread_join(move_tid[i], &ret);
545         assert_zero(r);
546     }
547 
548     toku_cachetable_verify(ct);
549     toku_cachefile_close(&f1, false, ZERO_LSN);
550     toku_cachetable_close(&ct);
551 
552     sum_vals();
553     if (verbose) printf("num_checkpoints %d\n", num_checkpoints);
554 
555 }
556 
557 int
test_main(int argc,const char * argv[])558 test_main(int argc, const char *argv[]) {
559   default_parse_args(argc, argv);
560   cachetable_test();
561   return 0;
562 }
563