1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 #include "key-val.h"
44 
45 enum {NUM_INDEXER_INDEXES=1};
46 static const int NUM_DBS = NUM_INDEXER_INDEXES + 1; // 1 for source DB
47 static const int NUM_ROWS = 10000;
48 int num_rows;
49 typedef enum {FORWARD = 0, BACKWARD} Direction;
50 typedef enum {TXN_NONE = 0, TXN_CREATE = 1, TXN_END = 2} TxnWork;
51 
52 DB_ENV *env;
53 
54 /*
55  *  client() is a routine intended to be run in a separate thread from index creation
56  *     - it takes a client spec which describes work to be done
57  *     - direction : move to ever increasing or decreasing rows
58  *     - txnwork : whether a transaction should be created or closed within the client
59  *         (allows client transaction to start before or during index creation,
60  *          and to close during or after index creation)
61  */
62 
63 typedef struct {
64     uint32_t num; // number of rows to write
65     uint32_t start; // approximate start row
66     int      offset; // offset from stride (= MAX_CLIENTS)
67     Direction dir;
68     int txnwork;
69     DB_TXN *txn;
70     uint32_t max_inserts_per_txn;  // this is for the parent transaction
71     DB **dbs;
72     int client_number;
73     uint32_t *flags;
74 } client_spec_t, *client_spec;
75 
76 int client_count = 0;
77 
client(void * arg)78 static void * client(void *arg)
79 {
80     client_spec CAST_FROM_VOIDP(cs, arg);
81     client_count++;
82     if ( verbose ) printf("client[%d]\n", cs->client_number);
83     assert(cs->client_number < MAX_CLIENTS);
84     assert(cs->dir == FORWARD || cs->dir == BACKWARD);
85 
86     int r;
87     if ( cs->txnwork & TXN_CREATE ) { r = env->txn_begin(env, NULL, &cs->txn, 0);  CKERR(r); }
88 
89     DBT key, val;
90     DBT dest_keys[NUM_DBS];
91     DBT dest_vals[NUM_DBS];
92     uint32_t k, v;
93     int n = cs->start;
94 
95     for(int which=0;which<NUM_DBS;which++) {
96         dbt_init(&dest_keys[which], NULL, 0);
97         dest_keys[which].flags = DB_DBT_REALLOC;
98 
99         dbt_init(&dest_vals[which], NULL, 0);
100         dest_vals[which].flags = DB_DBT_REALLOC;
101     }
102 
103     int rr;
104     uint32_t inserts = 0;
105     for (uint32_t i = 0; i < cs->num; i++ ) {
106         DB_TXN *txn;
107         env->txn_begin(env, cs->txn, &txn, 0);
108         k = key_to_put(n, cs->offset);
109         v = generate_val(k, 0);
110         dbt_init(&key, &k, sizeof(k));
111         dbt_init(&val, &v, sizeof(v));
112 
113         rr = env_put_multiple_test_no_array(env,
114                                cs->dbs[0],
115                                txn,
116                                &key,
117                                &val,
118                                NUM_DBS,
119                                cs->dbs, // dest dbs
120                                dest_keys,
121                                dest_vals,
122                                cs->flags);
123         if ( rr != 0 ) {
124             if ( verbose ) printf("client[%u] : put_multiple returns %d, i=%u, n=%u, key=%u\n", cs->client_number, rr, i, n, k);
125             r = txn->abort(txn); CKERR(r);
126             break;
127         }
128         r = txn->commit(txn, 0); CKERR(r);
129         // limit the number of inserts per parent transaction to prevent lock escalation
130         inserts++;
131         if ( inserts >= cs->max_inserts_per_txn ) {
132             r = cs->txn->commit(cs->txn, 0); CKERR(r);
133             r = env->txn_begin(env, NULL, &cs->txn, 0); CKERR(r);
134             inserts = 0;
135         }
136         n = ( cs->dir == FORWARD ) ? n + 1 : n - 1;
137     }
138 
139     if ( cs->txnwork & TXN_END )    { r = cs->txn->commit(cs->txn, DB_TXN_SYNC);       CKERR(r); }
140     if (verbose) printf("client[%d] done\n", cs->client_number);
141 
142     for (int which=0; which<NUM_DBS; which++) {
143         toku_free(dest_keys[which].data);
144         toku_free(dest_vals[which].data);
145     }
146 
147 
148     return 0;
149 }
150 
151 toku_pthread_t *client_threads;
152 client_spec_t  *client_specs;
153 
clients_init(DB ** dbs,uint32_t * flags)154 static void clients_init(DB **dbs, uint32_t *flags)
155 {
156     XMALLOC_N(MAX_CLIENTS, client_threads);
157     XMALLOC_N(MAX_CLIENTS, client_specs);
158 
159     client_specs[0].client_number = 0;
160     client_specs[0].start         = 0;
161     client_specs[0].num           = num_rows;
162     client_specs[0].offset        = -1;
163     client_specs[0].dir           = FORWARD;
164     client_specs[0].txnwork       = TXN_CREATE | TXN_END;
165     client_specs[0].txn           = NULL;
166     client_specs[0].max_inserts_per_txn = 1000;
167     client_specs[0].dbs           = dbs;
168     client_specs[0].flags         = flags;
169 
170     client_specs[1].client_number = 1;
171     client_specs[1].start         = 0;
172     client_specs[1].num           = num_rows;
173     client_specs[1].offset        = 1;
174     client_specs[1].dir           = FORWARD;
175     client_specs[1].txnwork       = TXN_CREATE | TXN_END;
176     client_specs[1].txn           = NULL;
177     client_specs[1].max_inserts_per_txn = 100;
178     client_specs[1].dbs           = dbs;
179     client_specs[1].flags         = flags;
180 
181     client_specs[2].client_number = 2;
182     client_specs[2].start         = num_rows -1;
183     client_specs[2].num           = num_rows;
184     client_specs[2].offset        = -2;
185     client_specs[2].dir           = BACKWARD;
186     client_specs[2].txnwork       = TXN_CREATE | TXN_END;
187     client_specs[2].txn           = NULL;
188     client_specs[2].max_inserts_per_txn = 1000;
189     client_specs[2].dbs           = dbs;
190     client_specs[2].flags         = flags;
191 }
192 
clients_cleanup(void)193 static void clients_cleanup(void)
194 {
195     toku_free(client_threads);   client_threads = NULL;
196     toku_free(client_specs);       client_specs = NULL;
197 }
198 
199 // verify results
200 //  - read the keys in the primary table, then calculate what keys should exist
201 //     in the other DB.  Read the other table to verify.
check_results(DB * src,DB * db)202 static int check_results(DB *src, DB *db)
203 {
204     int r;
205     int fail = 0;
206 
207     int clients = client_count;
208 
209     int max_rows = ( clients + 1 ) * num_rows;
210     unsigned int *db_keys = (unsigned int *) toku_malloc(max_rows * sizeof (unsigned int));
211 
212     DBT key, val;
213     unsigned int k=0, v=0;
214     dbt_init(&key, &k, sizeof(unsigned int));
215     dbt_init(&val, &v, sizeof(unsigned int));
216 
217     DB_TXN *txn;
218     r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
219 
220     DBC *cursor;
221     r = src->cursor(src, txn, &cursor, 0);  CKERR(r);
222 
223     int which = *(uint32_t*)db->app_private;
224 
225     // scan the primary table,
226     // calculate the expected keys in 'db'
227     int row = 0;
228     while ( r != DB_NOTFOUND ) {
229         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
230         if ( r != DB_NOTFOUND ) {
231             k = *((uint32_t *)(key.data));
232             db_keys[row] = twiddle32(k, which);
233             row++;
234         }
235     }
236     if ( verbose ) printf("primary table scanned, contains %d rows\n", row);
237     int primary_rows = row;
238     r = cursor->c_close(cursor); CKERR(r);
239     // sort the expected keys
240     qsort(db_keys, primary_rows, sizeof (unsigned int), uint_cmp);
241 
242     if ( verbose > 1 ) {
243         for(int i=0;i<primary_rows;i++) {
244             printf("primary table[%u] = %u\n", i, db_keys[i]);
245         }
246     }
247 
248     // scan the indexer-created DB, comparing keys with expected keys
249     //   - there should be exactly 'primary_rows' in the new index
250     r = db->cursor(db, txn, &cursor, 0); CKERR(r);
251     for (int i=0;i<primary_rows;i++) {
252         r = cursor->c_get(cursor, &key, &val, DB_NEXT);
253         if ( r == DB_NOTFOUND ) {
254             printf("scan of index finds last row is %d\n", i);
255         }
256         CKERR(r);
257         k = *((uint32_t *)(key.data));
258         if ( db_keys[i] != k ) {
259             if ( verbose ) printf("ERROR expecting key %10u for row %d, found key = %10u\n", db_keys[i],i,k);
260             fail = 1;
261             goto check_results_error;
262         }
263     }
264     // next cursor op should return DB_NOTFOUND
265     r = cursor->c_get(cursor, &key, &val, DB_NEXT);
266     assert(r == DB_NOTFOUND);
267 
268     // we're done - cleanup and close
269 check_results_error:
270     r = cursor->c_close(cursor); CKERR(r);
271     toku_free(db_keys);
272     r = txn->commit(txn, 0);  CKERR(r);
273     if ( verbose ) {
274         if ( fail ) printf("check_results : fail\n");
275         else printf("check_results : pass\n");
276     }
277     return fail;
278 }
279 
test_indexer(DB * src,DB ** dbs)280 static void test_indexer(DB *src, DB **dbs)
281 {
282     int r;
283     DB_TXN    *txn;
284     DB_INDEXER *indexer;
285     uint32_t db_flags[NUM_DBS];
286 
287 
288     if ( verbose ) printf("test_indexer\n");
289     for(int i=0;i<NUM_DBS;i++) {
290         db_flags[i] = 0;
291     }
292     clients_init(dbs, db_flags);
293 
294     // create and initialize indexer
295     r = env->txn_begin(env, NULL, &txn, 0);
296     CKERR(r);
297 
298     if ( verbose ) printf("test_indexer create_indexer\n");
299     r = env->create_indexer(env, txn, &indexer, src, NUM_DBS-1, &dbs[1], db_flags, 0);
300     CKERR(r);
301     r = indexer->set_error_callback(indexer, NULL, NULL);
302     CKERR(r);
303     r = indexer->set_poll_function(indexer, poll_print, NULL);
304     CKERR(r);
305 
306     // start threads doing additional inserts - no lock issues since indexer
307     // already created
308     r = toku_pthread_create(toku_uninstrumented,
309                             &client_threads[0],
310                             nullptr,
311                             client,
312                             static_cast<void *>(&client_specs[0]));
313     CKERR(r);
314     r = toku_pthread_create(toku_uninstrumented,
315                             &client_threads[1],
316                             nullptr,
317                             client,
318                             static_cast<void *>(&client_specs[1]));
319     CKERR(r);
320     //    r = toku_pthread_create(toku_uninstrumented, &client_threads[2], 0,
321     //    client, (void *)&client_specs[2]);  CKERR(r);
322 
323     struct timeval start, now;
324     if (verbose) {
325         printf("test_indexer build\n");
326         gettimeofday(&start,0);
327     }
328     r = indexer->build(indexer);
329     CKERR(r);
330     if ( verbose ) {
331         gettimeofday(&now,0);
332         int duration = (int)(now.tv_sec - start.tv_sec);
333         if ( duration > 0 )
334             printf("test_indexer build : sec = %d\n", duration);
335     }
336 
337     void *t0;      r = toku_pthread_join(client_threads[0], &t0);     CKERR(r);
338     void *t1;      r = toku_pthread_join(client_threads[1], &t1);     CKERR(r);
339 //    void *t2;      r = toku_pthread_join(client_threads[2], &t2);     CKERR(r);
340 
341     if ( verbose ) printf("test_indexer close\n");
342     r = indexer->close(indexer);
343     CKERR(r);
344     r = txn->commit(txn, DB_TXN_SYNC);
345     CKERR(r);
346 
347     clients_cleanup();
348 
349     if ( verbose ) printf("check_results\n");
350     r = check_results(src, dbs[1]);
351     CKERR(r);
352 
353     if ( verbose && (r == 0)) printf("PASS\n");
354     if ( verbose && (r == 0)) printf("test_indexer done\n");
355 }
356 
357 
run_test(void)358 static void run_test(void)
359 {
360     int r;
361     toku_os_recursive_delete(TOKU_TEST_FILENAME);
362     r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);                          CKERR(r);
363     char logname[TOKU_PATH_MAX+1];
364     r = toku_os_mkdir(toku_path_join(logname, 2, TOKU_TEST_FILENAME, "log"), S_IRWXU+S_IRWXG+S_IRWXO);                   CKERR(r);
365 
366     r = db_env_create(&env, 0);                                                  CKERR(r);
367     r = env->set_lg_dir(env, "log");                                             CKERR(r);
368     r = env->set_default_bt_compare(env, uint_dbt_cmp);                           CKERR(r);
369     generate_permute_tables();
370     r = env->set_generate_row_callback_for_put(env, put_multiple_generate);      CKERR(r);
371     int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
372     r = env->open(env, TOKU_TEST_FILENAME, envflags, S_IRWXU+S_IRWXG+S_IRWXO);               CKERR(r);
373     env->set_errfile(env, stderr);
374     r = env->checkpointing_set_period(env, 0);                                   CKERR(r);
375 
376     DBT desc;
377     dbt_init(&desc, "foo", sizeof("foo"));
378     int ids[MAX_DBS];
379     DB *dbs[MAX_DBS];
380     for (int i = 0; i < NUM_DBS; i++) {
381         ids[i] = i;
382         r = db_create(&dbs[i], env, 0); CKERR(r);
383         dbs[i]->app_private = &ids[i];
384         char key_name[32];
385         sprintf(key_name, "key%d", i);
386         r = dbs[i]->open(dbs[i], NULL, key_name, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666);   CKERR(r);
387         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
388                 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
389         });
390     }
391 
392     // generate the src DB (do not use put_multiple)
393     DB_TXN *txn;
394     r = env->txn_begin(env, NULL, &txn, 0);                                      CKERR(r);
395     r = generate_initial_table(dbs[0], txn, num_rows);                           CKERR(r);
396     r = txn->commit(txn, DB_TXN_SYNC);                                           CKERR(r);
397 
398     // -------------------------- //
399     if (1) test_indexer(dbs[0], dbs);
400     // -------------------------- //
401 
402     for(int i=0;i<NUM_DBS;i++) {
403         r = dbs[i]->close(dbs[i], 0);                                            CKERR(r);
404     }
405     r = env->close(env, 0);                                                      CKERR(r);
406 }
407 
408 // ------------ infrastructure ----------
409 
410 static inline void
do_args(int argc,char * const argv[])411 do_args (int argc, char * const argv[]) {
412     const char *progname=argv[0];
413     num_rows = NUM_ROWS;
414     argc--; argv++;
415     while (argc>0) {
416 	if (strcmp(argv[0],"-v")==0) {
417 	    verbose++;
418 	} else if (strcmp(argv[0],"-q")==0) {
419 	    verbose=0;
420         } else if (strcmp(argv[0],"-r")==0) {
421             argc--; argv++;
422             num_rows = atoi(argv[0]);
423 	} else {
424 	    fprintf(stderr, "Usage:\n %s [-v] [-q] [-r rows]\n", progname);
425 	    exit(1);
426 	}
427 	argc--; argv++;
428     }
429 }
430 
431 
test_main(int argc,char * const * argv)432 int test_main(int argc, char * const *argv) {
433     do_args(argc, argv);
434     run_test();
435     return 0;
436 }
437 
438 
439 /*
440  * Please ignore this code - I don't think I'm going to use it, but I don't want to lose it
441  *   I will delete this later - Dave
442 
443         if ( rr != 0 ) {  // possible lock deadlock
444             if (verbose > 1) {
445                 printf("client[%u] : put_multiple returns %d, i=%u, n=%u, key=%u\n", cs->client_number, rr, i, n, k);
446                 if ( verbose > 2 ) print_engine_status(env);
447             }
448             // abort the transaction, freeing up locks associated with previous put_multiples
449             if ( verbose > 1 ) printf("start txn abort\n");
450             r = txn->abort(txn); CKERR(r);
451             if ( verbose > 1 ) printf("      txn aborted\n");
452             sleep(2 + cs->client_number);
453             // now retry, waiting until the deadlock resolves itself
454             r = env->txn_begin(env, cs->txn, &txn, 0); CKERR(r);
455             if ( verbose > 1 ) printf("txn begin\n");
456             while ( rr != 0 ) {
457                 rr = env->put_multiple(env,
458                                        cs->dbs[0],
459                                        txn,
460                                        &key,
461                                        &val,
462                                        NUM_DBS,
463                                        cs->dbs, // dest dbs
464                                        dest_keys,
465                                        dest_vals,
466                                        cs->flags,
467                                        NULL);
468                 if ( rr != 0 ) {
469                     if ( verbose ) printf("client[%u] : put_multiple returns %d, i=%u, n=%u, key=%u\n", cs->client_number, rr, i, n, k);
470                     if ( verbose ) printf("start txn abort\n");
471                     r = txn->abort(txn); CKERR(r);
472                     if ( verbose ) printf("      txn aborted\n");
473                     sleep(2 + cs->client_number);
474                     r = env->txn_begin(env, cs->txn, &txn, 0); CKERR(r);
475                     if ( verbose ) printf("txn begin\n");
476                 }
477             }
478  */
479