1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40
// Verify that update_multiple works while an indexer is building another
// secondary database, where each update changes the data in row[i] col[j]
// from x to x+1.
42
// Number of key slots reserved per secondary database in a primary row's
// value; per-row key counts are computed modulo this value.
static const int MAX_KEYS = 3;

// Number of ints in a primary row's value when there are ndbs databases
// (the primary plus ndbs-1 secondaries).  Layout:
//   [0]  0 for an old row, 1 for a new row
//   [1]  ndbs
//   then MAX_KEYS old-key slots for each secondary,
//   then MAX_KEYS new-key slots for each secondary.
static int
array_size(int ndbs) {
    const int header_ints = 2;                        // old/new marker + ndbs
    const int key_slots = 2 * MAX_KEYS * (ndbs - 1);  // old slots + new slots
    return header_ints + key_slots;
}
52 static int
get_num_new_keys(int i,int dbnum)53 get_num_new_keys(int i, int dbnum) {
54 if (dbnum == 0) return 1;
55 if (i & (1<<4)) {
56 dbnum++; // Shift every once in a while.
57 }
58 return (i + dbnum) % MAX_KEYS; // 0, 1, or 2
59 }
60
61 static int
get_old_num_keys(int i,int dbnum)62 get_old_num_keys(int i, int dbnum) {
63 if (dbnum == 0) return 1;
64 return (i + dbnum) % MAX_KEYS; // 0, 1, or 2
65 }
66
67 static int
get_total_secondary_rows(int num_primary)68 get_total_secondary_rows(int num_primary) {
69 assert(num_primary % MAX_KEYS == 0);
70 return num_primary / MAX_KEYS * (0 + 1 + 2);
71 }
72
73 static int
get_old_key(int i,int dbnum,int which)74 get_old_key(int i, int dbnum, int which) {
75 assert(i < INT16_MAX / 2);
76 assert(which >= 0);
77 assert(which < 4);
78 assert(dbnum < 16);
79 if (dbnum == 0) {
80 assert(which == 0);
81 return htonl(2*i);
82 }
83 if (which >= get_old_num_keys(i, dbnum)) {
84 return htonl(-1);
85 }
86 return htonl(((2*i+0) << 16) + (dbnum<<8) + (which<<1));
87 }
88
89 static int
get_new_key(int i,int dbnum,int which)90 get_new_key(int i, int dbnum, int which) {
91 assert(which >= 0);
92 assert(which < 4);
93 assert(dbnum < 16);
94
95 if (dbnum == 0) {
96 assert(which == 0);
97 return htonl(2*i);
98 }
99 if (which >= get_num_new_keys(i, dbnum)) {
100 return htonl(-1);
101 }
102 if ((i+dbnum+which) & (1<<5)) {
103 return htonl(((2*i+0) << 16) + (dbnum<<8) + (which<<1)); // no change from original
104 }
105 return htonl(((2*i+0) << 16) + (dbnum<<8) + (which<<1) + 1);
106 }
107
108 static void
fill_data_2_and_later(int * v,int i,int ndbs)109 fill_data_2_and_later(int *v, int i, int ndbs) {
110 int index = 2;
111 for (int dbnum = 1; dbnum < ndbs; dbnum++) {
112 for (int which = 0; which < MAX_KEYS; ++which) {
113 v[index++] = get_old_key(i, dbnum, which);
114 }
115 }
116 for (int dbnum = 1; dbnum < ndbs; dbnum++) {
117 for (int which = 0; which < MAX_KEYS; ++which) {
118 v[index++] = get_new_key(i, dbnum, which);
119 }
120 }
121 }
122
123
124 static void
fill_old_data(int * v,int i,int ndbs)125 fill_old_data(int *v, int i, int ndbs) {
126 v[0] = 0;
127 v[1] = ndbs;
128 fill_data_2_and_later(v, i, ndbs);
129 }
130
131 static void
fill_new_data(int * v,int i,int ndbs)132 fill_new_data(int *v, int i, int ndbs) {
133 v[0] = 1;
134 v[1] = ndbs;
135 fill_data_2_and_later(v, i, ndbs);
136 }
137
138
139 static int
put_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,DBT_ARRAY * dest_val_arrays,const DBT * src_key,const DBT * src_val)140 put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, DBT_ARRAY *dest_val_arrays, const DBT *src_key, const DBT *src_val) {
141 (void)src_val;
142 assert(src_db != dest_db);
143 assert(src_db);
144 int dbnum;
145 assert(dest_db->descriptor->dbt.size == sizeof dbnum);
146 memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
147 assert(dbnum > 0);
148
149 int pri_key = *(int *) src_key->data;
150 int* pri_val = (int*) src_val->data;
151
152 bool is_new = pri_val[0] == 1;
153 int i = (ntohl(pri_key)) / 2;
154
155 int num_keys = is_new ? get_num_new_keys(i, dbnum) : get_old_num_keys(i, dbnum);
156
157 toku_dbt_array_resize(dest_key_arrays, num_keys);
158
159 if (dest_val_arrays) {
160 toku_dbt_array_resize(dest_val_arrays, num_keys);
161 }
162
163 int ndbs = pri_val[1];
164 int index = 2 + (dbnum-1)*MAX_KEYS;
165 if (is_new) {
166 index += MAX_KEYS*(ndbs-1);
167 }
168
169 assert(src_val->size % sizeof(int) == 0);
170 assert((int)src_val->size / 4 >= index + num_keys);
171
172
173 for (int which = 0; which < num_keys; which++) {
174 DBT *dest_key = &dest_key_arrays->dbts[which];
175 DBT *dest_val = NULL;
176
177 assert(dest_key->flags == DB_DBT_REALLOC);
178 if (dest_key->ulen < sizeof(int)) {
179 dest_key->data = toku_xrealloc(dest_key->data, sizeof(int));
180 dest_key->ulen = sizeof(int);
181 }
182 dest_key->size = sizeof(int);
183 if (dest_val_arrays) {
184 dest_val = &dest_val_arrays->dbts[which];
185 assert(dest_val->flags == DB_DBT_REALLOC);
186 dest_val->size = 0;
187 }
188 int new_key = is_new ? get_new_key(i, dbnum, which) : get_old_key(i, dbnum, which);
189 assert(new_key == pri_val[index + which]);
190 *(int*)dest_key->data = new_key;
191 }
192 return 0;
193 }
194
195 static int
del_callback(DB * dest_db,DB * src_db,DBT_ARRAY * dest_key_arrays,const DBT * src_key,const DBT * src_data)196 del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_key_arrays, const DBT *src_key, const DBT *src_data) {
197 return put_callback(dest_db, src_db, dest_key_arrays, NULL, src_key, src_data);
198 }
199
200 static void
do_updates(DB_ENV * env,DB * db[],int ndbs,int nrows)201 do_updates(DB_ENV *env, DB *db[], int ndbs, int nrows) {
202 assert(ndbs > 0);
203 int r;
204 DB_TXN *txn = NULL;
205 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
206 int narrays = 2 * ndbs;
207 DBT_ARRAY keys[narrays];
208 DBT_ARRAY vals[narrays];
209 for (int i = 0; i < narrays; i++) {
210 toku_dbt_array_init(&keys[i], 1);
211 toku_dbt_array_init(&vals[i], 1);
212 }
213
214 for (int i = 0; i < nrows; i++) {
215
216 // update the data i % ndbs col from x to x+1
217
218 int old_k = get_old_key(i, 0, 0);
219 DBT old_key; dbt_init(&old_key, &old_k, sizeof old_k);
220 int new_k = get_new_key(i, 0, 0);
221 DBT new_key; dbt_init(&new_key, &new_k, sizeof new_k);
222
223 int v[array_size(ndbs)]; fill_old_data(v, i, ndbs);
224 DBT old_data; dbt_init(&old_data, &v[0], sizeof v);
225
226 int newv[array_size(ndbs)]; fill_new_data(newv, i, ndbs);
227 DBT new_data; dbt_init(&new_data, &newv[0], sizeof newv);
228
229 uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
230
231 r = env->update_multiple(env, db[0], txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, narrays, keys, narrays, vals);
232 assert_zero(r);
233 }
234 for (int i = 0; i < narrays; i++) {
235 toku_dbt_array_destroy(&keys[i]);
236 toku_dbt_array_destroy(&vals[i]);
237 }
238 r = txn->commit(txn, 0); assert_zero(r);
239 }
240
241 static void
populate_primary(DB_ENV * env,DB * db,int ndbs,int nrows)242 populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
243 int r;
244 DB_TXN *txn = NULL;
245 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
246
247 // populate
248 for (int i = 0; i < nrows; i++) {
249 int k = get_old_key(i, 0, 0);
250 int v[array_size(ndbs)];
251 fill_old_data(v, i, ndbs);
252 DBT key; dbt_init(&key, &k, sizeof k);
253 DBT val; dbt_init(&val, &v[0], sizeof v);
254 r = db->put(db, txn, &key, &val, 0); assert_zero(r);
255 }
256
257 r = txn->commit(txn, 0); assert_zero(r);
258 }
259
260 static void
populate_secondary(DB_ENV * env,DB * db,int dbnum,int nrows)261 populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
262 int r;
263 DB_TXN *txn = NULL;
264 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
265
266 // populate
267 for (int i = 0; i < nrows; i++) {
268 for (int which = 0; which < MAX_KEYS; which++) {
269 int k = get_old_key(i, dbnum, which);
270 if (k >= 0) {
271 DBT key; dbt_init(&key, &k, sizeof k);
272 DBT val; dbt_init(&val, NULL, 0);
273 r = db->put(db, txn, &key, &val, 0); assert_zero(r);
274 }
275 }
276 }
277
278 r = txn->commit(txn, 0); assert_zero(r);
279 }
280
281 static void
verify_pri_seq(DB_ENV * env,DB * db,int ndbs,int nrows)282 verify_pri_seq(DB_ENV *env, DB *db, int ndbs, int nrows) {
283 const int dbnum = 0;
284 int r;
285 DB_TXN *txn = NULL;
286 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
287
288 DBC *cursor = NULL;
289 r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
290 int i;
291 for (i = 0; ; i++) {
292 DBT key; memset(&key, 0, sizeof key);
293 DBT val; memset(&val, 0, sizeof val);
294 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
295 if (r != 0)
296 break;
297 int k;
298 int expectk = get_new_key(i, dbnum, 0);
299
300 assert(key.size == sizeof k);
301 memcpy(&k, key.data, key.size);
302 assert(k == expectk);
303
304 int num_keys = array_size(ndbs);
305 assert(val.size == num_keys*sizeof(int));
306 int v[num_keys]; fill_new_data(v, i, ndbs);
307 assert(memcmp(val.data, v, val.size) == 0);
308 }
309 assert(i == nrows); // if (i != nrows) printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, i, nrows); // assert(i == nrows);
310 r = cursor->c_close(cursor); assert_zero(r);
311 r = txn->commit(txn, 0); assert_zero(r);
312 }
313
314 static void
verify_sec_seq(DB_ENV * env,DB * db,int dbnum,int nrows)315 verify_sec_seq(DB_ENV *env, DB *db, int dbnum, int nrows) {
316 assert(dbnum > 0);
317 int r;
318 DB_TXN *txn = NULL;
319 r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
320
321 DBC *cursor = NULL;
322 r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
323 int i;
324 int rows_found = 0;
325
326 for (i = 0; ; i++) {
327 int num_keys = get_num_new_keys(i, dbnum);
328 for (int which = 0; which < num_keys; ++which) {
329 DBT key; memset(&key, 0, sizeof key);
330 DBT val; memset(&val, 0, sizeof val);
331 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
332 if (r != 0) {
333 CKERR2(r, DB_NOTFOUND);
334 goto done;
335 }
336 rows_found++;
337 int k;
338 int expectk = get_new_key(i, dbnum, which);
339
340 assert(key.size == sizeof k);
341 memcpy(&k, key.data, key.size);
342 int got_i = (ntohl(k) >> 16) / 2;
343 if (got_i < i) {
344 // Will fail. Too many old i's
345 assert(k == expectk);
346 } else if (got_i > i) {
347 // Will fail. Too few in previous i.
348 assert(k == expectk);
349 }
350
351 if (k != expectk && which < get_old_num_keys(i, dbnum) && k == get_old_key(i, dbnum, which)) {
352 // Will fail, never got updated.
353 assert(k == expectk);
354 }
355 assert(k == expectk);
356 assert(val.size == 0);
357 }
358 }
359 done:
360 assert(rows_found == get_total_secondary_rows(nrows));
361 r = cursor->c_close(cursor); assert_zero(r);
362 r = txn->commit(txn, 0); assert_zero(r);
363 }
364
365 static void
run_test(int ndbs,int nrows)366 run_test(int ndbs, int nrows) {
367 int r;
368 DB_ENV *env = NULL;
369 r = db_env_create(&env, 0); assert_zero(r);
370
371 r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
372 r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
373
374 r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
375
376 DB *db[ndbs];
377 for (int dbnum = 0; dbnum < ndbs; dbnum++) {
378 r = db_create(&db[dbnum], env, 0); assert_zero(r);
379
380 DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
381
382 char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
383 r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
384 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
385 { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); }
386 });
387 }
388
389 populate_primary(env, db[0], ndbs, nrows);
390 for (int dbnum = 1; dbnum < ndbs-1; dbnum++) {
391 populate_secondary(env, db[dbnum], dbnum, nrows);
392 }
393
394 DB_TXN *indexer_txn = NULL;
395 r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
396
397 DB_INDEXER *indexer = NULL;
398 uint32_t db_flags = 0;
399 assert(ndbs > 2);
400 r = env->create_indexer(env, indexer_txn, &indexer, db[0], 1, &db[ndbs-1], &db_flags, 0); assert_zero(r);
401
402 do_updates(env, db, ndbs, nrows);
403
404 r = indexer->build(indexer); assert_zero(r);
405 r = indexer->close(indexer); assert_zero(r);
406
407 r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
408
409 verify_pri_seq(env, db[0], ndbs, nrows);
410 for (int dbnum = 1; dbnum < ndbs; dbnum++)
411 verify_sec_seq(env, db[dbnum], dbnum, nrows);
412 for (int dbnum = 0; dbnum < ndbs; dbnum++)
413 r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
414
415 r = env->close(env, 0); assert_zero(r);
416 }
417
418 int
test_main(int argc,char * const argv[])419 test_main(int argc, char * const argv[]) {
420 int r;
421 int ndbs = 10;
422 int nrows = MAX_KEYS*(1<<5)*4;
423
424 // parse_args(argc, argv);
425 for (int i = 1; i < argc; i++) {
426 char * const arg = argv[i];
427 if (strcmp(arg, "-v") == 0) {
428 verbose++;
429 continue;
430 }
431 if (strcmp(arg, "-q") == 0) {
432 verbose = 0;
433 continue;
434 }
435 if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
436 ndbs = atoi(argv[++i]);
437 continue;
438 }
439 if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
440 nrows = atoi(argv[++i]);
441 continue;
442 }
443 }
444 while (nrows % (MAX_KEYS*(1<<5)) != 0) {
445 nrows++;
446 }
447 //Need at least one to update, and one to index
448 while (ndbs < 3) {
449 ndbs++;
450 }
451
452 toku_os_recursive_delete(TOKU_TEST_FILENAME);
453 r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
454
455 run_test(ndbs, nrows);
456
457 return 0;
458 }
459
460