1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40 #include "cachetable-test.h"
41
//
// This test ensures that get_and_pin with dependent nodes works
// as intended with checkpoints. Multiple threads change the values
// of elements in data, and the test ensures that every checkpoint gets a
// snapshot in which the sum of all the elements in data is 0.
//
48
49 // The arrays
50
51 #define NUM_ELEMENTS 100
52 #define NUM_MOVER_THREADS 4
53
54 int64_t data[NUM_ELEMENTS];
55 int64_t checkpointed_data[NUM_ELEMENTS];
56 PAIR data_pair[NUM_ELEMENTS];
57
58 uint32_t time_of_test;
59 bool run_test;
60
61 static void
clone_callback(void * value_data,void ** cloned_value_data,long * clone_size,PAIR_ATTR * new_attr,bool UU (for_checkpoint),void * UU (write_extraargs))62 clone_callback(
63 void* value_data,
64 void** cloned_value_data,
65 long* clone_size,
66 PAIR_ATTR* new_attr,
67 bool UU(for_checkpoint),
68 void* UU(write_extraargs)
69 )
70 {
71 new_attr->is_valid = false;
72 int64_t* XMALLOC(data_val);
73 *data_val = *(int64_t *)value_data;
74 *cloned_value_data = data_val;
75 *clone_size = 8;
76 }
77
78
79 static void
flush(CACHEFILE f,int UU (fd),CACHEKEY k,void * v,void ** UU (dd),void * e,PAIR_ATTR s,PAIR_ATTR * new_size,bool write_me,bool keep_me,bool checkpoint_me,bool UU (is_clone))80 flush (CACHEFILE f __attribute__((__unused__)),
81 int UU(fd),
82 CACHEKEY k __attribute__((__unused__)),
83 void *v __attribute__((__unused__)),
84 void** UU(dd),
85 void *e __attribute__((__unused__)),
86 PAIR_ATTR s __attribute__((__unused__)),
87 PAIR_ATTR* new_size __attribute__((__unused__)),
88 bool write_me,
89 bool keep_me,
90 bool checkpoint_me,
91 bool UU(is_clone)
92 ) {
93 /* Do nothing */
94 int64_t val_to_write = *(int64_t *)v;
95 size_t data_index = (size_t)k.b;
96 assert(val_to_write != INT64_MAX);
97 if (write_me) {
98 usleep(10);
99 data[data_index] = val_to_write;
100 if (checkpoint_me) checkpointed_data[data_index] = val_to_write;
101 }
102 if (!keep_me) {
103 toku_free(v);
104 }
105 }
106
107 static int
fetch(CACHEFILE f,PAIR p,int UU (fd),CACHEKEY k,uint32_t fullhash,void ** value,void ** UU (dd),PAIR_ATTR * sizep,int * dirtyp,void * extraargs)108 fetch (CACHEFILE f __attribute__((__unused__)),
109 PAIR p,
110 int UU(fd),
111 CACHEKEY k,
112 uint32_t fullhash __attribute__((__unused__)),
113 void **value,
114 void** UU(dd),
115 PAIR_ATTR *sizep,
116 int *dirtyp,
117 void *extraargs __attribute__((__unused__))
118 ) {
119 *dirtyp = 0;
120 size_t data_index = (size_t)k.b;
121 assert(data[data_index] != INT64_MAX);
122
123 int64_t* XMALLOC(data_val);
124 usleep(10);
125 *data_val = data[data_index];
126 data_pair[data_index] = p;
127 *value = data_val;
128 *sizep = make_pair_attr(8);
129 return 0;
130 }
131
test_time(void * arg)132 static void *test_time(void *arg) {
133 //
134 // if num_Seconds is set to 0, run indefinitely
135 //
136 if (time_of_test != 0) {
137 usleep(time_of_test*1000*1000);
138 if (verbose) printf("should now end test\n");
139 run_test = false;
140 }
141 if (verbose) printf("should be ending test now\n");
142 return arg;
143 }
144
145 CACHETABLE ct;
146 CACHEFILE f1;
147
move_numbers(void * arg)148 static void *move_numbers(void *arg) {
149 while (run_test) {
150 int rand_key1 = 0;
151 int rand_key2 = 0;
152 int less;
153 int greater;
154 int r;
155 while (rand_key1 == rand_key2) {
156 rand_key1 = random() % NUM_ELEMENTS;
157 rand_key2 = random() % NUM_ELEMENTS;
158 less = (rand_key1 < rand_key2) ? rand_key1 : rand_key2;
159 greater = (rand_key1 > rand_key2) ? rand_key1 : rand_key2;
160 }
161 assert(less < greater);
162
163 /*
164 while (rand_key1 == rand_key2) {
165 rand_key1 = random() % (NUM_ELEMENTS/2);
166 rand_key2 = (NUM_ELEMENTS-1) - rand_key1;
167 less = (rand_key1 < rand_key2) ? rand_key1 : rand_key2;
168 greater = (rand_key1 > rand_key2) ? rand_key1 : rand_key2;
169 }
170 assert(less < greater);
171 */
172
173 void* v1;
174 CACHEKEY less_key;
175 less_key.b = less;
176 uint32_t less_fullhash = less;
177 enum cachetable_dirty less_dirty = CACHETABLE_DIRTY;
178 CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
179 wc.flush_callback = flush;
180 wc.clone_callback = clone_callback;
181 r = toku_cachetable_get_and_pin_with_dep_pairs(
182 f1,
183 less_key,
184 less,
185 &v1,
186 wc, fetch, def_pf_req_callback, def_pf_callback,
187 PL_WRITE_CHEAP,
188 NULL,
189 0, //num_dependent_pairs
190 NULL,
191 NULL
192 );
193 assert(r==0);
194 int64_t* first_val = (int64_t *)v1;
195
196 CACHEKEY greater_key;
197 greater_key.b = greater;
198 uint32_t greater_fullhash = greater;
199 enum cachetable_dirty greater_dirty = CACHETABLE_DIRTY;
200 PAIR dep_pair = data_pair[less];
201 r = toku_cachetable_get_and_pin_with_dep_pairs(
202 f1,
203 make_blocknum(greater),
204 greater,
205 &v1,
206 wc, fetch, def_pf_req_callback, def_pf_callback,
207 PL_WRITE_CHEAP,
208 NULL,
209 1, //num_dependent_pairs
210 &dep_pair,
211 &less_dirty
212 );
213 assert(r==0);
214
215 int64_t* second_val = (int64_t *)v1;
216 assert(second_val != first_val); // sanity check that we are messing with different vals
217 assert(*first_val != INT64_MAX);
218 assert(*second_val != INT64_MAX);
219 usleep(10);
220 (*first_val)++;
221 (*second_val)--;
222 r = toku_test_cachetable_unpin(f1, less_key, less_fullhash, less_dirty, make_pair_attr(8));
223
224 int third = 0;
225 int num_possible_values = (NUM_ELEMENTS-1) - greater;
226 if (num_possible_values > 0) {
227 third = (random() % (num_possible_values)) + greater + 1;
228 CACHEKEY third_key;
229 third_key.b = third;
230 dep_pair = data_pair[greater];
231 uint32_t third_fullhash = third;
232 enum cachetable_dirty third_dirty = CACHETABLE_DIRTY;
233 r = toku_cachetable_get_and_pin_with_dep_pairs(
234 f1,
235 make_blocknum(third),
236 third,
237 &v1,
238 wc, fetch, def_pf_req_callback, def_pf_callback,
239 PL_WRITE_CHEAP,
240 NULL,
241 1, //num_dependent_pairs
242 &dep_pair,
243 &greater_dirty
244 );
245 assert(r==0);
246
247 int64_t* third_val = (int64_t *)v1;
248 assert(second_val != third_val); // sanity check that we are messing with different vals
249 usleep(10);
250 (*second_val)++;
251 (*third_val)--;
252 r = toku_test_cachetable_unpin(f1, third_key, third_fullhash, third_dirty, make_pair_attr(8));
253 }
254 r = toku_test_cachetable_unpin(f1, greater_key, greater_fullhash, greater_dirty, make_pair_attr(8));
255 }
256 return arg;
257 }
258
read_random_numbers(void * arg)259 static void *read_random_numbers(void *arg) {
260 while(run_test) {
261 int rand_key1 = random() % NUM_ELEMENTS;
262 void* v1;
263 int r1;
264 CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
265 wc.flush_callback = flush;
266 wc.clone_callback = clone_callback;
267 r1 = toku_cachetable_get_and_pin_nonblocking(
268 f1,
269 make_blocknum(rand_key1),
270 rand_key1,
271 &v1,
272 wc, fetch, def_pf_req_callback, def_pf_callback,
273 PL_READ,
274 NULL,
275 NULL
276 );
277 if (r1 == 0) {
278 r1 = toku_test_cachetable_unpin(f1, make_blocknum(rand_key1), rand_key1, CACHETABLE_CLEAN, make_pair_attr(8));
279 assert(r1 == 0);
280 }
281 }
282 if (verbose) printf("leaving\n");
283 return arg;
284 }
285
286 static int num_checkpoints = 0;
checkpoints(void * arg)287 static void *checkpoints(void *arg) {
288 // first verify that checkpointed_data is correct;
289 while(run_test) {
290 int64_t sum = 0;
291 for (int i = 0; i < NUM_ELEMENTS; i++) {
292 sum += checkpointed_data[i];
293 }
294 assert (sum==0);
295
296 //
297 // now run a checkpoint
298 //
299 CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
300 toku_cachetable_begin_checkpoint(cp, NULL);
301 toku_cachetable_end_checkpoint(
302 cp,
303 NULL,
304 NULL,
305 NULL
306 );
307 assert (sum==0);
308 for (int i = 0; i < NUM_ELEMENTS; i++) {
309 sum += checkpointed_data[i];
310 }
311 assert (sum==0);
312 usleep(10*1024);
313 num_checkpoints++;
314 }
315 return arg;
316 }
317
318 static void
test_begin_checkpoint(LSN UU (checkpoint_lsn),void * UU (header_v))319 test_begin_checkpoint (
320 LSN UU(checkpoint_lsn),
321 void* UU(header_v))
322 {
323 memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS);
324 }
325
sum_vals(void)326 static void sum_vals(void) {
327 int64_t sum = 0;
328 for (int i = 0; i < NUM_ELEMENTS; i++) {
329 //printf("actual: i %d val %" PRId64 " \n", i, data[i]);
330 sum += data[i];
331 }
332 if (verbose) printf("actual sum %" PRId64 " \n", sum);
333 assert(sum == 0);
334 sum = 0;
335 for (int i = 0; i < NUM_ELEMENTS; i++) {
336 //printf("checkpointed: i %d val %" PRId64 " \n", i, checkpointed_data[i]);
337 sum += checkpointed_data[i];
338 }
339 if (verbose) printf("checkpointed sum %" PRId64 " \n", sum);
340 assert(sum == 0);
341 }
342
343 static void
cachetable_test(void)344 cachetable_test (void) {
345 const int test_limit = NUM_ELEMENTS;
346
347 //
348 // let's set up the data
349 //
350 for (int64_t i = 0; i < NUM_ELEMENTS; i++) {
351 data[i] = 0;
352 checkpointed_data[i] = 0;
353 }
354 time_of_test = 30;
355
356 int r;
357
358 toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
359 const char *fname1 = TOKU_TEST_FILENAME;
360 unlink(fname1);
361 r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
362
363 toku_cachefile_set_userdata(
364 f1,
365 NULL,
366 &dummy_log_fassociate,
367 &dummy_close_usr,
368 &dummy_free_usr,
369 &dummy_chckpnt_usr,
370 &test_begin_checkpoint,
371 &dummy_end,
372 &dummy_note_pin,
373 &dummy_note_unpin
374 );
375
376 toku_pthread_t time_tid;
377 toku_pthread_t checkpoint_tid;
378 toku_pthread_t move_tid[NUM_MOVER_THREADS];
379 toku_pthread_t read_random_tid[NUM_MOVER_THREADS];
380 run_test = true;
381
382 for (int i = 0; i < NUM_MOVER_THREADS; i++) {
383 r = toku_pthread_create(toku_uninstrumented,
384 &read_random_tid[i],
385 nullptr,
386 read_random_numbers,
387 nullptr);
388 assert_zero(r);
389 }
390 for (int i = 0; i < NUM_MOVER_THREADS; i++) {
391 r = toku_pthread_create(toku_uninstrumented,
392 &move_tid[i],
393 nullptr,
394 move_numbers,
395 nullptr);
396 assert_zero(r);
397 }
398 r = toku_pthread_create(
399 toku_uninstrumented, &checkpoint_tid, nullptr, checkpoints, nullptr);
400 assert_zero(r);
401 r = toku_pthread_create(
402 toku_uninstrumented, &time_tid, nullptr, test_time, nullptr);
403 assert_zero(r);
404
405 void *ret;
406 r = toku_pthread_join(time_tid, &ret);
407 assert_zero(r);
408 r = toku_pthread_join(checkpoint_tid, &ret);
409 assert_zero(r);
410 for (int i = 0; i < NUM_MOVER_THREADS; i++) {
411 r = toku_pthread_join(move_tid[i], &ret);
412 assert_zero(r);
413 }
414 for (int i = 0; i < NUM_MOVER_THREADS; i++) {
415 r = toku_pthread_join(read_random_tid[i], &ret);
416 assert_zero(r);
417 }
418
419 toku_cachetable_verify(ct);
420 toku_cachefile_close(&f1, false, ZERO_LSN);
421 toku_cachetable_close(&ct);
422
423 sum_vals();
424 if (verbose) printf("num_checkpoints %d\n", num_checkpoints);
425
426 }
427
428 int
test_main(int argc,const char * argv[])429 test_main(int argc, const char *argv[]) {
430 default_parse_args(argc, argv);
431 cachetable_test();
432 return 0;
433 }
434