1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #pragma once
40
41 // The Way Things Work:
42 //
43 // Threaded stress tests have the following properties:
44 // - One or more DBs
45 // - One or more threads performing some number of operations per txn.
46 // - Correctness tests use signed 4 byte keys and signed 4 byte values. They expect
47 // a table with all zeroes before running.
48 // - Performance tests should use 8 byte keys and 8+ byte values, where the values
49 // are some mixture of random uncompressible garbage and zeroes, depending how
50 // compressible we want the data. These tests want the table to be populated
51 // with keys in the range [0, table_size - 1] unless disperse_keys is true,
52 // then the keys are scrambled up in the integer key space.
53
54 #include "toku_config.h"
55 #include "test.h"
56
57 #include <stdio.h>
58 #include <math.h>
59 #include <locale.h>
60
61 #include <db.h>
62 #include <memory.h>
63 #include <toku_race_tools.h>
64
65 #include <portability/toku_atomic.h>
66 #include <portability/toku_pthread.h>
67 #include <portability/toku_random.h>
68 #include <portability/toku_time.h>
69
70 #include <src/ydb-internal.h>
71
72 #include <util/dbt.h>
73
74 #include <util/rwlock.h>
75 #include <util/kibbutz.h>
76
// Correctness tests use signed 4-byte keys and values, so no key or
// value may be smaller than these minimums.
static const size_t min_val_size = sizeof(int32_t);
static const size_t min_key_size = sizeof(int32_t);

// Set by the main thread, polled by worker threads to know when to stop.
// NOTE(review): volatile is not a substitute for an atomic for inter-thread
// signaling, but this matches the harness's existing convention.
volatile bool run_test; // should be volatile since we are communicating through this variable.
81
typedef struct arg *ARG;
// One unit of work run by a worker thread inside a transaction.
// Returns 0 on success; a nonzero return aborts the txn unless
// crash_on_operation_failure is set (see worker()).
typedef int (*operation_t)(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra);

// TODO: Properly define these in db.h so we don't have to copy them here
typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra);
typedef int (*test_generate_row_for_put_callback)(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_data);
typedef int (*test_generate_row_for_del_callback)(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data);
89
// How a worker's operation is serialized against other workers:
// not at all, under a shared (read) lock, or under an exclusive
// (write) lock on the global operation rwlock. See lock_worker_op().
enum stress_lock_type {
    STRESS_LOCK_NONE = 0,
    STRESS_LOCK_SHARED,
    STRESS_LOCK_EXCL
};
95
// Knobs applied to the DB_ENV when a stress test creates or opens
// its environment.
struct env_args {
    int fanout;
    int node_size;
    int basement_node_size;
    int rollback_node_size;
    int checkpointing_period;
    int cleaner_period;
    int cleaner_iterations;
    int sync_period; // when > 0, commits use DB_TXN_NOSYNC (see get_commit_flags())
    uint64_t lk_max_memory;
    uint64_t cachetable_size;
    uint32_t num_bucket_mutexes;
    const char *envdir;
    test_update_callback_f update_function; // update callback function
    test_generate_row_for_put_callback generate_put_callback;
    test_generate_row_for_del_callback generate_del_callback;
};
113
// Output style for performance reporting; indexes perf_formatters[].
enum perf_output_format {
    HUMAN = 0,
    CSV,
    TSV,
    NUM_OUTPUT_FORMATS
};
120
// All command-line-controllable settings for a stress test run.
struct cli_args {
    int num_elements; // number of elements per DB
    int num_DBs; // number of DBs
    int num_seconds; // how long test should run
    int join_timeout; // how long to wait for threads to join before assuming deadlocks
    bool only_create; // true if want to only create DBs but not run stress
    bool only_stress; // true if DBs are already created and want to only run stress
    int update_broadcast_period_ms; // specific to test_stress3
    int num_ptquery_threads; // number of threads to run point queries
    bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests.
    bool do_recover; // true if we should run recover
    int num_update_threads; // number of threads running updates
    int num_put_threads; // number of threads running puts
    int range_query_limit; // how many rows to look at for range queries
    bool serial_insert;
    bool interleave; // for insert benchmarks, whether to interleave separate threads' puts (or segregate them)
    bool crash_on_operation_failure;
    bool print_performance;
    bool print_thread_performance;
    bool print_iteration_performance;
    enum perf_output_format perf_output_format;
    enum toku_compression_method compression_method; // the compression method to use on newly created DBs
    int performance_period;
    uint32_t txn_size; // specifies number of updates/puts/whatevers per txn
    uint32_t key_size; // number of bytes in keys. Must be at least 4
    uint32_t val_size; // number of bytes in vals. Must be at least 4
    double compressibility; // the row values should compress down to this fraction
    struct env_args env_args; // specifies environment variables
    bool single_txn;
    bool warm_cache; // warm caches before running stress_table
    bool blackhole; // all message injects are no-ops. helps measure txn/logging/locktree overhead.
    bool nolocktree; // use this flag to avoid the locktree on insertions
    bool unique_checks; // use uniqueness checking during insert. makes it slow.
    uint32_t sync_period; // background log fsync period
    bool nolog; // do not log. useful for testing in memory performance.
    bool nocrashstatus; // do not print engine status upon crash
    bool prelock_updates; // update threads perform serial updates on a prelocked range
    bool disperse_keys; // spread the keys out during a load (by reversing the bits in the loop index) to make a wide tree we can spread out random inserts into
    bool memcmp_keys; // pack keys big endian and use the builtin key comparison function in the fractal tree
    bool direct_io; // use direct I/O
    const char *print_engine_status; // print engine status rows matching a simple regex "a|b|c", matching strings where a or b or c is a substring.
};
163
// Per-thread arguments handed to worker(); initialized with defaults
// by arg_init() and then customized per test.
struct arg {
    DB **dbp; // array of DBs
    DB_ENV* env; // environment used
    bool bounded_element_range; // true if elements in dictionary are bounded
                                // by num_elements, that is, all keys in each
                                // DB are in [0, num_elements)
                                // false otherwise
    int sleep_ms; // number of milliseconds to sleep between operations
    uint32_t txn_flags; // isolation level for txn running operation
    operation_t operation; // function that is the operation to be run
    void* operation_extra; // extra parameter passed to operation
    enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
    struct random_data *random_data; // state for random_r
    int thread_idx; // this thread's index in [0, num_threads)
    int num_threads; // total number of worker threads
    struct cli_args *cli; // shared command-line settings
    bool do_prepare; // two-phase prepare each txn before committing
    bool prelock_updates;
    bool track_thread_performance; // bump the OPERATION counter after each op
    bool wrap_in_parent; // run each operation txn as a child of one long-lived parent txn
};
185
arg_init(struct arg * arg,DB ** dbp,DB_ENV * env,struct cli_args * cli_args)186 static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
187 arg->cli = cli_args;
188 arg->dbp = dbp;
189 arg->env = env;
190 arg->bounded_element_range = true;
191 arg->sleep_ms = 0;
192 arg->lock_type = STRESS_LOCK_NONE;
193 arg->txn_flags = DB_TXN_SNAPSHOT;
194 arg->operation_extra = nullptr;
195 arg->do_prepare = false;
196 arg->prelock_updates = false;
197 arg->track_thread_performance = true;
198 arg->wrap_in_parent = false;
199 }
200
// Index into a thread's counter array. OPERATION counts whole
// operations and is bumped by worker() itself; the others are bumped
// by individual operations through increment_counter().
enum operation_type {
    OPERATION = 0,
    PUTS,
    PTQUERIES,
    NUM_OPERATION_TYPES
};
207
// Column labels used by the perf formatters; must stay in sync with
// enum operation_type. The trailing nullptr is a terminator.
const char *operation_names[] = {
    "ops",
    "puts",
    "ptqueries",
    nullptr
};
214
increment_counter(void * extra,enum operation_type type,uint64_t inc)215 static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
216 invariant(type != OPERATION);
217 int t = (int) type;
218 invariant(extra);
219 invariant(t >= 0 && t < (int) NUM_OPERATION_TYPES);
220 uint64_t *CAST_FROM_VOIDP(counters, extra);
221 counters[t] += inc;
222 }
223
// Vtable for one perf output format (see perf_formatters[]).
// 'counters' is one counter array per thread, indexed by operation_type;
// 'last_counters' holds the previous iteration's values so per-iteration
// deltas can be computed (and is updated in place by 'iteration').
struct perf_formatter {
    void (*header)(const struct cli_args *cli_args, const int num_threads);
    void (*iteration)(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads);
    void (*totals)(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads);
};
229
// How many seconds of the current reporting period have elapsed at
// 'current_time'. The final period may be shorter than
// performance_period when the runtime is not an exact multiple of it,
// so rates are computed against this value instead of the full period.
static inline int
seconds_in_this_iteration(const int current_time, const int performance_period)
{
    const int completed_periods = (current_time + performance_period - 1) / performance_period - 1;
    return current_time - completed_periods * performance_period;
}
236
// Human-readable output has no header row; intentionally a no-op.
static void
human_print_perf_header(const struct cli_args *UU(cli_args), const int UU(num_threads)) {}
239
240 static void
human_print_perf_iteration(const struct cli_args * cli_args,const int current_time,uint64_t last_counters[][(int)NUM_OPERATION_TYPES],uint64_t * counters[],const int num_threads)241 human_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
242 {
243 const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period);
244 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
245 uint64_t period_total = 0;
246 printf("%4d %s", current_time, operation_names[op]);
247 for (int i = strlen(operation_names[op]); i < 12; ++i) {
248 printf(" ");
249 }
250 for (int t = 0; t < num_threads; ++t) {
251 const uint64_t last = last_counters[t][op];
252 const uint64_t current = counters[t][op];
253 const uint64_t this_iter = current - last;
254 if (cli_args->print_thread_performance) {
255 const double persecond = (double) this_iter / secondsthisiter;
256 printf("\t%'12" PRIu64 " (%'12.1lf/s)", this_iter, persecond);
257 }
258 period_total += this_iter;
259 last_counters[t][op] = current;
260 }
261 const double totalpersecond = (double) period_total / secondsthisiter;
262 printf("\tTotal %'12" PRIu64 " (%'12.1lf/s)\n", period_total, totalpersecond);
263 }
264 fflush(stdout);
265 }
266
267 static void
human_print_perf_totals(const struct cli_args * cli_args,uint64_t * counters[],const int num_threads)268 human_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads)
269 {
270 if (cli_args->print_iteration_performance) {
271 printf("\n");
272 }
273 printf("Overall performance:\n");
274 uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
275 ZERO_ARRAY(overall_totals);
276 for (int t = 0; t < num_threads; ++t) {
277 if (cli_args->print_thread_performance) {
278 printf("Thread %4d: ", t + 1);
279 }
280 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
281 const uint64_t current = counters[t][op];
282 if (cli_args->print_thread_performance) {
283 const double persecond = (double) current / cli_args->num_seconds;
284 printf("\t%s\t%'12" PRIu64 " (%'12.1lf/s)", operation_names[op], current, persecond);
285 }
286 overall_totals[op] += current;
287 }
288 if (cli_args->print_thread_performance) {
289 printf("\n");
290 }
291 }
292 printf("All threads: ");
293 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
294 const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds;
295 printf("\t%s\t%'12" PRIu64 " (%'12.1lf/s)", operation_names[op], overall_totals[op], totalpersecond);
296 }
297 printf("\n");
298 }
299
300 static void
csv_print_perf_header(const struct cli_args * cli_args,const int num_threads)301 csv_print_perf_header(const struct cli_args *cli_args, const int num_threads)
302 {
303 printf("seconds");
304 if (cli_args->print_thread_performance) {
305 for (int t = 1; t <= num_threads; ++t) {
306 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
307 printf(",\"Thread %d %s\",\"Thread %d %s/s\"", t, operation_names[op], t, operation_names[op]);
308 }
309 }
310 }
311 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
312 printf(",\"Total %s\",\"Total %s/s\"", operation_names[op], operation_names[op]);
313 }
314 printf("\n");
315 }
316
317 static void
csv_print_perf_iteration(const struct cli_args * cli_args,const int current_time,uint64_t last_counters[][(int)NUM_OPERATION_TYPES],uint64_t * counters[],const int num_threads)318 csv_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
319 {
320 const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period);
321 printf("%d", current_time);
322 uint64_t period_totals[(int) NUM_OPERATION_TYPES];
323 ZERO_ARRAY(period_totals);
324 for (int t = 0; t < num_threads; ++t) {
325 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
326 const uint64_t last = last_counters[t][op];
327 const uint64_t current = counters[t][op];
328 const uint64_t this_iter = current - last;
329 if (cli_args->print_thread_performance) {
330 const double persecond = (double) this_iter / secondsthisiter;
331 printf(",%" PRIu64 ",%.1lf", this_iter, persecond);
332 }
333 period_totals[op] += this_iter;
334 last_counters[t][op] = current;
335 }
336 }
337 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
338 const double totalpersecond = (double) period_totals[op] / secondsthisiter;
339 printf(",%" PRIu64 ",%.1lf", period_totals[op], totalpersecond);
340 }
341 printf("\n");
342 fflush(stdout);
343 }
344
345 static void
csv_print_perf_totals(const struct cli_args * cli_args,uint64_t * counters[],const int num_threads)346 csv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) {
347 printf("overall");
348 uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
349 ZERO_ARRAY(overall_totals);
350 for (int t = 0; t < num_threads; ++t) {
351 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
352 const uint64_t current = counters[t][op];
353 if (cli_args->print_thread_performance) {
354 const double persecond = (double) current / cli_args->num_seconds;
355 printf(",%" PRIu64 ",%.1lf", current, persecond);
356 }
357 overall_totals[op] += current;
358 }
359 }
360 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
361 const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds;
362 printf(",%" PRIu64 ",%.1lf", overall_totals[op], totalpersecond);
363 }
364 printf("\n");
365 }
366
367 static void
tsv_print_perf_header(const struct cli_args * cli_args,const int num_threads)368 tsv_print_perf_header(const struct cli_args *cli_args, const int num_threads)
369 {
370 printf("\"seconds\"");
371 if (cli_args->print_thread_performance) {
372 for (int t = 1; t <= num_threads; ++t) {
373 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
374 printf("\t\"Thread %d %s\"\t\"Thread %d %s/s\"", t, operation_names[op], t, operation_names[op]);
375 }
376 }
377 }
378 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
379 printf("\t\"Total %s\"\t\"Total %s/s\"", operation_names[op], operation_names[op]);
380 }
381 printf("\n");
382 }
383
384 static void
tsv_print_perf_iteration(const struct cli_args * cli_args,const int current_time,uint64_t last_counters[][(int)NUM_OPERATION_TYPES],uint64_t * counters[],const int num_threads)385 tsv_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
386 {
387 const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period);
388 printf("%d", current_time);
389 uint64_t period_totals[(int) NUM_OPERATION_TYPES];
390 ZERO_ARRAY(period_totals);
391 for (int t = 0; t < num_threads; ++t) {
392 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
393 const uint64_t last = last_counters[t][op];
394 const uint64_t current = counters[t][op];
395 const uint64_t this_iter = current - last;
396 if (cli_args->print_thread_performance) {
397 const double persecond = (double) this_iter / secondsthisiter;
398 printf("\t%" PRIu64 "\t%.1lf", this_iter, persecond);
399 }
400 period_totals[op] += this_iter;
401 last_counters[t][op] = current;
402 }
403 }
404 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
405 const double totalpersecond = (double) period_totals[op] / secondsthisiter;
406 printf("\t%" PRIu64 "\t%.1lf", period_totals[op], totalpersecond);
407 }
408 printf("\n");
409 fflush(stdout);
410 }
411
412 static void
tsv_print_perf_totals(const struct cli_args * cli_args,uint64_t * counters[],const int num_threads)413 tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) {
414 printf("\"overall\"");
415 uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
416 ZERO_ARRAY(overall_totals);
417 for (int t = 0; t < num_threads; ++t) {
418 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
419 const uint64_t current = counters[t][op];
420 if (cli_args->print_thread_performance) {
421 const double persecond = (double) current / cli_args->num_seconds;
422 printf("\t%" PRIu64 "\t%.1lf", current, persecond);
423 }
424 overall_totals[op] += current;
425 }
426 }
427 for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
428 const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds;
429 printf("\t%" PRIu64 "\t%.1lf", overall_totals[op], totalpersecond);
430 }
431 printf("\n");
432 }
433
// Formatter table, indexed by enum perf_output_format (HUMAN, CSV, TSV).
const struct perf_formatter perf_formatters[] = {
    { /* HUMAN */
        .header = human_print_perf_header,
        .iteration = human_print_perf_iteration,
        .totals = human_print_perf_totals
    },
    { /* CSV */
        .header = csv_print_perf_header,
        .iteration = csv_print_perf_iteration,
        .totals = csv_print_perf_totals
    },
    { /* TSV */
        .header = tsv_print_perf_header,
        .iteration = tsv_print_perf_iteration,
        .totals = tsv_print_perf_totals
    },
};
451
get_env_open_flags(struct cli_args * args)452 static int get_env_open_flags(struct cli_args *args) {
453 int flags = DB_INIT_LOCK|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE;
454 flags |= args->nolog ? 0 : DB_INIT_LOG;
455 return flags;
456 }
457
get_put_flags(struct cli_args * args)458 static int get_put_flags(struct cli_args *args) {
459 int flags = 0;
460 flags |= args->nolocktree ? DB_PRELOCKED_WRITE : 0;
461 flags |= args->unique_checks ? DB_NOOVERWRITE : 0;
462 return flags;
463 }
464
get_commit_flags(struct cli_args * args)465 static int get_commit_flags(struct cli_args *args) {
466 int flags = 0;
467 flags |= args->env_args.sync_period > 0 ? DB_TXN_NOSYNC : 0;
468 return flags;
469 }
470
// Everything a worker thread needs; padded so adjacent workers'
// structs don't share a cache line.
struct worker_extra {
    struct arg *thread_arg;
    toku_mutex_t *operation_lock_mutex; // guards acquisition/release of operation_lock
    struct st_rwlock *operation_lock; // see lock_worker_op()/unlock_worker_op()
    uint64_t *counters; // this thread's counters, indexed by operation_type
    int64_t pad[4]; // pad to 64 bytes
};
478
lock_worker_op(struct worker_extra * we)479 static void lock_worker_op(struct worker_extra* we) {
480 ARG arg = we->thread_arg;
481 if (arg->lock_type != STRESS_LOCK_NONE) {
482 toku_mutex_lock(we->operation_lock_mutex);
483 if (arg->lock_type == STRESS_LOCK_SHARED) {
484 rwlock_read_lock(we->operation_lock, we->operation_lock_mutex);
485 } else if (arg->lock_type == STRESS_LOCK_EXCL) {
486 rwlock_write_lock(we->operation_lock, we->operation_lock_mutex);
487 } else {
488 abort();
489 }
490 toku_mutex_unlock(we->operation_lock_mutex);
491 }
492 }
493
unlock_worker_op(struct worker_extra * we)494 static void unlock_worker_op(struct worker_extra* we) {
495 ARG arg = we->thread_arg;
496 if (arg->lock_type != STRESS_LOCK_NONE) {
497 toku_mutex_lock(we->operation_lock_mutex);
498 if (arg->lock_type == STRESS_LOCK_SHARED) {
499 rwlock_read_unlock(we->operation_lock);
500 } else if (arg->lock_type == STRESS_LOCK_EXCL) {
501 rwlock_write_unlock(we->operation_lock);
502 } else {
503 abort();
504 }
505 toku_mutex_unlock(we->operation_lock_mutex);
506 }
507 }
508
// Worker thread main loop: repeatedly run this thread's operation
// inside a transaction until run_test goes false.
// Transaction modes:
//  - single_txn: one txn for the whole loop, committed at the end.
//  - wrap_in_parent: a long-lived parent txn; each operation runs in a
//    child txn that commits/aborts per iteration.
//  - otherwise: one txn per iteration.
// On operation failure the txn is aborted unless
// crash_on_operation_failure is set, in which case we CKERR (crash).
// Returns its own 'arg' so the joiner can identify the thread.
static void *worker(void *arg_v) {
    int r;
    struct worker_extra* CAST_FROM_VOIDP(we, arg_v);
    ARG arg = we->thread_arg;
    // Per-thread random state so threads don't contend on a shared PRNG.
    struct random_data random_data;
    ZERO_STRUCT(random_data);
    char *XCALLOC_N(8, random_buf);
    r = myinitstate_r(random(), random_buf, 8, &random_data);
    assert_zero(r);
    arg->random_data = &random_data;
    DB_ENV *env = arg->env;
    DB_TXN *txn = nullptr;
    DB_TXN *ptxn = nullptr;
    if (verbose) {
        toku_pthread_t self = toku_pthread_self();
        uintptr_t intself = (uintptr_t) self;
        printf("%lu starting %p\n", (unsigned long) intself, arg->operation);
    }
    if (arg->cli->single_txn) {
        r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
    } else if (arg->wrap_in_parent) {
        r = env->txn_begin(env, 0, &ptxn, arg->txn_flags); CKERR(r);
    }
    while (run_test) {
        lock_worker_op(we);
        if (!arg->cli->single_txn) {
            // ptxn is nullptr unless wrap_in_parent, so this begins either
            // a top-level txn or a child of the long-lived parent.
            r = env->txn_begin(env, ptxn, &txn, arg->txn_flags); CKERR(r);
        }
        r = arg->operation(txn, arg, arg->operation_extra, we->counters);
        if (r==0 && !arg->cli->single_txn && arg->do_prepare) {
            // Two-phase commit: prepare with a GID derived from the txn id.
            uint8_t gid[DB_GID_SIZE];
            memset(gid, 0, DB_GID_SIZE);
            uint64_t gid_val = txn->id64(txn);
            uint64_t *gid_count_p = cast_to_typeof(gid_count_p) gid; // make gcc --happy about -Wstrict-aliasing
            *gid_count_p = gid_val;
            int rr = txn->prepare(txn, gid, 0);
            assert_zero(rr);
        }
        if (r == 0) {
            if (!arg->cli->single_txn) {
                int flags = get_commit_flags(arg->cli);
                int chk_r = txn->commit(txn, flags); CKERR(chk_r);
            }
        } else {
            if (arg->cli->crash_on_operation_failure) {
                CKERR(r);
            } else {
                if (!arg->cli->single_txn) {
                    { int chk_r = txn->abort(txn); CKERR(chk_r); }
                }
            }
        }
        unlock_worker_op(we);
        if (arg->track_thread_performance) {
            we->counters[OPERATION]++;
        }
        if (arg->sleep_ms) {
            usleep(arg->sleep_ms * 1000);
        }
    }
    // Wind down whichever long-lived txn this thread owns.
    if (arg->cli->single_txn) {
        int flags = get_commit_flags(arg->cli);
        int chk_r = txn->commit(txn, flags); CKERR(chk_r);
    } else if (arg->wrap_in_parent) {
        int flags = get_commit_flags(arg->cli);
        int chk_r = ptxn->commit(ptxn, flags); CKERR(chk_r);
    }
    if (verbose) {
        toku_pthread_t self = toku_pthread_self();
        uintptr_t intself = (uintptr_t) self;
        printf("%lu returning\n", (unsigned long) intself);
    }
    toku_free(random_buf);
    return arg;
}
584
// Accumulator threaded through scan_cb() via the cursor callback's
// extra pointer.
struct scan_cb_extra {
    bool fast; // when true, return TOKUDB_CURSOR_CONTINUE to keep streaming rows
    int curr_sum; // running sum of the 4-byte value prefixes
    int num_elements; // rows visited so far
};

// Options describing how a scan operation should traverse the DB.
struct scan_op_extra {
    bool fast; // see scan_cb_extra::fast
    bool fwd; // scan forward (c_getf_next) rather than backward (c_getf_prev)
    bool prefetch; // prelock the whole key range first so the scan prefetches
};
596
597 static int
scan_cb(const DBT * key,const DBT * val,void * arg_v)598 scan_cb(const DBT *key, const DBT *val, void *arg_v) {
599 struct scan_cb_extra *CAST_FROM_VOIDP(cb_extra, arg_v);
600 assert(key);
601 assert(val);
602 assert(cb_extra);
603 assert(val->size >= sizeof(int));
604 cb_extra->curr_sum += *(int *) val->data;
605 cb_extra->num_elements++;
606 return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
607 }
608
// Scan the whole DB with a cursor (direction and prefetch per *sce),
// summing 4-byte value prefixes through scan_cb(). When check_sum is
// true, abort unless the values sum to zero -- correctness tests
// maintain that invariant across concurrent updates. If the test is
// shutting down mid-scan, bail out early and skip the sum check.
// Returns 0 on success (DB_NOTFOUND at the end of the scan maps to 0).
static int scan_op_and_maybe_check_sum(
    DB* db,
    DB_TXN *txn,
    struct scan_op_extra* sce,
    bool check_sum
    )
{
    int r = 0;
    DBC* cursor = nullptr;

    // Unusual but valid: the brace contents are assignment expressions
    // that populate e's fields.
    struct scan_cb_extra e = {
        e.fast = sce->fast,
        e.curr_sum = 0,
        e.num_elements = 0,
    };

    { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
    if (sce->prefetch) {
        // Prelock the entire key range so the cursor prefetches.
        r = cursor->c_set_bounds(cursor, db->dbt_neg_infty(), db->dbt_pos_infty(), true, 0);
        assert(r == 0);
    }
    while (r != DB_NOTFOUND) {
        if (sce->fwd) {
            r = cursor->c_getf_next(cursor, 0, scan_cb, &e);
        }
        else {
            r = cursor->c_getf_prev(cursor, 0, scan_cb, &e);
        }
        assert(r==0 || r==DB_NOTFOUND);
        if (!run_test) {
            // terminate early because this op takes a while under drd.
            // don't check the sum if we do this.
            check_sum = false;
            break;
        }
    }
    { int chk_r = cursor->c_close(cursor); CKERR(chk_r); }
    if (r == DB_NOTFOUND) {
        r = 0;
    }
    if (check_sum && e.curr_sum) {
        printf("e.curr_sum: %" PRId32 " e.num_elements: %" PRId32 " \n", e.curr_sum, e.num_elements);
        abort();
    }
    return r;
}
655
// Row generator for multi-DB puts: map one (src_key, src_val) to 0..9
// destination rows. The output count and values are pseudo-random but
// seeded from the source key, so the mapping is deterministic. The
// generated values are arranged to sum to zero, which the scan
// operations verify. Destination DBTs use DB_DBT_REALLOC, so their
// buffers are grown in place and owned by the DBT_ARRAYs.
static int generate_row_for_put(
    DB *dest_db,
    DB *src_db,
    DBT_ARRAY *dest_keys,
    DBT_ARRAY *dest_vals,
    const DBT *src_key,
    const DBT *src_val
    )
{
    invariant(!src_db || src_db != dest_db);
    invariant(src_key->size >= sizeof(unsigned int));

    // Consistent pseudo random source. Use checksum of key and val, and which db as seed

    /*
    struct x1764 l;
    x1764_init(&l);
    x1764_add(&l, src_key->data, src_key->size);
    x1764_add(&l, src_val->data, src_val->size);
    x1764_add(&l, &dest_db, sizeof(dest_db)); //make it depend on which db
    unsigned int seed = x1764_finish(&l);
    */
    unsigned int seed = *(unsigned int*)src_key->data;

    struct random_data random_data;
    ZERO_STRUCT(random_data);
    char random_buf[8];
    {
        int r = myinitstate_r(seed, random_buf, 8, &random_data);
        assert_zero(r);
    }

    // Coin-flip until tails: roughly geometric output count, capped at 9.
    uint8_t num_outputs = 0;
    while (myrandom_r(&random_data) % 2) {
        num_outputs++;
        if (num_outputs > 8) {
            break;
        }
    }

    toku_dbt_array_resize(dest_keys, num_outputs);
    toku_dbt_array_resize(dest_vals, num_outputs);
    int sum = 0;
    for (uint8_t i = 0; i < num_outputs; i++) {
        DBT *dest_key = &dest_keys->dbts[i];
        DBT *dest_val = &dest_vals->dbts[i];

        invariant(dest_key->flags == DB_DBT_REALLOC);
        invariant(dest_val->flags == DB_DBT_REALLOC);

        // Grow the destination buffers if this source row is bigger than
        // anything they've held before.
        if (dest_key->ulen < src_key->size) {
            dest_key->data = toku_xrealloc(dest_key->data, src_key->size);
            dest_key->ulen = src_key->size;
        }
        dest_key->size = src_key->size;
        if (dest_val->ulen < src_val->size) {
            dest_val->data = toku_xrealloc(dest_val->data, src_val->size);
            dest_val->ulen = src_val->size;
        }
        dest_val->size = src_val->size;
        memcpy(dest_key->data, src_key->data, src_key->size);
        ((uint8_t*)dest_key->data)[src_key->size-1] = i; //Have different keys for each entry.

        memcpy(dest_val->data, src_val->data, src_val->size);
        invariant(dest_val->size >= sizeof(int));
        int number;
        if (i == num_outputs - 1) {
            // Make sum add to 0
            number = -sum;
        } else {
            // Keep track of sum
            number = myrandom_r(&random_data);
        }
        sum += number;
        *(int *) dest_val->data = number;
    }
    invariant(sum == 0);
    return 0;
}
735
736 // How Keys Work:
737 //
738 // Keys are either
739 // - 4 byte little endian non-negative integers
740 // - 8 byte little endian non-negative integers
741 // - 8 byte little endian non-negative integers, padded with zeroes.
742 //
743 // The comparison function treats the key as a 4 byte
744 // int if the key size is exactly 4, and it treats
745 // the key as an 8 byte int if the key size is 8 or more.
746
random_bounded_key(struct random_data * random_data,ARG arg)747 static int64_t random_bounded_key(struct random_data *random_data, ARG arg) {
748 // Effect: Returns a random key in the table, possible bounded by the number of elements.
749 int64_t key = myrandom_r(random_data);
750 if (arg->bounded_element_range && arg->cli->num_elements > 0) {
751 key = key % arg->cli->num_elements;
752 }
753 return key;
754 }
755
static int64_t breverse(int64_t v)
// Effect: return the bits of v, reversed within a 64-bit word (bit 0
//         becomes bit 63, etc.), with the result's sign bit cleared so
//         the reversed key stays non-negative.
// Notes: equivalent to the loop from
//        http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
//        but run on an unsigned copy: the original signed-shift version
//        never terminated for negative v (arithmetic right shift keeps
//        the sign bits), and right-shifting a negative signed value is
//        implementation-defined. Results are unchanged for all
//        non-negative inputs, which is all callers pass.
// Rationale: just a hack to spread out the keys during loading, doesn't
//            need to be fast but does need to be correct.
{
    uint64_t in = (uint64_t) v;
    uint64_t out = 0;
    for (int bit = 0; bit < 64; bit++) {
        out = (out << 1) | (in & 1);
        in >>= 1;
    }
    // Clear the top bit so the result is a valid non-negative int64 key.
    return (int64_t) (out & ~(1ULL << 63));
}
773
static void
fill_key_buf(int64_t key, uint8_t *data, struct cli_args *args) {
// Effect: Fill data with a specific little-endian integer, 4 or 8 bytes long
//         depending on args->key_size, possibly padded with zeroes.
// Requires: *data is at least sizeof(uint64_t)
    if (args->disperse_keys) {
        // Scramble the key across the integer key space (see breverse()).
        key = breverse(key);
    }
    invariant(key >= 0);
    if (args->key_size == sizeof(int)) {
        // memcmp_keys packs big endian so byte-wise compare orders correctly.
        const int key32 = args->memcmp_keys ? toku_htonl(key) : key;
        memcpy(data, &key32, sizeof(key32));
    } else {
        invariant(args->key_size >= sizeof(key));
        // NOTE(review): toku_htonl() swaps a 32-bit quantity, so for 8-byte
        // memcmp keys only the low 32 bits of the key survive here — confirm
        // whether a 64-bit byte swap was intended for memcmp_keys.
        const int64_t key64 = args->memcmp_keys ? toku_htonl(key) : key;
        memcpy(data, &key64, sizeof(key64));
        // Zero-pad out to key_size.
        memset(data + sizeof(key64), 0, args->key_size - sizeof(key64));
    }
}
793
794 static void
fill_key_buf_random(struct random_data * random_data,uint8_t * data,ARG arg)795 fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) {
796 // Effect: Fill data with a random, little-endian, 4 or 8 byte integer, possibly
797 // bounded by the size of the table, and padded with zeroes until key_size.
798 // Requires, Notes: see fill_key_buf()
799 int64_t key = random_bounded_key(random_data, arg);
800 fill_key_buf(key, data, arg->cli);
801 }
802
803 // How Vals Work:
804 //
805 // Values are either
806 // - 4 byte little endian integers
807 // - 4 byte little endian integers, padded with zeroes
808 // - X bytes random values, Y bytes zeroes, where X and Y
809 // are derived from the desired compressibility;
810 //
811 // Correctness tests use integer values, perf tests use random bytes.
812 // Both support padding out values > 4 bytes with zeroes.
813
814 static void
fill_val_buf(int64_t val,uint8_t * data,uint32_t val_size)815 fill_val_buf(int64_t val, uint8_t *data, uint32_t val_size) {
816 // Effect, Requires, Notes: see fill_key_buf().
817 if (val_size == sizeof(int)) {
818 const int val32 = val;
819 memcpy(data, &val32, sizeof(val32));
820 } else {
821 invariant(val_size >= sizeof(val));
822 memcpy(data, &val, sizeof(val));
823 memset(data + sizeof(val), 0, val_size - sizeof(val));
824 }
825 }
826
827 // Fill array with compressibility*size 0s.
828 // 0.0<=compressibility<=1.0
829 // Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away).
830 // The rest will be random data.
831 static void
fill_val_buf_random(struct random_data * random_data,uint8_t * data,struct cli_args * args)832 fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_args *args) {
833 invariant(args->val_size >= min_val_size);
834 //Requires: The array was zeroed since the last time 'size' was changed.
835 //Requires: compressibility is in range [0,1] indicating fraction that should be zeros.
836
837 // Fill in the random bytes
838 uint32_t num_random_bytes = (1 - args->compressibility) * args->val_size;
839 if (num_random_bytes > 0) {
840 uint32_t filled;
841 for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) {
842 *((uint64_t *) &data[filled]) = myrandom_r(random_data);
843 }
844 if (filled != num_random_bytes) {
845 uint64_t last8 = myrandom_r(random_data);
846 memcpy(&data[filled], &last8, num_random_bytes - filled);
847 }
848 }
849
850 // Fill in the zero bytes
851 if (num_random_bytes < args->val_size) {
852 memset(data + num_random_bytes, 0, args->val_size - num_random_bytes);
853 }
854 }
855
static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) {
    // Effect: Insert txn_size rows with random keys and random (partially
    // compressible) values into db inside the given txn.
    // If ignore_errors is true, put failures are skipped (and still counted);
    // otherwise the first failing put aborts the batch and its code is
    // returned.
    int r = 0;
    // VLA buffers sized by this run's configured key/value sizes.
    uint8_t keybuf[arg->cli->key_size];
    uint8_t valbuf[arg->cli->val_size];

    DBT key, val;
    dbt_init(&key, keybuf, sizeof keybuf);
    dbt_init(&val, valbuf, sizeof valbuf);
    const int put_flags = get_put_flags(arg->cli);

    // Batch PUTS stats updates (100 at a time) to reduce contention on the
    // shared counters.
    uint64_t puts_to_increment = 0;
    for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
        fill_key_buf_random(arg->random_data, keybuf, arg);
        fill_val_buf_random(arg->random_data, valbuf, arg->cli);
        r = db->put(db, txn, &key, &val, put_flags);
        if (!ignore_errors && r != 0) {
            goto cleanup;
        }
        puts_to_increment++;
        if (puts_to_increment == 100) {
            increment_counter(stats_extra, PUTS, puts_to_increment);
            puts_to_increment = 0;
        }
    }

cleanup:
    // Flush any remaining batched increments, including on the error path.
    increment_counter(stats_extra, PUTS, puts_to_increment);
    return r;
}
885
UU()886 static int UU() random_put_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
887 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
888 DB* db = arg->dbp[db_index];
889 return random_put_in_db(db, txn, arg, false, stats_extra);
890 }
891
UU()892 static int UU() random_put_op_singledb(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
893 int db_index = arg->thread_idx%arg->cli->num_DBs;
894 DB* db = arg->dbp[db_index];
895 return random_put_in_db(db, txn, arg, false, stats_extra);
896 }
897
// State for serial_put_op: the next key to append. Today each thread gets
// its own copy (see the TODO inside serial_put_op about sharing it).
struct serial_put_extra {
    uint64_t current;
};
901
static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
    // Effect: Append txn_size rows with sequentially increasing keys (and
    // random values) to this thread's DB. Returns the first put error, or 0.
    struct serial_put_extra *CAST_FROM_VOIDP(extra, operation_extra);

    // Each thread writes to a fixed DB chosen by its index.
    int db_index = arg->thread_idx % arg->cli->num_DBs;
    DB* db = arg->dbp[db_index];

    int r = 0;
    uint8_t keybuf[arg->cli->key_size];
    uint8_t valbuf[arg->cli->val_size];

    DBT key, val;
    dbt_init(&key, keybuf, sizeof keybuf);
    dbt_init(&val, valbuf, sizeof valbuf);
    const int put_flags = get_put_flags(arg->cli);

    // PUTS stats updates are batched (100 at a time) to reduce contention.
    uint64_t puts_to_increment = 0;
    for (uint64_t i = 0; i < arg->cli->txn_size; ++i) {
        // TODO: Change perf_insert to pass a single serial_put_op_extra
        // to each insertion thread so they share the current key,
        // and use a sync fetch an add here. This way you can measure
        // the true performance of multiple threads appending unique
        // keys to the end of a tree.
        uint64_t k = extra->current++;  // not atomic: extra is per-thread today
        fill_key_buf(k, keybuf, arg->cli);
        fill_val_buf_random(arg->random_data, valbuf, arg->cli);
        r = db->put(db, txn, &key, &val, put_flags);
        if (r != 0) {
            goto cleanup;
        }
        puts_to_increment++;
        if (puts_to_increment == 100) {
            increment_counter(stats_extra, PUTS, puts_to_increment);
            puts_to_increment = 0;
        }
    }

cleanup:
    // Flush any remaining batched increments, including on the error path.
    increment_counter(stats_extra, PUTS, puts_to_increment);
    return r;
}
942
// Operation state for loader_op: scan/verify configuration plus the number
// of DBs to bulk-load per iteration.
struct loader_op_extra {
    struct scan_op_extra soe;
    int num_dbs;
};
947
static int UU() loader_op(DB_TXN* txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
    // Effect: Exercise the bulk loader end-to-end, twice: once with plain
    // intermediates and once with LOADER_COMPRESS_INTERMEDIATES. Each pass
    // creates num_dbs fresh DBs, loads 1000 rows whose int values sum to
    // zero, verifies each DB by scanning (checksum must come out zero),
    // then closes and removes the DBs. Asserts on any failure; returns 0.
    struct loader_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
    invariant(extra->num_dbs >= 1);
    DB_ENV* env = arg->env;
    int r;
    for (int num = 0; num < 2; num++) {
        DB *dbs_load[extra->num_dbs];
        uint32_t db_flags[extra->num_dbs];
        uint32_t dbt_flags[extra->num_dbs];
        for (int i = 0; i < extra->num_dbs; ++i) {
            db_flags[i] = 0;
            dbt_flags[i] = 0;
            r = db_create(&dbs_load[i], env, 0);
            assert(r == 0);
            char fname[100];
            sprintf(fname, "loader-db-%d", i);
            // TODO: Need to call before_db_open_hook() and after_db_open_hook()
            r = dbs_load[i]->open(dbs_load[i], txn, fname, nullptr, DB_BTREE, DB_CREATE, 0666);
            assert(r == 0);
        }
        DB_LOADER *loader;
        // Second pass compresses the loader's intermediate files.
        uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
        r = env->create_loader(env, txn, &loader, dbs_load[0], extra->num_dbs, dbs_load, db_flags, dbt_flags, loader_flags);
        CKERR(r);

        DBT key, val;
        uint8_t keybuf[arg->cli->key_size];
        uint8_t valbuf[arg->cli->val_size];
        dbt_init(&key, keybuf, sizeof keybuf);
        dbt_init(&val, valbuf, sizeof valbuf);

        // Track the running sum of the leading int of each value so the
        // final row can force the table-wide sum to zero (the scan below
        // verifies this invariant).
        int sum = 0;
        const int num_elements = 1000;
        for (int i = 0; i < num_elements; i++) {
            fill_key_buf(i, keybuf, arg->cli);
            fill_val_buf_random(arg->random_data, valbuf, arg->cli);

            assert(val.size >= sizeof(int));
            if (i == num_elements - 1) {
                // Make sum add to 0
                *(int *) val.data = -sum;
            } else {
                // Keep track of sum
                sum += *(int *) val.data;
            }
            r = loader->put(loader, &key, &val); CKERR(r);
        }

        r = loader->close(loader); CKERR(r);

        // Verify every loaded DB, then tear it down.
        for (int i = 0; i < extra->num_dbs; ++i) {
            r = scan_op_and_maybe_check_sum(dbs_load[i], txn, &extra->soe, true); CKERR(r);
            r = dbs_load[i]->close(dbs_load[i], 0); CKERR(r);
            char fname[100];
            sprintf(fname, "loader-db-%d", i);
            r = env->dbremove(env, txn, fname, nullptr, 0); CKERR(r);
        }
    }
    return 0;
}
1008
UU()1009 static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1010 // Pick a random DB, do a keyrange operation.
1011 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1012 DB* db = arg->dbp[db_index];
1013
1014 int r = 0;
1015 uint8_t keybuf[arg->cli->key_size];
1016
1017 DBT key;
1018 dbt_init(&key, keybuf, sizeof keybuf);
1019 fill_key_buf_random(arg->random_data, keybuf, arg);
1020
1021 uint64_t less,equal,greater;
1022 int is_exact;
1023 r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
1024 assert(r == 0);
1025 return r;
1026 }
1027
UU()1028 static int UU() frag_op(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1029 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1030 DB *db = arg->dbp[db_index];
1031
1032 TOKU_DB_FRAGMENTATION_S frag;
1033 int r = db->get_fragmentation(db, &frag);
1034 invariant_zero(r);
1035 return r;
1036 }
1037
// Callback for db->get_key_after_bytes(); the result is deliberately
// ignored -- get_key_after_bytes_op only exercises the API.
static void UU() get_key_after_bytes_callback(const DBT *UU(end_key), uint64_t UU(skipped), void *UU(extra)) {
    // nothing
}
1041
UU()1042 static int UU() get_key_after_bytes_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1043 // Pick a random DB, do a get_key_after_bytes operation.
1044 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1045 DB* db = arg->dbp[db_index];
1046
1047 int r = 0;
1048 uint8_t keybuf[arg->cli->key_size];
1049
1050 DBT start_key, end_key;
1051 dbt_init(&start_key, keybuf, sizeof keybuf);
1052 fill_key_buf_random(arg->random_data, keybuf, arg);
1053 uint64_t skip_len = myrandom_r(arg->random_data) % (2<<30);
1054 dbt_init(&end_key, nullptr, 0);
1055
1056 r = db->get_key_after_bytes(db, txn, &start_key, skip_len, get_key_after_bytes_callback, nullptr, 0);
1057 return r;
1058 }
1059
verify_progress_callback(void * UU (extra),float UU (progress))1060 static int verify_progress_callback(void *UU(extra), float UU(progress)) {
1061 if (!run_test) {
1062 return -1;
1063 }
1064 return 0;
1065 }
1066
UU()1067 static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
1068 int r = 0;
1069 for (int i = 0; i < arg->cli->num_DBs && run_test; i++) {
1070 DB* db = arg->dbp[i];
1071 r = db->verify_with_progress(db, verify_progress_callback, nullptr, 1, 0);
1072 if (!run_test) {
1073 r = 0;
1074 }
1075 CKERR(r);
1076 }
1077 return r;
1078 }
1079
UU()1080 static int UU() scan_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
1081 struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
1082 for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1083 int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, true);
1084 assert_zero(r);
1085 }
1086 return 0;
1087 }
1088
UU()1089 static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
1090 struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
1091 for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1092 int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, false);
1093 assert_zero(r);
1094 }
1095 return 0;
1096 }
1097
// Heap-allocated work descriptor for one parallel scan task; consumed and
// freed by scan_op_worker (see scan_op_no_check_parallel).
struct scan_op_worker_info {
    DB *db;
    DB_TXN *txn;
    void *extra;  // points to a struct scan_op_extra
};
1103
scan_op_worker(void * arg)1104 static void scan_op_worker(void *arg) {
1105 struct scan_op_worker_info *CAST_FROM_VOIDP(info, arg);
1106 struct scan_op_extra *CAST_FROM_VOIDP(extra, info->extra);
1107 int r = scan_op_and_maybe_check_sum(
1108 info->db,
1109 info->txn,
1110 extra,
1111 false
1112 );
1113 assert_zero(r);
1114 toku_free(info);
1115 }
1116
static int UU() scan_op_no_check_parallel(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
    // Effect: Scan every DB concurrently (no checksum verification) using a
    // kibbutz thread pool with at most one worker per core.
    // NOTE(review): assumes toku_kibbutz_destroy drains queued work before
    // returning -- confirm against the kibbutz implementation.
    const int num_cores = toku_os_get_number_processors();
    const int num_workers = arg->cli->num_DBs < num_cores ? arg->cli->num_DBs : num_cores;
    KIBBUTZ kibbutz = NULL;
    int r = toku_kibbutz_create(num_workers, &kibbutz);
    assert(r == 0);
    for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
        // Each work item is heap-allocated; scan_op_worker frees it.
        struct scan_op_worker_info *XCALLOC(info);
        info->db = arg->dbp[i];
        info->txn = txn;
        info->extra = operation_extra;
        toku_kibbutz_enq(kibbutz, scan_op_worker, info);
    }
    toku_kibbutz_destroy(kibbutz);
    return 0;
}
1133
// getf callback that discards the row; used by point queries where only the
// return code of the lookup matters.
static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(context)) {
    return 0;
}
1137
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
    // Effect: Point-query one random key in db. When check is true, the key
    // is asserted to be present (tables are pre-populated for correctness
    // tests). Always returns 0: other getf_set errors are deliberately
    // discarded (best-effort workload).
    int r = 0;
    uint8_t keybuf[arg->cli->key_size];
    DBT key, val;
    dbt_init(&key, keybuf, sizeof keybuf);
    dbt_init(&val, nullptr, 0);  // NOTE(review): val is initialized but never used
    fill_key_buf_random(arg->random_data, keybuf, arg);

    r = db->getf_set(
        db,
        txn,
        0,
        &key,
        dbt_do_nothing,
        nullptr
        );
    if (check) {
        assert(r != DB_NOTFOUND);
    }
    r = 0;  // deliberately discard the getf_set result (see Effect above)
    return r;
}
1160
UU()1161 static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *stats_extra) {
1162 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1163 DB* db = arg->dbp[db_index];
1164 int r = ptquery_and_maybe_check_op(db, txn, arg, true);
1165 if (!r) {
1166 increment_counter(stats_extra, PTQUERIES, 1);
1167 }
1168 return r;
1169 }
1170
UU()1171 static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *stats_extra) {
1172 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1173 DB* db = arg->dbp[db_index];
1174 int r = ptquery_and_maybe_check_op(db, txn, arg, false);
1175 if (!r) {
1176 increment_counter(stats_extra, PTQUERIES, 1);
1177 }
1178 return r;
1179 }
1180
// Callback invoked for each row a range query reads.
typedef void (*rangequery_row_cb)(DB *db, const DBT *key, const DBT *val, void *extra);
// Per-query state threaded through rangequery_cb via the cursor's extra ptr.
struct rangequery_cb_extra {
    int rows_read;  // rows consumed so far

    // Call cb(db, key, value, cb_extra) on up to $limit rows.
    const int limit;
    const rangequery_row_cb cb;
    DB *const db;
    void *const cb_extra;
};
1191
rangequery_cb(const DBT * key,const DBT * value,void * extra)1192 static int rangequery_cb(const DBT *key, const DBT *value, void *extra) {
1193 struct rangequery_cb_extra *CAST_FROM_VOIDP(info, extra);
1194 if (info->cb != nullptr) {
1195 info->cb(info->db, key, value, info->cb_extra);
1196 }
1197 if (++info->rows_read >= info->limit) {
1198 return 0;
1199 } else {
1200 return TOKUDB_CURSOR_CONTINUE;
1201 }
1202 }
1203
static void rangequery_db(DB *db, DB_TXN *txn, ARG arg, rangequery_row_cb cb, void *cb_extra) {
    // Effect: Read up to range_query_limit consecutive rows starting at a
    // random (bounded) key, invoking cb on each row read. The key range is
    // prelocked via c_set_bounds before iterating. Asserts on cursor errors;
    // a missing start key simply ends the query early.
    const int limit = arg->cli->range_query_limit;

    int r;
    DBC *cursor;
    DBT start_key, end_key;
    uint8_t start_keybuf[arg->cli->key_size];
    uint8_t end_keybuf[arg->cli->key_size];
    dbt_init(&start_key, start_keybuf, sizeof start_keybuf);
    dbt_init(&end_key, end_keybuf, sizeof end_keybuf);
    // End key is start + limit; presumably it may run past the last key in
    // the table, which just bounds the scan -- confirm with key layout.
    const uint64_t start_k = random_bounded_key(arg->random_data, arg);
    fill_key_buf(start_k, start_keybuf, arg->cli);
    fill_key_buf(start_k + limit, end_keybuf, arg->cli);

    r = db->cursor(db, txn, &cursor, 0); CKERR(r);
    r = cursor->c_set_bounds(cursor, &start_key, &end_key, true, 0); CKERR(r);

    struct rangequery_cb_extra extra = {
        .rows_read = 0,
        .limit = limit,
        .cb = cb,
        .db = db,
        .cb_extra = cb_extra,
    };
    // Position on the start key, then walk forward until the callback stops
    // returning TOKUDB_CURSOR_CONTINUE (limit reached) or the range ends.
    r = cursor->c_getf_set(cursor, 0, &start_key, rangequery_cb, &extra);
    while (r == 0 && extra.rows_read < extra.limit && run_test) {
        r = cursor->c_getf_next(cursor, 0, rangequery_cb, &extra);
    }

    r = cursor->c_close(cursor); CKERR(r);
}
1235
UU()1236 static int UU() rangequery_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
1237 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1238 DB *db = arg->dbp[db_index];
1239 rangequery_db(db, txn, arg, nullptr, nullptr);
1240 increment_counter(stats_extra, PTQUERIES, 1);
1241 return 0;
1242 }
1243
UU()1244 static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1245 int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
1246 DB* db = arg->dbp[db_index];
1247 DBC* cursor = nullptr;
1248 int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
1249 r = cursor->c_close(cursor); assert(r == 0);
1250 return 0;
1251 }
1252
// Upper bound (exclusive) for randomly generated update deltas/values.
#define MAX_RANDOM_VAL 10000

// The kinds of transformations update_op_callback can apply to a row.
enum update_type {
    UPDATE_ADD_DIFF,     // add u.d.diff to the current value
    UPDATE_NEGATE,       // negate the current value
    UPDATE_WITH_HISTORY  // replace with u.h.new_val; old value must equal u.h.expected
};
1260
// Message payload passed to db->update()/update_broadcast(); tells
// update_op_callback which transformation to apply to the row.
struct update_op_extra {
    enum update_type type;
    int pad_bytes;       // extra zero bytes appended to the new value
    union {
        struct {
            int diff;    // delta for UPDATE_ADD_DIFF
        } d;
        struct {
            int expected;  // asserted previous value (UPDATE_WITH_HISTORY)
            int new_val;   // replacement value (UPDATE_WITH_HISTORY)
        } h;
    } u;
};
1274
// Shared configuration for the update operations.
struct update_op_args {
    int *update_history_buffer;  // last value written per key (UPDATE_WITH_HISTORY)
    int update_pad_frequency;    // controls how often values get padded; 0 = never
};
1279
UU()1280 static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {
1281 struct update_op_args uoe;
1282 uoe.update_history_buffer = update_history_buffer;
1283 uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary
1284 return uoe;
1285 }
1286
1287 static uint64_t update_count = 0;
1288
// Update-message callback registered with the env: compute a row's new
// value from its old value plus the instructions carried in `extra` (a
// struct update_op_extra), then hand the result to set_val. A missing row
// is treated as having value 0.
static int update_op_callback(DB *UU(db), const DBT *UU(key),
                              const DBT *old_val,
                              const DBT *extra,
                              void (*set_val)(const DBT *new_val,
                                              void *set_extra),
                              void *set_extra)
{
    int old_int_val = 0;
    if (old_val) {
        old_int_val = *(int *) old_val->data;
    }
    assert(extra->size == sizeof(struct update_op_extra));
    struct update_op_extra *CAST_FROM_VOIDP(e, extra->data);

    int new_int_val;
    switch (e->type) {
    case UPDATE_ADD_DIFF:
        new_int_val = old_int_val + e->u.d.diff;
        break;
    case UPDATE_NEGATE:
        new_int_val = -old_int_val;
        break;
    case UPDATE_WITH_HISTORY:
        // The sender recorded the value it last wrote for this key; updates
        // must arrive in order, so the stored value must match.
        assert(old_int_val == e->u.h.expected);
        new_int_val = e->u.h.new_val;
        break;
    default:
        abort();
    }

    // New value is the 4-byte int followed by pad_bytes of zero padding.
    uint32_t val_size = sizeof(int) + e->pad_bytes;
    uint8_t valbuf[val_size];
    fill_val_buf(new_int_val, valbuf, val_size);

    DBT new_val;
    dbt_init(&new_val, valbuf, val_size);
    set_val(&new_val, set_extra);
    return 0;
}
1328
static int UU() update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
    // Effect: In one txn against a random DB, issue txn_size pairs of
    // updates: +1 to a random key, then -1 to (that key - num_elements).
    // Each pair's deltas cancel, preserving the table-wide sum.
    // Returns the first update error, or 0.
    int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
    DB* db = arg->dbp[db_index];

    int r = 0;
    DBT key, val;
    uint8_t keybuf[arg->cli->key_size];

    toku_sync_fetch_and_add(&update_count, 1);
    struct update_op_extra extra;
    ZERO_STRUCT(extra);
    extra.type = UPDATE_ADD_DIFF;
    extra.pad_bytes = 0;
    int curr_val_sum = 0;

    // key/val DBTs alias keybuf/extra; mutating those buffers below changes
    // what the next db->update() call sends.
    dbt_init(&key, keybuf, sizeof keybuf);
    dbt_init(&val, &extra, sizeof extra);

    for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
        fill_key_buf_random(arg->random_data, keybuf, arg);
        extra.u.d.diff = 1;
        curr_val_sum += extra.u.d.diff;
        r = db->update(
            db,
            txn,
            &key,
            &val,
            0
            );
        if (r != 0) {
            return r;
        }
        // Read the key back out of the buffer (assumes a native-endian int
        // key -- see fill_key_buf; TODO confirm for memcmp_keys runs) and
        // apply the canceling -1 to its shifted "mirror" key.
        int *rkp = (int *) keybuf;
        int rand_key = *rkp;
        invariant(rand_key != (arg->cli->num_elements - rand_key));
        rand_key -= arg->cli->num_elements;
        fill_key_buf(rand_key, keybuf, arg->cli);
        extra.u.d.diff = -1;
        r = db->update(
            db,
            txn,
            &key,
            &val,
            0
            );
        if (r != 0) {
            return r;
        }
    }
    return r;
}
1380
pre_acquire_write_lock(DB * db,DB_TXN * txn,const DBT * left_key,const DBT * right_key)1381 static int pre_acquire_write_lock(DB *db, DB_TXN *txn,
1382 const DBT *left_key, const DBT *right_key) {
1383 int r;
1384 DBC *cursor;
1385
1386 r = db->cursor(db, txn, &cursor, DB_RMW);
1387 CKERR(r);
1388 int cursor_r = cursor->c_set_bounds(cursor, left_key, right_key, true, 0);
1389 r = cursor->c_close(cursor);
1390 CKERR(r);
1391
1392 return cursor_r;
1393 }
1394
// take the given db and do an update on it
static int
UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
    // Effect: Apply txn_size updates to db in one txn. Deltas are random,
    // but the final update cancels the txn's running sum so the table-wide
    // zero-sum invariant is preserved. With arg->prelock_updates, keys are
    // sequential from a random start (wrapping at the table end) and the
    // whole range is write-locked up front; otherwise each update hits a
    // random key. Returns the first error, or 0.
    uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
    struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
    struct update_op_extra extra;
    ZERO_STRUCT(extra);
    extra.type = UPDATE_ADD_DIFF;
    extra.pad_bytes = 0;
    if (op_args->update_pad_frequency) {
        // Pads in alternating stripes of update_pad_frequency updates
        // (the equality holds exactly when the frequency-bucket is even),
        // perturbing value sizes over time.
        if (old_update_count % (2*op_args->update_pad_frequency) == old_update_count%op_args->update_pad_frequency) {
            extra.pad_bytes = 100;
        }
    }

    int r = 0;
    DBT key, val;
    uint8_t keybuf[arg->cli->key_size];
    int update_key;
    int curr_val_sum = 0;
    const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0;

    for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
        if (arg->prelock_updates) {
            if (i == 0) {
                update_key = random_bounded_key(arg->random_data, arg);

                const int max_key_in_table = arg->cli->num_elements - 1;
                const bool range_wraps = (update_key + (int) arg->cli->txn_size - 1) > max_key_in_table;
                int left_key, right_key;
                DBT left_key_dbt, right_key_dbt;

                // acquire the range starting at the random key, plus txn_size - 1
                // elements, but lock no further than the end of the table. if the
                // range wraps around to the beginning we will handle it below.
                left_key = update_key;
                right_key = range_wraps ? max_key_in_table : (left_key + arg->cli->txn_size - 1);
                r = pre_acquire_write_lock(
                    db,
                    txn,
                    dbt_init(&left_key_dbt, &left_key, sizeof update_key),
                    dbt_init(&right_key_dbt, &right_key, sizeof right_key)
                    );
                if (r != 0) {
                    return r;
                }

                // check if the right end point wrapped around to the beginning
                // if so, lock from 0 to the right key, modded by table size.
                if (range_wraps) {
                    right_key = (left_key + arg->cli->txn_size - 1) - max_key_in_table;
                    invariant(right_key > 0);
                    left_key = 0;
                    r = pre_acquire_write_lock(
                        db,
                        txn,
                        dbt_init(&left_key_dbt, &left_key, sizeof update_key),
                        dbt_init(&right_key_dbt, &right_key, sizeof right_key)
                        );
                    if (r != 0) {
                        return r;
                    }
                }
            } else {
                // Subsequent keys walk sequentially through the prelocked range.
                update_key++;
                if (arg->bounded_element_range) {
                    update_key = update_key % arg->cli->num_elements;
                }
            }
            fill_key_buf(update_key, keybuf, arg->cli);
        } else {
            // just do a usual, random point update without locking first
            fill_key_buf_random(arg->random_data, keybuf, arg);
        }


        // the last update keeps the table's sum as zero
        // every other update except the last applies a random delta
        if (i == arg->cli->txn_size - 1) {
            extra.u.d.diff = -curr_val_sum;
        } else {
            extra.u.d.diff = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
            // just make every other value random
            if (i%2 == 0) {
                extra.u.d.diff = -extra.u.d.diff;
            }
            curr_val_sum += extra.u.d.diff;
        }

        dbt_init(&key, keybuf, sizeof keybuf);
        dbt_init(&val, &extra, sizeof extra);

        // do the update
        r = db->update(
            db,
            txn,
            &key,
            &val,
            update_flags
            );
        if (r != 0) {
            return r;
        }
    }

    return r;
}
1502
1503 // choose a random DB and do an update on it
1504 static int
UU()1505 UU() update_op(DB_TXN *txn, ARG arg, void* operation_extra, void *stats_extra) {
1506 int db_index = myrandom_r(arg->random_data) % arg->cli->num_DBs;
1507 DB *db = arg->dbp[db_index];
1508 return update_op_db(db, txn, arg, operation_extra, stats_extra);
1509 }
1510
static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
    // Effect: Like update_op_db, but every update carries the value the test
    // last wrote for that key (from update_history_buffer); the update
    // callback asserts the stored value matches, verifying that updates are
    // applied in order. Deltas within the txn sum to zero.
    // NOTE(review): update_history_buffer is read and written here without
    // synchronization -- presumably this op runs with external serialization
    // or a single updating thread; confirm with the test drivers.
    struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
    assert(arg->bounded_element_range);
    assert(op_args->update_history_buffer);

    int r = 0;
    int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
    DB* db = arg->dbp[db_index];

    struct update_op_extra extra;
    ZERO_STRUCT(extra);
    extra.type = UPDATE_WITH_HISTORY;
    uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
    extra.pad_bytes = 0;
    if (op_args->update_pad_frequency) {
        // Opposite stripe from update_op_db (note the !=), with larger padding.
        if (old_update_count % (2*op_args->update_pad_frequency) != old_update_count%op_args->update_pad_frequency) {
            extra.pad_bytes = 500;
        }
    }

    DBT key, val;
    uint8_t keybuf[arg->cli->key_size];
    int rand_key;
    int curr_val_sum = 0;

    dbt_init(&key, keybuf, sizeof keybuf);
    dbt_init(&val, &extra, sizeof extra);

    for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
        fill_key_buf_random(arg->random_data, keybuf, arg);
        // Read the generated key back (assumes native-endian int key --
        // see fill_key_buf) to index the history buffer.
        int *rkp = (int *) keybuf;
        rand_key = *rkp;
        invariant(rand_key < arg->cli->num_elements);
        if (i < arg->cli->txn_size - 1) {
            extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
            // just make every other value random
            if (i % 2 == 0) {
                extra.u.h.new_val = -extra.u.h.new_val;
            }
            curr_val_sum += extra.u.h.new_val;
        } else {
            // the last update should ensure the sum stays zero
            extra.u.h.new_val = -curr_val_sum;
        }
        // Record what we expect the row to hold now, and what it will hold next.
        extra.u.h.expected = op_args->update_history_buffer[rand_key];
        op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
        r = db->update(
            db,
            txn,
            &key,
            &val,
            0
            );
        if (r != 0) {
            return r;
        }
    }

    return r;
}
1571
UU()1572 static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1573 struct update_op_extra extra;
1574 ZERO_STRUCT(extra);
1575 int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1576 DB* db = arg->dbp[db_index];
1577 extra.type = UPDATE_NEGATE;
1578 extra.pad_bytes = 0;
1579 DBT val;
1580 int r = db->update_broadcast(db, txn, dbt_init(&val, &extra, sizeof extra), 0);
1581 CKERR(r);
1582 return r;
1583 }
1584
hot_progress_callback(void * UU (extra),float UU (progress))1585 static int hot_progress_callback(void *UU(extra), float UU(progress)) {
1586 return run_test ? 0 : 1;
1587 }
1588
UU()1589 static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
1590 int r;
1591 for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1592 DB* db = arg->dbp[i];
1593 uint64_t loops_run;
1594 r = db->hot_optimize(db, NULL, NULL, hot_progress_callback, nullptr, &loops_run);
1595 if (run_test) {
1596 CKERR(r);
1597 }
1598 }
1599 return 0;
1600 }
1601
1602 static void
get_ith_table_name(char * buf,size_t len,int i)1603 get_ith_table_name(char *buf, size_t len, int i) {
1604 snprintf(buf, len, "main%d", i);
1605 }
1606
1607 DB_TXN * const null_txn = 0;
1608
// For each line of engine status output, look for lines that contain substrings
// that match any of the strings in the pattern string. The pattern string contains
// 0 or more strings separated by the '|' character, kind of like a regex.
static void print_matching_engine_status_rows(DB_ENV *env, const char *pattern) {
    uint64_t num_rows;
    env->get_engine_status_num_rows(env, &num_rows);
    // Assumes 128 bytes per status row is always enough -- TODO confirm.
    uint64_t buf_size = num_rows * 128;
    const char *row;
    char *row_r;

    // Split the pattern on '|' in place: pattern_copy becomes num_patterns
    // consecutive NUL-terminated substrings.
    char *pattern_copy = toku_xstrdup(pattern);
    int num_patterns = 1;
    for (char *p = pattern_copy; *p != '\0'; p++) {
        if (*p == '|') {
            *p = '\0';
            num_patterns++;
        }
    }

    char *XMALLOC_N(buf_size, buf);
    int r = env->get_engine_status_text(env, buf, buf_size);
    invariant_zero(r);

    // Print (to stderr) every status row containing any pattern substring.
    // p += strlen(p) + 1 steps to the next NUL-terminated substring.
    for (row = strtok_r(buf, "\n", &row_r); row != nullptr; row = strtok_r(nullptr, "\n", &row_r)) {
        const char *p = pattern_copy;
        for (int i = 0; i < num_patterns; i++, p += strlen(p) + 1) {
            if (strstr(row, p) != nullptr) {
                fprintf(stderr, "%s\n", row);
            }
        }
    }

    toku_free(pattern_copy);
    toku_free(buf);
    fflush(stderr);
}
1645
// TODO: stuff like this should be in a generalized header somewhere
static inline int
intmin(const int a, const int b)
{
    // Return the smaller of two ints.
    return a < b ? a : b;
}
1655
// Arguments for the test_time watchdog thread.
struct test_time_extra {
    DB_ENV *env;
    int num_seconds;           // how long to let the test run; 0 = indefinitely
    bool crash_at_end;         // hard-crash the process when time is up
    struct worker_extra *wes;  // per-worker state, read for perf counters
    int num_wes;
    struct cli_args *cli_args;
};
1664
static void *test_time(void *arg) {
    // Watchdog thread: sleep for num_seconds (in performance_period
    // increments), optionally printing per-iteration performance numbers
    // and matching engine status rows along the way, then signal all worker
    // threads to stop by clearing run_test. Optionally hard-crashes the
    // process at the end (for crash/recovery tests). Returns its argument.
    struct test_time_extra* CAST_FROM_VOIDP(tte, arg);
    DB_ENV *env = tte->env;
    int num_seconds = tte->num_seconds;
    const struct perf_formatter *perf_formatter = &perf_formatters[tte->cli_args->perf_output_format];

    //
    // if num_seconds is set to 0, run indefinitely
    //
    if (num_seconds == 0) {
        num_seconds = INT32_MAX;
    }
    // Per-worker counter snapshots from the previous iteration, so the
    // formatter can report per-iteration deltas.
    uint64_t last_counter_values[tte->num_wes][(int) NUM_OPERATION_TYPES];
    ZERO_ARRAY(last_counter_values);
    uint64_t *counters[tte->num_wes];
    for (int t = 0; t < tte->num_wes; ++t) {
        counters[t] = tte->wes[t].counters;
    }
    if (verbose) {
        printf("Sleeping for %d seconds\n", num_seconds);
    }
    for (int i = 0; i < num_seconds; ) {
        struct timeval tv[2];
        const int sleeptime = intmin(tte->cli_args->performance_period, num_seconds - i);
        int r = gettimeofday(&tv[0], nullptr);
        assert_zero(r);
        usleep(sleeptime*1000*1000);
        r = gettimeofday(&tv[1], nullptr);
        assert_zero(r);
        // Advance by wall-clock time actually elapsed, so oversleeping
        // (e.g. under load) doesn't extend the test.
        int actual_sleeptime = tv[1].tv_sec - tv[0].tv_sec;
        if (abs(actual_sleeptime - sleeptime) <= 1) {
            // Close enough, no need to alarm the user, and we didn't check nsec.
            i += sleeptime;
        } else {
            if (verbose) {
                printf("tried to sleep %d secs, actually slept %d secs\n", sleeptime, actual_sleeptime);
            }
            i += actual_sleeptime;
        }
        if (tte->cli_args->print_performance && tte->cli_args->print_iteration_performance) {
            perf_formatter->iteration(tte->cli_args, i, last_counter_values, counters, tte->num_wes);
        }
        if (tte->cli_args->print_engine_status != nullptr) {
            print_matching_engine_status_rows(env, tte->cli_args->print_engine_status);
        }
    }

    if (verbose) {
        printf("should now end test\n");
    }
    toku_sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy.
    if (verbose) {
        printf("run_test %d\n", run_test);
    }
    if (tte->crash_at_end) {
        toku_hard_crash_on_purpose();
    }
    return arg;
}
1724
// State shared between run_workers and its sleep_and_crash watchdog thread.
struct sleep_and_crash_extra {
    toku_mutex_t mutex;
    toku_cond_t cond;
    int seconds;               // how long to wait for worker threads to join
    bool is_setup;             // set once the watchdog is inside its timed wait
    bool threads_have_joined;  // set by run_workers before signaling cond
};
1732
static void *sleep_and_crash(void *extra) {
    // Watchdog: wait up to e->seconds for run_workers to signal that all
    // worker threads have joined. If the timed wait expires first, some
    // thread is stuck -- deliberately crash and dump core so the hang can be
    // debugged. Holds e->mutex around the condition wait.
    sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra);
    toku_mutex_lock(&e->mutex);
    struct timeval tv;
    toku_timespec_t ts;
    gettimeofday(&tv, nullptr);
    ts.tv_sec = tv.tv_sec + e->seconds;  // absolute deadline for the wait
    ts.tv_nsec = 0;
    e->is_setup = true;  // tells run_workers we are (about to be) waiting
    if (verbose) {
        printf("Waiting %d seconds for other threads to join.\n", e->seconds);
        fflush(stdout);
    }
    int r = toku_cond_timedwait(&e->cond, &e->mutex, &ts);
    toku_mutex_assert_locked(&e->mutex);
    if (r == ETIMEDOUT) {
        invariant(!e->threads_have_joined);
        if (verbose) {
            printf("Some thread didn't join on time, crashing.\n");
            fflush(stdout);
        }
        toku_crash_and_dump_core_on_purpose();
    } else {
        assert(r == 0);
        assert(e->threads_have_joined);
        if (verbose) {
            printf("Other threads joined on time, exiting cleanly.\n");
        }
    }
    toku_mutex_unlock(&e->mutex);
    return nullptr;
}
1765
// Spawn num_threads worker threads (one per thread_args[i]) plus a timer
// thread, let the stress test run for num_seconds, then join everything.
// If any worker fails to join within cli_args->join_timeout seconds, a
// watchdog thread crashes the process on purpose so the hang can be
// diagnosed from the core.  Returns the result of the final pthread_join
// (0 on success; earlier failures abort via assert/CKERR).
static int run_workers(
    struct arg *thread_args,
    int num_threads,
    uint32_t num_seconds,
    bool crash_at_end,
    struct cli_args* cli_args
    )
{
    int r;
    const struct perf_formatter *perf_formatter =
        &perf_formatters[cli_args->perf_output_format];
    // Mutex + rwlock shared by every worker (handed out via worker_extra).
    toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER;
    toku_mutex_init(toku_uninstrumented, &mutex, nullptr);
    struct st_rwlock rwlock;
    rwlock_init(toku_uninstrumented, &rwlock);
    toku_pthread_t tids[num_threads];
    toku_pthread_t time_tid;
    if (cli_args->print_performance) {
        perf_formatter->header(cli_args, num_threads);
    }
    // allocate worker_extra's on cache line boundaries
    struct worker_extra *XMALLOC_N_ALIGNED(64, num_threads, worker_extra);
    // State for the timer thread: which env/workers to watch, how long to
    // let the test run, and whether to crash deliberately when time is up.
    struct test_time_extra tte;
    tte.env = thread_args[0].env;
    tte.num_seconds = num_seconds;
    tte.crash_at_end = crash_at_end;
    tte.wes = worker_extra;
    tte.num_wes = num_threads;
    tte.cli_args = cli_args;
    // Global flag the workers poll; the timer thread clears it when time
    // expires.  Must be set before any worker starts.
    run_test = true;
    for (int i = 0; i < num_threads; ++i) {
        thread_args[i].thread_idx = i;
        thread_args[i].num_threads = num_threads;
        worker_extra[i].thread_arg = &thread_args[i];
        worker_extra[i].operation_lock = &rwlock;
        worker_extra[i].operation_lock_mutex = &mutex;
        // Per-operation-type counters, later summed for the perf report.
        XCALLOC_N((int)NUM_OPERATION_TYPES, worker_extra[i].counters);
        TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
        {
            int chk_r = toku_pthread_create(toku_uninstrumented,
                                            &tids[i],
                                            nullptr,
                                            worker,
                                            &worker_extra[i]);
            CKERR(chk_r);
        }
        if (verbose)
            printf("%lu created\n", (unsigned long)tids[i]);
    }
    {
        int chk_r = toku_pthread_create(
            toku_uninstrumented, &time_tid, nullptr, test_time, &tte);
        CKERR(chk_r);
    }
    if (verbose)
        printf("%lu created\n", (unsigned long)time_tid);

    // The timer thread returns once the test duration has elapsed (and it
    // has told the workers to stop), so join it first.
    void *ret;
    r = toku_pthread_join(time_tid, &ret); assert_zero(r);
    if (verbose) printf("%lu joined\n", (unsigned long) time_tid);

    {
        // Set an alarm that will kill us if it takes too long to join all the
        // threads (i.e. there is some runaway thread).
        struct sleep_and_crash_extra sac_extra;
        ZERO_STRUCT(sac_extra);
        toku_mutex_init(toku_uninstrumented, &sac_extra.mutex, nullptr);
        toku_cond_init(toku_uninstrumented, &sac_extra.cond, nullptr);
        sac_extra.seconds = cli_args->join_timeout;
        sac_extra.is_setup = false;
        sac_extra.threads_have_joined = false;

        toku_mutex_lock(&sac_extra.mutex);
        toku_pthread_t sac_thread;
        r = toku_pthread_create(toku_uninstrumented,
                                &sac_thread,
                                nullptr,
                                sleep_and_crash,
                                &sac_extra);
        assert_zero(r);
        // Wait for sleep_and_crash thread to get set up, spinning is ok, this
        // should be quick.
        while (!sac_extra.is_setup) {
            toku_mutex_unlock(&sac_extra.mutex);
            r = toku_pthread_yield();
            assert_zero(r);
            toku_mutex_lock(&sac_extra.mutex);
        }
        toku_mutex_unlock(&sac_extra.mutex);

        // Timeout thread has started, join everyone
        for (int i = 0; i < num_threads; ++i) {
            r = toku_pthread_join(tids[i], &ret); assert_zero(r);
            if (verbose)
                printf("%lu joined\n", (unsigned long) tids[i]);
        }

        // Signal timeout thread not to crash.
        toku_mutex_lock(&sac_extra.mutex);
        sac_extra.threads_have_joined = true;
        toku_cond_signal(&sac_extra.cond);
        toku_mutex_unlock(&sac_extra.mutex);
        r = toku_pthread_join(sac_thread, nullptr);
        assert_zero(r);
        toku_cond_destroy(&sac_extra.cond);
        toku_mutex_destroy(&sac_extra.mutex);
    }

    if (cli_args->print_performance) {
        uint64_t *counters[num_threads];
        for (int i = 0; i < num_threads; ++i) {
            counters[i] = worker_extra[i].counters;
        }
        perf_formatter->totals(cli_args, counters, num_threads);
    }

    for (int i = 0; i < num_threads; ++i) {
        toku_free(worker_extra[i].counters);
    }
    if (verbose)
        printf("ending test, pthreads have joined\n");
    rwlock_destroy(&rwlock);
    toku_mutex_destroy(&mutex);
    toku_free(worker_extra);
    return r;
}
1892
// Pre-open hook
static void do_nothing_before_db_open(DB *UU(db), int UU(idx)) { }
// Requires: DB is created (allocated) but not opened. idx is the index
// into the DBs array.
// Individual tests override this pointer to customize each DB handle
// before it is opened; the default is the no-op above.
static void (*before_db_open_hook)(DB *db, int idx) = do_nothing_before_db_open;
1898
// Post-open hook
typedef void (*reopen_db_fn)(DB *db, int idx, struct cli_args *cli_args);
static DB *do_nothing_after_db_open(DB_ENV *UU(env), DB *db, int UU(idx), reopen_db_fn UU(reopen), struct cli_args *UU(cli_args)) { return db; }
// Requires: DB is opened and is the 'idx' db in the DBs array.
// Note: Reopen function may be used to open a db if the given one was closed.
// Returns: An opened db.
// Tests override this pointer for post-open customization (e.g. close and
// reopen the db); callers must use the returned handle, which may differ
// from the one passed in.
static DB *(*after_db_open_hook)(DB_ENV *env, DB *db, int idx, reopen_db_fn reopen, struct cli_args *cli_args) = do_nothing_after_db_open;
1906
open_db_for_create(DB * db,int idx,struct cli_args * cli_args)1907 static void open_db_for_create(DB *db, int idx, struct cli_args *cli_args) {
1908 int r;
1909 char name[30];
1910 memset(name, 0, sizeof(name));
1911 get_ith_table_name(name, sizeof(name), idx);
1912 r = db->set_flags(db, 0); CKERR(r);
1913 r = db->set_fanout(db, cli_args->env_args.fanout); CKERR(r);
1914 r = db->set_pagesize(db, cli_args->env_args.node_size); CKERR(r);
1915 r = db->set_readpagesize(db, cli_args->env_args.basement_node_size); CKERR(r);
1916 r = db->set_compression_method(db, cli_args->compression_method); CKERR(r);
1917 const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
1918 r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666); CKERR(r);
1919 }
1920
open_db(DB * db,int idx,struct cli_args * cli_args)1921 static void open_db(DB *db, int idx, struct cli_args *cli_args) {
1922 int r;
1923 char name[30];
1924 memset(name, 0, sizeof(name));
1925 get_ith_table_name(name, sizeof(name), idx);
1926 const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
1927 r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666); CKERR(r);
1928 r = db->change_fanout(db, cli_args->env_args.fanout); CKERR(r); // change fanout until fanout is persistent
1929 }
1930
// Wipe the env directory, create and configure a fresh DB_ENV, then create
// num_DBs tables with open_db_for_create (running the before/after db-open
// hooks around each).  On success, *env_res and db_res[0..num_DBs-1] are
// owned by the caller (paired with close_tables).  Returns the last
// operation's result; intermediate failures abort via CKERR/assert.
static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
                         int (*bt_compare)(DB *, const DBT *, const DBT *),
                         struct cli_args *cli_args
) {
    int r;
    struct env_args env_args = cli_args->env_args;

    // Start from an empty directory.  Test-only code: envdir comes from the
    // harness, not untrusted input, so shelling out to rm is acceptable here.
    char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir);
    r = system(rmcmd);
    CKERR(r);
    r = toku_os_mkdir(env_args.envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);

    DB_ENV *env;
    db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
    r = db_env_create(&env, 0); assert(r == 0);
    r = env->set_redzone(env, 0); CKERR(r);
    // memcmp_keys means the engine's builtin byte-wise comparator is used,
    // so the test's comparator is not installed.
    if (!cli_args->memcmp_keys) {
        r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
    }
    r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
    // set_cachesize takes (gigabytes, leftover bytes, ncache).
    r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
    r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
    if (env_args.generate_put_callback) {
        r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
        CKERR(r);
    }
    else {
        r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
        CKERR(r);
    }
    if (env_args.generate_del_callback) {
        r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
        CKERR(r);
    }
    int env_flags = get_env_open_flags(cli_args);
    r = env->open(env, env_args.envdir, env_flags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
    r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
    r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
    env->change_fsync_log_period(env, env_args.sync_period);
    *env_res = env;

    for (int i = 0; i < num_DBs; i++) {
        DB *db;
        r = db_create(&db, env, 0); CKERR(r);
        before_db_open_hook(db, i);
        open_db_for_create(db, i, cli_args);
        // The post-open hook may close and reopen the db; keep its result.
        db_res[i] = after_db_open_hook(env, db, i, open_db_for_create, cli_args);
    }
    return r;
}
1982
report_overall_fill_table_progress(struct cli_args * args,int num_rows)1983 static void report_overall_fill_table_progress(struct cli_args *args, int num_rows) {
1984 // for sanitary reasons we'd like to prevent two threads
1985 // from printing the same performance report twice.
1986 static bool reporting;
1987
1988 // when was the first time measurement taken?
1989 static uint64_t t0;
1990 static int rows_inserted;
1991
1992 // when was the last report? what was its progress?
1993 static uint64_t last_report;
1994 static double last_progress;
1995 if (t0 == 0) {
1996 t0 = toku_current_time_microsec();
1997 last_report = t0;
1998 }
1999
2000 uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows);
2001 double progress = rows_so_far / (args->num_elements * args->num_DBs * 1.0);
2002 if (progress > (last_progress + .01)) {
2003 uint64_t t1 = toku_current_time_microsec();
2004 const uint64_t minimum_report_period = 5 * 1000000;
2005 if (t1 > last_report + minimum_report_period
2006 && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
2007 double inserts_per_sec = (rows_so_far*1000000) / ((t1 - t0) * 1.0);
2008 printf("fill tables: %ld%% complete, %.2lf rows/sec\n",
2009 (long)(progress * 100), inserts_per_sec);
2010 last_progress = progress;
2011 last_report = t1;
2012 reporting = false;
2013 }
2014 }
2015 }
2016
fill_single_table(DB_ENV * env,DB * db,struct cli_args * args,bool fill_with_zeroes)2017 static void fill_single_table(DB_ENV *env, DB *db, struct cli_args *args, bool fill_with_zeroes) {
2018 const int min_size_for_loader = 1 * 1000 * 1000;
2019 const int puts_per_txn = 10 * 1000;;
2020
2021 int r = 0;
2022 DB_TXN *txn = nullptr;
2023 DB_LOADER *loader = nullptr;
2024 struct random_data random_data;
2025 char random_buf[8];
2026 memset(&random_data, 0, sizeof(random_data));
2027 memset(random_buf, 0, 8);
2028 r = myinitstate_r(random(), random_buf, 8, &random_data); CKERR(r);
2029
2030 uint8_t keybuf[args->key_size], valbuf[args->val_size];
2031 memset(keybuf, 0, sizeof keybuf);
2032 memset(valbuf, 0, sizeof valbuf);
2033 DBT key, val;
2034 dbt_init(&key, keybuf, args->key_size);
2035 dbt_init(&val, valbuf, args->val_size);
2036
2037 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
2038 if (args->num_elements >= min_size_for_loader) {
2039 uint32_t db_flags = DB_PRELOCKED_WRITE;
2040 uint32_t dbt_flags = 0;
2041 r = env->create_loader(env, txn, &loader, db, 1, &db, &db_flags, &dbt_flags, 0); CKERR(r);
2042 }
2043
2044 for (int i = 0; i < args->num_elements; i++) {
2045 fill_key_buf(i, keybuf, args);
2046
2047 // Correctness tests map every key to zeroes. Perf tests fill
2048 // values with random bytes, based on compressibility.
2049 if (fill_with_zeroes) {
2050 fill_val_buf(0, valbuf, args->val_size);
2051 } else {
2052 fill_val_buf_random(&random_data, valbuf, args);
2053 }
2054
2055 r = loader ? loader->put(loader, &key, &val) :
2056 db->put(db, txn, &key, &val, DB_PRELOCKED_WRITE);
2057 CKERR(r);
2058
2059 if (i > 0 && i % puts_per_txn == 0) {
2060 if (verbose) {
2061 report_overall_fill_table_progress(args, puts_per_txn);
2062 }
2063 // begin a new txn if we're not using the loader,
2064 if (loader == nullptr) {
2065 r = txn->commit(txn, 0); CKERR(r);
2066 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
2067 }
2068 }
2069 }
2070
2071 if (loader) {
2072 r = loader->close(loader); CKERR(r);
2073 }
2074 r = txn->commit(txn, 0); CKERR(r);
2075 }
2076
// Arguments for one fill_table_worker work item.  Allocated (XCALLOC) by
// fill_tables_default; freed by the worker that consumes it.
struct fill_table_worker_info {
    struct cli_args *args;
    DB_ENV *env;
    DB *db;                // the one table this work item fills
    bool fill_with_zeroes; // true => correctness test: all values are zero
};
2083
// Kibbutz work-item entry point: unpack the heap-allocated info struct,
// fill the single table it describes, then free the struct (ownership was
// transferred to this callback by fill_tables_default).
static void fill_table_worker(void *arg) {
    struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
    fill_single_table(info->env, info->db, info->args, info->fill_with_zeroes);
    toku_free(info);
}
2089
// Fill all num_DBs tables using a small kibbutz thread pool, one enqueued
// work item per table.  Always returns 0; failures inside the workers
// abort via CKERR.
static int fill_tables_default(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) {
    const int num_cores = toku_os_get_number_processors();
    // Use at most cores / 2 worker threads, since we want some other cores to
    // be used for internal engine work (ie: flushes, loader threads, etc).
    const int max_num_workers = (num_cores + 1) / 2;
    const int num_workers = args->num_DBs < max_num_workers ? args->num_DBs : max_num_workers;
    KIBBUTZ kibbutz = NULL;
    int r = toku_kibbutz_create(num_workers, &kibbutz);
    assert(r == 0);
    for (int i = 0; i < args->num_DBs; i++) {
        // Freed by fill_table_worker once the table is filled.
        struct fill_table_worker_info *XCALLOC(info);
        info->env = env;
        info->db = dbs[i];
        info->args = args;
        info->fill_with_zeroes = fill_with_zeroes;
        toku_kibbutz_enq(kibbutz, fill_table_worker, info);
    }
    // NOTE(review): assumes toku_kibbutz_destroy drains all queued work
    // before returning -- confirm against the kibbutz implementation.
    toku_kibbutz_destroy(kibbutz);
    return 0;
}
2110
// fill_tables() is called when the tables are first created.
// set this function if you want custom table contents.
// The default enqueues one filler work item per DB on a kibbutz pool.
static int (*fill_tables)(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) = fill_tables_default;
2114
do_xa_recovery(DB_ENV * env)2115 static void do_xa_recovery(DB_ENV* env) {
2116 DB_PREPLIST preplist[1];
2117 long num_recovered= 0;
2118 int r = 0;
2119 r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT);
2120 while(r==0 && num_recovered > 0) {
2121 DB_TXN* recovered_txn = preplist[0].txn;
2122 if (verbose) {
2123 printf("recovering transaction with id %" PRIu64 " \n", recovered_txn->id64(recovered_txn));
2124 }
2125 if (random() % 2 == 0) {
2126 int rr = recovered_txn->commit(recovered_txn, 0);
2127 CKERR(rr);
2128 }
2129 else {
2130 int rr = recovered_txn->abort(recovered_txn);
2131 CKERR(rr);
2132 }
2133 r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT);
2134 }
2135 }
2136
// Open an existing environment (running log recovery and resolving any
// prepared XA transactions), then open its num_DBs tables via open_db with
// the before/after db-open hooks.  Mirrors create_tables except the envdir
// is preserved and DB_RECOVER is added to the open flags.  On success,
// *env_res and db_res[0..num_DBs-1] are owned by the caller.
static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
                       int (*bt_compare)(DB *, const DBT *, const DBT *),
                       struct cli_args *cli_args) {
    int r;
    struct env_args env_args = cli_args->env_args;

    DB_ENV *env;
    db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
    r = db_env_create(&env, 0); assert(r == 0);
    r = env->set_redzone(env, 0); CKERR(r);
    // memcmp_keys means the engine's builtin byte-wise comparator is used.
    if (!cli_args->memcmp_keys) {
        r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
    }
    r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
    env->set_update(env, env_args.update_function);
    // set_cachesize takes (gigabytes, leftover bytes, ncache).
    r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
    r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
    if (env_args.generate_put_callback) {
        r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
        CKERR(r);
    }
    else {
        r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
        CKERR(r);
    }
    if (env_args.generate_del_callback) {
        r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
        CKERR(r);
    }
    int env_flags = get_env_open_flags(cli_args);
    // DB_RECOVER replays the log; any still-prepared txns are then resolved.
    r = env->open(env, env_args.envdir, DB_RECOVER | env_flags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    do_xa_recovery(env);
    r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
    r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
    r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
    env->change_fsync_log_period(env, env_args.sync_period);
    *env_res = env;

    for (int i = 0; i < num_DBs; i++) {
        DB *db;
        r = db_create(&db, env, 0); CKERR(r);
        before_db_open_hook(db, i);
        open_db(db, i, cli_args);
        // The post-open hook may close and reopen the db; keep its result.
        db_res[i] = after_db_open_hook(env, db, i, open_db, cli_args);
    }
    return r;
}
2184
close_tables(DB_ENV * env,DB ** dbs,int num_DBs)2185 static int close_tables(DB_ENV *env, DB** dbs, int num_DBs) {
2186 int r;
2187 for (int i = 0; i < num_DBs; i++) {
2188 r = dbs[i]->close(dbs[i], 0); CKERR(r);
2189 }
2190 r = env->close(env, 0); CKERR(r);
2191 return r;
2192 }
2193
// Engine defaults for correctness tests: deliberately tiny nodes and cache
// (4K nodes, ~300KB cachetable) compared to the perf defaults below, so
// trees get deep and eviction happens often during short runs.
static const struct env_args DEFAULT_ENV_ARGS = {
    .fanout = 16,
    .node_size = 4096,
    .basement_node_size = 1024,
    .rollback_node_size = 4096,
    .checkpointing_period = 10,
    .cleaner_period = 1,
    .cleaner_iterations = 1,
    .sync_period = 0,
    .lk_max_memory = 1L * 1024 * 1024 * 1024,
    .cachetable_size = 300000,
    .num_bucket_mutexes = 1024,
    .envdir = nullptr, // filled in by get_default_args()
    .update_function = update_op_callback,
    .generate_put_callback = nullptr,
    .generate_del_callback = nullptr,
};
2211
// Engine defaults for performance tests: realistic node/cache sizes (4MB
// nodes, 1GB cachetable) and no update callback.
static const struct env_args DEFAULT_PERF_ENV_ARGS = {
    .fanout = 16,
    .node_size = 4*1024*1024,
    .basement_node_size = 128*1024,
    .rollback_node_size = 4*1024*1024,
    .checkpointing_period = 60,
    .cleaner_period = 1,
    .cleaner_iterations = 5,
    .sync_period = 0,
    .lk_max_memory = 1L * 1024 * 1024 * 1024,
    .cachetable_size = 1<<30,
    .num_bucket_mutexes = 1024 * 1024,
    .envdir = nullptr, // filled in by get_default_args_for_perf()
    .update_function = nullptr,
    .generate_put_callback = nullptr,
    .generate_del_callback = nullptr,
};
2229
// Baseline command-line defaults for correctness tests (see
// get_default_args_for_perf for the perf variant).  Returned by value;
// envdir defaults to TOKU_TEST_FILENAME.
static struct cli_args UU() get_default_args(void) {
    struct cli_args DEFAULT_ARGS = {
        .num_elements = 150000,
        .num_DBs = 1,
        .num_seconds = 180,
        .join_timeout = 3600,
        .only_create = false,
        .only_stress = false,
        .update_broadcast_period_ms = 2000,
        .num_ptquery_threads = 1,
        .do_test_and_crash = false,
        .do_recover = false,
        .num_update_threads = 1,
        .num_put_threads = 1,
        .range_query_limit = 100,
        .serial_insert = false,
        .interleave = false,
        .crash_on_operation_failure = true,
        .print_performance = false,
        .print_thread_performance = true,
        .print_iteration_performance = true,
        .perf_output_format = HUMAN,
        .compression_method = TOKU_DEFAULT_COMPRESSION_METHOD,
        .performance_period = 1,
        .txn_size = 1000,
        .key_size = min_key_size,
        .val_size = min_val_size,
        .compressibility = 1.0,
        .env_args = DEFAULT_ENV_ARGS,
        .single_txn = false,
        .warm_cache = false,
        .blackhole = false,
        .nolocktree = false,
        .unique_checks = false,
        .sync_period = 0,
        .nolog = false,
        .nocrashstatus = false,
        .prelock_updates = false,
        .disperse_keys = false,
        .memcmp_keys = false,
        .direct_io = false,
    };
    DEFAULT_ARGS.env_args.envdir = TOKU_TEST_FILENAME;
    return DEFAULT_ARGS;
}
2275
UU()2276 static struct cli_args UU() get_default_args_for_perf(void) {
2277 struct cli_args args = get_default_args();
2278 args.num_elements = 1000000; //default of 1M
2279 args.env_args = DEFAULT_PERF_ENV_ARGS;
2280 args.env_args.envdir = TOKU_TEST_FILENAME;
2281 return args;
2282 }
2283
// One option value (default/min/max).  Which member is live is determined
// by the option's type_description (i32 for int32 options, s for strings,
// b for bools, etc.).
union val_type {
    int32_t i32;
    int64_t i64;
    uint32_t u32;
    uint64_t u64;
    bool b;
    double d;
    const char *s;
};
2293
struct arg_type;

// Per-type hooks for the option parser: `matches` decides whether argv[1]
// names this option; `parse` consumes the value and reports how many extra
// argv slots it used; `help` prints one usage line padded to the given
// column widths.
typedef bool (*match_fun)(struct arg_type *type, char *const argv[]);
typedef int (*parse_fun)(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]);
typedef void (*help_fun)(struct arg_type *type, int width_name, int width_type);

// One entry per supported value type (int32, uint64, bool, string, ...).
struct type_description {
    const char *type_name;
    const match_fun matches;
    const parse_fun parse;
    const help_fun help;
};

// One command-line option: its name (with "--" prefix, except bools), its
// type, the default shown in help, the destination to store into, a help
// suffix (e.g. " bytes"), and inclusive min/max bounds for numeric types.
struct arg_type {
    const char *name;
    struct type_description *description;
    union val_type default_val;
    void *target;
    const char *help_suffix;
    union val_type min;
    union val_type max;
};
2316
// Generates help_<typename>(): prints one usage line for a numeric option,
// showing its default and -- only when they differ from the type's natural
// bounds MIN/MAX -- its configured min/max, to avoid redundant text.
// `format` is the printf length/conversion (e.g. PRId32) for `member` of
// union val_type.
#define DEFINE_NUMERIC_HELP(typename, format, member, MIN, MAX) \
static inline void \
help_##typename(struct arg_type *type, int width_name, int width_type) { \
    invariant(!strncmp("--", type->name, strlen("--"))); \
    fprintf(stderr, "\t%-*s %-*s ", width_name, type->name, width_type, type->description->type_name); \
    fprintf(stderr, "(default %" format "%s", type->default_val.member, type->help_suffix); \
    if (type->min.member != MIN) { \
        fprintf(stderr, ", min %" format "%s", type->min.member, type->help_suffix); \
    } \
    if (type->max.member != MAX) { \
        fprintf(stderr, ", max %" format "%s", type->max.member, type->help_suffix); \
    } \
    fprintf(stderr, ")\n"); \
}

DEFINE_NUMERIC_HELP(int32, PRId32, i32, INT32_MIN, INT32_MAX)
DEFINE_NUMERIC_HELP(int64, PRId64, i64, INT64_MIN, INT64_MAX)
DEFINE_NUMERIC_HELP(uint32, PRIu32, u32, 0, UINT32_MAX)
DEFINE_NUMERIC_HELP(uint64, PRIu64, u64, 0, UINT64_MAX)
DEFINE_NUMERIC_HELP(double, ".2lf", d, -HUGE_VAL, HUGE_VAL)
2337 static inline void
2338 help_bool(struct arg_type *type, int width_name, int width_type) {
2339 invariant(strncmp("--", type->name, strlen("--")));
2340 const char *default_value = type->default_val.b ? "yes" : "no";
2341 fprintf(stderr, "\t--[no-]%-*s %-*s (default %s)\n",
2342 width_name - (int)strlen("--[no-]"), type->name,
2343 width_type, type->description->type_name,
2344 default_value);
2345 }
2346
2347 static inline void
help_string(struct arg_type * type,int width_name,int width_type)2348 help_string(struct arg_type *type, int width_name, int width_type) {
2349 invariant(!strncmp("--", type->name, strlen("--")));
2350 const char *default_value = type->default_val.s ? type->default_val.s : "";
2351 fprintf(stderr, "\t%-*s %-*s (default '%s')\n",
2352 width_name, type->name,
2353 width_type, type->description->type_name,
2354 default_value);
2355 }
2356
2357 static inline bool
match_name(struct arg_type * type,char * const argv[])2358 match_name(struct arg_type *type, char *const argv[]) {
2359 invariant(!strncmp("--", type->name, strlen("--")));
2360 return !strcmp(argv[1], type->name);
2361 }
2362
2363 static inline bool
match_bool(struct arg_type * type,char * const argv[])2364 match_bool(struct arg_type *type, char *const argv[]) {
2365 invariant(strncmp("--", type->name, strlen("--")));
2366 const char *string = argv[1];
2367 if (strncmp(string, "--", strlen("--"))) {
2368 return false;
2369 }
2370 string += strlen("--");
2371 if (!strncmp(string, "no-", strlen("no-"))) {
2372 string += strlen("no-");
2373 }
2374 return !strcmp(string, type->name);
2375 }
2376
2377 static inline int
parse_bool(struct arg_type * type,int * extra_args_consumed,int UU (argc),char * const argv[])2378 parse_bool(struct arg_type *type, int *extra_args_consumed, int UU(argc), char *const argv[]) {
2379 const char *string = argv[1];
2380 if (!strncmp(string, "--no-", strlen("--no-"))) {
2381 *((bool *)type->target) = false;
2382 }
2383 else {
2384 *((bool *)type->target) = true;
2385 }
2386 *extra_args_consumed = 0;
2387 return 0;
2388 }
2389
2390 static inline int
parse_string(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2391 parse_string(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2392 if (argc < 2) {
2393 return EINVAL;
2394 }
2395 *((const char **)type->target) = argv[2];
2396 *extra_args_consumed = 1;
2397 return 0;
2398 }
2399
2400 static inline int
parse_uint64(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2401 parse_uint64(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2402 // Already verified name.
2403
2404 if (argc < 2) {
2405 return EINVAL;
2406 }
2407 if (*argv[2] == '\0') {
2408 return EINVAL;
2409 }
2410
2411 char *endptr;
2412 unsigned long long int result = strtoull(argv[2], &endptr, 0);
2413 if (*endptr != '\0') {
2414 return EINVAL;
2415 }
2416 if (result < type->min.u64 || result > type->max.u64) {
2417 return ERANGE;
2418 }
2419 *((uint64_t*)type->target) = result;
2420 *extra_args_consumed = 1;
2421 return 0;
2422 }
2423
2424 static inline int
parse_int64(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2425 parse_int64(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2426 // Already verified name.
2427
2428 if (argc < 2) {
2429 return EINVAL;
2430 }
2431 if (*argv[2] == '\0') {
2432 return EINVAL;
2433 }
2434
2435 char *endptr;
2436 long long int result = strtoll(argv[2], &endptr, 0);
2437 if (*endptr != '\0') {
2438 return EINVAL;
2439 }
2440 if (result < type->min.i64 || result > type->max.i64) {
2441 return ERANGE;
2442 }
2443 *((int64_t*)type->target) = result;
2444 *extra_args_consumed = 1;
2445 return 0;
2446 }
2447
2448 static inline int
parse_uint32(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2449 parse_uint32(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2450 // Already verified name.
2451
2452 if (argc < 2) {
2453 return EINVAL;
2454 }
2455 if (*argv[2] == '\0') {
2456 return EINVAL;
2457 }
2458
2459 char *endptr;
2460 unsigned long int result = strtoul(argv[2], &endptr, 0);
2461 if (*endptr != '\0') {
2462 return EINVAL;
2463 }
2464 if (result < type->min.u32 || result > type->max.u32) {
2465 return ERANGE;
2466 }
2467 *((int32_t*)type->target) = result;
2468 *extra_args_consumed = 1;
2469 return 0;
2470 }
2471
2472 static inline int
parse_int32(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2473 parse_int32(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2474 // Already verified name.
2475
2476 if (argc < 2) {
2477 return EINVAL;
2478 }
2479 if (*argv[2] == '\0') {
2480 return EINVAL;
2481 }
2482
2483 char *endptr;
2484 long int result = strtol(argv[2], &endptr, 0);
2485 if (*endptr != '\0') {
2486 return EINVAL;
2487 }
2488 if (result < type->min.i32 || result > type->max.i32) {
2489 return ERANGE;
2490 }
2491 *((int32_t*)type->target) = result;
2492 *extra_args_consumed = 1;
2493 return 0;
2494 }
2495
2496 static inline int
parse_double(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2497 parse_double(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2498 // Already verified name.
2499
2500 if (argc < 2) {
2501 return EINVAL;
2502 }
2503 if (*argv[2] == '\0') {
2504 return EINVAL;
2505 }
2506
2507 char *endptr;
2508 double result = strtod(argv[2], &endptr);
2509 if (*endptr != '\0') {
2510 return EINVAL;
2511 }
2512 if (result < type->min.d || result > type->max.d) {
2513 return ERANGE;
2514 }
2515 *((double*)type->target) = result;
2516 *extra_args_consumed = 1;
2517 return 0;
2518 }
2519
// Common case (match_name).
// Instantiates the type_<name> description binding the generated
// parse_<name>/help_<name> functions; all of these match by exact option
// name.  Bools are declared separately below because they match --[no-]name.
#define DECLARE_TYPE_DESCRIPTION(typename) \
    struct type_description type_##typename = { \
        .type_name = #typename, \
        .matches = match_name, \
        .parse = parse_##typename, \
        .help = help_##typename \
    }
DECLARE_TYPE_DESCRIPTION(int32);
DECLARE_TYPE_DESCRIPTION(uint32);
DECLARE_TYPE_DESCRIPTION(int64);
DECLARE_TYPE_DESCRIPTION(uint64);
DECLARE_TYPE_DESCRIPTION(double);
DECLARE_TYPE_DESCRIPTION(string);
2534
// Bools use their own match function so they are declared manually.
struct type_description type_bool = {
    .type_name = "bool",
    .matches = match_bool,
    .parse = parse_bool,
    .help = help_bool
};

// Dispatch through an option's type_description function pointers.
#define ARG_MATCHES(type, rest...) type->description->matches(type, rest)
#define ARG_PARSE(type, rest...) type->description->parse(type, rest)
#define ARG_HELP(type, rest...) type->description->help(type, rest)
2546
2547 static inline void
do_usage(const char * argv0,int n,struct arg_type types[])2548 do_usage(const char *argv0, int n, struct arg_type types[/*n*/]) {
2549 fprintf(stderr, "Usage:\n");
2550 fprintf(stderr, "\t%s [-h|--help]\n", argv0);
2551 fprintf(stderr, "\t%s [OPTIONS]\n", argv0);
2552 fprintf(stderr, "\n");
2553 fprintf(stderr, "OPTIONS are among:\n");
2554 fprintf(stderr, "\t-q|--quiet\n");
2555 fprintf(stderr, "\t-v|--verbose\n");
2556 for (int i = 0; i < n; i++) {
2557 struct arg_type *type = &types[i];
2558 ARG_HELP(type, 35, 6);
2559 }
2560 }
2561
parse_stress_test_args(int argc,char * const argv[],struct cli_args * args)2562 static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) {
2563 struct cli_args default_args = *args;
2564 const char *argv0=argv[0];
2565
2566 #define MAKE_ARG(name_string, type, member, variable, suffix, min_val, max_val) { \
2567 .name=(name_string), \
2568 .description=&(type), \
2569 .default_val={.member=default_args.variable}, \
2570 .target=&(args->variable), \
2571 .help_suffix=(suffix), \
2572 .min={.member=min_val}, \
2573 .max={.member=max_val}, \
2574 }
2575 #define MAKE_LOCAL_ARG(name_string, type, member, default, variable, suffix, min_val, max_val) { \
2576 .name=(name_string), \
2577 .description=&(type), \
2578 .default_val={.member=default}, \
2579 .target=&(variable), \
2580 .help_suffix=(suffix), \
2581 .min={.member=min_val}, \
2582 .max={.member=max_val}, \
2583 }
2584 #define UINT32_ARG(name_string, variable, suffix) \
2585 MAKE_ARG(name_string, type_uint32, u32, variable, suffix, 0, UINT32_MAX)
2586 #define UINT32_ARG_R(name_string, variable, suffix, min, max) \
2587 MAKE_ARG(name_string, type_uint32, u32, variable, suffix, min, max)
2588 #define UINT64_ARG(name_string, variable, suffix) \
2589 MAKE_ARG(name_string, type_uint64, u64, variable, suffix, 0, UINT64_MAX)
2590 #define INT32_ARG_NONNEG(name_string, variable, suffix) \
2591 MAKE_ARG(name_string, type_int32, i32, variable, suffix, 0, INT32_MAX)
2592 #define INT32_ARG_R(name_string, variable, suffix, min, max) \
2593 MAKE_ARG(name_string, type_int32, i32, variable, suffix, min, max)
2594 #define DOUBLE_ARG_R(name_string, variable, suffix, min, max) \
2595 MAKE_ARG(name_string, type_double, d, variable, suffix, min, max)
2596 #define BOOL_ARG(name_string, variable) \
2597 MAKE_ARG(name_string, type_bool, b, variable, "", false, false)
2598 #define STRING_ARG(name_string, variable) \
2599 MAKE_ARG(name_string, type_string, s, variable, "", "", "")
2600 #define LOCAL_STRING_ARG(name_string, variable, default) \
2601 MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "")
2602
2603 const char *perf_format_s = nullptr;
2604 const char *compression_method_s = nullptr;
2605 const char *print_engine_status_s = nullptr;
2606 struct arg_type arg_types[] = {
2607 INT32_ARG_NONNEG("--num_elements", num_elements, ""),
2608 INT32_ARG_NONNEG("--num_DBs", num_DBs, ""),
2609 INT32_ARG_NONNEG("--num_seconds", num_seconds, "s"),
2610 INT32_ARG_NONNEG("--fanout", env_args.fanout, ""),
2611 INT32_ARG_NONNEG("--node_size", env_args.node_size, " bytes"),
2612 INT32_ARG_NONNEG("--basement_node_size", env_args.basement_node_size, " bytes"),
2613 INT32_ARG_NONNEG("--rollback_node_size", env_args.rollback_node_size, " bytes"),
2614 INT32_ARG_NONNEG("--checkpointing_period", env_args.checkpointing_period, "s"),
2615 INT32_ARG_NONNEG("--cleaner_period", env_args.cleaner_period, "s"),
2616 INT32_ARG_NONNEG("--cleaner_iterations", env_args.cleaner_iterations, ""),
2617 INT32_ARG_NONNEG("--sync_period", env_args.sync_period, "ms"),
2618 INT32_ARG_NONNEG("--update_broadcast_period", update_broadcast_period_ms, "ms"),
2619 INT32_ARG_NONNEG("--num_ptquery_threads", num_ptquery_threads, " threads"),
2620 INT32_ARG_NONNEG("--num_put_threads", num_put_threads, " threads"),
2621 INT32_ARG_NONNEG("--num_update_threads", num_update_threads, " threads"),
2622 INT32_ARG_NONNEG("--range_query_limit", range_query_limit, " rows"),
2623
2624 UINT32_ARG("--txn_size", txn_size, " rows"),
2625 UINT32_ARG("--num_bucket_mutexes", env_args.num_bucket_mutexes, " mutexes"),
2626
2627 INT32_ARG_R("--join_timeout", join_timeout, "s", 1, INT32_MAX),
2628 INT32_ARG_R("--performance_period", performance_period, "s", 1, INT32_MAX),
2629
2630 UINT64_ARG("--cachetable_size", env_args.cachetable_size, " bytes"),
2631 UINT64_ARG("--lk_max_memory", env_args.lk_max_memory, " bytes"),
2632
2633 DOUBLE_ARG_R("--compressibility", compressibility, "", 0.0, 1.0),
2634
2635 //TODO: when outputting help.. skip min/max that is min/max of data range.
2636 UINT32_ARG_R("--key_size", key_size, " bytes", min_key_size, UINT32_MAX),
2637 UINT32_ARG_R("--val_size", val_size, " bytes", min_val_size, UINT32_MAX),
2638
2639 BOOL_ARG("serial_insert", serial_insert),
2640 BOOL_ARG("interleave", interleave),
2641 BOOL_ARG("crash_on_operation_failure", crash_on_operation_failure),
2642 BOOL_ARG("single_txn", single_txn),
2643 BOOL_ARG("warm_cache", warm_cache),
2644 BOOL_ARG("print_performance", print_performance),
2645 BOOL_ARG("print_thread_performance", print_thread_performance),
2646 BOOL_ARG("print_iteration_performance", print_iteration_performance),
2647 BOOL_ARG("only_create", only_create),
2648 BOOL_ARG("only_stress", only_stress),
2649 BOOL_ARG("test", do_test_and_crash),
2650 BOOL_ARG("recover", do_recover),
2651 BOOL_ARG("blackhole", blackhole),
2652 BOOL_ARG("nolocktree", nolocktree),
2653 BOOL_ARG("unique_checks", unique_checks),
2654 BOOL_ARG("nolog", nolog),
2655 BOOL_ARG("nocrashstatus", nocrashstatus),
2656 BOOL_ARG("prelock_updates", prelock_updates),
2657 BOOL_ARG("disperse_keys", disperse_keys),
2658 BOOL_ARG("memcmp_keys", memcmp_keys),
2659 BOOL_ARG("direct_io", direct_io),
2660
2661 STRING_ARG("--envdir", env_args.envdir),
2662
2663 LOCAL_STRING_ARG("--perf_format", perf_format_s, "human"),
2664 LOCAL_STRING_ARG("--compression_method", compression_method_s, "quicklz"),
2665 LOCAL_STRING_ARG("--print_engine_status", print_engine_status_s, nullptr),
2666 //TODO(add --quiet, -v, -h)
2667 };
2668 #undef UINT32_ARG
2669 #undef UINT32_ARG_R
2670 #undef UINT64_ARG
2671 #undef DOUBLE_ARG_R
2672 #undef BOOL_ARG
2673 #undef STRING_ARG
2674 #undef MAKE_ARG
2675
2676 int num_arg_types = sizeof(arg_types) / sizeof(arg_types[0]);
2677
2678 int resultcode = 0;
2679 while (argc > 1) {
2680 if (!strcmp(argv[1], "-v") || !strcmp(argv[1], "--verbose")) {
2681 verbose++;
2682 argv++;
2683 argc--;
2684 }
2685 else if (!strcmp(argv[1], "-q") || !strcmp(argv[1], "--quiet")) {
2686 verbose = 0;
2687 argv++;
2688 argc--;
2689 }
2690 else if (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
2691 fprintf(stderr, "HELP INVOKED\n");
2692 do_usage(argv0, num_arg_types, arg_types);
2693 exit(0);
2694 }
2695 else {
2696 bool found = false;
2697 for (int i = 0; i < num_arg_types; i++) {
2698 struct arg_type *type = &arg_types[i];
2699 if (ARG_MATCHES(type, argv)) {
2700 int extra_args_consumed;
2701 resultcode = ARG_PARSE(type, &extra_args_consumed, argc, argv);
2702 if (resultcode) {
2703 fprintf(stderr, "ERROR PARSING [%s]\n", argv[1]);
2704 do_usage(argv0, num_arg_types, arg_types);
2705 exit(resultcode);
2706 }
2707 found = true;
2708 argv += extra_args_consumed + 1;
2709 argc -= extra_args_consumed + 1;
2710 break;
2711 }
2712 }
2713 if (!found) {
2714 fprintf(stderr, "COULD NOT PARSE [%s]\n", argv[1]);
2715 do_usage(argv0, num_arg_types, arg_types);
2716 exit(EINVAL);
2717 }
2718 }
2719 }
2720 args->print_engine_status = print_engine_status_s;
2721 if (compression_method_s != nullptr) {
2722 if (strcmp(compression_method_s, "quicklz") == 0) {
2723 args->compression_method = TOKU_QUICKLZ_METHOD;
2724 } else if (strcmp(compression_method_s, "zlib") == 0) {
2725 args->compression_method = TOKU_ZLIB_WITHOUT_CHECKSUM_METHOD;
2726 } else if (strcmp(compression_method_s, "lzma") == 0) {
2727 args->compression_method = TOKU_LZMA_METHOD;
2728 } else if (strcmp(compression_method_s, "snappy") == 0) {
2729 args->compression_method = TOKU_SNAPPY_METHOD;
2730 } else if (strcmp(compression_method_s, "none") == 0) {
2731 args->compression_method = TOKU_NO_COMPRESSION;
2732 } else {
2733 fprintf(stderr, "valid values for --compression_method are \"quicklz\", \"zlib\", \"lzma\", \"snappy\", and \"none\"\n");
2734 do_usage(argv0, num_arg_types, arg_types);
2735 exit(EINVAL);
2736 }
2737 }
2738 if (perf_format_s != nullptr) {
2739 if (!strcmp(perf_format_s, "human")) {
2740 args->perf_output_format = HUMAN;
2741 } else if (!strcmp(perf_format_s, "csv")) {
2742 args->perf_output_format = CSV;
2743 } else if (!strcmp(perf_format_s, "tsv")) {
2744 args->perf_output_format = TSV;
2745 } else {
2746 fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n");
2747 do_usage(argv0, num_arg_types, arg_types);
2748 exit(EINVAL);
2749 }
2750 }
2751 if (args->only_create && args->only_stress) {
2752 fprintf(stderr, "used --only_stress and --only_create\n");
2753 do_usage(argv0, num_arg_types, arg_types);
2754 exit(EINVAL);
2755 }
2756 }
2757
2758 static void
2759 stress_table(DB_ENV *, DB **, struct cli_args *);
2760
2761 static int
stress_dbt_cmp_legacy(const DBT * a,const DBT * b)2762 stress_dbt_cmp_legacy(const DBT *a, const DBT *b) {
2763 int x = *(int *) a->data;
2764 int y = *(int *) b->data;
2765 if (x < y) {
2766 return -1;
2767 } else if (x > y) {
2768 return +1;
2769 } else {
2770 return 0;
2771 }
2772 }
2773
2774 static int
stress_dbt_cmp(const DBT * a,const DBT * b)2775 stress_dbt_cmp(const DBT *a, const DBT *b) {
2776 // Keys are only compared by their first 8 bytes,
2777 // interpreted as a little endian 64 bit integers.
2778 // The rest of the key is just padding.
2779 uint64_t x = *(uint64_t *) a->data;
2780 uint64_t y = *(uint64_t *) b->data;
2781 if (x < y) {
2782 return -1;
2783 } else if (x > y) {
2784 return +1;
2785 } else {
2786 return 0;
2787 }
2788 }
2789
2790 static int
stress_cmp(DB * db,const DBT * a,const DBT * b)2791 stress_cmp(DB *db, const DBT *a, const DBT *b) {
2792 assert(db && a && b);
2793 assert(a->size == b->size);
2794
2795 if (a->size == sizeof(int)) {
2796 // Legacy comparison: keys must be >= 4 bytes
2797 return stress_dbt_cmp_legacy(a, b);
2798 } else {
2799 // Modern comparison: keys must be >= 8 bytes
2800 invariant(a->size >= sizeof(uint64_t));
2801 return stress_dbt_cmp(a, b);
2802 }
2803 }
2804
2805 static void
do_warm_cache(DB_ENV * env,DB ** dbs,struct cli_args * args)2806 do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
2807 {
2808 struct scan_op_extra soe;
2809 soe.fast = true;
2810 soe.fwd = true;
2811 soe.prefetch = true;
2812 struct arg scan_arg;
2813 arg_init(&scan_arg, dbs, env, args);
2814 scan_arg.operation_extra = &soe;
2815 scan_arg.operation = scan_op_no_check;
2816 scan_arg.lock_type = STRESS_LOCK_NONE;
2817 DB_TXN* txn = nullptr;
2818 // don't take serializable read locks when scanning.
2819 int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
2820 // make sure the scan doesn't terminate early
2821 run_test = true;
2822 // warm up each DB in parallel
2823 scan_op_no_check_parallel(txn, &scan_arg, &soe, nullptr);
2824 r = txn->commit(txn,0); CKERR(r);
2825 }
2826
2827 static void
UU()2828 UU() stress_recover(struct cli_args *args) {
2829 DB_ENV* env = nullptr;
2830 DB* dbs[args->num_DBs];
2831 memset(dbs, 0, sizeof(dbs));
2832 { int chk_r = open_tables(&env,
2833 dbs,
2834 args->num_DBs,
2835 stress_cmp,
2836 args); CKERR(chk_r); }
2837
2838 DB_TXN* txn = nullptr;
2839 struct arg recover_args;
2840 arg_init(&recover_args, dbs, env, args);
2841 int r = env->txn_begin(env, 0, &txn, recover_args.txn_flags);
2842 CKERR(r);
2843 struct scan_op_extra soe = {
2844 .fast = true,
2845 .fwd = true,
2846 .prefetch = false
2847 };
2848 // make sure the scan doesn't terminate early
2849 run_test = true;
2850 r = scan_op(txn, &recover_args, &soe, nullptr);
2851 CKERR(r);
2852 { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
2853 { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
2854 }
2855
2856 static void
open_and_stress_tables(struct cli_args * args,bool fill_with_zeroes,int (* cmp)(DB *,const DBT *,const DBT *))2857 open_and_stress_tables(struct cli_args *args, bool fill_with_zeroes, int (*cmp)(DB *, const DBT *, const DBT *))
2858 {
2859 if ((args->key_size < 8 && args->key_size != 4) ||
2860 (args->val_size < 8 && args->val_size != 4)) {
2861 fprintf(stderr, "The only valid key/val sizes are 4, 8, and > 8.\n");
2862 return;
2863 }
2864
2865 setlocale(LC_NUMERIC, "en_US.UTF-8");
2866 DB_ENV* env = nullptr;
2867 DB* dbs[args->num_DBs];
2868 memset(dbs, 0, sizeof(dbs));
2869 db_env_enable_engine_status(args->nocrashstatus ? false : true);
2870 db_env_set_direct_io(args->direct_io ? true : false);
2871 if (!args->only_stress) {
2872 create_tables(
2873 &env,
2874 dbs,
2875 args->num_DBs,
2876 cmp,
2877 args
2878 );
2879 { int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); }
2880 { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
2881 }
2882 if (!args->only_create) {
2883 { int chk_r = open_tables(&env,
2884 dbs,
2885 args->num_DBs,
2886 cmp,
2887 args); CKERR(chk_r); }
2888 if (args->warm_cache) {
2889 do_warm_cache(env, dbs, args);
2890 }
2891 stress_table(env, dbs, args);
2892 { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
2893 }
2894 }
2895
2896 static void
UU()2897 UU() stress_test_main(struct cli_args *args) {
2898 // Begin the test with fixed size values equal to zero.
2899 // This is important for correctness testing.
2900 open_and_stress_tables(args, true, stress_cmp);
2901 }
2902
2903 static void
UU()2904 UU() perf_test_main(struct cli_args *args) {
2905 // Do not begin the test by creating a table of all zeroes.
2906 // We want to control the row size and its compressibility.
2907 open_and_stress_tables(args, false, stress_cmp);
2908 }
2909
2910 static void
UU()2911 UU() perf_test_main_with_cmp(struct cli_args *args, int (*cmp)(DB *, const DBT *, const DBT *)) {
2912 // Do not begin the test by creating a table of all zeroes.
2913 // We want to control the row size and its compressibility.
2914 open_and_stress_tables(args, false, cmp);
2915 }
2916