1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <db.h>
40 #include <portability/toku_atomic.h>
41 
42 #include "test.h"
43 #include "threaded_stress_test_helpers.h"
44 
45 //
46 // This test tries to emulate iibench at the ydb layer.
47 //
48 // The schema is simple:
49 // 8 byte primary key
50 // 8 byte key A
51 // 8 byte key B
52 // 8 byte key C
53 //
54 // There's one primary DB for the pk and three secondary DBs.
55 //
56 // The primary key stores the other columns as the value.
57 // The secondary keys have the primary key appended to them.
58 //
59 
// Size in bytes of a secondary index key: an 8 byte column value
// followed by the 8 byte primary key.
static const size_t iibench_secondary_key_size = 16;
61 
// In-memory form of one primary index row: the primary key plus the
// three columns that are stored as the row's value.
struct iibench_row {
    uint64_t pk; // primary key (the key of the primary DB)
    int64_t a;   // column generated for db index 1
    int64_t b;   // column generated for db index 2
    int64_t c;   // column generated for db index 3
};
68 
// In-memory form of one secondary index row. The on-disk key is the
// column value followed by the primary key; the value is empty.
struct iibench_secondary_row {
    int64_t column; // the indexed column value (first 8 bytes of the key)
    uint64_t pk;    // primary key of the owning row (last 8 bytes of the key)
};
73 
// Deterministically scramble a 64 bit key one byte at a time: byte i of
// the key (in host memory order) is mapped to ((b + 1) * 17) & 0xFF and
// added into the result at bit offset i * 8.
//
// The per-byte value is widened to 64 bits before it is shifted: shifting
// a plain int by 32 or more bits is undefined behavior, and the upper four
// bytes need shift counts of 32..56.
static int64_t hash(uint64_t key) {
    uint64_t result = 0;
    const uint8_t *buf = (const uint8_t *) &key;
    for (int i = 0; i < 8; i++) {
        const uint64_t scrambled = ((buf[i] + 1u) * 17u) & 0xFFu;
        result += scrambled << (i * 8);
    }
    return (int64_t) result;
}
82 
iibench_generate_column_by_pk(int64_t pk,int db_idx)83 static int64_t iibench_generate_column_by_pk(int64_t pk, int db_idx) {
84     invariant(db_idx > 0);
85     return hash(pk * db_idx);
86 }
87 
iibench_generate_row(int64_t pk,struct iibench_row * row)88 static void iibench_generate_row(int64_t pk, struct iibench_row *row) {
89     row->a = iibench_generate_column_by_pk(pk, 1);
90     row->b = iibench_generate_column_by_pk(pk, 2);
91     row->c = iibench_generate_column_by_pk(pk, 3);
92 }
93 
iibench_parse_row(const DBT * key,const DBT * val,struct iibench_row * row)94 static void iibench_parse_row(const DBT *key, const DBT *val, struct iibench_row *row) {
95     char *CAST_FROM_VOIDP(val_buf, val->data);
96     invariant(key->size == 8);
97     invariant(val->size == 24);
98     memcpy(&row->pk, key->data, 8);
99     memcpy(&row->a, val_buf + 0, 8);
100     memcpy(&row->b, val_buf + 8, 8);
101     memcpy(&row->c, val_buf + 16, 8);
102 }
103 
UU()104 static void UU() iibench_verify_row(const struct iibench_row *row) {
105     struct iibench_row expected_row;
106     iibench_generate_row(row->pk, &expected_row);
107     invariant(row->a == expected_row.a);
108     invariant(row->b == expected_row.b);
109     invariant(row->c == expected_row.c);
110 }
111 
iibench_parse_secondary_row(const DBT * key,const DBT * val,struct iibench_secondary_row * row)112 static void iibench_parse_secondary_row(const DBT *key, const DBT *val, struct iibench_secondary_row *row) {
113     char *CAST_FROM_VOIDP(key_buf, key->data);
114     invariant(key->size == iibench_secondary_key_size);
115     invariant(val->size == 0);
116     memcpy(&row->column, key_buf + 0, 8);
117     memcpy(&row->pk, key_buf + 8, 8);
118 }
119 
UU()120 static void UU() iibench_verify_secondary_row(const struct iibench_secondary_row *row, int db_idx) {
121     int64_t expected = iibench_generate_column_by_pk(row->pk, db_idx);
122     invariant(row->column == expected);
123 }
124 
// Write the 8 bytes of primary key `pk` into the first element of `buf`.
static void iibench_fill_key_buf(uint64_t pk, int64_t *buf) {
    memcpy(buf, &pk, sizeof(pk));
}
128 
iibench_fill_val_buf(uint64_t pk,int64_t * buf)129 static void iibench_fill_val_buf(uint64_t pk, int64_t *buf) {
130     struct iibench_row row;
131     iibench_generate_row(pk, &row);
132     memcpy(&buf[0], &row.a, sizeof(row.a));
133     memcpy(&buf[1], &row.b, sizeof(row.b));
134     memcpy(&buf[2], &row.c, sizeof(row.c));
135 }
136 
iibench_get_db_idx(DB * db)137 static int iibench_get_db_idx(DB *db) {
138     DESCRIPTOR desc = db->cmp_descriptor;
139     invariant_notnull(desc->dbt.data);
140     invariant(desc->dbt.size == sizeof(int));
141     int db_idx;
142     memcpy(&db_idx, desc->dbt.data, desc->dbt.size);
143     return db_idx;
144 }
145 
iibench_rangequery_cb(DB * db,const DBT * key,const DBT * val,void * extra)146 static void iibench_rangequery_cb(DB *db, const DBT *key, const DBT *val, void *extra) {
147     invariant_null(extra);
148     const int db_idx = iibench_get_db_idx(db);
149     if (db_idx == 0) {
150         struct iibench_row row;
151         iibench_parse_row(key, val, &row);
152         iibench_verify_row(&row);
153     } else {
154         struct iibench_secondary_row row;
155         iibench_parse_secondary_row(key, val, &row);
156         iibench_verify_secondary_row(&row, db_idx);
157     }
158 }
159 
// State shared by the worker operations through `operation_extra`.
struct iibench_put_op_extra {
    // Next primary key to hand out. Put operations advance it with
    // toku_sync_fetch_and_add; range queries read it the same way.
    uint64_t autoincrement;
};
163 
UU()164 static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
165     const int num_dbs = arg->cli->num_DBs;
166     DB **dbs = arg->dbp;
167     DB_ENV *env = arg->env;
168     DBT_ARRAY mult_key_dbt[num_dbs];
169     DBT_ARRAY mult_val_dbt[num_dbs];
170     uint32_t mult_put_flags[num_dbs];
171 
172     // The first index is unique with serial autoincrement keys.
173     // The rest are have keys generated with this thread's random data.
174     mult_put_flags[0] = get_put_flags(arg->cli) |
175         // If the table was already created, don't check for uniqueness.
176         (arg->cli->num_elements > 0 ? 0 : DB_NOOVERWRITE);
177     for (int i = 0; i < num_dbs; i++) {
178         toku_dbt_array_init(&mult_key_dbt[i], 1);
179         toku_dbt_array_init(&mult_val_dbt[i], 1);
180         mult_put_flags[i] = get_put_flags(arg->cli);
181     }
182     mult_key_dbt[0].dbts[0].flags = 0;
183     mult_val_dbt[0].dbts[0].flags = 0;
184 
185     int r = 0;
186 
187     uint64_t puts_to_increment = 0;
188     for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
189         struct iibench_put_op_extra *CAST_FROM_VOIDP(info, operation_extra);
190 
191         // Get a random primary key, generate secondary key columns in valbuf
192         uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
193         if (arg->bounded_element_range && arg->cli->num_elements > 0) {
194             pk = pk % arg->cli->num_elements;
195         }
196         int64_t keybuf[1];
197         int64_t valbuf[3];
198         iibench_fill_key_buf(pk, keybuf);
199         iibench_fill_val_buf(pk, valbuf);
200         dbt_init(&mult_key_dbt[0].dbts[0], keybuf, sizeof keybuf);
201         dbt_init(&mult_val_dbt[0].dbts[0], valbuf, sizeof valbuf);
202 
203         r = env->put_multiple(
204             env,
205             dbs[0], // source db.
206             txn,
207             &mult_key_dbt[0].dbts[0], // source db key
208             &mult_val_dbt[0].dbts[0], // source db value
209             num_dbs, // total number of dbs
210             dbs, // array of dbs
211             mult_key_dbt, // array of keys
212             mult_val_dbt, // array of values
213             mult_put_flags // array of flags
214             );
215         if (r != 0) {
216             goto cleanup;
217         }
218         puts_to_increment++;
219         if (puts_to_increment == 100) {
220             increment_counter(stats_extra, PUTS, puts_to_increment);
221             puts_to_increment = 0;
222         }
223     }
224 
225 cleanup:
226     for (int i = 0; i < num_dbs; i++) {
227         toku_dbt_array_destroy(&mult_key_dbt[i]);
228         toku_dbt_array_destroy(&mult_val_dbt[i]);
229     }
230     return r;
231 }
232 
iibench_generate_row_for_put(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * UU (src_key),const DBT * src_val)233 static int iibench_generate_row_for_put(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *UU(src_key), const DBT *src_val) {
234     toku_dbt_array_resize(dest_keys, 1);
235     toku_dbt_array_resize(dest_vals, 1);
236     DBT *dest_key = &dest_keys->dbts[0];
237     DBT *dest_val = &dest_vals->dbts[0];
238 
239     invariant(src_db != dest_db);
240     // 8 byte primary key, REALLOC secondary key
241     invariant_notnull(src_key->data);
242     invariant(src_key->size == 8);
243     invariant(dest_key->flags == DB_DBT_REALLOC);
244     // Expand the secondary key data buffer if necessary
245     if (dest_key->size != iibench_secondary_key_size) {
246         dest_key->data = toku_xrealloc(dest_key->data, iibench_secondary_key_size);
247         dest_key->size = iibench_secondary_key_size;
248     }
249 
250     // Get the db index from the descriptor. This is a secondary index
251     // so it has to be greater than zero (which would be the pk). Then
252     // grab the appropriate secondary key from the source val, which is
253     // an array of the 3 columns, so we have to subtract 1 from the index.
254     const int db_idx = iibench_get_db_idx(dest_db);
255     int64_t *CAST_FROM_VOIDP(columns, src_val->data);
256     int64_t secondary_key = columns[db_idx - 1];
257 
258     // First write down the secondary key, then the primary key (in src_key)
259     int64_t *CAST_FROM_VOIDP(dest_key_buf, dest_key->data);
260     memcpy(&dest_key_buf[0], &secondary_key, sizeof(secondary_key));
261     memcpy(&dest_key_buf[1], src_key->data, src_key->size);
262     dest_val->data = nullptr;
263     dest_val->size = 0;
264     return 0;
265 }
266 
267 // After each DB opens, set the descriptor to store the DB idx value.
268 // Close and reopen the DB so we can use db->cmp_descriptor during comparisons.
iibench_set_descriptor_after_db_opens(DB_ENV * env,DB * db,int idx,reopen_db_fn reopen,struct cli_args * cli_args)269 static DB *iibench_set_descriptor_after_db_opens(DB_ENV *env, DB *db, int idx, reopen_db_fn reopen, struct cli_args *cli_args) {
270     int r;
271     DBT desc_dbt;
272     desc_dbt.data = &idx;
273     desc_dbt.size = sizeof(idx);
274     desc_dbt.ulen = 0;
275     desc_dbt.flags = 0;
276     r = db->change_descriptor(db, nullptr, &desc_dbt, 0); CKERR(r);
277     r = db->close(db, 0); CKERR(r);
278     r = db_create(&db, env, 0); CKERR(r);
279     reopen(db, idx, cli_args);
280     return db;
281 }
282 
iibench_compare_keys(DB * db,const DBT * a,const DBT * b)283 static int iibench_compare_keys(DB *db, const DBT *a, const DBT *b) {
284     const int db_idx = iibench_get_db_idx(db);
285     if (db_idx == 0) {
286         invariant(a->size == 8);
287         invariant(b->size == 8);
288         uint64_t x = *(uint64_t *) a->data;
289         uint64_t y = *(uint64_t *) b->data;
290         if (x < y) {
291             return -1;
292         } else if (x == y) {
293             return 0;
294         } else {
295             return 1;
296         }
297     } else {
298         invariant(a->size == 16);
299         invariant(b->size == 16);
300         int64_t x = *(int64_t *) a->data;
301         int64_t y = *(int64_t *) b->data;
302         uint64_t pk_x = *(uint64_t *) (((char *) a->data) + 8);
303         uint64_t pk_y = *(uint64_t *) (((char *) b->data) + 8);
304         if (x < y) {
305             return -1;
306         } else if (x == y) {
307             if (pk_x < pk_y) {
308                 return -1;
309             } else if (pk_x == pk_y) {
310                 return 0;
311             } else {
312                 return 1;
313             }
314         } else {
315             return 1;
316         }
317     }
318 }
319 
iibench_rangequery_db(DB * db,DB_TXN * txn,ARG arg,uint64_t max_pk)320 static void iibench_rangequery_db(DB *db, DB_TXN *txn, ARG arg, uint64_t max_pk) {
321     const int limit = arg->cli->range_query_limit;
322 
323     int r;
324     DBC *cursor;
325 
326     // Get a random key no greater than max pk
327     DBT start_key, end_key;
328     uint64_t start_k = myrandom_r(arg->random_data) % (max_pk + 1);
329     uint64_t end_k = start_k + limit;
330     dbt_init(&start_key, &start_k, 8);
331     dbt_init(&end_key, &end_k, 8);
332 
333     r = db->cursor(db, txn, &cursor, 0); CKERR(r);
334     r = cursor->c_set_bounds(cursor, &start_key, &end_key, true, 0); CKERR(r);
335     struct rangequery_cb_extra extra = {
336         .rows_read = 0,
337         .limit = limit,
338         .cb = iibench_rangequery_cb,
339         .db = db,
340         .cb_extra = nullptr,
341     };
342     r = cursor->c_getf_set(cursor, 0, &start_key, rangequery_cb, &extra);
343     while (r == 0 && extra.rows_read < extra.limit && run_test) {
344         r = cursor->c_getf_next(cursor, 0, rangequery_cb, &extra);
345     }
346 
347     r = cursor->c_close(cursor); CKERR(r);
348 }
349 
350 // Do a range query over the primary index, verifying the contents of the rows
iibench_rangequery_op(DB_TXN * txn,ARG arg,void * operation_extra,void * stats_extra)351 static int iibench_rangequery_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
352     struct iibench_put_op_extra *CAST_FROM_VOIDP(info, operation_extra);
353     DB *db = arg->dbp[0];
354 
355     // Assume the max PK is the table size. If it isn't specified, do a
356     // safe read of the current autoincrement key from the put thread.
357     uint64_t max_pk = arg->cli->num_elements;
358     if (max_pk == 0) {
359         max_pk = toku_sync_fetch_and_add(&info->autoincrement, 0);
360     }
361     iibench_rangequery_db(db, txn, arg, max_pk);
362     increment_counter(stats_extra, PTQUERIES, 1);
363     return 0;
364 }
365 
iibench_fill_tables(DB_ENV * env,DB ** dbs,struct cli_args * cli_args,bool UU (fill_with_zeroes))366 static int iibench_fill_tables(DB_ENV *env, DB **dbs, struct cli_args *cli_args, bool UU(fill_with_zeroes)) {
367     const int num_dbs = cli_args->num_DBs;
368     int r = 0;
369 
370     DB_TXN *txn;
371     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
372 
373     DB_LOADER *loader;
374     uint32_t db_flags[num_dbs];
375     uint32_t dbt_flags[num_dbs];
376     for (int i = 0; i < num_dbs; i++) {
377         db_flags[i] = DB_PRELOCKED_WRITE;
378         dbt_flags[i] = DB_DBT_REALLOC;
379     }
380 
381     r = env->create_loader(env, txn, &loader, dbs[0], num_dbs, dbs, db_flags, dbt_flags, 0); CKERR(r);
382     for (int i = 0; i < cli_args->num_elements; i++) {
383         DBT key, val;
384         uint64_t pk = i;
385         int64_t keybuf[1];
386         int64_t valbuf[3];
387         iibench_fill_key_buf(pk, keybuf);
388         iibench_fill_val_buf(pk, valbuf);
389         dbt_init(&key, keybuf, sizeof keybuf);
390         dbt_init(&val, valbuf, sizeof valbuf);
391         r = loader->put(loader, &key, &val); CKERR(r);
392         if (verbose && i > 0 && i % 10000 == 0) {
393             report_overall_fill_table_progress(cli_args, 10000);
394         }
395     }
396     r = loader->close(loader); CKERR(r);
397 
398     r = txn->commit(txn, 0); CKERR(r);
399     return 0;
400 }
401 
402 static void
stress_table(DB_ENV * env,DB ** dbs,struct cli_args * cli_args)403 stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) {
404     if (verbose) printf("starting creation of pthreads\n");
405     const int num_threads = cli_args->num_put_threads + cli_args->num_ptquery_threads;
406     struct arg myargs[num_threads];
407 
408     // Put threads do iibench-like inserts with an auto-increment primary key
409     // Query threads do range queries of a certain size, verifying row contents.
410 
411     struct iibench_put_op_extra put_extra = {
412         .autoincrement = 0
413     };
414     for (int i = 0; i < num_threads; i++) {
415         arg_init(&myargs[i], dbs, env, cli_args);
416         if (i < cli_args->num_put_threads) {
417             myargs[i].operation = iibench_put_op;
418             myargs[i].operation_extra = &put_extra;
419         } else {
420             myargs[i].operation = iibench_rangequery_op;
421             myargs[i].operation_extra = &put_extra;
422             myargs[i].txn_flags |= DB_TXN_READ_ONLY;
423             myargs[i].sleep_ms = 1000; // 1 second between range queries
424         }
425     }
426     const bool crash_at_end = false;
427     run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args);
428 }
429 
test_main(int argc,char * const argv[])430 int test_main(int argc, char *const argv[]) {
431     struct cli_args args = get_default_args_for_perf();
432     args.num_elements = 0;  // want to start with empty DBs
433     // Puts per transaction is configurable. It defaults to 1k.
434     args.txn_size = 1000;
435     // Default to one writer on 4 indexes (pk + 3 secondaries), no readers.
436     args.num_DBs = 4;
437     args.num_put_threads = 1;
438     args.num_ptquery_threads = 0;
439     parse_stress_test_args(argc, argv, &args);
440     // The schema is not configurable. Silently ignore whatever was passed in.
441     args.key_size = 8;
442     args.val_size = 32;
443     // when there are multiple threads, its valid for two of them to
444     // generate the same key and one of them fail with DB_LOCK_NOTGRANTED
445     if (args.num_put_threads > 1) {
446         args.crash_on_operation_failure = false;
447     }
448     args.env_args.generate_put_callback = iibench_generate_row_for_put;
449     after_db_open_hook = iibench_set_descriptor_after_db_opens;
450     fill_tables = iibench_fill_tables;
451     perf_test_main_with_cmp(&args, iibench_compare_keys);
452     return 0;
453 }
454