1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 // Make sure that the pending stuff gets checkpointed, but subsequent changes don't, even with concurrent updates.
40 #include "test.h"
41 #include <stdio.h>
42 #include <unistd.h>
43 #include "cachetable-test.h"
44 #include "cachetable/checkpoint.h"
45 #include <portability/toku_atomic.h>
46 
47 static int N; // how many items in the table
48 static CACHEFILE cf;
49 static CACHETABLE ct;
50 int    *values;
51 
52 static const int item_size = sizeof(int);
53 
54 static volatile int n_flush, n_write_me, n_keep_me, n_fetch;
55 
56 static void
57 sleep_random (void)
58 {
59     toku_timespec_t req = {.tv_sec  = 0,
60 			   .tv_nsec = random()%1000000}; //Max just under 1ms
61     nanosleep(&req, NULL);
62 }
63 
64 int expect_value = 42; // initially 42, later 43
65 
66 static void
67 flush (
68     CACHEFILE UU(thiscf),
69     int UU(fd),
70     CACHEKEY UU(key),
71     void *value,
72     void** UU(dd),
73     void *UU(extraargs),
74     PAIR_ATTR size,
75     PAIR_ATTR* UU(new_size),
76     bool write_me,
77     bool keep_me,
78     bool UU(for_checkpoint),
79         bool UU(is_clone)
80     )
81 {
82     // printf("f");
83     assert(size.size== item_size);
84     int *CAST_FROM_VOIDP(v, value);
85     if (*v!=expect_value) printf("got %d expect %d\n", *v, expect_value);
86     assert(*v==expect_value);
87     (void)toku_sync_fetch_and_add(&n_flush, 1);
88     if (write_me) (void)toku_sync_fetch_and_add(&n_write_me, 1);
89     if (keep_me)  (void)toku_sync_fetch_and_add(&n_keep_me, 1);
90     sleep_random();
91 }
92 
93 static void*
94 do_update (void *UU(ignore))
95 {
96     while (n_flush==0); // wait until the first checkpoint ran
97     int i;
98     for (i=0; i<N; i++) {
99 	CACHEKEY key = make_blocknum(i);
100         uint32_t hi = toku_cachetable_hash(cf, key);
101         void *vv;
102         CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
103         wc.flush_callback = flush;
104         int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, wc, fetch_die, def_pf_req_callback, def_pf_callback, true, 0);
105 	//printf("g");
106 	assert(r==0);
107         PAIR_ATTR attr;
108         r = toku_cachetable_get_attr(cf, key, hi, &attr);
109         assert(r==0);
110 	assert(attr.size==sizeof(int));
111 	int *CAST_FROM_VOIDP(v, vv);
112 	assert(*v==42);
113 	*v = 43;
114 	//printf("[%d]43\n", i);
115 	r = toku_test_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, make_pair_attr(item_size));
116 	sleep_random();
117     }
118     return 0;
119 }
120 
121 static void*
122 do_checkpoint (void *UU(v))
123 {
124     CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
125     int r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT);
126     assert(r == 0);
127     return 0;
128 }
129 
130 // put n items into the cachetable, mark them dirty, and then concurently
131 //   do a checkpoint (in which the callback functions are slow)
132 //   replace the n items with new values
133 // make sure that the stuff that was checkpointed includes only the old versions
134 // then do a flush and make sure the new items are written
135 
136 static void checkpoint_pending(void) {
137     if (verbose) { printf("%s:%d n=%d\n", __FUNCTION__, __LINE__, N); fflush(stdout); }
138     const int test_limit = N;
139     int r;
140     toku_cachetable_create(&ct, test_limit*sizeof(int), ZERO_LSN, nullptr);
141     const char *fname1 = TOKU_TEST_FILENAME;
142     r = unlink(fname1); if (r!=0) CKERR2(get_error_errno(), ENOENT);
143     r = toku_cachetable_openf(&cf, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
144     create_dummy_functions(cf);
145 
146     // Insert items into the cachetable. All dirty.
147     int i;
148     for (i=0; i<N; i++) {
149         CACHEKEY key = make_blocknum(i);
150         uint32_t hi = toku_cachetable_hash(cf, key);
151 	values[i] = 42;
152         CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
153         wc.flush_callback = flush;
154         toku_cachetable_put(cf, key, hi, &values[i], make_pair_attr(sizeof(int)), wc, put_callback_nop);
155         assert(r == 0);
156 
157         r = toku_test_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, make_pair_attr(item_size));
158         assert(r == 0);
159     }
160 
161     // the checkpoint should cause n writes, but since n <= the cachetable size,
162     // all items should be kept in the cachetable
163     n_flush = n_write_me = n_keep_me = n_fetch = 0;
164     expect_value = 42;
165     // printf("E42\n");
166     toku_pthread_t checkpoint_thread, update_thread;
167     r = toku_pthread_create(toku_uninstrumented,
168                             &checkpoint_thread,
169                             nullptr,
170                             do_checkpoint,
171                             nullptr);
172     assert(r == 0);
173     r = toku_pthread_create(
174         toku_uninstrumented, &update_thread, nullptr, do_update, nullptr);
175     assert(r == 0);
176     r = toku_pthread_join(checkpoint_thread, 0);
177     assert(r == 0);
178     r = toku_pthread_join(update_thread, 0);
179     assert(r == 0);
180 
181     assert(n_flush == N && n_write_me == N && n_keep_me == N);
182 
183     // after the checkpoint, all of the items should be 43
184     //printf("E43\n");
185     n_flush = n_write_me = n_keep_me = n_fetch = 0; expect_value = 43;
186     CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
187     r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT);
188     assert(r == 0);
189     assert(n_flush == N && n_write_me == N && n_keep_me == N);
190 
191     // a subsequent checkpoint should cause no flushes, or writes since all of the items are clean
192     n_flush = n_write_me = n_keep_me = n_fetch = 0;
193 
194     r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT);
195     assert(r == 0);
196     assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);
197 
198     toku_cachefile_close(&cf, false, ZERO_LSN);
199     toku_cachetable_close(&ct);
200 }
201 
202 int
203 test_main(int argc, const char *argv[]) {
204     {
205 	struct timeval tv;
206 	gettimeofday(&tv, 0);
207 	srandom(tv.tv_sec * 1000000 + tv.tv_usec);
208     }
209     {
210 	int i;
211 	for (i=1; i<argc; i++) {
212 	    if (strcmp(argv[i], "-v") == 0) {
213 		verbose++;
214 		continue;
215 	    }
216 	}
217     }
218     for (N=1; N<=128; N*=2) {
219 	int myvalues[N];
220 	values = myvalues;
221         checkpoint_pending();
222 	//printf("\n");
223     }
224     return 0;
225 }
226