1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
#include "test.h"
#include "cachetable-test.h"

#include <atomic>
41 
//
// This test ensures that get_and_pin with dependent nodes works
// as intended with checkpoints: multiple threads change the values
// of elements in data, and every checkpoint must capture a snapshot
// in which the sum of all the elements in data is 0.
//
48 
49 // The arrays
50 
51 #define NUM_ELEMENTS 100
52 #define NUM_MOVER_THREADS 4
53 
54 int64_t data[NUM_ELEMENTS];
55 int64_t checkpointed_data[NUM_ELEMENTS];
56 PAIR data_pair[NUM_ELEMENTS];
57 
58 uint32_t time_of_test;
59 bool run_test;
60 
61 static void
clone_callback(void * value_data,void ** cloned_value_data,long * clone_size,PAIR_ATTR * new_attr,bool UU (for_checkpoint),void * UU (write_extraargs))62 clone_callback(
63     void* value_data,
64     void** cloned_value_data,
65     long* clone_size,
66     PAIR_ATTR* new_attr,
67     bool UU(for_checkpoint),
68     void* UU(write_extraargs)
69     )
70 {
71     new_attr->is_valid = false;
72     int64_t* XMALLOC(data_val);
73     *data_val = *(int64_t *)value_data;
74     *cloned_value_data = data_val;
75     *clone_size = 8;
76 }
77 
78 
79 static void
flush(CACHEFILE f,int UU (fd),CACHEKEY k,void * v,void ** UU (dd),void * e,PAIR_ATTR s,PAIR_ATTR * new_size,bool write_me,bool keep_me,bool checkpoint_me,bool UU (is_clone))80 flush (CACHEFILE f __attribute__((__unused__)),
81        int UU(fd),
82        CACHEKEY k  __attribute__((__unused__)),
83        void *v     __attribute__((__unused__)),
84        void** UU(dd),
85        void *e     __attribute__((__unused__)),
86        PAIR_ATTR s      __attribute__((__unused__)),
87        PAIR_ATTR* new_size      __attribute__((__unused__)),
88        bool write_me,
89        bool keep_me,
90        bool checkpoint_me,
91         bool UU(is_clone)
92        ) {
93     /* Do nothing */
94     int64_t val_to_write = *(int64_t *)v;
95     size_t data_index = (size_t)k.b;
96     assert(val_to_write != INT64_MAX);
97     if (write_me) {
98         usleep(10);
99         data[data_index] = val_to_write;
100         if (checkpoint_me) checkpointed_data[data_index] = val_to_write;
101     }
102     if (!keep_me) {
103         toku_free(v);
104     }
105 }
106 
107 static int
fetch(CACHEFILE f,PAIR p,int UU (fd),CACHEKEY k,uint32_t fullhash,void ** value,void ** UU (dd),PAIR_ATTR * sizep,int * dirtyp,void * extraargs)108 fetch (CACHEFILE f        __attribute__((__unused__)),
109        PAIR p,
110        int UU(fd),
111        CACHEKEY k,
112        uint32_t fullhash __attribute__((__unused__)),
113        void **value,
114        void** UU(dd),
115        PAIR_ATTR *sizep,
116        int  *dirtyp,
117        void *extraargs    __attribute__((__unused__))
118        ) {
119     *dirtyp = 0;
120     size_t data_index = (size_t)k.b;
121     assert(data[data_index] != INT64_MAX);
122 
123     int64_t* XMALLOC(data_val);
124     usleep(10);
125     *data_val = data[data_index];
126     data_pair[data_index] = p;
127     *value = data_val;
128     *sizep = make_pair_attr(8);
129     return 0;
130 }
131 
test_time(void * arg)132 static void *test_time(void *arg) {
133     //
134     // if num_Seconds is set to 0, run indefinitely
135     //
136     if (time_of_test != 0) {
137         usleep(time_of_test*1000*1000);
138         if (verbose) printf("should now end test\n");
139         run_test = false;
140     }
141     if (verbose) printf("should be ending test now\n");
142     return arg;
143 }
144 
// Cachetable under test and the single cachefile opened on it. Both are
// created in cachetable_test(); f1 is used by the mover and reader threads
// and ct by the checkpoint thread.
CACHETABLE ct;
CACHEFILE f1;
147 
move_numbers(void * arg)148 static void *move_numbers(void *arg) {
149     while (run_test) {
150         int rand_key1 = 0;
151         int rand_key2 = 0;
152         int less;
153         int greater;
154         int r;
155         while (rand_key1 == rand_key2) {
156             rand_key1 = random() % NUM_ELEMENTS;
157             rand_key2 = random() % NUM_ELEMENTS;
158             less = (rand_key1 < rand_key2) ? rand_key1 : rand_key2;
159             greater = (rand_key1 > rand_key2) ? rand_key1 : rand_key2;
160         }
161         assert(less < greater);
162 
163         /*
164         while (rand_key1 == rand_key2) {
165             rand_key1 = random() % (NUM_ELEMENTS/2);
166             rand_key2 = (NUM_ELEMENTS-1) - rand_key1;
167             less = (rand_key1 < rand_key2) ? rand_key1 : rand_key2;
168             greater = (rand_key1 > rand_key2) ? rand_key1 : rand_key2;
169         }
170         assert(less < greater);
171         */
172 
173         void* v1;
174         CACHEKEY less_key;
175         less_key.b = less;
176         uint32_t less_fullhash = less;
177         enum cachetable_dirty less_dirty = CACHETABLE_DIRTY;
178         CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
179         wc.flush_callback = flush;
180         wc.clone_callback = clone_callback;
181         r = toku_cachetable_get_and_pin_with_dep_pairs(
182             f1,
183             less_key,
184             less,
185             &v1,
186             wc, fetch, def_pf_req_callback, def_pf_callback,
187             PL_WRITE_CHEAP,
188             NULL,
189             0, //num_dependent_pairs
190             NULL,
191             NULL
192             );
193         assert(r==0);
194         int64_t* first_val = (int64_t *)v1;
195 
196         CACHEKEY greater_key;
197         greater_key.b = greater;
198         uint32_t greater_fullhash = greater;
199         enum cachetable_dirty greater_dirty = CACHETABLE_DIRTY;
200         PAIR dep_pair = data_pair[less];
201         r = toku_cachetable_get_and_pin_with_dep_pairs(
202             f1,
203             make_blocknum(greater),
204             greater,
205             &v1,
206             wc, fetch, def_pf_req_callback, def_pf_callback,
207             PL_WRITE_CHEAP,
208             NULL,
209             1, //num_dependent_pairs
210             &dep_pair,
211             &less_dirty
212             );
213         assert(r==0);
214 
215         int64_t* second_val = (int64_t *)v1;
216         assert(second_val != first_val); // sanity check that we are messing with different vals
217         assert(*first_val != INT64_MAX);
218         assert(*second_val != INT64_MAX);
219         usleep(10);
220         (*first_val)++;
221         (*second_val)--;
222         r = toku_test_cachetable_unpin(f1, less_key, less_fullhash, less_dirty, make_pair_attr(8));
223 
224         int third = 0;
225         int num_possible_values = (NUM_ELEMENTS-1) - greater;
226         if (num_possible_values > 0) {
227             third = (random() % (num_possible_values)) + greater + 1;
228             CACHEKEY third_key;
229             third_key.b = third;
230             dep_pair = data_pair[greater];
231             uint32_t third_fullhash = third;
232             enum cachetable_dirty third_dirty = CACHETABLE_DIRTY;
233             r = toku_cachetable_get_and_pin_with_dep_pairs(
234                 f1,
235                 make_blocknum(third),
236                 third,
237                 &v1,
238                 wc, fetch, def_pf_req_callback, def_pf_callback,
239                 PL_WRITE_CHEAP,
240                 NULL,
241                 1, //num_dependent_pairs
242                 &dep_pair,
243                 &greater_dirty
244                 );
245             assert(r==0);
246 
247             int64_t* third_val = (int64_t *)v1;
248             assert(second_val != third_val); // sanity check that we are messing with different vals
249             usleep(10);
250             (*second_val)++;
251             (*third_val)--;
252             r = toku_test_cachetable_unpin(f1, third_key, third_fullhash, third_dirty, make_pair_attr(8));
253         }
254         r = toku_test_cachetable_unpin(f1, greater_key, greater_fullhash, greater_dirty, make_pair_attr(8));
255     }
256     return arg;
257 }
258 
read_random_numbers(void * arg)259 static void *read_random_numbers(void *arg) {
260     while(run_test) {
261         int rand_key1 = random() % NUM_ELEMENTS;
262         void* v1;
263         int r1;
264         CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
265         wc.flush_callback = flush;
266         wc.clone_callback = clone_callback;
267         r1 = toku_cachetable_get_and_pin_nonblocking(
268             f1,
269             make_blocknum(rand_key1),
270             rand_key1,
271             &v1,
272             wc, fetch, def_pf_req_callback, def_pf_callback,
273             PL_READ,
274             NULL,
275             NULL
276             );
277         if (r1 == 0) {
278             r1 = toku_test_cachetable_unpin(f1, make_blocknum(rand_key1), rand_key1, CACHETABLE_CLEAN, make_pair_attr(8));
279             assert(r1 == 0);
280         }
281     }
282     if (verbose) printf("leaving\n");
283     return arg;
284 }
285 
286 static int num_checkpoints = 0;
checkpoints(void * arg)287 static void *checkpoints(void *arg) {
288     // first verify that checkpointed_data is correct;
289     while(run_test) {
290         int64_t sum = 0;
291         for (int i = 0; i < NUM_ELEMENTS; i++) {
292             sum += checkpointed_data[i];
293         }
294         assert (sum==0);
295 
296         //
297         // now run a checkpoint
298         //
299         CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
300         toku_cachetable_begin_checkpoint(cp, NULL);
301         toku_cachetable_end_checkpoint(
302             cp,
303             NULL,
304             NULL,
305             NULL
306             );
307         assert (sum==0);
308         for (int i = 0; i < NUM_ELEMENTS; i++) {
309             sum += checkpointed_data[i];
310         }
311         assert (sum==0);
312         usleep(10*1024);
313         num_checkpoints++;
314     }
315     return arg;
316 }
317 
318 static void
test_begin_checkpoint(LSN UU (checkpoint_lsn),void * UU (header_v))319 test_begin_checkpoint (
320     LSN UU(checkpoint_lsn),
321     void* UU(header_v))
322 {
323     memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS);
324 }
325 
sum_vals(void)326 static void sum_vals(void) {
327     int64_t sum = 0;
328     for (int i = 0; i < NUM_ELEMENTS; i++) {
329         //printf("actual: i %d val %" PRId64 " \n", i, data[i]);
330         sum += data[i];
331     }
332     if (verbose) printf("actual sum %" PRId64 " \n", sum);
333     assert(sum == 0);
334     sum = 0;
335     for (int i = 0; i < NUM_ELEMENTS; i++) {
336         //printf("checkpointed: i %d val %" PRId64 " \n", i, checkpointed_data[i]);
337         sum += checkpointed_data[i];
338     }
339     if (verbose) printf("checkpointed sum %" PRId64 " \n", sum);
340     assert(sum == 0);
341 }
342 
343 static void
cachetable_test(void)344 cachetable_test (void) {
345     const int test_limit = NUM_ELEMENTS;
346 
347     //
348     // let's set up the data
349     //
350     for (int64_t i = 0; i < NUM_ELEMENTS; i++) {
351         data[i] = 0;
352         checkpointed_data[i] = 0;
353     }
354     time_of_test = 30;
355 
356     int r;
357 
358     toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
359     const char *fname1 = TOKU_TEST_FILENAME;
360     unlink(fname1);
361     r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
362 
363     toku_cachefile_set_userdata(
364         f1,
365         NULL,
366         &dummy_log_fassociate,
367         &dummy_close_usr,
368         &dummy_free_usr,
369         &dummy_chckpnt_usr,
370         &test_begin_checkpoint,
371         &dummy_end,
372         &dummy_note_pin,
373         &dummy_note_unpin
374         );
375 
376     toku_pthread_t time_tid;
377     toku_pthread_t checkpoint_tid;
378     toku_pthread_t move_tid[NUM_MOVER_THREADS];
379     toku_pthread_t read_random_tid[NUM_MOVER_THREADS];
380     run_test = true;
381 
382     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
383         r = toku_pthread_create(toku_uninstrumented,
384                                 &read_random_tid[i],
385                                 nullptr,
386                                 read_random_numbers,
387                                 nullptr);
388         assert_zero(r);
389     }
390     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
391         r = toku_pthread_create(toku_uninstrumented,
392                                 &move_tid[i],
393                                 nullptr,
394                                 move_numbers,
395                                 nullptr);
396         assert_zero(r);
397     }
398     r = toku_pthread_create(
399         toku_uninstrumented, &checkpoint_tid, nullptr, checkpoints, nullptr);
400     assert_zero(r);
401     r = toku_pthread_create(
402         toku_uninstrumented, &time_tid, nullptr, test_time, nullptr);
403     assert_zero(r);
404 
405     void *ret;
406     r = toku_pthread_join(time_tid, &ret);
407     assert_zero(r);
408     r = toku_pthread_join(checkpoint_tid, &ret);
409     assert_zero(r);
410     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
411         r = toku_pthread_join(move_tid[i], &ret);
412         assert_zero(r);
413     }
414     for (int i = 0; i < NUM_MOVER_THREADS; i++) {
415         r = toku_pthread_join(read_random_tid[i], &ret);
416         assert_zero(r);
417     }
418 
419     toku_cachetable_verify(ct);
420     toku_cachefile_close(&f1, false, ZERO_LSN);
421     toku_cachetable_close(&ct);
422 
423     sum_vals();
424     if (verbose) printf("num_checkpoints %d\n", num_checkpoints);
425 
426 }
427 
428 int
test_main(int argc,const char * argv[])429 test_main(int argc, const char *argv[]) {
430   default_parse_args(argc, argv);
431   cachetable_test();
432   return 0;
433 }
434