1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #pragma once
40 
41 // The Way Things Work:
42 //
43 // Threaded stress tests have the following properties:
44 // - One or more DBs
45 // - One or more threads performing some number of operations per txn.
46 // - Correctness tests use signed 4 byte keys and signed 4 byte values. They expect
47 // a table with all zeroes before running.
48 // - Performance tests should use 8 byte keys and 8+ byte values, where the values
// are some mixture of random incompressible garbage and zeroes, depending on how
50 // compressible we want the data. These tests want the table to be populated
51 // with keys in the range [0, table_size - 1] unless disperse_keys is true,
52 // then the keys are scrambled up in the integer key space.
53 
54 #include "toku_config.h"
55 #include "test.h"
56 
57 #include <stdio.h>
58 #include <math.h>
59 #include <locale.h>
60 
61 #include <db.h>
62 #include <memory.h>
63 #include <toku_race_tools.h>
64 
65 #include <portability/toku_atomic.h>
66 #include <portability/toku_pthread.h>
67 #include <portability/toku_random.h>
68 #include <portability/toku_time.h>
69 
70 #include <src/ydb-internal.h>
71 
72 #include <util/dbt.h>
73 
74 #include <util/rwlock.h>
75 #include <util/kibbutz.h>
76 
// Minimum key/value sizes: every row carries at least one 4-byte integer
// (cf. the "Must be at least 4" notes on cli_args::key_size/val_size).
static const size_t min_val_size = sizeof(int32_t);
static const size_t min_key_size = sizeof(int32_t);

// Global stop flag: worker() and long scans keep running while it is true.
// NOTE(review): volatile does not make cross-thread access race-free in C++;
// an atomic flag would be the well-defined choice.
volatile bool run_test; // should be volatile since we are communicating through this variable.

typedef struct arg *ARG;
// A stress operation: performs one unit of work inside txn. operation_extra is
// arg->operation_extra and stats_extra is the calling thread's counter array
// (see worker()). worker() treats a nonzero return as a failed operation.
typedef int (*operation_t)(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra);

// TODO: Properly define these in db.h so we don't have to copy them here
typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra);
typedef int (*test_generate_row_for_put_callback)(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_data);
typedef int (*test_generate_row_for_del_callback)(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data);
89 
// How a worker serializes its operation against other workers
// (see lock_worker_op / unlock_worker_op).
enum stress_lock_type {
    STRESS_LOCK_NONE = 0,   // no locking around the operation
    STRESS_LOCK_SHARED,     // hold the operation rwlock for reading
    STRESS_LOCK_EXCL        // hold the operation rwlock for writing
};
95 
// Environment tuning knobs, carried inside cli_args::env_args.
// NOTE(review): most fields are consumed where the DB_ENV is opened, outside
// this view; confirm their units there.
struct env_args {
    int fanout;
    int node_size;
    int basement_node_size;
    int rollback_node_size;
    int checkpointing_period;
    int cleaner_period;
    int cleaner_iterations;
    int sync_period;    // when > 0, get_commit_flags() commits with DB_TXN_NOSYNC
    uint64_t lk_max_memory;
    uint64_t cachetable_size;
    uint32_t num_bucket_mutexes;
    const char *envdir;
    test_update_callback_f update_function; // update callback function
    test_generate_row_for_put_callback generate_put_callback;
    test_generate_row_for_del_callback generate_del_callback;
};
113 
// Performance report format. perf_formatters[] lists one formatter per value,
// in this order.
enum perf_output_format {
    HUMAN = 0,          // aligned, human-readable text
    CSV,                // comma-separated values
    TSV,                // tab-separated values
    NUM_OUTPUT_FORMATS  // count of formats, not a format
};
120 
121 struct cli_args {
122     int num_elements; // number of elements per DB
123     int num_DBs; // number of DBs
124     int num_seconds; // how long test should run
125     int join_timeout; // how long to wait for threads to join before assuming deadlocks
126     bool only_create; // true if want to only create DBs but not run stress
127     bool only_stress; // true if DBs are already created and want to only run stress
128     int update_broadcast_period_ms; // specific to test_stress3
129     int num_ptquery_threads; // number of threads to run point queries
130     bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests.
131     bool do_recover; // true if we should run recover
132     int num_update_threads; // number of threads running updates
133     int num_put_threads; // number of threads running puts
134     int range_query_limit; // how many rows to look at for range queries
135     bool serial_insert;
136     bool interleave; // for insert benchmarks, whether to interleave separate threads' puts (or segregate them)
137     bool crash_on_operation_failure;
138     bool print_performance;
139     bool print_thread_performance;
140     bool print_iteration_performance;
141     enum perf_output_format perf_output_format;
142     enum toku_compression_method compression_method; // the compression method to use on newly created DBs
143     int performance_period;
144     uint32_t txn_size; // specifies number of updates/puts/whatevers per txn
145     uint32_t key_size; // number of bytes in vals. Must be at least 4
146     uint32_t val_size; // number of bytes in vals. Must be at least 4
147     double compressibility; // the row values should compress down to this fraction
148     struct env_args env_args; // specifies environment variables
149     bool single_txn;
150     bool warm_cache; // warm caches before running stress_table
151     bool blackhole; // all message injects are no-ops. helps measure txn/logging/locktree overhead.
152     bool nolocktree; // use this flag to avoid the locktree on insertions
153     bool unique_checks; // use uniqueness checking during insert. makes it slow.
154     uint32_t sync_period; // background log fsync period
155     bool nolog; // do not log. useful for testing in memory performance.
156     bool nocrashstatus; // do not print engine status upon crash
157     bool prelock_updates; // update threads perform serial updates on a prelocked range
158     bool disperse_keys; // spread the keys out during a load (by reversing the bits in the loop index) to make a wide tree we can spread out random inserts into
159     bool memcmp_keys; // pack keys big endian and use the builtin key comparison function in the fractal tree
160     bool direct_io; // use direct I/O
161     const char *print_engine_status; // print engine status rows matching a simple regex "a|b|c", matching strings where a or b or c is a subtring.
162 };
163 
// Per-thread description of a stress operation and of how worker() runs it.
// Initialize with arg_init(), then set at least `operation`.
struct arg {
    DB **dbp; // array of DBs
    DB_ENV* env; // environment used
    bool bounded_element_range; // true if elements in dictionary are bounded
                                // by num_elements, that is, all keys in each
                                // DB are in [0, num_elements)
                                // false otherwise
    int sleep_ms; // number of milliseconds to sleep between operations
    uint32_t txn_flags; // flags passed to txn_begin for the operation's txn (isolation level)
    operation_t operation; // function that is the operation to be run
    void* operation_extra; // extra parameter passed to operation
    enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
    struct random_data *random_data; // state for random_r; worker() points this at its own stack-local state
    int thread_idx;  // NOTE(review): presumably this worker's index, set by the thread launcher outside this view
    int num_threads; // NOTE(review): presumably the total worker count, set outside this view
    struct cli_args *cli; // command-line settings (set by arg_init)
    bool do_prepare; // worker() prepares each successful per-operation txn before committing it
    bool prelock_updates; // NOTE(review): not read in this view; presumably mirrors cli_args::prelock_updates
    bool track_thread_performance; // when true, worker() bumps counters[OPERATION] after every operation
    bool wrap_in_parent; // worker() nests each per-operation txn under one parent txn committed at the end
};
185 
arg_init(struct arg * arg,DB ** dbp,DB_ENV * env,struct cli_args * cli_args)186 static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
187     arg->cli = cli_args;
188     arg->dbp = dbp;
189     arg->env = env;
190     arg->bounded_element_range = true;
191     arg->sleep_ms = 0;
192     arg->lock_type = STRESS_LOCK_NONE;
193     arg->txn_flags = DB_TXN_SNAPSHOT;
194     arg->operation_extra = nullptr;
195     arg->do_prepare = false;
196     arg->prelock_updates = false;
197     arg->track_thread_performance = true;
198     arg->wrap_in_parent = false;
199 }
200 
// Indexes into each worker's counter array (NUM_OPERATION_TYPES uint64_t slots).
// worker() bumps OPERATION itself. increment_counter() refuses OPERATION and
// is presumably called by operations for the remaining kinds.
enum operation_type {
    OPERATION = 0,
    PUTS,
    PTQUERIES,
    NUM_OPERATION_TYPES
};

// Report labels indexed by enum operation_type; terminated by nullptr.
const char *operation_names[] = {
    "ops",
    "puts",
    "ptqueries",
    nullptr
};
214 
increment_counter(void * extra,enum operation_type type,uint64_t inc)215 static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
216     invariant(type != OPERATION);
217     int t = (int) type;
218     invariant(extra);
219     invariant(t >= 0 && t < (int) NUM_OPERATION_TYPES);
220     uint64_t *CAST_FROM_VOIDP(counters, extra);
221     counters[t] += inc;
222 }
223 
// One performance-report format (see perf_formatters[]).
//  - header: column headings (the HUMAN format prints none).
//  - iteration: one report of per-period deltas. It also copies `counters`
//    into `last_counters`, so the next call reports only new work.
//  - totals: whole-run counts and average rates over cli_args->num_seconds.
struct perf_formatter {
    void (*header)(const struct cli_args *cli_args, const int num_threads);
    void (*iteration)(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads);
    void (*totals)(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads);
};
229 
static inline int
seconds_in_this_iteration(const int current_time, const int performance_period)
{
    // Seconds elapsed inside the reporting period that contains current_time.
    // Periods are (0, p], (p, 2p], ...; a time exactly on a boundary closes the
    // earlier period, so the result lies in [1, p] for current_time >= 1.
    const int periods_before = (current_time + performance_period - 1) / performance_period - 1;
    const int period_start = periods_before * performance_period;
    return current_time - period_start;
}
236 
237 static void
human_print_perf_header(const struct cli_args * UU (cli_args),const int UU (num_threads))238 human_print_perf_header(const struct cli_args *UU(cli_args), const int UU(num_threads)) {}
239 
240 static void
human_print_perf_iteration(const struct cli_args * cli_args,const int current_time,uint64_t last_counters[][(int)NUM_OPERATION_TYPES],uint64_t * counters[],const int num_threads)241 human_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
242 {
243     const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period);
244     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
245         uint64_t period_total = 0;
246         printf("%4d %s", current_time, operation_names[op]);
247         for (int i = strlen(operation_names[op]); i < 12; ++i) {
248             printf(" ");
249         }
250         for (int t = 0; t < num_threads; ++t) {
251             const uint64_t last = last_counters[t][op];
252             const uint64_t current = counters[t][op];
253             const uint64_t this_iter = current - last;
254             if (cli_args->print_thread_performance) {
255                 const double persecond = (double) this_iter / secondsthisiter;
256                 printf("\t%'12" PRIu64 " (%'12.1lf/s)", this_iter, persecond);
257             }
258             period_total += this_iter;
259             last_counters[t][op] = current;
260         }
261         const double totalpersecond = (double) period_total / secondsthisiter;
262         printf("\tTotal %'12" PRIu64 " (%'12.1lf/s)\n", period_total, totalpersecond);
263     }
264     fflush(stdout);
265 }
266 
267 static void
human_print_perf_totals(const struct cli_args * cli_args,uint64_t * counters[],const int num_threads)268 human_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads)
269 {
270     if (cli_args->print_iteration_performance) {
271         printf("\n");
272     }
273     printf("Overall performance:\n");
274     uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
275     ZERO_ARRAY(overall_totals);
276     for (int t = 0; t < num_threads; ++t) {
277         if (cli_args->print_thread_performance) {
278             printf("Thread %4d: ", t + 1);
279         }
280         for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
281             const uint64_t current = counters[t][op];
282             if (cli_args->print_thread_performance) {
283                 const double persecond = (double) current / cli_args->num_seconds;
284                 printf("\t%s\t%'12" PRIu64 " (%'12.1lf/s)", operation_names[op], current, persecond);
285             }
286             overall_totals[op] += current;
287         }
288         if (cli_args->print_thread_performance) {
289             printf("\n");
290         }
291     }
292     printf("All threads: ");
293     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
294         const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds;
295         printf("\t%s\t%'12" PRIu64 " (%'12.1lf/s)", operation_names[op], overall_totals[op], totalpersecond);
296     }
297     printf("\n");
298 }
299 
300 static void
csv_print_perf_header(const struct cli_args * cli_args,const int num_threads)301 csv_print_perf_header(const struct cli_args *cli_args, const int num_threads)
302 {
303     printf("seconds");
304     if (cli_args->print_thread_performance) {
305         for (int t = 1; t <= num_threads; ++t) {
306             for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
307                 printf(",\"Thread %d %s\",\"Thread %d %s/s\"", t, operation_names[op], t, operation_names[op]);
308             }
309         }
310     }
311     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
312         printf(",\"Total %s\",\"Total %s/s\"", operation_names[op], operation_names[op]);
313     }
314     printf("\n");
315 }
316 
317 static void
csv_print_perf_iteration(const struct cli_args * cli_args,const int current_time,uint64_t last_counters[][(int)NUM_OPERATION_TYPES],uint64_t * counters[],const int num_threads)318 csv_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
319 {
320     const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period);
321     printf("%d", current_time);
322     uint64_t period_totals[(int) NUM_OPERATION_TYPES];
323     ZERO_ARRAY(period_totals);
324     for (int t = 0; t < num_threads; ++t) {
325         for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
326             const uint64_t last = last_counters[t][op];
327             const uint64_t current = counters[t][op];
328             const uint64_t this_iter = current - last;
329             if (cli_args->print_thread_performance) {
330                 const double persecond = (double) this_iter / secondsthisiter;
331                 printf(",%" PRIu64 ",%.1lf", this_iter, persecond);
332             }
333             period_totals[op] += this_iter;
334             last_counters[t][op] = current;
335         }
336     }
337     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
338         const double totalpersecond = (double) period_totals[op] / secondsthisiter;
339         printf(",%" PRIu64 ",%.1lf", period_totals[op], totalpersecond);
340     }
341     printf("\n");
342     fflush(stdout);
343 }
344 
345 static void
csv_print_perf_totals(const struct cli_args * cli_args,uint64_t * counters[],const int num_threads)346 csv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) {
347     printf("overall");
348     uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
349     ZERO_ARRAY(overall_totals);
350     for (int t = 0; t < num_threads; ++t) {
351         for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
352             const uint64_t current = counters[t][op];
353             if (cli_args->print_thread_performance) {
354                 const double persecond = (double) current / cli_args->num_seconds;
355                 printf(",%" PRIu64 ",%.1lf", current, persecond);
356             }
357             overall_totals[op] += current;
358         }
359     }
360     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
361         const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds;
362         printf(",%" PRIu64 ",%.1lf", overall_totals[op], totalpersecond);
363     }
364     printf("\n");
365 }
366 
367 static void
tsv_print_perf_header(const struct cli_args * cli_args,const int num_threads)368 tsv_print_perf_header(const struct cli_args *cli_args, const int num_threads)
369 {
370     printf("\"seconds\"");
371     if (cli_args->print_thread_performance) {
372         for (int t = 1; t <= num_threads; ++t) {
373             for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
374                 printf("\t\"Thread %d %s\"\t\"Thread %d %s/s\"", t, operation_names[op], t, operation_names[op]);
375             }
376         }
377     }
378     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
379         printf("\t\"Total %s\"\t\"Total %s/s\"", operation_names[op], operation_names[op]);
380     }
381     printf("\n");
382 }
383 
384 static void
tsv_print_perf_iteration(const struct cli_args * cli_args,const int current_time,uint64_t last_counters[][(int)NUM_OPERATION_TYPES],uint64_t * counters[],const int num_threads)385 tsv_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
386 {
387     const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period);
388     printf("%d", current_time);
389     uint64_t period_totals[(int) NUM_OPERATION_TYPES];
390     ZERO_ARRAY(period_totals);
391     for (int t = 0; t < num_threads; ++t) {
392         for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
393             const uint64_t last = last_counters[t][op];
394             const uint64_t current = counters[t][op];
395             const uint64_t this_iter = current - last;
396             if (cli_args->print_thread_performance) {
397                 const double persecond = (double) this_iter / secondsthisiter;
398                 printf("\t%" PRIu64 "\t%.1lf", this_iter, persecond);
399             }
400             period_totals[op] += this_iter;
401             last_counters[t][op] = current;
402         }
403     }
404     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
405         const double totalpersecond = (double) period_totals[op] / secondsthisiter;
406         printf("\t%" PRIu64 "\t%.1lf", period_totals[op], totalpersecond);
407     }
408     printf("\n");
409     fflush(stdout);
410 }
411 
412 static void
tsv_print_perf_totals(const struct cli_args * cli_args,uint64_t * counters[],const int num_threads)413 tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) {
414     printf("\"overall\"");
415     uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
416     ZERO_ARRAY(overall_totals);
417     for (int t = 0; t < num_threads; ++t) {
418         for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
419             const uint64_t current = counters[t][op];
420             if (cli_args->print_thread_performance) {
421                 const double persecond = (double) current / cli_args->num_seconds;
422                 printf("\t%" PRIu64 "\t%.1lf", current, persecond);
423             }
424             overall_totals[op] += current;
425         }
426     }
427     for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
428         const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds;
429         printf("\t%" PRIu64 "\t%.1lf", overall_totals[op], totalpersecond);
430     }
431     printf("\n");
432 }
433 
434 const struct perf_formatter perf_formatters[] = {
435     { /* HUMAN */
436         .header = human_print_perf_header,
437         .iteration = human_print_perf_iteration,
438         .totals = human_print_perf_totals
439     },
440     { /* CSV */
441         .header = csv_print_perf_header,
442         .iteration = csv_print_perf_iteration,
443         .totals = csv_print_perf_totals
444     },
445     { /* TSV */
446         .header = tsv_print_perf_header,
447         .iteration = tsv_print_perf_iteration,
448         .totals = tsv_print_perf_totals
449     },
450 };
451 
get_env_open_flags(struct cli_args * args)452 static int get_env_open_flags(struct cli_args *args) {
453     int flags = DB_INIT_LOCK|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE;
454     flags |= args->nolog ? 0 : DB_INIT_LOG;
455     return flags;
456 }
457 
get_put_flags(struct cli_args * args)458 static int get_put_flags(struct cli_args *args) {
459     int flags = 0;
460     flags |= args->nolocktree ? DB_PRELOCKED_WRITE : 0;
461     flags |= args->unique_checks ? DB_NOOVERWRITE : 0;
462     return flags;
463 }
464 
get_commit_flags(struct cli_args * args)465 static int get_commit_flags(struct cli_args *args) {
466     int flags = 0;
467     flags |= args->env_args.sync_period > 0 ? DB_TXN_NOSYNC : 0;
468     return flags;
469 }
470 
// Per-thread state handed to worker() as its void * argument.
struct worker_extra {
    struct arg *thread_arg;              // the operation this thread runs and its options
    toku_mutex_t *operation_lock_mutex;  // held around every operation_lock call
    struct st_rwlock *operation_lock;    // taken shared/exclusive per thread_arg->lock_type
    uint64_t *counters;                  // this thread's counters, indexed by enum operation_type
    int64_t pad[4];  // pad to 64 bytes
                     // NOTE(review): 64 only with 8-byte pointers; presumably meant to keep
                     // neighbouring threads' entries on separate cache lines — confirm.
};
478 
lock_worker_op(struct worker_extra * we)479 static void lock_worker_op(struct worker_extra* we) {
480     ARG arg = we->thread_arg;
481     if (arg->lock_type != STRESS_LOCK_NONE) {
482         toku_mutex_lock(we->operation_lock_mutex);
483         if (arg->lock_type == STRESS_LOCK_SHARED) {
484             rwlock_read_lock(we->operation_lock, we->operation_lock_mutex);
485         } else if (arg->lock_type == STRESS_LOCK_EXCL) {
486             rwlock_write_lock(we->operation_lock, we->operation_lock_mutex);
487         } else {
488             abort();
489         }
490         toku_mutex_unlock(we->operation_lock_mutex);
491     }
492 }
493 
unlock_worker_op(struct worker_extra * we)494 static void unlock_worker_op(struct worker_extra* we) {
495     ARG arg = we->thread_arg;
496     if (arg->lock_type != STRESS_LOCK_NONE) {
497         toku_mutex_lock(we->operation_lock_mutex);
498         if (arg->lock_type == STRESS_LOCK_SHARED) {
499             rwlock_read_unlock(we->operation_lock);
500         } else if (arg->lock_type == STRESS_LOCK_EXCL) {
501             rwlock_write_unlock(we->operation_lock);
502         } else {
503             abort();
504         }
505         toku_mutex_unlock(we->operation_lock_mutex);
506     }
507 }
508 
worker(void * arg_v)509 static void *worker(void *arg_v) {
510     int r;
511     struct worker_extra* CAST_FROM_VOIDP(we, arg_v);
512     ARG arg = we->thread_arg;
513     struct random_data random_data;
514     ZERO_STRUCT(random_data);
515     char *XCALLOC_N(8, random_buf);
516     r = myinitstate_r(random(), random_buf, 8, &random_data);
517     assert_zero(r);
518     arg->random_data = &random_data;
519     DB_ENV *env = arg->env;
520     DB_TXN *txn = nullptr;
521     DB_TXN *ptxn = nullptr;
522     if (verbose) {
523         toku_pthread_t self = toku_pthread_self();
524         uintptr_t intself = (uintptr_t) self;
525         printf("%lu starting %p\n", (unsigned long) intself, arg->operation);
526     }
527     if (arg->cli->single_txn) {
528         r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
529     } else if (arg->wrap_in_parent) {
530         r = env->txn_begin(env, 0, &ptxn, arg->txn_flags); CKERR(r);
531     }
532     while (run_test) {
533         lock_worker_op(we);
534         if (!arg->cli->single_txn) {
535             r = env->txn_begin(env, ptxn, &txn, arg->txn_flags); CKERR(r);
536         }
537         r = arg->operation(txn, arg, arg->operation_extra, we->counters);
538         if (r==0 && !arg->cli->single_txn && arg->do_prepare) {
539             uint8_t gid[DB_GID_SIZE];
540             memset(gid, 0, DB_GID_SIZE);
541             uint64_t gid_val = txn->id64(txn);
542             uint64_t *gid_count_p = cast_to_typeof(gid_count_p) gid;  // make gcc --happy about -Wstrict-aliasing
543             *gid_count_p = gid_val;
544             int rr = txn->prepare(txn, gid, 0);
545             assert_zero(rr);
546         }
547         if (r == 0) {
548             if (!arg->cli->single_txn) {
549                 int flags = get_commit_flags(arg->cli);
550                 int chk_r = txn->commit(txn, flags); CKERR(chk_r);
551             }
552         } else {
553             if (arg->cli->crash_on_operation_failure) {
554                 CKERR(r);
555             } else {
556                 if (!arg->cli->single_txn) {
557                     { int chk_r = txn->abort(txn); CKERR(chk_r); }
558                 }
559             }
560         }
561         unlock_worker_op(we);
562         if (arg->track_thread_performance) {
563             we->counters[OPERATION]++;
564         }
565         if (arg->sleep_ms) {
566             usleep(arg->sleep_ms * 1000);
567         }
568     }
569     if (arg->cli->single_txn) {
570         int flags = get_commit_flags(arg->cli);
571         int chk_r = txn->commit(txn, flags); CKERR(chk_r);
572     } else if (arg->wrap_in_parent) {
573         int flags = get_commit_flags(arg->cli);
574         int chk_r = ptxn->commit(ptxn, flags); CKERR(chk_r);
575     }
576     if (verbose) {
577         toku_pthread_t self = toku_pthread_self();
578         uintptr_t intself = (uintptr_t) self;
579         printf("%lu returning\n", (unsigned long) intself);
580     }
581     toku_free(random_buf);
582     return arg;
583 }
584 
// Accumulator passed to scan_cb by scan_op_and_maybe_check_sum.
struct scan_cb_extra {
    bool fast;         // when true, scan_cb returns TOKUDB_CURSOR_CONTINUE instead of 0
    int curr_sum;      // running sum of the leading int of every value seen
    int num_elements;  // number of rows seen
};
590 
// Options for scan_op_and_maybe_check_sum.
struct scan_op_extra {
    bool fast;      // copied into scan_cb_extra::fast
    bool fwd;       // true: walk with c_getf_next; false: with c_getf_prev
    bool prefetch;  // call c_set_bounds over (-inf, +inf) before scanning
};
596 
597 static int
scan_cb(const DBT * key,const DBT * val,void * arg_v)598 scan_cb(const DBT *key, const DBT *val, void *arg_v) {
599     struct scan_cb_extra *CAST_FROM_VOIDP(cb_extra, arg_v);
600     assert(key);
601     assert(val);
602     assert(cb_extra);
603     assert(val->size >= sizeof(int));
604     cb_extra->curr_sum += *(int *) val->data;
605     cb_extra->num_elements++;
606     return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
607 }
608 
scan_op_and_maybe_check_sum(DB * db,DB_TXN * txn,struct scan_op_extra * sce,bool check_sum)609 static int scan_op_and_maybe_check_sum(
610     DB* db,
611     DB_TXN *txn,
612     struct scan_op_extra* sce,
613     bool check_sum
614     )
615 {
616     int r = 0;
617     DBC* cursor = nullptr;
618 
619     struct scan_cb_extra e = {
620         e.fast = sce->fast,
621         e.curr_sum = 0,
622         e.num_elements = 0,
623     };
624 
625     { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
626     if (sce->prefetch) {
627         r = cursor->c_set_bounds(cursor, db->dbt_neg_infty(), db->dbt_pos_infty(), true, 0);
628         assert(r == 0);
629     }
630     while (r != DB_NOTFOUND) {
631         if (sce->fwd) {
632             r = cursor->c_getf_next(cursor, 0, scan_cb, &e);
633         }
634         else {
635             r = cursor->c_getf_prev(cursor, 0, scan_cb, &e);
636         }
637         assert(r==0 || r==DB_NOTFOUND);
638         if (!run_test) {
639             // terminate early because this op takes a while under drd.
640             // don't check the sum if we do this.
641             check_sum = false;
642             break;
643         }
644     }
645     { int chk_r = cursor->c_close(cursor); CKERR(chk_r); }
646     if (r == DB_NOTFOUND) {
647         r = 0;
648     }
649     if (check_sum && e.curr_sum) {
650         printf("e.curr_sum: %" PRId32 " e.num_elements: %" PRId32 " \n", e.curr_sum, e.num_elements);
651         abort();
652     }
653     return r;
654 }
655 
generate_row_for_put(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)656 static int generate_row_for_put(
657     DB *dest_db,
658     DB *src_db,
659     DBT_ARRAY *dest_keys,
660     DBT_ARRAY *dest_vals,
661     const DBT *src_key,
662     const DBT *src_val
663     )
664 {
665     invariant(!src_db || src_db != dest_db);
666     invariant(src_key->size >= sizeof(unsigned int));
667 
668     // Consistent pseudo random source.  Use checksum of key and val, and which db as seed
669 
670 /*
671     struct x1764 l;
672     x1764_init(&l);
673     x1764_add(&l, src_key->data, src_key->size);
674     x1764_add(&l, src_val->data, src_val->size);
675     x1764_add(&l, &dest_db, sizeof(dest_db)); //make it depend on which db
676     unsigned int seed = x1764_finish(&l);
677     */
678     unsigned int seed = *(unsigned int*)src_key->data;
679 
680     struct random_data random_data;
681     ZERO_STRUCT(random_data);
682     char random_buf[8];
683     {
684         int r = myinitstate_r(seed, random_buf, 8, &random_data);
685         assert_zero(r);
686     }
687 
688     uint8_t num_outputs = 0;
689     while (myrandom_r(&random_data) % 2) {
690         num_outputs++;
691         if (num_outputs > 8) {
692             break;
693         }
694     }
695 
696     toku_dbt_array_resize(dest_keys, num_outputs);
697     toku_dbt_array_resize(dest_vals, num_outputs);
698     int sum = 0;
699     for (uint8_t i = 0; i < num_outputs; i++) {
700         DBT *dest_key = &dest_keys->dbts[i];
701         DBT *dest_val = &dest_vals->dbts[i];
702 
703         invariant(dest_key->flags == DB_DBT_REALLOC);
704         invariant(dest_val->flags == DB_DBT_REALLOC);
705 
706         if (dest_key->ulen < src_key->size) {
707             dest_key->data = toku_xrealloc(dest_key->data, src_key->size);
708             dest_key->ulen = src_key->size;
709         }
710         dest_key->size = src_key->size;
711         if (dest_val->ulen < src_val->size) {
712             dest_val->data = toku_xrealloc(dest_val->data, src_val->size);
713             dest_val->ulen = src_val->size;
714         }
715         dest_val->size = src_val->size;
716         memcpy(dest_key->data, src_key->data, src_key->size);
717         ((uint8_t*)dest_key->data)[src_key->size-1] = i;  //Have different keys for each entry.
718 
719         memcpy(dest_val->data, src_val->data, src_val->size);
720         invariant(dest_val->size >= sizeof(int));
721         int number;
722         if (i == num_outputs - 1) {
723             // Make sum add to 0
724             number = -sum;
725         } else {
726             // Keep track of sum
727             number = myrandom_r(&random_data);
728         }
729         sum += number;
730         *(int *) dest_val->data = number;
731     }
732     invariant(sum == 0);
733     return 0;
734 }
735 
736 // How Keys Work:
737 //
738 // Keys are either
739 // - 4 byte little endian non-negative integers
740 // - 8 byte little endian non-negative integers
741 // - 8 byte little endian non-negative integers, padded with zeroes.
742 //
743 // The comparison function treats the key as a 4 byte
744 // int if the key size is exactly 4, and it treats
745 // the key as an 8 byte int if the key size is 8 or more.
746 
random_bounded_key(struct random_data * random_data,ARG arg)747 static int64_t random_bounded_key(struct random_data *random_data, ARG arg) {
748 // Effect: Returns a random key in the table, possible bounded by the number of elements.
749     int64_t key = myrandom_r(random_data);
750     if (arg->bounded_element_range && arg->cli->num_elements > 0) {
751         key = key % arg->cli->num_elements;
752     }
753     return key;
754 }
755 
static int64_t breverse(int64_t v)
// Effect: return the bits of v reversed (bit 0 becomes bit 63, bit 1 becomes
//         bit 62, ...), with bit 63 of the result cleared so it is non-negative.
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
//        The reversal works on an unsigned copy of v: right-shifting a negative
//        int64_t replicates the sign bit, so looping on v itself would never
//        reach zero (an infinite loop) for negative input.
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
    uint64_t bits = (uint64_t) v;
    uint64_t r = bits;                      // r will be reversed bits of v; first get LSB of v
    int s = sizeof(bits) * CHAR_BIT - 1;    // extra shift needed at end

    for (bits >>= 1; bits; bits >>= 1) {
        r <<= 1;
        r |= bits & 1;
        s--;
    }
    r <<= s;  // shift when v's highest bits are zero
    return (int64_t) (r & ~(1ULL << 63));
}
773 
774 static void
fill_key_buf(int64_t key,uint8_t * data,struct cli_args * args)775 fill_key_buf(int64_t key, uint8_t *data, struct cli_args *args) {
776 // Effect: Fill data with a specific little-endian integer, 4 or 8 bytes long
777 //         depending on args->key_size, possibly padded with zeroes.
778 // Requires: *data is at least sizeof(uint64_t)
779     if (args->disperse_keys) {
780         key = breverse(key);
781     }
782     invariant(key >= 0);
783     if (args->key_size == sizeof(int)) {
784         const int key32 = args->memcmp_keys ? toku_htonl(key) : key;
785         memcpy(data, &key32, sizeof(key32));
786     } else {
787         invariant(args->key_size >= sizeof(key));
788         const int64_t key64 = args->memcmp_keys ? toku_htonl(key) : key;
789         memcpy(data, &key64, sizeof(key64));
790         memset(data + sizeof(key64), 0, args->key_size - sizeof(key64));
791     }
792 }
793 
794 static void
fill_key_buf_random(struct random_data * random_data,uint8_t * data,ARG arg)795 fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) {
796 // Effect: Fill data with a random, little-endian, 4 or 8 byte integer, possibly
797 // bounded by the size of the table, and padded with zeroes until key_size.
798 // Requires, Notes: see fill_key_buf()
799     int64_t key = random_bounded_key(random_data, arg);
800     fill_key_buf(key, data, arg->cli);
801 }
802 
803 // How Vals Work:
804 //
805 // Values are either
806 // - 4 byte little endian integers
807 // - 4 byte little endian integers, padded with zeroes
// - X random bytes followed by Y zero bytes, where X and Y
//   are derived from the desired compressibility.
810 //
811 // Correctness tests use integer values, perf tests use random bytes.
812 // Both support padding out values > 4 bytes with zeroes.
813 
814 static void
fill_val_buf(int64_t val,uint8_t * data,uint32_t val_size)815 fill_val_buf(int64_t val, uint8_t *data, uint32_t val_size) {
816 // Effect, Requires, Notes: see fill_key_buf().
817     if (val_size == sizeof(int)) {
818         const int val32 = val;
819         memcpy(data, &val32, sizeof(val32));
820     } else {
821         invariant(val_size >= sizeof(val));
822         memcpy(data, &val, sizeof(val));
823         memset(data + sizeof(val), 0, val_size - sizeof(val));
824     }
825 }
826 
827 // Fill array with compressibility*size 0s.
828 // 0.0<=compressibility<=1.0
829 // Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away).
830 // The rest will be random data.
831 static void
fill_val_buf_random(struct random_data * random_data,uint8_t * data,struct cli_args * args)832 fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_args *args) {
833     invariant(args->val_size >= min_val_size);
834     //Requires: The array was zeroed since the last time 'size' was changed.
835     //Requires: compressibility is in range [0,1] indicating fraction that should be zeros.
836 
837     // Fill in the random bytes
838     uint32_t num_random_bytes = (1 - args->compressibility) * args->val_size;
839     if (num_random_bytes > 0) {
840         uint32_t filled;
841         for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) {
842             *((uint64_t *) &data[filled]) = myrandom_r(random_data);
843         }
844         if (filled != num_random_bytes) {
845             uint64_t last8 = myrandom_r(random_data);
846             memcpy(&data[filled], &last8, num_random_bytes - filled);
847         }
848     }
849 
850     // Fill in the zero bytes
851     if (num_random_bytes < args->val_size) {
852         memset(data + num_random_bytes, 0, args->val_size - num_random_bytes);
853     }
854 }
855 
random_put_in_db(DB * db,DB_TXN * txn,ARG arg,bool ignore_errors,void * stats_extra)856 static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) {
857     int r = 0;
858     uint8_t keybuf[arg->cli->key_size];
859     uint8_t valbuf[arg->cli->val_size];
860 
861     DBT key, val;
862     dbt_init(&key, keybuf, sizeof keybuf);
863     dbt_init(&val, valbuf, sizeof valbuf);
864     const int put_flags = get_put_flags(arg->cli);
865 
866     uint64_t puts_to_increment = 0;
867     for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
868         fill_key_buf_random(arg->random_data, keybuf, arg);
869         fill_val_buf_random(arg->random_data, valbuf, arg->cli);
870         r = db->put(db, txn, &key, &val, put_flags);
871         if (!ignore_errors && r != 0) {
872             goto cleanup;
873         }
874         puts_to_increment++;
875         if (puts_to_increment == 100) {
876             increment_counter(stats_extra, PUTS, puts_to_increment);
877             puts_to_increment = 0;
878         }
879     }
880 
881 cleanup:
882     increment_counter(stats_extra, PUTS, puts_to_increment);
883     return r;
884 }
885 
UU()886 static int UU() random_put_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
887     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
888     DB* db = arg->dbp[db_index];
889     return random_put_in_db(db, txn, arg, false, stats_extra);
890 }
891 
UU()892 static int UU() random_put_op_singledb(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
893     int db_index = arg->thread_idx%arg->cli->num_DBs;
894     DB* db = arg->dbp[db_index];
895     return random_put_in_db(db, txn, arg, false, stats_extra);
896 }
897 
898 struct serial_put_extra {
899     uint64_t current;
900 };
901 
UU()902 static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
903     struct serial_put_extra *CAST_FROM_VOIDP(extra, operation_extra);
904 
905     int db_index = arg->thread_idx % arg->cli->num_DBs;
906     DB* db = arg->dbp[db_index];
907 
908     int r = 0;
909     uint8_t keybuf[arg->cli->key_size];
910     uint8_t valbuf[arg->cli->val_size];
911 
912     DBT key, val;
913     dbt_init(&key, keybuf, sizeof keybuf);
914     dbt_init(&val, valbuf, sizeof valbuf);
915     const int put_flags = get_put_flags(arg->cli);
916 
917     uint64_t puts_to_increment = 0;
918     for (uint64_t i = 0; i < arg->cli->txn_size; ++i) {
919         // TODO: Change perf_insert to pass a single serial_put_op_extra
920         // to each insertion thread so they share the current key,
921         // and use a sync fetch an add here. This way you can measure
922         // the true performance of multiple threads appending unique
923         // keys to the end of a tree.
924         uint64_t k = extra->current++;
925         fill_key_buf(k, keybuf, arg->cli);
926         fill_val_buf_random(arg->random_data, valbuf, arg->cli);
927         r = db->put(db, txn, &key, &val, put_flags);
928         if (r != 0) {
929             goto cleanup;
930         }
931         puts_to_increment++;
932         if (puts_to_increment == 100) {
933             increment_counter(stats_extra, PUTS, puts_to_increment);
934             puts_to_increment = 0;
935         }
936     }
937 
938 cleanup:
939     increment_counter(stats_extra, PUTS, puts_to_increment);
940     return r;
941 }
942 
943 struct loader_op_extra {
944     struct scan_op_extra soe;
945     int num_dbs;
946 };
947 
UU()948 static int UU() loader_op(DB_TXN* txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
949     struct loader_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
950     invariant(extra->num_dbs >= 1);
951     DB_ENV* env = arg->env;
952     int r;
953     for (int num = 0; num < 2; num++) {
954         DB *dbs_load[extra->num_dbs];
955         uint32_t db_flags[extra->num_dbs];
956         uint32_t dbt_flags[extra->num_dbs];
957         for (int i = 0; i < extra->num_dbs; ++i) {
958             db_flags[i] = 0;
959             dbt_flags[i] = 0;
960             r = db_create(&dbs_load[i], env, 0);
961             assert(r == 0);
962             char fname[100];
963             sprintf(fname, "loader-db-%d", i);
964             // TODO: Need to call before_db_open_hook() and after_db_open_hook()
965             r = dbs_load[i]->open(dbs_load[i], txn, fname, nullptr, DB_BTREE, DB_CREATE, 0666);
966             assert(r == 0);
967         }
968         DB_LOADER *loader;
969         uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
970         r = env->create_loader(env, txn, &loader, dbs_load[0], extra->num_dbs, dbs_load, db_flags, dbt_flags, loader_flags);
971         CKERR(r);
972 
973         DBT key, val;
974         uint8_t keybuf[arg->cli->key_size];
975         uint8_t valbuf[arg->cli->val_size];
976         dbt_init(&key, keybuf, sizeof keybuf);
977         dbt_init(&val, valbuf, sizeof valbuf);
978 
979         int sum = 0;
980         const int num_elements = 1000;
981         for (int i = 0; i < num_elements; i++) {
982             fill_key_buf(i, keybuf, arg->cli);
983             fill_val_buf_random(arg->random_data, valbuf, arg->cli);
984 
985             assert(val.size >= sizeof(int));
986             if (i == num_elements - 1) {
987                 // Make sum add to 0
988                 *(int *) val.data = -sum;
989             } else {
990                 // Keep track of sum
991                 sum += *(int *) val.data;
992             }
993             r = loader->put(loader, &key, &val); CKERR(r);
994         }
995 
996         r = loader->close(loader); CKERR(r);
997 
998         for (int i = 0; i < extra->num_dbs; ++i) {
999             r = scan_op_and_maybe_check_sum(dbs_load[i], txn, &extra->soe, true); CKERR(r);
1000             r = dbs_load[i]->close(dbs_load[i], 0); CKERR(r);
1001             char fname[100];
1002             sprintf(fname, "loader-db-%d", i);
1003             r = env->dbremove(env, txn, fname, nullptr, 0); CKERR(r);
1004         }
1005     }
1006     return 0;
1007 }
1008 
UU()1009 static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1010     // Pick a random DB, do a keyrange operation.
1011     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1012     DB* db = arg->dbp[db_index];
1013 
1014     int r = 0;
1015     uint8_t keybuf[arg->cli->key_size];
1016 
1017     DBT key;
1018     dbt_init(&key, keybuf, sizeof keybuf);
1019     fill_key_buf_random(arg->random_data, keybuf, arg);
1020 
1021     uint64_t less,equal,greater;
1022     int is_exact;
1023     r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
1024     assert(r == 0);
1025     return r;
1026 }
1027 
UU()1028 static int UU() frag_op(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1029     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1030     DB *db = arg->dbp[db_index];
1031 
1032     TOKU_DB_FRAGMENTATION_S frag;
1033     int r = db->get_fragmentation(db, &frag);
1034     invariant_zero(r);
1035     return r;
1036 }
1037 
UU()1038 static void UU() get_key_after_bytes_callback(const DBT *UU(end_key), uint64_t UU(skipped), void *UU(extra)) {
1039     // nothing
1040 }
1041 
UU()1042 static int UU() get_key_after_bytes_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1043     // Pick a random DB, do a get_key_after_bytes operation.
1044     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1045     DB* db = arg->dbp[db_index];
1046 
1047     int r = 0;
1048     uint8_t keybuf[arg->cli->key_size];
1049 
1050     DBT start_key, end_key;
1051     dbt_init(&start_key, keybuf, sizeof keybuf);
1052     fill_key_buf_random(arg->random_data, keybuf, arg);
1053     uint64_t skip_len = myrandom_r(arg->random_data) % (2<<30);
1054     dbt_init(&end_key, nullptr, 0);
1055 
1056     r = db->get_key_after_bytes(db, txn, &start_key, skip_len, get_key_after_bytes_callback, nullptr, 0);
1057     return r;
1058 }
1059 
verify_progress_callback(void * UU (extra),float UU (progress))1060 static int verify_progress_callback(void *UU(extra), float UU(progress)) {
1061     if (!run_test) {
1062         return -1;
1063     }
1064     return 0;
1065 }
1066 
UU()1067 static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
1068     int r = 0;
1069     for (int i = 0; i < arg->cli->num_DBs && run_test; i++) {
1070         DB* db = arg->dbp[i];
1071         r = db->verify_with_progress(db, verify_progress_callback, nullptr, 1, 0);
1072         if (!run_test) {
1073             r = 0;
1074         }
1075         CKERR(r);
1076     }
1077     return r;
1078 }
1079 
UU()1080 static int UU() scan_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
1081     struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
1082     for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1083         int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, true);
1084         assert_zero(r);
1085     }
1086     return 0;
1087 }
1088 
UU()1089 static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
1090     struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
1091     for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1092         int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, false);
1093         assert_zero(r);
1094     }
1095     return 0;
1096 }
1097 
1098 struct scan_op_worker_info {
1099     DB *db;
1100     DB_TXN *txn;
1101     void *extra;
1102 };
1103 
scan_op_worker(void * arg)1104 static void scan_op_worker(void *arg) {
1105     struct scan_op_worker_info *CAST_FROM_VOIDP(info, arg);
1106     struct scan_op_extra *CAST_FROM_VOIDP(extra, info->extra);
1107     int r = scan_op_and_maybe_check_sum(
1108             info->db,
1109             info->txn,
1110             extra,
1111             false
1112             );
1113     assert_zero(r);
1114     toku_free(info);
1115 }
1116 
UU()1117 static int UU() scan_op_no_check_parallel(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
1118     const int num_cores = toku_os_get_number_processors();
1119     const int num_workers = arg->cli->num_DBs < num_cores ? arg->cli->num_DBs : num_cores;
1120     KIBBUTZ kibbutz = NULL;
1121     int r = toku_kibbutz_create(num_workers, &kibbutz);
1122     assert(r == 0);
1123     for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1124         struct scan_op_worker_info *XCALLOC(info);
1125         info->db = arg->dbp[i];
1126         info->txn = txn;
1127         info->extra = operation_extra;
1128         toku_kibbutz_enq(kibbutz, scan_op_worker, info);
1129     }
1130     toku_kibbutz_destroy(kibbutz);
1131     return 0;
1132 }
1133 
dbt_do_nothing(DBT const * UU (key),DBT const * UU (row),void * UU (context))1134 static int dbt_do_nothing (DBT const *UU(key), DBT  const *UU(row), void *UU(context)) {
1135   return 0;
1136 }
1137 
UU()1138 static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
1139     int r = 0;
1140     uint8_t keybuf[arg->cli->key_size];
1141     DBT key, val;
1142     dbt_init(&key, keybuf, sizeof keybuf);
1143     dbt_init(&val, nullptr, 0);
1144     fill_key_buf_random(arg->random_data, keybuf, arg);
1145 
1146     r = db->getf_set(
1147         db,
1148         txn,
1149         0,
1150         &key,
1151         dbt_do_nothing,
1152         nullptr
1153         );
1154     if (check) {
1155         assert(r != DB_NOTFOUND);
1156     }
1157     r = 0;
1158     return r;
1159 }
1160 
UU()1161 static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *stats_extra) {
1162     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1163     DB* db = arg->dbp[db_index];
1164     int r = ptquery_and_maybe_check_op(db, txn, arg, true);
1165     if (!r) {
1166         increment_counter(stats_extra, PTQUERIES, 1);
1167     }
1168     return r;
1169 }
1170 
UU()1171 static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *stats_extra) {
1172     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1173     DB* db = arg->dbp[db_index];
1174     int r = ptquery_and_maybe_check_op(db, txn, arg, false);
1175     if (!r) {
1176         increment_counter(stats_extra, PTQUERIES, 1);
1177     }
1178     return r;
1179 }
1180 
1181 typedef void (*rangequery_row_cb)(DB *db, const DBT *key, const DBT *val, void *extra);
1182 struct rangequery_cb_extra {
1183     int rows_read;
1184 
1185     // Call cb(db, key, value, cb_extra) on up to $limit rows.
1186     const int limit;
1187     const rangequery_row_cb cb;
1188     DB *const db;
1189     void *const cb_extra;
1190 };
1191 
rangequery_cb(const DBT * key,const DBT * value,void * extra)1192 static int rangequery_cb(const DBT *key, const DBT *value, void *extra) {
1193     struct rangequery_cb_extra *CAST_FROM_VOIDP(info, extra);
1194     if (info->cb != nullptr) {
1195         info->cb(info->db, key, value, info->cb_extra);
1196     }
1197     if (++info->rows_read >= info->limit) {
1198         return 0;
1199     } else {
1200         return TOKUDB_CURSOR_CONTINUE;
1201     }
1202 }
1203 
rangequery_db(DB * db,DB_TXN * txn,ARG arg,rangequery_row_cb cb,void * cb_extra)1204 static void rangequery_db(DB *db, DB_TXN *txn, ARG arg, rangequery_row_cb cb, void *cb_extra) {
1205     const int limit = arg->cli->range_query_limit;
1206 
1207     int r;
1208     DBC *cursor;
1209     DBT start_key, end_key;
1210     uint8_t start_keybuf[arg->cli->key_size];
1211     uint8_t end_keybuf[arg->cli->key_size];
1212     dbt_init(&start_key, start_keybuf, sizeof start_keybuf);
1213     dbt_init(&end_key, end_keybuf, sizeof end_keybuf);
1214     const uint64_t start_k = random_bounded_key(arg->random_data, arg);
1215     fill_key_buf(start_k, start_keybuf, arg->cli);
1216     fill_key_buf(start_k + limit, end_keybuf, arg->cli);
1217 
1218     r = db->cursor(db, txn, &cursor, 0); CKERR(r);
1219     r = cursor->c_set_bounds(cursor, &start_key, &end_key, true, 0); CKERR(r);
1220 
1221     struct rangequery_cb_extra extra = {
1222         .rows_read = 0,
1223         .limit = limit,
1224         .cb = cb,
1225         .db = db,
1226         .cb_extra = cb_extra,
1227     };
1228     r = cursor->c_getf_set(cursor, 0, &start_key, rangequery_cb, &extra);
1229     while (r == 0 && extra.rows_read < extra.limit && run_test) {
1230         r = cursor->c_getf_next(cursor, 0, rangequery_cb, &extra);
1231     }
1232 
1233     r = cursor->c_close(cursor); CKERR(r);
1234 }
1235 
UU()1236 static int UU() rangequery_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
1237     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1238     DB *db = arg->dbp[db_index];
1239     rangequery_db(db, txn, arg, nullptr, nullptr);
1240     increment_counter(stats_extra, PTQUERIES, 1);
1241     return 0;
1242 }
1243 
UU()1244 static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1245     int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
1246     DB* db = arg->dbp[db_index];
1247     DBC* cursor = nullptr;
1248     int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
1249     r = cursor->c_close(cursor); assert(r == 0);
1250     return 0;
1251 }
1252 
// Random deltas/values chosen by the update operations are reduced modulo this
// bound (the balancing update that ends a txn may exceed it).
#define MAX_RANDOM_VAL 10000

// How update_op_callback() derives a row's new value from its old one.
enum update_type {
    UPDATE_ADD_DIFF = 0,     // new = old + u.d.diff
    UPDATE_NEGATE = 1,       // new = -old
    UPDATE_WITH_HISTORY = 2  // assert(old == u.h.expected); new = u.h.new_val
};

// Update message payload. It travels as the raw bytes of the "extra" DBT
// (update_op_callback asserts its size equals sizeof this struct), so the
// field order and types here are part of that byte format.
struct update_op_extra {
    enum update_type type;
    int pad_bytes;  // zero bytes appended after the int in the new value
    union {
        struct { int diff; } d;                   // UPDATE_ADD_DIFF
        struct { int expected; int new_val; } h;  // UPDATE_WITH_HISTORY
    } u;
};

// Per-operation arguments for the update ops (built by get_update_op_args()).
struct update_op_args {
    int *update_history_buffer;  // value last written per key (update_with_history_op)
    int update_pad_frequency;    // padding period in updates; 0 disables padding
};
1279 
UU()1280 static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {
1281     struct update_op_args uoe;
1282     uoe.update_history_buffer = update_history_buffer;
1283     uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary
1284     return uoe;
1285 }
1286 
1287 static uint64_t update_count = 0;
1288 
update_op_callback(DB * UU (db),const DBT * UU (key),const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)1289 static int update_op_callback(DB *UU(db), const DBT *UU(key),
1290                               const DBT *old_val,
1291                               const DBT *extra,
1292                               void (*set_val)(const DBT *new_val,
1293                                               void *set_extra),
1294                               void *set_extra)
1295 {
1296     int old_int_val = 0;
1297     if (old_val) {
1298         old_int_val = *(int *) old_val->data;
1299     }
1300     assert(extra->size == sizeof(struct update_op_extra));
1301     struct update_op_extra *CAST_FROM_VOIDP(e, extra->data);
1302 
1303     int new_int_val;
1304     switch (e->type) {
1305     case UPDATE_ADD_DIFF:
1306         new_int_val = old_int_val + e->u.d.diff;
1307         break;
1308     case UPDATE_NEGATE:
1309         new_int_val = -old_int_val;
1310         break;
1311     case UPDATE_WITH_HISTORY:
1312         assert(old_int_val == e->u.h.expected);
1313         new_int_val = e->u.h.new_val;
1314         break;
1315     default:
1316         abort();
1317     }
1318 
1319     uint32_t val_size = sizeof(int) + e->pad_bytes;
1320     uint8_t valbuf[val_size];
1321     fill_val_buf(new_int_val, valbuf, val_size);
1322 
1323     DBT new_val;
1324     dbt_init(&new_val, valbuf, val_size);
1325     set_val(&new_val, set_extra);
1326     return 0;
1327 }
1328 
UU()1329 static int UU() update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1330     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1331     DB* db = arg->dbp[db_index];
1332 
1333     int r = 0;
1334     DBT key, val;
1335     uint8_t keybuf[arg->cli->key_size];
1336 
1337     toku_sync_fetch_and_add(&update_count, 1);
1338     struct update_op_extra extra;
1339     ZERO_STRUCT(extra);
1340     extra.type = UPDATE_ADD_DIFF;
1341     extra.pad_bytes = 0;
1342     int curr_val_sum = 0;
1343 
1344     dbt_init(&key, keybuf, sizeof keybuf);
1345     dbt_init(&val, &extra, sizeof extra);
1346 
1347     for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
1348         fill_key_buf_random(arg->random_data, keybuf, arg);
1349         extra.u.d.diff = 1;
1350         curr_val_sum += extra.u.d.diff;
1351         r = db->update(
1352             db,
1353             txn,
1354             &key,
1355             &val,
1356             0
1357             );
1358         if (r != 0) {
1359             return r;
1360         }
1361         int *rkp = (int *) keybuf;
1362         int rand_key = *rkp;
1363         invariant(rand_key != (arg->cli->num_elements - rand_key));
1364         rand_key -= arg->cli->num_elements;
1365         fill_key_buf(rand_key, keybuf, arg->cli);
1366         extra.u.d.diff = -1;
1367         r = db->update(
1368             db,
1369             txn,
1370             &key,
1371             &val,
1372             0
1373             );
1374         if (r != 0) {
1375             return r;
1376         }
1377     }
1378     return r;
1379 }
1380 
pre_acquire_write_lock(DB * db,DB_TXN * txn,const DBT * left_key,const DBT * right_key)1381 static int pre_acquire_write_lock(DB *db, DB_TXN *txn,
1382         const DBT *left_key, const DBT *right_key) {
1383     int r;
1384     DBC *cursor;
1385 
1386     r = db->cursor(db, txn, &cursor, DB_RMW);
1387     CKERR(r);
1388     int cursor_r = cursor->c_set_bounds(cursor, left_key, right_key, true, 0);
1389     r = cursor->c_close(cursor);
1390     CKERR(r);
1391 
1392     return cursor_r;
1393 }
1394 
// take the given db and do an update on it
// Applies txn_size UPDATE_ADD_DIFF updates to db inside txn. Every update except
// the last adds a random delta; the last adds the negated running sum, so the
// deltas applied by one call sum to zero. With arg->prelock_updates the keys are
// a consecutive run from a random start (wrapping to 0 when bounded), and that
// range is write-locked up front; otherwise each key is random.
// Returns 0, or the first nonzero pre_acquire_write_lock()/db->update() result.
static int
UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
    uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
    struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
    struct update_op_extra extra;
    ZERO_STRUCT(extra);
    extra.type = UPDATE_ADD_DIFF;
    extra.pad_bytes = 0;
    if (op_args->update_pad_frequency) {
        // True for the first update_pad_frequency counts of every
        // 2*update_pad_frequency window: those updates pad values by 100 bytes.
        if (old_update_count % (2*op_args->update_pad_frequency) == old_update_count%op_args->update_pad_frequency) {
            extra.pad_bytes = 100;
        }
    }

    int r = 0;
    DBT key, val;
    uint8_t keybuf[arg->cli->key_size];
    int update_key;
    int curr_val_sum = 0;
    // NOTE(review): the flag comes from arg->cli->prelock_updates, but the loop
    // below prelocks based on arg->prelock_updates. If these can differ,
    // DB_PRELOCKED_WRITE may be passed without the range actually being locked
    // — confirm they are always set together.
    const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0;

    for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
        if (arg->prelock_updates) {
            if (i == 0) {
                update_key = random_bounded_key(arg->random_data, arg);

                const int max_key_in_table = arg->cli->num_elements - 1;
                const bool range_wraps = (update_key + (int) arg->cli->txn_size - 1) > max_key_in_table;
                int left_key, right_key;
                DBT left_key_dbt, right_key_dbt;

                // acquire the range starting at the random key, plus txn_size - 1
                // elements, but lock no further than the end of the table. if the
                // range wraps around to the beginning we will handle it below.
                // NOTE(review): the bounds are raw native 4-byte ints, not
                // fill_key_buf()-encoded keys. That relies on the comparator
                // accepting 4-byte keys against key_size keys; it looks wrong
                // when memcmp_keys or disperse_keys is set — confirm.
                left_key = update_key;
                right_key = range_wraps ? max_key_in_table : (left_key + arg->cli->txn_size - 1);
                r = pre_acquire_write_lock(
                        db,
                        txn,
                        dbt_init(&left_key_dbt, &left_key, sizeof update_key),
                        dbt_init(&right_key_dbt, &right_key, sizeof right_key)
                        );
                if (r != 0) {
                    return r;
                }

                // check if the right end point wrapped around to the beginning
                // if so, lock from 0 to the right key, modded by table size.
                // NOTE(review): with keys wrapping modulo num_elements, the last
                // key updated appears to be one less than this right_key, so
                // one extra key may be locked (harmless over-locking).
                if (range_wraps) {
                    right_key = (left_key + arg->cli->txn_size - 1) - max_key_in_table;
                    invariant(right_key > 0);
                    left_key = 0;
                    r = pre_acquire_write_lock(
                            db,
                            txn,
                            dbt_init(&left_key_dbt, &left_key, sizeof update_key),
                            dbt_init(&right_key_dbt, &right_key, sizeof right_key)
                            );
                    if (r != 0) {
                        return r;
                    }
                }
            } else {
                // next consecutive key, wrapping to 0 only in bounded mode
                update_key++;
                if (arg->bounded_element_range) {
                    update_key = update_key % arg->cli->num_elements;
                }
            }
            fill_key_buf(update_key, keybuf, arg->cli);
        } else {
            // just do a usual, random point update without locking first
            fill_key_buf_random(arg->random_data, keybuf, arg);
        }


        // the last update keeps the table's sum as zero
        // every other update except the last applies a random delta
        if (i == arg->cli->txn_size - 1) {
            extra.u.d.diff = -curr_val_sum;
        } else {
            extra.u.d.diff = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
            // negate the delta on even iterations so deltas alternate in sign
            if (i%2 == 0) {
                extra.u.d.diff = -extra.u.d.diff;
            }
            curr_val_sum += extra.u.d.diff;
        }

        dbt_init(&key, keybuf, sizeof keybuf);
        dbt_init(&val, &extra, sizeof extra);

        // do the update
        r = db->update(
            db,
            txn,
            &key,
            &val,
            update_flags
            );
        if (r != 0) {
            return r;
        }
    }

    return r;
}
1502 
1503 // choose a random DB and do an update on it
1504 static int
UU()1505 UU() update_op(DB_TXN *txn, ARG arg, void* operation_extra, void *stats_extra) {
1506     int db_index = myrandom_r(arg->random_data) % arg->cli->num_DBs;
1507     DB *db = arg->dbp[db_index];
1508     return update_op_db(db, txn, arg, operation_extra, stats_extra);
1509 }
1510 
UU()1511 static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
1512     struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
1513     assert(arg->bounded_element_range);
1514     assert(op_args->update_history_buffer);
1515 
1516     int r = 0;
1517     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1518     DB* db = arg->dbp[db_index];
1519 
1520     struct update_op_extra extra;
1521     ZERO_STRUCT(extra);
1522     extra.type = UPDATE_WITH_HISTORY;
1523     uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
1524     extra.pad_bytes = 0;
1525     if (op_args->update_pad_frequency) {
1526         if (old_update_count % (2*op_args->update_pad_frequency) != old_update_count%op_args->update_pad_frequency) {
1527             extra.pad_bytes = 500;
1528         }
1529     }
1530 
1531     DBT key, val;
1532     uint8_t keybuf[arg->cli->key_size];
1533     int rand_key;
1534     int curr_val_sum = 0;
1535 
1536     dbt_init(&key, keybuf, sizeof keybuf);
1537     dbt_init(&val, &extra, sizeof extra);
1538 
1539     for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
1540         fill_key_buf_random(arg->random_data, keybuf, arg);
1541         int *rkp = (int *) keybuf;
1542         rand_key = *rkp;
1543         invariant(rand_key < arg->cli->num_elements);
1544         if (i < arg->cli->txn_size - 1) {
1545             extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
1546             // just make every other value random
1547             if (i % 2 == 0) {
1548                 extra.u.h.new_val = -extra.u.h.new_val;
1549             }
1550             curr_val_sum += extra.u.h.new_val;
1551         } else {
1552             // the last update should ensure the sum stays zero
1553             extra.u.h.new_val = -curr_val_sum;
1554         }
1555         extra.u.h.expected = op_args->update_history_buffer[rand_key];
1556         op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
1557         r = db->update(
1558             db,
1559             txn,
1560             &key,
1561             &val,
1562             0
1563             );
1564         if (r != 0) {
1565             return r;
1566         }
1567     }
1568 
1569     return r;
1570 }
1571 
UU()1572 static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
1573     struct update_op_extra extra;
1574     ZERO_STRUCT(extra);
1575     int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
1576     DB* db = arg->dbp[db_index];
1577     extra.type = UPDATE_NEGATE;
1578     extra.pad_bytes = 0;
1579     DBT val;
1580     int r = db->update_broadcast(db, txn, dbt_init(&val, &extra, sizeof extra), 0);
1581     CKERR(r);
1582     return r;
1583 }
1584 
hot_progress_callback(void * UU (extra),float UU (progress))1585 static int hot_progress_callback(void *UU(extra), float UU(progress)) {
1586     return run_test ? 0 : 1;
1587 }
1588 
UU()1589 static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
1590     int r;
1591     for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
1592         DB* db = arg->dbp[i];
1593         uint64_t loops_run;
1594         r = db->hot_optimize(db, NULL, NULL, hot_progress_callback, nullptr, &loops_run);
1595         if (run_test) {
1596             CKERR(r);
1597         }
1598     }
1599     return 0;
1600 }
1601 
// Write the dictionary name of the i'th table ("main<i>") into buf, using at
// most len bytes (snprintf truncation semantics: always NUL-terminated when
// len > 0).
static void
get_ith_table_name(char *buf, size_t len, int i) {
    (void) snprintf(buf, len, "%s%d", "main", i);
}
1606 
1607 DB_TXN * const null_txn = 0;
1608 
1609 // For each line of engine status output, look for lines that contain substrings
1610 // that match any of the strings in the pattern string. The pattern string contains
1611 // 0 or more strings separated by the '|' character, kind of like a regex.
print_matching_engine_status_rows(DB_ENV * env,const char * pattern)1612 static void print_matching_engine_status_rows(DB_ENV *env, const char *pattern) {
1613     uint64_t num_rows;
1614     env->get_engine_status_num_rows(env, &num_rows);
1615     uint64_t buf_size = num_rows * 128;
1616     const char *row;
1617     char *row_r;
1618 
1619     char *pattern_copy = toku_xstrdup(pattern);
1620     int num_patterns = 1;
1621     for (char *p = pattern_copy; *p != '\0'; p++) {
1622         if (*p == '|') {
1623             *p = '\0';
1624             num_patterns++;
1625         }
1626     }
1627 
1628     char *XMALLOC_N(buf_size, buf);
1629     int r = env->get_engine_status_text(env, buf, buf_size);
1630     invariant_zero(r);
1631 
1632     for (row = strtok_r(buf, "\n", &row_r); row != nullptr; row = strtok_r(nullptr, "\n", &row_r)) {
1633         const char *p = pattern_copy;
1634         for (int i = 0; i < num_patterns; i++, p += strlen(p) + 1) {
1635             if (strstr(row, p) != nullptr) {
1636                 fprintf(stderr, "%s\n", row);
1637             }
1638         }
1639     }
1640 
1641     toku_free(pattern_copy);
1642     toku_free(buf);
1643     fflush(stderr);
1644 }
1645 
// Return the smaller of two ints.
// TODO: helpers like this should live in a shared header somewhere.
static inline int
intmin(const int a, const int b)
{
    return (b <= a) ? b : a;
}
1655 
// Arguments for the test_time() timer thread (created by run_workers).
struct test_time_extra {
    DB_ENV *env;                 // environment whose engine status may be printed
    int num_seconds;             // test duration in seconds; 0 means run (effectively) forever
    bool crash_at_end;           // if true, crash the process on purpose when time is up
    struct worker_extra *wes;    // per-worker state; its counters feed perf output
    int num_wes;                 // number of entries in wes
    struct cli_args *cli_args;   // options: perf period/format, engine-status pattern, ...
};
1664 
test_time(void * arg)1665 static void *test_time(void *arg) {
1666     struct test_time_extra* CAST_FROM_VOIDP(tte, arg);
1667     DB_ENV *env = tte->env;
1668     int num_seconds = tte->num_seconds;
1669     const struct perf_formatter *perf_formatter = &perf_formatters[tte->cli_args->perf_output_format];
1670 
1671     //
1672     // if num_Seconds is set to 0, run indefinitely
1673     //
1674     if (num_seconds == 0) {
1675         num_seconds = INT32_MAX;
1676     }
1677     uint64_t last_counter_values[tte->num_wes][(int) NUM_OPERATION_TYPES];
1678     ZERO_ARRAY(last_counter_values);
1679     uint64_t *counters[tte->num_wes];
1680     for (int t = 0; t < tte->num_wes; ++t) {
1681         counters[t] = tte->wes[t].counters;
1682     }
1683     if (verbose) {
1684         printf("Sleeping for %d seconds\n", num_seconds);
1685     }
1686     for (int i = 0; i < num_seconds; ) {
1687         struct timeval tv[2];
1688         const int sleeptime = intmin(tte->cli_args->performance_period, num_seconds - i);
1689         int r = gettimeofday(&tv[0], nullptr);
1690         assert_zero(r);
1691         usleep(sleeptime*1000*1000);
1692         r = gettimeofday(&tv[1], nullptr);
1693         assert_zero(r);
1694         int actual_sleeptime = tv[1].tv_sec - tv[0].tv_sec;
1695         if (abs(actual_sleeptime - sleeptime) <= 1) {
1696             // Close enough, no need to alarm the user, and we didn't check nsec.
1697             i += sleeptime;
1698         } else {
1699             if (verbose) {
1700                 printf("tried to sleep %d secs, actually slept %d secs\n", sleeptime, actual_sleeptime);
1701             }
1702             i += actual_sleeptime;
1703         }
1704         if (tte->cli_args->print_performance && tte->cli_args->print_iteration_performance) {
1705             perf_formatter->iteration(tte->cli_args, i, last_counter_values, counters, tte->num_wes);
1706         }
1707         if (tte->cli_args->print_engine_status != nullptr) {
1708             print_matching_engine_status_rows(env, tte->cli_args->print_engine_status);
1709         }
1710     }
1711 
1712     if (verbose) {
1713         printf("should now end test\n");
1714     }
1715     toku_sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy.
1716     if (verbose) {
1717         printf("run_test %d\n", run_test);
1718     }
1719     if (tte->crash_at_end) {
1720         toku_hard_crash_on_purpose();
1721     }
1722     return arg;
1723 }
1724 
// State shared between run_workers() and the sleep_and_crash() watchdog.
// is_setup and threads_have_joined are read and written with `mutex` held.
struct sleep_and_crash_extra {
    toku_mutex_t mutex;          // protects the flags below
    toku_cond_t cond;            // signaled by run_workers once every worker has joined
    int seconds;                 // how long the watchdog waits before crashing
    bool is_setup;               // set by the watchdog after computing its deadline
    bool threads_have_joined;    // set by run_workers after joining all workers
};
1732 
sleep_and_crash(void * extra)1733 static void *sleep_and_crash(void *extra) {
1734     sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra);
1735     toku_mutex_lock(&e->mutex);
1736     struct timeval tv;
1737     toku_timespec_t ts;
1738     gettimeofday(&tv, nullptr);
1739     ts.tv_sec = tv.tv_sec + e->seconds;
1740     ts.tv_nsec = 0;
1741     e->is_setup = true;
1742     if (verbose) {
1743         printf("Waiting %d seconds for other threads to join.\n", e->seconds);
1744         fflush(stdout);
1745     }
1746     int r = toku_cond_timedwait(&e->cond, &e->mutex, &ts);
1747     toku_mutex_assert_locked(&e->mutex);
1748     if (r == ETIMEDOUT) {
1749         invariant(!e->threads_have_joined);
1750         if (verbose) {
1751             printf("Some thread didn't join on time, crashing.\n");
1752             fflush(stdout);
1753         }
1754         toku_crash_and_dump_core_on_purpose();
1755     } else {
1756         assert(r == 0);
1757         assert(e->threads_have_joined);
1758         if (verbose) {
1759             printf("Other threads joined on time, exiting cleanly.\n");
1760         }
1761     }
1762     toku_mutex_unlock(&e->mutex);
1763     return nullptr;
1764 }
1765 
run_workers(struct arg * thread_args,int num_threads,uint32_t num_seconds,bool crash_at_end,struct cli_args * cli_args)1766 static int run_workers(
1767     struct arg *thread_args,
1768     int num_threads,
1769     uint32_t num_seconds,
1770     bool crash_at_end,
1771     struct cli_args* cli_args
1772     )
1773 {
1774     int r;
1775     const struct perf_formatter *perf_formatter =
1776         &perf_formatters[cli_args->perf_output_format];
1777     toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER;
1778     toku_mutex_init(toku_uninstrumented, &mutex, nullptr);
1779     struct st_rwlock rwlock;
1780     rwlock_init(toku_uninstrumented, &rwlock);
1781     toku_pthread_t tids[num_threads];
1782     toku_pthread_t time_tid;
1783     if (cli_args->print_performance) {
1784         perf_formatter->header(cli_args, num_threads);
1785     }
1786     // allocate worker_extra's on cache line boundaries
1787     struct worker_extra *XMALLOC_N_ALIGNED(64, num_threads, worker_extra);
1788     struct test_time_extra tte;
1789     tte.env = thread_args[0].env;
1790     tte.num_seconds = num_seconds;
1791     tte.crash_at_end = crash_at_end;
1792     tte.wes = worker_extra;
1793     tte.num_wes = num_threads;
1794     tte.cli_args = cli_args;
1795     run_test = true;
1796     for (int i = 0; i < num_threads; ++i) {
1797         thread_args[i].thread_idx = i;
1798         thread_args[i].num_threads = num_threads;
1799         worker_extra[i].thread_arg = &thread_args[i];
1800         worker_extra[i].operation_lock = &rwlock;
1801         worker_extra[i].operation_lock_mutex = &mutex;
1802         XCALLOC_N((int)NUM_OPERATION_TYPES, worker_extra[i].counters);
1803         TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
1804         {
1805             int chk_r = toku_pthread_create(toku_uninstrumented,
1806                                             &tids[i],
1807                                             nullptr,
1808                                             worker,
1809                                             &worker_extra[i]);
1810             CKERR(chk_r);
1811         }
1812         if (verbose)
1813             printf("%lu created\n", (unsigned long)tids[i]);
1814     }
1815     {
1816         int chk_r = toku_pthread_create(
1817             toku_uninstrumented, &time_tid, nullptr, test_time, &tte);
1818         CKERR(chk_r);
1819     }
1820     if (verbose)
1821         printf("%lu created\n", (unsigned long)time_tid);
1822 
1823     void *ret;
1824     r = toku_pthread_join(time_tid, &ret); assert_zero(r);
1825     if (verbose) printf("%lu joined\n", (unsigned long) time_tid);
1826 
1827     {
1828         // Set an alarm that will kill us if it takes too long to join all the
1829         // threads (i.e. there is some runaway thread).
1830         struct sleep_and_crash_extra sac_extra;
1831         ZERO_STRUCT(sac_extra);
1832         toku_mutex_init(toku_uninstrumented, &sac_extra.mutex, nullptr);
1833         toku_cond_init(toku_uninstrumented, &sac_extra.cond, nullptr);
1834         sac_extra.seconds = cli_args->join_timeout;
1835         sac_extra.is_setup = false;
1836         sac_extra.threads_have_joined = false;
1837 
1838         toku_mutex_lock(&sac_extra.mutex);
1839         toku_pthread_t sac_thread;
1840         r = toku_pthread_create(toku_uninstrumented,
1841                                 &sac_thread,
1842                                 nullptr,
1843                                 sleep_and_crash,
1844                                 &sac_extra);
1845         assert_zero(r);
1846         // Wait for sleep_and_crash thread to get set up, spinning is ok, this
1847         // should be quick.
1848         while (!sac_extra.is_setup) {
1849             toku_mutex_unlock(&sac_extra.mutex);
1850             r = toku_pthread_yield();
1851             assert_zero(r);
1852             toku_mutex_lock(&sac_extra.mutex);
1853         }
1854         toku_mutex_unlock(&sac_extra.mutex);
1855 
1856         // Timeout thread has started, join everyone
1857         for (int i = 0; i < num_threads; ++i) {
1858             r = toku_pthread_join(tids[i], &ret); assert_zero(r);
1859             if (verbose)
1860                 printf("%lu joined\n", (unsigned long) tids[i]);
1861         }
1862 
1863         // Signal timeout thread not to crash.
1864         toku_mutex_lock(&sac_extra.mutex);
1865         sac_extra.threads_have_joined = true;
1866         toku_cond_signal(&sac_extra.cond);
1867         toku_mutex_unlock(&sac_extra.mutex);
1868         r = toku_pthread_join(sac_thread, nullptr);
1869         assert_zero(r);
1870         toku_cond_destroy(&sac_extra.cond);
1871         toku_mutex_destroy(&sac_extra.mutex);
1872     }
1873 
1874     if (cli_args->print_performance) {
1875         uint64_t *counters[num_threads];
1876         for (int i = 0; i < num_threads; ++i) {
1877             counters[i] = worker_extra[i].counters;
1878         }
1879         perf_formatter->totals(cli_args, counters, num_threads);
1880     }
1881 
1882     for (int i = 0; i < num_threads; ++i) {
1883         toku_free(worker_extra[i].counters);
1884     }
1885     if (verbose)
1886         printf("ending test, pthreads have joined\n");
1887     rwlock_destroy(&rwlock);
1888     toku_mutex_destroy(&mutex);
1889     toku_free(worker_extra);
1890     return r;
1891 }
1892 
1893 // Pre-open hook
do_nothing_before_db_open(DB * UU (db),int UU (idx))1894 static void do_nothing_before_db_open(DB *UU(db), int UU(idx)) { }
1895 // Requires: DB is created (allocated) but not opened. idx is the index
1896 //           into the DBs array.
1897 static void (*before_db_open_hook)(DB *db, int idx) = do_nothing_before_db_open;
1898 
1899 // Post-open hook
1900 typedef void (*reopen_db_fn)(DB *db, int idx, struct cli_args *cli_args);
do_nothing_after_db_open(DB_ENV * UU (env),DB * db,int UU (idx),reopen_db_fn UU (reopen),struct cli_args * UU (cli_args))1901 static DB *do_nothing_after_db_open(DB_ENV *UU(env), DB *db, int UU(idx), reopen_db_fn UU(reopen), struct cli_args *UU(cli_args)) { return db; }
1902 // Requires: DB is opened and is the 'idx' db in the DBs array.
1903 // Note: Reopen function may be used to open a db if the given one was closed.
1904 // Returns: An opened db.
1905 static DB *(*after_db_open_hook)(DB_ENV *env, DB *db, int idx, reopen_db_fn reopen, struct cli_args *cli_args) = do_nothing_after_db_open;
1906 
open_db_for_create(DB * db,int idx,struct cli_args * cli_args)1907 static void open_db_for_create(DB *db, int idx, struct cli_args *cli_args) {
1908     int r;
1909     char name[30];
1910     memset(name, 0, sizeof(name));
1911     get_ith_table_name(name, sizeof(name), idx);
1912     r = db->set_flags(db, 0); CKERR(r);
1913     r = db->set_fanout(db, cli_args->env_args.fanout); CKERR(r);
1914     r = db->set_pagesize(db, cli_args->env_args.node_size); CKERR(r);
1915     r = db->set_readpagesize(db, cli_args->env_args.basement_node_size); CKERR(r);
1916     r = db->set_compression_method(db, cli_args->compression_method); CKERR(r);
1917     const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
1918     r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666); CKERR(r);
1919 }
1920 
open_db(DB * db,int idx,struct cli_args * cli_args)1921 static void open_db(DB *db, int idx, struct cli_args *cli_args) {
1922     int r;
1923     char name[30];
1924     memset(name, 0, sizeof(name));
1925     get_ith_table_name(name, sizeof(name), idx);
1926     const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
1927     r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666); CKERR(r);
1928     r = db->change_fanout(db, cli_args->env_args.fanout); CKERR(r); // change fanout until fanout is persistent
1929 }
1930 
create_tables(DB_ENV ** env_res,DB ** db_res,int num_DBs,int (* bt_compare)(DB *,const DBT *,const DBT *),struct cli_args * cli_args)1931 static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
1932                         int (*bt_compare)(DB *, const DBT *, const DBT *),
1933                         struct cli_args *cli_args
1934 ) {
1935     int r;
1936     struct env_args env_args = cli_args->env_args;
1937 
1938     char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir);
1939     r = system(rmcmd);
1940     CKERR(r);
1941     r = toku_os_mkdir(env_args.envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
1942 
1943     DB_ENV *env;
1944     db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
1945     r = db_env_create(&env, 0); assert(r == 0);
1946     r = env->set_redzone(env, 0); CKERR(r);
1947     if (!cli_args->memcmp_keys) {
1948         r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
1949     }
1950     r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
1951     r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
1952     r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
1953     if (env_args.generate_put_callback) {
1954         r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
1955         CKERR(r);
1956     }
1957     else {
1958         r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
1959         CKERR(r);
1960     }
1961     if (env_args.generate_del_callback) {
1962         r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
1963         CKERR(r);
1964     }
1965     int env_flags = get_env_open_flags(cli_args);
1966     r = env->open(env, env_args.envdir, env_flags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
1967     r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
1968     r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
1969     r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
1970     env->change_fsync_log_period(env, env_args.sync_period);
1971     *env_res = env;
1972 
1973     for (int i = 0; i < num_DBs; i++) {
1974         DB *db;
1975         r = db_create(&db, env, 0); CKERR(r);
1976         before_db_open_hook(db, i);
1977         open_db_for_create(db, i, cli_args);
1978         db_res[i] = after_db_open_hook(env, db, i, open_db_for_create, cli_args);
1979     }
1980     return r;
1981 }
1982 
report_overall_fill_table_progress(struct cli_args * args,int num_rows)1983 static void report_overall_fill_table_progress(struct cli_args *args, int num_rows) {
1984     // for sanitary reasons we'd like to prevent two threads
1985     // from printing the same performance report twice.
1986     static bool reporting;
1987 
1988     // when was the first time measurement taken?
1989     static uint64_t t0;
1990     static int rows_inserted;
1991 
1992     // when was the last report? what was its progress?
1993     static uint64_t last_report;
1994     static double last_progress;
1995     if (t0 == 0) {
1996         t0 = toku_current_time_microsec();
1997         last_report = t0;
1998     }
1999 
2000     uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows);
2001     double progress = rows_so_far / (args->num_elements * args->num_DBs * 1.0);
2002     if (progress > (last_progress + .01)) {
2003         uint64_t t1 = toku_current_time_microsec();
2004         const uint64_t minimum_report_period = 5 * 1000000;
2005         if (t1 > last_report + minimum_report_period
2006                 && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
2007             double inserts_per_sec = (rows_so_far*1000000) / ((t1 - t0) * 1.0);
2008             printf("fill tables: %ld%% complete, %.2lf rows/sec\n",
2009                     (long)(progress * 100), inserts_per_sec);
2010             last_progress = progress;
2011             last_report = t1;
2012             reporting = false;
2013         }
2014     }
2015 }
2016 
fill_single_table(DB_ENV * env,DB * db,struct cli_args * args,bool fill_with_zeroes)2017 static void fill_single_table(DB_ENV *env, DB *db, struct cli_args *args, bool fill_with_zeroes) {
2018     const int min_size_for_loader = 1 * 1000 * 1000;
2019     const int puts_per_txn = 10 * 1000;;
2020 
2021     int r = 0;
2022     DB_TXN *txn = nullptr;
2023     DB_LOADER *loader = nullptr;
2024     struct random_data random_data;
2025     char random_buf[8];
2026     memset(&random_data, 0, sizeof(random_data));
2027     memset(random_buf, 0, 8);
2028     r = myinitstate_r(random(), random_buf, 8, &random_data); CKERR(r);
2029 
2030     uint8_t keybuf[args->key_size], valbuf[args->val_size];
2031     memset(keybuf, 0, sizeof keybuf);
2032     memset(valbuf, 0, sizeof valbuf);
2033     DBT key, val;
2034     dbt_init(&key, keybuf, args->key_size);
2035     dbt_init(&val, valbuf, args->val_size);
2036 
2037     r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
2038     if (args->num_elements >= min_size_for_loader) {
2039         uint32_t db_flags = DB_PRELOCKED_WRITE;
2040         uint32_t dbt_flags = 0;
2041         r = env->create_loader(env, txn, &loader, db, 1, &db, &db_flags, &dbt_flags, 0); CKERR(r);
2042     }
2043 
2044     for (int i = 0; i < args->num_elements; i++) {
2045         fill_key_buf(i, keybuf, args);
2046 
2047         // Correctness tests map every key to zeroes. Perf tests fill
2048         // values with random bytes, based on compressibility.
2049         if (fill_with_zeroes) {
2050             fill_val_buf(0, valbuf, args->val_size);
2051         } else {
2052             fill_val_buf_random(&random_data, valbuf, args);
2053         }
2054 
2055         r = loader ? loader->put(loader, &key, &val) :
2056                      db->put(db, txn, &key, &val, DB_PRELOCKED_WRITE);
2057         CKERR(r);
2058 
2059         if (i > 0 && i % puts_per_txn == 0) {
2060             if (verbose) {
2061                 report_overall_fill_table_progress(args, puts_per_txn);
2062             }
2063             // begin a new txn if we're not using the loader,
2064             if (loader == nullptr) {
2065                 r = txn->commit(txn, 0); CKERR(r);
2066                 r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
2067             }
2068         }
2069     }
2070 
2071     if (loader) {
2072         r = loader->close(loader); CKERR(r);
2073     }
2074     r = txn->commit(txn, 0); CKERR(r);
2075 }
2076 
// Work item for one fill_table_worker() job. It is heap-allocated by
// fill_tables_default() and freed by fill_table_worker().
struct fill_table_worker_info {
    struct cli_args *args;   // row count, key/value sizes, etc. for the fill
    DB_ENV *env;             // environment used to begin txns / create the loader
    DB *db;                  // table to fill
    bool fill_with_zeroes;   // all-zero values if true, random values otherwise
};
2083 
fill_table_worker(void * arg)2084 static void fill_table_worker(void *arg) {
2085     struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
2086     fill_single_table(info->env, info->db, info->args, info->fill_with_zeroes);
2087     toku_free(info);
2088 }
2089 
fill_tables_default(DB_ENV * env,DB ** dbs,struct cli_args * args,bool fill_with_zeroes)2090 static int fill_tables_default(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) {
2091     const int num_cores = toku_os_get_number_processors();
2092     // Use at most cores / 2 worker threads, since we want some other cores to
2093     // be used for internal engine work (ie: flushes, loader threads, etc).
2094     const int max_num_workers = (num_cores + 1) / 2;
2095     const int num_workers = args->num_DBs < max_num_workers ? args->num_DBs : max_num_workers;
2096     KIBBUTZ kibbutz = NULL;
2097     int r = toku_kibbutz_create(num_workers, &kibbutz);
2098     assert(r == 0);
2099     for (int i = 0; i < args->num_DBs; i++) {
2100         struct fill_table_worker_info *XCALLOC(info);
2101         info->env = env;
2102         info->db = dbs[i];
2103         info->args = args;
2104         info->fill_with_zeroes = fill_with_zeroes;
2105         toku_kibbutz_enq(kibbutz, fill_table_worker, info);
2106     }
2107     toku_kibbutz_destroy(kibbutz);
2108     return 0;
2109 }
2110 
2111 // fill_tables() is called when the tables are first created.
2112 // set this function if you want custom table contents.
2113 static int (*fill_tables)(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) = fill_tables_default;
2114 
do_xa_recovery(DB_ENV * env)2115 static void do_xa_recovery(DB_ENV* env) {
2116     DB_PREPLIST preplist[1];
2117     long num_recovered= 0;
2118     int r = 0;
2119     r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT);
2120     while(r==0 && num_recovered > 0) {
2121         DB_TXN* recovered_txn = preplist[0].txn;
2122         if (verbose) {
2123             printf("recovering transaction with id %" PRIu64 " \n", recovered_txn->id64(recovered_txn));
2124         }
2125         if (random() % 2 == 0) {
2126             int rr = recovered_txn->commit(recovered_txn, 0);
2127             CKERR(rr);
2128         }
2129         else {
2130             int rr = recovered_txn->abort(recovered_txn);
2131             CKERR(rr);
2132         }
2133         r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT);
2134     }
2135 }
2136 
open_tables(DB_ENV ** env_res,DB ** db_res,int num_DBs,int (* bt_compare)(DB *,const DBT *,const DBT *),struct cli_args * cli_args)2137 static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
2138                       int (*bt_compare)(DB *, const DBT *, const DBT *),
2139                       struct cli_args *cli_args) {
2140     int r;
2141     struct env_args env_args = cli_args->env_args;
2142 
2143     DB_ENV *env;
2144     db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
2145     r = db_env_create(&env, 0); assert(r == 0);
2146     r = env->set_redzone(env, 0); CKERR(r);
2147     if (!cli_args->memcmp_keys) {
2148         r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
2149     }
2150     r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
2151     env->set_update(env, env_args.update_function);
2152     r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
2153     r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
2154     if (env_args.generate_put_callback) {
2155         r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
2156         CKERR(r);
2157     }
2158     else {
2159         r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
2160         CKERR(r);
2161     }
2162     if (env_args.generate_del_callback) {
2163         r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
2164         CKERR(r);
2165     }
2166     int env_flags = get_env_open_flags(cli_args);
2167     r = env->open(env, env_args.envdir, DB_RECOVER | env_flags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
2168     do_xa_recovery(env);
2169     r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
2170     r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
2171     r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
2172     env->change_fsync_log_period(env, env_args.sync_period);
2173     *env_res = env;
2174 
2175     for (int i = 0; i < num_DBs; i++) {
2176         DB *db;
2177         r = db_create(&db, env, 0); CKERR(r);
2178         before_db_open_hook(db, i);
2179         open_db(db, i, cli_args);
2180         db_res[i] = after_db_open_hook(env, db, i, open_db, cli_args);
2181     }
2182     return r;
2183 }
2184 
close_tables(DB_ENV * env,DB ** dbs,int num_DBs)2185 static int close_tables(DB_ENV *env, DB**  dbs, int num_DBs) {
2186     int r;
2187     for (int i = 0; i < num_DBs; i++) {
2188         r = dbs[i]->close(dbs[i], 0); CKERR(r);
2189     }
2190     r = env->close(env, 0); CKERR(r);
2191     return r;
2192 }
2193 
// Environment defaults for correctness stress tests (used by get_default_args).
// These are deliberately small nodes and a small cache. In C++ designated
// initializers must follow member declaration order, so keep this list in
// sync with struct env_args.
static const struct env_args DEFAULT_ENV_ARGS = {
    .fanout = 16,
    .node_size = 4096,
    .basement_node_size = 1024,
    .rollback_node_size = 4096,
    .checkpointing_period = 10,
    .cleaner_period = 1,
    .cleaner_iterations = 1,
    .sync_period = 0,
    .lk_max_memory = 1L * 1024 * 1024 * 1024,
    .cachetable_size = 300000, // bytes; split into GiB + remainder for set_cachesize
    .num_bucket_mutexes = 1024,
    .envdir = nullptr, // filled in by get_default_args()
    .update_function = update_op_callback,
    .generate_put_callback = nullptr,
    .generate_del_callback = nullptr,
};
2211 
// Environment defaults for performance runs (used by get_default_args_for_perf).
// Compared with DEFAULT_ENV_ARGS they use production-sized nodes, a 1 GiB cache,
// more bucket mutexes, and no update callback. Member order must match
// struct env_args.
static const struct env_args DEFAULT_PERF_ENV_ARGS = {
    .fanout = 16,
    .node_size = 4*1024*1024,
    .basement_node_size = 128*1024,
    .rollback_node_size = 4*1024*1024,
    .checkpointing_period = 60,
    .cleaner_period = 1,
    .cleaner_iterations = 5,
    .sync_period = 0,
    .lk_max_memory = 1L * 1024 * 1024 * 1024,
    .cachetable_size = 1<<30, // bytes
    .num_bucket_mutexes = 1024 * 1024,
    .envdir = nullptr, // filled in by get_default_args_for_perf()
    .update_function = nullptr,
    .generate_put_callback = nullptr,
    .generate_del_callback = nullptr,
};
2229 
UU()2230 static struct cli_args UU() get_default_args(void) {
2231     struct cli_args DEFAULT_ARGS = {
2232         .num_elements = 150000,
2233         .num_DBs = 1,
2234         .num_seconds = 180,
2235         .join_timeout = 3600,
2236         .only_create = false,
2237         .only_stress = false,
2238         .update_broadcast_period_ms = 2000,
2239         .num_ptquery_threads = 1,
2240         .do_test_and_crash = false,
2241         .do_recover = false,
2242         .num_update_threads = 1,
2243         .num_put_threads = 1,
2244         .range_query_limit = 100,
2245         .serial_insert = false,
2246         .interleave = false,
2247         .crash_on_operation_failure = true,
2248         .print_performance = false,
2249         .print_thread_performance = true,
2250         .print_iteration_performance = true,
2251         .perf_output_format = HUMAN,
2252         .compression_method = TOKU_DEFAULT_COMPRESSION_METHOD,
2253         .performance_period = 1,
2254         .txn_size = 1000,
2255         .key_size = min_key_size,
2256         .val_size = min_val_size,
2257         .compressibility = 1.0,
2258         .env_args = DEFAULT_ENV_ARGS,
2259         .single_txn = false,
2260         .warm_cache = false,
2261         .blackhole = false,
2262         .nolocktree = false,
2263         .unique_checks = false,
2264         .sync_period = 0,
2265         .nolog = false,
2266         .nocrashstatus = false,
2267         .prelock_updates = false,
2268         .disperse_keys = false,
2269         .memcmp_keys = false,
2270         .direct_io = false,
2271         };
2272     DEFAULT_ARGS.env_args.envdir = TOKU_TEST_FILENAME;
2273     return DEFAULT_ARGS;
2274 }
2275 
UU()2276 static struct cli_args UU() get_default_args_for_perf(void) {
2277     struct cli_args args = get_default_args();
2278     args.num_elements = 1000000; //default of 1M
2279     args.env_args = DEFAULT_PERF_ENV_ARGS;
2280     args.env_args.envdir = TOKU_TEST_FILENAME;
2281     return args;
2282 }
2283 
// Storage for an option's default/min/max value. Which member is meaningful
// depends on the option's type (e.g. the help_* printers read .i32, .b, .s).
union val_type {
    int32_t     i32;
    int64_t     i64;
    uint32_t    u32;
    uint64_t    u64;
    bool        b;
    double      d;
    const char *s;
};
2293 
struct arg_type;

// Per-type callbacks stored in a type_description.
// match_fun: presumably reports whether argv[0] names this option.
typedef bool (*match_fun)(struct arg_type *type, char *const argv[]);
// parse_fun: presumably parses argv into type->target and reports how many
// extra argv entries it used via *extra_args_consumed.
typedef int  (*parse_fun)(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]);
// help_fun: prints one help line, padding name/type columns to the given widths.
typedef void  (*help_fun)(struct arg_type *type, int width_name, int width_type);
2299 
// Describes one option value type: its display name plus the callbacks used
// to match, parse, and print help for options of that type.
struct type_description {
    const char           *type_name;
    const match_fun       matches;
    const parse_fun       parse;
    const help_fun        help;
};
2306 
// One command-line option.
struct arg_type {
    const char              *name;          // option name ("--name" for numeric/string, bare for bools)
    struct type_description *description;   // value type and its callbacks
    union val_type           default_val;   // default shown in help
    void                    *target;        // where the parsed value is stored (presumably)
    const char              *help_suffix;   // printed after each value in help (e.g. units)
    union val_type           min;           // lower bound; shown in help only if not the type's limit
    union val_type           max;           // upper bound; shown in help only if not the type's limit
};
2316 
/*
 * Generates help_<typename>(type, width_name, width_type), the help printer
 * for a numeric option.  It prints the option name and type name, padded to
 * the given widths, and then "(default X<suffix>".  It adds ", min X<suffix>"
 * or ", max X<suffix>" only when the option's bound differs from MIN/MAX (the
 * full range of the type), and closes with ")".
 *   format: printf conversion (without the '%') for union member `member`.
 * Numeric option names must start with "--" (checked by invariant).
 */
#define DEFINE_NUMERIC_HELP(typename, format, member, MIN, MAX) \
static inline void \
help_##typename(struct arg_type *type, int width_name, int width_type) { \
    invariant(!strncmp("--", type->name, strlen("--"))); \
    fprintf(stderr, "\t%-*s  %-*s  ", width_name, type->name, width_type, type->description->type_name); \
    fprintf(stderr, "(default %" format "%s", type->default_val.member, type->help_suffix); \
    if (type->min.member != MIN) { \
        fprintf(stderr, ", min %" format "%s", type->min.member, type->help_suffix); \
    } \
    if (type->max.member != MAX) { \
        fprintf(stderr, ", max %" format "%s", type->max.member, type->help_suffix); \
    } \
    fprintf(stderr, ")\n"); \
}

// Instantiate help_int32, help_int64, help_uint32, help_uint64 and help_double.
DEFINE_NUMERIC_HELP(int32, PRId32, i32, INT32_MIN, INT32_MAX)
DEFINE_NUMERIC_HELP(int64, PRId64, i64, INT64_MIN, INT64_MAX)
DEFINE_NUMERIC_HELP(uint32, PRIu32, u32, 0, UINT32_MAX)
DEFINE_NUMERIC_HELP(uint64, PRIu64, u64, 0, UINT64_MAX)
DEFINE_NUMERIC_HELP(double, ".2lf",  d, -HUGE_VAL, HUGE_VAL)
2337 static inline void
2338 help_bool(struct arg_type *type, int width_name, int width_type) {
2339     invariant(strncmp("--", type->name, strlen("--")));
2340     const char *default_value = type->default_val.b ? "yes" : "no";
2341     fprintf(stderr, "\t--[no-]%-*s  %-*s  (default %s)\n",
2342             width_name - (int)strlen("--[no-]"), type->name,
2343             width_type, type->description->type_name,
2344             default_value);
2345 }
2346 
2347 static inline void
help_string(struct arg_type * type,int width_name,int width_type)2348 help_string(struct arg_type *type, int width_name, int width_type) {
2349     invariant(!strncmp("--", type->name, strlen("--")));
2350     const char *default_value = type->default_val.s ? type->default_val.s : "";
2351     fprintf(stderr, "\t%-*s  %-*s  (default '%s')\n",
2352             width_name, type->name,
2353             width_type, type->description->type_name,
2354             default_value);
2355 }
2356 
2357 static inline bool
match_name(struct arg_type * type,char * const argv[])2358 match_name(struct arg_type *type, char *const argv[]) {
2359     invariant(!strncmp("--", type->name, strlen("--")));
2360     return !strcmp(argv[1], type->name);
2361 }
2362 
2363 static inline bool
match_bool(struct arg_type * type,char * const argv[])2364 match_bool(struct arg_type *type, char *const argv[]) {
2365     invariant(strncmp("--", type->name, strlen("--")));
2366     const char *string = argv[1];
2367     if (strncmp(string, "--", strlen("--"))) {
2368         return false;
2369     }
2370     string += strlen("--");
2371     if (!strncmp(string, "no-", strlen("no-"))) {
2372         string += strlen("no-");
2373     }
2374     return !strcmp(string, type->name);
2375 }
2376 
2377 static inline int
parse_bool(struct arg_type * type,int * extra_args_consumed,int UU (argc),char * const argv[])2378 parse_bool(struct arg_type *type, int *extra_args_consumed, int UU(argc), char *const argv[]) {
2379     const char *string = argv[1];
2380     if (!strncmp(string, "--no-", strlen("--no-"))) {
2381         *((bool *)type->target) = false;
2382     }
2383     else {
2384         *((bool *)type->target) = true;
2385     }
2386     *extra_args_consumed = 0;
2387     return 0;
2388 }
2389 
2390 static inline int
parse_string(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2391 parse_string(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2392     if (argc < 2) {
2393         return EINVAL;
2394     }
2395     *((const char **)type->target) = argv[2];
2396     *extra_args_consumed = 1;
2397     return 0;
2398 }
2399 
2400 static inline int
parse_uint64(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2401 parse_uint64(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2402     // Already verified name.
2403 
2404     if (argc < 2) {
2405         return EINVAL;
2406     }
2407     if (*argv[2] == '\0') {
2408        return EINVAL;
2409     }
2410 
2411     char *endptr;
2412     unsigned long long int result = strtoull(argv[2], &endptr, 0);
2413     if (*endptr != '\0') {
2414         return EINVAL;
2415     }
2416     if (result < type->min.u64 || result > type->max.u64) {
2417         return ERANGE;
2418     }
2419     *((uint64_t*)type->target) = result;
2420     *extra_args_consumed = 1;
2421     return 0;
2422 }
2423 
2424 static inline int
parse_int64(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2425 parse_int64(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2426     // Already verified name.
2427 
2428     if (argc < 2) {
2429         return EINVAL;
2430     }
2431     if (*argv[2] == '\0') {
2432        return EINVAL;
2433     }
2434 
2435     char *endptr;
2436     long long int result = strtoll(argv[2], &endptr, 0);
2437     if (*endptr != '\0') {
2438         return EINVAL;
2439     }
2440     if (result < type->min.i64 || result > type->max.i64) {
2441         return ERANGE;
2442     }
2443     *((int64_t*)type->target) = result;
2444     *extra_args_consumed = 1;
2445     return 0;
2446 }
2447 
2448 static inline int
parse_uint32(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2449 parse_uint32(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2450     // Already verified name.
2451 
2452     if (argc < 2) {
2453         return EINVAL;
2454     }
2455     if (*argv[2] == '\0') {
2456        return EINVAL;
2457     }
2458 
2459     char *endptr;
2460     unsigned long int result = strtoul(argv[2], &endptr, 0);
2461     if (*endptr != '\0') {
2462         return EINVAL;
2463     }
2464     if (result < type->min.u32 || result > type->max.u32) {
2465         return ERANGE;
2466     }
2467     *((int32_t*)type->target) = result;
2468     *extra_args_consumed = 1;
2469     return 0;
2470 }
2471 
2472 static inline int
parse_int32(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2473 parse_int32(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2474     // Already verified name.
2475 
2476     if (argc < 2) {
2477         return EINVAL;
2478     }
2479     if (*argv[2] == '\0') {
2480        return EINVAL;
2481     }
2482 
2483     char *endptr;
2484     long int result = strtol(argv[2], &endptr, 0);
2485     if (*endptr != '\0') {
2486         return EINVAL;
2487     }
2488     if (result < type->min.i32 || result > type->max.i32) {
2489         return ERANGE;
2490     }
2491     *((int32_t*)type->target) = result;
2492     *extra_args_consumed = 1;
2493     return 0;
2494 }
2495 
2496 static inline int
parse_double(struct arg_type * type,int * extra_args_consumed,int argc,char * const argv[])2497 parse_double(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) {
2498     // Already verified name.
2499 
2500     if (argc < 2) {
2501         return EINVAL;
2502     }
2503     if (*argv[2] == '\0') {
2504        return EINVAL;
2505     }
2506 
2507     char *endptr;
2508     double result = strtod(argv[2], &endptr);
2509     if (*endptr != '\0') {
2510         return EINVAL;
2511     }
2512     if (result < type->min.d || result > type->max.d) {
2513         return ERANGE;
2514     }
2515     *((double*)type->target) = result;
2516     *extra_args_consumed = 1;
2517     return 0;
2518 }
2519 
// Common case (match_name): the option matches when argv[1] equals its full
// "--name".  Defines a global descriptor type_<typename> wired to
// parse_<typename> and help_<typename>.
// NOTE(review): these descriptors (and type_bool below) have external linkage
// and are defined in a header; including the header from more than one
// translation unit of a program would produce duplicate definitions.
#define DECLARE_TYPE_DESCRIPTION(typename) \
    struct type_description type_##typename = { \
        .type_name = #typename, \
        .matches = match_name, \
        .parse = parse_##typename, \
        .help = help_##typename \
    }
DECLARE_TYPE_DESCRIPTION(int32);
DECLARE_TYPE_DESCRIPTION(uint32);
DECLARE_TYPE_DESCRIPTION(int64);
DECLARE_TYPE_DESCRIPTION(uint64);
DECLARE_TYPE_DESCRIPTION(double);
DECLARE_TYPE_DESCRIPTION(string);

// Bools use their own match function so they are declared manually.
// match_bool accepts both "--name" and "--no-name" for a bare stored "name".
struct type_description type_bool = {
    .type_name = "bool",
    .matches = match_bool,
    .parse = parse_bool,
    .help = help_bool
};

// Dispatch through an arg_type's descriptor.  `type` is expanded
// unparenthesized on both sides of "->", so pass a plain pointer variable,
// not an expression such as &arr[i].
#define ARG_MATCHES(type, rest...) type->description->matches(type, rest)
#define ARG_PARSE(type, rest...) type->description->parse(type, rest)
#define ARG_HELP(type, rest...) type->description->help(type, rest)
2546 
2547 static inline void
do_usage(const char * argv0,int n,struct arg_type types[])2548 do_usage(const char *argv0, int n, struct arg_type types[/*n*/]) {
2549     fprintf(stderr, "Usage:\n");
2550     fprintf(stderr, "\t%s [-h|--help]\n", argv0);
2551     fprintf(stderr, "\t%s [OPTIONS]\n", argv0);
2552     fprintf(stderr, "\n");
2553     fprintf(stderr, "OPTIONS are among:\n");
2554     fprintf(stderr, "\t-q|--quiet\n");
2555     fprintf(stderr, "\t-v|--verbose\n");
2556     for (int i = 0; i < n; i++) {
2557         struct arg_type *type = &types[i];
2558         ARG_HELP(type, 35, 6);
2559     }
2560 }
2561 
parse_stress_test_args(int argc,char * const argv[],struct cli_args * args)2562 static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) {
2563     struct cli_args default_args = *args;
2564     const char *argv0=argv[0];
2565 
2566 #define MAKE_ARG(name_string, type, member, variable, suffix, min_val, max_val) { \
2567     .name=(name_string), \
2568     .description=&(type), \
2569     .default_val={.member=default_args.variable}, \
2570     .target=&(args->variable), \
2571     .help_suffix=(suffix), \
2572     .min={.member=min_val}, \
2573     .max={.member=max_val}, \
2574 }
2575 #define MAKE_LOCAL_ARG(name_string, type, member, default, variable, suffix, min_val, max_val) { \
2576     .name=(name_string), \
2577     .description=&(type), \
2578     .default_val={.member=default}, \
2579     .target=&(variable), \
2580     .help_suffix=(suffix), \
2581     .min={.member=min_val}, \
2582     .max={.member=max_val}, \
2583 }
2584 #define UINT32_ARG(name_string, variable, suffix) \
2585         MAKE_ARG(name_string, type_uint32, u32, variable, suffix, 0, UINT32_MAX)
2586 #define UINT32_ARG_R(name_string, variable, suffix, min, max) \
2587         MAKE_ARG(name_string, type_uint32, u32, variable, suffix, min, max)
2588 #define UINT64_ARG(name_string, variable, suffix) \
2589         MAKE_ARG(name_string, type_uint64, u64, variable, suffix, 0, UINT64_MAX)
2590 #define INT32_ARG_NONNEG(name_string, variable, suffix) \
2591         MAKE_ARG(name_string, type_int32, i32, variable, suffix, 0, INT32_MAX)
2592 #define INT32_ARG_R(name_string, variable, suffix, min, max) \
2593         MAKE_ARG(name_string, type_int32, i32, variable, suffix, min, max)
2594 #define DOUBLE_ARG_R(name_string, variable, suffix, min, max) \
2595         MAKE_ARG(name_string, type_double, d, variable, suffix, min, max)
2596 #define BOOL_ARG(name_string, variable) \
2597         MAKE_ARG(name_string, type_bool, b, variable, "", false, false)
2598 #define STRING_ARG(name_string, variable) \
2599         MAKE_ARG(name_string, type_string, s, variable, "", "", "")
2600 #define LOCAL_STRING_ARG(name_string, variable, default) \
2601         MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "")
2602 
2603     const char *perf_format_s = nullptr;
2604     const char *compression_method_s = nullptr;
2605     const char *print_engine_status_s = nullptr;
2606     struct arg_type arg_types[] = {
2607         INT32_ARG_NONNEG("--num_elements",            num_elements,                  ""),
2608         INT32_ARG_NONNEG("--num_DBs",                 num_DBs,                       ""),
2609         INT32_ARG_NONNEG("--num_seconds",             num_seconds,                   "s"),
2610         INT32_ARG_NONNEG("--fanout",                  env_args.fanout,               ""),
2611         INT32_ARG_NONNEG("--node_size",               env_args.node_size,            " bytes"),
2612         INT32_ARG_NONNEG("--basement_node_size",      env_args.basement_node_size,   " bytes"),
2613         INT32_ARG_NONNEG("--rollback_node_size",      env_args.rollback_node_size,   " bytes"),
2614         INT32_ARG_NONNEG("--checkpointing_period",    env_args.checkpointing_period, "s"),
2615         INT32_ARG_NONNEG("--cleaner_period",          env_args.cleaner_period,       "s"),
2616         INT32_ARG_NONNEG("--cleaner_iterations",      env_args.cleaner_iterations,   ""),
2617         INT32_ARG_NONNEG("--sync_period",             env_args.sync_period,          "ms"),
2618         INT32_ARG_NONNEG("--update_broadcast_period", update_broadcast_period_ms,    "ms"),
2619         INT32_ARG_NONNEG("--num_ptquery_threads",     num_ptquery_threads,           " threads"),
2620         INT32_ARG_NONNEG("--num_put_threads",         num_put_threads,               " threads"),
2621         INT32_ARG_NONNEG("--num_update_threads",      num_update_threads,            " threads"),
2622         INT32_ARG_NONNEG("--range_query_limit",       range_query_limit,             " rows"),
2623 
2624         UINT32_ARG("--txn_size",                      txn_size,                      " rows"),
2625         UINT32_ARG("--num_bucket_mutexes",            env_args.num_bucket_mutexes,   " mutexes"),
2626 
2627         INT32_ARG_R("--join_timeout",                 join_timeout,                  "s", 1, INT32_MAX),
2628         INT32_ARG_R("--performance_period",           performance_period,            "s", 1, INT32_MAX),
2629 
2630         UINT64_ARG("--cachetable_size",               env_args.cachetable_size,      " bytes"),
2631         UINT64_ARG("--lk_max_memory",                 env_args.lk_max_memory,        " bytes"),
2632 
2633         DOUBLE_ARG_R("--compressibility",             compressibility,               "", 0.0, 1.0),
2634 
2635         //TODO: when outputting help.. skip min/max that is min/max of data range.
2636         UINT32_ARG_R("--key_size",                    key_size,                      " bytes", min_key_size, UINT32_MAX),
2637         UINT32_ARG_R("--val_size",                    val_size,                      " bytes", min_val_size, UINT32_MAX),
2638 
2639         BOOL_ARG("serial_insert",                     serial_insert),
2640         BOOL_ARG("interleave",                        interleave),
2641         BOOL_ARG("crash_on_operation_failure",        crash_on_operation_failure),
2642         BOOL_ARG("single_txn",                        single_txn),
2643         BOOL_ARG("warm_cache",                        warm_cache),
2644         BOOL_ARG("print_performance",                 print_performance),
2645         BOOL_ARG("print_thread_performance",          print_thread_performance),
2646         BOOL_ARG("print_iteration_performance",       print_iteration_performance),
2647         BOOL_ARG("only_create",                       only_create),
2648         BOOL_ARG("only_stress",                       only_stress),
2649         BOOL_ARG("test",                              do_test_and_crash),
2650         BOOL_ARG("recover",                           do_recover),
2651         BOOL_ARG("blackhole",                         blackhole),
2652         BOOL_ARG("nolocktree",                        nolocktree),
2653         BOOL_ARG("unique_checks",                     unique_checks),
2654         BOOL_ARG("nolog",                             nolog),
2655         BOOL_ARG("nocrashstatus",                     nocrashstatus),
2656         BOOL_ARG("prelock_updates",                   prelock_updates),
2657         BOOL_ARG("disperse_keys",                     disperse_keys),
2658         BOOL_ARG("memcmp_keys",                       memcmp_keys),
2659         BOOL_ARG("direct_io",                         direct_io),
2660 
2661         STRING_ARG("--envdir",                        env_args.envdir),
2662 
2663         LOCAL_STRING_ARG("--perf_format",             perf_format_s,                "human"),
2664         LOCAL_STRING_ARG("--compression_method",      compression_method_s,         "quicklz"),
2665         LOCAL_STRING_ARG("--print_engine_status",     print_engine_status_s,        nullptr),
2666         //TODO(add --quiet, -v, -h)
2667     };
2668 #undef UINT32_ARG
2669 #undef UINT32_ARG_R
2670 #undef UINT64_ARG
2671 #undef DOUBLE_ARG_R
2672 #undef BOOL_ARG
2673 #undef STRING_ARG
2674 #undef MAKE_ARG
2675 
2676     int num_arg_types = sizeof(arg_types) / sizeof(arg_types[0]);
2677 
2678     int resultcode = 0;
2679     while (argc > 1) {
2680         if (!strcmp(argv[1], "-v") || !strcmp(argv[1], "--verbose")) {
2681             verbose++;
2682             argv++;
2683             argc--;
2684         }
2685         else if (!strcmp(argv[1], "-q") || !strcmp(argv[1], "--quiet")) {
2686             verbose = 0;
2687             argv++;
2688             argc--;
2689         }
2690         else if (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
2691             fprintf(stderr, "HELP INVOKED\n");
2692             do_usage(argv0, num_arg_types, arg_types);
2693             exit(0);
2694         }
2695         else {
2696             bool found = false;
2697             for (int i = 0; i < num_arg_types; i++) {
2698                 struct arg_type *type = &arg_types[i];
2699                 if (ARG_MATCHES(type, argv)) {
2700                     int extra_args_consumed;
2701                     resultcode = ARG_PARSE(type, &extra_args_consumed, argc, argv);
2702                     if (resultcode) {
2703                         fprintf(stderr, "ERROR PARSING [%s]\n", argv[1]);
2704                         do_usage(argv0, num_arg_types, arg_types);
2705                         exit(resultcode);
2706                     }
2707                     found = true;
2708                     argv += extra_args_consumed + 1;
2709                     argc -= extra_args_consumed + 1;
2710                     break;
2711                 }
2712             }
2713             if (!found) {
2714                 fprintf(stderr, "COULD NOT PARSE [%s]\n", argv[1]);
2715                 do_usage(argv0, num_arg_types, arg_types);
2716                 exit(EINVAL);
2717             }
2718         }
2719     }
2720     args->print_engine_status = print_engine_status_s;
2721     if (compression_method_s != nullptr) {
2722         if (strcmp(compression_method_s, "quicklz") == 0) {
2723             args->compression_method = TOKU_QUICKLZ_METHOD;
2724         } else if (strcmp(compression_method_s, "zlib") == 0) {
2725             args->compression_method = TOKU_ZLIB_WITHOUT_CHECKSUM_METHOD;
2726         } else if (strcmp(compression_method_s, "lzma") == 0) {
2727             args->compression_method = TOKU_LZMA_METHOD;
2728         } else if (strcmp(compression_method_s, "snappy") == 0) {
2729             args->compression_method = TOKU_SNAPPY_METHOD;
2730         } else if (strcmp(compression_method_s, "none") == 0) {
2731             args->compression_method = TOKU_NO_COMPRESSION;
2732         } else {
2733             fprintf(stderr, "valid values for --compression_method are \"quicklz\", \"zlib\", \"lzma\", \"snappy\", and \"none\"\n");
2734             do_usage(argv0, num_arg_types, arg_types);
2735             exit(EINVAL);
2736         }
2737     }
2738     if (perf_format_s != nullptr) {
2739         if (!strcmp(perf_format_s, "human")) {
2740             args->perf_output_format = HUMAN;
2741         } else if (!strcmp(perf_format_s, "csv")) {
2742             args->perf_output_format = CSV;
2743         } else if (!strcmp(perf_format_s, "tsv")) {
2744             args->perf_output_format = TSV;
2745         } else {
2746             fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n");
2747             do_usage(argv0, num_arg_types, arg_types);
2748             exit(EINVAL);
2749         }
2750     }
2751     if (args->only_create && args->only_stress) {
2752         fprintf(stderr, "used --only_stress and --only_create\n");
2753         do_usage(argv0, num_arg_types, arg_types);
2754         exit(EINVAL);
2755     }
2756 }
2757 
// Forward declaration of the per-test stress workload.  open_and_stress_tables
// calls it on the opened env/dbs.
// NOTE(review): no definition is visible here; presumably each test that
// includes this header provides one — confirm.
static void
stress_table(DB_ENV *, DB **, struct cli_args *);
2760 
2761 static int
stress_dbt_cmp_legacy(const DBT * a,const DBT * b)2762 stress_dbt_cmp_legacy(const DBT *a, const DBT *b) {
2763     int x = *(int *) a->data;
2764     int y = *(int *) b->data;
2765     if (x < y) {
2766         return -1;
2767     } else if (x > y) {
2768         return +1;
2769     } else {
2770         return 0;
2771     }
2772 }
2773 
2774 static int
stress_dbt_cmp(const DBT * a,const DBT * b)2775 stress_dbt_cmp(const DBT *a, const DBT *b) {
2776     // Keys are only compared by their first 8 bytes,
2777     // interpreted as a little endian 64 bit integers.
2778     // The rest of the key is just padding.
2779     uint64_t x = *(uint64_t *) a->data;
2780     uint64_t y = *(uint64_t *) b->data;
2781     if (x < y) {
2782         return -1;
2783     } else if (x > y) {
2784         return +1;
2785     } else {
2786         return 0;
2787     }
2788 }
2789 
2790 static int
stress_cmp(DB * db,const DBT * a,const DBT * b)2791 stress_cmp(DB *db, const DBT *a, const DBT *b) {
2792     assert(db && a && b);
2793     assert(a->size == b->size);
2794 
2795     if (a->size == sizeof(int)) {
2796         // Legacy comparison: keys must be >= 4 bytes
2797         return stress_dbt_cmp_legacy(a, b);
2798     } else {
2799         // Modern comparison: keys must be >= 8 bytes
2800         invariant(a->size >= sizeof(uint64_t));
2801         return stress_dbt_cmp(a, b);
2802     }
2803 }
2804 
2805 static void
do_warm_cache(DB_ENV * env,DB ** dbs,struct cli_args * args)2806 do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
2807 {
2808     struct scan_op_extra soe;
2809     soe.fast = true;
2810     soe.fwd = true;
2811     soe.prefetch = true;
2812     struct arg scan_arg;
2813     arg_init(&scan_arg, dbs, env, args);
2814     scan_arg.operation_extra = &soe;
2815     scan_arg.operation = scan_op_no_check;
2816     scan_arg.lock_type = STRESS_LOCK_NONE;
2817     DB_TXN* txn = nullptr;
2818     // don't take serializable read locks when scanning.
2819     int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
2820     // make sure the scan doesn't terminate early
2821     run_test = true;
2822     // warm up each DB in parallel
2823     scan_op_no_check_parallel(txn, &scan_arg, &soe, nullptr);
2824     r = txn->commit(txn,0); CKERR(r);
2825 }
2826 
2827 static void
UU()2828 UU() stress_recover(struct cli_args *args) {
2829     DB_ENV* env = nullptr;
2830     DB* dbs[args->num_DBs];
2831     memset(dbs, 0, sizeof(dbs));
2832     { int chk_r = open_tables(&env,
2833                               dbs,
2834                               args->num_DBs,
2835                               stress_cmp,
2836                               args); CKERR(chk_r); }
2837 
2838     DB_TXN* txn = nullptr;
2839     struct arg recover_args;
2840     arg_init(&recover_args, dbs, env, args);
2841     int r = env->txn_begin(env, 0, &txn, recover_args.txn_flags);
2842     CKERR(r);
2843     struct scan_op_extra soe = {
2844         .fast = true,
2845         .fwd = true,
2846         .prefetch = false
2847     };
2848     // make sure the scan doesn't terminate early
2849     run_test = true;
2850     r = scan_op(txn, &recover_args, &soe, nullptr);
2851     CKERR(r);
2852     { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
2853     { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
2854 }
2855 
2856 static void
open_and_stress_tables(struct cli_args * args,bool fill_with_zeroes,int (* cmp)(DB *,const DBT *,const DBT *))2857 open_and_stress_tables(struct cli_args *args, bool fill_with_zeroes, int (*cmp)(DB *, const DBT *, const DBT *))
2858 {
2859     if ((args->key_size < 8 && args->key_size != 4) ||
2860         (args->val_size < 8 && args->val_size != 4)) {
2861         fprintf(stderr, "The only valid key/val sizes are 4, 8, and > 8.\n");
2862         return;
2863     }
2864 
2865     setlocale(LC_NUMERIC, "en_US.UTF-8");
2866     DB_ENV* env = nullptr;
2867     DB* dbs[args->num_DBs];
2868     memset(dbs, 0, sizeof(dbs));
2869     db_env_enable_engine_status(args->nocrashstatus ? false : true);
2870     db_env_set_direct_io(args->direct_io ? true : false);
2871     if (!args->only_stress) {
2872         create_tables(
2873             &env,
2874             dbs,
2875             args->num_DBs,
2876             cmp,
2877             args
2878             );
2879         { int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); }
2880         { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
2881     }
2882     if (!args->only_create) {
2883         { int chk_r = open_tables(&env,
2884                                   dbs,
2885                                   args->num_DBs,
2886                                   cmp,
2887                                   args); CKERR(chk_r); }
2888         if (args->warm_cache) {
2889             do_warm_cache(env, dbs, args);
2890         }
2891         stress_table(env, dbs, args);
2892         { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
2893     }
2894 }
2895 
2896 static void
UU()2897 UU() stress_test_main(struct cli_args *args) {
2898     // Begin the test with fixed size values equal to zero.
2899     // This is important for correctness testing.
2900     open_and_stress_tables(args, true, stress_cmp);
2901 }
2902 
2903 static void
UU()2904 UU() perf_test_main(struct cli_args *args) {
2905     // Do not begin the test by creating a table of all zeroes.
2906     // We want to control the row size and its compressibility.
2907     open_and_stress_tables(args, false, stress_cmp);
2908 }
2909 
2910 static void
UU()2911 UU() perf_test_main_with_cmp(struct cli_args *args, int (*cmp)(DB *, const DBT *, const DBT *)) {
2912     // Do not begin the test by creating a table of all zeroes.
2913     // We want to control the row size and its compressibility.
2914     open_and_stress_tables(args, false, cmp);
2915 }
2916