1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40
41 // verify that del_multiple deletes the correct key from N dictionaries
42 // verify that del_multiple locks the correct key for N dictionaries
43
44 static int
get_key(int i,int dbnum)45 get_key(int i, int dbnum) {
46 return htonl(i + dbnum);
47 }
48
49 static void
get_data(int * v,int i,int ndbs)50 get_data(int *v, int i, int ndbs) {
51 for (int dbnum = 0; dbnum < ndbs; dbnum++) {
52 v[dbnum] = get_key(i, dbnum);
53 }
54 }
55
56 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,const DBT * src_key,const DBT * src_data)57 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data) {
58 toku_dbt_array_resize(dest_keys, 1);
59 DBT *dest_key = &dest_keys->dbts[0];
60 (void) dest_db; (void) src_db; (void) dest_keys; (void) src_key; (void) src_data;
61 assert(src_db == NULL);
62
63 unsigned int dbnum;
64 assert(dest_db->descriptor->dbt.size == sizeof dbnum);
65 memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
66 assert(dbnum < src_data->size / sizeof (int));
67
68 int *pri_data = (int *) src_data->data;
69
70 assert(dest_key->flags == 0);
71 dest_key->size = sizeof (int);
72 dest_key->data = &pri_data[dbnum];
73
74 return 0;
75 }
76
77 static void
verify_locked(DB_ENV * env,DB * db,int k)78 verify_locked(DB_ENV *env, DB *db, int k) {
79 int r;
80 DB_TXN *txn = NULL;
81 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
82 DBT key; dbt_init(&key, &k, sizeof k);
83 r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
84 r = txn->abort(txn); assert_zero(r);
85 }
86
87 static void
verify_empty(DB_ENV * env,DB * db)88 verify_empty(DB_ENV *env, DB *db) {
89 int r;
90 DB_TXN *txn = NULL;
91 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
92
93 DBC *cursor = NULL;
94 r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
95 int i;
96 for (i = 0; ; i++) {
97 DBT key; memset(&key, 0, sizeof key);
98 DBT val; memset(&val, 0, sizeof val);
99 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
100 if (r != 0)
101 break;
102 }
103 assert_zero(i);
104 r = cursor->c_close(cursor); assert_zero(r);
105 r = txn->commit(txn, 0); assert_zero(r);
106 }
107
108 static void
verify_del_multiple(DB_ENV * env,DB * db[],int ndbs,int nrows)109 verify_del_multiple(DB_ENV *env, DB *db[], int ndbs, int nrows) {
110 int r;
111 DB_TXN *deltxn = NULL;
112 r = env->txn_begin(env, NULL, &deltxn, 0); assert_zero(r);
113 for (int i = 0; i < nrows; i++) {
114 int k = get_key(i, 0);
115 DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
116 int v[ndbs]; get_data(v, i, ndbs);
117 DBT pri_data; dbt_init(&pri_data, &v[0], sizeof v);
118 DBT keys[ndbs]; memset(keys, 0, sizeof keys);
119 uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
120 r = env_del_multiple_test_no_array(env, NULL, deltxn, &pri_key, &pri_data, ndbs, db, keys, flags); assert_zero(r);
121 for (int dbnum = 0; dbnum < ndbs; dbnum++)
122 verify_locked(env, db[dbnum], get_key(i, dbnum));
123 }
124 r = deltxn->commit(deltxn, 0); assert_zero(r);
125 for (int dbnum = 0; dbnum < ndbs; dbnum++)
126 verify_empty(env, db[dbnum]);
127 }
128
129 static void
populate_primary(DB_ENV * env,DB * db,int ndbs,int nrows)130 populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
131 int r;
132 DB_TXN *txn = NULL;
133 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
134
135 // populate
136 for (int i = 0; i < nrows; i++) {
137 int k = get_key(i, 0);
138 int v[ndbs]; get_data(v, i, ndbs);
139 DBT key; dbt_init(&key, &k, sizeof k);
140 DBT val; dbt_init(&val, &v[0], sizeof v);
141 r = db->put(db, txn, &key, &val, 0); assert_zero(r);
142 }
143
144 r = txn->commit(txn, 0); assert_zero(r);
145 }
146
147 static void
populate_secondary(DB_ENV * env,DB * db,int dbnum,int nrows)148 populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
149 int r;
150 DB_TXN *txn = NULL;
151 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
152
153 // populate
154 for (int i = 0; i < nrows; i++) {
155 int k = get_key(i, dbnum);
156 DBT key; dbt_init(&key, &k, sizeof k);
157 DBT val; dbt_init(&val, NULL, 0);
158 r = db->put(db, txn, &key, &val, 0); assert_zero(r);
159 }
160
161 r = txn->commit(txn, 0); assert_zero(r);
162 }
163
164 static void
run_test(int ndbs,int nrows)165 run_test(int ndbs, int nrows) {
166 int r;
167 DB_ENV *env = NULL;
168 r = db_env_create(&env, 0); assert_zero(r);
169
170 r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
171
172 r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
173
174 DB *db[ndbs];
175 for (int dbnum = 0; dbnum < ndbs; dbnum++) {
176 r = db_create(&db[dbnum], env, 0); assert_zero(r);
177
178 char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
179 r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
180
181 DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
182 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
183 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
184 });
185 }
186
187 for (int dbnum = 0; dbnum < ndbs; dbnum++) {
188 if (dbnum == 0)
189 populate_primary(env, db[dbnum], ndbs, nrows);
190 else
191 populate_secondary(env, db[dbnum], dbnum, nrows);
192 }
193
194 verify_del_multiple(env, db, ndbs, nrows);
195
196 for (int dbnum = 0; dbnum < ndbs; dbnum++)
197 r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
198
199 r = env->close(env, 0); assert_zero(r);
200 }
201
202 int
test_main(int argc,char * const argv[])203 test_main(int argc, char * const argv[]) {
204 int r;
205 int ndbs = 2;
206 int nrows = 2;
207
208 // parse_args(argc, argv);
209 for (int i = 1; i < argc; i++) {
210 char * const arg = argv[i];
211 if (strcmp(arg, "-v") == 0) {
212 verbose++;
213 continue;
214 }
215 if (strcmp(arg, "-q") == 0) {
216 verbose = 0;
217 continue;
218 }
219 if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
220 ndbs = atoi(argv[++i]);
221 continue;
222 }
223 if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
224 nrows = atoi(argv[++i]);
225 continue;
226 }
227 }
228
229 toku_os_recursive_delete(TOKU_TEST_FILENAME);
230 r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
231
232 run_test(ndbs, nrows);
233
234 return 0;
235 }
236
237