1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "ydb.h"
41 #include "toku_pthread.h"
42 
43 // this test reproduces the rollback log corruption that occurs when hot indexing runs concurrent with a long abort
44 // the concurrent operation occurs when the abort periodically releases the ydb lock which allows the hot indexer
45 // to run.  the hot indexer erroneously append to the rollback log that is in the process of being aborted.
46 
47 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)48 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
49     toku_dbt_array_resize(dest_keys, 1);
50     toku_dbt_array_resize(dest_vals, 1);
51     DBT *dest_key = &dest_keys->dbts[0];
52     DBT *dest_val = &dest_vals->dbts[0];
53 
54     (void) dest_db; (void) src_db; (void) dest_keys; (void) dest_vals; (void) src_key; (void) src_val;
55     lazy_assert(src_db != NULL && dest_db != NULL);
56 
57     if (dest_key->flags == DB_DBT_REALLOC) {
58         dest_key->data = toku_realloc(dest_key->data, src_val->size);
59         memcpy(dest_key->data, src_val->data, src_val->size);
60         dest_key->size = src_val->size;
61     }
62     dest_val->size = 0;
63 
64     return 0;
65 }
66 
67 struct indexer_arg {
68     DB_ENV *env;
69     DB *src_db;
70     int n_dest_db;
71     DB **dest_db;
72 };
73 
74 static void *
indexer_thread(void * arg)75 indexer_thread(void *arg) {
76     struct indexer_arg *indexer_arg = (struct indexer_arg *) arg;
77     DB_ENV *env = indexer_arg->env;
78     int r;
79 
80     DB_TXN *indexer_txn = NULL;
81     r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
82 
83     DB_INDEXER *indexer = NULL;
84     r = env->create_indexer(env, indexer_txn, &indexer, indexer_arg->src_db, indexer_arg->n_dest_db, indexer_arg->dest_db, NULL, 0); assert_zero(r);
85 
86     r = indexer->build(indexer); assert_zero(r);
87 
88     r = indexer->close(indexer); assert_zero(r);
89 
90     r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
91 
92     return arg;
93 }
94 
95 static void
verify_empty(DB_ENV * env,DB * db)96 verify_empty(DB_ENV *env, DB *db) {
97     int r;
98 
99     DB_TXN *txn = NULL;
100     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
101 
102     DBC *cursor = NULL;
103     r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
104 
105     DBT key, val;
106     r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
107     assert(r == DB_NOTFOUND);
108 
109     r = cursor->c_close(cursor); assert_zero(r);
110 
111     r = txn->commit(txn, 0); assert_zero(r);
112 }
113 
114 static void
run_test(void)115 run_test(void) {
116     int r;
117     DB_ENV *env = NULL;
118     r = db_env_create(&env, 0); assert_zero(r);
119 
120     r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
121 
122     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
123 
124     DB *src_db = NULL;
125     r = db_create(&src_db, env, 0); assert_zero(r);
126     r = src_db->open(src_db, NULL, "0.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
127 
128     DB *dest_db = NULL;
129     r = db_create(&dest_db, env, 0); assert_zero(r);
130     r = dest_db->open(dest_db, NULL, "1.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
131 
132     DB_TXN *txn = NULL;
133     r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
134 
135     // insert some
136     for (int i = 0; i < 246723; i++) {
137         int k = htonl(i);
138         int v = i;
139         DBT key; dbt_init(&key, &k, sizeof k);
140         DBT val; dbt_init(&val, &v, sizeof v);
141         r = src_db->put(src_db, txn, &key, &val, 0); assert_zero(r);
142     }
143 
144     // run the indexer
145     struct indexer_arg indexer_arg = {env, src_db, 1, &dest_db};
146     toku_pthread_t pid;
147     r = toku_pthread_create(
148         toku_uninstrumented, &pid, nullptr, indexer_thread, &indexer_arg);
149     assert_zero(r);
150 
151     r = txn->abort(txn);
152     assert_zero(r);
153 
154     void *ret;
155     r = toku_pthread_join(pid, &ret); assert_zero(r);
156 
157     verify_empty(env, src_db);
158     verify_empty(env, dest_db);
159 
160     r = src_db->close(src_db, 0); assert_zero(r);
161 
162     r = dest_db->close(dest_db, 0); assert_zero(r);
163 
164     r = env->close(env, 0); assert_zero(r);
165 }
166 
167 int
test_main(int argc,char * const argv[])168 test_main(int argc, char * const argv[]) {
169     int r;
170 
171     // parse_args(argc, argv);
172     for (int i = 1; i < argc; i++) {
173         char * const arg = argv[i];
174         if (strcmp(arg, "-v") == 0) {
175             verbose++;
176             continue;
177         }
178         if (strcmp(arg, "-q") == 0) {
179             verbose = 0;
180             continue;
181         }
182     }
183 
184     toku_os_recursive_delete(TOKU_TEST_FILENAME);
185     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
186 
187     run_test();
188 
189     return 0;
190 }
191 
192