1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 #include <stdio.h>
42 #include <stdlib.h>
43 
44 #include <toku_pthread.h>
45 #include <unistd.h>
46 #include <memory.h>
47 #include <sys/stat.h>
48 #include <db.h>
49 
50 #include "threaded_stress_test_helpers.h"
51 
52 //
53 // This test is a form of stress that does operations on a single dictionary:
54 // We create a dictionary bigger than the cachetable (around 4x greater).
55 // Then, we spawn a bunch of pthreads that do the following:
56 //  - scan dictionary forward with bulk fetch
57 //  - scan dictionary forward slowly
58 //  - scan dictionary backward with bulk fetch
59 //  - scan dictionary backward slowly
60 //  - Grow the dictionary with insertions
61 //  - do random point queries into the dictionary
62 // With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
63 // If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
64 // work correctly by verifying that table scans sum their vals to 0.
65 //
66 // This does NOT test:
67 //  - splits and merges
68 //  - multiple DBs
69 //
70 // Variables that are interesting to tweak and run:
71 //  - small cachetable
72 //  - number of elements
73 //
74 
// Number of whole seconds after start_time beyond which checkpoint_callback2()
// crashes the process on purpose. Assigned in stress_table().
uint64_t time_til_crash;
// Timestamp taken in stress_table() right before the workers are started,
// in microseconds as returned by get_tnow().
uint64_t start_time;
77 
// Returns the current wall-clock time from gettimeofday(), expressed in
// microseconds. Asserts that gettimeofday() succeeded.
static uint64_t get_tnow(void) {
    struct timeval now;
    const int rc = gettimeofday(&now, NULL);
    assert(rc == 0);

    const uint64_t usec_per_sec = 1000000ULL;
    const uint64_t whole_seconds_usec = (uint64_t) now.tv_sec * usec_per_sec;
    return whole_seconds_usec + (uint64_t) now.tv_usec;
}
83 
checkpoint_callback2(void * UU (extra))84 static void checkpoint_callback2(void* UU(extra)) {
85     uint64_t curr_time = get_tnow();
86     uint64_t time_diff = curr_time - start_time;
87     if ((time_diff/1000000ULL) > time_til_crash) {
88         toku_hard_crash_on_purpose();
89     }
90 }
91 
manual_checkpoint(DB_TXN * UU (txn),ARG UU (arg),void * operation_extra,void * UU (stats_extra))92 static int manual_checkpoint(DB_TXN *UU(txn), ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
93     DB_ENV* CAST_FROM_VOIDP(env, operation_extra);
94     int r = env->txn_checkpoint(env,0,0,0);
95     assert_zero(r);
96     return 0;
97 }
98 
99 static void
stress_table(DB_ENV * env,DB ** dbp,struct cli_args * cli_args)100 stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
101     //
102     // the threads that we want:
103     //   - one thread constantly updating random values
104     //   - one thread doing table scan with bulk fetch
105     //   - one thread doing table scan without bulk fetch
106     //   - one thread doing random point queries
107     //
108 
109     if (verbose) printf("starting creation of pthreads\n");
110     const int num_threads = 5 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
111     struct arg myargs[num_threads];
112     for (int i = 0; i < num_threads; i++) {
113         arg_init(&myargs[i], dbp, env, cli_args);
114     }
115     struct scan_op_extra soe[4];
116 
117     // make the forward fast scanner
118     soe[0].fast = true;
119     soe[0].fwd = true;
120     soe[0].prefetch = false;
121     myargs[0].operation_extra = &soe[0];
122     myargs[0].operation = scan_op;
123 
124     // make the forward slow scanner
125     soe[1].fast = false;
126     soe[1].fwd = true;
127     soe[1].prefetch = false;
128     myargs[1].operation_extra = &soe[1];
129     myargs[1].operation = scan_op;
130 
131     // make the backward fast scanner
132     soe[2].fast = true;
133     soe[2].fwd = false;
134     soe[2].prefetch = false;
135     myargs[2].operation_extra = &soe[2];
136     myargs[2].operation = scan_op;
137 
138     // make the backward slow scanner
139     soe[3].fast = false;
140     soe[3].fwd = false;
141     soe[3].prefetch = false;
142     myargs[3].operation_extra = &soe[3];
143     myargs[3].operation = scan_op;
144 
145     // make something for checkpoints
146     myargs[4].operation = manual_checkpoint;
147     myargs[4].sleep_ms = 30*1000; // do checkpoints every 30 seconds
148     myargs[4].operation_extra = env;
149 
150     struct update_op_args uoe = get_update_op_args(cli_args, NULL);
151     // make the guy that updates the db
152     for (int i = 5; i < 5 + cli_args->num_update_threads; ++i) {
153         myargs[i].operation_extra = &uoe;
154         myargs[i].operation = update_op;
155     }
156 
157     // make the guy that does point queries
158     for (int i = 5 + cli_args->num_update_threads; i < num_threads; i++) {
159         myargs[i].operation = ptquery_op;
160     }
161 
162     db_env_set_checkpoint_callback2(checkpoint_callback2, NULL);
163     time_til_crash = random() % cli_args->num_seconds;
164     start_time = get_tnow();
165     run_workers(myargs, num_threads, INT32_MAX, true, cli_args);
166 }
167 
168 int
test_main(int argc,char * const argv[])169 test_main(int argc, char *const argv[]) {
170     struct cli_args args = get_default_args();
171     args.env_args.checkpointing_period = 0;
172     parse_stress_test_args(argc, argv, &args);
173     if (args.do_test_and_crash) {
174         stress_test_main(&args);
175     }
176     if (args.do_recover) {
177         stress_recover(&args);
178     }
179     return 0;
180 }
181