1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #pragma once
40
41 #include <toku_pthread.h>
42 #include "test.h"
43 #include "threaded_stress_test_helpers.h"
44 #include <portability/toku_atomic.h>
45
// set this to true for the recovery version of this stress test.
// the way this works is to include this header and set
// stress_openclose_crash_at_end = true;
49 static bool stress_openclose_crash_at_end;
50
51 //
52 // Stress test ft reference counting
53 //
54 // Three things keep a fractal tree in memory by holding a reference:
55 // - open ft handle
56 // - live txn that did a write op
57 // - checkpoint
58 //
59 // To stress reference counting, we would like threads which:
60 // - take checkpoints at random intervals
61 // - update random values, random point queries for auditing
62 // * sometimes close handle before commit.
63 // - close random dictionaries
64 //
65 // Here's how we can do it:
66 //
67 // A bunch of threads randomly choose from N buckets. Each bucket
68 // has a DB, an is_open bit, and a lock.
// - in a single txn, each thread will do some small number of
//   queries or updates on random buckets, opening the dbs if
//   they were closed and possibly closing afterwards.
// - this should stress both open db handles and various txns
//   referencing dbs simultaneously.
74 //
75 // and all while this is happening, throw in scanners, updaters,
76 // and query threads that all assert the contents of these dbs
77 // is correct, even after recovery.
78
// printf-style logging that stays silent unless the `verbose` flag is set.
// stdout is flushed right after every message that does get printed.
#define verbose_printf(...)         \
    do {                            \
        if (!verbose) {             \
            break;                  \
        }                           \
        printf(__VA_ARGS__);        \
        fflush(stdout);             \
    } while (0)
86
87 // a bunch of buckets with dbs, a lock, and an is_open bit
88 // threads will choose buckets randomly for update, query,
89 // and then maybe open/close the bucket's db.
90 struct db_bucket {
91 DB_ENV *env;
92 DB *db;
93 bool is_open;
94 toku_mutex_t mutex;
95 };
96 static struct db_bucket *buckets;
97 static int num_buckets;
98
99 // each operation can do at most this many operations in one txn
100 static int
choose_random_iteration_count(ARG arg)101 choose_random_iteration_count(ARG arg) {
102 const int max_iteration_count = 8;
103 int k = myrandom_r(arg->random_data) % max_iteration_count;
104 return k + 1;
105 }
106
107 // open the ith db in the array, asserting success
108 static void
open_ith_db(DB_ENV * env,DB ** db,int i)109 open_ith_db(DB_ENV *env, DB **db, int i) {
110 char name[30];
111 memset(name, 0, sizeof(name));
112 get_ith_table_name(name, sizeof(name), i);
113 int r = db_create(db, env, 0);
114 CKERR(r);
115 r = (*db)->open(*db, null_txn, name, NULL, DB_BTREE, 0, 0666);
116 CKERR(r);
117 }
118
119 // debugging counter to maintain the invariant that open_buckets <= num_buckets
120 static int open_buckets;
121
122 // choose and lock a random bucket, possibly opening a db
123 static struct db_bucket *
lock_and_maybe_open_some_db(ARG arg)124 lock_and_maybe_open_some_db(ARG arg) {
125 int k = myrandom_r(arg->random_data) % num_buckets;
126 struct db_bucket *bucket = &buckets[k];
127 toku_mutex_lock(&bucket->mutex);
128 if (!bucket->is_open) {
129 // choose a random DB from 0..k-1 to associate with this bucket
130 // then, mark the bucket as open.
131 int i = myrandom_r(arg->random_data) % num_buckets;
132 open_ith_db(bucket->env, &bucket->db, i);
133 bucket->is_open = true;
134 assert(toku_sync_fetch_and_add(&open_buckets, 1) < num_buckets);
135 verbose_printf("opened db %d in bucket %d\n", i, k);
136 }
137 return bucket;
138 }
139
140 // release the lock on a bucket, possibly closing its db
141 static void
unlock_and_maybe_close_db(struct db_bucket * bucket,ARG arg)142 unlock_and_maybe_close_db(struct db_bucket *bucket, ARG arg) {
143 static const int p = 5;
144 int k = ((unsigned) myrandom_r(arg->random_data)) % 100;
145 // we should close with probability approximately p / 100
146 assert(bucket->is_open);
147 if (k <= p) {
148 DB *db = bucket->db;
149 int r = db->close(db, 0);
150 CKERR(r);
151 bucket->is_open = false;
152 int old_open_buckets = toku_sync_fetch_and_sub(&open_buckets, 1);
153 assert(old_open_buckets > 0);
154 verbose_printf("decided to close a bucket's db before unlocking\n");
155 }
156 toku_mutex_unlock(&bucket->mutex);
157 }
158
159 // scan some dbs, verifying the correct sum.
160 static int
scan_some_dbs(DB_TXN * txn,ARG arg,void * operation_extra,void * UU (stats_extra))161 scan_some_dbs(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
162 int r = 0;
163 verbose_printf("scanning some dbs\n");
164 struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
165 // scan every db, one by one, and verify that the contents are correct
166 for (int i = 0; r == 0 && run_test && i < arg->cli->num_DBs; i++) {
167 struct db_bucket *bucket = lock_and_maybe_open_some_db(arg);
168 const bool check_sum = true;
169 r = scan_op_and_maybe_check_sum(bucket->db, txn, extra, check_sum);
170 invariant(r == 0 || r == DB_LOCK_NOTGRANTED);
171 unlock_and_maybe_close_db(bucket, arg);
172 }
173 return r;
174 }
175
176 // update a couple of dbs in some buckets with a txn
177 static int
update_some_dbs(DB_TXN * txn,ARG arg,void * op_extra,void * stats_extra)178 update_some_dbs(DB_TXN *txn, ARG arg, void *op_extra, void *stats_extra) {
179 int r = 0;
180 verbose_printf("updating some dbs\n");
181 const int iterations = choose_random_iteration_count(arg);
182 for (int i = 0; r == 0 && run_test && i < iterations; i++) {
183 struct db_bucket *bucket = lock_and_maybe_open_some_db(arg);
184 // does an update operation on this bucket's db
185 r = update_op_db(bucket->db, txn, arg, op_extra, stats_extra);
186 invariant(r == 0 || r == DB_LOCK_NOTGRANTED);
187 unlock_and_maybe_close_db(bucket, arg);
188 }
189 return r;
190 }
191
192 // point query a couple of dbs in some buckets with a txn
193 static int
ptquery_some_dbs(DB_TXN * txn,ARG arg,void * UU (op_extra),void * UU (stats_extra))194 ptquery_some_dbs(DB_TXN *txn, ARG arg, void *UU(op_extra), void *UU(stats_extra)) {
195 int r = 0;
196 verbose_printf("querying some dbs\n");
197 const int iterations = choose_random_iteration_count(arg);
198 for (int i = 0; r == 0 && run_test && i < iterations; i++) {
199 struct db_bucket *bucket = lock_and_maybe_open_some_db(arg);
200 // does a point query on a random key for this bucket's db
201 const bool check_sum = true;
202 r = ptquery_and_maybe_check_op(bucket->db, txn, arg, check_sum);
203 invariant(r == 0 || r == DB_LOCK_NOTGRANTED);
204 unlock_and_maybe_close_db(bucket, arg);
205 }
206 return r;
207 }
208
209 static void
stress_table(DB_ENV * env,DB ** dbp,struct cli_args * cli_args)210 stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
211 const int update_threads = cli_args->num_update_threads;
212 const int query_threads = cli_args->num_ptquery_threads;
213 const int total_threads = update_threads + query_threads + 1;
214
215 struct arg myargs[total_threads];
216 for (int i = 0; i < total_threads; i++) {
217 arg_init(&myargs[i], dbp, env, cli_args);
218 }
219
220 struct scan_op_extra soe[4];
221
222 // make the forward fast scanner
223 soe[0].fast = true;
224 soe[0].fwd = true;
225 soe[0].prefetch = false;
226 myargs[0].operation_extra = &soe[0];
227 myargs[0].operation = scan_some_dbs;
228
229 struct update_op_args uoe = get_update_op_args(cli_args, NULL);
230 // make the guy that updates the db
231 for (int i = 1; i < 1 + update_threads; ++i) {
232 myargs[i].operation_extra = &uoe;
233 myargs[i].operation = update_some_dbs;
234 myargs[i].do_prepare = true;
235 }
236 // make the guy that does point queries
237 for (int i = 1 + update_threads; i < total_threads; i++) {
238 myargs[i].operation = ptquery_some_dbs;
239 myargs[i].do_prepare = true;
240 }
241
242 num_buckets = cli_args->num_DBs;
243 open_buckets = num_buckets;
244 // each thread gets access to this array of db buckets, from
245 // which they can choose a random db to either touch or query
246 XMALLOC_N(num_buckets, buckets);
247 for (int i = 0; i < num_buckets; i++) {
248 struct db_bucket bucket = {.env = env, .db = dbp[i], .is_open = true};
249 buckets[i] = bucket;
250 toku_mutex_init(toku_uninstrumented, &buckets[i].mutex, nullptr);
251 }
252 // run all of the query and update workers. they may randomly open
253 // and close the dbs in each db_bucket to be some random dictionary,
254 // so when they're done we'll have to clean up the mess so this
255 // stress test can exit gracefully expecting db[i] = the ith db
256 // verbose_printf("stressing %d tables using %d update threads, %d query
257 // threads\n",
258 // num_buckets, update_threads, query_threads);
259 verbose_printf("stressing %d tables using %d update threads\n",
260 num_buckets, update_threads);
261 // stress_openclose_crash_at_end should be changed to true or false,
262 // depending if this test is for recovery or not.
263 const bool crash_at_end = stress_openclose_crash_at_end;
264 run_workers(myargs, total_threads, cli_args->num_seconds, crash_at_end, cli_args);
265
266 // the stress test is now complete. get ready for shutdown/close.
267 //
268 // make sure that every db in the original array is opened
269 // as it was when it was passed in.
270 for (int i = 0; i < num_buckets; i++) {
271 // close whatever is open
272 if (buckets[i].is_open) {
273 DB *db = buckets[i].db;
274 int r = db->close(db, 0);
275 CKERR(r);
276 }
277 // put the correct db back, then save the pointer
278 // into the dbp array we were given
279 open_ith_db(env, &buckets[i].db, i);
280 dbp[i] = buckets[i].db;
281 }
282
283 toku_free(buckets);
284 }
285