1 /*-
2 * Public Domain 2014-2018 MongoDB, Inc.
3 * Public Domain 2008-2014 WiredTiger, Inc.
4 *
5 * This is free and unencumbered software released into the public domain.
6 *
7 * Anyone is free to copy, modify, publish, use, compile, sell, or
8 * distribute this software, either in source code form or as a compiled
9 * binary, for any purpose, commercial or non-commercial, and by any
10 * means.
11 *
12 * In jurisdictions that recognize copyright laws, the author or authors
13 * of this software dedicate any and all copyright interest in the
14 * software to the public domain. We make this dedication for the benefit
15 * of the public at large and to the detriment of our heirs and
16 * successors. We intend this dedication to be an overt act of
17 * relinquishment in perpetuity of all present and future rights to this
18 * software under copyright law.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
24 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
25 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26 * OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29 #include "wtperf.h"
30
31 /* Default values. */
32 #define DEFAULT_HOME "WT_TEST"
33 #define DEFAULT_MONITOR_DIR "WT_TEST"
34
35 static WT_THREAD_RET checkpoint_worker(void *);
36 static int drop_all_tables(WTPERF *);
37 static int execute_populate(WTPERF *);
38 static int execute_workload(WTPERF *);
39 static int find_table_count(WTPERF *);
40 static WT_THREAD_RET monitor(void *);
41 static WT_THREAD_RET populate_thread(void *);
42 static void randomize_value(WTPERF_THREAD *, char *);
43 static void recreate_dir(const char *);
44 static int start_all_runs(WTPERF *);
45 static int start_run(WTPERF *);
46 static void start_threads(WTPERF *, WORKLOAD *,
47 WTPERF_THREAD *, u_int, WT_THREAD_CALLBACK(*)(void *));
48 static void stop_threads(u_int, WTPERF_THREAD *);
49 static WT_THREAD_RET thread_run_wtperf(void *);
50 static void update_value_delta(WTPERF_THREAD *);
51 static WT_THREAD_RET worker(void *);
52
53 static uint64_t wtperf_rand(WTPERF_THREAD *);
54 static uint64_t wtperf_value_range(WTPERF *);
55
56 #define INDEX_COL_NAMES "columns=(key,val)"
57
58 /* Retrieve an ID for the next insert operation. */
59 static inline uint64_t
get_next_incr(WTPERF * wtperf)60 get_next_incr(WTPERF *wtperf)
61 {
62 return (__wt_atomic_add64(&wtperf->insert_key, 1));
63 }
64
/*
 * randomize_value --
 *	Each time this function is called we will overwrite the first and one
 *	other element in the value buffer. The bytes written are always
 *	non-zero so the value's NUL terminator is never clobbered.
 */
static void
randomize_value(WTPERF_THREAD *thread, char *value_buf)
{
	CONFIG_OPTS *opts;
	uint8_t *vb;
	uint32_t i, max_range, rand_val;

	opts = thread->wtperf->opts;

	/*
	 * Limit how much of the buffer we validate for length, this means
	 * that only threads that do growing updates will ever make changes to
	 * values outside of the initial value size, but that's a fair trade
	 * off for avoiding figuring out how long the value is more accurately
	 * in this performance sensitive function.
	 */
	if (thread->workload == NULL || thread->workload->update_delta == 0)
		max_range = opts->value_sz;
	else if (thread->workload->update_delta > 0)
		max_range = opts->value_sz_max;
	else
		max_range = opts->value_sz_min;

	/*
	 * Generate a single random value and re-use it. We generally only
	 * have small ranges in this function, so avoiding a bunch of calls
	 * is worthwhile.
	 *
	 * NOTE(review): this assumes max_range >= 2; a configured value size
	 * of 1 would divide by zero here -- confirm the option parser
	 * enforces a minimum.
	 */
	rand_val = __wt_random(&thread->rnd);
	i = rand_val % (max_range - 1);

	/*
	 * Ensure we don't write past the end of a value when configured for
	 * randomly sized values: back up until we're inside the current
	 * string.
	 */
	while (value_buf[i] == '\0' && i > 0)
		--i;

	vb = (uint8_t *)value_buf;
	/* The "% 255 + 1" mapping keeps the written byte in [1..255]. */
	vb[0] = ((rand_val >> 8) % 255) + 1;
	/*
	 * If i happened to be 0, we'll be re-writing the same value
	 * twice, but that doesn't matter.
	 */
	vb[i] = ((rand_val >> 16) % 255) + 1;
}
115
116 /*
117 * Partition data by key ranges.
118 */
119 static uint32_t
map_key_to_table(CONFIG_OPTS * opts,uint64_t k)120 map_key_to_table(CONFIG_OPTS *opts, uint64_t k)
121 {
122 if (opts->range_partition) {
123 /* Take care to return a result in [0..table_count-1]. */
124 if (k > opts->icount + opts->random_range)
125 return (0);
126 return ((uint32_t)((k - 1) /
127 ((opts->icount + opts->random_range +
128 opts->table_count - 1) / opts->table_count)));
129 } else
130 return ((uint32_t)(k % opts->table_count));
131 }
132
/*
 * update_value_delta --
 *	Figure out and extend the size of the value string, used for growing
 *	updates. We know that the value to be updated is in the threads value
 *	scratch buffer.
 */
static inline void
update_value_delta(WTPERF_THREAD *thread)
{
	CONFIG_OPTS *opts;
	WTPERF *wtperf;
	char * value;
	int64_t delta, len, new_len;

	wtperf = thread->wtperf;
	opts = wtperf->opts;
	value = thread->value_buf;
	delta = thread->workload->update_delta;
	len = (int64_t)strlen(value);

	/* INT64_MAX is the configured sentinel for "grow a random amount". */
	if (delta == INT64_MAX)
		delta = __wt_random(&thread->rnd) %
		    (opts->value_sz_max - opts->value_sz);

	/* Ensure we aren't changing across boundaries */
	if (delta > 0 && len + delta > opts->value_sz_max)
		delta = opts->value_sz_max - len;
	else if (delta < 0 && len + delta < opts->value_sz_min)
		delta = opts->value_sz_min - len;

	/* Bail if there isn't anything to do */
	if (delta == 0)
		return;

	if (delta < 0)
		/* Shrink: truncate the string in place. */
		value[len + delta] = '\0';
	else {
		/* Extend the value by the configured amount. */
		for (new_len = len;
		    new_len < opts->value_sz_max && new_len - len < delta;
		    new_len++)
			value[new_len] = 'a';
	}
}
176
/*
 * cb_asyncop --
 *	Completion callback for asynchronous operations: update the per-thread
 *	operation counters on success, shut the system down on failure.
 *	Returns 0 on success, 1 to tell the async subsystem we failed.
 */
static int
cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
{
	TRACK *trk;
	WTPERF *wtperf;
	WTPERF_THREAD *thread;
	WT_ASYNC_OPTYPE type;
	uint32_t *tables;
	int t_ret;
	char *value;

	(void)cb;
	(void)flags;

	wtperf = NULL;			/* -Wconditional-uninitialized */
	thread = NULL;			/* -Wconditional-uninitialized */

	/*
	 * For compact operations app_private holds a table counter rather
	 * than a thread handle, so only dereference it for the other types.
	 */
	type = op->get_type(op);
	if (type != WT_AOP_COMPACT) {
		thread = (WTPERF_THREAD *)op->app_private;
		wtperf = thread->wtperf;
	}

	trk = NULL;
	switch (type) {
	case WT_AOP_COMPACT:
		/* Decrement the count of outstanding compact operations. */
		tables = (uint32_t *)op->app_private;
		(void)__wt_atomic_add32(tables, (uint32_t)-1);
		break;
	case WT_AOP_INSERT:
		trk = &thread->insert;
		break;
	case WT_AOP_SEARCH:
		trk = &thread->read;
		/* Pull the value out to make sure the read completed. */
		if (ret == 0 &&
		    (t_ret = op->get_value(op, &value)) != 0) {
			ret = t_ret;
			lprintf(wtperf, ret, 0, "get_value in read.");
			goto err;
		}
		break;
	case WT_AOP_UPDATE:
		trk = &thread->update;
		break;
	case WT_AOP_NONE:
	case WT_AOP_REMOVE:
		/* We never expect this type. */
		lprintf(wtperf,
		    ret, 0, "No type in op %" PRIu64, op->get_id(op));
		goto err;
	}

	/*
	 * Either we have success and we track it, or failure and panic.
	 *
	 * Reads and updates can fail with WT_NOTFOUND: we may be searching
	 * in a random range, or an insert op might have updated the
	 * last record in the table but not yet finished the actual insert.
	 */
	if (type == WT_AOP_COMPACT)
		return (0);
	if (ret == 0 || (ret == WT_NOTFOUND && type != WT_AOP_INSERT)) {
		/* Don't count operations performed during warmup. */
		if (!wtperf->in_warmup)
			(void)__wt_atomic_add64(&trk->ops, 1);
		return (0);
	}
err:	/* Panic if error */
	lprintf(wtperf, ret, 0, "Error in op %" PRIu64, op->get_id(op));
	wtperf->error = wtperf->stop = true;
	return (1);
}
249
250 static WT_ASYNC_CALLBACK cb = { cb_asyncop };
251
/*
 * track_operation --
 *	Update an operation's tracking structure with new latency information.
 */
static inline void
track_operation(TRACK *trk, uint64_t usecs)
{
	uint64_t v;

	/* average microseconds per call */
	v = (uint64_t)usecs;

	trk->latency += usecs;			/* track total latency */

	if (v > trk->max_latency)		/* track max/min latency */
		trk->max_latency = (uint32_t)v;
	if (v < trk->min_latency)
		trk->min_latency = (uint32_t)v;

	/*
	 * Update a latency bucket.
	 * First buckets: operations faster than 1000us, one bucket per
	 * microsecond.
	 */
	if (v < 1000)
		++trk->us[v];

	/*
	 * Second buckets: milliseconds from 1ms to 1000ms, at 1ms each.
	 */
	else if (v < ms_to_us(1000))
		++trk->ms[us_to_ms(v)];

	/*
	 * Third buckets are seconds from 1s to 100s, at 1s each.
	 */
	else if (v < sec_to_us(100))
		++trk->sec[us_to_sec(v)];

	/* >100 seconds, accumulate in the biggest bucket. */
	else
		++trk->sec[ELEMENTS(trk->sec) - 1];
}
294
295 static const char *
op_name(uint8_t * op)296 op_name(uint8_t *op)
297 {
298 switch (*op) {
299 case WORKER_INSERT:
300 return ("insert");
301 case WORKER_INSERT_RMW:
302 return ("insert_rmw");
303 case WORKER_READ:
304 return ("read");
305 case WORKER_TRUNCATE:
306 return ("truncate");
307 case WORKER_UPDATE:
308 return ("update");
309 default:
310 return ("unknown");
311 }
312 /* NOTREACHED */
313 }
314
/*
 * worker_async --
 *	Worker thread for the asynchronous API: generate keys and enqueue
 *	insert/read/update operations; completion is handled by cb_asyncop.
 */
static WT_THREAD_RET
worker_async(void *arg)
{
	CONFIG_OPTS *opts;
	WTPERF *wtperf;
	WTPERF_THREAD *thread;
	WT_ASYNC_OP *asyncop;
	WT_CONNECTION *conn;
	uint64_t next_val;
	uint8_t *op, *op_end;
	int ret;
	char *key_buf, *value_buf;

	thread = (WTPERF_THREAD *)arg;
	wtperf = thread->wtperf;
	opts = wtperf->opts;
	conn = wtperf->conn;

	key_buf = thread->key_buf;
	value_buf = thread->value_buf;

	/* The ops array is a circular schedule of opcodes to execute. */
	op = thread->workload->ops;
	op_end = op + sizeof(thread->workload->ops);

	while (!wtperf->stop) {
		/*
		 * Generate the next key and setup operation specific
		 * statistics tracking objects.
		 */
		switch (*op) {
		case WORKER_INSERT:
		case WORKER_INSERT_RMW:
			if (opts->random_range)
				next_val = wtperf_rand(thread);
			else
				next_val = opts->icount + get_next_incr(wtperf);
			break;
		case WORKER_READ:
		case WORKER_UPDATE:
			next_val = wtperf_rand(thread);

			/*
			 * If the workload is started without a populate phase
			 * we rely on at least one insert to get a valid item
			 * id.
			 */
			if (wtperf_value_range(wtperf) < next_val)
				continue;
			break;
		default:
			goto err;		/* can't happen */
		}

		generate_key(opts, key_buf, next_val);

		/*
		 * Spread the data out around the multiple databases.
		 * Sleep to allow workers a chance to run and process async ops.
		 * Then retry to get an async op.
		 */
		while ((ret = conn->async_new_op(conn,
		    wtperf->uris[map_key_to_table(wtperf->opts, next_val)],
		    NULL, &cb, &asyncop)) == EBUSY)
			(void)usleep(10000);
		if (ret != 0)
			goto err;

		/* Pass the thread handle through to the completion callback. */
		asyncop->app_private = thread;
		asyncop->set_key(asyncop, key_buf);
		switch (*op) {
		case WORKER_READ:
			ret = asyncop->search(asyncop);
			if (ret == 0)
				break;
			goto op_err;
		case WORKER_INSERT:
			if (opts->random_value)
				randomize_value(thread, value_buf);
			asyncop->set_value(asyncop, value_buf);
			if ((ret = asyncop->insert(asyncop)) == 0)
				break;
			goto op_err;
		case WORKER_UPDATE:
			if (opts->random_value)
				randomize_value(thread, value_buf);
			asyncop->set_value(asyncop, value_buf);
			if ((ret = asyncop->update(asyncop)) == 0)
				break;
			goto op_err;
		default:
op_err:			lprintf(wtperf, ret, 0,
			    "%s failed for: %s, range: %"PRIu64,
			    op_name(op), key_buf, wtperf_value_range(wtperf));
			goto err;		/* can't happen */
		}

		/* Schedule the next operation */
		if (++op == op_end)
			op = thread->workload->ops;
	}

	/* Wait for all outstanding async operations to complete. */
	if (conn->async_flush(conn) != 0)
		goto err;

	/* Notify our caller we failed and shut the system down. */
	if (0) {
err:		wtperf->error = wtperf->stop = true;
	}
	return (WT_THREAD_RET_VALUE);
}
425
426 /*
427 * do_range_reads --
428 * If configured to execute a sequence of next operations after each
429 * search do them. Ensuring the keys we see are always in order.
430 */
431 static int
do_range_reads(WTPERF * wtperf,WT_CURSOR * cursor,int64_t read_range)432 do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor, int64_t read_range)
433 {
434 uint64_t next_val, prev_val;
435 int64_t range;
436 char *range_key_buf;
437 char buf[512];
438 int ret;
439
440 ret = 0;
441
442 if (read_range == 0)
443 return (0);
444
445 memset(&buf[0], 0, 512 * sizeof(char));
446 range_key_buf = &buf[0];
447
448 /* Save where the first key is for comparisons. */
449 testutil_check(cursor->get_key(cursor, &range_key_buf));
450 extract_key(range_key_buf, &next_val);
451
452 for (range = 0; range < read_range; ++range) {
453 prev_val = next_val;
454 ret = cursor->next(cursor);
455 /* We are done if we reach the end. */
456 if (ret != 0)
457 break;
458
459 /* Retrieve and decode the key */
460 testutil_check(cursor->get_key(cursor, &range_key_buf));
461 extract_key(range_key_buf, &next_val);
462 if (next_val < prev_val) {
463 lprintf(wtperf, EINVAL, 0,
464 "Out of order keys %" PRIu64
465 " came before %" PRIu64,
466 prev_val, next_val);
467 return (EINVAL);
468 }
469 }
470 return (0);
471 }
472
/*
 * pre_load_data --
 *	Pull everything into cache before starting the workload phase by
 *	scanning every table once, reading each key.
 */
static void
pre_load_data(WTPERF *wtperf)
{
	CONFIG_OPTS *opts;
	WT_CONNECTION *conn;
	WT_CURSOR *cursor;
	WT_SESSION *session;
	size_t i;
	int ret;
	char *key;

	opts = wtperf->opts;
	conn = wtperf->conn;

	testutil_check(conn->open_session(
	    conn, NULL, opts->sess_config, &session));
	for (i = 0; i < opts->table_count; i++) {
		testutil_check(session->open_cursor(
		    session, wtperf->uris[i], NULL, NULL, &cursor));
		/* Walk the whole table; fetching each key faults pages in. */
		while ((ret = cursor->next(cursor)) == 0)
			testutil_check(cursor->get_key(cursor, &key));
		/* The scan must end cleanly at the end of the table. */
		testutil_assert(ret == WT_NOTFOUND);
		testutil_check(cursor->close(cursor));
	}
	testutil_check(session->close(session, NULL));
}
502
/*
 * worker --
 *	Synchronous worker thread: execute the workload's circular schedule
 *	of insert/read/update/truncate operations until told to stop,
 *	tracking latency and operation counts along the way.
 */
static WT_THREAD_RET
worker(void *arg)
{
	struct timespec start, stop;
	CONFIG_OPTS *opts;
	TRACK *trk;
	WORKLOAD *workload;
	WTPERF *wtperf;
	WTPERF_THREAD *thread;
	WT_CONNECTION *conn;
	WT_CURSOR **cursors, *cursor, *log_table_cursor, *tmp_cursor;
	WT_SESSION *session;
	size_t i;
	int64_t ops, ops_per_txn;
	uint64_t log_id, next_val, usecs;
	uint8_t *op, *op_end;
	int measure_latency, ret, truncated;
	char *value_buf, *key_buf, *value;
	char buf[512];

	thread = (WTPERF_THREAD *)arg;
	workload = thread->workload;
	wtperf = thread->wtperf;
	opts = wtperf->opts;
	conn = wtperf->conn;
	cursors = NULL;
	cursor = log_table_cursor = NULL; /* -Wconditional-initialized */
	ops = 0;
	ops_per_txn = workload->ops_per_txn;
	session = NULL;
	trk = NULL;

	if ((ret = conn->open_session(
	    conn, NULL, opts->sess_config, &session)) != 0) {
		lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
		goto err;
	}
	/*
	 * Open and immediately close a cursor on each configured idle table:
	 * the point is to have the tables exist in the session's cache, not
	 * to use them.
	 */
	for (i = 0; i < opts->table_count_idle; i++) {
		testutil_check(__wt_snprintf(
		    buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
		if ((ret = session->open_cursor(
		    session, buf, NULL, NULL, &tmp_cursor)) != 0) {
			lprintf(wtperf, ret, 0,
			    "Error opening idle table %s", buf);
			goto err;
		}
		if ((ret = tmp_cursor->close(tmp_cursor)) != 0) {
			lprintf(wtperf, ret, 0,
			    "Error closing idle table %s", buf);
			goto err;
		}
	}
	/*
	 * If the workload is pinned to a single table, open one cursor plus a
	 * random cursor on it; otherwise open a cursor per table and pick per
	 * operation below.
	 */
	if (workload->table_index != INT32_MAX) {
		if ((ret = session->open_cursor(session,
		    wtperf->uris[workload->table_index],
		    NULL, NULL, &cursor)) != 0) {
			lprintf(wtperf, ret, 0,
			    "worker: WT_SESSION.open_cursor: %s",
			    wtperf->uris[workload->table_index]);
			goto err;
		}
		if ((ret = session->open_cursor(session,
		    wtperf->uris[workload->table_index],
		    NULL, "next_random=true", &thread->rand_cursor)) != 0) {
			lprintf(wtperf, ret, 0,
			    "worker: WT_SESSION.open_cursor: random %s",
			    wtperf->uris[workload->table_index]);
			goto err;
		}
	} else {
		cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
		for (i = 0; i < opts->table_count; i++) {
			if ((ret = session->open_cursor(session,
			    wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) {
				lprintf(wtperf, ret, 0,
				    "worker: WT_SESSION.open_cursor: %s",
				    wtperf->uris[i]);
				goto err;
			}
		}
	}
	if (opts->log_like_table && (ret = session->open_cursor(session,
	    wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) {
		lprintf(wtperf, ret, 0,
		    "worker: WT_SESSION.open_cursor: %s",
		    wtperf->log_table_uri);
		goto err;
	}

	/* Setup the timer for throttling. */
	if (workload->throttle != 0)
		setup_throttle(thread);

	/* Setup for truncate */
	if (workload->truncate != 0)
		setup_truncate(wtperf, thread, session);

	key_buf = thread->key_buf;
	value_buf = thread->value_buf;

	/* The ops array is a circular schedule of opcodes to execute. */
	op = workload->ops;
	op_end = op + sizeof(workload->ops);

	if ((ops_per_txn != 0 || opts->log_like_table) &&
	    (ret = session->begin_transaction(session, NULL)) != 0) {
		lprintf(wtperf, ret, 0, "First transaction begin failed");
		goto err;
	}

	while (!wtperf->stop) {
		if (workload->pause != 0)
			(void)sleep((unsigned int)workload->pause);
		/*
		 * Generate the next key and setup operation specific
		 * statistics tracking objects.
		 */
		switch (*op) {
		case WORKER_INSERT:
		case WORKER_INSERT_RMW:
			trk = &thread->insert;
			if (opts->random_range)
				next_val = wtperf_rand(thread);
			else
				next_val = opts->icount + get_next_incr(wtperf);
			break;
		case WORKER_READ:
			trk = &thread->read;
			/* FALLTHROUGH */
		case WORKER_UPDATE:
			if (*op == WORKER_UPDATE)
				trk = &thread->update;
			next_val = wtperf_rand(thread);

			/*
			 * If the workload is started without a populate phase
			 * we rely on at least one insert to get a valid item
			 * id.
			 */
			if (wtperf_value_range(wtperf) < next_val)
				continue;
			break;
		case WORKER_TRUNCATE:
			/* Required but not used. */
			next_val = wtperf_rand(thread);
			break;
		default:
			goto err;		/* can't happen */
		}

		generate_key(opts, key_buf, next_val);

		if (workload->table_index == INT32_MAX)
			/*
			 * Spread the data out around the multiple databases.
			 */
			cursor = cursors[
			    map_key_to_table(wtperf->opts, next_val)];

		/*
		 * Skip the first time we do an operation, when trk->ops
		 * is 0, to avoid first time latency spikes.
		 */
		measure_latency =
		    opts->sample_interval != 0 && trk != NULL &&
		    trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
		if (measure_latency)
			__wt_epoch(NULL, &start);

		cursor->set_key(cursor, key_buf);

		switch (*op) {
		case WORKER_READ:
			/*
			 * Reads can fail with WT_NOTFOUND: we may be searching
			 * in a random range, or an insert thread might have
			 * updated the last record in the table but not yet
			 * finished the actual insert. Count failed search in
			 * a random range as a "read".
			 */
			ret = cursor->search(cursor);
			if (ret == 0) {
				if ((ret = cursor->get_value(
				    cursor, &value)) != 0) {
					lprintf(wtperf, ret, 0,
					    "get_value in read.");
					goto err;
				}
				/*
				 * If we want to read a range, then call next
				 * for several operations, confirming that the
				 * next key is in the correct order.
				 */
				ret = do_range_reads(wtperf,
				    cursor, workload->read_range);
			}

			if (ret == 0 || ret == WT_NOTFOUND)
				break;
			goto op_err;
		case WORKER_INSERT_RMW:
			/* The read-modify-write insert expects a miss first. */
			if ((ret = cursor->search(cursor)) != WT_NOTFOUND)
				goto op_err;

			/* The error return reset the cursor's key. */
			cursor->set_key(cursor, key_buf);

			/* FALLTHROUGH */
		case WORKER_INSERT:
			if (opts->random_value)
				randomize_value(thread, value_buf);
			cursor->set_value(cursor, value_buf);
			if ((ret = cursor->insert(cursor)) == 0)
				break;
			goto op_err;
		case WORKER_TRUNCATE:
			if ((ret = run_truncate(wtperf,
			    thread, cursor, session, &truncated)) == 0) {
				if (truncated)
					trk = &thread->truncate;
				else
					trk = &thread->truncate_sleep;
				/* Pause between truncate attempts */
				(void)usleep(1000);
				break;
			}
			goto op_err;
		case WORKER_UPDATE:
			if ((ret = cursor->search(cursor)) == 0) {
				if ((ret = cursor->get_value(
				    cursor, &value)) != 0) {
					lprintf(wtperf, ret, 0,
					    "get_value in update.");
					goto err;
				}
				/*
				 * Copy as much of the previous value as is
				 * safe, and be sure to NUL-terminate.
				 */
				strncpy(value_buf,
				    value, opts->value_sz_max - 1);
				if (workload->update_delta != 0)
					update_value_delta(thread);
				/* Flip the first byte so updates are visible. */
				if (value_buf[0] == 'a')
					value_buf[0] = 'b';
				else
					value_buf[0] = 'a';
				if (opts->random_value)
					randomize_value(thread, value_buf);
				cursor->set_value(cursor, value_buf);
				if ((ret = cursor->update(cursor)) == 0)
					break;
				goto op_err;
			}

			/*
			 * Reads can fail with WT_NOTFOUND: we may be searching
			 * in a random range, or an insert thread might have
			 * updated the last record in the table but not yet
			 * finished the actual insert. Count failed search in
			 * a random range as a "read".
			 */
			if (ret == WT_NOTFOUND)
				break;

op_err:			if (ret == WT_ROLLBACK && ops_per_txn != 0) {
				/*
				 * If we are running with explicit transactions
				 * configured and we hit a WT_ROLLBACK, then we
				 * should rollback the current transaction and
				 * attempt to continue.
				 * This does break the guarantee of insertion
				 * order in cases of ordered inserts, as we
				 * aren't retrying here.
				 */
				lprintf(wtperf, ret, 1,
				    "%s for: %s, range: %"PRIu64, op_name(op),
				    key_buf, wtperf_value_range(wtperf));
				if ((ret = session->rollback_transaction(
				    session, NULL)) != 0) {
					lprintf(wtperf, ret, 0,
					    "Failed rollback_transaction");
					goto err;
				}
				if ((ret = session->begin_transaction(
				    session, NULL)) != 0) {
					lprintf(wtperf, ret, 0,
					    "Worker begin transaction failed");
					goto err;
				}
				break;
			}
			lprintf(wtperf, ret, 0,
			    "%s failed for: %s, range: %"PRIu64,
			    op_name(op), key_buf, wtperf_value_range(wtperf));
			goto err;
		default:
			goto err;		/* can't happen */
		}

		/* Update the log-like table. */
		if (opts->log_like_table &&
		    (*op != WORKER_READ && *op != WORKER_TRUNCATE)) {
			log_id =
			    __wt_atomic_add64(&wtperf->log_like_table_key, 1);
			log_table_cursor->set_key(log_table_cursor, log_id);
			log_table_cursor->set_value(
			    log_table_cursor, value_buf);
			if ((ret =
			    log_table_cursor->insert(log_table_cursor)) != 0) {
				lprintf(wtperf, ret, 0, "Cursor insert failed");
				goto err;
			}
		}

		/* Release the cursor, if we have multiple tables. */
		if (opts->table_count > 1 && ret == 0 &&
		    *op != WORKER_INSERT && *op != WORKER_INSERT_RMW) {
			if ((ret = cursor->reset(cursor)) != 0) {
				lprintf(wtperf, ret, 0, "Cursor reset failed");
				goto err;
			}
		}

		/* Gather statistics */
		if (!wtperf->in_warmup) {
			if (measure_latency) {
				__wt_epoch(NULL, &stop);
				++trk->latency_ops;
				usecs = WT_TIMEDIFF_US(stop, start);
				track_operation(trk, usecs);
			}
			/* Increment operation count */
			++trk->ops;
		}

		/*
		 * Commit the transaction if grouping operations together
		 * or tracking changes in our log table.
		 */
		if ((opts->log_like_table && ops_per_txn == 0) ||
		    (ops_per_txn != 0 && ops++ % ops_per_txn == 0)) {
			if ((ret = session->commit_transaction(
			    session, NULL)) != 0) {
				lprintf(wtperf, ret, 0,
				    "Worker transaction commit failed");
				goto err;
			}
			if ((ret = session->begin_transaction(
			    session, NULL)) != 0) {
				lprintf(wtperf, ret, 0,
				    "Worker begin transaction failed");
				goto err;
			}
		}

		/* Schedule the next operation */
		if (++op == op_end)
			op = workload->ops;

		/*
		 * Decrement throttle ops and check if we should sleep
		 * and then get more work to perform.
		 */
		if (--thread->throttle_cfg.ops_count == 0)
			worker_throttle(thread);

	}

	if ((ret = session->close(session, NULL)) != 0) {
		lprintf(wtperf, ret, 0, "Session close in worker failed");
		goto err;
	}

	/* Notify our caller we failed and shut the system down. */
	if (0) {
err:		wtperf->error = wtperf->stop = true;
	}
	free(cursors);

	return (WT_THREAD_RET_VALUE);
}
884
885 /*
886 * run_mix_schedule_op --
887 * Replace read operations with another operation, in the configured
888 * percentage.
889 */
890 static void
run_mix_schedule_op(WORKLOAD * workp,int op,int64_t op_cnt)891 run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt)
892 {
893 int jump, pass;
894 uint8_t *p, *end;
895
896 /* Jump around the array to roughly spread out the operations. */
897 jump = (int)(100 / op_cnt);
898
899 /*
900 * Find a read operation and replace it with another operation. This
901 * is roughly n-squared, but it's an N of 100, leave it.
902 */
903 p = workp->ops;
904 end = workp->ops + sizeof(workp->ops);
905 while (op_cnt-- > 0) {
906 for (pass = 0; *p != WORKER_READ; ++p)
907 if (p == end) {
908 /*
909 * Passed a percentage of total operations and
910 * should always be a read operation to replace,
911 * but don't allow infinite loops.
912 */
913 if (++pass > 1)
914 return;
915 p = workp->ops;
916 }
917 *p = (uint8_t)op;
918
919 if (end - jump < p)
920 p = workp->ops;
921 else
922 p += jump;
923 }
924 }
925
926 /*
927 * run_mix_schedule --
928 * Schedule the mixed-run operations.
929 */
930 static int
run_mix_schedule(WTPERF * wtperf,WORKLOAD * workp)931 run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
932 {
933 CONFIG_OPTS *opts;
934 int64_t pct;
935
936 opts = wtperf->opts;
937
938 if (workp->truncate != 0) {
939 if (workp->insert != 0 ||
940 workp->read != 0 || workp->update != 0) {
941 lprintf(wtperf, EINVAL, 0,
942 "Can't configure truncate in a mixed workload");
943 return (EINVAL);
944 }
945 memset(workp->ops, WORKER_TRUNCATE, sizeof(workp->ops));
946 return (0);
947 }
948
949 /* Confirm reads, inserts and updates cannot all be zero. */
950 if (workp->insert == 0 && workp->read == 0 && workp->update == 0) {
951 lprintf(wtperf, EINVAL, 0, "no operations scheduled");
952 return (EINVAL);
953 }
954
955 /*
956 * Check for a simple case where the thread is only doing insert or
957 * update operations (because the default operation for a
958 * job-mix is read, the subsequent code works fine if only reads are
959 * specified).
960 */
961 if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
962 memset(workp->ops,
963 opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
964 sizeof(workp->ops));
965 return (0);
966 }
967 if (workp->insert == 0 && workp->read == 0 && workp->update != 0) {
968 memset(workp->ops, WORKER_UPDATE, sizeof(workp->ops));
969 return (0);
970 }
971
972 /*
973 * The worker thread configuration is done as ratios of operations. If
974 * the caller gives us something insane like "reads=77,updates=23" (do
975 * 77 reads for every 23 updates), we don't want to do 77 reads followed
976 * by 23 updates, we want to uniformly distribute the read and update
977 * operations across the space. Convert to percentages and then lay out
978 * the operations across an array.
979 *
980 * Percentage conversion is lossy, the application can do stupid stuff
981 * here, for example, imagine a configured ratio of "reads=1,inserts=2,
982 * updates=999999". First, if the percentages are skewed enough, some
983 * operations might never be done. Second, we set the base operation to
984 * read, which means any fractional results from percentage conversion
985 * will be reads, implying read operations in some cases where reads
986 * weren't configured. We should be fine if the application configures
987 * something approaching a rational set of ratios.
988 */
989 memset(workp->ops, WORKER_READ, sizeof(workp->ops));
990
991 pct = (workp->insert * 100) /
992 (workp->insert + workp->read + workp->update);
993 if (pct != 0)
994 run_mix_schedule_op(workp,
995 opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
996 pct = (workp->update * 100) /
997 (workp->insert + workp->read + workp->update);
998 if (pct != 0)
999 run_mix_schedule_op(workp, WORKER_UPDATE, pct);
1000 return (0);
1001 }
1002
/*
 * populate_thread --
 *	Populate thread: insert keys 1..icount, optionally grouped into
 *	transactions, optionally using bulk cursors, retrying on rollback.
 */
static WT_THREAD_RET
populate_thread(void *arg)
{
	struct timespec start, stop;
	CONFIG_OPTS *opts;
	TRACK *trk;
	WTPERF *wtperf;
	WTPERF_THREAD *thread;
	WT_CONNECTION *conn;
	WT_CURSOR **cursors, *cursor;
	WT_SESSION *session;
	size_t i;
	uint64_t op, usecs;
	uint32_t opcount;
	int intxn, measure_latency, ret, stress_checkpoint_due;
	char *value_buf, *key_buf;
	const char *cursor_config;

	thread = (WTPERF_THREAD *)arg;
	wtperf = thread->wtperf;
	opts = wtperf->opts;
	conn = wtperf->conn;
	session = NULL;
	cursors = NULL;
	ret = stress_checkpoint_due = 0;
	trk = &thread->insert;

	key_buf = thread->key_buf;
	value_buf = thread->value_buf;

	if ((ret = conn->open_session(
	    conn, NULL, opts->sess_config, &session)) != 0) {
		lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
		goto err;
	}

	/* Do bulk loads if populate is single-threaded. */
	cursor_config =
	    (opts->populate_threads == 1 && !opts->index) ? "bulk" : NULL;
	/* Create the cursors. */
	cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
	for (i = 0; i < opts->table_count; i++) {
		if ((ret = session->open_cursor(
		    session, wtperf->uris[i], NULL,
		    cursor_config, &cursors[i])) != 0) {
			lprintf(wtperf, ret, 0,
			    "populate: WT_SESSION.open_cursor: %s",
			    wtperf->uris[i]);
			goto err;
		}
	}

	/* Populate the databases. */
	for (intxn = 0, opcount = 0;;) {
		/* Claim the next key; stop once icount keys are inserted. */
		op = get_next_incr(wtperf);
		if (op > opts->icount)
			break;

		if (opts->populate_ops_per_txn != 0 && !intxn) {
			if ((ret = session->begin_transaction(
			    session, opts->transaction_config)) != 0) {
				lprintf(wtperf, ret, 0,
				    "Failed starting transaction.");
				goto err;
			}
			intxn = 1;
		}
		/*
		 * Figure out which table this op belongs to.
		 */
		cursor = cursors[map_key_to_table(wtperf->opts, op)];
		generate_key(opts, key_buf, op);
		/* Skip the first op to avoid first-time latency spikes. */
		measure_latency =
		    opts->sample_interval != 0 &&
		    trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
		if (measure_latency)
			__wt_epoch(NULL, &start);
		cursor->set_key(cursor, key_buf);
		if (opts->random_value)
			randomize_value(thread, value_buf);
		cursor->set_value(cursor, value_buf);
		if ((ret = cursor->insert(cursor)) == WT_ROLLBACK) {
			/*
			 * On rollback, abandon the transaction and retry the
			 * whole group; the key was claimed via the shared
			 * counter so another thread won't reuse it.
			 */
			lprintf(wtperf, ret, 0, "insert retrying");
			if ((ret = session->rollback_transaction(
			    session, NULL)) != 0) {
				lprintf(wtperf, ret, 0,
				    "Failed rollback_transaction");
				goto err;
			}
			intxn = 0;
			continue;
		} else if (ret != 0) {
			lprintf(wtperf, ret, 0, "Failed inserting");
			goto err;
		}
		/*
		 * Gather statistics.
		 * We measure the latency of inserting a single key. If there
		 * are multiple tables, it is the time for insertion into all
		 * of them.
		 */
		if (measure_latency) {
			__wt_epoch(NULL, &stop);
			++trk->latency_ops;
			usecs = WT_TIMEDIFF_US(stop, start);
			track_operation(trk, usecs);
		}
		++thread->insert.ops;	/* Same as trk->ops */

		if (opts->checkpoint_stress_rate != 0 &&
		    (op % opts->checkpoint_stress_rate) == 0)
			stress_checkpoint_due = 1;

		if (opts->populate_ops_per_txn != 0) {
			if (++opcount < opts->populate_ops_per_txn)
				continue;
			opcount = 0;

			/* A failed commit just aborts; populate continues. */
			if ((ret = session->commit_transaction(
			    session, NULL)) != 0)
				lprintf(wtperf, ret, 0,
				    "Fail committing, transaction was aborted");
			intxn = 0;
		}

		/* Only checkpoint outside an explicit transaction. */
		if (stress_checkpoint_due && intxn == 0) {
			stress_checkpoint_due = 0;
			if ((ret = session->checkpoint(session, NULL)) != 0) {
				lprintf(wtperf, ret, 0, "Checkpoint failed");
				goto err;
			}
		}
	}
	/* Commit any trailing partially-filled transaction. */
	if (intxn &&
	    (ret = session->commit_transaction(session, NULL)) != 0)
		lprintf(wtperf, ret, 0,
		    "Fail committing, transaction was aborted");

	if ((ret = session->close(session, NULL)) != 0) {
		lprintf(wtperf, ret, 0, "Error closing session in populate");
		goto err;
	}

	/* Notify our caller we failed and shut the system down. */
	if (0) {
err:		wtperf->error = wtperf->stop = true;
	}
	free(cursors);

	return (WT_THREAD_RET_VALUE);
}
1154
/*
 * populate_async --
 *	Populate thread body that loads rows through the WiredTiger async
 *	API: each insert is queued as an async op and completed by the
 *	async worker threads.
 */
static WT_THREAD_RET
populate_async(void *arg)
{
	struct timespec start, stop;
	CONFIG_OPTS *opts;
	TRACK *trk;
	WTPERF *wtperf;
	WTPERF_THREAD *thread;
	WT_ASYNC_OP *asyncop;
	WT_CONNECTION *conn;
	WT_SESSION *session;
	uint64_t op, usecs;
	int measure_latency, ret;
	char *value_buf, *key_buf;

	thread = (WTPERF_THREAD *)arg;
	wtperf = thread->wtperf;
	opts = wtperf->opts;
	conn = wtperf->conn;
	session = NULL;
	ret = 0;
	trk = &thread->insert;

	/* Per-thread scratch buffers, filled in by generate_key below. */
	key_buf = thread->key_buf;
	value_buf = thread->value_buf;

	if ((ret = conn->open_session(
	    conn, NULL, opts->sess_config, &session)) != 0) {
		lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
		goto err;
	}

	/*
	 * Measuring latency of one async op is not meaningful. We
	 * will measure the time it takes to do all of them, including
	 * the time to process by workers.
	 */
	measure_latency =
	    opts->sample_interval != 0 &&
	    trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
	if (measure_latency)
		__wt_epoch(NULL, &start);

	/* Populate the databases. */
	for (;;) {
		/* Claim the next key; stop once the target count is hit. */
		op = get_next_incr(wtperf);
		if (op > opts->icount)
			break;
		/*
		 * Allocate an async op for whichever table. Retry while the
		 * async system is busy (EBUSY); any other error is fatal.
		 */
		while ((ret = conn->async_new_op(
		    conn, wtperf->uris[map_key_to_table(wtperf->opts, op)],
		    NULL, &cb, &asyncop)) == EBUSY)
			(void)usleep(10000);
		if (ret != 0)
			goto err;

		asyncop->app_private = thread;
		generate_key(opts, key_buf, op);
		asyncop->set_key(asyncop, key_buf);
		if (opts->random_value)
			randomize_value(thread, value_buf);
		asyncop->set_value(asyncop, value_buf);
		/* Queue the insert; completion is handled via the cb. */
		if ((ret = asyncop->insert(asyncop)) != 0) {
			lprintf(wtperf, ret, 0, "Failed inserting");
			goto err;
		}
	}

	/*
	 * Gather statistics.
	 * We measure the latency of inserting a single key. If there
	 * are multiple tables, it is the time for insertion into all
	 * of them. Note that currently every populate thread will call
	 * async_flush and those calls will convoy. That is not the
	 * most efficient way, but we want to flush before measuring latency.
	 */
	if (conn->async_flush(conn) != 0)
		goto err;
	if (measure_latency) {
		__wt_epoch(NULL, &stop);
		++trk->latency_ops;
		usecs = WT_TIMEDIFF_US(stop, start);
		track_operation(trk, usecs);
	}
	if ((ret = session->close(session, NULL)) != 0) {
		lprintf(wtperf, ret, 0, "Error closing session in populate");
		goto err;
	}

	/* Notify our caller we failed and shut the system down. */
	if (0) {
err:		wtperf->error = wtperf->stop = true;
	}
	return (WT_THREAD_RET_VALUE);
}
1252
/*
 * monitor --
 *	Monitor thread: once per sample interval, sum the shared operation
 *	counters, append a CSV line of throughput and latency statistics to
 *	the "monitor" file, and report when the configured maximum-latency
 *	or minimum-throughput thresholds are violated.
 */
static WT_THREAD_RET
monitor(void *arg)
{
	struct timespec t;
	struct tm localt;
	CONFIG_OPTS *opts;
	FILE *fp;
	WTPERF *wtperf;
	size_t len;
	uint64_t min_thr, reads, inserts, updates;
	uint64_t cur_reads, cur_inserts, cur_updates;
	uint64_t last_reads, last_inserts, last_updates;
	uint32_t read_avg, read_min, read_max;
	uint32_t insert_avg, insert_min, insert_max;
	uint32_t update_avg, update_min, update_max;
	uint32_t latency_max, level;
	u_int i;
	int msg_err;
	const char *str;
	char buf[64], *path;

	wtperf = (WTPERF *)arg;
	opts = wtperf->opts;
	/* The caller only starts this thread when sampling is configured. */
	assert(opts->sample_interval != 0);

	fp = NULL;
	path = NULL;

	/* Threshold of 0 disables the corresponding check below. */
	min_thr = (uint64_t)opts->min_throughput;
	latency_max = (uint32_t)ms_to_us(opts->max_latency);

	/* Open the logging file. */
	len = strlen(wtperf->monitor_dir) + 100;
	path = dmalloc(len);
	testutil_check(__wt_snprintf(
	    path, len, "%s/monitor", wtperf->monitor_dir));
	if ((fp = fopen(path, "w")) == NULL) {
		lprintf(wtperf, errno, 0, "%s", path);
		goto err;
	}
	/* Set line buffering for monitor file. */
	__wt_stream_set_line_buffer(fp);
	/* Write the CSV header line. */
	fprintf(fp,
	    "#time,"
	    "totalsec,"
	    "read ops per second,"
	    "insert ops per second,"
	    "update ops per second,"
	    "checkpoints,"
	    "read average latency(uS),"
	    "read minimum latency(uS),"
	    "read maximum latency(uS),"
	    "insert average latency(uS),"
	    "insert min latency(uS),"
	    "insert maximum latency(uS),"
	    "update average latency(uS),"
	    "update min latency(uS),"
	    "update maximum latency(uS)"
	    "\n");
	last_reads = last_inserts = last_updates = 0;
	while (!wtperf->stop) {
		/* Sleep in one-second slices so shutdown is noticed fast. */
		for (i = 0; i < opts->sample_interval; i++) {
			sleep(1);
			if (wtperf->stop)
				break;
		}
		/* If the workers are done, don't bother with a final call. */
		if (wtperf->stop)
			break;
		/* Don't record samples taken during the warmup period. */
		if (wtperf->in_warmup)
			continue;

		/* Timestamp the sample. */
		__wt_epoch(NULL, &t);
		testutil_check(__wt_localtime(NULL, &t.tv_sec, &localt));
		testutil_assert(
		    strftime(buf, sizeof(buf), "%b %d %H:%M:%S", &localt) != 0);

		/* Aggregate the counters and latencies across threads. */
		reads = sum_read_ops(wtperf);
		inserts = sum_insert_ops(wtperf);
		updates = sum_update_ops(wtperf);
		latency_read(wtperf, &read_avg, &read_min, &read_max);
		latency_insert(wtperf, &insert_avg, &insert_min, &insert_max);
		latency_update(wtperf, &update_avg, &update_min, &update_max);

		/* Convert cumulative totals to per-second rates. */
		cur_reads = (reads - last_reads) / opts->sample_interval;
		cur_updates = (updates - last_updates) / opts->sample_interval;
		/*
		 * For now the only item we need to worry about changing is
		 * inserts when we transition from the populate phase to
		 * workload phase.
		 */
		if (inserts < last_inserts)
			cur_inserts = 0;
		else
			cur_inserts =
			    (inserts - last_inserts) / opts->sample_interval;

		(void)fprintf(fp,
		    "%s,%" PRIu32
		    ",%" PRIu64 ",%" PRIu64 ",%" PRIu64
		    ",%c"
		    ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
		    ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
		    ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
		    "\n",
		    buf, wtperf->totalsec,
		    cur_reads, cur_inserts, cur_updates,
		    wtperf->ckpt ? 'Y' : 'N',
		    read_avg, read_min, read_max,
		    insert_avg, insert_min, insert_max,
		    update_avg, update_min, update_max);

		/* Latency check: warn, or panic if configured as fatal. */
		if (latency_max != 0 &&
		    (read_max > latency_max || insert_max > latency_max ||
		     update_max > latency_max)) {
			if (opts->max_latency_fatal) {
				level = 1;
				msg_err = WT_PANIC;
				str = "ERROR";
			} else {
				level = 0;
				msg_err = 0;
				str = "WARNING";
			}
			lprintf(wtperf, msg_err, level,
			    "%s: max latency exceeded: threshold %" PRIu32
			    " read max %" PRIu32 " insert max %" PRIu32
			    " update max %" PRIu32, str, latency_max,
			    read_max, insert_max, update_max);
		}
		/* Throughput check: only classes with nonzero rates count. */
		if (min_thr != 0 &&
		    ((cur_reads != 0 && cur_reads < min_thr) ||
		    (cur_inserts != 0 && cur_inserts < min_thr) ||
		    (cur_updates != 0 && cur_updates < min_thr))) {
			if (opts->min_throughput_fatal) {
				level = 1;
				msg_err = WT_PANIC;
				str = "ERROR";
			} else {
				level = 0;
				msg_err = 0;
				str = "WARNING";
			}
			lprintf(wtperf, msg_err, level,
			    "%s: minimum throughput not met: threshold %" PRIu64
			    " reads %" PRIu64 " inserts %" PRIu64
			    " updates %" PRIu64, str, min_thr, cur_reads,
			    cur_inserts, cur_updates);
		}
		last_reads = reads;
		last_inserts = inserts;
		last_updates = updates;
	}

	/* Notify our caller we failed and shut the system down. */
	if (0) {
err:		wtperf->error = wtperf->stop = true;
	}

	if (fp != NULL)
		(void)fclose(fp);
	free(path);

	return (WT_THREAD_RET_VALUE);
}
1418
1419 static WT_THREAD_RET
checkpoint_worker(void * arg)1420 checkpoint_worker(void *arg)
1421 {
1422 CONFIG_OPTS *opts;
1423 WTPERF *wtperf;
1424 WTPERF_THREAD *thread;
1425 WT_CONNECTION *conn;
1426 WT_SESSION *session;
1427 struct timespec e, s;
1428 uint32_t i;
1429 int ret;
1430
1431 thread = (WTPERF_THREAD *)arg;
1432 wtperf = thread->wtperf;
1433 opts = wtperf->opts;
1434 conn = wtperf->conn;
1435 session = NULL;
1436
1437 if ((ret = conn->open_session(
1438 conn, NULL, opts->sess_config, &session)) != 0) {
1439 lprintf(wtperf, ret, 0,
1440 "open_session failed in checkpoint thread.");
1441 goto err;
1442 }
1443
1444 while (!wtperf->stop) {
1445 /* Break the sleep up, so we notice interrupts faster. */
1446 for (i = 0; i < opts->checkpoint_interval; i++) {
1447 sleep(1);
1448 if (wtperf->stop)
1449 break;
1450 }
1451 /* If the workers are done, don't bother with a final call. */
1452 if (wtperf->stop)
1453 break;
1454
1455 __wt_epoch(NULL, &s);
1456
1457 wtperf->ckpt = true;
1458 if ((ret = session->checkpoint(session, NULL)) != 0) {
1459 lprintf(wtperf, ret, 0, "Checkpoint failed.");
1460 goto err;
1461 }
1462 wtperf->ckpt = false;
1463 ++thread->ckpt.ops;
1464
1465 __wt_epoch(NULL, &e);
1466 }
1467
1468 if (session != NULL &&
1469 ((ret = session->close(session, NULL)) != 0)) {
1470 lprintf(wtperf, ret, 0,
1471 "Error closing session in checkpoint worker.");
1472 goto err;
1473 }
1474
1475 /* Notify our caller we failed and shut the system down. */
1476 if (0) {
1477 err: wtperf->error = wtperf->stop = true;
1478 }
1479
1480 return (WT_THREAD_RET_VALUE);
1481 }
1482
/*
 * execute_populate --
 *	Run the populate phase: start the populate threads, report progress
 *	until the configured row count is loaded, then optionally compact
 *	the tables via the async API.
 */
static int
execute_populate(WTPERF *wtperf)
{
	struct timespec start, stop;
	CONFIG_OPTS *opts;
	WT_ASYNC_OP *asyncop;
	WTPERF_THREAD *popth;
	WT_THREAD_CALLBACK(*pfunc)(void *);
	size_t i;
	uint64_t last_ops, msecs, print_ops_sec;
	uint32_t interval, tables;
	wt_thread_t idle_table_cycle_thread;
	double print_secs;
	int elapsed, ret;

	opts = wtperf->opts;

	lprintf(wtperf, 0, 1,
	    "Starting %" PRIu32
	    " populate thread(s) for %" PRIu32 " items",
	    opts->populate_threads, opts->icount);

	/* Start cycling idle tables if configured. */
	start_idle_table_cycle(wtperf, &idle_table_cycle_thread);

	wtperf->insert_key = 0;

	/* Pick the populate implementation: async or cursor-based. */
	wtperf->popthreads =
	    dcalloc(opts->populate_threads, sizeof(WTPERF_THREAD));
	if (wtperf->use_asyncops) {
		lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
		    opts->async_threads);
		pfunc = populate_async;
	} else
		pfunc = populate_thread;
	start_threads(wtperf, NULL,
	    wtperf->popthreads, opts->populate_threads, pfunc);

	/* Poll until the load completes, reporting progress periodically. */
	__wt_epoch(NULL, &start);
	for (elapsed = 0, interval = 0, last_ops = 0;
	    wtperf->insert_key < opts->icount && !wtperf->error;) {
		/*
		 * Sleep for 100th of a second, report_interval is in second
		 * granularity, each 100th increment of elapsed is a single
		 * increment of interval.
		 */
		(void)usleep(10000);
		if (opts->report_interval == 0 || ++elapsed < 100)
			continue;
		elapsed = 0;
		if (++interval < opts->report_interval)
			continue;
		interval = 0;
		wtperf->totalsec += opts->report_interval;
		wtperf->insert_ops = sum_pop_ops(wtperf);
		lprintf(wtperf, 0, 1,
		    "%" PRIu64 " populate inserts (%" PRIu64 " of %"
		    PRIu32 ") in %" PRIu32 " secs (%" PRIu32 " total secs)",
		    wtperf->insert_ops - last_ops, wtperf->insert_ops,
		    opts->icount, opts->report_interval, wtperf->totalsec);
		last_ops = wtperf->insert_ops;
	}
	__wt_epoch(NULL, &stop);

	/*
	 * Move popthreads aside to narrow possible race with the monitor
	 * thread. The latency tracking code also requires that popthreads be
	 * NULL when the populate phase is finished, to know that the workload
	 * phase has started.
	 */
	popth = wtperf->popthreads;
	wtperf->popthreads = NULL;
	stop_threads(opts->populate_threads, popth);
	free(popth);

	/* Report if any worker threads didn't finish. */
	if (wtperf->error) {
		lprintf(wtperf, WT_ERROR, 0,
		    "Populate thread(s) exited without finishing.");
		return (WT_ERROR);
	}

	lprintf(wtperf,
	    0, 1, "Finished load of %" PRIu32 " items", opts->icount);
	msecs = WT_TIMEDIFF_MS(stop, start);

	/*
	 * This is needed as the divisions will fail if the insert takes no time
	 * which will only be the case when there is no data to insert.
	 */
	if (msecs == 0) {
		print_secs = 0;
		print_ops_sec = 0;
	} else {
		print_secs = (double)msecs / (double)MSEC_PER_SEC;
		print_ops_sec = (uint64_t)(opts->icount / print_secs);
	}
	lprintf(wtperf, 0, 1,
	    "Load time: %.2f\n" "load ops/sec: %" PRIu64,
	    print_secs, print_ops_sec);

	/*
	 * If configured, compact to allow LSM merging to complete. We
	 * set an unlimited timeout because if we close the connection
	 * then any in-progress compact/merge is aborted.
	 */
	if (opts->compact) {
		assert(opts->async_threads > 0);
		lprintf(wtperf, 0, 1, "Compact after populate");
		__wt_epoch(NULL, &start);
		/*
		 * Outstanding-compact counter, passed to each async op via
		 * app_private; presumably decremented by the completion
		 * callback (cb) -- the assert below relies on that.
		 */
		tables = opts->table_count;
		for (i = 0; i < opts->table_count; i++) {
			/*
			 * If no ops are available, retry. Any other error,
			 * return.
			 */
			while ((ret = wtperf->conn->async_new_op(
			    wtperf->conn, wtperf->uris[i],
			    "timeout=0", &cb, &asyncop)) == EBUSY)
				(void)usleep(10000);
			if (ret != 0)
				return (ret);

			asyncop->app_private = &tables;
			if ((ret = asyncop->compact(asyncop)) != 0) {
				lprintf(wtperf,
				    ret, 0, "Async compact failed.");
				return (ret);
			}
		}
		/* Wait for every queued compact to complete. */
		if ((ret = wtperf->conn->async_flush(wtperf->conn)) != 0) {
			lprintf(wtperf, ret, 0, "Populate async flush failed.");
			return (ret);
		}
		__wt_epoch(NULL, &stop);
		lprintf(wtperf, 0, 1,
		    "Compact completed in %" PRIu64 " seconds",
		    (uint64_t)(WT_TIMEDIFF_SEC(stop, start)));
		assert(tables == 0);
	}

	/* Stop cycling idle tables. */
	stop_idle_table_cycle(wtperf, idle_table_cycle_thread);

	return (0);
}
1629
1630 static int
close_reopen(WTPERF * wtperf)1631 close_reopen(WTPERF *wtperf)
1632 {
1633 CONFIG_OPTS *opts;
1634 int ret;
1635
1636 opts = wtperf->opts;
1637
1638 if (opts->in_memory)
1639 return (0);
1640
1641 if (!opts->readonly && !opts->reopen_connection)
1642 return (0);
1643 /*
1644 * Reopen the connection. We do this so that the workload phase always
1645 * starts with the on-disk files, and so that read-only workloads can
1646 * be identified. This is particularly important for LSM, where the
1647 * merge algorithm is more aggressive for read-only trees.
1648 */
1649 /* wtperf->conn is released no matter the return value from close(). */
1650 ret = wtperf->conn->close(wtperf->conn, NULL);
1651 wtperf->conn = NULL;
1652 if (ret != 0) {
1653 lprintf(wtperf, ret, 0, "Closing the connection failed");
1654 return (ret);
1655 }
1656 if ((ret = wiredtiger_open(
1657 wtperf->home, NULL, wtperf->reopen_config, &wtperf->conn)) != 0) {
1658 lprintf(wtperf, ret, 0, "Re-opening the connection failed");
1659 return (ret);
1660 }
1661 /*
1662 * If we started async threads only for the purposes of compact,
1663 * then turn it off before starting the workload so that those extra
1664 * threads looking for work that will never arrive don't affect
1665 * performance.
1666 */
1667 if (opts->compact && !wtperf->use_asyncops) {
1668 if ((ret = wtperf->conn->reconfigure(
1669 wtperf->conn, "async=(enabled=false)")) != 0) {
1670 lprintf(wtperf, ret, 0, "Reconfigure async off failed");
1671 return (ret);
1672 }
1673 }
1674 return (0);
1675 }
1676
/*
 * execute_workload --
 *	Run the workload phase: start the worker threads for each configured
 *	workload, report throughput until the run-time or operation-count
 *	limit is reached, then stop the workers and optionally drop tables.
 */
static int
execute_workload(WTPERF *wtperf)
{
	CONFIG_OPTS *opts;
	WORKLOAD *workp;
	WTPERF_THREAD *threads;
	WT_CONNECTION *conn;
	WT_SESSION **sessions;
	WT_THREAD_CALLBACK(*pfunc)(void *);
	wt_thread_t idle_table_cycle_thread;
	uint64_t last_ckpts, last_inserts, last_reads, last_truncates;
	uint64_t last_updates;
	uint32_t interval, run_ops, run_time;
	u_int i;
	int ret;

	opts = wtperf->opts;

	/* Reset the shared counters left over from the populate phase. */
	wtperf->insert_key = 0;
	wtperf->insert_ops = wtperf->read_ops = wtperf->truncate_ops = 0;
	wtperf->update_ops = 0;

	last_ckpts = last_inserts = last_reads = last_truncates = 0;
	last_updates = 0;
	ret = 0;

	sessions = NULL;

	/* Start cycling idle tables. */
	start_idle_table_cycle(wtperf, &idle_table_cycle_thread);

	/* While in_warmup is set, the monitor thread skips its samples. */
	if (opts->warmup != 0)
		wtperf->in_warmup = true;

	/* Allocate memory for the worker threads. */
	wtperf->workers =
	    dcalloc((size_t)wtperf->workers_cnt, sizeof(WTPERF_THREAD));

	/* Pick the worker implementation: async or cursor-based. */
	if (wtperf->use_asyncops) {
		lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
		    opts->async_threads);
		pfunc = worker_async;
	} else
		pfunc = worker;

	/*
	 * Open any configured idle sessions; they are never used here,
	 * presumably to measure the overhead of open session handles.
	 */
	if (opts->session_count_idle != 0) {
		sessions = dcalloc((size_t)opts->session_count_idle,
		    sizeof(WT_SESSION *));
		conn = wtperf->conn;
		for (i = 0; i < opts->session_count_idle; ++i)
			if ((ret = conn->open_session(conn,
			    NULL, opts->sess_config, &sessions[i])) != 0) {
				lprintf(wtperf, ret, 0,
				    "execute_workload: idle open_session");
				goto err;
			}
	}
	/* Start each workload. */
	for (threads = wtperf->workers, i = 0,
	    workp = wtperf->workload; i < wtperf->workload_cnt; ++i, ++workp) {
		lprintf(wtperf, 0, 1,
		    "Starting workload #%u: %" PRId64 " threads, inserts=%"
		    PRId64 ", reads=%" PRId64 ", updates=%" PRId64
		    ", truncate=%" PRId64 ", throttle=%" PRIu64,
		    i + 1, workp->threads, workp->insert,
		    workp->read, workp->update, workp->truncate,
		    workp->throttle);

		/* Figure out the workload's schedule. */
		if ((ret = run_mix_schedule(wtperf, workp)) != 0)
			goto err;

		/* Start the workload's threads. */
		start_threads(
		    wtperf, workp, threads, (u_int)workp->threads, pfunc);
		threads += workp->threads;
	}

	if (opts->warmup != 0) {
		lprintf(wtperf, 0, 1,
		    "Waiting for warmup duration of %" PRIu32, opts->warmup);
		sleep(opts->warmup);
		wtperf->in_warmup = false;
	}

	for (interval = opts->report_interval,
	    run_time = opts->run_time, run_ops = opts->run_ops;
	    !wtperf->error;) {
		/*
		 * Sleep for one second at a time.
		 * If we are tracking run time, check to see if we're done, and
		 * if we're only tracking run time, go back to sleep.
		 */
		sleep(1);
		if (run_time != 0) {
			if (--run_time == 0)
				break;
			if (!interval && !run_ops)
				continue;
		}

		/* Sum the operations we've done. */
		wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
		wtperf->insert_ops = sum_insert_ops(wtperf);
		wtperf->read_ops = sum_read_ops(wtperf);
		wtperf->update_ops = sum_update_ops(wtperf);
		wtperf->truncate_ops = sum_truncate_ops(wtperf);

		/* If we're checking total operations, see if we're done. */
		if (run_ops != 0 && run_ops <=
		    wtperf->insert_ops + wtperf->read_ops + wtperf->update_ops)
			break;

		/* If writing out throughput information, see if it's time. */
		if (interval == 0 || --interval > 0)
			continue;
		interval = opts->report_interval;
		wtperf->totalsec += opts->report_interval;

		lprintf(wtperf, 0, 1,
		    "%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64
		    " updates, %" PRIu64 " truncates, %" PRIu64
		    " checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)",
		    wtperf->read_ops - last_reads,
		    wtperf->insert_ops - last_inserts,
		    wtperf->update_ops - last_updates,
		    wtperf->truncate_ops - last_truncates,
		    wtperf->ckpt_ops - last_ckpts,
		    opts->report_interval, wtperf->totalsec);
		last_reads = wtperf->read_ops;
		last_inserts = wtperf->insert_ops;
		last_updates = wtperf->update_ops;
		last_truncates = wtperf->truncate_ops;
		last_ckpts = wtperf->ckpt_ops;
	}

	/* Notify the worker threads they are done. */
err:	wtperf->stop = true;

	/* Stop cycling idle tables. */
	stop_idle_table_cycle(wtperf, idle_table_cycle_thread);

	stop_threads((u_int)wtperf->workers_cnt, wtperf->workers);

	/* Drop tables if configured to and this isn't an error path */
	if (ret == 0 &&
	    opts->drop_tables && (ret = drop_all_tables(wtperf)) != 0)
		lprintf(wtperf, ret, 0, "Drop tables failed.");

	free(sessions);
	/* Report if any worker threads didn't finish. */
	if (wtperf->error) {
		lprintf(wtperf, WT_ERROR, 0,
		    "Worker thread(s) exited without finishing.");
		if (ret == 0)
			ret = WT_ERROR;
	}
	return (ret);
}
1836
1837 /*
1838 * Ensure that icount matches the number of records in the
1839 * existing table.
1840 */
1841 static int
find_table_count(WTPERF * wtperf)1842 find_table_count(WTPERF *wtperf)
1843 {
1844 CONFIG_OPTS *opts;
1845 WT_CONNECTION *conn;
1846 WT_CURSOR *cursor;
1847 WT_SESSION *session;
1848 uint32_t i, max_icount, table_icount;
1849 int ret, t_ret;
1850 char *key;
1851
1852 opts = wtperf->opts;
1853 conn = wtperf->conn;
1854
1855 max_icount = 0;
1856 if ((ret = conn->open_session(
1857 conn, NULL, opts->sess_config, &session)) != 0) {
1858 lprintf(wtperf, ret, 0,
1859 "find_table_count: open_session failed");
1860 goto out;
1861 }
1862 for (i = 0; i < opts->table_count; i++) {
1863 if ((ret = session->open_cursor(session, wtperf->uris[i],
1864 NULL, NULL, &cursor)) != 0) {
1865 lprintf(wtperf, ret, 0,
1866 "find_table_count: open_cursor failed");
1867 goto err;
1868 }
1869 if ((ret = cursor->prev(cursor)) != 0) {
1870 lprintf(wtperf, ret, 0,
1871 "find_table_count: cursor prev failed");
1872 goto err;
1873 }
1874 if ((ret = cursor->get_key(cursor, &key)) != 0) {
1875 lprintf(wtperf, ret, 0,
1876 "find_table_count: cursor get_key failed");
1877 goto err;
1878 }
1879 table_icount = (uint32_t)atoi(key);
1880 if (table_icount > max_icount)
1881 max_icount = table_icount;
1882
1883 if ((ret = cursor->close(cursor)) != 0) {
1884 lprintf(wtperf, ret, 0,
1885 "find_table_count: cursor close failed");
1886 goto err;
1887 }
1888 }
1889 err: if ((t_ret = session->close(session, NULL)) != 0) {
1890 if (ret == 0)
1891 ret = t_ret;
1892 lprintf(wtperf, ret, 0,
1893 "find_table_count: session close failed");
1894 }
1895 opts->icount = max_icount;
1896 out: return (ret);
1897 }
1898
1899 /*
1900 * Populate the uri array.
1901 */
1902 static void
create_uris(WTPERF * wtperf)1903 create_uris(WTPERF *wtperf)
1904 {
1905 CONFIG_OPTS *opts;
1906 size_t len;
1907 uint32_t i;
1908
1909 opts = wtperf->opts;
1910
1911 wtperf->uris = dcalloc(opts->table_count, sizeof(char *));
1912 len = strlen("table:") + strlen(opts->table_name) + 20;
1913 for (i = 0; i < opts->table_count; i++) {
1914 /* If there is only one table, just use the base name. */
1915 wtperf->uris[i] = dmalloc(len);
1916 if (opts->table_count == 1)
1917 testutil_check(__wt_snprintf(wtperf->uris[i],
1918 len, "table:%s", opts->table_name));
1919 else
1920 testutil_check(__wt_snprintf(wtperf->uris[i],
1921 len, "table:%s%05d", opts->table_name, i));
1922 }
1923
1924 /* Create the log-like-table URI. */
1925 len = strlen("table:") +
1926 strlen(opts->table_name) + strlen("_log_table") + 1;
1927 wtperf->log_table_uri = dmalloc(len);
1928 testutil_check(__wt_snprintf(wtperf->log_table_uri,
1929 len, "table:%s_log_table", opts->table_name));
1930 }
1931
1932 static int
create_tables(WTPERF * wtperf)1933 create_tables(WTPERF *wtperf)
1934 {
1935 CONFIG_OPTS *opts;
1936 WT_SESSION *session;
1937 size_t i;
1938 int ret;
1939 char buf[512];
1940
1941 opts = wtperf->opts;
1942
1943 if ((ret = wtperf->conn->open_session(
1944 wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
1945 lprintf(wtperf, ret, 0,
1946 "Error opening a session on %s", wtperf->home);
1947 return (ret);
1948 }
1949
1950 for (i = 0; i < opts->table_count_idle; i++) {
1951 testutil_check(__wt_snprintf(
1952 buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
1953 if ((ret = session->create(
1954 session, buf, opts->table_config)) != 0) {
1955 lprintf(wtperf, ret, 0,
1956 "Error creating idle table %s", buf);
1957 return (ret);
1958 }
1959 }
1960 if (opts->log_like_table && (ret = session->create(session,
1961 wtperf->log_table_uri, "key_format=Q,value_format=S")) != 0) {
1962 lprintf(wtperf, ret, 0, "Error creating log table %s", buf);
1963 return (ret);
1964 }
1965
1966 for (i = 0; i < opts->table_count; i++) {
1967 if (opts->log_partial && i > 0) {
1968 if (((ret = session->create(session,
1969 wtperf->uris[i], wtperf->partial_config)) != 0)) {
1970 lprintf(wtperf, ret, 0,
1971 "Error creating table %s", wtperf->uris[i]);
1972 return (ret);
1973 }
1974 } else if ((ret = session->create(
1975 session, wtperf->uris[i], opts->table_config)) != 0) {
1976 lprintf(wtperf, ret, 0,
1977 "Error creating table %s", wtperf->uris[i]);
1978 return (ret);
1979 }
1980 if (opts->index) {
1981 testutil_check(__wt_snprintf(buf, 512,
1982 "index:%s:val_idx",
1983 wtperf->uris[i] + strlen("table:")));
1984 if ((ret = session->create(
1985 session, buf, "columns=(val)")) != 0) {
1986 lprintf(wtperf, ret, 0,
1987 "Error creating index %s", buf);
1988 return (ret);
1989 }
1990 }
1991 }
1992
1993 if ((ret = session->close(session, NULL)) != 0) {
1994 lprintf(wtperf, ret, 0, "Error closing session");
1995 return (ret);
1996 }
1997
1998 return (0);
1999 }
2000
2001 /*
2002 * wtperf_copy --
2003 * Create a new WTPERF structure as a duplicate of a previous one.
2004 */
2005 static void
wtperf_copy(const WTPERF * src,WTPERF ** retp)2006 wtperf_copy(const WTPERF *src, WTPERF **retp)
2007 {
2008 CONFIG_OPTS *opts;
2009 WTPERF *dest;
2010 size_t i;
2011
2012 opts = src->opts;
2013
2014 dest = dcalloc(1, sizeof(WTPERF));
2015
2016 /*
2017 * Don't copy the home and monitor directories, they are filled in by
2018 * our caller, explicitly.
2019 */
2020
2021 if (src->partial_config != NULL)
2022 dest->partial_config = dstrdup(src->partial_config);
2023 if (src->reopen_config != NULL)
2024 dest->reopen_config = dstrdup(src->reopen_config);
2025
2026 if (src->uris != NULL) {
2027 dest->uris = dcalloc(opts->table_count, sizeof(char *));
2028 for (i = 0; i < opts->table_count; i++)
2029 dest->uris[i] = dstrdup(src->uris[i]);
2030 }
2031
2032 if (src->async_config != NULL)
2033 dest->async_config = dstrdup(src->async_config);
2034
2035 dest->ckptthreads = NULL;
2036 dest->popthreads = NULL;
2037
2038 dest->workers = NULL;
2039 dest->workers_cnt = src->workers_cnt;
2040 if (src->workload_cnt != 0) {
2041 dest->workload_cnt = src->workload_cnt;
2042 dest->workload = dcalloc(src->workload_cnt, sizeof(WORKLOAD));
2043 memcpy(dest->workload,
2044 src->workload, src->workload_cnt * sizeof(WORKLOAD));
2045 }
2046
2047 TAILQ_INIT(&dest->stone_head);
2048
2049 dest->opts = src->opts;
2050
2051 *retp = dest;
2052 }
2053
2054 /*
2055 * wtperf_free --
2056 * Free any storage allocated in the WTPERF structure.
2057 */
2058 static void
wtperf_free(WTPERF * wtperf)2059 wtperf_free(WTPERF *wtperf)
2060 {
2061 CONFIG_OPTS *opts;
2062 size_t i;
2063
2064 opts = wtperf->opts;
2065
2066 free(wtperf->home);
2067 free(wtperf->monitor_dir);
2068 free(wtperf->partial_config);
2069 free(wtperf->reopen_config);
2070 free(wtperf->log_table_uri);
2071
2072 if (wtperf->uris != NULL) {
2073 for (i = 0; i < opts->table_count; i++)
2074 free(wtperf->uris[i]);
2075 free(wtperf->uris);
2076 }
2077
2078 free(wtperf->async_config);
2079
2080 free(wtperf->ckptthreads);
2081 free(wtperf->popthreads);
2082
2083 free(wtperf->workers);
2084 free(wtperf->workload);
2085
2086 cleanup_truncate_config(wtperf);
2087 }
2088
2089 /*
2090 * config_compress --
2091 * Parse the compression configuration.
2092 */
2093 static int
config_compress(WTPERF * wtperf)2094 config_compress(WTPERF *wtperf)
2095 {
2096 CONFIG_OPTS *opts;
2097 int ret;
2098 const char *s;
2099
2100 opts = wtperf->opts;
2101 ret = 0;
2102
2103 s = opts->compression;
2104 if (strcmp(s, "none") == 0) {
2105 wtperf->compress_ext = NULL;
2106 wtperf->compress_table = NULL;
2107 } else if (strcmp(s, "lz4") == 0) {
2108 #ifndef HAVE_BUILTIN_EXTENSION_LZ4
2109 wtperf->compress_ext = LZ4_EXT;
2110 #endif
2111 wtperf->compress_table = LZ4_BLK;
2112 } else if (strcmp(s, "snappy") == 0) {
2113 #ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
2114 wtperf->compress_ext = SNAPPY_EXT;
2115 #endif
2116 wtperf->compress_table = SNAPPY_BLK;
2117 } else if (strcmp(s, "zlib") == 0) {
2118 #ifndef HAVE_BUILTIN_EXTENSION_ZLIB
2119 wtperf->compress_ext = ZLIB_EXT;
2120 #endif
2121 wtperf->compress_table = ZLIB_BLK;
2122 } else if (strcmp(s, "zstd") == 0) {
2123 #ifndef HAVE_BUILTIN_EXTENSION_ZSTD
2124 wtperf->compress_ext = ZSTD_EXT;
2125 #endif
2126 wtperf->compress_table = ZSTD_BLK;
2127 } else {
2128 fprintf(stderr,
2129 "invalid compression configuration: %s\n", s);
2130 ret = EINVAL;
2131 }
2132 return (ret);
2133
2134 }
2135
/*
 * start_all_runs --
 *	Run the test for each configured database: a single database runs
 *	inline; multiple databases each get a WTPERF copy with their own
 *	home/monitor directories, run in parallel threads.
 */
static int
start_all_runs(WTPERF *wtperf)
{
	CONFIG_OPTS *opts;
	WTPERF *next_wtperf, **wtperfs;
	size_t i, len;
	wt_thread_t *threads;
	int ret;

	opts = wtperf->opts;
	wtperfs = NULL;
	ret = 0;

	/* The common case of one database needs no copies or threads. */
	if (opts->database_count == 1)
		return (start_run(wtperf));

	/* Allocate an array to hold our WTPERF copies. */
	wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *));

	/* Allocate an array to hold our thread IDs. */
	threads = dcalloc(opts->database_count, sizeof(*threads));

	for (i = 0; i < opts->database_count; i++) {
		wtperf_copy(wtperf, &next_wtperf);
		wtperfs[i] = next_wtperf;

		/*
		 * Set up unique home/monitor directories for each database.
		 * Re-create the directories if creating the databases.
		 */
		len = strlen(wtperf->home) + 5;	/* "/Dnn" + terminator */
		next_wtperf->home = dmalloc(len);
		testutil_check(__wt_snprintf(
		    next_wtperf->home, len, "%s/D%02d", wtperf->home, (int)i));
		if (opts->create != 0)
			recreate_dir(next_wtperf->home);

		len = strlen(wtperf->monitor_dir) + 5;
		next_wtperf->monitor_dir = dmalloc(len);
		testutil_check(__wt_snprintf(next_wtperf->monitor_dir,
		    len, "%s/D%02d", wtperf->monitor_dir, (int)i));
		if (opts->create != 0 &&
		    strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0)
			recreate_dir(next_wtperf->monitor_dir);

		/* Each database runs in its own thread. */
		testutil_check(__wt_thread_create(NULL,
		    &threads[i], thread_run_wtperf, next_wtperf));
	}

	/* Wait for threads to finish. */
	for (i = 0; i < opts->database_count; i++)
		testutil_check(__wt_thread_join(NULL, &threads[i]));

	/* All runs are complete, release the copies. */
	for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
		wtperf_free(wtperfs[i]);
		free(wtperfs[i]);
	}
	free(wtperfs);
	free(threads);

	return (ret);
}
2198
2199 /* Run an instance of wtperf for a given configuration. */
2200 static WT_THREAD_RET
thread_run_wtperf(void * arg)2201 thread_run_wtperf(void *arg)
2202 {
2203 WTPERF *wtperf;
2204 int ret;
2205
2206 wtperf = (WTPERF *)arg;
2207 if ((ret = start_run(wtperf)) != 0)
2208 lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home);
2209 return (WT_THREAD_RET_VALUE);
2210 }
2211
/*
 * start_run --
 *	Execute a single wtperf run against one database: open the
 *	connection, optionally create/populate the tables, run the
 *	configured workload, then print summary statistics and shut
 *	everything down.  Returns 0 on success, non-zero on failure.
 */
static int
start_run(WTPERF *wtperf)
{
	CONFIG_OPTS *opts;
	wt_thread_t monitor_thread;
	uint64_t total_ops;
	uint32_t run_time;
	int monitor_created, ret, t_ret;

	opts = wtperf->opts;
	monitor_created = ret = 0;
	/* [-Wconditional-uninitialized] */
	memset(&monitor_thread, 0, sizeof(monitor_thread));

	if ((ret = setup_log_file(wtperf)) != 0)
		goto err;

	if ((ret = wiredtiger_open(	/* Open the real connection. */
	    wtperf->home, NULL, opts->conn_config, &wtperf->conn)) != 0) {
		lprintf(wtperf, ret, 0, "Error connecting to %s", wtperf->home);
		goto err;
	}

	create_uris(wtperf);

	/* If creating, create the tables. */
	if (opts->create != 0 && (ret = create_tables(wtperf)) != 0)
		goto err;

	/* Start the monitor thread. */
	if (opts->sample_interval != 0) {
		testutil_check(__wt_thread_create(
		    NULL, &monitor_thread, monitor, wtperf));
		monitor_created = 1;
	}

	/* If creating, populate the table. */
	if (opts->create != 0 && execute_populate(wtperf) != 0)
		goto err;

	/* Optional workload. */
	if (wtperf->workers_cnt != 0 &&
	    (opts->run_time != 0 || opts->run_ops != 0)) {
		/*
		 * If we have a workload, close and reopen the connection so
		 * that LSM can detect read-only workloads.
		 */
		if (close_reopen(wtperf) != 0)
			goto err;

		/* Didn't create, set insert count. */
		if (opts->create == 0 &&
		    opts->random_range == 0 && find_table_count(wtperf) != 0)
			goto err;
		/* Start the checkpoint thread. */
		if (opts->checkpoint_threads != 0) {
			lprintf(wtperf, 0, 1,
			    "Starting %" PRIu32 " checkpoint thread(s)",
			    opts->checkpoint_threads);
			wtperf->ckptthreads = dcalloc(
			    opts->checkpoint_threads, sizeof(WTPERF_THREAD));
			start_threads(wtperf, NULL, wtperf->ckptthreads,
			    opts->checkpoint_threads, checkpoint_worker);
		}
		if (opts->pre_load_data)
			pre_load_data(wtperf);

		/* Execute the workload. */
		if ((ret = execute_workload(wtperf)) != 0)
			goto err;

		/* One final summation of the operations we've completed. */
		wtperf->read_ops = sum_read_ops(wtperf);
		wtperf->insert_ops = sum_insert_ops(wtperf);
		wtperf->truncate_ops = sum_truncate_ops(wtperf);
		wtperf->update_ops = sum_update_ops(wtperf);
		wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
		total_ops =
		    wtperf->read_ops + wtperf->insert_ops + wtperf->update_ops;

		/*
		 * NOTE(review): if the workload completed zero read, insert
		 * and update operations, total_ops is 0 and the percentage
		 * computations below divide by zero -- confirm whether a
		 * successful execute_workload() guarantees total_ops > 0.
		 */
		/* Guard against a zero run_time in the ops/sec division. */
		run_time = opts->run_time == 0 ? 1 : opts->run_time;
		lprintf(wtperf, 0, 1,
		    "Executed %" PRIu64 " read operations (%" PRIu64
		    "%%) %" PRIu64 " ops/sec",
		    wtperf->read_ops, (wtperf->read_ops * 100) / total_ops,
		    wtperf->read_ops / run_time);
		lprintf(wtperf, 0, 1,
		    "Executed %" PRIu64 " insert operations (%" PRIu64
		    "%%) %" PRIu64 " ops/sec",
		    wtperf->insert_ops, (wtperf->insert_ops * 100) / total_ops,
		    wtperf->insert_ops / run_time);
		lprintf(wtperf, 0, 1,
		    "Executed %" PRIu64 " truncate operations (%" PRIu64
		    "%%) %" PRIu64 " ops/sec",
		    wtperf->truncate_ops,
		    (wtperf->truncate_ops * 100) / total_ops,
		    wtperf->truncate_ops / run_time);
		lprintf(wtperf, 0, 1,
		    "Executed %" PRIu64 " update operations (%" PRIu64
		    "%%) %" PRIu64 " ops/sec",
		    wtperf->update_ops, (wtperf->update_ops * 100) / total_ops,
		    wtperf->update_ops / run_time);
		lprintf(wtperf, 0, 1,
		    "Executed %" PRIu64 " checkpoint operations",
		    wtperf->ckpt_ops);

		latency_print(wtperf);
	}

	/*
	 * Error path: convert a zero ret into a generic failure code; the
	 * cleanup below runs for both success and failure.
	 */
	if (0) {
err:		if (ret == 0)
			ret = EXIT_FAILURE;
	}

	/* Notify the worker threads they are done. */
	wtperf->stop = true;

	stop_threads(1, wtperf->ckptthreads);

	if (monitor_created != 0)
		testutil_check(__wt_thread_join(NULL, &monitor_thread));

	/* Close the connection, preserving the first error seen. */
	if (wtperf->conn != NULL && opts->close_conn &&
	    (t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) {
		lprintf(wtperf, t_ret, 0,
		    "Error closing connection to %s", wtperf->home);
		if (ret == 0)
			ret = t_ret;
	}

	if (ret == 0) {
		if (opts->run_time == 0 && opts->run_ops == 0)
			lprintf(wtperf, 0, 1, "Run completed");
		else
			lprintf(wtperf, 0, 1, "Run completed: %" PRIu32 " %s",
			    opts->run_time == 0 ?
			    opts->run_ops : opts->run_time,
			    opts->run_time == 0 ? "operations" : "seconds");
	}

	/* Flush and close the log file, again keeping the first error. */
	if (wtperf->logf != NULL) {
		if ((t_ret = fflush(wtperf->logf)) != 0 && ret == 0)
			ret = t_ret;
		if ((t_ret = fclose(wtperf->logf)) != 0 && ret == 0)
			ret = t_ret;
	}
	return (ret);
}
2360
2361 extern int __wt_optind, __wt_optreset;
2362 extern char *__wt_optarg;
2363
/*
 * usage --
 *	wtperf usage print, no error.
 *
 * Note: the options documented here must match the getopt string in
 * main() ("C:h:m:O:o:T:"): -H (Helium mount point) is not accepted and
 * has been dropped from the help text, while -m (monitor directory) is
 * accepted and is now documented.
 */
static void
usage(void)
{
	printf("wtperf [-C config] "
	    "[-h home] [-m monitor] [-O file] [-o option] [-T config]\n");
	printf("\t-C <string> additional connection configuration\n");
	printf("\t (added to option conn_config)\n");
	printf("\t-h <string> Wired Tiger home must exist, default WT_TEST\n");
	printf("\t-m <string> monitor output directory, defaults to home\n");
	printf("\t-O <file> file contains options as listed below\n");
	printf("\t-o option=val[,option=val,...] set options listed below\n");
	printf("\t-T <string> additional table configuration\n");
	printf("\t (added to option table_config)\n");
	printf("\n");
	config_opt_usage();
}
2384
/*
 * main --
 *	wtperf entry point: parse command-line options (twice, so the
 *	command line can override a configuration file), assemble the
 *	connection/table configuration strings, sanity-check the result
 *	and kick off the runs.
 */
int
main(int argc, char *argv[])
{
	CONFIG_OPTS *opts;
	WTPERF *wtperf, _wtperf;
	size_t pos, req_len, sreq_len;
	bool monitor_set;
	int ch, ret;
	const char *cmdflags = "C:h:m:O:o:T:";
	const char *append_comma, *config_opts;
	char *cc_buf, *path, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;

	/* The first WTPERF structure (from which all others are derived). */
	wtperf = &_wtperf;
	memset(wtperf, 0, sizeof(*wtperf));
	wtperf->home = dstrdup(DEFAULT_HOME);
	wtperf->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
	TAILQ_INIT(&wtperf->stone_head);
	config_opt_init(&wtperf->opts);

	opts = wtperf->opts;
	monitor_set = false;
	ret = 0;
	config_opts = NULL;
	cc_buf = sess_cfg = tc_buf = user_cconfig = user_tconfig = NULL;

	/* Do a basic validation of options, and home is needed before open. */
	while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
		switch (ch) {
		case 'C':
			/* Repeated -C options are comma-concatenated. */
			if (user_cconfig == NULL)
				user_cconfig = dstrdup(__wt_optarg);
			else {
				user_cconfig = drealloc(user_cconfig,
				    strlen(user_cconfig) +
				    strlen(__wt_optarg) + 2);
				strcat(user_cconfig, ",");
				strcat(user_cconfig, __wt_optarg);
			}
			break;
		case 'h':
			free(wtperf->home);
			wtperf->home = dstrdup(__wt_optarg);
			break;
		case 'm':
			free(wtperf->monitor_dir);
			wtperf->monitor_dir = dstrdup(__wt_optarg);
			monitor_set = true;
			break;
		case 'O':
			config_opts = __wt_optarg;
			break;
		case 'T':
			/* Repeated -T options are comma-concatenated. */
			if (user_tconfig == NULL)
				user_tconfig = dstrdup(__wt_optarg);
			else {
				user_tconfig = drealloc(user_tconfig,
				    strlen(user_tconfig) +
				    strlen(__wt_optarg) + 2);
				strcat(user_tconfig, ",");
				strcat(user_tconfig, __wt_optarg);
			}
			break;
		case '?':
			usage();
			goto einval;
		}

	/*
	 * If the user did not specify a monitor directory then set the
	 * monitor directory to the home dir.
	 */
	if (!monitor_set) {
		free(wtperf->monitor_dir);
		wtperf->monitor_dir = dstrdup(wtperf->home);
	}

	/* Parse configuration settings from configuration file. */
	if (config_opts != NULL && config_opt_file(wtperf, config_opts) != 0)
		goto einval;

	/* Parse options that override values set via a configuration file. */
	__wt_optreset = __wt_optind = 1;
	while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
		switch (ch) {
		case 'o':
			/* Allow -o key=value */
			if (config_opt_str(wtperf, __wt_optarg) != 0)
				goto einval;
			break;
		}

	if (opts->populate_threads == 0 && opts->icount != 0) {
		lprintf(wtperf, 1, 0,
		    "Cannot have 0 populate threads when icount is set\n");
		goto err;
	}

	wtperf->async_config = NULL;
	/*
	 * If the user specified async_threads we use async for all ops.
	 * If the user wants compaction, then we also enable async for
	 * the compact operation, but not for the workloads.
	 */
	if (opts->async_threads > 0) {
		if (F_ISSET(wtperf, CFG_TRUNCATE)) {
			lprintf(wtperf,
			    1, 0, "Cannot run truncate and async\n");
			goto err;
		}
		wtperf->use_asyncops = true;
	}
	if (opts->compact && opts->async_threads == 0)
		opts->async_threads = 2;
	if (opts->async_threads > 0) {
		/*
		 * The maximum number of async threads is two digits, so just
		 * use that to compute the space we need. Assume the default
		 * of 1024 for the max ops. Although we could bump that up
		 * to 4096 if needed.
		 */
		req_len = strlen(",async=(enabled=true,threads=)") + 4;
		wtperf->async_config = dmalloc(req_len);
		testutil_check(__wt_snprintf(wtperf->async_config, req_len,
		    ",async=(enabled=true,threads=%" PRIu32 ")",
		    opts->async_threads));
	}
	if ((ret = config_compress(wtperf)) != 0)
		goto err;

	/* You can't have truncate on a random collection. */
	if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->random_range) {
		lprintf(wtperf, 1, 0, "Cannot run truncate and random_range\n");
		goto err;
	}

	/* We can't run truncate with more than one table. */
	if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->table_count > 1) {
		lprintf(wtperf, 1, 0, "Cannot truncate more than 1 table\n");
		goto err;
	}

	/* Make stdout line buffered, so verbose output appears quickly. */
	__wt_stream_set_line_buffer(stdout);

	/*
	 * Concatenate non-default configuration strings.  The pieces (async,
	 * compression extension, session_max, in_memory, and any user -C
	 * string) are joined with commas into cc_buf, which then becomes the
	 * conn_config option value.
	 */
	if (user_cconfig != NULL || opts->session_count_idle > 0 ||
	    wtperf->compress_ext != NULL || wtperf->async_config != NULL ||
	    opts->in_memory) {
		req_len = 20;
		req_len += wtperf->async_config != NULL ?
		    strlen(wtperf->async_config) : 0;
		req_len += wtperf->compress_ext != NULL ?
		    strlen(wtperf->compress_ext) : 0;
		if (opts->session_count_idle > 0) {
			sreq_len = strlen("session_max=") + 6;
			req_len += sreq_len;
			sess_cfg = dmalloc(sreq_len);
			testutil_check(__wt_snprintf(sess_cfg, sreq_len,
			    "session_max=%" PRIu32,
			    opts->session_count_idle +
			    wtperf->workers_cnt + opts->populate_threads + 10));
		}
		req_len += opts->in_memory ? strlen("in_memory=true") : 0;
		req_len += user_cconfig != NULL ? strlen(user_cconfig) : 0;
		cc_buf = dmalloc(req_len);

		pos = 0;
		append_comma = "";
		if (wtperf->async_config != NULL &&
		    strlen(wtperf->async_config) != 0) {
			testutil_check(__wt_snprintf_len_incr(
			    cc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, wtperf->async_config));
			append_comma = ",";
		}
		if (wtperf->compress_ext != NULL &&
		    strlen(wtperf->compress_ext) != 0) {
			testutil_check(__wt_snprintf_len_incr(
			    cc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, wtperf->compress_ext));
			append_comma = ",";
		}
		if (opts->in_memory) {
			testutil_check(__wt_snprintf_len_incr(
			    cc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, "in_memory=true"));
			append_comma = ",";
		}
		if (sess_cfg != NULL && strlen(sess_cfg) != 0) {
			testutil_check(__wt_snprintf_len_incr(
			    cc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, sess_cfg));
			append_comma = ",";
		}
		if (user_cconfig != NULL && strlen(user_cconfig) != 0) {
			testutil_check(__wt_snprintf_len_incr(
			    cc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, user_cconfig));
		}

		if (strlen(cc_buf) != 0 && (ret =
		    config_opt_name_value(wtperf, "conn_config", cc_buf)) != 0)
			goto err;
	}
	/* Same pattern for the table configuration string (tc_buf). */
	if (opts->index ||
	    user_tconfig != NULL || wtperf->compress_table != NULL) {
		req_len = 20;
		req_len += wtperf->compress_table != NULL ?
		    strlen(wtperf->compress_table) : 0;
		req_len += opts->index ? strlen(INDEX_COL_NAMES) : 0;
		req_len += user_tconfig != NULL ? strlen(user_tconfig) : 0;
		tc_buf = dmalloc(req_len);

		pos = 0;
		append_comma = "";
		if (wtperf->compress_table != NULL &&
		    strlen(wtperf->compress_table) != 0) {
			testutil_check(__wt_snprintf_len_incr(
			    tc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, wtperf->compress_table));
			append_comma = ",";
		}
		if (opts->index) {
			testutil_check(__wt_snprintf_len_incr(
			    tc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, INDEX_COL_NAMES));
			append_comma = ",";
		}
		if (user_tconfig != NULL && strlen(user_tconfig) != 0) {
			testutil_check(__wt_snprintf_len_incr(
			    tc_buf + pos, req_len - pos, &pos, "%s%s",
			    append_comma, user_tconfig));
		}

		if (strlen(tc_buf) != 0 && (ret =
		    config_opt_name_value(wtperf, "table_config", tc_buf)) != 0)
			goto err;
	}
	if (opts->log_partial && opts->table_count > 1) {
		req_len = strlen(opts->table_config) +
		    strlen(LOG_PARTIAL_CONFIG) + 1;
		wtperf->partial_config = dmalloc(req_len);
		testutil_check(__wt_snprintf(
		    wtperf->partial_config, req_len, "%s%s",
		    opts->table_config, LOG_PARTIAL_CONFIG));
	}
	/*
	 * Set the config for reopen. If readonly add in that string.
	 * If not readonly then just copy the original conn_config.
	 */
	if (opts->readonly)
		req_len = strlen(opts->conn_config) +
		    strlen(READONLY_CONFIG) + 1;
	else
		req_len = strlen(opts->conn_config) + 1;
	wtperf->reopen_config = dmalloc(req_len);
	if (opts->readonly)
		testutil_check(__wt_snprintf(
		    wtperf->reopen_config, req_len, "%s%s",
		    opts->conn_config, READONLY_CONFIG));
	else
		testutil_check(__wt_snprintf(
		    wtperf->reopen_config, req_len, "%s", opts->conn_config));

	/* Sanity-check the configuration. */
	if ((ret = config_sanity(wtperf)) != 0)
		goto err;

	/* If creating, remove and re-create the home directory. */
	if (opts->create != 0)
		recreate_dir(wtperf->home);

	/* Write a copy of the config. */
	req_len = strlen(wtperf->home) + strlen("/CONFIG.wtperf") + 1;
	path = dmalloc(req_len);
	testutil_check(__wt_snprintf(
	    path, req_len, "%s/CONFIG.wtperf", wtperf->home));
	config_opt_log(opts, path);
	free(path);

	/* Display the configuration. */
	if (opts->verbose > 1)
		config_opt_print(wtperf);

	if ((ret = start_all_runs(wtperf)) != 0)
		goto err;

	/* Invalid-argument exit path; falls through to common cleanup. */
	if (0) {
einval:		ret = EINVAL;
	}

err:	wtperf_free(wtperf);
	config_opt_cleanup(opts);

	free(cc_buf);
	free(sess_cfg);
	free(tc_buf);
	free(user_cconfig);
	free(user_tconfig);

	return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
2688
2689 static void
start_threads(WTPERF * wtperf,WORKLOAD * workp,WTPERF_THREAD * base,u_int num,WT_THREAD_CALLBACK (* func)(void *))2690 start_threads(WTPERF *wtperf, WORKLOAD *workp,
2691 WTPERF_THREAD *base, u_int num, WT_THREAD_CALLBACK(*func)(void *))
2692 {
2693 CONFIG_OPTS *opts;
2694 WTPERF_THREAD *thread;
2695 u_int i;
2696
2697 opts = wtperf->opts;
2698
2699 /* Initialize the threads. */
2700 for (i = 0, thread = base; i < num; ++i, ++thread) {
2701 thread->wtperf = wtperf;
2702 thread->workload = workp;
2703
2704 /*
2705 * We don't want the threads executing in lock-step, seed each
2706 * one differently.
2707 */
2708 __wt_random_init_seed(NULL, &thread->rnd);
2709
2710 /*
2711 * Every thread gets a key/data buffer because we don't bother
2712 * to distinguish between threads needing them and threads that
2713 * don't, it's not enough memory to bother. These buffers hold
2714 * strings: trailing NUL is included in the size.
2715 */
2716 thread->key_buf = dcalloc(opts->key_sz, 1);
2717 thread->value_buf = dcalloc(opts->value_sz_max, 1);
2718
2719 /*
2720 * Initialize and then toss in a bit of random values if needed.
2721 */
2722 memset(thread->value_buf, 'a', opts->value_sz - 1);
2723 if (opts->random_value)
2724 randomize_value(thread, thread->value_buf);
2725
2726 /*
2727 * Every thread gets tracking information and is initialized
2728 * for latency measurements, for the same reason.
2729 */
2730 thread->ckpt.min_latency =
2731 thread->insert.min_latency = thread->read.min_latency =
2732 thread->update.min_latency = UINT32_MAX;
2733 thread->ckpt.max_latency = thread->insert.max_latency =
2734 thread->read.max_latency = thread->update.max_latency = 0;
2735 }
2736
2737 /* Start the threads. */
2738 for (i = 0, thread = base; i < num; ++i, ++thread)
2739 testutil_check(__wt_thread_create(
2740 NULL, &thread->handle, func, thread));
2741 }
2742
2743 static void
stop_threads(u_int num,WTPERF_THREAD * threads)2744 stop_threads(u_int num, WTPERF_THREAD *threads)
2745 {
2746 u_int i;
2747
2748 if (num == 0 || threads == NULL)
2749 return;
2750
2751 for (i = 0; i < num; ++i, ++threads) {
2752 testutil_check(__wt_thread_join(NULL, &threads->handle));
2753
2754 free(threads->key_buf);
2755 threads->key_buf = NULL;
2756 free(threads->value_buf);
2757 threads->value_buf = NULL;
2758 }
2759
2760 /*
2761 * We don't free the thread structures or any memory referenced, or NULL
2762 * the reference when we stop the threads; the thread structure is still
2763 * being read by the monitor thread (among others). As a standalone
2764 * program, leaking memory isn't a concern, and it's simpler that way.
2765 */
2766 }
2767
/*
 * recreate_dir --
 *	Remove and re-create a directory via the shell.  Note the directory
 *	name is interpolated unquoted, so names containing shell
 *	metacharacters or spaces are not supported.
 */
static void
recreate_dir(const char *name)
{
	size_t cmd_len;
	char *cmd;

	/* Room for the command text plus the directory name twice. */
	cmd_len = strlen(name) * 2 + 100;
	cmd = dmalloc(cmd_len);
	testutil_check(__wt_snprintf(
	    cmd, cmd_len, "rm -rf %s && mkdir %s", name, name));
	testutil_checkfmt(system(cmd), "system: %s", cmd);
	free(cmd);
}
2781
2782 static int
drop_all_tables(WTPERF * wtperf)2783 drop_all_tables(WTPERF *wtperf)
2784 {
2785 struct timespec start, stop;
2786 CONFIG_OPTS *opts;
2787 WT_SESSION *session;
2788 size_t i;
2789 uint64_t msecs;
2790 int ret, t_ret;
2791
2792 opts = wtperf->opts;
2793
2794 /* Drop any tables. */
2795 if ((ret = wtperf->conn->open_session(
2796 wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
2797 lprintf(wtperf, ret, 0,
2798 "Error opening a session on %s", wtperf->home);
2799 return (ret);
2800 }
2801 __wt_epoch(NULL, &start);
2802 for (i = 0; i < opts->table_count; i++) {
2803 if ((ret =
2804 session->drop(session, wtperf->uris[i], NULL)) != 0) {
2805 lprintf(wtperf, ret, 0,
2806 "Error dropping table %s", wtperf->uris[i]);
2807 goto err;
2808 }
2809 }
2810 __wt_epoch(NULL, &stop);
2811 msecs = WT_TIMEDIFF_MS(stop, start);
2812 lprintf(wtperf, 0, 1,
2813 "Executed %" PRIu32 " drop operations average time %" PRIu64 "ms",
2814 opts->table_count, msecs / opts->table_count);
2815
2816 err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
2817 ret = t_ret;
2818 return (ret);
2819 }
2820
2821 static uint64_t
wtperf_value_range(WTPERF * wtperf)2822 wtperf_value_range(WTPERF *wtperf)
2823 {
2824 CONFIG_OPTS *opts;
2825
2826 opts = wtperf->opts;
2827
2828 if (opts->random_range)
2829 return (opts->icount + opts->random_range);
2830 /*
2831 * It is legal to configure a zero size populate phase, hide that
2832 * from other code by pretending the range is 1 in that case.
2833 */
2834 if (opts->icount + wtperf->insert_key == 0)
2835 return (1);
2836 return (opts->icount +
2837 wtperf->insert_key - (u_int)(wtperf->workers_cnt + 1));
2838 }
2839
/*
 * wtperf_rand --
 *	Return a random key for the next operation, either by advancing a
 *	WiredTiger random cursor (when configured) or from the thread's
 *	local RNG, optionally skewed by a Pareto distribution.  On the
 *	cursor path, 0 is returned on error (0 is never a valid key).
 */
static uint64_t
wtperf_rand(WTPERF_THREAD *thread)
{
	CONFIG_OPTS *opts;
	WT_CURSOR *rnd_cursor;
	WTPERF *wtperf;
	double S1, S2, U;
	uint64_t rval;
	int ret;
	char *key_buf;

	wtperf = thread->wtperf;
	opts = wtperf->opts;

	/*
	 * If we have a random cursor set up then use it.
	 */
	if ((rnd_cursor = thread->rand_cursor) != NULL) {
		if ((ret = rnd_cursor->next(rnd_cursor)) != 0) {
			lprintf(wtperf, ret, 0, "worker: rand next failed");
			/* 0 is outside the expected range. */
			return (0);
		}
		if ((ret = rnd_cursor->get_key(rnd_cursor, &key_buf)) != 0) {
			lprintf(wtperf, ret, 0,
			    "worker: rand next key retrieval");
			return (0);
		}
		/*
		 * Resetting the cursor is not fatal. We still return the
		 * value we retrieved above. We do it so that we don't
		 * leave a cursor positioned.
		 */
		if ((ret = rnd_cursor->reset(rnd_cursor)) != 0)
			lprintf(wtperf, ret, 0,
			    "worker: rand cursor reset failed");
		extract_key(key_buf, &rval);
		return (rval);
	}

	/*
	 * Use WiredTiger's random number routine: it's lock-free and fairly
	 * good.
	 */
	rval = __wt_random(&thread->rnd);

	/* Use Pareto distribution to give 80/20 hot/cold values. */
	if (opts->pareto != 0) {
#define PARETO_SHAPE 1.5
		/*
		 * Invert the uniform variate U through the Pareto CDF.  The
		 * division by UINT32_MAX assumes __wt_random produces 32-bit
		 * values -- NOTE(review): confirm against the RNG's
		 * definition.
		 */
		S1 = (-1 / PARETO_SHAPE);
		S2 = wtperf_value_range(wtperf) *
		    (opts->pareto / 100.0) * (PARETO_SHAPE - 1);
		U = 1 - (double)rval / (double)UINT32_MAX;
		rval = (uint64_t)((pow(U, S1) - 1) * S2);
		/*
		 * This Pareto calculation chooses out of range values about
		 * 2% of the time, from my testing. That will lead to the
		 * first item in the table being "hot".
		 */
		if (rval > wtperf_value_range(wtperf))
			rval = 0;
	}
	/*
	 * Wrap the key to within the expected range and avoid zero: we never
	 * insert that key.
	 */
	rval = (rval % wtperf_value_range(wtperf)) + 1;
	return (rval);
}
2909