1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 // Make sure that the pending stuff gets checkpointed, but subsequent changes don't, even with concurrent updates.
40 #include "test.h"
41 #include <stdio.h>
42 #include <unistd.h>
43 #include "cachetable-test.h"
44 #include "cachetable/checkpoint.h"
45 #include <portability/toku_atomic.h>
46
47 static int N; // how many items in the table
48 static CACHEFILE cf;
49 static CACHETABLE ct;
50 int *values;
51
52 static const int item_size = sizeof(int);
53
54 static volatile int n_flush, n_write_me, n_keep_me, n_fetch;
55
56 static void
sleep_random(void)57 sleep_random (void)
58 {
59 toku_timespec_t req = {.tv_sec = 0,
60 .tv_nsec = random()%1000000}; //Max just under 1ms
61 nanosleep(&req, NULL);
62 }
63
64 int expect_value = 42; // initially 42, later 43
65
66 static void
flush(CACHEFILE UU (thiscf),int UU (fd),CACHEKEY UU (key),void * value,void ** UU (dd),void * UU (extraargs),PAIR_ATTR size,PAIR_ATTR * UU (new_size),bool write_me,bool keep_me,bool UU (for_checkpoint),bool UU (is_clone))67 flush (
68 CACHEFILE UU(thiscf),
69 int UU(fd),
70 CACHEKEY UU(key),
71 void *value,
72 void** UU(dd),
73 void *UU(extraargs),
74 PAIR_ATTR size,
75 PAIR_ATTR* UU(new_size),
76 bool write_me,
77 bool keep_me,
78 bool UU(for_checkpoint),
79 bool UU(is_clone)
80 )
81 {
82 // printf("f");
83 assert(size.size== item_size);
84 int *CAST_FROM_VOIDP(v, value);
85 if (*v!=expect_value) printf("got %d expect %d\n", *v, expect_value);
86 assert(*v==expect_value);
87 (void)toku_sync_fetch_and_add(&n_flush, 1);
88 if (write_me) (void)toku_sync_fetch_and_add(&n_write_me, 1);
89 if (keep_me) (void)toku_sync_fetch_and_add(&n_keep_me, 1);
90 sleep_random();
91 }
92
93 static void*
do_update(void * UU (ignore))94 do_update (void *UU(ignore))
95 {
96 while (n_flush==0); // wait until the first checkpoint ran
97 int i;
98 for (i=0; i<N; i++) {
99 CACHEKEY key = make_blocknum(i);
100 uint32_t hi = toku_cachetable_hash(cf, key);
101 void *vv;
102 CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
103 wc.flush_callback = flush;
104 int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, wc, fetch_die, def_pf_req_callback, def_pf_callback, true, 0);
105 //printf("g");
106 assert(r==0);
107 PAIR_ATTR attr;
108 r = toku_cachetable_get_attr(cf, key, hi, &attr);
109 assert(r==0);
110 assert(attr.size==sizeof(int));
111 int *CAST_FROM_VOIDP(v, vv);
112 assert(*v==42);
113 *v = 43;
114 //printf("[%d]43\n", i);
115 r = toku_test_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, make_pair_attr(item_size));
116 sleep_random();
117 }
118 return 0;
119 }
120
121 static void*
do_checkpoint(void * UU (v))122 do_checkpoint (void *UU(v))
123 {
124 CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
125 int r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT);
126 assert(r == 0);
127 return 0;
128 }
129
130 // put n items into the cachetable, mark them dirty, and then concurently
131 // do a checkpoint (in which the callback functions are slow)
132 // replace the n items with new values
133 // make sure that the stuff that was checkpointed includes only the old versions
134 // then do a flush and make sure the new items are written
135
checkpoint_pending(void)136 static void checkpoint_pending(void) {
137 if (verbose) { printf("%s:%d n=%d\n", __FUNCTION__, __LINE__, N); fflush(stdout); }
138 const int test_limit = N;
139 int r;
140 toku_cachetable_create(&ct, test_limit*sizeof(int), ZERO_LSN, nullptr);
141 const char *fname1 = TOKU_TEST_FILENAME;
142 r = unlink(fname1); if (r!=0) CKERR2(get_error_errno(), ENOENT);
143 r = toku_cachetable_openf(&cf, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
144 create_dummy_functions(cf);
145
146 // Insert items into the cachetable. All dirty.
147 int i;
148 for (i=0; i<N; i++) {
149 CACHEKEY key = make_blocknum(i);
150 uint32_t hi = toku_cachetable_hash(cf, key);
151 values[i] = 42;
152 CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
153 wc.flush_callback = flush;
154 toku_cachetable_put(cf, key, hi, &values[i], make_pair_attr(sizeof(int)), wc, put_callback_nop);
155 assert(r == 0);
156
157 r = toku_test_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, make_pair_attr(item_size));
158 assert(r == 0);
159 }
160
161 // the checkpoint should cause n writes, but since n <= the cachetable size,
162 // all items should be kept in the cachetable
163 n_flush = n_write_me = n_keep_me = n_fetch = 0;
164 expect_value = 42;
165 // printf("E42\n");
166 toku_pthread_t checkpoint_thread, update_thread;
167 r = toku_pthread_create(toku_uninstrumented,
168 &checkpoint_thread,
169 nullptr,
170 do_checkpoint,
171 nullptr);
172 assert(r == 0);
173 r = toku_pthread_create(
174 toku_uninstrumented, &update_thread, nullptr, do_update, nullptr);
175 assert(r == 0);
176 r = toku_pthread_join(checkpoint_thread, 0);
177 assert(r == 0);
178 r = toku_pthread_join(update_thread, 0);
179 assert(r == 0);
180
181 assert(n_flush == N && n_write_me == N && n_keep_me == N);
182
183 // after the checkpoint, all of the items should be 43
184 //printf("E43\n");
185 n_flush = n_write_me = n_keep_me = n_fetch = 0; expect_value = 43;
186 CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
187 r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT);
188 assert(r == 0);
189 assert(n_flush == N && n_write_me == N && n_keep_me == N);
190
191 // a subsequent checkpoint should cause no flushes, or writes since all of the items are clean
192 n_flush = n_write_me = n_keep_me = n_fetch = 0;
193
194 r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT);
195 assert(r == 0);
196 assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);
197
198 toku_cachefile_close(&cf, false, ZERO_LSN);
199 toku_cachetable_close(&ct);
200 }
201
202 int
test_main(int argc,const char * argv[])203 test_main(int argc, const char *argv[]) {
204 {
205 struct timeval tv;
206 gettimeofday(&tv, 0);
207 srandom(tv.tv_sec * 1000000 + tv.tv_usec);
208 }
209 {
210 int i;
211 for (i=1; i<argc; i++) {
212 if (strcmp(argv[i], "-v") == 0) {
213 verbose++;
214 continue;
215 }
216 }
217 }
218 for (N=1; N<=128; N*=2) {
219 int myvalues[N];
220 values = myvalues;
221 checkpoint_pending();
222 //printf("\n");
223 }
224 return 0;
225 }
226