1 /*-
2  * Public Domain 2014-2018 MongoDB, Inc.
3  * Public Domain 2008-2014 WiredTiger, Inc.
4  *
5  * This is free and unencumbered software released into the public domain.
6  *
7  * Anyone is free to copy, modify, publish, use, compile, sell, or
8  * distribute this software, either in source code form or as a compiled
9  * binary, for any purpose, commercial or non-commercial, and by any
10  * means.
11  *
12  * In jurisdictions that recognize copyright laws, the author or authors
13  * of this software dedicate any and all copyright interest in the
14  * software to the public domain. We make this dedication for the benefit
15  * of the public at large and to the detriment of our heirs and
16  * successors. We intend this dedication to be an overt act of
17  * relinquishment in perpetuity of all present and future rights to this
18  * software under copyright law.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23  * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
24  * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
25  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26  * OTHER DEALINGS IN THE SOFTWARE.
27  */
28 
29 #include "wtperf.h"
30 
31 /* Default values. */
32 #define	DEFAULT_HOME		"WT_TEST"
33 #define	DEFAULT_MONITOR_DIR	"WT_TEST"
34 
35 static WT_THREAD_RET checkpoint_worker(void *);
36 static int	 drop_all_tables(WTPERF *);
37 static int	 execute_populate(WTPERF *);
38 static int	 execute_workload(WTPERF *);
39 static int	 find_table_count(WTPERF *);
40 static WT_THREAD_RET monitor(void *);
41 static WT_THREAD_RET populate_thread(void *);
42 static void	 randomize_value(WTPERF_THREAD *, char *);
43 static void	 recreate_dir(const char *);
44 static int	 start_all_runs(WTPERF *);
45 static int	 start_run(WTPERF *);
46 static void	 start_threads(WTPERF *, WORKLOAD *,
47 		    WTPERF_THREAD *, u_int, WT_THREAD_CALLBACK(*)(void *));
48 static void	 stop_threads(u_int, WTPERF_THREAD *);
49 static WT_THREAD_RET thread_run_wtperf(void *);
50 static void	 update_value_delta(WTPERF_THREAD *);
51 static WT_THREAD_RET worker(void *);
52 
53 static uint64_t	 wtperf_rand(WTPERF_THREAD *);
54 static uint64_t	 wtperf_value_range(WTPERF *);
55 
56 #define	INDEX_COL_NAMES	"columns=(key,val)"
57 
58 /* Retrieve an ID for the next insert operation. */
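/*
 * The atomic add returns the incremented value, so concurrent populate and
 * insert threads each receive a distinct key ID without extra locking.
 */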
59 static inline uint64_t
60 get_next_incr(WTPERF *wtperf)
61 {
62 	return (__wt_atomic_add64(&wtperf->insert_key, 1));
63 }
64 
65 /*
66  * Each time this function is called we overwrite the first byte and one
67  * other randomly chosen byte in the value buffer.
68  */
69 static void
70 randomize_value(WTPERF_THREAD *thread, char *value_buf)
71 {
72 	CONFIG_OPTS *opts;
73 	uint8_t *vb;
74 	uint32_t i, max_range, rand_val;
75 
76 	opts = thread->wtperf->opts;
77 
78 	/*
79 	 * Limit how much of the buffer we validate for length. This means
80 	 * that only threads doing growing updates will ever change values
81 	 * outside of the initial value size, but that's a fair trade-off
82 	 * for not computing the value's exact length in this
83 	 * performance-sensitive function.
84 	 */
85 	if (thread->workload == NULL || thread->workload->update_delta == 0)
86 		max_range = opts->value_sz;
87 	else if (thread->workload->update_delta > 0)
88 		max_range = opts->value_sz_max;
89 	else
90 		max_range = opts->value_sz_min;
91 
92 	/*
93 	 * Generate a single random value and re-use it. We generally only
94 	 * have small ranges in this function, so avoiding repeated calls to
95 	 * the random number generator is worthwhile.
96 	 */
97 	rand_val = __wt_random(&thread->rnd);
98 	i = rand_val % (max_range - 1);
99 
100 	/*
101 	 * Ensure we don't write past the end of a value when configured for
102 	 * randomly sized values.
103 	 */
104 	while (value_buf[i] == '\0' && i > 0)
105 		--i;
106 
107 	vb = (uint8_t *)value_buf;
108 	vb[0] = ((rand_val >> 8) % 255) + 1;
109 	/*
110 	 * If i happened to be 0, we'll be re-writing the same value
111 	 * twice, but that doesn't matter.
112 	 */
113 	vb[i] = ((rand_val >> 16) % 255) + 1;
114 }
115 
116 /*
117  * Partition data by key ranges.
118  */
119 static uint32_t
120 map_key_to_table(CONFIG_OPTS *opts, uint64_t k)
121 {
122 	if (opts->range_partition) {
123 		/* Take care to return a result in [0..table_count-1]. */
124 		if (k > opts->icount + opts->random_range)
125 			return (0);
126 		return ((uint32_t)((k - 1) /
127 		    ((opts->icount + opts->random_range +
128 		    opts->table_count - 1) / opts->table_count)));
129 	} else
130 		return ((uint32_t)(k % opts->table_count));
131 }
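/*
 * Illustrative example (values assumed, not from any configuration): with
 * icount=1000, random_range=0 and table_count=4, the chunk size is
 * (1000 + 0 + 4 - 1) / 4 = 250, so keys 1-250 map to table 0, keys 251-500
 * to table 1, and so on; any key above icount + random_range falls back to
 * table 0.
 */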
132 
133 /*
134  * Figure out and extend the size of the value string, used for growing
135  * updates. We know that the value to be updated is in the thread's value
136  * scratch buffer.
137  */
138 static inline void
139 update_value_delta(WTPERF_THREAD *thread)
140 {
141 	CONFIG_OPTS *opts;
142 	WTPERF *wtperf;
143 	char * value;
144 	int64_t delta, len, new_len;
145 
146 	wtperf = thread->wtperf;
147 	opts = wtperf->opts;
148 	value = thread->value_buf;
149 	delta = thread->workload->update_delta;
150 	len = (int64_t)strlen(value);
151 
152 	if (delta == INT64_MAX)
153 		delta = __wt_random(&thread->rnd) %
154 		    (opts->value_sz_max - opts->value_sz);
155 
156 	/* Ensure we aren't changing across boundaries */
157 	if (delta > 0 && len + delta > opts->value_sz_max)
158 		delta = opts->value_sz_max - len;
159 	else if (delta < 0 && len + delta < opts->value_sz_min)
160 		delta = opts->value_sz_min - len;
161 
162 	/* Bail if there isn't anything to do */
163 	if (delta == 0)
164 		return;
165 
166 	if (delta < 0)
167 		value[len + delta] = '\0';
168 	else {
169 		/* Extend the value by the configured amount. */
170 		for (new_len = len;
171 		    new_len < opts->value_sz_max && new_len - len < delta;
172 		    new_len++)
173 			value[new_len] = 'a';
174 	}
175 }
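/*
 * Illustrative example (values assumed, not from any configuration): with
 * value_sz=100, value_sz_min=20 and value_sz_max=200, an update_delta of
 * INT64_MAX picks a random delta in [0, 100). A positive delta pads the
 * value with 'a' bytes up to at most value_sz_max, while a configured
 * negative delta truncates the value, but never below value_sz_min.
 */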
176 
177 static int
178 cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
179 {
180 	TRACK *trk;
181 	WTPERF *wtperf;
182 	WTPERF_THREAD *thread;
183 	WT_ASYNC_OPTYPE type;
184 	uint32_t *tables;
185 	int t_ret;
186 	char *value;
187 
188 	(void)cb;
189 	(void)flags;
190 
191 	wtperf = NULL;			/* -Wconditional-uninitialized */
192 	thread = NULL;			/* -Wconditional-uninitialized */
193 
194 	type = op->get_type(op);
195 	if (type != WT_AOP_COMPACT) {
196 		thread = (WTPERF_THREAD *)op->app_private;
197 		wtperf = thread->wtperf;
198 	}
199 
200 	trk = NULL;
201 	switch (type) {
202 	case WT_AOP_COMPACT:
203 		tables = (uint32_t *)op->app_private;
204 		(void)__wt_atomic_add32(tables, (uint32_t)-1);
205 		break;
206 	case WT_AOP_INSERT:
207 		trk = &thread->insert;
208 		break;
209 	case WT_AOP_SEARCH:
210 		trk = &thread->read;
211 		if (ret == 0 &&
212 		    (t_ret = op->get_value(op, &value)) != 0) {
213 			ret = t_ret;
214 			lprintf(wtperf, ret, 0, "get_value in read.");
215 			goto err;
216 		}
217 		break;
218 	case WT_AOP_UPDATE:
219 		trk = &thread->update;
220 		break;
221 	case WT_AOP_NONE:
222 	case WT_AOP_REMOVE:
223 		/* We never expect this type. */
224 		lprintf(wtperf,
225 		    ret, 0, "No type in op %" PRIu64, op->get_id(op));
226 		goto err;
227 	}
228 
229 	/*
230 	 * Either we have success and we track it, or failure and panic.
231 	 *
232 	 * Reads and updates can fail with WT_NOTFOUND: we may be searching
233 	 * in a random range, or an insert op might have updated the
234 	 * last record in the table but not yet finished the actual insert.
235 	 */
236 	if (type == WT_AOP_COMPACT)
237 		return (0);
238 	if (ret == 0 || (ret == WT_NOTFOUND && type != WT_AOP_INSERT)) {
239 		if (!wtperf->in_warmup)
240 			(void)__wt_atomic_add64(&trk->ops, 1);
241 		return (0);
242 	}
243 err:
244 	/* Panic if error */
245 	lprintf(wtperf, ret, 0, "Error in op %" PRIu64, op->get_id(op));
246 	wtperf->error = wtperf->stop = true;
247 	return (1);
248 }
249 
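/*
 * Callback table handed to WT_CONNECTION.async_new_op below; WiredTiger
 * invokes cb_asyncop once each asynchronous operation completes.
 */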
250 static WT_ASYNC_CALLBACK cb = { cb_asyncop };
251 
252 /*
253  * track_operation --
254  *	Update an operation's tracking structure with new latency information.
255  */
256 static inline void
257 track_operation(TRACK *trk, uint64_t usecs)
258 {
259 	uint64_t v;
260 
261 					/* average microseconds per call */
262 	v = (uint64_t)usecs;
263 
264 	trk->latency += usecs;		/* track total latency */
265 
266 	if (v > trk->max_latency)	/* track max/min latency */
267 		trk->max_latency = (uint32_t)v;
268 	if (v < trk->min_latency)
269 		trk->min_latency = (uint32_t)v;
270 
271 	/*
272 	 * Update a latency bucket.
273 	 * First buckets: usecs up to 1000us, at 1us each.
274 	 */
275 	if (v < 1000)
276 		++trk->us[v];
277 
278 	/*
279 	 * Second buckets: milliseconds from 1ms to 1000ms, at 1ms each.
280 	 */
281 	else if (v < ms_to_us(1000))
282 		++trk->ms[us_to_ms(v)];
283 
284 	/*
285 	 * Third buckets are seconds from 1s to 100s, at 1s each.
286 	 */
287 	else if (v < sec_to_us(100))
288 		++trk->sec[us_to_sec(v)];
289 
290 	/* >100 seconds, accumulate in the biggest bucket. */
291 	else
292 		++trk->sec[ELEMENTS(trk->sec) - 1];
293 }
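/*
 * Illustrative example (values assumed): an operation taking 750us
 * increments trk->us[750], one taking 2,500us increments trk->ms[2], and
 * one taking 45 seconds increments trk->sec[45].
 */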
294 
295 static const char *
296 op_name(uint8_t *op)
297 {
298 	switch (*op) {
299 	case WORKER_INSERT:
300 		return ("insert");
301 	case WORKER_INSERT_RMW:
302 		return ("insert_rmw");
303 	case WORKER_READ:
304 		return ("read");
305 	case WORKER_TRUNCATE:
306 		return ("truncate");
307 	case WORKER_UPDATE:
308 		return ("update");
309 	default:
310 		return ("unknown");
311 	}
312 	/* NOTREACHED */
313 }
314 
315 static WT_THREAD_RET
316 worker_async(void *arg)
317 {
318 	CONFIG_OPTS *opts;
319 	WTPERF *wtperf;
320 	WTPERF_THREAD *thread;
321 	WT_ASYNC_OP *asyncop;
322 	WT_CONNECTION *conn;
323 	uint64_t next_val;
324 	uint8_t *op, *op_end;
325 	int ret;
326 	char *key_buf, *value_buf;
327 
328 	thread = (WTPERF_THREAD *)arg;
329 	wtperf = thread->wtperf;
330 	opts = wtperf->opts;
331 	conn = wtperf->conn;
332 
333 	key_buf = thread->key_buf;
334 	value_buf = thread->value_buf;
335 
336 	op = thread->workload->ops;
337 	op_end = op + sizeof(thread->workload->ops);
338 
339 	while (!wtperf->stop) {
340 		/*
341 		 * Generate the next key and set up operation-specific
342 		 * statistics tracking objects.
343 		 */
344 		switch (*op) {
345 		case WORKER_INSERT:
346 		case WORKER_INSERT_RMW:
347 			if (opts->random_range)
348 				next_val = wtperf_rand(thread);
349 			else
350 				next_val = opts->icount + get_next_incr(wtperf);
351 			break;
352 		case WORKER_READ:
353 		case WORKER_UPDATE:
354 			next_val = wtperf_rand(thread);
355 
356 			/*
357 			 * If the workload is started without a populate phase
358 			 * we rely on at least one insert to get a valid item
359 			 * id.
360 			 */
361 			if (wtperf_value_range(wtperf) < next_val)
362 				continue;
363 			break;
364 		default:
365 			goto err;		/* can't happen */
366 		}
367 
368 		generate_key(opts, key_buf, next_val);
369 
370 		/*
371 		 * Spread the data out around the multiple databases.
372 		 * Sleep to allow workers a chance to run and process async ops.
373 		 * Then retry to get an async op.
374 		 */
375 		while ((ret = conn->async_new_op(conn,
376 		    wtperf->uris[map_key_to_table(wtperf->opts, next_val)],
377 		    NULL, &cb, &asyncop)) == EBUSY)
378 			(void)usleep(10000);
379 		if (ret != 0)
380 			goto err;
381 
382 		asyncop->app_private = thread;
383 		asyncop->set_key(asyncop, key_buf);
384 		switch (*op) {
385 		case WORKER_READ:
386 			ret = asyncop->search(asyncop);
387 			if (ret == 0)
388 				break;
389 			goto op_err;
390 		case WORKER_INSERT:
391 			if (opts->random_value)
392 				randomize_value(thread, value_buf);
393 			asyncop->set_value(asyncop, value_buf);
394 			if ((ret = asyncop->insert(asyncop)) == 0)
395 				break;
396 			goto op_err;
397 		case WORKER_UPDATE:
398 			if (opts->random_value)
399 				randomize_value(thread, value_buf);
400 			asyncop->set_value(asyncop, value_buf);
401 			if ((ret = asyncop->update(asyncop)) == 0)
402 				break;
403 			goto op_err;
404 		default:
405 op_err:			lprintf(wtperf, ret, 0,
406 			    "%s failed for: %s, range: %"PRIu64,
407 			    op_name(op), key_buf, wtperf_value_range(wtperf));
408 			goto err;		/* can't happen */
409 		}
410 
411 		/* Schedule the next operation */
412 		if (++op == op_end)
413 			op = thread->workload->ops;
414 	}
415 
416 	if (conn->async_flush(conn) != 0)
417 		goto err;
418 
419 	/* Notify our caller we failed and shut the system down. */
420 	if (0) {
421 err:		wtperf->error = wtperf->stop = true;
422 	}
423 	return (WT_THREAD_RET_VALUE);
424 }
425 
426 /*
427  * do_range_reads --
428  *	If configured, execute a sequence of next operations after each
429  *	search, ensuring the keys we see are always in order.
430  */
431 static int
432 do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor, int64_t read_range)
433 {
434 	uint64_t next_val, prev_val;
435 	int64_t range;
436 	char *range_key_buf;
437 	char buf[512];
438 	int ret;
439 
440 	ret = 0;
441 
442 	if (read_range == 0)
443 		return (0);
444 
445 	memset(&buf[0], 0, 512 * sizeof(char));
446 	range_key_buf = &buf[0];
447 
448 	/* Save where the first key is for comparisons. */
449 	testutil_check(cursor->get_key(cursor, &range_key_buf));
450 	extract_key(range_key_buf, &next_val);
451 
452 	for (range = 0; range < read_range; ++range) {
453 		prev_val = next_val;
454 		ret = cursor->next(cursor);
455 		/* We are done if we reach the end. */
456 		if (ret != 0)
457 			break;
458 
459 		/* Retrieve and decode the key */
460 		testutil_check(cursor->get_key(cursor, &range_key_buf));
461 		extract_key(range_key_buf, &next_val);
462 		if (next_val < prev_val) {
463 			lprintf(wtperf, EINVAL, 0,
464 			    "Out of order keys %" PRIu64
465 			    " came before %" PRIu64,
466 			    prev_val, next_val);
467 			return (EINVAL);
468 		}
469 	}
470 	return (0);
471 }
472 
473 /*
 * pre_load_data --
474  *	Pull everything into cache before starting the workload phase.
475  */
476 static void
477 pre_load_data(WTPERF *wtperf)
478 {
479 	CONFIG_OPTS *opts;
480 	WT_CONNECTION *conn;
481 	WT_CURSOR *cursor;
482 	WT_SESSION *session;
483 	size_t i;
484 	int ret;
485 	char *key;
486 
487 	opts = wtperf->opts;
488 	conn = wtperf->conn;
489 
490 	testutil_check(conn->open_session(
491 	    conn, NULL, opts->sess_config, &session));
492 	for (i = 0; i < opts->table_count; i++) {
493 		testutil_check(session->open_cursor(
494 		    session, wtperf->uris[i], NULL, NULL, &cursor));
495 		while ((ret = cursor->next(cursor)) == 0)
496 			testutil_check(cursor->get_key(cursor, &key));
497 		testutil_assert(ret == WT_NOTFOUND);
498 		testutil_check(cursor->close(cursor));
499 	}
500 	testutil_check(session->close(session, NULL));
501 }
502 
503 static WT_THREAD_RET
504 worker(void *arg)
505 {
506 	struct timespec start, stop;
507 	CONFIG_OPTS *opts;
508 	TRACK *trk;
509 	WORKLOAD *workload;
510 	WTPERF *wtperf;
511 	WTPERF_THREAD *thread;
512 	WT_CONNECTION *conn;
513 	WT_CURSOR **cursors, *cursor, *log_table_cursor, *tmp_cursor;
514 	WT_SESSION *session;
515 	size_t i;
516 	int64_t ops, ops_per_txn;
517 	uint64_t log_id, next_val, usecs;
518 	uint8_t *op, *op_end;
519 	int measure_latency, ret, truncated;
520 	char *value_buf, *key_buf, *value;
521 	char buf[512];
522 
523 	thread = (WTPERF_THREAD *)arg;
524 	workload = thread->workload;
525 	wtperf = thread->wtperf;
526 	opts = wtperf->opts;
527 	conn = wtperf->conn;
528 	cursors = NULL;
529 	cursor = log_table_cursor = NULL;	/* -Wconditional-initialized */
530 	ops = 0;
531 	ops_per_txn = workload->ops_per_txn;
532 	session = NULL;
533 	trk = NULL;
534 
535 	if ((ret = conn->open_session(
536 	    conn, NULL, opts->sess_config, &session)) != 0) {
537 		lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
538 		goto err;
539 	}
540 	for (i = 0; i < opts->table_count_idle; i++) {
541 		testutil_check(__wt_snprintf(
542 		    buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
543 		if ((ret = session->open_cursor(
544 		    session, buf, NULL, NULL, &tmp_cursor)) != 0) {
545 			lprintf(wtperf, ret, 0,
546 			    "Error opening idle table %s", buf);
547 			goto err;
548 		}
549 		if ((ret = tmp_cursor->close(tmp_cursor)) != 0) {
550 			lprintf(wtperf, ret, 0,
551 			    "Error closing idle table %s", buf);
552 			goto err;
553 		}
554 	}
555 	if (workload->table_index != INT32_MAX) {
556 		if ((ret = session->open_cursor(session,
557 		    wtperf->uris[workload->table_index],
558 		    NULL, NULL, &cursor)) != 0) {
559 			lprintf(wtperf, ret, 0,
560 			    "worker: WT_SESSION.open_cursor: %s",
561 			    wtperf->uris[workload->table_index]);
562 			goto err;
563 		}
564 		if ((ret = session->open_cursor(session,
565 		    wtperf->uris[workload->table_index],
566 		    NULL, "next_random=true", &thread->rand_cursor)) != 0) {
567 			lprintf(wtperf, ret, 0,
568 			    "worker: WT_SESSION.open_cursor: random %s",
569 			    wtperf->uris[workload->table_index]);
570 			goto err;
571 		}
572 	} else {
573 		cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
574 		for (i = 0; i < opts->table_count; i++) {
575 			if ((ret = session->open_cursor(session,
576 			    wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) {
577 				lprintf(wtperf, ret, 0,
578 				    "worker: WT_SESSION.open_cursor: %s",
579 				    wtperf->uris[i]);
580 				goto err;
581 			}
582 		}
583 	}
584 	if (opts->log_like_table && (ret = session->open_cursor(session,
585 	    wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) {
586 		lprintf(wtperf, ret, 0,
587 		    "worker: WT_SESSION.open_cursor: %s",
588 		    wtperf->log_table_uri);
589 		goto err;
590 	}
591 
592 	/* Set up the timer for throttling. */
593 	if (workload->throttle != 0)
594 		setup_throttle(thread);
595 
596 	/* Set up for truncate. */
597 	if (workload->truncate != 0)
598 		setup_truncate(wtperf, thread, session);
599 
600 	key_buf = thread->key_buf;
601 	value_buf = thread->value_buf;
602 
603 	op = workload->ops;
604 	op_end = op + sizeof(workload->ops);
605 
606 	if ((ops_per_txn != 0 || opts->log_like_table) &&
607 		(ret = session->begin_transaction(session, NULL)) != 0) {
608 		lprintf(wtperf, ret, 0, "First transaction begin failed");
609 		goto err;
610 	}
611 
612 	while (!wtperf->stop) {
613 		if (workload->pause != 0)
614 			(void)sleep((unsigned int)workload->pause);
615 		/*
616 		 * Generate the next key and set up operation-specific
617 		 * statistics tracking objects.
618 		 */
619 		switch (*op) {
620 		case WORKER_INSERT:
621 		case WORKER_INSERT_RMW:
622 			trk = &thread->insert;
623 			if (opts->random_range)
624 				next_val = wtperf_rand(thread);
625 			else
626 				next_val = opts->icount + get_next_incr(wtperf);
627 			break;
628 		case WORKER_READ:
629 			trk = &thread->read;
630 			/* FALLTHROUGH */
631 		case WORKER_UPDATE:
632 			if (*op == WORKER_UPDATE)
633 				trk = &thread->update;
634 			next_val = wtperf_rand(thread);
635 
636 			/*
637 			 * If the workload is started without a populate phase
638 			 * we rely on at least one insert to get a valid item
639 			 * id.
640 			 */
641 			if (wtperf_value_range(wtperf) < next_val)
642 				continue;
643 			break;
644 		case WORKER_TRUNCATE:
645 			/* Required but not used. */
646 			next_val = wtperf_rand(thread);
647 			break;
648 		default:
649 			goto err;		/* can't happen */
650 		}
651 
652 		generate_key(opts, key_buf, next_val);
653 
654 		if (workload->table_index == INT32_MAX)
655 			/*
656 			 * Spread the data out around the multiple databases.
657 			 */
658 			cursor = cursors[
659 			    map_key_to_table(wtperf->opts, next_val)];
660 
661 		/*
662 		 * Skip the first time we do an operation, when trk->ops
663 		 * is 0, to avoid first time latency spikes.
664 		 */
665 		measure_latency =
666 		    opts->sample_interval != 0 && trk != NULL &&
667 		    trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
668 		if (measure_latency)
669 			__wt_epoch(NULL, &start);
670 
671 		cursor->set_key(cursor, key_buf);
672 
673 		switch (*op) {
674 		case WORKER_READ:
675 			/*
676 			 * Reads can fail with WT_NOTFOUND: we may be searching
677 			 * in a random range, or an insert thread might have
678 			 * updated the last record in the table but not yet
679 			 * finished the actual insert.  Count failed search in
680 			 * a random range as a "read".
681 			 */
682 			ret = cursor->search(cursor);
683 			if (ret == 0) {
684 				if ((ret = cursor->get_value(
685 				    cursor, &value)) != 0) {
686 					lprintf(wtperf, ret, 0,
687 					    "get_value in read.");
688 					goto err;
689 				}
690 				/*
691 				 * If we want to read a range, then call next
692 				 * for several operations, confirming that the
693 				 * next key is in the correct order.
694 				 */
695 				ret = do_range_reads(wtperf,
696 				    cursor, workload->read_range);
697 			}
698 
699 			if (ret == 0 || ret == WT_NOTFOUND)
700 				break;
701 			goto op_err;
702 		case WORKER_INSERT_RMW:
703 			if ((ret = cursor->search(cursor)) != WT_NOTFOUND)
704 				goto op_err;
705 
706 			/* The error return reset the cursor's key. */
707 			cursor->set_key(cursor, key_buf);
708 
709 			/* FALLTHROUGH */
710 		case WORKER_INSERT:
711 			if (opts->random_value)
712 				randomize_value(thread, value_buf);
713 			cursor->set_value(cursor, value_buf);
714 			if ((ret = cursor->insert(cursor)) == 0)
715 				break;
716 			goto op_err;
717 		case WORKER_TRUNCATE:
718 			if ((ret = run_truncate(wtperf,
719 			    thread, cursor, session, &truncated)) == 0) {
720 				if (truncated)
721 					trk = &thread->truncate;
722 				else
723 					trk = &thread->truncate_sleep;
724 				/* Pause between truncate attempts */
725 				(void)usleep(1000);
726 				break;
727 			}
728 			goto op_err;
729 		case WORKER_UPDATE:
730 			if ((ret = cursor->search(cursor)) == 0) {
731 				if ((ret = cursor->get_value(
732 				    cursor, &value)) != 0) {
733 					lprintf(wtperf, ret, 0,
734 					    "get_value in update.");
735 					goto err;
736 				}
737 				/*
738 				 * Copy as much of the previous value as is
739 				 * safe, and be sure to NUL-terminate.
740 				 */
741 				strncpy(value_buf,
742 				    value, opts->value_sz_max - 1);
743 				if (workload->update_delta != 0)
744 					update_value_delta(thread);
745 				if (value_buf[0] == 'a')
746 					value_buf[0] = 'b';
747 				else
748 					value_buf[0] = 'a';
749 				if (opts->random_value)
750 					randomize_value(thread, value_buf);
751 				cursor->set_value(cursor, value_buf);
752 				if ((ret = cursor->update(cursor)) == 0)
753 					break;
754 				goto op_err;
755 			}
756 
757 			/*
758 			 * Reads can fail with WT_NOTFOUND: we may be searching
759 			 * in a random range, or an insert thread might have
760 			 * updated the last record in the table but not yet
761 			 * finished the actual insert.  Count failed search in
762 			 * a random range as a "read".
763 			 */
764 			if (ret == WT_NOTFOUND)
765 				break;
766 
767 op_err:			if (ret == WT_ROLLBACK && ops_per_txn != 0) {
768 				/*
769 				 * If we are running with explicit transactions
770 				 * configured and we hit a WT_ROLLBACK, then we
771 				 * should roll back the current transaction and
772 				 * attempt to continue.
773 				 * This does break the guarantee of insertion
774 				 * order in cases of ordered inserts, as we
775 				 * aren't retrying here.
776 				 */
777 				lprintf(wtperf, ret, 1,
778 				    "%s for: %s, range: %"PRIu64, op_name(op),
779 				    key_buf, wtperf_value_range(wtperf));
780 				if ((ret = session->rollback_transaction(
781 				    session, NULL)) != 0) {
782 					lprintf(wtperf, ret, 0,
783 					     "Failed rollback_transaction");
784 					goto err;
785 				}
786 				if ((ret = session->begin_transaction(
787 				    session, NULL)) != 0) {
788 					lprintf(wtperf, ret, 0,
789 					    "Worker begin transaction failed");
790 					goto err;
791 				}
792 				break;
793 			}
794 			lprintf(wtperf, ret, 0,
795 			    "%s failed for: %s, range: %"PRIu64,
796 			    op_name(op), key_buf, wtperf_value_range(wtperf));
797 			goto err;
798 		default:
799 			goto err;		/* can't happen */
800 		}
801 
802 		/* Update the log-like table. */
803 		if (opts->log_like_table &&
804 		    (*op != WORKER_READ && *op != WORKER_TRUNCATE)) {
805 			log_id =
806 			    __wt_atomic_add64(&wtperf->log_like_table_key, 1);
807 			log_table_cursor->set_key(log_table_cursor, log_id);
808 			log_table_cursor->set_value(
809 			    log_table_cursor, value_buf);
810 			if ((ret =
811 			    log_table_cursor->insert(log_table_cursor)) != 0) {
812 				lprintf(wtperf, ret, 0, "Cursor insert failed");
813 				goto err;
814 			}
815 		}
816 
817 		/* Release the cursor, if we have multiple tables. */
818 		if (opts->table_count > 1 && ret == 0 &&
819 		    *op != WORKER_INSERT && *op != WORKER_INSERT_RMW) {
820 			if ((ret = cursor->reset(cursor)) != 0) {
821 				lprintf(wtperf, ret, 0, "Cursor reset failed");
822 				goto err;
823 			}
824 		}
825 
826 		/* Gather statistics */
827 		if (!wtperf->in_warmup) {
828 			if (measure_latency) {
829 				__wt_epoch(NULL, &stop);
830 				++trk->latency_ops;
831 				usecs = WT_TIMEDIFF_US(stop, start);
832 				track_operation(trk, usecs);
833 			}
834 			/* Increment operation count */
835 			++trk->ops;
836 		}
837 
838 		/*
839 		 * Commit the transaction if grouping operations together
840 		 * or tracking changes in our log table.
841 		 */
842 		if ((opts->log_like_table && ops_per_txn == 0) ||
843 		    (ops_per_txn != 0 && ops++ % ops_per_txn == 0)) {
844 			if ((ret = session->commit_transaction(
845 			    session, NULL)) != 0) {
846 				lprintf(wtperf, ret, 0,
847 				    "Worker transaction commit failed");
848 				goto err;
849 			}
850 			if ((ret = session->begin_transaction(
851 			    session, NULL)) != 0) {
852 				lprintf(wtperf, ret, 0,
853 				    "Worker begin transaction failed");
854 				goto err;
855 			}
856 		}
857 
858 		/* Schedule the next operation */
859 		if (++op == op_end)
860 			op = workload->ops;
861 
862 		/*
863 		 * Decrement throttle ops and check if we should sleep
864 		 * and then get more work to perform.
865 		 */
866 		if (--thread->throttle_cfg.ops_count == 0)
867 			worker_throttle(thread);
868 
869 	}
870 
871 	if ((ret = session->close(session, NULL)) != 0) {
872 		lprintf(wtperf, ret, 0, "Session close in worker failed");
873 		goto err;
874 	}
875 
876 	/* Notify our caller we failed and shut the system down. */
877 	if (0) {
878 err:		wtperf->error = wtperf->stop = true;
879 	}
880 	free(cursors);
881 
882 	return (WT_THREAD_RET_VALUE);
883 }
884 
885 /*
886  * run_mix_schedule_op --
887  *	Replace read operations with another operation, in the configured
888  * percentage.
889  */
890 static void
891 run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt)
892 {
893 	int jump, pass;
894 	uint8_t *p, *end;
895 
896 	/* Jump around the array to roughly spread out the operations. */
897 	jump = (int)(100 / op_cnt);
898 
899 	/*
900 	 * Find a read operation and replace it with another operation.  This
901 	 * is roughly n-squared, but it's an N of 100, leave it.
902 	 */
903 	p = workp->ops;
904 	end = workp->ops + sizeof(workp->ops);
905 	while (op_cnt-- > 0) {
906 		for (pass = 0; *p != WORKER_READ; ++p)
907 			if (p == end) {
908 				/*
909 				 * We were passed a percentage of the total ops,
910 				 * so there should always be a read operation to
911 				 * replace, but don't allow infinite loops.
912 				 */
913 				if (++pass > 1)
914 					return;
915 				p = workp->ops;
916 			}
917 		*p = (uint8_t)op;
918 
919 		if (end - jump < p)
920 			p = workp->ops;
921 		else
922 			p += jump;
923 	}
924 }
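/*
 * Illustrative example (ratios assumed): a mix requesting 10% inserts calls
 * this function with op_cnt=10, so jump is 100 / 10 = 10 and roughly every
 * tenth slot of the 100-entry ops schedule is switched from WORKER_READ to
 * the insert operation.
 */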
925 
926 /*
927  * run_mix_schedule --
928  *	Schedule the mixed-run operations.
929  */
930 static int
931 run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
932 {
933 	CONFIG_OPTS *opts;
934 	int64_t pct;
935 
936 	opts = wtperf->opts;
937 
938 	if (workp->truncate != 0) {
939 		if (workp->insert != 0 ||
940 		    workp->read != 0 || workp->update != 0) {
941 			lprintf(wtperf, EINVAL, 0,
942 			    "Can't configure truncate in a mixed workload");
943 			return (EINVAL);
944 		}
945 		memset(workp->ops, WORKER_TRUNCATE, sizeof(workp->ops));
946 		return (0);
947 	}
948 
949 	/* Confirm reads, inserts and updates cannot all be zero. */
950 	if (workp->insert == 0 && workp->read == 0 && workp->update == 0) {
951 		lprintf(wtperf, EINVAL, 0, "no operations scheduled");
952 		return (EINVAL);
953 	}
954 
955 	/*
956 	 * Check for a simple case where the thread is only doing insert or
957 	 * update operations (because the default operation for a
958 	 * job-mix is read, the subsequent code works fine if only reads are
959 	 * specified).
960 	 */
961 	if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
962 		memset(workp->ops,
963 		    opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
964 		    sizeof(workp->ops));
965 		return (0);
966 	}
967 	if (workp->insert == 0 && workp->read == 0 && workp->update != 0) {
968 		memset(workp->ops, WORKER_UPDATE, sizeof(workp->ops));
969 		return (0);
970 	}
971 
972 	/*
973 	 * The worker thread configuration is done as ratios of operations.  If
974 	 * the caller gives us something insane like "reads=77,updates=23" (do
975 	 * 77 reads for every 23 updates), we don't want to do 77 reads followed
976 	 * by 23 updates, we want to uniformly distribute the read and update
977 	 * operations across the space.  Convert to percentages and then lay out
978 	 * the operations across an array.
979 	 *
980 	 * Percentage conversion is lossy, the application can do stupid stuff
981 	 * here, for example, imagine a configured ratio of "reads=1,inserts=2,
982 	 * updates=999999".  First, if the percentages are skewed enough, some
983 	 * operations might never be done.  Second, we set the base operation to
984 	 * read, which means any fractional results from percentage conversion
985 	 * will be reads, implying read operations in some cases where reads
986 	 * weren't configured.  We should be fine if the application configures
987 	 * something approaching a rational set of ratios.
988 	 */
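	/*
	 * Illustrative example (ratios assumed): "reads=80,inserts=5,
	 * updates=15" converts to 5% inserts and 15% updates; the remaining
	 * 80 slots keep the WORKER_READ default set just below.
	 */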
989 	memset(workp->ops, WORKER_READ, sizeof(workp->ops));
990 
991 	pct = (workp->insert * 100) /
992 	    (workp->insert + workp->read + workp->update);
993 	if (pct != 0)
994 		run_mix_schedule_op(workp,
995 		    opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
996 	pct = (workp->update * 100) /
997 	    (workp->insert + workp->read + workp->update);
998 	if (pct != 0)
999 		run_mix_schedule_op(workp, WORKER_UPDATE, pct);
1000 	return (0);
1001 }
1002 
1003 static WT_THREAD_RET
1004 populate_thread(void *arg)
1005 {
1006 	struct timespec start, stop;
1007 	CONFIG_OPTS *opts;
1008 	TRACK *trk;
1009 	WTPERF *wtperf;
1010 	WTPERF_THREAD *thread;
1011 	WT_CONNECTION *conn;
1012 	WT_CURSOR **cursors, *cursor;
1013 	WT_SESSION *session;
1014 	size_t i;
1015 	uint64_t op, usecs;
1016 	uint32_t opcount;
1017 	int intxn, measure_latency, ret, stress_checkpoint_due;
1018 	char *value_buf, *key_buf;
1019 	const char *cursor_config;
1020 
1021 	thread = (WTPERF_THREAD *)arg;
1022 	wtperf = thread->wtperf;
1023 	opts = wtperf->opts;
1024 	conn = wtperf->conn;
1025 	session = NULL;
1026 	cursors = NULL;
1027 	ret = stress_checkpoint_due = 0;
1028 	trk = &thread->insert;
1029 
1030 	key_buf = thread->key_buf;
1031 	value_buf = thread->value_buf;
1032 
1033 	if ((ret = conn->open_session(
1034 	    conn, NULL, opts->sess_config, &session)) != 0) {
1035 		lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
1036 		goto err;
1037 	}
1038 
1039 	/* Do bulk loads if populate is single-threaded. */
1040 	cursor_config =
1041 	    (opts->populate_threads == 1 && !opts->index) ? "bulk" : NULL;
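	/*
	 * Bulk cursors require exclusive access to the object, so they are
	 * only used when a single populate thread is configured and no index
	 * is being built (bulk loads do not maintain indexes).
	 */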
1042 	/* Create the cursors. */
1043 	cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
1044 	for (i = 0; i < opts->table_count; i++) {
1045 		if ((ret = session->open_cursor(
1046 		    session, wtperf->uris[i], NULL,
1047 		    cursor_config, &cursors[i])) != 0) {
1048 			lprintf(wtperf, ret, 0,
1049 			    "populate: WT_SESSION.open_cursor: %s",
1050 			    wtperf->uris[i]);
1051 			goto err;
1052 		}
1053 	}
1054 
1055 	/* Populate the databases. */
1056 	for (intxn = 0, opcount = 0;;) {
1057 		op = get_next_incr(wtperf);
1058 		if (op > opts->icount)
1059 			break;
1060 
1061 		if (opts->populate_ops_per_txn != 0 && !intxn) {
1062 			if ((ret = session->begin_transaction(
1063 			    session, opts->transaction_config)) != 0) {
1064 				lprintf(wtperf, ret, 0,
1065 				    "Failed starting transaction.");
1066 				goto err;
1067 			}
1068 			intxn = 1;
1069 		}
1070 		/*
1071 		 * Figure out which table this op belongs to.
1072 		 */
1073 		cursor = cursors[map_key_to_table(wtperf->opts, op)];
1074 		generate_key(opts, key_buf, op);
1075 		measure_latency =
1076 		    opts->sample_interval != 0 &&
1077 		    trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
1078 		if (measure_latency)
1079 			__wt_epoch(NULL, &start);
1080 		cursor->set_key(cursor, key_buf);
1081 		if (opts->random_value)
1082 			randomize_value(thread, value_buf);
1083 		cursor->set_value(cursor, value_buf);
1084 		if ((ret = cursor->insert(cursor)) == WT_ROLLBACK) {
1085 			lprintf(wtperf, ret, 0, "insert retrying");
1086 			if ((ret = session->rollback_transaction(
1087 			    session, NULL)) != 0) {
1088 				lprintf(wtperf, ret, 0,
1089 				    "Failed rollback_transaction");
1090 				goto err;
1091 			}
1092 			intxn = 0;
1093 			continue;
1094 		} else if (ret != 0) {
1095 			lprintf(wtperf, ret, 0, "Failed inserting");
1096 			goto err;
1097 		}
1098 		/*
1099 		 * Gather statistics.
1100 		 * We measure the latency of inserting a single key.  If there
1101 		 * are multiple tables, it is the time for insertion into all
1102 		 * of them.
1103 		 */
1104 		if (measure_latency) {
1105 			__wt_epoch(NULL, &stop);
1106 			++trk->latency_ops;
1107 			usecs = WT_TIMEDIFF_US(stop, start);
1108 			track_operation(trk, usecs);
1109 		}
1110 		++thread->insert.ops;	/* Same as trk->ops */
1111 
1112 		if (opts->checkpoint_stress_rate != 0 &&
1113 		    (op % opts->checkpoint_stress_rate) == 0)
1114 			stress_checkpoint_due = 1;
1115 
1116 		if (opts->populate_ops_per_txn != 0) {
1117 			if (++opcount < opts->populate_ops_per_txn)
1118 				continue;
1119 			opcount = 0;
1120 
1121 			if ((ret = session->commit_transaction(
1122 			    session, NULL)) != 0)
1123 				lprintf(wtperf, ret, 0,
1124 				    "Fail committing, transaction was aborted");
1125 			intxn = 0;
1126 		}
1127 
1128 		if (stress_checkpoint_due && intxn == 0) {
1129 			stress_checkpoint_due = 0;
1130 			if ((ret = session->checkpoint(session, NULL)) != 0) {
1131 				lprintf(wtperf, ret, 0, "Checkpoint failed");
1132 				goto err;
1133 			}
1134 		}
1135 	}
1136 	if (intxn &&
1137 	    (ret = session->commit_transaction(session, NULL)) != 0)
1138 		lprintf(wtperf, ret, 0,
1139 		    "Fail committing, transaction was aborted");
1140 
1141 	if ((ret = session->close(session, NULL)) != 0) {
1142 		lprintf(wtperf, ret, 0, "Error closing session in populate");
1143 		goto err;
1144 	}
1145 
1146 	/* Notify our caller we failed and shut the system down. */
1147 	if (0) {
1148 err:		wtperf->error = wtperf->stop = true;
1149 	}
1150 	free(cursors);
1151 
1152 	return (WT_THREAD_RET_VALUE);
1153 }
1154 
1155 static WT_THREAD_RET
1156 populate_async(void *arg)
1157 {
1158 	struct timespec start, stop;
1159 	CONFIG_OPTS *opts;
1160 	TRACK *trk;
1161 	WTPERF *wtperf;
1162 	WTPERF_THREAD *thread;
1163 	WT_ASYNC_OP *asyncop;
1164 	WT_CONNECTION *conn;
1165 	WT_SESSION *session;
1166 	uint64_t op, usecs;
1167 	int measure_latency, ret;
1168 	char *value_buf, *key_buf;
1169 
1170 	thread = (WTPERF_THREAD *)arg;
1171 	wtperf = thread->wtperf;
1172 	opts = wtperf->opts;
1173 	conn = wtperf->conn;
1174 	session = NULL;
1175 	ret = 0;
1176 	trk = &thread->insert;
1177 
1178 	key_buf = thread->key_buf;
1179 	value_buf = thread->value_buf;
1180 
1181 	if ((ret = conn->open_session(
1182 	    conn, NULL, opts->sess_config, &session)) != 0) {
1183 		lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
1184 		goto err;
1185 	}
1186 
1187 	/*
1188 	 * Measuring latency of one async op is not meaningful.  We
1189 	 * will measure the time it takes to do all of them, including
1190 	 * the time for the workers to process them.
1191 	 */
1192 	measure_latency =
1193 	    opts->sample_interval != 0 &&
1194 	    trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
1195 	if (measure_latency)
1196 		__wt_epoch(NULL, &start);
1197 
1198 	/* Populate the databases. */
1199 	for (;;) {
1200 		op = get_next_incr(wtperf);
1201 		if (op > opts->icount)
1202 			break;
1203 		/*
1204 		 * Allocate an async op for whichever table.
1205 		 */
1206 		while ((ret = conn->async_new_op(
1207 		    conn, wtperf->uris[map_key_to_table(wtperf->opts, op)],
1208 		    NULL, &cb, &asyncop)) == EBUSY)
1209 			(void)usleep(10000);
1210 		if (ret != 0)
1211 			goto err;
1212 
1213 		asyncop->app_private = thread;
1214 		generate_key(opts, key_buf, op);
1215 		asyncop->set_key(asyncop, key_buf);
1216 		if (opts->random_value)
1217 			randomize_value(thread, value_buf);
1218 		asyncop->set_value(asyncop, value_buf);
1219 		if ((ret = asyncop->insert(asyncop)) != 0) {
1220 			lprintf(wtperf, ret, 0, "Failed inserting");
1221 			goto err;
1222 		}
1223 	}
1224 
1225 	/*
1226 	 * Gather statistics.
1227 	 * We measure the latency of inserting a single key.  If there
1228 	 * are multiple tables, it is the time for insertion into all
1229 	 * of them.  Note that currently every populate thread will call
1230 	 * async_flush and those calls will convoy.  That is not the
1231 	 * most efficient way, but we want to flush before measuring latency.
1232 	 */
1233 	if (conn->async_flush(conn) != 0)
1234 		goto err;
1235 	if (measure_latency) {
1236 		__wt_epoch(NULL, &stop);
1237 		++trk->latency_ops;
1238 		usecs = WT_TIMEDIFF_US(stop, start);
1239 		track_operation(trk, usecs);
1240 	}
1241 	if ((ret = session->close(session, NULL)) != 0) {
1242 		lprintf(wtperf, ret, 0, "Error closing session in populate");
1243 		goto err;
1244 	}
1245 
1246 	/* Notify our caller we failed and shut the system down. */
1247 	if (0) {
1248 err:		wtperf->error = wtperf->stop = true;
1249 	}
1250 	return (WT_THREAD_RET_VALUE);
1251 }
1252 
1253 static WT_THREAD_RET
1254 monitor(void *arg)
1255 {
1256 	struct timespec t;
1257 	struct tm localt;
1258 	CONFIG_OPTS *opts;
1259 	FILE *fp;
1260 	WTPERF *wtperf;
1261 	size_t len;
1262 	uint64_t min_thr, reads, inserts, updates;
1263 	uint64_t cur_reads, cur_inserts, cur_updates;
1264 	uint64_t last_reads, last_inserts, last_updates;
1265 	uint32_t read_avg, read_min, read_max;
1266 	uint32_t insert_avg, insert_min, insert_max;
1267 	uint32_t update_avg, update_min, update_max;
1268 	uint32_t latency_max, level;
1269 	u_int i;
1270 	int msg_err;
1271 	const char *str;
1272 	char buf[64], *path;
1273 
1274 	wtperf = (WTPERF *)arg;
1275 	opts = wtperf->opts;
1276 	assert(opts->sample_interval != 0);
1277 
1278 	fp = NULL;
1279 	path = NULL;
1280 
1281 	min_thr = (uint64_t)opts->min_throughput;
1282 	latency_max = (uint32_t)ms_to_us(opts->max_latency);
1283 
1284 	/* Open the logging file. */
1285 	len = strlen(wtperf->monitor_dir) + 100;
1286 	path = dmalloc(len);
1287 	testutil_check(__wt_snprintf(
1288 	    path, len, "%s/monitor", wtperf->monitor_dir));
1289 	if ((fp = fopen(path, "w")) == NULL) {
1290 		lprintf(wtperf, errno, 0, "%s", path);
1291 		goto err;
1292 	}
1293 	/* Set line buffering for monitor file. */
1294 	__wt_stream_set_line_buffer(fp);
1295 	fprintf(fp,
1296 	    "#time,"
1297 	    "totalsec,"
1298 	    "read ops per second,"
1299 	    "insert ops per second,"
1300 	    "update ops per second,"
1301 	    "checkpoints,"
1302 	    "read average latency(uS),"
1303 	    "read minimum latency(uS),"
1304 	    "read maximum latency(uS),"
1305 	    "insert average latency(uS),"
1306 	    "insert min latency(uS),"
1307 	    "insert maximum latency(uS),"
1308 	    "update average latency(uS),"
1309 	    "update min latency(uS),"
1310 	    "update maximum latency(uS)"
1311 	    "\n");
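	/*
	 * Every sample_interval seconds a CSV row of per-second rates and
	 * latencies is appended; an illustrative row (made-up values) is:
	 * Oct 05 14:36:10,30,151000,2400,9800,N,65,11,930,210,18,1500,340,25,2100
	 */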
1312 	last_reads = last_inserts = last_updates = 0;
1313 	while (!wtperf->stop) {
1314 		for (i = 0; i < opts->sample_interval; i++) {
1315 			sleep(1);
1316 			if (wtperf->stop)
1317 				break;
1318 		}
1319 		/* If the workers are done, don't bother with a final call. */
1320 		if (wtperf->stop)
1321 			break;
1322 		if (wtperf->in_warmup)
1323 			continue;
1324 
1325 		__wt_epoch(NULL, &t);
1326 		testutil_check(__wt_localtime(NULL, &t.tv_sec, &localt));
1327 		testutil_assert(
1328 		    strftime(buf, sizeof(buf), "%b %d %H:%M:%S", &localt) != 0);
1329 
1330 		reads = sum_read_ops(wtperf);
1331 		inserts = sum_insert_ops(wtperf);
1332 		updates = sum_update_ops(wtperf);
1333 		latency_read(wtperf, &read_avg, &read_min, &read_max);
1334 		latency_insert(wtperf, &insert_avg, &insert_min, &insert_max);
1335 		latency_update(wtperf, &update_avg, &update_min, &update_max);
1336 
1337 		cur_reads = (reads - last_reads) / opts->sample_interval;
1338 		cur_updates = (updates - last_updates) / opts->sample_interval;
1339 		/*
1340 		 * For now the only item we need to worry about changing is
1341 		 * inserts when we transition from the populate phase to
1342 		 * the workload phase.
1343 		 */
1344 		if (inserts < last_inserts)
1345 			cur_inserts = 0;
1346 		else
1347 			cur_inserts =
1348 			    (inserts - last_inserts) / opts->sample_interval;
1349 
1350 		(void)fprintf(fp,
1351 		    "%s,%" PRIu32
1352 		    ",%" PRIu64 ",%" PRIu64 ",%" PRIu64
1353 		    ",%c"
1354 		    ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
1355 		    ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
1356 		    ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
1357 		    "\n",
1358 		    buf, wtperf->totalsec,
1359 		    cur_reads, cur_inserts, cur_updates,
1360 		    wtperf->ckpt ? 'Y' : 'N',
1361 		    read_avg, read_min, read_max,
1362 		    insert_avg, insert_min, insert_max,
1363 		    update_avg, update_min, update_max);
1364 
1365 		if (latency_max != 0 &&
1366 		    (read_max > latency_max || insert_max > latency_max ||
1367 		     update_max > latency_max)) {
1368 			if (opts->max_latency_fatal) {
1369 				level = 1;
1370 				msg_err = WT_PANIC;
1371 				str = "ERROR";
1372 			} else {
1373 				level = 0;
1374 				msg_err = 0;
1375 				str = "WARNING";
1376 			}
1377 			lprintf(wtperf, msg_err, level,
1378 			    "%s: max latency exceeded: threshold %" PRIu32
1379 			    " read max %" PRIu32 " insert max %" PRIu32
1380 			    " update max %" PRIu32, str, latency_max,
1381 			    read_max, insert_max, update_max);
1382 		}
1383 		if (min_thr != 0 &&
1384 		    ((cur_reads != 0 && cur_reads < min_thr) ||
1385 		    (cur_inserts != 0 && cur_inserts < min_thr) ||
1386 		    (cur_updates != 0 && cur_updates < min_thr))) {
1387 			if (opts->min_throughput_fatal) {
1388 				level = 1;
1389 				msg_err = WT_PANIC;
1390 				str = "ERROR";
1391 			} else {
1392 				level = 0;
1393 				msg_err = 0;
1394 				str = "WARNING";
1395 			}
1396 			lprintf(wtperf, msg_err, level,
1397 			    "%s: minimum throughput not met: threshold %" PRIu64
1398 			    " reads %" PRIu64 " inserts %" PRIu64
1399 			    " updates %" PRIu64, str, min_thr, cur_reads,
1400 			    cur_inserts, cur_updates);
1401 		}
1402 		last_reads = reads;
1403 		last_inserts = inserts;
1404 		last_updates = updates;
1405 	}
1406 
1407 	/* Notify our caller we failed and shut the system down. */
1408 	if (0) {
1409 err:		wtperf->error = wtperf->stop = true;
1410 	}
1411 
1412 	if (fp != NULL)
1413 		(void)fclose(fp);
1414 	free(path);
1415 
1416 	return (WT_THREAD_RET_VALUE);
1417 }
1418 
1419 static WT_THREAD_RET
1420 checkpoint_worker(void *arg)
1421 {
1422 	CONFIG_OPTS *opts;
1423 	WTPERF *wtperf;
1424 	WTPERF_THREAD *thread;
1425 	WT_CONNECTION *conn;
1426 	WT_SESSION *session;
1427 	struct timespec e, s;
1428 	uint32_t i;
1429 	int ret;
1430 
1431 	thread = (WTPERF_THREAD *)arg;
1432 	wtperf = thread->wtperf;
1433 	opts = wtperf->opts;
1434 	conn = wtperf->conn;
1435 	session = NULL;
1436 
1437 	if ((ret = conn->open_session(
1438 	    conn, NULL, opts->sess_config, &session)) != 0) {
1439 		lprintf(wtperf, ret, 0,
1440 		    "open_session failed in checkpoint thread.");
1441 		goto err;
1442 	}
1443 
1444 	while (!wtperf->stop) {
1445 		/* Break the sleep up, so we notice interrupts faster. */
1446 		for (i = 0; i < opts->checkpoint_interval; i++) {
1447 			sleep(1);
1448 			if (wtperf->stop)
1449 				break;
1450 		}
1451 		/* If the workers are done, don't bother with a final call. */
1452 		if (wtperf->stop)
1453 			break;
1454 
1455 		__wt_epoch(NULL, &s);
1456 
1457 		wtperf->ckpt = true;
1458 		if ((ret = session->checkpoint(session, NULL)) != 0) {
1459 			lprintf(wtperf, ret, 0, "Checkpoint failed.");
1460 			goto err;
1461 		}
1462 		wtperf->ckpt = false;
1463 		++thread->ckpt.ops;
1464 
1465 		__wt_epoch(NULL, &e);
1466 	}
1467 
1468 	if (session != NULL &&
1469 	    ((ret = session->close(session, NULL)) != 0)) {
1470 		lprintf(wtperf, ret, 0,
1471 		    "Error closing session in checkpoint worker.");
1472 		goto err;
1473 	}
1474 
1475 	/* Notify our caller we failed and shut the system down. */
1476 	if (0) {
1477 err:		wtperf->error = wtperf->stop = true;
1478 	}
1479 
1480 	return (WT_THREAD_RET_VALUE);
1481 }
1482 
1483 static int
1484 execute_populate(WTPERF *wtperf)
1485 {
1486 	struct timespec start, stop;
1487 	CONFIG_OPTS *opts;
1488 	WT_ASYNC_OP *asyncop;
1489 	WTPERF_THREAD *popth;
1490 	WT_THREAD_CALLBACK(*pfunc)(void *);
1491 	size_t i;
1492 	uint64_t last_ops, msecs, print_ops_sec;
1493 	uint32_t interval, tables;
1494 	wt_thread_t idle_table_cycle_thread;
1495 	double print_secs;
1496 	int elapsed, ret;
1497 
1498 	opts = wtperf->opts;
1499 
1500 	lprintf(wtperf, 0, 1,
1501 	    "Starting %" PRIu32
1502 	    " populate thread(s) for %" PRIu32 " items",
1503 	    opts->populate_threads, opts->icount);
1504 
1505 	/* Start cycling idle tables if configured. */
1506 	start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
1507 
1508 	wtperf->insert_key = 0;
1509 
1510 	wtperf->popthreads =
1511 	    dcalloc(opts->populate_threads, sizeof(WTPERF_THREAD));
1512 	if (wtperf->use_asyncops) {
1513 		lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
1514 		    opts->async_threads);
1515 		pfunc = populate_async;
1516 	} else
1517 		pfunc = populate_thread;
1518 	start_threads(wtperf, NULL,
1519 	    wtperf->popthreads, opts->populate_threads, pfunc);
1520 
1521 	__wt_epoch(NULL, &start);
1522 	for (elapsed = 0, interval = 0, last_ops = 0;
1523 	    wtperf->insert_key < opts->icount && !wtperf->error;) {
1524 		/*
1525 		 * Sleep for a 100th of a second; report_interval is in second
1526 		 * granularity, so every 100 increments of elapsed make a
1527 		 * single increment of interval.
1528 		 */
1529 		(void)usleep(10000);
1530 		if (opts->report_interval == 0 || ++elapsed < 100)
1531 			continue;
1532 		elapsed = 0;
1533 		if (++interval < opts->report_interval)
1534 			continue;
1535 		interval = 0;
1536 		wtperf->totalsec += opts->report_interval;
1537 		wtperf->insert_ops = sum_pop_ops(wtperf);
1538 		lprintf(wtperf, 0, 1,
1539 		    "%" PRIu64 " populate inserts (%" PRIu64 " of %"
1540 		    PRIu32 ") in %" PRIu32 " secs (%" PRIu32 " total secs)",
1541 		    wtperf->insert_ops - last_ops, wtperf->insert_ops,
1542 		    opts->icount, opts->report_interval, wtperf->totalsec);
1543 		last_ops = wtperf->insert_ops;
1544 	}
1545 	__wt_epoch(NULL, &stop);
1546 
1547 	/*
1548 	 * Move popthreads aside to narrow the possible race with the monitor
1549 	 * thread. The latency tracking code also requires that popthreads be
1550 	 * NULL when the populate phase is finished, to know that the workload
1551 	 * phase has started.
1552 	 */
1553 	popth = wtperf->popthreads;
1554 	wtperf->popthreads = NULL;
1555 	stop_threads(opts->populate_threads, popth);
1556 	free(popth);
1557 
1558 	/* Report if any worker threads didn't finish. */
1559 	if (wtperf->error) {
1560 		lprintf(wtperf, WT_ERROR, 0,
1561 		    "Populate thread(s) exited without finishing.");
1562 		return (WT_ERROR);
1563 	}
1564 
1565 	lprintf(wtperf,
1566 	    0, 1, "Finished load of %" PRIu32 " items", opts->icount);
1567 	msecs = WT_TIMEDIFF_MS(stop, start);
1568 
1569 	/*
1570 	 * Guard against dividing by zero: the load takes no measurable time
1571 	 * only when there is no data to insert.
1572 	 */
1573 	if (msecs == 0) {
1574 		print_secs = 0;
1575 		print_ops_sec = 0;
1576 	} else {
1577 		print_secs = (double)msecs / (double)MSEC_PER_SEC;
1578 		print_ops_sec = (uint64_t)(opts->icount / print_secs);
1579 	}
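	/*
	 * Illustrative arithmetic (numbers assumed): loading icount=1,000,000
	 * items in msecs=20,000 gives print_secs=20.00 and print_ops_sec =
	 * 1,000,000 / 20 = 50,000, which is what the message below reports.
	 */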
1580 	lprintf(wtperf, 0, 1,
1581 	    "Load time: %.2f\n" "load ops/sec: %" PRIu64,
1582 	    print_secs, print_ops_sec);
1583 
1584 	/*
1585 	 * If configured, compact to allow LSM merging to complete.  We
1586 	 * set an unlimited timeout because if we close the connection
1587 	 * then any in-progress compact/merge is aborted.
1588 	 */
1589 	if (opts->compact) {
1590 		assert(opts->async_threads > 0);
1591 		lprintf(wtperf, 0, 1, "Compact after populate");
1592 		__wt_epoch(NULL, &start);
1593 		tables = opts->table_count;
1594 		for (i = 0; i < opts->table_count; i++) {
1595 			/*
1596 			 * If no ops are available, retry.  Any other error,
1597 			 * return.
1598 			 */
1599 			 while ((ret = wtperf->conn->async_new_op(
1600 			     wtperf->conn, wtperf->uris[i],
1601 			     "timeout=0", &cb, &asyncop)) == EBUSY)
1602 				(void)usleep(10000);
1603 			if (ret != 0)
1604 				return (ret);
1605 
1606 			asyncop->app_private = &tables;
1607 			if ((ret = asyncop->compact(asyncop)) != 0) {
1608 				lprintf(wtperf,
1609 				    ret, 0, "Async compact failed.");
1610 				return (ret);
1611 			}
1612 		}
1613 		if ((ret = wtperf->conn->async_flush(wtperf->conn)) != 0) {
1614 			lprintf(wtperf, ret, 0, "Populate async flush failed.");
1615 			return (ret);
1616 		}
1617 		__wt_epoch(NULL, &stop);
1618 		lprintf(wtperf, 0, 1,
1619 		    "Compact completed in %" PRIu64 " seconds",
1620 		    (uint64_t)(WT_TIMEDIFF_SEC(stop, start)));
1621 		assert(tables == 0);
1622 	}
1623 
1624 	/* Stop cycling idle tables. */
1625 	stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
1626 
1627 	return (0);
1628 }
1629 
1630 static int
1631 close_reopen(WTPERF *wtperf)
1632 {
1633 	CONFIG_OPTS *opts;
1634 	int ret;
1635 
1636 	opts = wtperf->opts;
1637 
1638 	if (opts->in_memory)
1639 		return (0);
1640 
1641 	if (!opts->readonly && !opts->reopen_connection)
1642 		return (0);
1643 	/*
1644 	 * Reopen the connection.  We do this so that the workload phase always
1645 	 * starts with the on-disk files, and so that read-only workloads can
1646 	 * be identified.  This is particularly important for LSM, where the
1647 	 * merge algorithm is more aggressive for read-only trees.
1648 	 */
1649 	/* wtperf->conn is released no matter the return value from close(). */
1650 	ret = wtperf->conn->close(wtperf->conn, NULL);
1651 	wtperf->conn = NULL;
1652 	if (ret != 0) {
1653 		lprintf(wtperf, ret, 0, "Closing the connection failed");
1654 		return (ret);
1655 	}
1656 	if ((ret = wiredtiger_open(
1657 	    wtperf->home, NULL, wtperf->reopen_config, &wtperf->conn)) != 0) {
1658 		lprintf(wtperf, ret, 0, "Re-opening the connection failed");
1659 		return (ret);
1660 	}
1661 	/*
1662 	 * If we started async threads only for the purposes of compact,
1663 	 * then turn it off before starting the workload so that those extra
1664 	 * threads looking for work that will never arrive don't affect
1665 	 * performance.
1666 	 */
1667 	if (opts->compact && !wtperf->use_asyncops) {
1668 		if ((ret = wtperf->conn->reconfigure(
1669 		    wtperf->conn, "async=(enabled=false)")) != 0) {
1670 			lprintf(wtperf, ret, 0, "Reconfigure async off failed");
1671 			return (ret);
1672 		}
1673 	}
1674 	return (0);
1675 }
1676 
1677 static int
1678 execute_workload(WTPERF *wtperf)
1679 {
1680 	CONFIG_OPTS *opts;
1681 	WORKLOAD *workp;
1682 	WTPERF_THREAD *threads;
1683 	WT_CONNECTION *conn;
1684 	WT_SESSION **sessions;
1685 	WT_THREAD_CALLBACK(*pfunc)(void *);
1686 	wt_thread_t idle_table_cycle_thread;
1687 	uint64_t last_ckpts, last_inserts, last_reads, last_truncates;
1688 	uint64_t last_updates;
1689 	uint32_t interval, run_ops, run_time;
1690 	u_int i;
1691 	int ret;
1692 
1693 	opts = wtperf->opts;
1694 
1695 	wtperf->insert_key = 0;
1696 	wtperf->insert_ops = wtperf->read_ops = wtperf->truncate_ops = 0;
1697 	wtperf->update_ops = 0;
1698 
1699 	last_ckpts = last_inserts = last_reads = last_truncates = 0;
1700 	last_updates = 0;
1701 	ret = 0;
1702 
1703 	sessions = NULL;
1704 
1705 	/* Start cycling idle tables. */
1706 	start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
1707 
1708 	if (opts->warmup != 0)
1709 		wtperf->in_warmup = true;
1710 
1711 	/* Allocate memory for the worker threads. */
1712 	wtperf->workers =
1713 	    dcalloc((size_t)wtperf->workers_cnt, sizeof(WTPERF_THREAD));
1714 
1715 	if (wtperf->use_asyncops) {
1716 		lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
1717 		    opts->async_threads);
1718 		pfunc = worker_async;
1719 	} else
1720 		pfunc = worker;
1721 
1722 	if (opts->session_count_idle != 0) {
1723 		sessions = dcalloc((size_t)opts->session_count_idle,
1724 		    sizeof(WT_SESSION *));
1725 		conn = wtperf->conn;
1726 		for (i = 0; i < opts->session_count_idle; ++i)
1727 			if ((ret = conn->open_session(conn,
1728 			    NULL, opts->sess_config, &sessions[i])) != 0) {
1729 				lprintf(wtperf, ret, 0,
1730 				    "execute_workload: idle open_session");
1731 				goto err;
1732 			}
1733 	}
1734 	/* Start each workload. */
1735 	for (threads = wtperf->workers, i = 0,
1736 	    workp = wtperf->workload; i < wtperf->workload_cnt; ++i, ++workp) {
1737 		lprintf(wtperf, 0, 1,
1738 		    "Starting workload #%u: %" PRId64 " threads, inserts=%"
1739 		    PRId64 ", reads=%" PRId64 ", updates=%" PRId64
1740 		    ", truncate=%" PRId64 ", throttle=%" PRIu64,
1741 		    i + 1, workp->threads, workp->insert,
1742 		    workp->read, workp->update, workp->truncate,
1743 		    workp->throttle);
1744 
1745 		/* Figure out the workload's schedule. */
1746 		if ((ret = run_mix_schedule(wtperf, workp)) != 0)
1747 			goto err;
1748 
1749 		/* Start the workload's threads. */
1750 		start_threads(
1751 		    wtperf, workp, threads, (u_int)workp->threads, pfunc);
1752 		threads += workp->threads;
1753 	}
1754 
1755 	if (opts->warmup != 0) {
1756 		lprintf(wtperf, 0, 1,
1757 		    "Waiting for warmup duration of %" PRIu32, opts->warmup);
1758 		sleep(opts->warmup);
1759 		wtperf->in_warmup = false;
1760 	}
1761 
1762 	for (interval = opts->report_interval,
1763 	    run_time = opts->run_time, run_ops = opts->run_ops;
1764 	    !wtperf->error;) {
1765 		/*
1766 		 * Sleep for one second at a time.
1767 		 * If we are tracking run time, check to see if we're done, and
1768 		 * if we're only tracking run time, go back to sleep.
1769 		 */
1770 		sleep(1);
1771 		if (run_time != 0) {
1772 			if (--run_time == 0)
1773 				break;
1774 			if (!interval && !run_ops)
1775 				continue;
1776 		}
1777 
1778 		/* Sum the operations we've done. */
1779 		wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
1780 		wtperf->insert_ops = sum_insert_ops(wtperf);
1781 		wtperf->read_ops = sum_read_ops(wtperf);
1782 		wtperf->update_ops = sum_update_ops(wtperf);
1783 		wtperf->truncate_ops = sum_truncate_ops(wtperf);
1784 
1785 		/* If we're checking total operations, see if we're done. */
1786 		if (run_ops != 0 && run_ops <=
1787 		    wtperf->insert_ops + wtperf->read_ops + wtperf->update_ops)
1788 			break;
1789 
1790 		/* If writing out throughput information, see if it's time. */
1791 		if (interval == 0 || --interval > 0)
1792 			continue;
1793 		interval = opts->report_interval;
1794 		wtperf->totalsec += opts->report_interval;
1795 
1796 		lprintf(wtperf, 0, 1,
1797 		    "%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64
1798 		    " updates, %" PRIu64 " truncates, %" PRIu64
1799 		    " checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)",
1800 		    wtperf->read_ops - last_reads,
1801 		    wtperf->insert_ops - last_inserts,
1802 		    wtperf->update_ops - last_updates,
1803 		    wtperf->truncate_ops - last_truncates,
1804 		    wtperf->ckpt_ops - last_ckpts,
1805 		    opts->report_interval, wtperf->totalsec);
1806 		last_reads = wtperf->read_ops;
1807 		last_inserts = wtperf->insert_ops;
1808 		last_updates = wtperf->update_ops;
1809 		last_truncates = wtperf->truncate_ops;
1810 		last_ckpts = wtperf->ckpt_ops;
1811 	}
1812 
1813 	/* Notify the worker threads they are done. */
1814 err:	wtperf->stop = true;
1815 
1816 	/* Stop cycling idle tables. */
1817 	stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
1818 
1819 	stop_threads((u_int)wtperf->workers_cnt, wtperf->workers);
1820 
1821 	/* Drop tables if configured to and this isn't an error path. */
1822 	if (ret == 0 &&
1823 	    opts->drop_tables && (ret = drop_all_tables(wtperf)) != 0)
1824 		lprintf(wtperf, ret, 0, "Drop tables failed.");
1825 
1826 	free(sessions);
1827 	/* Report if any worker threads didn't finish. */
1828 	if (wtperf->error) {
1829 		lprintf(wtperf, WT_ERROR, 0,
1830 		    "Worker thread(s) exited without finishing.");
1831 		if (ret == 0)
1832 			ret = WT_ERROR;
1833 	}
1834 	return (ret);
1835 }
1836 
1837 /*
1838  * Ensure that icount matches the number of records in the existing
1839  * table(s), based on the largest key found.
1840  */
1841 static int
1842 find_table_count(WTPERF *wtperf)
1843 {
1844 	CONFIG_OPTS *opts;
1845 	WT_CONNECTION *conn;
1846 	WT_CURSOR *cursor;
1847 	WT_SESSION *session;
1848 	uint32_t i, max_icount, table_icount;
1849 	int ret, t_ret;
1850 	char *key;
1851 
1852 	opts = wtperf->opts;
1853 	conn = wtperf->conn;
1854 
1855 	max_icount = 0;
1856 	if ((ret = conn->open_session(
1857 	    conn, NULL, opts->sess_config, &session)) != 0) {
1858 		lprintf(wtperf, ret, 0,
1859 		    "find_table_count: open_session failed");
1860 		goto out;
1861 	}
1862 	for (i = 0; i < opts->table_count; i++) {
1863 		if ((ret = session->open_cursor(session, wtperf->uris[i],
1864 		    NULL, NULL, &cursor)) != 0) {
1865 			lprintf(wtperf, ret, 0,
1866 			    "find_table_count: open_cursor failed");
1867 			goto err;
1868 		}
1869 		if ((ret = cursor->prev(cursor)) != 0) {
1870 			lprintf(wtperf, ret, 0,
1871 			    "find_table_count: cursor prev failed");
1872 			goto err;
1873 		}
1874 		if ((ret = cursor->get_key(cursor, &key)) != 0) {
1875 			lprintf(wtperf, ret, 0,
1876 			    "find_table_count: cursor get_key failed");
1877 			goto err;
1878 		}
1879 		table_icount = (uint32_t)atoi(key);
1880 		if (table_icount > max_icount)
1881 			max_icount = table_icount;
1882 
1883 		if ((ret = cursor->close(cursor)) != 0) {
1884 			lprintf(wtperf, ret, 0,
1885 			    "find_table_count: cursor close failed");
1886 			goto err;
1887 		}
1888 	}
1889 err:	if ((t_ret = session->close(session, NULL)) != 0) {
1890 		if (ret == 0)
1891 			ret = t_ret;
1892 		lprintf(wtperf, ret, 0,
1893 		    "find_table_count: session close failed");
1894 	}
1895 	opts->icount = max_icount;
1896 out:	return (ret);
1897 }
1898 
1899 /*
1900  * Populate the uri array.
1901  */
1902 static void
1903 create_uris(WTPERF *wtperf)
1904 {
1905 	CONFIG_OPTS *opts;
1906 	size_t len;
1907 	uint32_t i;
1908 
1909 	opts = wtperf->opts;
1910 
1911 	wtperf->uris = dcalloc(opts->table_count, sizeof(char *));
1912 	len = strlen("table:") + strlen(opts->table_name) + 20;
1913 	for (i = 0; i < opts->table_count; i++) {
1914 		/* If there is only one table, just use the base name. */
1915 		wtperf->uris[i] = dmalloc(len);
1916 		if (opts->table_count == 1)
1917 			testutil_check(__wt_snprintf(wtperf->uris[i],
1918 			    len, "table:%s", opts->table_name));
1919 		else
1920 			testutil_check(__wt_snprintf(wtperf->uris[i],
1921 			    len, "table:%s%05d", opts->table_name, i));
1922 	}
1923 
1924 	/* Create the log-like-table URI. */
1925 	len = strlen("table:") +
1926 	    strlen(opts->table_name) + strlen("_log_table") + 1;
1927 	wtperf->log_table_uri = dmalloc(len);
1928 	testutil_check(__wt_snprintf(wtperf->log_table_uri,
1929 	    len, "table:%s_log_table", opts->table_name));
1930 }
1931 
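/*
 * create_tables --
 *	Create the tables configured for the run: any idle tables, the
 *	optional log-like table, the main tables and any indexes.
 */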
1932 static int
1933 create_tables(WTPERF *wtperf)
1934 {
1935 	CONFIG_OPTS *opts;
1936 	WT_SESSION *session;
1937 	size_t i;
1938 	int ret;
1939 	char buf[512];
1940 
1941 	opts = wtperf->opts;
1942 
1943 	if ((ret = wtperf->conn->open_session(
1944 	    wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
1945 		lprintf(wtperf, ret, 0,
1946 		    "Error opening a session on %s", wtperf->home);
1947 		return (ret);
1948 	}
1949 
1950 	for (i = 0; i < opts->table_count_idle; i++) {
1951 		testutil_check(__wt_snprintf(
1952 		    buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
1953 		if ((ret = session->create(
1954 		    session, buf, opts->table_config)) != 0) {
1955 			lprintf(wtperf, ret, 0,
1956 			    "Error creating idle table %s", buf);
1957 			return (ret);
1958 		}
1959 	}
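	/* If configured, create the log-like table. */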
1960 	if (opts->log_like_table && (ret = session->create(session,
1961 	    wtperf->log_table_uri, "key_format=Q,value_format=S")) != 0) {
1962 		lprintf(wtperf, ret, 0,
		    "Error creating log table %s", wtperf->log_table_uri);
1963 		return (ret);
1964 	}
1965 
1966 	for (i = 0; i < opts->table_count; i++) {
1967 		if (opts->log_partial && i > 0) {
1968 			if (((ret = session->create(session,
1969 			    wtperf->uris[i], wtperf->partial_config)) != 0)) {
1970 				lprintf(wtperf, ret, 0,
1971 				    "Error creating table %s", wtperf->uris[i]);
1972 				return (ret);
1973 			}
1974 		} else if ((ret = session->create(
1975 		    session, wtperf->uris[i], opts->table_config)) != 0) {
1976 			lprintf(wtperf, ret, 0,
1977 			    "Error creating table %s", wtperf->uris[i]);
1978 			return (ret);
1979 		}
1980 		if (opts->index) {
1981 			testutil_check(__wt_snprintf(buf, 512,
1982 			    "index:%s:val_idx",
1983 			    wtperf->uris[i] + strlen("table:")));
1984 			if ((ret = session->create(
1985 			    session, buf, "columns=(val)")) != 0) {
1986 				lprintf(wtperf, ret, 0,
1987 				    "Error creating index %s", buf);
1988 				return (ret);
1989 			}
1990 		}
1991 	}
1992 
1993 	if ((ret = session->close(session, NULL)) != 0) {
1994 		lprintf(wtperf, ret, 0, "Error closing session");
1995 		return (ret);
1996 	}
1997 
1998 	return (0);
1999 }
2000 
2001 /*
2002  * wtperf_copy --
2003  *	Create a new WTPERF structure as a duplicate of a previous one.
2004  */
2005 static void
2006 wtperf_copy(const WTPERF *src, WTPERF **retp)
2007 {
2008 	CONFIG_OPTS *opts;
2009 	WTPERF *dest;
2010 	size_t i;
2011 
2012 	opts = src->opts;
2013 
2014 	dest = dcalloc(1, sizeof(WTPERF));
2015 
2016 	/*
2017 	 * Don't copy the home and monitor directories; they are filled in
2018 	 * explicitly by our caller.
2019 	 */
2020 
2021 	if (src->partial_config != NULL)
2022 		dest->partial_config = dstrdup(src->partial_config);
2023 	if (src->reopen_config != NULL)
2024 		dest->reopen_config = dstrdup(src->reopen_config);
2025 
2026 	if (src->uris != NULL) {
2027 		dest->uris = dcalloc(opts->table_count, sizeof(char *));
2028 		for (i = 0; i < opts->table_count; i++)
2029 			dest->uris[i] = dstrdup(src->uris[i]);
2030 	}
2031 
2032 	if (src->async_config != NULL)
2033 		dest->async_config = dstrdup(src->async_config);
2034 
2035 	dest->ckptthreads = NULL;
2036 	dest->popthreads = NULL;
2037 
2038 	dest->workers = NULL;
2039 	dest->workers_cnt = src->workers_cnt;
2040 	if (src->workload_cnt != 0) {
2041 		dest->workload_cnt = src->workload_cnt;
2042 		dest->workload = dcalloc(src->workload_cnt, sizeof(WORKLOAD));
2043 		memcpy(dest->workload,
2044 		    src->workload, src->workload_cnt * sizeof(WORKLOAD));
2045 	}
2046 
2047 	TAILQ_INIT(&dest->stone_head);
2048 
2049 	dest->opts = src->opts;
2050 
2051 	*retp = dest;
2052 }
2053 
2054 /*
2055  * wtperf_free --
2056  *	Free any storage allocated in the WTPERF structure.
2057  */
2058 static void
2059 wtperf_free(WTPERF *wtperf)
2060 {
2061 	CONFIG_OPTS *opts;
2062 	size_t i;
2063 
2064 	opts = wtperf->opts;
2065 
2066 	free(wtperf->home);
2067 	free(wtperf->monitor_dir);
2068 	free(wtperf->partial_config);
2069 	free(wtperf->reopen_config);
2070 	free(wtperf->log_table_uri);
2071 
2072 	if (wtperf->uris != NULL) {
2073 		for (i = 0; i < opts->table_count; i++)
2074 			free(wtperf->uris[i]);
2075 		free(wtperf->uris);
2076 	}
2077 
2078 	free(wtperf->async_config);
2079 
2080 	free(wtperf->ckptthreads);
2081 	free(wtperf->popthreads);
2082 
2083 	free(wtperf->workers);
2084 	free(wtperf->workload);
2085 
2086 	cleanup_truncate_config(wtperf);
2087 }
2088 
2089 /*
2090  * config_compress --
2091  *	Parse the compression configuration.
2092  */
2093 static int
2094 config_compress(WTPERF *wtperf)
2095 {
2096 	CONFIG_OPTS *opts;
2097 	int ret;
2098 	const char *s;
2099 
2100 	opts = wtperf->opts;
2101 	ret = 0;
2102 
2103 	s = opts->compression;
2104 	if (strcmp(s, "none") == 0) {
2105 		wtperf->compress_ext = NULL;
2106 		wtperf->compress_table = NULL;
2107 	} else if (strcmp(s, "lz4") == 0) {
2108 #ifndef HAVE_BUILTIN_EXTENSION_LZ4
2109 		wtperf->compress_ext = LZ4_EXT;
2110 #endif
2111 		wtperf->compress_table = LZ4_BLK;
2112 	} else if (strcmp(s, "snappy") == 0) {
2113 #ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
2114 		wtperf->compress_ext = SNAPPY_EXT;
2115 #endif
2116 		wtperf->compress_table = SNAPPY_BLK;
2117 	} else if (strcmp(s, "zlib") == 0) {
2118 #ifndef HAVE_BUILTIN_EXTENSION_ZLIB
2119 		wtperf->compress_ext = ZLIB_EXT;
2120 #endif
2121 		wtperf->compress_table = ZLIB_BLK;
2122 	} else if (strcmp(s, "zstd") == 0) {
2123 #ifndef HAVE_BUILTIN_EXTENSION_ZSTD
2124 		wtperf->compress_ext = ZSTD_EXT;
2125 #endif
2126 		wtperf->compress_table = ZSTD_BLK;
2127 	} else {
2128 		fprintf(stderr,
2129 		    "invalid compression configuration: %s\n", s);
2130 		ret = EINVAL;
2131 	}
2132 	return (ret);
2133 
2134 }
2135 
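/*
 * start_all_runs --
 *	Run the test against each configured database: make a WTPERF copy
 *	per database, run each in its own thread and wait for them to finish.
 */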
2136 static int
2137 start_all_runs(WTPERF *wtperf)
2138 {
2139 	CONFIG_OPTS *opts;
2140 	WTPERF *next_wtperf, **wtperfs;
2141 	size_t i, len;
2142 	wt_thread_t *threads;
2143 	int ret;
2144 
2145 	opts = wtperf->opts;
2146 	wtperfs = NULL;
2147 	ret = 0;
2148 
2149 	if (opts->database_count == 1)
2150 		return (start_run(wtperf));
2151 
2152 	/* Allocate an array to hold our WTPERF copies. */
2153 	wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *));
2154 
2155 	/* Allocate an array to hold our thread IDs. */
2156 	threads = dcalloc(opts->database_count, sizeof(*threads));
2157 
2158 	for (i = 0; i < opts->database_count; i++) {
2159 		wtperf_copy(wtperf, &next_wtperf);
2160 		wtperfs[i] = next_wtperf;
2161 
2162 		/*
2163 		 * Set up unique home/monitor directories for each database.
2164 		 * Re-create the directories if creating the databases.
2165 		 */
2166 		len = strlen(wtperf->home) + 5;
2167 		next_wtperf->home = dmalloc(len);
2168 		testutil_check(__wt_snprintf(
2169 		    next_wtperf->home, len, "%s/D%02d", wtperf->home, (int)i));
2170 		if (opts->create != 0)
2171 			recreate_dir(next_wtperf->home);
2172 
2173 		len = strlen(wtperf->monitor_dir) + 5;
2174 		next_wtperf->monitor_dir = dmalloc(len);
2175 		testutil_check(__wt_snprintf(next_wtperf->monitor_dir,
2176 		    len, "%s/D%02d", wtperf->monitor_dir, (int)i));
2177 		if (opts->create != 0 &&
2178 		    strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0)
2179 			recreate_dir(next_wtperf->monitor_dir);
2180 
2181 		testutil_check(__wt_thread_create(NULL,
2182 		    &threads[i], thread_run_wtperf, next_wtperf));
2183 	}
2184 
2185 	/* Wait for threads to finish. */
2186 	for (i = 0; i < opts->database_count; i++)
2187 		testutil_check(__wt_thread_join(NULL, &threads[i]));
2188 
2189 	for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
2190 		wtperf_free(wtperfs[i]);
2191 		free(wtperfs[i]);
2192 	}
2193 	free(wtperfs);
2194 	free(threads);
2195 
2196 	return (ret);
2197 }
2198 
2199 /* Run an instance of wtperf for a given configuration. */
2200 static WT_THREAD_RET
2201 thread_run_wtperf(void *arg)
2202 {
2203 	WTPERF *wtperf;
2204 	int ret;
2205 
2206 	wtperf = (WTPERF *)arg;
2207 	if ((ret = start_run(wtperf)) != 0)
2208 		lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home);
2209 	return (WT_THREAD_RET_VALUE);
2210 }
2211 
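/*
 * start_run --
 *	Run a single wtperf instance: open the connection, create and
 *	populate the tables if configured, execute the workload and report
 *	the results.
 */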
2212 static int
2213 start_run(WTPERF *wtperf)
2214 {
2215 	CONFIG_OPTS *opts;
2216 	wt_thread_t monitor_thread;
2217 	uint64_t total_ops;
2218 	uint32_t run_time;
2219 	int monitor_created, ret, t_ret;
2220 
2221 	opts = wtperf->opts;
2222 	monitor_created = ret = 0;
2223 					/* [-Wconditional-uninitialized] */
2224 	memset(&monitor_thread, 0, sizeof(monitor_thread));
2225 
2226 	if ((ret = setup_log_file(wtperf)) != 0)
2227 		goto err;
2228 
2229 	if ((ret = wiredtiger_open(	/* Open the real connection. */
2230 	    wtperf->home, NULL, opts->conn_config, &wtperf->conn)) != 0) {
2231 		lprintf(wtperf, ret, 0, "Error connecting to %s", wtperf->home);
2232 		goto err;
2233 	}
2234 
2235 	create_uris(wtperf);
2236 
2237 	/* If creating, create the tables. */
2238 	if (opts->create != 0 && (ret = create_tables(wtperf)) != 0)
2239 		goto err;
2240 
2241 	/* Start the monitor thread. */
2242 	if (opts->sample_interval != 0) {
2243 		testutil_check(__wt_thread_create(
2244 		    NULL, &monitor_thread, monitor, wtperf));
2245 		monitor_created = 1;
2246 	}
2247 
2248 	/* If creating, populate the table. */
2249 	if (opts->create != 0 && execute_populate(wtperf) != 0)
2250 		goto err;
2251 
2252 	/* Optional workload. */
2253 	if (wtperf->workers_cnt != 0 &&
2254 	    (opts->run_time != 0 || opts->run_ops != 0)) {
2255 		/*
2256 		 * If we have a workload, close and reopen the connection so
2257 		 * that LSM can detect read-only workloads.
2258 		 */
2259 		if (close_reopen(wtperf) != 0)
2260 			goto err;
2261 
2262 		/* If we didn't create the tables, set the insert count. */
2263 		if (opts->create == 0 &&
2264 		    opts->random_range == 0 && find_table_count(wtperf) != 0)
2265 			goto err;
2266 		/* Start the checkpoint thread. */
2267 		if (opts->checkpoint_threads != 0) {
2268 			lprintf(wtperf, 0, 1,
2269 			    "Starting %" PRIu32 " checkpoint thread(s)",
2270 			    opts->checkpoint_threads);
2271 			wtperf->ckptthreads = dcalloc(
2272 			     opts->checkpoint_threads, sizeof(WTPERF_THREAD));
2273 			start_threads(wtperf, NULL, wtperf->ckptthreads,
2274 			    opts->checkpoint_threads, checkpoint_worker);
2275 		}
2276 		if (opts->pre_load_data)
2277 			pre_load_data(wtperf);
2278 
2279 		/* Execute the workload. */
2280 		if ((ret = execute_workload(wtperf)) != 0)
2281 			goto err;
2282 
2283 		/* One final summation of the operations we've completed. */
2284 		wtperf->read_ops = sum_read_ops(wtperf);
2285 		wtperf->insert_ops = sum_insert_ops(wtperf);
2286 		wtperf->truncate_ops = sum_truncate_ops(wtperf);
2287 		wtperf->update_ops = sum_update_ops(wtperf);
2288 		wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
2289 		total_ops =
2290 		    wtperf->read_ops + wtperf->insert_ops + wtperf->update_ops;
2291 
2292 		run_time = opts->run_time == 0 ? 1 : opts->run_time;
2293 		lprintf(wtperf, 0, 1,
2294 		    "Executed %" PRIu64 " read operations (%" PRIu64
2295 		    "%%) %" PRIu64 " ops/sec",
2296 		    wtperf->read_ops, (wtperf->read_ops * 100) / total_ops,
2297 		    wtperf->read_ops / run_time);
2298 		lprintf(wtperf, 0, 1,
2299 		    "Executed %" PRIu64 " insert operations (%" PRIu64
2300 		    "%%) %" PRIu64 " ops/sec",
2301 		    wtperf->insert_ops, (wtperf->insert_ops * 100) / total_ops,
2302 		    wtperf->insert_ops / run_time);
2303 		lprintf(wtperf, 0, 1,
2304 		    "Executed %" PRIu64 " truncate operations (%" PRIu64
2305 		    "%%) %" PRIu64 " ops/sec",
2306 		    wtperf->truncate_ops,
2307 		    (wtperf->truncate_ops * 100) / total_ops,
2308 		    wtperf->truncate_ops / run_time);
2309 		lprintf(wtperf, 0, 1,
2310 		    "Executed %" PRIu64 " update operations (%" PRIu64
2311 		    "%%) %" PRIu64 " ops/sec",
2312 		    wtperf->update_ops, (wtperf->update_ops * 100) / total_ops,
2313 		    wtperf->update_ops / run_time);
2314 		lprintf(wtperf, 0, 1,
2315 		    "Executed %" PRIu64 " checkpoint operations",
2316 		    wtperf->ckpt_ops);
2317 
2318 		latency_print(wtperf);
2319 	}
2320 
2321 	if (0) {
2322 err:		if (ret == 0)
2323 			ret = EXIT_FAILURE;
2324 	}
2325 
2326 	/* Notify the worker threads they are done. */
2327 	wtperf->stop = true;
2328 
2329 	stop_threads(1, wtperf->ckptthreads);
2330 
2331 	if (monitor_created != 0)
2332 		testutil_check(__wt_thread_join(NULL, &monitor_thread));
2333 
2334 	if (wtperf->conn != NULL && opts->close_conn &&
2335 	    (t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) {
2336 		lprintf(wtperf, t_ret, 0,
2337 		    "Error closing connection to %s", wtperf->home);
2338 		if (ret == 0)
2339 			ret = t_ret;
2340 	}
2341 
2342 	if (ret == 0) {
2343 		if (opts->run_time == 0 && opts->run_ops == 0)
2344 			lprintf(wtperf, 0, 1, "Run completed");
2345 		else
2346 			lprintf(wtperf, 0, 1, "Run completed: %" PRIu32 " %s",
2347 			    opts->run_time == 0 ?
2348 			    opts->run_ops : opts->run_time,
2349 			    opts->run_time == 0 ? "operations" : "seconds");
2350 	}
2351 
2352 	if (wtperf->logf != NULL) {
2353 		if ((t_ret = fflush(wtperf->logf)) != 0 && ret == 0)
2354 			ret = t_ret;
2355 		if ((t_ret = fclose(wtperf->logf)) != 0 && ret == 0)
2356 			ret = t_ret;
2357 	}
2358 	return (ret);
2359 }
2360 
2361 extern int __wt_optind, __wt_optreset;
2362 extern char *__wt_optarg;
2363 
2364 /*
2365  * usage --
2366  *	wtperf usage print, no error.
2367  */
2368 static void
2369 usage(void)
2370 {
2371 	printf("wtperf [-C config] "
2372 	    "[-H mount] [-h home] [-O file] [-o option] [-T config]\n");
2373 	printf("\t-C <string> additional connection configuration\n");
2374 	printf("\t            (added to option conn_config)\n");
2375 	printf("\t-H <mount> configure Helium volume mount point\n");
2376 	printf("\t-h <string> WiredTiger home must exist, default WT_TEST\n");
2377 	printf("\t-O <file> file contains options as listed below\n");
2378 	printf("\t-o option=val[,option=val,...] set options listed below\n");
2379 	printf("\t-T <string> additional table configuration\n");
2380 	printf("\t            (added to option table_config)\n");
2381 	printf("\n");
2382 	config_opt_usage();
2383 }
2384 
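/*
 * main --
 *	Parse the command line and configuration, build the connection and
 *	table configuration strings, then start the run(s).
 */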
2385 int
2386 main(int argc, char *argv[])
2387 {
2388 	CONFIG_OPTS *opts;
2389 	WTPERF *wtperf, _wtperf;
2390 	size_t pos, req_len, sreq_len;
2391 	bool monitor_set;
2392 	int ch, ret;
2393 	const char *cmdflags = "C:h:m:O:o:T:";
2394 	const char *append_comma, *config_opts;
2395 	char *cc_buf, *path, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
2396 
2397 	/* The first WTPERF structure (from which all others are derived). */
2398 	wtperf = &_wtperf;
2399 	memset(wtperf, 0, sizeof(*wtperf));
2400 	wtperf->home = dstrdup(DEFAULT_HOME);
2401 	wtperf->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
2402 	TAILQ_INIT(&wtperf->stone_head);
2403 	config_opt_init(&wtperf->opts);
2404 
2405 	opts = wtperf->opts;
2406 	monitor_set = false;
2407 	ret = 0;
2408 	config_opts = NULL;
2409 	cc_buf = sess_cfg = tc_buf = user_cconfig = user_tconfig = NULL;
2410 
2411 	/* Do a basic validation of options; home is needed before the open. */
2412 	while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
2413 		switch (ch) {
2414 		case 'C':
2415 			if (user_cconfig == NULL)
2416 				user_cconfig = dstrdup(__wt_optarg);
2417 			else {
2418 				user_cconfig = drealloc(user_cconfig,
2419 				    strlen(user_cconfig) +
2420 				    strlen(__wt_optarg) + 2);
2421 				strcat(user_cconfig, ",");
2422 				strcat(user_cconfig, __wt_optarg);
2423 			}
2424 			break;
2425 		case 'h':
2426 			free(wtperf->home);
2427 			wtperf->home = dstrdup(__wt_optarg);
2428 			break;
2429 		case 'm':
2430 			free(wtperf->monitor_dir);
2431 			wtperf->monitor_dir = dstrdup(__wt_optarg);
2432 			monitor_set = true;
2433 			break;
2434 		case 'O':
2435 			config_opts = __wt_optarg;
2436 			break;
2437 		case 'T':
2438 			if (user_tconfig == NULL)
2439 				user_tconfig = dstrdup(__wt_optarg);
2440 			else {
2441 				user_tconfig = drealloc(user_tconfig,
2442 				    strlen(user_tconfig) +
2443 				    strlen(__wt_optarg) + 2);
2444 				strcat(user_tconfig, ",");
2445 				strcat(user_tconfig, __wt_optarg);
2446 			}
2447 			break;
2448 		case '?':
2449 			usage();
2450 			goto einval;
2451 		}
2452 
2453 	/*
2454 	 * If the user did not specify a monitor directory, set the monitor
2455 	 * directory to the home directory.
2456 	 */
2457 	if (!monitor_set) {
2458 		free(wtperf->monitor_dir);
2459 		wtperf->monitor_dir = dstrdup(wtperf->home);
2460 	}
2461 
2462 	/* Parse configuration settings from configuration file. */
2463 	if (config_opts != NULL && config_opt_file(wtperf, config_opts) != 0)
2464 		goto einval;
2465 
2466 	/* Parse options that override values set via a configuration file. */
2467 	__wt_optreset = __wt_optind = 1;
2468 	while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
2469 		switch (ch) {
2470 		case 'o':
2471 			/* Allow -o key=value */
2472 			if (config_opt_str(wtperf, __wt_optarg) != 0)
2473 				goto einval;
2474 			break;
2475 		}
2476 
2477 	if (opts->populate_threads == 0 && opts->icount != 0) {
2478 		lprintf(wtperf, 1, 0,
2479 		    "Cannot have 0 populate threads when icount is set\n");
2480 		goto err;
2481 	}
2482 
2483 	wtperf->async_config = NULL;
2484 	/*
2485 	 * If the user specified async_threads, we use async for all ops.
2486 	 * If the user wants compaction, we also enable async for the
2487 	 * compact operation, but not for the workloads.
2488 	 */
2489 	if (opts->async_threads > 0) {
2490 		if (F_ISSET(wtperf, CFG_TRUNCATE)) {
2491 			lprintf(wtperf,
2492 			    1, 0, "Cannot run truncate and async\n");
2493 			goto err;
2494 		}
2495 		wtperf->use_asyncops = true;
2496 	}
2497 	if (opts->compact && opts->async_threads == 0)
2498 		opts->async_threads = 2;
2499 	if (opts->async_threads > 0) {
2500 		/*
2501 		 * The maximum number of async threads is two digits, so just
2502 		 * use that to compute the space we need.  Assume the default
2503 		 * of 1024 for the max ops, although we could bump that up to
2504 		 * 4096 if needed.
2505 		 */
2506 		req_len = strlen(",async=(enabled=true,threads=)") + 4;
2507 		wtperf->async_config = dmalloc(req_len);
2508 		testutil_check(__wt_snprintf(wtperf->async_config, req_len,
2509 		    ",async=(enabled=true,threads=%" PRIu32 ")",
2510 		    opts->async_threads));
2511 	}
2512 	if ((ret = config_compress(wtperf)) != 0)
2513 		goto err;
2514 
2515 	/* You can't have truncate on a random collection. */
2516 	if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->random_range) {
2517 		lprintf(wtperf, 1, 0, "Cannot run truncate and random_range\n");
2518 		goto err;
2519 	}
2520 
2521 	/* We can't run truncate with more than one table. */
2522 	if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->table_count > 1) {
2523 		lprintf(wtperf, 1, 0, "Cannot truncate more than 1 table\n");
2524 		goto err;
2525 	}
2526 
2527 	/* Make stdout line buffered, so verbose output appears quickly. */
2528 	__wt_stream_set_line_buffer(stdout);
2529 
2530 	/* Concatenate non-default configuration strings. */
2531 	if (user_cconfig != NULL || opts->session_count_idle > 0 ||
2532 	     wtperf->compress_ext != NULL || wtperf->async_config != NULL ||
2533 	     opts->in_memory) {
2534 		req_len = 20;
2535 		req_len += wtperf->async_config != NULL ?
2536 		    strlen(wtperf->async_config) : 0;
2537 		req_len += wtperf->compress_ext != NULL ?
2538 		    strlen(wtperf->compress_ext) : 0;
2539 		if (opts->session_count_idle > 0) {
2540 			sreq_len = strlen("session_max=") + 6;
2541 			req_len += sreq_len;
2542 			sess_cfg = dmalloc(sreq_len);
2543 			testutil_check(__wt_snprintf(sess_cfg, sreq_len,
2544 			    "session_max=%" PRIu32,
2545 			    opts->session_count_idle +
2546 			    wtperf->workers_cnt + opts->populate_threads + 10));
2547 		}
2548 		req_len += opts->in_memory ? strlen("in_memory=true") : 0;
2549 		req_len += user_cconfig != NULL ? strlen(user_cconfig) : 0;
2550 		cc_buf = dmalloc(req_len);
2551 
2552 		pos = 0;
2553 		append_comma = "";
2554 		if (wtperf->async_config != NULL &&
2555 		    strlen(wtperf->async_config) != 0) {
2556 			testutil_check(__wt_snprintf_len_incr(
2557 			    cc_buf + pos, req_len - pos, &pos, "%s%s",
2558 			    append_comma, wtperf->async_config));
2559 			append_comma = ",";
2560 		}
2561 		if (wtperf->compress_ext != NULL &&
2562 		    strlen(wtperf->compress_ext) != 0) {
2563 			testutil_check(__wt_snprintf_len_incr(
2564 			    cc_buf + pos, req_len - pos, &pos, "%s%s",
2565 			    append_comma, wtperf->compress_ext));
2566 			append_comma = ",";
2567 		}
2568 		if (opts->in_memory) {
2569 			testutil_check(__wt_snprintf_len_incr(
2570 			    cc_buf + pos, req_len - pos, &pos, "%s%s",
2571 			    append_comma, "in_memory=true"));
2572 			append_comma = ",";
2573 		}
2574 		if (sess_cfg != NULL && strlen(sess_cfg) != 0) {
2575 			testutil_check(__wt_snprintf_len_incr(
2576 			    cc_buf + pos, req_len - pos, &pos, "%s%s",
2577 			    append_comma, sess_cfg));
2578 			append_comma = ",";
2579 		}
2580 		if (user_cconfig != NULL && strlen(user_cconfig) != 0) {
2581 			testutil_check(__wt_snprintf_len_incr(
2582 			    cc_buf + pos, req_len - pos, &pos, "%s%s",
2583 			    append_comma, user_cconfig));
2584 		}
2585 
2586 		if (strlen(cc_buf) != 0 && (ret =
2587 		    config_opt_name_value(wtperf, "conn_config", cc_buf)) != 0)
2588 			goto err;
2589 	}
2590 	if (opts->index ||
2591 	    user_tconfig != NULL || wtperf->compress_table != NULL) {
2592 		req_len = 20;
2593 		req_len += wtperf->compress_table != NULL ?
2594 		    strlen(wtperf->compress_table) : 0;
2595 		req_len += opts->index ? strlen(INDEX_COL_NAMES) : 0;
2596 		req_len += user_tconfig != NULL ? strlen(user_tconfig) : 0;
2597 		tc_buf = dmalloc(req_len);
2598 
2599 		pos = 0;
2600 		append_comma = "";
2601 		if (wtperf->compress_table != NULL &&
2602 		    strlen(wtperf->compress_table) != 0) {
2603 			testutil_check(__wt_snprintf_len_incr(
2604 			    tc_buf + pos, req_len - pos, &pos, "%s%s",
2605 			    append_comma, wtperf->compress_table));
2606 			append_comma = ",";
2607 		}
2608 		if (opts->index) {
2609 			testutil_check(__wt_snprintf_len_incr(
2610 			    tc_buf + pos, req_len - pos, &pos, "%s%s",
2611 			    append_comma, INDEX_COL_NAMES));
2612 			append_comma = ",";
2613 		}
2614 		if (user_tconfig != NULL && strlen(user_tconfig) != 0) {
2615 			testutil_check(__wt_snprintf_len_incr(
2616 			    tc_buf + pos, req_len - pos, &pos, "%s%s",
2617 			    append_comma, user_tconfig));
2618 		}
2619 
2620 		if (strlen(tc_buf) != 0 && (ret =
2621 		    config_opt_name_value(wtperf, "table_config", tc_buf)) != 0)
2622 			goto err;
2623 	}
2624 	if (opts->log_partial && opts->table_count > 1) {
2625 		req_len = strlen(opts->table_config) +
2626 		    strlen(LOG_PARTIAL_CONFIG) + 1;
2627 		wtperf->partial_config = dmalloc(req_len);
2628 		testutil_check(__wt_snprintf(
2629 		    wtperf->partial_config, req_len, "%s%s",
2630 		    opts->table_config, LOG_PARTIAL_CONFIG));
2631 	}
2632 	/*
2633 	 * Set the config for reopen.  If readonly, add in that string;
2634 	 * otherwise just copy the original conn_config.
2635 	 */
2636 	if (opts->readonly)
2637 		req_len = strlen(opts->conn_config) +
2638 		    strlen(READONLY_CONFIG) + 1;
2639 	else
2640 		req_len = strlen(opts->conn_config) + 1;
2641 	wtperf->reopen_config = dmalloc(req_len);
2642 	if (opts->readonly)
2643 		testutil_check(__wt_snprintf(
2644 		    wtperf->reopen_config, req_len, "%s%s",
2645 		    opts->conn_config, READONLY_CONFIG));
2646 	else
2647 		testutil_check(__wt_snprintf(
2648 		    wtperf->reopen_config, req_len, "%s", opts->conn_config));
2649 
2650 	/* Sanity-check the configuration. */
2651 	if ((ret = config_sanity(wtperf)) != 0)
2652 		goto err;
2653 
2654 	/* If creating, remove and re-create the home directory. */
2655 	if (opts->create != 0)
2656 		recreate_dir(wtperf->home);
2657 
2658 	/* Write a copy of the config. */
2659 	req_len = strlen(wtperf->home) + strlen("/CONFIG.wtperf") + 1;
2660 	path = dmalloc(req_len);
2661 	testutil_check(__wt_snprintf(
2662 	    path, req_len, "%s/CONFIG.wtperf", wtperf->home));
2663 	config_opt_log(opts, path);
2664 	free(path);
2665 
2666 	/* Display the configuration. */
2667 	if (opts->verbose > 1)
2668 		config_opt_print(wtperf);
2669 
2670 	if ((ret = start_all_runs(wtperf)) != 0)
2671 		goto err;
2672 
2673 	if (0) {
2674 einval:		ret = EINVAL;
2675 	}
2676 
2677 err:	wtperf_free(wtperf);
2678 	config_opt_cleanup(opts);
2679 
2680 	free(cc_buf);
2681 	free(sess_cfg);
2682 	free(tc_buf);
2683 	free(user_cconfig);
2684 	free(user_tconfig);
2685 
2686 	return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
2687 }
2688 
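/*
 * start_threads --
 *	Initialize the per-thread structures (random seed, key/value buffers,
 *	latency tracking) and spawn a thread for each, running the given
 *	callback.
 */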
2689 static void
2690 start_threads(WTPERF *wtperf, WORKLOAD *workp,
2691     WTPERF_THREAD *base, u_int num, WT_THREAD_CALLBACK(*func)(void *))
2692 {
2693 	CONFIG_OPTS *opts;
2694 	WTPERF_THREAD *thread;
2695 	u_int i;
2696 
2697 	opts = wtperf->opts;
2698 
2699 	/* Initialize the threads. */
2700 	for (i = 0, thread = base; i < num; ++i, ++thread) {
2701 		thread->wtperf = wtperf;
2702 		thread->workload = workp;
2703 
2704 		/*
2705 		 * We don't want the threads executing in lock-step, so seed
2706 		 * each one differently.
2707 		 */
2708 		__wt_random_init_seed(NULL, &thread->rnd);
2709 
2710 		/*
2711 		 * Every thread gets a key/data buffer because we don't bother
2712 		 * to distinguish between threads that need them and threads
2713 		 * that don't; it's not enough memory to matter.  These buffers
2714 		 * hold strings: the trailing NUL is included in the size.
2715 		 */
2716 		thread->key_buf = dcalloc(opts->key_sz, 1);
2717 		thread->value_buf = dcalloc(opts->value_sz_max, 1);
2718 
2719 		/*
2720 		 * Initialize the value buffer, then add some randomness if
2721 		 * configured.
2722 		 */
2722 		memset(thread->value_buf, 'a', opts->value_sz - 1);
2723 		if (opts->random_value)
2724 			randomize_value(thread, thread->value_buf);
2725 
2726 		/*
2727 		 * Every thread gets tracking information and is initialized
2728 		 * for latency measurements, for the same reason.
2729 		 */
2730 		thread->ckpt.min_latency =
2731 		thread->insert.min_latency = thread->read.min_latency =
2732 		thread->update.min_latency = UINT32_MAX;
2733 		thread->ckpt.max_latency = thread->insert.max_latency =
2734 		thread->read.max_latency = thread->update.max_latency = 0;
2735 	}
2736 
2737 	/* Start the threads. */
2738 	for (i = 0, thread = base; i < num; ++i, ++thread)
2739 		testutil_check(__wt_thread_create(
2740 		    NULL, &thread->handle, func, thread));
2741 }
2742 
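/*
 * stop_threads --
 *	Wait for the given threads to exit and free their key/value buffers.
 */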
2743 static void
2744 stop_threads(u_int num, WTPERF_THREAD *threads)
2745 {
2746 	u_int i;
2747 
2748 	if (num == 0 || threads == NULL)
2749 		return;
2750 
2751 	for (i = 0; i < num; ++i, ++threads) {
2752 		testutil_check(__wt_thread_join(NULL, &threads->handle));
2753 
2754 		free(threads->key_buf);
2755 		threads->key_buf = NULL;
2756 		free(threads->value_buf);
2757 		threads->value_buf = NULL;
2758 	}
2759 
2760 	/*
2761 	 * We don't free the thread structures or any memory referenced, or NULL
2762 	 * the reference when we stop the threads; the thread structure is still
2763 	 * being read by the monitor thread (among others).  As a standalone
2764 	 * program, leaking memory isn't a concern, and it's simpler that way.
2765 	 */
2766 }
2767 
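/*
 * recreate_dir --
 *	Remove a directory and create it again, empty.
 */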
2768 static void
2769 recreate_dir(const char *name)
2770 {
2771 	char *buf;
2772 	size_t len;
2773 
2774 	len = strlen(name) * 2 + 100;
2775 	buf = dmalloc(len);
2776 	testutil_check(__wt_snprintf(
2777 	    buf, len, "rm -rf %s && mkdir %s", name, name));
2778 	testutil_checkfmt(system(buf), "system: %s", buf);
2779 	free(buf);
2780 }
2781 
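/*
 * drop_all_tables --
 *	Drop all of the tables created for the run, reporting the average
 *	drop time.
 */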
2782 static int
2783 drop_all_tables(WTPERF *wtperf)
2784 {
2785 	struct timespec start, stop;
2786 	CONFIG_OPTS *opts;
2787 	WT_SESSION *session;
2788 	size_t i;
2789 	uint64_t msecs;
2790 	int ret, t_ret;
2791 
2792 	opts = wtperf->opts;
2793 
2794 	/* Drop any tables. */
2795 	if ((ret = wtperf->conn->open_session(
2796 	    wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
2797 		lprintf(wtperf, ret, 0,
2798 		    "Error opening a session on %s", wtperf->home);
2799 		return (ret);
2800 	}
2801 	__wt_epoch(NULL, &start);
2802 	for (i = 0; i < opts->table_count; i++) {
2803 		if ((ret =
2804 		    session->drop(session, wtperf->uris[i], NULL)) != 0) {
2805 			lprintf(wtperf, ret, 0,
2806 			    "Error dropping table %s", wtperf->uris[i]);
2807 			goto err;
2808 		}
2809 	}
2810 	__wt_epoch(NULL, &stop);
2811 	msecs = WT_TIMEDIFF_MS(stop, start);
2812 	lprintf(wtperf, 0, 1,
2813 	    "Executed %" PRIu32 " drop operations, average time %" PRIu64 "ms",
2814 	    opts->table_count, msecs / opts->table_count);
2815 
2816 err:	if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
2817 		ret = t_ret;
2818 	return (ret);
2819 }
2820 
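/*
 * wtperf_value_range --
 *	Return the range of keys that worker operations can safely reference.
 */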
2821 static uint64_t
2822 wtperf_value_range(WTPERF *wtperf)
2823 {
2824 	CONFIG_OPTS *opts;
2825 
2826 	opts = wtperf->opts;
2827 
2828 	if (opts->random_range)
2829 		return (opts->icount + opts->random_range);
2830 	/*
2831 	 * It is legal to configure a zero-size populate phase; hide that
2832 	 * from other code by pretending the range is 1 in that case.
2833 	 */
2834 	if (opts->icount + wtperf->insert_key == 0)
2835 		return (1);
2836 	return (opts->icount +
2837 	    wtperf->insert_key - (u_int)(wtperf->workers_cnt + 1));
2838 }
2839 
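/*
 * wtperf_rand --
 *	Return a pseudo-random key number within the expected range, using a
 *	random cursor if one is configured, optionally skewed by a Pareto
 *	distribution.
 */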
2840 static uint64_t
2841 wtperf_rand(WTPERF_THREAD *thread)
2842 {
2843 	CONFIG_OPTS *opts;
2844 	WT_CURSOR *rnd_cursor;
2845 	WTPERF *wtperf;
2846 	double S1, S2, U;
2847 	uint64_t rval;
2848 	int ret;
2849 	char *key_buf;
2850 
2851 	wtperf = thread->wtperf;
2852 	opts = wtperf->opts;
2853 
2854 	/*
2855 	 * If we have a random cursor set up, use it.
2856 	 */
2857 	if ((rnd_cursor = thread->rand_cursor) != NULL) {
2858 		if ((ret = rnd_cursor->next(rnd_cursor)) != 0) {
2859 			lprintf(wtperf, ret, 0, "worker: rand next failed");
2860 			/* 0 is outside the expected range. */
2861 			return (0);
2862 		}
2863 		if ((ret = rnd_cursor->get_key(rnd_cursor, &key_buf)) != 0) {
2864 			lprintf(wtperf, ret, 0,
2865 			    "worker: rand next key retrieval");
2866 			return (0);
2867 		}
2868 		/*
2869 		 * Resetting the cursor is not fatal.  We still return the
2870 		 * value we retrieved above.  We do it so that we don't
2871 		 * leave a cursor positioned.
2872 		 */
2873 		if ((ret = rnd_cursor->reset(rnd_cursor)) != 0)
2874 			lprintf(wtperf, ret, 0,
2875 			    "worker: rand cursor reset failed");
2876 		extract_key(key_buf, &rval);
2877 		return (rval);
2878 	}
2879 
2880 	/*
2881 	 * Use WiredTiger's random number routine: it's lock-free and fairly
2882 	 * good.
2883 	 */
2884 	rval = __wt_random(&thread->rnd);
2885 
2886 	/* Use Pareto distribution to give 80/20 hot/cold values. */
2887 	if (opts->pareto != 0) {
2888 #define	PARETO_SHAPE	1.5
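		/*
		 * Inverse-transform sampling: with U uniform on (0, 1], the
		 * value S2 * (U^(-1/PARETO_SHAPE) - 1) computed below follows
		 * a Pareto (Lomax) distribution, concentrating most results
		 * near the start of the key range.
		 */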
2889 		S1 = (-1 / PARETO_SHAPE);
2890 		S2 = wtperf_value_range(wtperf) *
2891 		    (opts->pareto / 100.0) * (PARETO_SHAPE - 1);
2892 		U = 1 - (double)rval / (double)UINT32_MAX;
2893 		rval = (uint64_t)((pow(U, S1) - 1) * S2);
2894 		/*
2895 		 * This Pareto calculation chooses out-of-range values about
2896 		 * 2% of the time, based on testing. That will lead to the
2897 		 * first item in the table being "hot".
2898 		 */
2899 		if (rval > wtperf_value_range(wtperf))
2900 			rval = 0;
2901 	}
2902 	/*
2903 	 * Wrap the key to within the expected range and avoid zero: we never
2904 	 * insert that key.
2905 	 */
2906 	rval = (rval % wtperf_value_range(wtperf)) + 1;
2907 	return (rval);
2908 }
2909