1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include <db.h>
40 #include <portability/toku_atomic.h>
41
42 #include "test.h"
43 #include "threaded_stress_test_helpers.h"
44
45 //
46 // This test tries to emulate iibench at the ydb layer.
47 //
48 // The schema is simple:
49 // 8 byte primary key
50 // 8 byte key A
51 // 8 byte key B
52 // 8 byte key C
53 //
54 // There's one primary DB for the pk and three secondary DBs.
55 //
56 // The primary key stores the other columns as the value.
57 // The secondary keys have the primary key appended to them.
58 //
59
// Size in bytes of a secondary index key: the 8 byte column value followed
// by the 8 byte primary key of the row it belongs to.
static const size_t iibench_secondary_key_size = sizeof(int64_t) + sizeof(uint64_t);
61
// One row of the primary index, in decoded form.
struct iibench_row {
    uint64_t pk;      // primary key (the key of the primary DB)
    int64_t a, b, c;  // the three columns, stored back to back as the primary DB's value
};
68
// One entry of a secondary index, in decoded form. On disk the whole entry
// lives in the key (column first, then pk) and the value is empty.
struct iibench_secondary_row {
    int64_t column;  // the indexed column value
    uint64_t pk;     // primary key of the row this entry points back to
};
73
// Deterministically scramble a 64-bit key, one byte at a time.
//
// Byte i of `key` (in memory order) with value b contributes
// ((b + 1) * 17) & 0xFF to bits [8*i, 8*i + 8) of the result.
//
// The per-byte value is widened to 64 bits before it is shifted. Shifting
// the promoted `int` by 32..56 bits is undefined behavior and, in practice,
// folded the upper four bytes on top of the lower four.
static int64_t hash(uint64_t key) {
    const uint8_t *bytes = (const uint8_t *) &key;
    uint64_t result = 0;
    for (int i = 0; i < 8; i++) {
        const uint64_t scrambled = ((bytes[i] + 1u) * 17u) & 0xFFu;
        result |= scrambled << (i * 8);
    }
    return (int64_t) result;
}
82
iibench_generate_column_by_pk(int64_t pk,int db_idx)83 static int64_t iibench_generate_column_by_pk(int64_t pk, int db_idx) {
84 invariant(db_idx > 0);
85 return hash(pk * db_idx);
86 }
87
iibench_generate_row(int64_t pk,struct iibench_row * row)88 static void iibench_generate_row(int64_t pk, struct iibench_row *row) {
89 row->a = iibench_generate_column_by_pk(pk, 1);
90 row->b = iibench_generate_column_by_pk(pk, 2);
91 row->c = iibench_generate_column_by_pk(pk, 3);
92 }
93
iibench_parse_row(const DBT * key,const DBT * val,struct iibench_row * row)94 static void iibench_parse_row(const DBT *key, const DBT *val, struct iibench_row *row) {
95 char *CAST_FROM_VOIDP(val_buf, val->data);
96 invariant(key->size == 8);
97 invariant(val->size == 24);
98 memcpy(&row->pk, key->data, 8);
99 memcpy(&row->a, val_buf + 0, 8);
100 memcpy(&row->b, val_buf + 8, 8);
101 memcpy(&row->c, val_buf + 16, 8);
102 }
103
UU()104 static void UU() iibench_verify_row(const struct iibench_row *row) {
105 struct iibench_row expected_row;
106 iibench_generate_row(row->pk, &expected_row);
107 invariant(row->a == expected_row.a);
108 invariant(row->b == expected_row.b);
109 invariant(row->c == expected_row.c);
110 }
111
iibench_parse_secondary_row(const DBT * key,const DBT * val,struct iibench_secondary_row * row)112 static void iibench_parse_secondary_row(const DBT *key, const DBT *val, struct iibench_secondary_row *row) {
113 char *CAST_FROM_VOIDP(key_buf, key->data);
114 invariant(key->size == iibench_secondary_key_size);
115 invariant(val->size == 0);
116 memcpy(&row->column, key_buf + 0, 8);
117 memcpy(&row->pk, key_buf + 8, 8);
118 }
119
UU()120 static void UU() iibench_verify_secondary_row(const struct iibench_secondary_row *row, int db_idx) {
121 int64_t expected = iibench_generate_column_by_pk(row->pk, db_idx);
122 invariant(row->column == expected);
123 }
124
// Write the 8 byte primary key `pk` into buf[0], byte for byte.
static void iibench_fill_key_buf(uint64_t pk, int64_t *buf) {
    memcpy(buf, &pk, sizeof(pk));
}
128
iibench_fill_val_buf(uint64_t pk,int64_t * buf)129 static void iibench_fill_val_buf(uint64_t pk, int64_t *buf) {
130 struct iibench_row row;
131 iibench_generate_row(pk, &row);
132 memcpy(&buf[0], &row.a, sizeof(row.a));
133 memcpy(&buf[1], &row.b, sizeof(row.b));
134 memcpy(&buf[2], &row.c, sizeof(row.c));
135 }
136
iibench_get_db_idx(DB * db)137 static int iibench_get_db_idx(DB *db) {
138 DESCRIPTOR desc = db->cmp_descriptor;
139 invariant_notnull(desc->dbt.data);
140 invariant(desc->dbt.size == sizeof(int));
141 int db_idx;
142 memcpy(&db_idx, desc->dbt.data, desc->dbt.size);
143 return db_idx;
144 }
145
iibench_rangequery_cb(DB * db,const DBT * key,const DBT * val,void * extra)146 static void iibench_rangequery_cb(DB *db, const DBT *key, const DBT *val, void *extra) {
147 invariant_null(extra);
148 const int db_idx = iibench_get_db_idx(db);
149 if (db_idx == 0) {
150 struct iibench_row row;
151 iibench_parse_row(key, val, &row);
152 iibench_verify_row(&row);
153 } else {
154 struct iibench_secondary_row row;
155 iibench_parse_secondary_row(key, val, &row);
156 iibench_verify_secondary_row(&row, db_idx);
157 }
158 }
159
// State handed to the iibench operations through their operation_extra
// pointer.
struct iibench_put_op_extra {
    // Next primary key to hand out. Put operations advance it with
    // toku_sync_fetch_and_add; range queries read it the same way.
    uint64_t autoincrement;
};
163
UU()164 static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
165 const int num_dbs = arg->cli->num_DBs;
166 DB **dbs = arg->dbp;
167 DB_ENV *env = arg->env;
168 DBT_ARRAY mult_key_dbt[num_dbs];
169 DBT_ARRAY mult_val_dbt[num_dbs];
170 uint32_t mult_put_flags[num_dbs];
171
172 // The first index is unique with serial autoincrement keys.
173 // The rest are have keys generated with this thread's random data.
174 mult_put_flags[0] = get_put_flags(arg->cli) |
175 // If the table was already created, don't check for uniqueness.
176 (arg->cli->num_elements > 0 ? 0 : DB_NOOVERWRITE);
177 for (int i = 0; i < num_dbs; i++) {
178 toku_dbt_array_init(&mult_key_dbt[i], 1);
179 toku_dbt_array_init(&mult_val_dbt[i], 1);
180 mult_put_flags[i] = get_put_flags(arg->cli);
181 }
182 mult_key_dbt[0].dbts[0].flags = 0;
183 mult_val_dbt[0].dbts[0].flags = 0;
184
185 int r = 0;
186
187 uint64_t puts_to_increment = 0;
188 for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
189 struct iibench_put_op_extra *CAST_FROM_VOIDP(info, operation_extra);
190
191 // Get a random primary key, generate secondary key columns in valbuf
192 uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
193 if (arg->bounded_element_range && arg->cli->num_elements > 0) {
194 pk = pk % arg->cli->num_elements;
195 }
196 int64_t keybuf[1];
197 int64_t valbuf[3];
198 iibench_fill_key_buf(pk, keybuf);
199 iibench_fill_val_buf(pk, valbuf);
200 dbt_init(&mult_key_dbt[0].dbts[0], keybuf, sizeof keybuf);
201 dbt_init(&mult_val_dbt[0].dbts[0], valbuf, sizeof valbuf);
202
203 r = env->put_multiple(
204 env,
205 dbs[0], // source db.
206 txn,
207 &mult_key_dbt[0].dbts[0], // source db key
208 &mult_val_dbt[0].dbts[0], // source db value
209 num_dbs, // total number of dbs
210 dbs, // array of dbs
211 mult_key_dbt, // array of keys
212 mult_val_dbt, // array of values
213 mult_put_flags // array of flags
214 );
215 if (r != 0) {
216 goto cleanup;
217 }
218 puts_to_increment++;
219 if (puts_to_increment == 100) {
220 increment_counter(stats_extra, PUTS, puts_to_increment);
221 puts_to_increment = 0;
222 }
223 }
224
225 cleanup:
226 for (int i = 0; i < num_dbs; i++) {
227 toku_dbt_array_destroy(&mult_key_dbt[i]);
228 toku_dbt_array_destroy(&mult_val_dbt[i]);
229 }
230 return r;
231 }
232
iibench_generate_row_for_put(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * UU (src_key),const DBT * src_val)233 static int iibench_generate_row_for_put(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *UU(src_key), const DBT *src_val) {
234 toku_dbt_array_resize(dest_keys, 1);
235 toku_dbt_array_resize(dest_vals, 1);
236 DBT *dest_key = &dest_keys->dbts[0];
237 DBT *dest_val = &dest_vals->dbts[0];
238
239 invariant(src_db != dest_db);
240 // 8 byte primary key, REALLOC secondary key
241 invariant_notnull(src_key->data);
242 invariant(src_key->size == 8);
243 invariant(dest_key->flags == DB_DBT_REALLOC);
244 // Expand the secondary key data buffer if necessary
245 if (dest_key->size != iibench_secondary_key_size) {
246 dest_key->data = toku_xrealloc(dest_key->data, iibench_secondary_key_size);
247 dest_key->size = iibench_secondary_key_size;
248 }
249
250 // Get the db index from the descriptor. This is a secondary index
251 // so it has to be greater than zero (which would be the pk). Then
252 // grab the appropriate secondary key from the source val, which is
253 // an array of the 3 columns, so we have to subtract 1 from the index.
254 const int db_idx = iibench_get_db_idx(dest_db);
255 int64_t *CAST_FROM_VOIDP(columns, src_val->data);
256 int64_t secondary_key = columns[db_idx - 1];
257
258 // First write down the secondary key, then the primary key (in src_key)
259 int64_t *CAST_FROM_VOIDP(dest_key_buf, dest_key->data);
260 memcpy(&dest_key_buf[0], &secondary_key, sizeof(secondary_key));
261 memcpy(&dest_key_buf[1], src_key->data, src_key->size);
262 dest_val->data = nullptr;
263 dest_val->size = 0;
264 return 0;
265 }
266
267 // After each DB opens, set the descriptor to store the DB idx value.
268 // Close and reopen the DB so we can use db->cmp_descriptor during comparisons.
iibench_set_descriptor_after_db_opens(DB_ENV * env,DB * db,int idx,reopen_db_fn reopen,struct cli_args * cli_args)269 static DB *iibench_set_descriptor_after_db_opens(DB_ENV *env, DB *db, int idx, reopen_db_fn reopen, struct cli_args *cli_args) {
270 int r;
271 DBT desc_dbt;
272 desc_dbt.data = &idx;
273 desc_dbt.size = sizeof(idx);
274 desc_dbt.ulen = 0;
275 desc_dbt.flags = 0;
276 r = db->change_descriptor(db, nullptr, &desc_dbt, 0); CKERR(r);
277 r = db->close(db, 0); CKERR(r);
278 r = db_create(&db, env, 0); CKERR(r);
279 reopen(db, idx, cli_args);
280 return db;
281 }
282
iibench_compare_keys(DB * db,const DBT * a,const DBT * b)283 static int iibench_compare_keys(DB *db, const DBT *a, const DBT *b) {
284 const int db_idx = iibench_get_db_idx(db);
285 if (db_idx == 0) {
286 invariant(a->size == 8);
287 invariant(b->size == 8);
288 uint64_t x = *(uint64_t *) a->data;
289 uint64_t y = *(uint64_t *) b->data;
290 if (x < y) {
291 return -1;
292 } else if (x == y) {
293 return 0;
294 } else {
295 return 1;
296 }
297 } else {
298 invariant(a->size == 16);
299 invariant(b->size == 16);
300 int64_t x = *(int64_t *) a->data;
301 int64_t y = *(int64_t *) b->data;
302 uint64_t pk_x = *(uint64_t *) (((char *) a->data) + 8);
303 uint64_t pk_y = *(uint64_t *) (((char *) b->data) + 8);
304 if (x < y) {
305 return -1;
306 } else if (x == y) {
307 if (pk_x < pk_y) {
308 return -1;
309 } else if (pk_x == pk_y) {
310 return 0;
311 } else {
312 return 1;
313 }
314 } else {
315 return 1;
316 }
317 }
318 }
319
iibench_rangequery_db(DB * db,DB_TXN * txn,ARG arg,uint64_t max_pk)320 static void iibench_rangequery_db(DB *db, DB_TXN *txn, ARG arg, uint64_t max_pk) {
321 const int limit = arg->cli->range_query_limit;
322
323 int r;
324 DBC *cursor;
325
326 // Get a random key no greater than max pk
327 DBT start_key, end_key;
328 uint64_t start_k = myrandom_r(arg->random_data) % (max_pk + 1);
329 uint64_t end_k = start_k + limit;
330 dbt_init(&start_key, &start_k, 8);
331 dbt_init(&end_key, &end_k, 8);
332
333 r = db->cursor(db, txn, &cursor, 0); CKERR(r);
334 r = cursor->c_set_bounds(cursor, &start_key, &end_key, true, 0); CKERR(r);
335 struct rangequery_cb_extra extra = {
336 .rows_read = 0,
337 .limit = limit,
338 .cb = iibench_rangequery_cb,
339 .db = db,
340 .cb_extra = nullptr,
341 };
342 r = cursor->c_getf_set(cursor, 0, &start_key, rangequery_cb, &extra);
343 while (r == 0 && extra.rows_read < extra.limit && run_test) {
344 r = cursor->c_getf_next(cursor, 0, rangequery_cb, &extra);
345 }
346
347 r = cursor->c_close(cursor); CKERR(r);
348 }
349
350 // Do a range query over the primary index, verifying the contents of the rows
iibench_rangequery_op(DB_TXN * txn,ARG arg,void * operation_extra,void * stats_extra)351 static int iibench_rangequery_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
352 struct iibench_put_op_extra *CAST_FROM_VOIDP(info, operation_extra);
353 DB *db = arg->dbp[0];
354
355 // Assume the max PK is the table size. If it isn't specified, do a
356 // safe read of the current autoincrement key from the put thread.
357 uint64_t max_pk = arg->cli->num_elements;
358 if (max_pk == 0) {
359 max_pk = toku_sync_fetch_and_add(&info->autoincrement, 0);
360 }
361 iibench_rangequery_db(db, txn, arg, max_pk);
362 increment_counter(stats_extra, PTQUERIES, 1);
363 return 0;
364 }
365
iibench_fill_tables(DB_ENV * env,DB ** dbs,struct cli_args * cli_args,bool UU (fill_with_zeroes))366 static int iibench_fill_tables(DB_ENV *env, DB **dbs, struct cli_args *cli_args, bool UU(fill_with_zeroes)) {
367 const int num_dbs = cli_args->num_DBs;
368 int r = 0;
369
370 DB_TXN *txn;
371 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
372
373 DB_LOADER *loader;
374 uint32_t db_flags[num_dbs];
375 uint32_t dbt_flags[num_dbs];
376 for (int i = 0; i < num_dbs; i++) {
377 db_flags[i] = DB_PRELOCKED_WRITE;
378 dbt_flags[i] = DB_DBT_REALLOC;
379 }
380
381 r = env->create_loader(env, txn, &loader, dbs[0], num_dbs, dbs, db_flags, dbt_flags, 0); CKERR(r);
382 for (int i = 0; i < cli_args->num_elements; i++) {
383 DBT key, val;
384 uint64_t pk = i;
385 int64_t keybuf[1];
386 int64_t valbuf[3];
387 iibench_fill_key_buf(pk, keybuf);
388 iibench_fill_val_buf(pk, valbuf);
389 dbt_init(&key, keybuf, sizeof keybuf);
390 dbt_init(&val, valbuf, sizeof valbuf);
391 r = loader->put(loader, &key, &val); CKERR(r);
392 if (verbose && i > 0 && i % 10000 == 0) {
393 report_overall_fill_table_progress(cli_args, 10000);
394 }
395 }
396 r = loader->close(loader); CKERR(r);
397
398 r = txn->commit(txn, 0); CKERR(r);
399 return 0;
400 }
401
402 static void
stress_table(DB_ENV * env,DB ** dbs,struct cli_args * cli_args)403 stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) {
404 if (verbose) printf("starting creation of pthreads\n");
405 const int num_threads = cli_args->num_put_threads + cli_args->num_ptquery_threads;
406 struct arg myargs[num_threads];
407
408 // Put threads do iibench-like inserts with an auto-increment primary key
409 // Query threads do range queries of a certain size, verifying row contents.
410
411 struct iibench_put_op_extra put_extra = {
412 .autoincrement = 0
413 };
414 for (int i = 0; i < num_threads; i++) {
415 arg_init(&myargs[i], dbs, env, cli_args);
416 if (i < cli_args->num_put_threads) {
417 myargs[i].operation = iibench_put_op;
418 myargs[i].operation_extra = &put_extra;
419 } else {
420 myargs[i].operation = iibench_rangequery_op;
421 myargs[i].operation_extra = &put_extra;
422 myargs[i].txn_flags |= DB_TXN_READ_ONLY;
423 myargs[i].sleep_ms = 1000; // 1 second between range queries
424 }
425 }
426 const bool crash_at_end = false;
427 run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args);
428 }
429
test_main(int argc,char * const argv[])430 int test_main(int argc, char *const argv[]) {
431 struct cli_args args = get_default_args_for_perf();
432 args.num_elements = 0; // want to start with empty DBs
433 // Puts per transaction is configurable. It defaults to 1k.
434 args.txn_size = 1000;
435 // Default to one writer on 4 indexes (pk + 3 secondaries), no readers.
436 args.num_DBs = 4;
437 args.num_put_threads = 1;
438 args.num_ptquery_threads = 0;
439 parse_stress_test_args(argc, argv, &args);
440 // The schema is not configurable. Silently ignore whatever was passed in.
441 args.key_size = 8;
442 args.val_size = 32;
443 // when there are multiple threads, its valid for two of them to
444 // generate the same key and one of them fail with DB_LOCK_NOTGRANTED
445 if (args.num_put_threads > 1) {
446 args.crash_on_operation_failure = false;
447 }
448 args.env_args.generate_put_callback = iibench_generate_row_for_put;
449 after_db_open_hook = iibench_set_descriptor_after_db_opens;
450 fill_tables = iibench_fill_tables;
451 perf_test_main_with_cmp(&args, iibench_compare_keys);
452 return 0;
453 }
454