1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #pragma once
40 
41 #include <toku_pthread.h>
42 #include "test.h"
43 #include "threaded_stress_test_helpers.h"
44 #include <portability/toku_atomic.h>
45 
// Selects the recovery flavour of this stress test. A test that wants the
// recovery version includes this header and sets
// stress_openclose_crash_at_end = true before running; stress_table() hands
// the value to run_workers() as its crash_at_end argument.
static bool stress_openclose_crash_at_end = false;
50 
51 //
52 // Stress test ft reference counting
53 //
54 // Three things keep a fractal tree in memory by holding a reference:
55 //  - open ft handle
56 //  - live txn that did a write op
57 //  - checkpoint
58 //
59 // To stress reference counting, we would like threads which:
60 //  - take checkpoints at random intervals
61 //  - update random values, random point queries for auditing
62 //      * sometimes close handle before commit.
63 //  - close random dictionaries
64 //
65 // Here's how we can do it:
66 //
67 // A bunch of threads randomly choose from N buckets. Each bucket
68 // has a DB, an is_open bit, and a lock.
// - in a single txn, each thread will do some small number of
// queries or updates on random buckets, opening the dbs if
// they were closed and possibly closing afterwards.
// - this should stress both open db handles and various txns
// referencing dbs simultaneously.
74 //
75 // and all while this is happening, throw in scanners, updaters,
76 // and query threads that all assert the contents of these dbs
77 // is correct, even after recovery.
78 
// printf-style logging that is active only while `verbose` is non-zero
// (`verbose` is not defined in this header; it must be visible wherever the
// macro is expanded). Output goes to stdout and is flushed right away. When
// `verbose` is zero the arguments are not evaluated at all.
#define verbose_printf(...)             \
    do {                                \
        if (!verbose) {                 \
            break;                      \
        }                               \
        printf(__VA_ARGS__);            \
        fflush(stdout);                 \
    } while (0)
86 
87 // a bunch of buckets with dbs, a lock, and an is_open bit
88 // threads will choose buckets randomly for update, query,
89 // and then maybe open/close the bucket's db.
90 struct db_bucket {
91     DB_ENV *env;
92     DB *db;
93     bool is_open;
94     toku_mutex_t mutex;
95 };
96 static struct db_bucket *buckets;
97 static int num_buckets;
98 
99 // each operation can do at most this many operations in one txn
100 static int
choose_random_iteration_count(ARG arg)101 choose_random_iteration_count(ARG arg) {
102     const int max_iteration_count = 8;
103     int k = myrandom_r(arg->random_data) % max_iteration_count;
104     return k + 1;
105 }
106 
107 // open the ith db in the array, asserting success
108 static void
open_ith_db(DB_ENV * env,DB ** db,int i)109 open_ith_db(DB_ENV *env, DB **db, int i) {
110     char name[30];
111     memset(name, 0, sizeof(name));
112     get_ith_table_name(name, sizeof(name), i);
113     int r = db_create(db, env, 0);
114     CKERR(r);
115     r = (*db)->open(*db, null_txn, name, NULL, DB_BTREE, 0, 0666);
116     CKERR(r);
117 }
118 
// Debugging counter: how many buckets currently hold an open db. It is
// adjusted atomically on every open/close so the code can assert that it
// never exceeds num_buckets and never drops below zero.
static int open_buckets = 0;
121 
122 // choose and lock a random bucket, possibly opening a db
123 static struct db_bucket *
lock_and_maybe_open_some_db(ARG arg)124 lock_and_maybe_open_some_db(ARG arg) {
125     int k = myrandom_r(arg->random_data) % num_buckets;
126     struct db_bucket *bucket = &buckets[k];
127     toku_mutex_lock(&bucket->mutex);
128     if (!bucket->is_open) {
129         // choose a random DB from 0..k-1 to associate with this bucket
130         // then, mark the bucket as open.
131         int i = myrandom_r(arg->random_data) % num_buckets;
132         open_ith_db(bucket->env, &bucket->db, i);
133         bucket->is_open = true;
134         assert(toku_sync_fetch_and_add(&open_buckets, 1) < num_buckets);
135         verbose_printf("opened db %d in bucket %d\n", i, k);
136     }
137     return bucket;
138 }
139 
140 // release the lock on a bucket, possibly closing its db
141 static void
unlock_and_maybe_close_db(struct db_bucket * bucket,ARG arg)142 unlock_and_maybe_close_db(struct db_bucket *bucket, ARG arg) {
143     static const int p = 5;
144     int k = ((unsigned) myrandom_r(arg->random_data)) % 100;
145     // we should close with probability approximately p / 100
146     assert(bucket->is_open);
147     if (k <= p) {
148         DB *db = bucket->db;
149         int r = db->close(db, 0);
150         CKERR(r);
151         bucket->is_open = false;
152         int old_open_buckets = toku_sync_fetch_and_sub(&open_buckets, 1);
153         assert(old_open_buckets > 0);
154         verbose_printf("decided to close a bucket's db before unlocking\n");
155     }
156     toku_mutex_unlock(&bucket->mutex);
157 }
158 
159 // scan some dbs, verifying the correct sum.
160 static int
scan_some_dbs(DB_TXN * txn,ARG arg,void * operation_extra,void * UU (stats_extra))161 scan_some_dbs(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
162     int r = 0;
163     verbose_printf("scanning some dbs\n");
164     struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
165     // scan every db, one by one, and verify that the contents are correct
166     for (int i = 0; r == 0 && run_test && i < arg->cli->num_DBs; i++) {
167         struct db_bucket *bucket = lock_and_maybe_open_some_db(arg);
168         const bool check_sum = true;
169         r = scan_op_and_maybe_check_sum(bucket->db, txn, extra, check_sum);
170         invariant(r == 0 || r == DB_LOCK_NOTGRANTED);
171         unlock_and_maybe_close_db(bucket, arg);
172     }
173     return r;
174 }
175 
176 // update a couple of dbs in some buckets with a txn
177 static int
update_some_dbs(DB_TXN * txn,ARG arg,void * op_extra,void * stats_extra)178 update_some_dbs(DB_TXN *txn, ARG arg, void *op_extra, void *stats_extra) {
179     int r = 0;
180     verbose_printf("updating some dbs\n");
181     const int iterations = choose_random_iteration_count(arg);
182     for (int i = 0; r == 0 && run_test && i < iterations; i++) {
183         struct db_bucket *bucket = lock_and_maybe_open_some_db(arg);
184         // does an update operation on this bucket's db
185         r = update_op_db(bucket->db, txn, arg, op_extra, stats_extra);
186         invariant(r == 0 || r == DB_LOCK_NOTGRANTED);
187         unlock_and_maybe_close_db(bucket, arg);
188     }
189     return r;
190 }
191 
192 // point query a couple of dbs in some buckets with a txn
193 static int
ptquery_some_dbs(DB_TXN * txn,ARG arg,void * UU (op_extra),void * UU (stats_extra))194 ptquery_some_dbs(DB_TXN *txn, ARG arg, void *UU(op_extra), void *UU(stats_extra)) {
195     int r = 0;
196     verbose_printf("querying some dbs\n");
197     const int iterations = choose_random_iteration_count(arg);
198     for (int i = 0; r == 0 && run_test && i < iterations; i++) {
199         struct db_bucket *bucket = lock_and_maybe_open_some_db(arg);
200         // does a point query on a random key for this bucket's db
201         const bool check_sum = true;
202         r = ptquery_and_maybe_check_op(bucket->db, txn, arg, check_sum);
203         invariant(r == 0 || r == DB_LOCK_NOTGRANTED);
204         unlock_and_maybe_close_db(bucket, arg);
205     }
206     return r;
207 }
208 
209 static void
stress_table(DB_ENV * env,DB ** dbp,struct cli_args * cli_args)210 stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
211     const int update_threads = cli_args->num_update_threads;
212     const int query_threads = cli_args->num_ptquery_threads;
213     const int total_threads = update_threads + query_threads + 1;
214 
215     struct arg myargs[total_threads];
216     for (int i = 0; i < total_threads; i++) {
217         arg_init(&myargs[i], dbp, env, cli_args);
218     }
219 
220     struct scan_op_extra soe[4];
221 
222     // make the forward fast scanner
223     soe[0].fast = true;
224     soe[0].fwd = true;
225     soe[0].prefetch = false;
226     myargs[0].operation_extra = &soe[0];
227     myargs[0].operation = scan_some_dbs;
228 
229     struct update_op_args uoe = get_update_op_args(cli_args, NULL);
230     // make the guy that updates the db
231     for (int i = 1; i < 1 + update_threads; ++i) {
232         myargs[i].operation_extra = &uoe;
233         myargs[i].operation = update_some_dbs;
234         myargs[i].do_prepare = true;
235     }
236     // make the guy that does point queries
237     for (int i = 1 + update_threads; i < total_threads; i++) {
238         myargs[i].operation = ptquery_some_dbs;
239         myargs[i].do_prepare = true;
240     }
241 
242     num_buckets = cli_args->num_DBs;
243     open_buckets = num_buckets;
244     // each thread gets access to this array of db buckets, from
245     // which they can choose a random db to either touch or query
246     XMALLOC_N(num_buckets, buckets);
247     for (int i = 0; i < num_buckets; i++) {
248         struct db_bucket bucket = {.env = env, .db = dbp[i], .is_open = true};
249         buckets[i] = bucket;
250         toku_mutex_init(toku_uninstrumented, &buckets[i].mutex, nullptr);
251     }
252     // run all of the query and update workers. they may randomly open
253     // and close the dbs in each db_bucket to be some random dictionary,
254     // so when they're done we'll have to clean up the mess so this
255     // stress test can exit gracefully expecting db[i] = the ith db
256     // verbose_printf("stressing %d tables using %d update threads, %d query
257     // threads\n",
258     //        num_buckets, update_threads, query_threads);
259     verbose_printf("stressing %d tables using %d update threads\n",
260             num_buckets, update_threads);
261     // stress_openclose_crash_at_end should be changed to true or false,
262     // depending if this test is for recovery or not.
263     const bool crash_at_end = stress_openclose_crash_at_end;
264     run_workers(myargs, total_threads, cli_args->num_seconds, crash_at_end, cli_args);
265 
266     // the stress test is now complete. get ready for shutdown/close.
267     //
268     // make sure that every db in the original array is opened
269     // as it was when it was passed in.
270     for (int i = 0; i < num_buckets; i++) {
271         // close whatever is open
272         if (buckets[i].is_open) {
273             DB *db = buckets[i].db;
274             int r = db->close(db, 0);
275             CKERR(r);
276         }
277         // put the correct db back, then save the pointer
278         // into the dbp array we were given
279         open_ith_db(env, &buckets[i].db, i);
280         dbp[i] = buckets[i].db;
281     }
282 
283     toku_free(buckets);
284 }
285