1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 #include "key-val.h"
44
45 enum {NUM_INDEXER_INDEXES=1};
46 static const int NUM_DBS = NUM_INDEXER_INDEXES + 1; // 1 for source DB
47 static const int NUM_ROWS = 10000;
48 int num_rows;
49 typedef enum {FORWARD = 0, BACKWARD} Direction;
50 typedef enum {TXN_NONE = 0, TXN_CREATE = 1, TXN_END = 2} TxnWork;
51
52 DB_ENV *env;
53
54 /*
55 * client() is a routine intended to be run in a separate thread from index creation
56 * - it takes a client spec which describes work to be done
57 * - direction : move to ever increasing or decreasing rows
58 * - txnwork : whether a transaction should be created or closed within the client
59 * (allows client transaction to start before or during index creation,
60 * and to close during or after index creation)
61 */
62
63 typedef struct {
64 uint32_t num; // number of rows to write
65 uint32_t start; // approximate start row
66 int offset; // offset from stride (= MAX_CLIENTS)
67 Direction dir;
68 int txnwork;
69 DB_TXN *txn;
70 uint32_t max_inserts_per_txn; // this is for the parent transaction
71 DB **dbs;
72 int client_number;
73 uint32_t *flags;
74 } client_spec_t, *client_spec;
75
76 int client_count = 0;
77
client(void * arg)78 static void * client(void *arg)
79 {
80 client_spec CAST_FROM_VOIDP(cs, arg);
81 client_count++;
82 if ( verbose ) printf("client[%d]\n", cs->client_number);
83 assert(cs->client_number < MAX_CLIENTS);
84 assert(cs->dir == FORWARD || cs->dir == BACKWARD);
85
86 int r;
87 if ( cs->txnwork & TXN_CREATE ) { r = env->txn_begin(env, NULL, &cs->txn, 0); CKERR(r); }
88
89 DBT key, val;
90 DBT dest_keys[NUM_DBS];
91 DBT dest_vals[NUM_DBS];
92 uint32_t k, v;
93 int n = cs->start;
94
95 for(int which=0;which<NUM_DBS;which++) {
96 dbt_init(&dest_keys[which], NULL, 0);
97 dest_keys[which].flags = DB_DBT_REALLOC;
98
99 dbt_init(&dest_vals[which], NULL, 0);
100 dest_vals[which].flags = DB_DBT_REALLOC;
101 }
102
103 int rr;
104 uint32_t inserts = 0;
105 for (uint32_t i = 0; i < cs->num; i++ ) {
106 DB_TXN *txn;
107 env->txn_begin(env, cs->txn, &txn, 0);
108 k = key_to_put(n, cs->offset);
109 v = generate_val(k, 0);
110 dbt_init(&key, &k, sizeof(k));
111 dbt_init(&val, &v, sizeof(v));
112
113 rr = env_put_multiple_test_no_array(env,
114 cs->dbs[0],
115 txn,
116 &key,
117 &val,
118 NUM_DBS,
119 cs->dbs, // dest dbs
120 dest_keys,
121 dest_vals,
122 cs->flags);
123 if ( rr != 0 ) {
124 if ( verbose ) printf("client[%u] : put_multiple returns %d, i=%u, n=%u, key=%u\n", cs->client_number, rr, i, n, k);
125 r = txn->abort(txn); CKERR(r);
126 break;
127 }
128 r = txn->commit(txn, 0); CKERR(r);
129 // limit the number of inserts per parent transaction to prevent lock escalation
130 inserts++;
131 if ( inserts >= cs->max_inserts_per_txn ) {
132 r = cs->txn->commit(cs->txn, 0); CKERR(r);
133 r = env->txn_begin(env, NULL, &cs->txn, 0); CKERR(r);
134 inserts = 0;
135 }
136 n = ( cs->dir == FORWARD ) ? n + 1 : n - 1;
137 }
138
139 if ( cs->txnwork & TXN_END ) { r = cs->txn->commit(cs->txn, DB_TXN_SYNC); CKERR(r); }
140 if (verbose) printf("client[%d] done\n", cs->client_number);
141
142 for (int which=0; which<NUM_DBS; which++) {
143 toku_free(dest_keys[which].data);
144 toku_free(dest_vals[which].data);
145 }
146
147
148 return 0;
149 }
150
151 toku_pthread_t *client_threads;
152 client_spec_t *client_specs;
153
clients_init(DB ** dbs,uint32_t * flags)154 static void clients_init(DB **dbs, uint32_t *flags)
155 {
156 XMALLOC_N(MAX_CLIENTS, client_threads);
157 XMALLOC_N(MAX_CLIENTS, client_specs);
158
159 client_specs[0].client_number = 0;
160 client_specs[0].start = 0;
161 client_specs[0].num = num_rows;
162 client_specs[0].offset = -1;
163 client_specs[0].dir = FORWARD;
164 client_specs[0].txnwork = TXN_CREATE | TXN_END;
165 client_specs[0].txn = NULL;
166 client_specs[0].max_inserts_per_txn = 1000;
167 client_specs[0].dbs = dbs;
168 client_specs[0].flags = flags;
169
170 client_specs[1].client_number = 1;
171 client_specs[1].start = 0;
172 client_specs[1].num = num_rows;
173 client_specs[1].offset = 1;
174 client_specs[1].dir = FORWARD;
175 client_specs[1].txnwork = TXN_CREATE | TXN_END;
176 client_specs[1].txn = NULL;
177 client_specs[1].max_inserts_per_txn = 100;
178 client_specs[1].dbs = dbs;
179 client_specs[1].flags = flags;
180
181 client_specs[2].client_number = 2;
182 client_specs[2].start = num_rows -1;
183 client_specs[2].num = num_rows;
184 client_specs[2].offset = -2;
185 client_specs[2].dir = BACKWARD;
186 client_specs[2].txnwork = TXN_CREATE | TXN_END;
187 client_specs[2].txn = NULL;
188 client_specs[2].max_inserts_per_txn = 1000;
189 client_specs[2].dbs = dbs;
190 client_specs[2].flags = flags;
191 }
192
clients_cleanup(void)193 static void clients_cleanup(void)
194 {
195 toku_free(client_threads); client_threads = NULL;
196 toku_free(client_specs); client_specs = NULL;
197 }
198
199 // verify results
200 // - read the keys in the primary table, then calculate what keys should exist
201 // in the other DB. Read the other table to verify.
check_results(DB * src,DB * db)202 static int check_results(DB *src, DB *db)
203 {
204 int r;
205 int fail = 0;
206
207 int clients = client_count;
208
209 int max_rows = ( clients + 1 ) * num_rows;
210 unsigned int *db_keys = (unsigned int *) toku_malloc(max_rows * sizeof (unsigned int));
211
212 DBT key, val;
213 unsigned int k=0, v=0;
214 dbt_init(&key, &k, sizeof(unsigned int));
215 dbt_init(&val, &v, sizeof(unsigned int));
216
217 DB_TXN *txn;
218 r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
219
220 DBC *cursor;
221 r = src->cursor(src, txn, &cursor, 0); CKERR(r);
222
223 int which = *(uint32_t*)db->app_private;
224
225 // scan the primary table,
226 // calculate the expected keys in 'db'
227 int row = 0;
228 while ( r != DB_NOTFOUND ) {
229 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
230 if ( r != DB_NOTFOUND ) {
231 k = *((uint32_t *)(key.data));
232 db_keys[row] = twiddle32(k, which);
233 row++;
234 }
235 }
236 if ( verbose ) printf("primary table scanned, contains %d rows\n", row);
237 int primary_rows = row;
238 r = cursor->c_close(cursor); CKERR(r);
239 // sort the expected keys
240 qsort(db_keys, primary_rows, sizeof (unsigned int), uint_cmp);
241
242 if ( verbose > 1 ) {
243 for(int i=0;i<primary_rows;i++) {
244 printf("primary table[%u] = %u\n", i, db_keys[i]);
245 }
246 }
247
248 // scan the indexer-created DB, comparing keys with expected keys
249 // - there should be exactly 'primary_rows' in the new index
250 r = db->cursor(db, txn, &cursor, 0); CKERR(r);
251 for (int i=0;i<primary_rows;i++) {
252 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
253 if ( r == DB_NOTFOUND ) {
254 printf("scan of index finds last row is %d\n", i);
255 }
256 CKERR(r);
257 k = *((uint32_t *)(key.data));
258 if ( db_keys[i] != k ) {
259 if ( verbose ) printf("ERROR expecting key %10u for row %d, found key = %10u\n", db_keys[i],i,k);
260 fail = 1;
261 goto check_results_error;
262 }
263 }
264 // next cursor op should return DB_NOTFOUND
265 r = cursor->c_get(cursor, &key, &val, DB_NEXT);
266 assert(r == DB_NOTFOUND);
267
268 // we're done - cleanup and close
269 check_results_error:
270 r = cursor->c_close(cursor); CKERR(r);
271 toku_free(db_keys);
272 r = txn->commit(txn, 0); CKERR(r);
273 if ( verbose ) {
274 if ( fail ) printf("check_results : fail\n");
275 else printf("check_results : pass\n");
276 }
277 return fail;
278 }
279
test_indexer(DB * src,DB ** dbs)280 static void test_indexer(DB *src, DB **dbs)
281 {
282 int r;
283 DB_TXN *txn;
284 DB_INDEXER *indexer;
285 uint32_t db_flags[NUM_DBS];
286
287
288 if ( verbose ) printf("test_indexer\n");
289 for(int i=0;i<NUM_DBS;i++) {
290 db_flags[i] = 0;
291 }
292 clients_init(dbs, db_flags);
293
294 // create and initialize indexer
295 r = env->txn_begin(env, NULL, &txn, 0);
296 CKERR(r);
297
298 if ( verbose ) printf("test_indexer create_indexer\n");
299 r = env->create_indexer(env, txn, &indexer, src, NUM_DBS-1, &dbs[1], db_flags, 0);
300 CKERR(r);
301 r = indexer->set_error_callback(indexer, NULL, NULL);
302 CKERR(r);
303 r = indexer->set_poll_function(indexer, poll_print, NULL);
304 CKERR(r);
305
306 // start threads doing additional inserts - no lock issues since indexer
307 // already created
308 r = toku_pthread_create(toku_uninstrumented,
309 &client_threads[0],
310 nullptr,
311 client,
312 static_cast<void *>(&client_specs[0]));
313 CKERR(r);
314 r = toku_pthread_create(toku_uninstrumented,
315 &client_threads[1],
316 nullptr,
317 client,
318 static_cast<void *>(&client_specs[1]));
319 CKERR(r);
320 // r = toku_pthread_create(toku_uninstrumented, &client_threads[2], 0,
321 // client, (void *)&client_specs[2]); CKERR(r);
322
323 struct timeval start, now;
324 if (verbose) {
325 printf("test_indexer build\n");
326 gettimeofday(&start,0);
327 }
328 r = indexer->build(indexer);
329 CKERR(r);
330 if ( verbose ) {
331 gettimeofday(&now,0);
332 int duration = (int)(now.tv_sec - start.tv_sec);
333 if ( duration > 0 )
334 printf("test_indexer build : sec = %d\n", duration);
335 }
336
337 void *t0; r = toku_pthread_join(client_threads[0], &t0); CKERR(r);
338 void *t1; r = toku_pthread_join(client_threads[1], &t1); CKERR(r);
339 // void *t2; r = toku_pthread_join(client_threads[2], &t2); CKERR(r);
340
341 if ( verbose ) printf("test_indexer close\n");
342 r = indexer->close(indexer);
343 CKERR(r);
344 r = txn->commit(txn, DB_TXN_SYNC);
345 CKERR(r);
346
347 clients_cleanup();
348
349 if ( verbose ) printf("check_results\n");
350 r = check_results(src, dbs[1]);
351 CKERR(r);
352
353 if ( verbose && (r == 0)) printf("PASS\n");
354 if ( verbose && (r == 0)) printf("test_indexer done\n");
355 }
356
357
run_test(void)358 static void run_test(void)
359 {
360 int r;
361 toku_os_recursive_delete(TOKU_TEST_FILENAME);
362 r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
363 char logname[TOKU_PATH_MAX+1];
364 r = toku_os_mkdir(toku_path_join(logname, 2, TOKU_TEST_FILENAME, "log"), S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
365
366 r = db_env_create(&env, 0); CKERR(r);
367 r = env->set_lg_dir(env, "log"); CKERR(r);
368 r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r);
369 generate_permute_tables();
370 r = env->set_generate_row_callback_for_put(env, put_multiple_generate); CKERR(r);
371 int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
372 r = env->open(env, TOKU_TEST_FILENAME, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
373 env->set_errfile(env, stderr);
374 r = env->checkpointing_set_period(env, 0); CKERR(r);
375
376 DBT desc;
377 dbt_init(&desc, "foo", sizeof("foo"));
378 int ids[MAX_DBS];
379 DB *dbs[MAX_DBS];
380 for (int i = 0; i < NUM_DBS; i++) {
381 ids[i] = i;
382 r = db_create(&dbs[i], env, 0); CKERR(r);
383 dbs[i]->app_private = &ids[i];
384 char key_name[32];
385 sprintf(key_name, "key%d", i);
386 r = dbs[i]->open(dbs[i], NULL, key_name, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
387 IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
388 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
389 });
390 }
391
392 // generate the src DB (do not use put_multiple)
393 DB_TXN *txn;
394 r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
395 r = generate_initial_table(dbs[0], txn, num_rows); CKERR(r);
396 r = txn->commit(txn, DB_TXN_SYNC); CKERR(r);
397
398 // -------------------------- //
399 if (1) test_indexer(dbs[0], dbs);
400 // -------------------------- //
401
402 for(int i=0;i<NUM_DBS;i++) {
403 r = dbs[i]->close(dbs[i], 0); CKERR(r);
404 }
405 r = env->close(env, 0); CKERR(r);
406 }
407
408 // ------------ infrastructure ----------
409
410 static inline void
do_args(int argc,char * const argv[])411 do_args (int argc, char * const argv[]) {
412 const char *progname=argv[0];
413 num_rows = NUM_ROWS;
414 argc--; argv++;
415 while (argc>0) {
416 if (strcmp(argv[0],"-v")==0) {
417 verbose++;
418 } else if (strcmp(argv[0],"-q")==0) {
419 verbose=0;
420 } else if (strcmp(argv[0],"-r")==0) {
421 argc--; argv++;
422 num_rows = atoi(argv[0]);
423 } else {
424 fprintf(stderr, "Usage:\n %s [-v] [-q] [-r rows]\n", progname);
425 exit(1);
426 }
427 argc--; argv++;
428 }
429 }
430
431
test_main(int argc,char * const * argv)432 int test_main(int argc, char * const *argv) {
433 do_args(argc, argv);
434 run_test();
435 return 0;
436 }
437
438
439 /*
440 * Please ignore this code - I don't think I'm going to use it, but I don't want to lose it
441 * I will delete this later - Dave
442
443 if ( rr != 0 ) { // possible lock deadlock
444 if (verbose > 1) {
445 printf("client[%u] : put_multiple returns %d, i=%u, n=%u, key=%u\n", cs->client_number, rr, i, n, k);
446 if ( verbose > 2 ) print_engine_status(env);
447 }
448 // abort the transaction, freeing up locks associated with previous put_multiples
449 if ( verbose > 1 ) printf("start txn abort\n");
450 r = txn->abort(txn); CKERR(r);
451 if ( verbose > 1 ) printf(" txn aborted\n");
452 sleep(2 + cs->client_number);
453 // now retry, waiting until the deadlock resolves itself
454 r = env->txn_begin(env, cs->txn, &txn, 0); CKERR(r);
455 if ( verbose > 1 ) printf("txn begin\n");
456 while ( rr != 0 ) {
457 rr = env->put_multiple(env,
458 cs->dbs[0],
459 txn,
460 &key,
461 &val,
462 NUM_DBS,
463 cs->dbs, // dest dbs
464 dest_keys,
465 dest_vals,
466 cs->flags,
467 NULL);
468 if ( rr != 0 ) {
469 if ( verbose ) printf("client[%u] : put_multiple returns %d, i=%u, n=%u, key=%u\n", cs->client_number, rr, i, n, k);
470 if ( verbose ) printf("start txn abort\n");
471 r = txn->abort(txn); CKERR(r);
472 if ( verbose ) printf(" txn aborted\n");
473 sleep(2 + cs->client_number);
474 r = env->txn_begin(env, cs->txn, &txn, 0); CKERR(r);
475 if ( verbose ) printf("txn begin\n");
476 }
477 }
478 */
479