1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 #include <stdio.h>
42 #include <stdlib.h>
43 
44 #include <toku_pthread.h>
45 #include <unistd.h>
46 #include <memory.h>
47 #include <sys/stat.h>
48 #include <db.h>
49 
50 #include "threaded_stress_test_helpers.h"
51 
52 //
53 // This test is a form of stress that does operations on a single dictionary:
54 // We create a dictionary bigger than the cachetable (around 4x greater).
55 // Then, we spawn a bunch of pthreads that do the following:
56 //  - scan dictionary forward with bulk fetch
57 //  - scan dictionary forward slowly
58 //  - scan dictionary backward with bulk fetch
59 //  - scan dictionary backward slowly
60 //  - update existing values in the dictionary with db->put(DB_YESOVERWRITE)
//  - do random point queries into the dictionary
//  - close, remove, and recreate a randomly chosen dictionary (one thread, run with STRESS_LOCK_EXCL)
62 // With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
63 // If the test runs to completion without crashing, we consider it a success.
64 //
65 // This test differs from stress1 in that it grows the database through
66 // update operations.
67 //
68 
remove_and_recreate_me(DB_TXN * UU (txn),ARG arg,void * UU (operation_extra),void * UU (stats_extra))69 static int remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
70     int r;
71     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
72     DB* db = arg->dbp[db_index];
73     r = (db)->close(db, 0); CKERR(r);
74 
75     char name[30];
76     ZERO_ARRAY(name);
77     get_ith_table_name(name, sizeof(name), db_index);
78 
79     r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
80     CKERR(r);
81 
82     r = db_create(&(arg->dbp[db_index]), arg->env, 0);
83     assert(r == 0);
84     // TODO: Need to call before_db_open_hook() and after_db_open_hook()
85     r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
86     assert(r == 0);
87     return 0;
88 }
89 
90 static void
stress_table(DB_ENV * env,DB ** dbp,struct cli_args * cli_args)91 stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
92     //
93     // the threads that we want:
94     //   - one thread constantly updating random values
95     //   - one thread doing table scan with bulk fetch
96     //   - one thread doing table scan without bulk fetch
97     //   - one thread doing random point queries
98     //
99 
100     if (verbose) printf("starting creation of pthreads\n");
101     const int num_threads = 5 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
102     struct arg myargs[num_threads];
103     for (int i = 0; i < num_threads; i++) {
104         arg_init(&myargs[i], dbp, env, cli_args);
105     }
106 
107     struct scan_op_extra soe[4];
108 
109     // make the forward fast scanner
110     soe[0].fast = true;
111     soe[0].fwd = true;
112     soe[0].prefetch = false;
113     myargs[0].lock_type = STRESS_LOCK_SHARED;
114     myargs[0].operation_extra = &soe[0];
115     myargs[0].operation = scan_op;
116 
117     // make the forward slow scanner
118     soe[1].fast = false;
119     soe[1].fwd = true;
120     soe[1].prefetch = false;
121     myargs[1].lock_type = STRESS_LOCK_SHARED;
122     myargs[1].operation_extra = &soe[1];
123     myargs[1].operation = scan_op;
124 
125     // make the backward fast scanner
126     soe[2].fast = true;
127     soe[2].fwd = false;
128     soe[2].prefetch = false;
129     myargs[2].lock_type = STRESS_LOCK_SHARED;
130     myargs[2].operation_extra = &soe[2];
131     myargs[2].operation = scan_op;
132 
133     // make the backward slow scanner
134     soe[3].fast = false;
135     soe[3].fwd = false;
136     soe[3].prefetch = false;
137     myargs[3].lock_type = STRESS_LOCK_SHARED;
138     myargs[3].operation_extra = &soe[3];
139     myargs[3].operation = scan_op;
140 
141     // make the guy that removes and recreates the db
142     myargs[4].lock_type = STRESS_LOCK_EXCL;
143     myargs[4].sleep_ms = 2000; // maybe make this a runtime param at some point
144     myargs[4].operation = remove_and_recreate_me;
145 
146     // make the guy that updates the db
147     struct update_op_args uoe = get_update_op_args(cli_args, NULL);
148     for (int i = 5; i < 5 + cli_args->num_update_threads; ++i) {
149         myargs[i].bounded_element_range = false;
150         myargs[i].lock_type = STRESS_LOCK_SHARED;
151         myargs[i].operation_extra = &uoe;
152         myargs[i].operation = update_op;
153     }
154 
155     // make the guy that does point queries
156     for (int i = 5 + cli_args->num_update_threads; i < num_threads; i++) {
157         myargs[i].lock_type = STRESS_LOCK_SHARED;
158         myargs[i].bounded_element_range = false;
159         myargs[i].operation = ptquery_op_no_check;
160     }
161 
162     run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
163 }
164 
165 int
test_main(int argc,char * const argv[])166 test_main(int argc, char *const argv[]) {
167     struct cli_args args = get_default_args();
168     parse_stress_test_args(argc, argv, &args);
169     stress_test_main(&args);
170     return 0;
171 }
172