1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40
41 // verify that put_multiple inserts the correct rows into N dictionaries
// verify that put_multiple locks the correct keys for N dictionaries
43
// Largest number of rows a single primary row generates in one dictionary;
// the per-row count cycles through 0..max_rows_per_primary (see get_num_keys).
const int max_rows_per_primary = 9;

// Number of rows one secondary dictionary holds after num_primary primary
// rows have been inserted.
// num_primary must be a multiple of (max_rows_per_primary + 1): each full
// cycle of primary rows then contributes 0 + 1 + ... + max_rows_per_primary
// secondary rows.
static uint32_t
get_total_secondary_rows(uint32_t num_primary) {
    const uint32_t cycle_len = max_rows_per_primary + 1;
    assert(num_primary % cycle_len == 0);

    const uint32_t rows_per_cycle = max_rows_per_primary * cycle_len / 2;
    const uint32_t num_cycles = num_primary / cycle_len;
    return num_cycles * rows_per_cycle;
}
52
53 static uint8_t
get_num_keys(uint16_t i,uint8_t dbnum)54 get_num_keys(uint16_t i, uint8_t dbnum) {
55 return (i+dbnum) % (max_rows_per_primary + 1); // 0..9.. 10 choices
56 }
57
58 static uint16_t
get_total_num_keys(uint16_t i,uint8_t num_dbs)59 get_total_num_keys(uint16_t i, uint8_t num_dbs) {
60 uint16_t sum = 0;
61 for (uint8_t db = 0; db < num_dbs; ++db) {
62 sum += get_num_keys(i, db);
63 }
64 return sum;
65 }
66
// Builds the 32-bit key of the which'th row that primary row i generates in
// dictionary dbnum: dbnum is placed in bits 24..31, which in bits 8..15, and
// i is OR-ed into the low bits unshifted.
// NOTE(review): i is 16 bits wide, so for i >= 256 its upper byte shares bits
// 8..15 with which; (i, which) pairs only map to distinct keys while i < 256.
static uint32_t
get_key(uint16_t i, uint8_t dbnum, uint8_t which) {
    const uint32_t db_bits = (uint32_t) dbnum << 24;
    const uint32_t which_bits = (uint32_t) which << 8;
    const uint32_t row_bits = i;
    return db_bits | row_bits | which_bits;
}
75
76 static void
get_data(uint32_t * v,uint8_t i,uint8_t ndbs)77 get_data(uint32_t *v, uint8_t i, uint8_t ndbs) {
78 int index = 0;
79 for (uint8_t dbnum = 0; dbnum < ndbs; dbnum++) {
80 for (uint8_t which = 0; which < get_num_keys(i, dbnum); ++which) {
81 v[index++] = get_key(i, dbnum, which);
82 }
83 }
84 }
85
86 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)87 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
88 (void) src_val;
89 uint8_t dbnum;
90 assert(dest_db->descriptor->dbt.size == sizeof dbnum);
91 memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
92
93 assert(dbnum > 0); // Does not get called for primary.
94 assert(dest_db != src_db);
95
96 assert(src_key->size == 2);
97 uint16_t i = *(uint16_t*)src_key->data;
98 uint8_t num_keys = get_num_keys(i, dbnum);
99
100 toku_dbt_array_resize(dest_keys, num_keys);
101 if (dest_vals) {
102 toku_dbt_array_resize(dest_vals, num_keys);
103 }
104
105 for (uint8_t which = 0; which < num_keys; ++which) {
106 DBT *dest_key = &dest_keys->dbts[which];
107
108 assert(dest_key->flags == DB_DBT_REALLOC);
109 {
110 // Memory management
111 if (dest_key->ulen < sizeof(uint32_t)) {
112 dest_key->data = toku_xrealloc(dest_key->data, sizeof(uint32_t));
113 dest_key->ulen = sizeof(uint32_t);
114 }
115 dest_key->size = sizeof(uint32_t);
116 }
117 *(uint32_t*)dest_key->data = get_key(i, dbnum, which);
118
119 if (dest_vals) {
120 DBT *dest_val = &dest_vals->dbts[which];
121 dest_val->flags = 0;
122 dest_val->data = nullptr;
123 dest_val->size = 0;
124 }
125 }
126 return 0;
127 }
128
129 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,const DBT * src_key,const DBT * src_data)130 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data) {
131 return put_callback(dest_db, src_db, dest_keys, NULL, src_key, src_data);
132 }
133
134 static void
verify_locked(DB_ENV * env,DB * db,uint8_t dbnum,uint16_t i)135 verify_locked(DB_ENV *env, DB *db, uint8_t dbnum, uint16_t i) {
136 int r;
137 DB_TXN *txn = NULL;
138 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
139 if (dbnum == 0) {
140 DBT key; dbt_init(&key, &i, sizeof i);
141 r = db->del(db, txn, &key, DB_DELETE_ANY); CKERR2(r, DB_LOCK_NOTGRANTED);
142 } else {
143 for (uint8_t which = 0; which < get_num_keys(i, dbnum); ++which) {
144 uint32_t k = get_key(i, dbnum, which);
145 DBT key; dbt_init(&key, &k, sizeof k);
146 r = db->del(db, txn, &key, DB_DELETE_ANY); CKERR2(r, DB_LOCK_NOTGRANTED);
147 }
148 }
149 r = txn->abort(txn); assert_zero(r);
150 }
151
152 static void
verify_seq_primary(DB_ENV * env,DB * db,int dbnum,int ndbs,int nrows)153 verify_seq_primary(DB_ENV *env, DB *db, int dbnum, int ndbs, int nrows) {
154 assert(dbnum==0);
155 int r;
156 DB_TXN *txn = NULL;
157 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
158
159 DBC *cursor = NULL;
160 r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
161 int i;
162 for (i = 0; ; i++) {
163 DBT key; memset(&key, 0, sizeof key);
164 DBT val; memset(&val, 0, sizeof val);
165 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
166 if (r != 0)
167 break;
168 uint16_t k;
169 assert(key.size == sizeof k);
170 memcpy(&k, key.data, key.size);
171 assert(k == i);
172
173 uint32_t total_rows = get_total_num_keys(i, ndbs);
174 assert(val.size == total_rows * sizeof (uint32_t));
175 uint32_t v[total_rows]; get_data(v, i, ndbs);
176 assert(memcmp(val.data, v, val.size) == 0);
177 }
178 assert(i == nrows);
179 r = cursor->c_close(cursor); assert_zero(r);
180 r = txn->commit(txn, 0); assert_zero(r);
181 }
182
183 static void
verify_seq(DB_ENV * env,DB * db,uint8_t dbnum,uint8_t ndbs,uint16_t nrows_primary)184 verify_seq(DB_ENV *env, DB *db, uint8_t dbnum, uint8_t ndbs, uint16_t nrows_primary) {
185 assert(dbnum > 0);
186 assert(dbnum < ndbs);
187 uint32_t nrows = get_total_secondary_rows(nrows_primary);
188 int r;
189 DB_TXN *txn = NULL;
190 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
191
192 DBC *cursor = NULL;
193 r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
194 uint16_t rows_found = 0;
195 uint16_t source_i = 0;
196 DBT key; memset(&key, 0, sizeof key);
197 DBT val; memset(&val, 0, sizeof val);
198 for (source_i = 0; source_i < nrows_primary; ++source_i) {
199 uint8_t num_keys = get_num_keys(source_i, dbnum);
200 for (uint8_t which = 0; which < num_keys; ++which) {
201 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
202 CKERR(r);
203 uint32_t k;
204 assert(key.size == sizeof k);
205 memcpy(&k, key.data, key.size);
206 assert(k == get_key(source_i, dbnum, which));
207 assert(val.size == 0);
208 rows_found++;
209 }
210 }
211 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
212 CKERR2(r, DB_NOTFOUND);
213 assert(rows_found == nrows);
214 r = cursor->c_close(cursor); assert_zero(r);
215 r = txn->commit(txn, 0); assert_zero(r);
216 }
217
218 static void
verify(DB_ENV * env,DB * db[],int ndbs,int nrows)219 verify(DB_ENV *env, DB *db[], int ndbs, int nrows) {
220 verify_seq_primary(env, db[0], 0, ndbs, nrows);
221 for (int dbnum = 1; dbnum < ndbs; dbnum++)
222 verify_seq(env, db[dbnum], dbnum, ndbs, nrows);
223 }
224
225 static void
verify_empty(DB_ENV * env,DB * db)226 verify_empty(DB_ENV *env, DB *db) {
227 int r;
228 DB_TXN *txn = NULL;
229 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
230
231 DBC *cursor = NULL;
232 r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
233 int i;
234 for (i = 0; ; i++) {
235 DBT key; memset(&key, 0, sizeof key);
236 DBT val; memset(&val, 0, sizeof val);
237 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
238 if (r != 0)
239 break;
240 }
241 assert_zero(i);
242 r = cursor->c_close(cursor); assert_zero(r);
243 r = txn->commit(txn, 0); assert_zero(r);
244 }
245
246 static void
verify_del(DB_ENV * env,DB * db[],int ndbs)247 verify_del(DB_ENV *env, DB *db[], int ndbs) {
248 for (int dbnum = 0; dbnum < ndbs; dbnum++)
249 verify_empty(env, db[dbnum]);
250 }
251
252 static void
populate(DB_ENV * env,DB * db[],uint8_t ndbs,uint16_t nrows,bool del)253 populate(DB_ENV *env, DB *db[], uint8_t ndbs, uint16_t nrows, bool del) {
254 int r;
255 DB_TXN *txn = NULL;
256 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
257
258 DBT_ARRAY key_arrays[ndbs];
259 DBT_ARRAY val_arrays[ndbs];
260 for (uint8_t i = 0; i < ndbs; ++i) {
261 toku_dbt_array_init(&key_arrays[i], 1);
262 toku_dbt_array_init(&val_arrays[i], 1);
263 }
264 // populate
265 for (uint16_t i = 0; i < nrows; i++) {
266 uint32_t total_rows = get_total_num_keys(i, ndbs);
267 uint16_t k = i;
268 uint32_t v[total_rows]; get_data(v, i, ndbs);
269 DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
270 DBT pri_val; dbt_init(&pri_val, &v[0], sizeof v);
271 uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
272 if (del) {
273 r = env->del_multiple(env, db[0], txn, &pri_key, &pri_val, ndbs, db, key_arrays, flags);
274 } else {
275 r = env->put_multiple(env, db[0], txn, &pri_key, &pri_val, ndbs, db, key_arrays, val_arrays, flags);
276 }
277 assert_zero(r);
278 for (int dbnum = 0; dbnum < ndbs; dbnum++)
279 verify_locked(env, db[dbnum], dbnum, i);
280 }
281 for (uint8_t i = 0; i < ndbs; ++i) {
282 toku_dbt_array_destroy(&key_arrays[i]);
283 toku_dbt_array_destroy(&val_arrays[i]);
284 }
285
286 r = txn->commit(txn, 0); assert_zero(r);
287 }
288
289 static void
run_test(int ndbs,int nrows)290 run_test(int ndbs, int nrows) {
291 int r;
292 DB_ENV *env = NULL;
293 r = db_env_create(&env, 0); assert_zero(r);
294
295 r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
296 r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
297
298 r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
299
300 DB *db[ndbs];
301 for (uint8_t dbnum = 0; dbnum < ndbs; dbnum++) {
302 r = db_create(&db[dbnum], env, 0); assert_zero(r);
303
304 DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
305
306 char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
307 r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
308 assert_zero(r);
309
310 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
311 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
312 });
313 }
314
315 populate(env, db, ndbs, nrows, false);
316
317 verify(env, db, ndbs, nrows);
318
319 populate(env, db, ndbs, nrows, true);
320
321 verify_del(env, db, ndbs);
322
323 for (int dbnum = 0; dbnum < ndbs; dbnum++)
324 r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
325
326 r = env->close(env, 0); assert_zero(r);
327 }
328
329 int
test_main(int argc,char * const argv[])330 test_main(int argc, char * const argv[]) {
331 int r;
332 int ndbs = 16;
333 int nrows = 100;
334
335 // parse_args(argc, argv);
336 for (int i = 1; i < argc; i++) {
337 char * const arg = argv[i];
338 if (strcmp(arg, "-v") == 0) {
339 verbose++;
340 continue;
341 }
342 if (strcmp(arg, "-q") == 0) {
343 verbose = 0;
344 continue;
345 }
346 if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
347 ndbs = atoi(argv[++i]);
348 continue;
349 }
350 if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
351 nrows = atoi(argv[++i]);
352 continue;
353 }
354 }
355 //rows should be divisible by max_rows + 1 (so that we have an equal number of each type and we know the total)
356 if (nrows % (max_rows_per_primary+1) != 0) {
357 nrows += (max_rows_per_primary+1) - (nrows % (max_rows_per_primary+1));
358 }
359 assert(ndbs >= 0);
360 assert(ndbs < (1<<8) - 1);
361 assert(nrows >= 0);
362 assert(nrows < (1<<15)); // Leave plenty of room
363
364 toku_os_recursive_delete(TOKU_TEST_FILENAME);
365 r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
366
367 run_test(ndbs, nrows);
368
369 return 0;
370 }
371
372