1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 #include <stdio.h>
42 #include <stdlib.h>
43 
44 #include <toku_pthread.h>
45 #include <unistd.h>
46 #include <memory.h>
47 #include <sys/stat.h>
48 #include <db.h>
49 
50 
//
// This test verifies that running evictions on a writer thread
// is OK. We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
//  - scan dictionary forward with bulk fetch
//  - scan dictionary forward slowly
//  - scan dictionary backward with bulk fetch
//  - scan dictionary backward slowly
//  - update existing values in the dictionary with db->put(DB_YESOVERWRITE)
//  - run random point queries against the dictionary with db->get
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success.
//
63 
// Flag polled by the worker loops (scan_db, ptquery_db, update_db): set to true by
// test_evictions() before the threads are created and set to false by test_time().
// NOTE(review): this is a plain bool with no synchronization visible in this file,
// yet it is written by one thread and read by others — confirm that is acceptable.
bool run_test;
// Duration, in seconds, that test_time() waits before clearing run_test.
// Defaults to 60 in test_main(); overridden by --num_seconds.
int time_of_test;
// Number of rows test_evictions() inserts, and the modulus the workers use to pick
// random keys. Defaults to 100000 in test_main(); overridden by --num_elements.
int num_elements;
67 
// Per-thread argument block handed to the worker thread entry points.
struct arg {
    int n;          // number of rows; ptquery_db/update_db pick keys as random() % n
    DB *db;         // dictionary the thread operates on
    DB_ENV* env;    // environment used to begin the thread's transactions
    bool fast;      // scan_db only: true selects the go_fast callback, false go_slow
    bool fwd;       // scan_db only: true walks with c_getf_next, false with c_getf_prev
};
75 
76 static int
go_fast(DBT const * a,DBT const * b,void * c)77 go_fast(DBT const *a, DBT  const *b, void *c) {
78     assert(a);
79     assert(b);
80     assert(c==NULL);
81     return TOKUDB_CURSOR_CONTINUE;
82 }
83 static int
go_slow(DBT const * a,DBT const * b,void * c)84 go_slow(DBT const *a, DBT  const *b, void *c) {
85     assert(a);
86     assert(b);
87     assert(c==NULL);
88     return 0;
89 }
90 
scan_db(void * arg)91 static void *scan_db(void *arg) {
92     struct arg *myarg = (struct arg *) arg;
93     DB_ENV* env = myarg->env;
94     DB* db = myarg->db;
95     DB_TXN* txn = NULL;
96     while(run_test) {
97         int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
98         DBC* cursor = NULL;
99         { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
100         while (r != DB_NOTFOUND) {
101             if (myarg->fwd) {
102                 r = cursor->c_getf_next(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
103             }
104             else {
105                 r = cursor->c_getf_prev(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
106             }
107             assert(r==0 || r==DB_NOTFOUND);
108         }
109 
110         { int chk_r = cursor->c_close(cursor); CKERR(chk_r); }
111         { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
112     }
113     return arg;
114 }
115 
ptquery_db(void * arg)116 static void *ptquery_db(void *arg) {
117     struct arg *myarg = (struct arg *) arg;
118     DB_ENV* env = myarg->env;
119     DB* db = myarg->db;
120     DB_TXN* txn = NULL;
121     int n = myarg->n;
122     while(run_test) {
123         int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
124         int rand_key = random() % n;
125         DBT key;
126         DBT val;
127         memset(&val, 0, sizeof(val));
128         dbt_init(&key, &rand_key, sizeof(rand_key));
129         r = db->get(db, txn, &key, &val, 0);
130         assert(r != DB_NOTFOUND);
131         { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
132     }
133     return arg;
134 }
135 
update_db(void * arg)136 static void *update_db(void *arg) {
137     struct arg *myarg = (struct arg *) arg;
138     DB_ENV* env = myarg->env;
139     DB* db = myarg->db;
140     int n = myarg->n;
141 
142     DB_TXN* txn = NULL;
143     while (run_test) {
144         int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
145         for (uint32_t i = 0; i < 1000; i++) {
146             int rand_key = random() % n;
147             int rand_val = random();
148             DBT key, val;
149             r = db->put(
150                 db,
151                 txn,
152                 dbt_init(&key, &rand_key, sizeof(rand_key)),
153                 dbt_init(&val, &rand_val, sizeof(rand_val)),
154                 0
155                 );
156             CKERR(r);
157         }
158         { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
159     }
160     return arg;
161 }
162 
test_time(void * arg)163 static void *test_time(void *arg) {
164     assert(arg == NULL);
165     usleep(time_of_test*1000*1000);
166     if (verbose) printf("should now end test\n");
167     run_test = false;
168     return arg;
169 }
170 
171 
172 static void
test_evictions(void)173 test_evictions (void) {
174     int n = num_elements;
175     if (verbose) printf("test_3645:%d \n", n);
176 
177     DB_TXN * const null_txn = 0;
178     const char * const fname = "test.bulk_fetch.ft_handle";
179     int r;
180 
181     toku_os_recursive_delete(TOKU_TEST_FILENAME);
182     r=toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
183 
184     /* create the dup database file */
185     DB_ENV *env;
186     r = db_env_create(&env, 0); assert(r == 0);
187     r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
188     // set the cache size to 10MB
189     r = env->set_cachesize(env, 0, 100000, 1); CKERR(r);
190     r=env->open(env, TOKU_TEST_FILENAME, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
191     r = env->checkpointing_set_period(env, 10);
192     CKERR(r);
193 
194 
195 
196     DB *db;
197     r = db_create(&db, env, 0);
198     assert(r == 0);
199     r = db->set_flags(db, 0);
200     assert(r == 0);
201     r = db->set_pagesize(db, 4096);
202     assert(r == 0);
203     r = db->set_readpagesize(db, 1024);
204     assert(r == 0);
205     r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
206     assert(r == 0);
207 
208     int keys[n];
209     for (int i=0; i<n; i++) {
210         keys[i] = i;
211     }
212 
213     if (verbose) printf("starting insertion of elements to setup test\n");
214     for (int i=0; i<n; i++) {
215         DBT key, val;
216         r = db->put(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0);
217         assert(r == 0);
218     }
219 
220     //
221     // the threads that we want:
222     //   - one thread constantly updating random values
223     //   - one thread doing table scan with bulk fetch
224     //   - one thread doing table scan without bulk fetch
225     //   - one thread doing random point queries
226     //
227     run_test = true;
228     if (verbose) printf("starting creation of pthreads\n");
229     toku_pthread_t mytids[7];
230     struct arg myargs[7];
231     for (uint32_t i = 0; i < sizeof(myargs)/sizeof(myargs[0]); i++) {
232         myargs[i].n = n;
233         myargs[i].db = db;
234         myargs[i].env = env;
235         myargs[i].fast = true;
236         myargs[i].fwd = true;
237     }
238 
239     // make the forward fast scanner
240     myargs[0].fast = true;
241     myargs[0].fwd = true;
242     {
243         int chk_r = toku_pthread_create(
244             toku_uninstrumented, &mytids[0], nullptr, scan_db, &myargs[0]);
245         CKERR(chk_r);
246     }
247 
248     // make the forward slow scanner
249     myargs[1].fast = false;
250     myargs[1].fwd = true;
251     {
252         int chk_r = toku_pthread_create(
253             toku_uninstrumented, &mytids[1], nullptr, scan_db, &myargs[1]);
254         CKERR(chk_r);
255     }
256 
257     // make the backward fast scanner
258     myargs[2].fast = true;
259     myargs[2].fwd = false;
260     {
261         int chk_r = toku_pthread_create(
262             toku_uninstrumented, &mytids[2], nullptr, scan_db, &myargs[2]);
263         CKERR(chk_r);
264     }
265 
266     // make the backward slow scanner
267     myargs[3].fast = false;
268     myargs[3].fwd = false;
269     {
270         int chk_r = toku_pthread_create(
271             toku_uninstrumented, &mytids[3], nullptr, scan_db, &myargs[3]);
272         CKERR(chk_r);
273     }
274 
275     // make the guy that updates the db
276     {
277         int chk_r = toku_pthread_create(
278             toku_uninstrumented, &mytids[4], nullptr, update_db, &myargs[4]);
279         CKERR(chk_r);
280     }
281 
282     // make the guy that does point queries
283     {
284         int chk_r = toku_pthread_create(
285             toku_uninstrumented, &mytids[5], nullptr, ptquery_db, &myargs[5]);
286         CKERR(chk_r);
287     }
288 
289     // make the guy that sleeps
290     {
291         int chk_r = toku_pthread_create(
292             toku_uninstrumented, &mytids[6], nullptr, test_time, nullptr);
293         CKERR(chk_r);
294     }
295 
296     for (uint32_t i = 0; i < sizeof(myargs) / sizeof(myargs[0]); i++) {
297         void *ret;
298         r = toku_pthread_join(mytids[i], &ret); assert_zero(r);
299     }
300     if (verbose) printf("ending test, pthreads have joined\n");
301 
302 
303     r = db->close(db, 0); CKERR(r);
304     r = env->close(env, 0); CKERR(r);
305 }
306 
parse_3645_args(int argc,char * const argv[])307 static inline void parse_3645_args (int argc, char *const argv[]) {
308     const char *argv0=argv[0];
309     while (argc>1) {
310         int resultcode=0;
311         if (strcmp(argv[1], "-v")==0) {
312             verbose++;
313         }
314         else if (strcmp(argv[1], "-q")==0) {
315             verbose=0;
316         }
317         else if (strcmp(argv[1], "-h")==0) {
318         do_usage:
319             fprintf(stderr, "Usage:\n%s [-v|-h-q|--num_elements number | --num_seconds number]\n", argv0);
320             exit(resultcode);
321         }
322         else if (strcmp(argv[1], "--num_elements") == 0) {
323             argc--;
324             argv++;
325             num_elements = atoi(argv[1]);
326         }
327         else if (strcmp(argv[1], "--num_seconds") == 0) {
328             argc--;
329             argv++;
330             time_of_test = atoi(argv[1]);
331         }
332         else {
333             resultcode=1;
334             goto do_usage;
335         }
336         argc--;
337         argv++;
338     }
339 }
340 
341 
342 int
test_main(int argc,char * const argv[])343 test_main(int argc, char *const argv[]) {
344     // default values
345     num_elements = 100000;
346     time_of_test = 60;
347     parse_3645_args(argc, argv);
348     test_evictions();
349     return 0;
350 }
351