1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
#include "test.h"

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>

#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>

#include <atomic>
49
50
//
// This test verifies that running evictions on a writer thread
// is ok. We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
//  - scan dictionary forward with bulk fetch
//  - scan dictionary forward slowly
//  - scan dictionary backward with bulk fetch
//  - scan dictionary backward slowly
//  - update existing values in the dictionary with db->put (flags = 0)
//  - run random point queries with db->get
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success.
//
63
// Set to true before the worker threads are started and cleared by the timer
// thread (test_time); every worker polls it at the top of its loop. It is
// atomic because it is written and read by different threads without a lock.
std::atomic<bool> run_test(false);
// How long the worker threads are allowed to run, in seconds (--num_seconds).
int time_of_test;
// Number of rows inserted into the dictionary during setup (--num_elements).
int num_elements;
67
68 struct arg {
69 int n;
70 DB *db;
71 DB_ENV* env;
72 bool fast;
73 bool fwd;
74 };
75
76 static int
go_fast(DBT const * a,DBT const * b,void * c)77 go_fast(DBT const *a, DBT const *b, void *c) {
78 assert(a);
79 assert(b);
80 assert(c==NULL);
81 return TOKUDB_CURSOR_CONTINUE;
82 }
83 static int
go_slow(DBT const * a,DBT const * b,void * c)84 go_slow(DBT const *a, DBT const *b, void *c) {
85 assert(a);
86 assert(b);
87 assert(c==NULL);
88 return 0;
89 }
90
scan_db(void * arg)91 static void *scan_db(void *arg) {
92 struct arg *myarg = (struct arg *) arg;
93 DB_ENV* env = myarg->env;
94 DB* db = myarg->db;
95 DB_TXN* txn = NULL;
96 while(run_test) {
97 int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
98 DBC* cursor = NULL;
99 { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
100 while (r != DB_NOTFOUND) {
101 if (myarg->fwd) {
102 r = cursor->c_getf_next(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
103 }
104 else {
105 r = cursor->c_getf_prev(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
106 }
107 assert(r==0 || r==DB_NOTFOUND);
108 }
109
110 { int chk_r = cursor->c_close(cursor); CKERR(chk_r); }
111 { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
112 }
113 return arg;
114 }
115
ptquery_db(void * arg)116 static void *ptquery_db(void *arg) {
117 struct arg *myarg = (struct arg *) arg;
118 DB_ENV* env = myarg->env;
119 DB* db = myarg->db;
120 DB_TXN* txn = NULL;
121 int n = myarg->n;
122 while(run_test) {
123 int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
124 int rand_key = random() % n;
125 DBT key;
126 DBT val;
127 memset(&val, 0, sizeof(val));
128 dbt_init(&key, &rand_key, sizeof(rand_key));
129 r = db->get(db, txn, &key, &val, 0);
130 assert(r != DB_NOTFOUND);
131 { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
132 }
133 return arg;
134 }
135
update_db(void * arg)136 static void *update_db(void *arg) {
137 struct arg *myarg = (struct arg *) arg;
138 DB_ENV* env = myarg->env;
139 DB* db = myarg->db;
140 int n = myarg->n;
141
142 DB_TXN* txn = NULL;
143 while (run_test) {
144 int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
145 for (uint32_t i = 0; i < 1000; i++) {
146 int rand_key = random() % n;
147 int rand_val = random();
148 DBT key, val;
149 r = db->put(
150 db,
151 txn,
152 dbt_init(&key, &rand_key, sizeof(rand_key)),
153 dbt_init(&val, &rand_val, sizeof(rand_val)),
154 0
155 );
156 CKERR(r);
157 }
158 { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
159 }
160 return arg;
161 }
162
test_time(void * arg)163 static void *test_time(void *arg) {
164 assert(arg == NULL);
165 usleep(time_of_test*1000*1000);
166 if (verbose) printf("should now end test\n");
167 run_test = false;
168 return arg;
169 }
170
171
172 static void
test_evictions(void)173 test_evictions (void) {
174 int n = num_elements;
175 if (verbose) printf("test_3645:%d \n", n);
176
177 DB_TXN * const null_txn = 0;
178 const char * const fname = "test.bulk_fetch.ft_handle";
179 int r;
180
181 toku_os_recursive_delete(TOKU_TEST_FILENAME);
182 r=toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
183
184 /* create the dup database file */
185 DB_ENV *env;
186 r = db_env_create(&env, 0); assert(r == 0);
187 r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
188 // set the cache size to 10MB
189 r = env->set_cachesize(env, 0, 100000, 1); CKERR(r);
190 r=env->open(env, TOKU_TEST_FILENAME, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
191 r = env->checkpointing_set_period(env, 10);
192 CKERR(r);
193
194
195
196 DB *db;
197 r = db_create(&db, env, 0);
198 assert(r == 0);
199 r = db->set_flags(db, 0);
200 assert(r == 0);
201 r = db->set_pagesize(db, 4096);
202 assert(r == 0);
203 r = db->set_readpagesize(db, 1024);
204 assert(r == 0);
205 r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
206 assert(r == 0);
207
208 int keys[n];
209 for (int i=0; i<n; i++) {
210 keys[i] = i;
211 }
212
213 if (verbose) printf("starting insertion of elements to setup test\n");
214 for (int i=0; i<n; i++) {
215 DBT key, val;
216 r = db->put(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0);
217 assert(r == 0);
218 }
219
220 //
221 // the threads that we want:
222 // - one thread constantly updating random values
223 // - one thread doing table scan with bulk fetch
224 // - one thread doing table scan without bulk fetch
225 // - one thread doing random point queries
226 //
227 run_test = true;
228 if (verbose) printf("starting creation of pthreads\n");
229 toku_pthread_t mytids[7];
230 struct arg myargs[7];
231 for (uint32_t i = 0; i < sizeof(myargs)/sizeof(myargs[0]); i++) {
232 myargs[i].n = n;
233 myargs[i].db = db;
234 myargs[i].env = env;
235 myargs[i].fast = true;
236 myargs[i].fwd = true;
237 }
238
239 // make the forward fast scanner
240 myargs[0].fast = true;
241 myargs[0].fwd = true;
242 {
243 int chk_r = toku_pthread_create(
244 toku_uninstrumented, &mytids[0], nullptr, scan_db, &myargs[0]);
245 CKERR(chk_r);
246 }
247
248 // make the forward slow scanner
249 myargs[1].fast = false;
250 myargs[1].fwd = true;
251 {
252 int chk_r = toku_pthread_create(
253 toku_uninstrumented, &mytids[1], nullptr, scan_db, &myargs[1]);
254 CKERR(chk_r);
255 }
256
257 // make the backward fast scanner
258 myargs[2].fast = true;
259 myargs[2].fwd = false;
260 {
261 int chk_r = toku_pthread_create(
262 toku_uninstrumented, &mytids[2], nullptr, scan_db, &myargs[2]);
263 CKERR(chk_r);
264 }
265
266 // make the backward slow scanner
267 myargs[3].fast = false;
268 myargs[3].fwd = false;
269 {
270 int chk_r = toku_pthread_create(
271 toku_uninstrumented, &mytids[3], nullptr, scan_db, &myargs[3]);
272 CKERR(chk_r);
273 }
274
275 // make the guy that updates the db
276 {
277 int chk_r = toku_pthread_create(
278 toku_uninstrumented, &mytids[4], nullptr, update_db, &myargs[4]);
279 CKERR(chk_r);
280 }
281
282 // make the guy that does point queries
283 {
284 int chk_r = toku_pthread_create(
285 toku_uninstrumented, &mytids[5], nullptr, ptquery_db, &myargs[5]);
286 CKERR(chk_r);
287 }
288
289 // make the guy that sleeps
290 {
291 int chk_r = toku_pthread_create(
292 toku_uninstrumented, &mytids[6], nullptr, test_time, nullptr);
293 CKERR(chk_r);
294 }
295
296 for (uint32_t i = 0; i < sizeof(myargs) / sizeof(myargs[0]); i++) {
297 void *ret;
298 r = toku_pthread_join(mytids[i], &ret); assert_zero(r);
299 }
300 if (verbose) printf("ending test, pthreads have joined\n");
301
302
303 r = db->close(db, 0); CKERR(r);
304 r = env->close(env, 0); CKERR(r);
305 }
306
parse_3645_args(int argc,char * const argv[])307 static inline void parse_3645_args (int argc, char *const argv[]) {
308 const char *argv0=argv[0];
309 while (argc>1) {
310 int resultcode=0;
311 if (strcmp(argv[1], "-v")==0) {
312 verbose++;
313 }
314 else if (strcmp(argv[1], "-q")==0) {
315 verbose=0;
316 }
317 else if (strcmp(argv[1], "-h")==0) {
318 do_usage:
319 fprintf(stderr, "Usage:\n%s [-v|-h-q|--num_elements number | --num_seconds number]\n", argv0);
320 exit(resultcode);
321 }
322 else if (strcmp(argv[1], "--num_elements") == 0) {
323 argc--;
324 argv++;
325 num_elements = atoi(argv[1]);
326 }
327 else if (strcmp(argv[1], "--num_seconds") == 0) {
328 argc--;
329 argv++;
330 time_of_test = atoi(argv[1]);
331 }
332 else {
333 resultcode=1;
334 goto do_usage;
335 }
336 argc--;
337 argv++;
338 }
339 }
340
341
342 int
test_main(int argc,char * const argv[])343 test_main(int argc, char *const argv[]) {
344 // default values
345 num_elements = 100000;
346 time_of_test = 60;
347 parse_3645_args(argc, argv);
348 test_evictions();
349 return 0;
350 }
351