1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: 3 #ident "$Id$" 4 /*====== 5 This file is part of PerconaFT. 6 7 8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. 9 10 PerconaFT is free software: you can redistribute it and/or modify 11 it under the terms of the GNU General Public License, version 2, 12 as published by the Free Software Foundation. 13 14 PerconaFT is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 21 22 ---------------------------------------- 23 24 PerconaFT is free software: you can redistribute it and/or modify 25 it under the terms of the GNU Affero General Public License, version 3, 26 as published by the Free Software Foundation. 27 28 PerconaFT is distributed in the hope that it will be useful, 29 but WITHOUT ANY WARRANTY; without even the implied warranty of 30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 31 GNU Affero General Public License for more details. 32 33 You should have received a copy of the GNU Affero General Public License 34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 35 ======= */ 36 37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 38 39 // Make sure that the pending stuff gets checkpointed, but subsequent changes don't, even with concurrent updates. 40 #include "test.h" 41 #include <stdio.h> 42 #include <unistd.h> 43 #include "cachetable-test.h" 44 #include "cachetable/checkpoint.h" 45 #include <portability/toku_atomic.h> 46 47 static int N; // how many items in the table 48 static CACHEFILE cf; 49 static CACHETABLE ct; 50 int *values; 51 52 static const int item_size = sizeof(int); 53 54 static volatile int n_flush, n_write_me, n_keep_me, n_fetch; 55 56 static void 57 sleep_random (void) 58 { 59 toku_timespec_t req = {.tv_sec = 0, 60 .tv_nsec = random()%1000000}; //Max just under 1ms 61 nanosleep(&req, NULL); 62 } 63 64 int expect_value = 42; // initially 42, later 43 65 66 static void 67 flush ( 68 CACHEFILE UU(thiscf), 69 int UU(fd), 70 CACHEKEY UU(key), 71 void *value, 72 void** UU(dd), 73 void *UU(extraargs), 74 PAIR_ATTR size, 75 PAIR_ATTR* UU(new_size), 76 bool write_me, 77 bool keep_me, 78 bool UU(for_checkpoint), 79 bool UU(is_clone) 80 ) 81 { 82 // printf("f"); 83 assert(size.size== item_size); 84 int *CAST_FROM_VOIDP(v, value); 85 if (*v!=expect_value) printf("got %d expect %d\n", *v, expect_value); 86 assert(*v==expect_value); 87 (void)toku_sync_fetch_and_add(&n_flush, 1); 88 if (write_me) (void)toku_sync_fetch_and_add(&n_write_me, 1); 89 if (keep_me) (void)toku_sync_fetch_and_add(&n_keep_me, 1); 90 sleep_random(); 91 } 92 93 static void* 94 do_update (void *UU(ignore)) 95 { 96 while (n_flush==0); // wait until the first checkpoint ran 97 int i; 98 for (i=0; i<N; i++) { 99 CACHEKEY key = make_blocknum(i); 100 uint32_t hi = toku_cachetable_hash(cf, key); 101 void *vv; 102 CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL); 103 wc.flush_callback = flush; 104 int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, wc, fetch_die, def_pf_req_callback, def_pf_callback, true, 0); 105 //printf("g"); 106 assert(r==0); 107 PAIR_ATTR attr; 108 r = toku_cachetable_get_attr(cf, key, hi, &attr); 109 assert(r==0); 110 assert(attr.size==sizeof(int)); 111 int *CAST_FROM_VOIDP(v, vv); 112 assert(*v==42); 113 *v = 43; 114 //printf("[%d]43\n", i); 115 r = toku_test_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, make_pair_attr(item_size)); 116 sleep_random(); 117 } 118 return 0; 119 } 120 121 static void* 122 do_checkpoint (void *UU(v)) 123 { 124 CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct); 125 int r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); 126 assert(r == 0); 127 return 0; 128 } 129 130 // put n items into the cachetable, mark them dirty, and then concurently 131 // do a checkpoint (in which the callback functions are slow) 132 // replace the n items with new values 133 // make sure that the stuff that was checkpointed includes only the old versions 134 // then do a flush and make sure the new items are written 135 136 static void checkpoint_pending(void) { 137 if (verbose) { printf("%s:%d n=%d\n", __FUNCTION__, __LINE__, N); fflush(stdout); } 138 const int test_limit = N; 139 int r; 140 toku_cachetable_create(&ct, test_limit*sizeof(int), ZERO_LSN, nullptr); 141 const char *fname1 = TOKU_TEST_FILENAME; 142 r = unlink(fname1); if (r!=0) CKERR2(get_error_errno(), ENOENT); 143 r = toku_cachetable_openf(&cf, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); 144 create_dummy_functions(cf); 145 146 // Insert items into the cachetable. All dirty. 147 int i; 148 for (i=0; i<N; i++) { 149 CACHEKEY key = make_blocknum(i); 150 uint32_t hi = toku_cachetable_hash(cf, key); 151 values[i] = 42; 152 CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL); 153 wc.flush_callback = flush; 154 toku_cachetable_put(cf, key, hi, &values[i], make_pair_attr(sizeof(int)), wc, put_callback_nop); 155 assert(r == 0); 156 157 r = toku_test_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, make_pair_attr(item_size)); 158 assert(r == 0); 159 } 160 161 // the checkpoint should cause n writes, but since n <= the cachetable size, 162 // all items should be kept in the cachetable 163 n_flush = n_write_me = n_keep_me = n_fetch = 0; 164 expect_value = 42; 165 // printf("E42\n"); 166 toku_pthread_t checkpoint_thread, update_thread; 167 r = toku_pthread_create(toku_uninstrumented, 168 &checkpoint_thread, 169 nullptr, 170 do_checkpoint, 171 nullptr); 172 assert(r == 0); 173 r = toku_pthread_create( 174 toku_uninstrumented, &update_thread, nullptr, do_update, nullptr); 175 assert(r == 0); 176 r = toku_pthread_join(checkpoint_thread, 0); 177 assert(r == 0); 178 r = toku_pthread_join(update_thread, 0); 179 assert(r == 0); 180 181 assert(n_flush == N && n_write_me == N && n_keep_me == N); 182 183 // after the checkpoint, all of the items should be 43 184 //printf("E43\n"); 185 n_flush = n_write_me = n_keep_me = n_fetch = 0; expect_value = 43; 186 CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct); 187 r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); 188 assert(r == 0); 189 assert(n_flush == N && n_write_me == N && n_keep_me == N); 190 191 // a subsequent checkpoint should cause no flushes, or writes since all of the items are clean 192 n_flush = n_write_me = n_keep_me = n_fetch = 0; 193 194 r = toku_checkpoint(cp, NULL, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); 195 assert(r == 0); 196 assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0); 197 198 toku_cachefile_close(&cf, false, ZERO_LSN); 199 toku_cachetable_close(&ct); 200 } 201 202 int 203 test_main(int argc, const char *argv[]) { 204 { 205 struct timeval tv; 206 gettimeofday(&tv, 0); 207 srandom(tv.tv_sec * 1000000 + tv.tv_usec); 208 } 209 { 210 int i; 211 for (i=1; i<argc; i++) { 212 if (strcmp(argv[i], "-v") == 0) { 213 verbose++; 214 continue; 215 } 216 } 217 } 218 for (N=1; N<=128; N*=2) { 219 int myvalues[N]; 220 values = myvalues; 221 checkpoint_pending(); 222 //printf("\n"); 223 } 224 return 0; 225 } 226