1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2018, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29 
/*
 * On Windows, define FD_SETSIZE as 1024.  MAXCLIENTS (further down) is
 * derived from FD_SETSIZE whenever that macro is defined.
 */
#ifdef WIN32
#define FD_SETSIZE 1024			/* set before winsock2.h is included */
#endif							/* WIN32 */
33 
34 #include "postgres_fe.h"
35 #include "fe_utils/conditional.h"
36 
37 #include "getopt_long.h"
38 #include "libpq-fe.h"
39 #include "portability/instr_time.h"
40 
41 #include <ctype.h>
42 #include <float.h>
43 #include <limits.h>
44 #include <math.h>
45 #include <signal.h>
46 #include <time.h>
47 #include <sys/time.h>
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
50 #endif
51 
52 #ifdef HAVE_SYS_RESOURCE_H
53 #include <sys/resource.h>		/* for getrlimit */
54 #endif
55 
/* Supply M_PI ourselves when the included headers have not defined it. */
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif
59 
60 #include "pgbench.h"
61 
/* SQLSTATE code 42P01 (undefined table) */
#define ERRCODE_UNDEFINED_TABLE  "42P01"

/*
 * Hashing constants
 *
 * FNV_PRIME and FNV_OFFSET_BASIS are the standard 64-bit FNV parameters.
 * The MM2_* names presumably refer to the 64-bit MurmurHash2 multiplier and
 * shift count -- confirm against the hash functions that use them.
 * MM2_MUL_TIMES_8 is MM2_MUL * 8 truncated to 64 bits.
 */
#define FNV_PRIME			UINT64CONST(0x100000001b3)
#define FNV_OFFSET_BASIS	UINT64CONST(0xcbf29ce484222325)
#define MM2_MUL				UINT64CONST(0xc6a4a7935bd1e995)
#define MM2_MUL_TIMES_8		UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT				47
72 
/*
 * Multi-platform pthread implementations
 *
 * Exactly one of three variants is selected at compile time:
 *	- WIN32: pthread_t/pthread_attr_t are declared here together with
 *	  prototypes for our own pthread_create() and pthread_join();
 *	- ENABLE_THREAD_SAFETY: the platform's <pthread.h> is used;
 *	- otherwise: no thread support, pthread_t is only a placeholder type.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
#endif
91 
92 
93 /********************************************************************
94  * some configurable parameters */
95 
/*
 * max number of clients allowed
 *
 * When FD_SETSIZE is known the limit is kept 10 below it (presumably to
 * leave room for descriptors not used by client connections -- confirm);
 * otherwise a fixed limit of 1024 is used.
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS	(FD_SETSIZE - 10)
#else
#define MAXCLIENTS	1024
#endif

#define DEFAULT_INIT_STEPS "dtgvp"	/* default -I setting */

#define LOG_STEP_SECONDS	5	/* seconds between log messages */
#define DEFAULT_NXACTS	10		/* default number of transactions per client */

#define ZIPF_CACHE_SIZE	15		/* number of cells in a ZipfCache */

#define MIN_GAUSSIAN_PARAM		2.0 /* minimum parameter for gaussian */
#define MAX_ZIPFIAN_PARAM		1000	/* maximum parameter for zipfian */
112 
int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */
int64		end_time = 0;		/* when to stop, in microseconds, under -T */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples in
 * pgbench_accounts table.
 */
int			scale = 1;

/*
 * fillfactor. for example, fillfactor = 90 will use only 90 percent
 * space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * use unlogged tables?
 */
bool		unlogged_tables = false;

/*
 * log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * When threads are throttled to a given rate limit, this is the target delay
 * to reach that rate in usec.  0 is the default and means no throttling.
 */
int64		throttle_delay = 0;

/*
 * Transactions which take longer than this limit (in usec) are counted as
 * late, and reported as such, although they are completed anyway. When
 * throttling is enabled, execution time slots that are more than this late
 * are skipped altogether, and counted separately.
 */
int64		latency_limit = 0;

/*
 * tablespace selection: where to create tables and indexes respectively
 * (NULL when the corresponding option was not given)
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/* random seed used to initialize base_random_sequence */
int64		random_seed = -1;
161 
162 /*
163  * end of configurable parameters
164  *********************************************************************/
165 
/*
 * Per-unit-of-scale row counts.  The builtin scripts multiply each of these
 * by :scale to obtain the range of valid ids.
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which 32-bit integers are insufficient for
 * storing account ids (naccounts * scale).
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000
179 
bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */
bool		per_script_stats = false;	/* whether to collect stats per script */
int			progress = 0;		/* show a thread progress report every this
								 * many seconds */
bool		progress_timestamp = false; /* progress report with Unix time */
int			nclients = 1;		/* number of clients */
int			nthreads = 1;		/* number of threads */
bool		is_connect;			/* establish connection for each transaction */
bool		is_latencies;		/* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/* connection parameters and naming */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
char	   *logfile_prefix = NULL;
const char *progname;			/* program name, as printed by usage() */

#define WSEP '@'				/* weight separator, as in NAME@W */

volatile bool timer_exceeded = false;	/* flag from signal handler */
203 
/*
 * Variable definitions.
 *
 * A variable is in one of two situations:
 *
 * - Only its string form is known: "svalue" is that string and "value" is
 *   "not set".
 *
 * - Its value is known: "value" contains it (in any variant), and "svalue"
 *   contains the string equivalent of the value if we've had occasion to
 *   compute that, or NULL if we haven't.
 */
typedef struct
{
	char	   *name;			/* variable's name */
	char	   *svalue;			/* its value in string form, if known */
	PgBenchValue value;			/* actual variable's value */
} Variable;
220 
/* MAX_SCRIPTS sizes both sql_script[] and the per-client prepared[] flags */
#define MAX_SCRIPTS		128		/* max number of SQL scripts allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
223 
/*
 * Simple data structure to keep stats about something.
 *
 * Only running aggregates are kept, not the individual values; count, sum
 * and sum2 are enough to derive a mean and a standard deviation.
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
	int64		count;			/* how many values were encountered */
	double		min;			/* the minimum seen */
	double		max;			/* the maximum seen */
	double		sum;			/* sum of values */
	double		sum2;			/* sum of squared values */
} SimpleStats;
238 
/*
 * Data structure to hold various statistics: per-thread and per-script stats
 * are maintained and merged together.
 */
typedef struct StatsData
{
	time_t		start_time;		/* interval start time, for aggregates */
	int64		cnt;			/* number of transactions, including skipped */
	int64		skipped;		/* number of transactions skipped under --rate
								 * and --latency-limit */
	SimpleStats latency;		/* aggregated latency values */
	SimpleStats lag;			/* aggregated lag values */
} StatsData;
252 
/*
 * Various random sequences are initialized from this one.  It has the same
 * three-element layout as the per-thread random_state in TState.
 */
static unsigned short base_random_sequence[3];
255 
/*
 * Connection state machine states.
 *
 * The states are listed in the order a client normally goes through them
 * for one transaction; the comments on each group describe the transitions.
 */
typedef enum
{
	/*
	 * The client must first choose a script to execute.  Once chosen, it can
	 * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
	 * right away (state CSTATE_START_TX).
	 */
	CSTATE_CHOOSE_SCRIPT,

	/*
	 * In CSTATE_START_THROTTLE state, we calculate when to begin the next
	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
	 * sleeps until that moment.  (If throttling is not enabled, doCustom()
	 * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
	 */
	CSTATE_START_THROTTLE,
	CSTATE_THROTTLE,

	/*
	 * CSTATE_START_TX performs start-of-transaction processing.  Establishes
	 * a new connection for the transaction, in --connect mode, and records
	 * the transaction start time.
	 */
	CSTATE_START_TX,

	/*
	 * We loop through these states, to process each command in the script:
	 *
	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
	 * command, the command is sent to the server, and we move to
	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
	 * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
	 * meta-commands are executed immediately.
	 *
	 * CSTATE_SKIP_COMMAND is used for conditional branches which are not
	 * executed: it quickly skips commands that do not need any evaluation.
	 *
	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
	 * for the current command.
	 *
	 * CSTATE_SLEEP waits until the end of \sleep.
	 *
	 * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
	 * command counter, and loops back to CSTATE_START_COMMAND state.
	 */
	CSTATE_START_COMMAND,
	CSTATE_SKIP_COMMAND,
	CSTATE_WAIT_RESULT,
	CSTATE_SLEEP,
	CSTATE_END_COMMAND,

	/*
	 * CSTATE_END_TX performs end-of-transaction processing.  Calculates
	 * latency, and logs the transaction.  In --connect mode, closes the
	 * current connection.  Chooses the next script to execute and starts over
	 * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
	 * more work to do.
	 *
	 * NOTE(review): script selection is described above as happening in
	 * CSTATE_CHOOSE_SCRIPT, so the restart state named here may be out of
	 * date -- confirm against doCustom().
	 */
	CSTATE_END_TX,

	/*
	 * Final states.  CSTATE_ABORTED means that the script execution was
	 * aborted because a command failed, CSTATE_FINISHED means success.
	 */
	CSTATE_ABORTED,
	CSTATE_FINISHED
} ConnectionStateEnum;
326 
/*
 * Connection state.
 *
 * One CState exists per client; each thread owns an array of them (see
 * TState.state).
 */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB */
	int			id;				/* client No. */
	ConnectionStateEnum state;	/* state machine's current state. */
	ConditionalStack cstack;	/* enclosing conditionals state */

	int			use_file;		/* index in sql_script for this client */
	int			command;		/* command number in script */

	/* client variables */
	Variable   *variables;		/* array of variable definitions */
	int			nvariables;		/* number of variables */
	bool		vars_sorted;	/* are variables sorted by name? */

	/* various times about current transaction */
	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
	instr_time	txn_begin;		/* used for measuring schedule lag times */
	instr_time	stmt_begin;		/* used for measuring statement latencies */

	/* one flag per entry of sql_script[] */
	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */

	/* per client collected stats */
	int64		cnt;			/* client transaction count, for -t */
	int			ecnt;			/* error count */
} CState;
357 
/*
 * Cache cell for random_zipfian call
 *
 * A cell is identified by the pair (n, s).  The remaining fields hold values
 * derived from that pair by zipfSetCacheCell(), so that they need not be
 * recomputed for every draw.
 */
typedef struct
{
	/* cell keys */
	double		s;				/* s - parameter of random_zipfian function */
	int64		n;				/* number of elements in range (max - min + 1) */

	double		harmonicn;		/* generalizedHarmonicNumber(n, s) */
	double		alpha;			/* 1 / (1 - s) */
	double		beta;			/* 0.5 ^ s */
	double		eta;			/* see zipfSetCacheCell() for the formula */

	uint64		last_used;		/* logical timestamp taken from
								 * ZipfCache.current; the cell with the
								 * smallest value is replaced first */
} ZipfCell;
374 
/*
 * Zipf cache for zeta values
 *
 * Fixed-size array of ZipfCell entries.  Cells [0, nb_cells) are in use; once
 * all ZIPF_CACHE_SIZE cells are filled, an existing cell is overwritten and
 * overflowCount is incremented.
 */
typedef struct
{
	uint64		current;		/* counter for LRU cache replacement algorithm */

	int			nb_cells;		/* number of filled cells */
	int			overflowCount;	/* number of cache overflows */
	ZipfCell	cells[ZIPF_CACHE_SIZE];
} ZipfCache;
386 
/*
 * Thread state
 *
 * Each thread has its own client array, random-number state and zipfian
 * cache.
 */
typedef struct
{
	int			tid;			/* thread id */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */
	unsigned short random_state[3]; /* separate randomness for each thread */
	int64		throttle_trigger;	/* previous/next throttling (us) */
	FILE	   *logfile;		/* where to log, or NULL */
	ZipfCache	zipf_cache;		/* for thread-safe zipfian random number
								 * generation */

	/* per thread collected stats */
	instr_time	start_time;		/* thread start time */
	instr_time	conn_time;		/* presumably time spent connecting -- TODO
								 * confirm against threadRun() */
	StatsData	stats;
	int64		latency_late;	/* executed but late transactions */
} TState;
408 
/* pthread_t value that stands for "no thread" */
#define INVALID_THREAD		((pthread_t) 0)
410 
/*
 * queries read from files
 *
 * SQL_COMMAND and META_COMMAND are the possible values of Command.type;
 * MAX_ARGS is the size of Command.argv[].
 */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10
417 
/* Identifiers for the backslash meta-commands recognized in scripts */
typedef enum MetaCommand
{
	META_NONE,					/* not a known meta-command */
	META_SET,					/* \set */
	META_SETSHELL,				/* \setshell */
	META_SHELL,					/* \shell */
	META_SLEEP,					/* \sleep */
	META_IF,					/* \if */
	META_ELIF,					/* \elif */
	META_ELSE,					/* \else */
	META_ENDIF					/* \endif */
} MetaCommand;
430 
/*
 * Protocol used to submit queries (see -M in usage()).  QUERYMODE[] below
 * gives the option spelling of each mode and must list them in enum order.
 */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE				/* number of modes; keep last */
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
441 
/* One parsed script command, either a SQL statement or a meta-command */
typedef struct
{
	char	   *line;			/* text of command line */
	int			command_num;	/* unique index of this Command struct */
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	MetaCommand meta;			/* meta command identifier, or META_NONE */
	int			argc;			/* number of command words */
	char	   *argv[MAX_ARGS]; /* command word list */
	PgBenchExpr *expr;			/* parsed expression, if needed */
	SimpleStats stats;			/* time spent in this command */
} Command;
453 
/* A script after parsing, together with its weight and accumulated stats */
typedef struct ParsedScript
{
	const char *desc;			/* script descriptor (eg, file name) */
	int			weight;			/* selection weight */
	Command   **commands;		/* NULL-terminated array of Commands */
	StatsData	stats;			/* total time spent in script */
} ParsedScript;
461 
static ParsedScript sql_script[MAX_SCRIPTS];	/* SQL script files */
static int	num_scripts;		/* number of scripts in sql_script[] */
static int	num_commands = 0;	/* total number of Command structs */
static int64 total_weight = 0;	/* presumably the sum of the scripts'
								 * weights -- confirm where it is updated */

static int	debug = 0;			/* debug flag */
468 
/* Builtin test scripts */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;

/*
 * The scripts available through -b.  The id ranges are built from the
 * nbranches/ntellers/naccounts macros, whose values are pasted into the
 * script text with CppAsString2() and multiplied by :scale at run time.
 */
static const BuiltinScript builtin_script[] =
{
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* same as tpcb-like, minus the updates of tellers and branches */
	{
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* a single SELECT on a random account */
	{
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
514 
515 
516 /* Function prototypes */
517 static void setNullValue(PgBenchValue *pv);
518 static void setBoolValue(PgBenchValue *pv, bool bval);
519 static void setIntValue(PgBenchValue *pv, int64 ival);
520 static void setDoubleValue(PgBenchValue *pv, double dval);
521 static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
522 static void doLog(TState *thread, CState *st,
523 	  StatsData *agg, bool skipped, double latency, double lag);
524 static void processXactStats(TState *thread, CState *st, instr_time *now,
525 				 bool skipped, StatsData *agg);
526 static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
527 static void addScript(ParsedScript script);
528 static void *threadRun(void *arg);
529 static void setalarm(int seconds);
530 static void finishCon(CState *st);
531 
532 
/*
 * callback functions for our flex lexer: no variable lookup, and errors are
 * routed to pgbench_error()
 */
static const PsqlScanCallbacks pgbench_callbacks = {
	NULL,						/* don't need get_variable functionality */
	pgbench_error
};
538 
539 
/*
 * usage -- print the command-line help text on stdout.
 *
 * progname is substituted twice: in the one-line summary and in the "Usage:"
 * synopsis.  The function only prints; it does not exit.
 */
static void
usage(void)
{
	printf("%s is a benchmarking tool for PostgreSQL.\n\n"
		   "Usage:\n"
		   "  %s [OPTION]... [DBNAME]\n"
		   "\nInitialization options:\n"
		   "  -i, --initialize         invokes initialization mode\n"
		   "  -I, --init-steps=[dtgvpf]+ (default \"dtgvp\")\n"
		   "                           run selected initialization steps\n"
		   "  -F, --fillfactor=NUM     set fill factor\n"
		   "  -n, --no-vacuum          do not run VACUUM during initialization\n"
		   "  -q, --quiet              quiet logging (one message each 5 seconds)\n"
		   "  -s, --scale=NUM          scaling factor\n"
		   "  --foreign-keys           create foreign key constraints between tables\n"
		   "  --index-tablespace=TABLESPACE\n"
		   "                           create indexes in the specified tablespace\n"
		   "  --tablespace=TABLESPACE  create tables in the specified tablespace\n"
		   "  --unlogged-tables        create tables as unlogged tables\n"
		   "\nOptions to select what to run:\n"
		   "  -b, --builtin=NAME[@W]   add builtin script NAME weighted at W (default: 1)\n"
		   "                           (use \"-b list\" to list available scripts)\n"
		   "  -f, --file=FILENAME[@W]  add script FILENAME weighted at W (default: 1)\n"
		   "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
		   "                           (same as \"-b simple-update\")\n"
		   "  -S, --select-only        perform SELECT-only transactions\n"
		   "                           (same as \"-b select-only\")\n"
		   "\nBenchmarking options:\n"
		   "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
		   "  -C, --connect            establish new connection for each transaction\n"
		   "  -D, --define=VARNAME=VALUE\n"
		   "                           define variable for use by custom script\n"
		   "  -j, --jobs=NUM           number of threads (default: 1)\n"
		   "  -l, --log                write transaction times to log file\n"
		   "  -L, --latency-limit=NUM  count transactions lasting more than NUM ms as late\n"
		   "  -M, --protocol=simple|extended|prepared\n"
		   "                           protocol for submitting queries (default: simple)\n"
		   "  -n, --no-vacuum          do not run VACUUM before tests\n"
		   "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
		   "  -r, --report-latencies   report average latency per command\n"
		   "  -R, --rate=NUM           target rate in transactions per second\n"
		   "  -s, --scale=NUM          report this scale factor in output\n"
		   "  -t, --transactions=NUM   number of transactions each client runs (default: 10)\n"
		   "  -T, --time=NUM           duration of benchmark test in seconds\n"
		   "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
		   "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
		   "  --log-prefix=PREFIX      prefix for transaction time log file\n"
		   "                           (default: \"pgbench_log\")\n"
		   "  --progress-timestamp     use Unix epoch timestamps for progress\n"
		   "  --random-seed=SEED       set random seed (\"time\", \"rand\", integer)\n"
		   "  --sampling-rate=NUM      fraction of transactions to log (e.g., 0.01 for 1%%)\n"
		   "\nCommon options:\n"
		   "  -d, --debug              print debugging output\n"
		   "  -h, --host=HOSTNAME      database server host or socket directory\n"
		   "  -p, --port=PORT          database server port number\n"
		   "  -U, --username=USERNAME  connect as specified database user\n"
		   "  -V, --version            output version information, then exit\n"
		   "  -?, --help               show this help, then exit\n"
		   "\n"
		   "Report bugs to <pgsql-bugs@postgresql.org>.\n",
		   progname, progname);
}
602 
/*
 * is_an_int -- return whether str matches "^\s*[-+]?[0-9]+$"
 *
 * That is: optional leading whitespace, an optional sign, then one or more
 * decimal digits and nothing after them.  In particular the empty string, a
 * string of blanks and a lone sign are NOT integers.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit is required.  The test must also reject the
	 * terminating NUL, otherwise inputs without any digit would slip through
	 * to the end-of-string check below and be accepted.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
628 
629 
630 /*
631  * strtoint64 -- convert a string to 64-bit integer
632  *
633  * This function is a modified version of scanint8() from
634  * src/backend/utils/adt/int8.c.
635  */
636 int64
strtoint64(const char * str)637 strtoint64(const char *str)
638 {
639 	const char *ptr = str;
640 	int64		result = 0;
641 	int			sign = 1;
642 
643 	/*
644 	 * Do our own scan, rather than relying on sscanf which might be broken
645 	 * for long long.
646 	 */
647 
648 	/* skip leading spaces */
649 	while (*ptr && isspace((unsigned char) *ptr))
650 		ptr++;
651 
652 	/* handle sign */
653 	if (*ptr == '-')
654 	{
655 		ptr++;
656 
657 		/*
658 		 * Do an explicit check for INT64_MIN.  Ugly though this is, it's
659 		 * cleaner than trying to get the loop below to handle it portably.
660 		 */
661 		if (strncmp(ptr, "9223372036854775808", 19) == 0)
662 		{
663 			result = PG_INT64_MIN;
664 			ptr += 19;
665 			goto gotdigits;
666 		}
667 		sign = -1;
668 	}
669 	else if (*ptr == '+')
670 		ptr++;
671 
672 	/* require at least one digit */
673 	if (!isdigit((unsigned char) *ptr))
674 		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
675 
676 	/* process digits */
677 	while (*ptr && isdigit((unsigned char) *ptr))
678 	{
679 		int64		tmp = result * 10 + (*ptr++ - '0');
680 
681 		if ((tmp / 10) != result)	/* overflow? */
682 			fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
683 		result = tmp;
684 	}
685 
686 gotdigits:
687 
688 	/* allow trailing whitespace, but not other trailing chars */
689 	while (*ptr != '\0' && isspace((unsigned char) *ptr))
690 		ptr++;
691 
692 	if (*ptr != '\0')
693 		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
694 
695 	return ((sign < 0) ? -result : result);
696 }
697 
698 /*
699  * Random number generator: uniform distribution from min to max inclusive.
700  *
701  * Although the limits are expressed as int64, you can't generate the full
702  * int64 range in one call, because the difference of the limits mustn't
703  * overflow int64.  In practice it's unwise to ask for more than an int32
704  * range, because of the limited precision of pg_erand48().
705  */
706 static int64
getrand(TState * thread,int64 min,int64 max)707 getrand(TState *thread, int64 min, int64 max)
708 {
709 	/*
710 	 * Odd coding is so that min and max have approximately the same chance of
711 	 * being selected as do numbers between them.
712 	 *
713 	 * pg_erand48() is thread-safe and concurrent, which is why we use it
714 	 * rather than random(), which in glibc is non-reentrant, and therefore
715 	 * protected by a mutex, and therefore a bottleneck on machines with many
716 	 * CPUs.
717 	 */
718 	return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
719 }
720 
721 /*
722  * random number generator: exponential distribution from min to max inclusive.
723  * the parameter is so that the density of probability for the last cut-off max
724  * value is exp(-parameter).
725  */
726 static int64
getExponentialRand(TState * thread,int64 min,int64 max,double parameter)727 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
728 {
729 	double		cut,
730 				uniform,
731 				rand;
732 
733 	/* abort if wrong parameter, but must really be checked beforehand */
734 	Assert(parameter > 0.0);
735 	cut = exp(-parameter);
736 	/* erand in [0, 1), uniform in (0, 1] */
737 	uniform = 1.0 - pg_erand48(thread->random_state);
738 
739 	/*
740 	 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
741 	 */
742 	Assert((1.0 - cut) != 0.0);
743 	rand = -log(cut + (1.0 - cut) * uniform) / parameter;
744 	/* return int64 random number within between min and max */
745 	return min + (int64) ((max - min + 1) * rand);
746 }
747 
748 /* random number generator: gaussian distribution from min to max inclusive */
749 static int64
getGaussianRand(TState * thread,int64 min,int64 max,double parameter)750 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
751 {
752 	double		stdev;
753 	double		rand;
754 
755 	/* abort if parameter is too low, but must really be checked beforehand */
756 	Assert(parameter >= MIN_GAUSSIAN_PARAM);
757 
758 	/*
759 	 * Get user specified random number from this loop, with -parameter <
760 	 * stdev <= parameter
761 	 *
762 	 * This loop is executed until the number is in the expected range.
763 	 *
764 	 * As the minimum parameter is 2.0, the probability of looping is low:
765 	 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
766 	 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
767 	 * the worst case. For a parameter value of 5.0, the looping probability
768 	 * is about e^{-5} * 2 / pi ~ 0.43%.
769 	 */
770 	do
771 	{
772 		/*
773 		 * pg_erand48 generates [0,1), but for the basic version of the
774 		 * Box-Muller transform the two uniformly distributed random numbers
775 		 * are expected in (0, 1] (see
776 		 * https://en.wikipedia.org/wiki/Box-Muller_transform)
777 		 */
778 		double		rand1 = 1.0 - pg_erand48(thread->random_state);
779 		double		rand2 = 1.0 - pg_erand48(thread->random_state);
780 
781 		/* Box-Muller basic form transform */
782 		double		var_sqrt = sqrt(-2.0 * log(rand1));
783 
784 		stdev = var_sqrt * sin(2.0 * M_PI * rand2);
785 
786 		/*
787 		 * we may try with cos, but there may be a bias induced if the
788 		 * previous value fails the test. To be on the safe side, let us try
789 		 * over.
790 		 */
791 	}
792 	while (stdev < -parameter || stdev >= parameter);
793 
794 	/* stdev is in [-parameter, parameter), normalization to [0,1) */
795 	rand = (stdev + parameter) / (parameter * 2.0);
796 
797 	/* return int64 random number within between min and max */
798 	return min + (int64) ((max - min + 1) * rand);
799 }
800 
801 /*
802  * random number generator: generate a value, such that the series of values
803  * will approximate a Poisson distribution centered on the given value.
804  */
805 static int64
getPoissonRand(TState * thread,int64 center)806 getPoissonRand(TState *thread, int64 center)
807 {
808 	/*
809 	 * Use inverse transform sampling to generate a value > 0, such that the
810 	 * expected (i.e. average) value is the given argument.
811 	 */
812 	double		uniform;
813 
814 	/* erand in [0, 1), uniform in (0, 1] */
815 	uniform = 1.0 - pg_erand48(thread->random_state);
816 
817 	return (int64) (-log(uniform) * ((double) center) + 0.5);
818 }
819 
820 /* helper function for getZipfianRand */
821 static double
generalizedHarmonicNumber(int64 n,double s)822 generalizedHarmonicNumber(int64 n, double s)
823 {
824 	int			i;
825 	double		ans = 0.0;
826 
827 	for (i = n; i > 1; i--)
828 		ans += pow(i, -s);
829 	return ans + 1.0;
830 }
831 
832 /* set harmonicn and other parameters to cache cell */
833 static void
zipfSetCacheCell(ZipfCell * cell,int64 n,double s)834 zipfSetCacheCell(ZipfCell *cell, int64 n, double s)
835 {
836 	double		harmonic2;
837 
838 	cell->n = n;
839 	cell->s = s;
840 
841 	harmonic2 = generalizedHarmonicNumber(2, s);
842 	cell->harmonicn = generalizedHarmonicNumber(n, s);
843 
844 	cell->alpha = 1.0 / (1.0 - s);
845 	cell->beta = pow(0.5, s);
846 	cell->eta = (1.0 - pow(2.0 / n, 1.0 - s)) / (1.0 - harmonic2 / cell->harmonicn);
847 }
848 
849 /*
850  * search for cache cell with keys (n, s)
851  * and create new cell if it does not exist
852  */
853 static ZipfCell *
zipfFindOrCreateCacheCell(ZipfCache * cache,int64 n,double s)854 zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
855 {
856 	int			i,
857 				least_recently_used = 0;
858 	ZipfCell   *cell;
859 
860 	/* search cached cell for given parameters */
861 	for (i = 0; i < cache->nb_cells; i++)
862 	{
863 		cell = &cache->cells[i];
864 		if (cell->n == n && cell->s == s)
865 			return &cache->cells[i];
866 
867 		if (cell->last_used < cache->cells[least_recently_used].last_used)
868 			least_recently_used = i;
869 	}
870 
871 	/* create new one if it does not exist */
872 	if (cache->nb_cells < ZIPF_CACHE_SIZE)
873 		i = cache->nb_cells++;
874 	else
875 	{
876 		/* replace LRU cell if cache is full */
877 		i = least_recently_used;
878 		cache->overflowCount++;
879 	}
880 
881 	zipfSetCacheCell(&cache->cells[i], n, s);
882 
883 	cache->cells[i].last_used = cache->current++;
884 	return &cache->cells[i];
885 }
886 
887 /*
888  * Computing zipfian using rejection method, based on
889  * "Non-Uniform Random Variate Generation",
890  * Luc Devroye, p. 550-551, Springer 1986.
891  */
892 static int64
computeIterativeZipfian(TState * thread,int64 n,double s)893 computeIterativeZipfian(TState *thread, int64 n, double s)
894 {
895 	double		b = pow(2.0, s - 1.0);
896 	double		x,
897 				t,
898 				u,
899 				v;
900 
901 	while (true)
902 	{
903 		/* random variates */
904 		u = pg_erand48(thread->random_state);
905 		v = pg_erand48(thread->random_state);
906 
907 		x = floor(pow(u, -1.0 / (s - 1.0)));
908 
909 		t = pow(1.0 + 1.0 / x, s - 1.0);
910 		/* reject if too large or out of bound */
911 		if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
912 			break;
913 	}
914 	return (int64) x;
915 }
916 
917 /*
918  * Computing zipfian using harmonic numbers, based on algorithm described in
919  * "Quickly Generating Billion-Record Synthetic Databases",
920  * Jim Gray et al, SIGMOD 1994
921  */
922 static int64
computeHarmonicZipfian(TState * thread,int64 n,double s)923 computeHarmonicZipfian(TState *thread, int64 n, double s)
924 {
925 	ZipfCell   *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
926 	double		uniform = pg_erand48(thread->random_state);
927 	double		uz = uniform * cell->harmonicn;
928 
929 	if (uz < 1.0)
930 		return 1;
931 	if (uz < 1.0 + cell->beta)
932 		return 2;
933 	return 1 + (int64) (cell->n * pow(cell->eta * uniform - cell->eta + 1.0, cell->alpha));
934 }
935 
936 /* random number generator: zipfian distribution from min to max inclusive */
937 static int64
getZipfianRand(TState * thread,int64 min,int64 max,double s)938 getZipfianRand(TState *thread, int64 min, int64 max, double s)
939 {
940 	int64		n = max - min + 1;
941 
942 	/* abort if parameter is invalid */
943 	Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
944 
945 
946 	return min - 1 + ((s > 1)
947 					  ? computeIterativeZipfian(thread, n, s)
948 					  : computeHarmonicZipfian(thread, n, s));
949 }
950 
951 /*
952  * FNV-1a hash function
953  */
954 static int64
getHashFnv1a(int64 val,uint64 seed)955 getHashFnv1a(int64 val, uint64 seed)
956 {
957 	int64		result;
958 	int			i;
959 
960 	result = FNV_OFFSET_BASIS ^ seed;
961 	for (i = 0; i < 8; ++i)
962 	{
963 		int32		octet = val & 0xff;
964 
965 		val = val >> 8;
966 		result = result ^ octet;
967 		result = result * FNV_PRIME;
968 	}
969 
970 	return result;
971 }
972 
973 /*
974  * Murmur2 hash function
975  *
976  * Based on original work of Austin Appleby
977  * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
978  */
979 static int64
getHashMurmur2(int64 val,uint64 seed)980 getHashMurmur2(int64 val, uint64 seed)
981 {
982 	uint64		result = seed ^ MM2_MUL_TIMES_8;	/* sizeof(int64) */
983 	uint64		k = (uint64) val;
984 
985 	k *= MM2_MUL;
986 	k ^= k >> MM2_ROT;
987 	k *= MM2_MUL;
988 
989 	result ^= k;
990 	result *= MM2_MUL;
991 
992 	result ^= result >> MM2_ROT;
993 	result *= MM2_MUL;
994 	result ^= result >> MM2_ROT;
995 
996 	return (int64) result;
997 }
998 
999 /*
1000  * Initialize the given SimpleStats struct to all zeroes
1001  */
1002 static void
initSimpleStats(SimpleStats * ss)1003 initSimpleStats(SimpleStats *ss)
1004 {
1005 	memset(ss, 0, sizeof(SimpleStats));
1006 }
1007 
1008 /*
1009  * Accumulate one value into a SimpleStats struct.
1010  */
1011 static void
addToSimpleStats(SimpleStats * ss,double val)1012 addToSimpleStats(SimpleStats *ss, double val)
1013 {
1014 	if (ss->count == 0 || val < ss->min)
1015 		ss->min = val;
1016 	if (ss->count == 0 || val > ss->max)
1017 		ss->max = val;
1018 	ss->count++;
1019 	ss->sum += val;
1020 	ss->sum2 += val * val;
1021 }
1022 
1023 /*
1024  * Merge two SimpleStats objects
1025  */
1026 static void
mergeSimpleStats(SimpleStats * acc,SimpleStats * ss)1027 mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
1028 {
1029 	if (acc->count == 0 || ss->min < acc->min)
1030 		acc->min = ss->min;
1031 	if (acc->count == 0 || ss->max > acc->max)
1032 		acc->max = ss->max;
1033 	acc->count += ss->count;
1034 	acc->sum += ss->sum;
1035 	acc->sum2 += ss->sum2;
1036 }
1037 
1038 /*
1039  * Initialize a StatsData struct to mostly zeroes, with its start time set to
1040  * the given value.
1041  */
1042 static void
initStats(StatsData * sd,time_t start_time)1043 initStats(StatsData *sd, time_t start_time)
1044 {
1045 	sd->start_time = start_time;
1046 	sd->cnt = 0;
1047 	sd->skipped = 0;
1048 	initSimpleStats(&sd->latency);
1049 	initSimpleStats(&sd->lag);
1050 }
1051 
1052 /*
1053  * Accumulate one additional item into the given stats object.
1054  */
1055 static void
accumStats(StatsData * stats,bool skipped,double lat,double lag)1056 accumStats(StatsData *stats, bool skipped, double lat, double lag)
1057 {
1058 	stats->cnt++;
1059 
1060 	if (skipped)
1061 	{
1062 		/* no latency to record on skipped transactions */
1063 		stats->skipped++;
1064 	}
1065 	else
1066 	{
1067 		addToSimpleStats(&stats->latency, lat);
1068 
1069 		/* and possibly the same for schedule lag */
1070 		if (throttle_delay)
1071 			addToSimpleStats(&stats->lag, lag);
1072 	}
1073 }
1074 
1075 /* call PQexec() and exit() on failure */
1076 static void
executeStatement(PGconn * con,const char * sql)1077 executeStatement(PGconn *con, const char *sql)
1078 {
1079 	PGresult   *res;
1080 
1081 	res = PQexec(con, sql);
1082 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1083 	{
1084 		fprintf(stderr, "%s", PQerrorMessage(con));
1085 		exit(1);
1086 	}
1087 	PQclear(res);
1088 }
1089 
1090 /* call PQexec() and complain, but without exiting, on failure */
1091 static void
tryExecuteStatement(PGconn * con,const char * sql)1092 tryExecuteStatement(PGconn *con, const char *sql)
1093 {
1094 	PGresult   *res;
1095 
1096 	res = PQexec(con, sql);
1097 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1098 	{
1099 		fprintf(stderr, "%s", PQerrorMessage(con));
1100 		fprintf(stderr, "(ignoring this error and continuing anyway)\n");
1101 	}
1102 	PQclear(res);
1103 }
1104 
1105 /* set up a connection to the backend */
1106 static PGconn *
doConnect(void)1107 doConnect(void)
1108 {
1109 	PGconn	   *conn;
1110 	bool		new_pass;
1111 	static bool have_password = false;
1112 	static char password[100];
1113 
1114 	/*
1115 	 * Start the connection.  Loop until we have a password if requested by
1116 	 * backend.
1117 	 */
1118 	do
1119 	{
1120 #define PARAMS_ARRAY_SIZE	7
1121 
1122 		const char *keywords[PARAMS_ARRAY_SIZE];
1123 		const char *values[PARAMS_ARRAY_SIZE];
1124 
1125 		keywords[0] = "host";
1126 		values[0] = pghost;
1127 		keywords[1] = "port";
1128 		values[1] = pgport;
1129 		keywords[2] = "user";
1130 		values[2] = login;
1131 		keywords[3] = "password";
1132 		values[3] = have_password ? password : NULL;
1133 		keywords[4] = "dbname";
1134 		values[4] = dbName;
1135 		keywords[5] = "fallback_application_name";
1136 		values[5] = progname;
1137 		keywords[6] = NULL;
1138 		values[6] = NULL;
1139 
1140 		new_pass = false;
1141 
1142 		conn = PQconnectdbParams(keywords, values, true);
1143 
1144 		if (!conn)
1145 		{
1146 			fprintf(stderr, "connection to database \"%s\" failed\n",
1147 					dbName);
1148 			return NULL;
1149 		}
1150 
1151 		if (PQstatus(conn) == CONNECTION_BAD &&
1152 			PQconnectionNeedsPassword(conn) &&
1153 			!have_password)
1154 		{
1155 			PQfinish(conn);
1156 			simple_prompt("Password: ", password, sizeof(password), false);
1157 			have_password = true;
1158 			new_pass = true;
1159 		}
1160 	} while (new_pass);
1161 
1162 	/* check to see that the backend connection was successfully made */
1163 	if (PQstatus(conn) == CONNECTION_BAD)
1164 	{
1165 		fprintf(stderr, "connection to database \"%s\" failed:\n%s",
1166 				PQdb(conn), PQerrorMessage(conn));
1167 		PQfinish(conn);
1168 		return NULL;
1169 	}
1170 
1171 	return conn;
1172 }
1173 
1174 /* throw away response from backend */
1175 static void
discard_response(CState * state)1176 discard_response(CState *state)
1177 {
1178 	PGresult   *res;
1179 
1180 	do
1181 	{
1182 		res = PQgetResult(state->con);
1183 		if (res)
1184 			PQclear(res);
1185 	} while (res);
1186 }
1187 
1188 /* qsort comparator for Variable array */
1189 static int
compareVariableNames(const void * v1,const void * v2)1190 compareVariableNames(const void *v1, const void *v2)
1191 {
1192 	return strcmp(((const Variable *) v1)->name,
1193 				  ((const Variable *) v2)->name);
1194 }
1195 
1196 /* Locate a variable by name; returns NULL if unknown */
1197 static Variable *
lookupVariable(CState * st,char * name)1198 lookupVariable(CState *st, char *name)
1199 {
1200 	Variable	key;
1201 
1202 	/* On some versions of Solaris, bsearch of zero items dumps core */
1203 	if (st->nvariables <= 0)
1204 		return NULL;
1205 
1206 	/* Sort if we have to */
1207 	if (!st->vars_sorted)
1208 	{
1209 		qsort((void *) st->variables, st->nvariables, sizeof(Variable),
1210 			  compareVariableNames);
1211 		st->vars_sorted = true;
1212 	}
1213 
1214 	/* Now we can search */
1215 	key.name = name;
1216 	return (Variable *) bsearch((void *) &key,
1217 								(void *) st->variables,
1218 								st->nvariables,
1219 								sizeof(Variable),
1220 								compareVariableNames);
1221 }
1222 
1223 /* Get the value of a variable, in string form; returns NULL if unknown */
1224 static char *
getVariable(CState * st,char * name)1225 getVariable(CState *st, char *name)
1226 {
1227 	Variable   *var;
1228 	char		stringform[64];
1229 
1230 	var = lookupVariable(st, name);
1231 	if (var == NULL)
1232 		return NULL;			/* not found */
1233 
1234 	if (var->svalue)
1235 		return var->svalue;		/* we have it in string form */
1236 
1237 	/* We need to produce a string equivalent of the value */
1238 	Assert(var->value.type != PGBT_NO_VALUE);
1239 	if (var->value.type == PGBT_NULL)
1240 		snprintf(stringform, sizeof(stringform), "NULL");
1241 	else if (var->value.type == PGBT_BOOLEAN)
1242 		snprintf(stringform, sizeof(stringform),
1243 				 "%s", var->value.u.bval ? "true" : "false");
1244 	else if (var->value.type == PGBT_INT)
1245 		snprintf(stringform, sizeof(stringform),
1246 				 INT64_FORMAT, var->value.u.ival);
1247 	else if (var->value.type == PGBT_DOUBLE)
1248 		snprintf(stringform, sizeof(stringform),
1249 				 "%.*g", DBL_DIG, var->value.u.dval);
1250 	else						/* internal error, unexpected type */
1251 		Assert(0);
1252 	var->svalue = pg_strdup(stringform);
1253 	return var->svalue;
1254 }
1255 
1256 /* Try to convert variable to a value; return false on failure */
1257 static bool
makeVariableValue(Variable * var)1258 makeVariableValue(Variable *var)
1259 {
1260 	size_t		slen;
1261 
1262 	if (var->value.type != PGBT_NO_VALUE)
1263 		return true;			/* no work */
1264 
1265 	slen = strlen(var->svalue);
1266 
1267 	if (slen == 0)
1268 		/* what should it do on ""? */
1269 		return false;
1270 
1271 	if (pg_strcasecmp(var->svalue, "null") == 0)
1272 	{
1273 		setNullValue(&var->value);
1274 	}
1275 
1276 	/*
1277 	 * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1278 	 * recognized later as an int, which is converted to bool if needed.
1279 	 */
1280 	else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1281 			 pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1282 			 pg_strcasecmp(var->svalue, "on") == 0)
1283 	{
1284 		setBoolValue(&var->value, true);
1285 	}
1286 	else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1287 			 pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1288 			 pg_strcasecmp(var->svalue, "off") == 0 ||
1289 			 pg_strcasecmp(var->svalue, "of") == 0)
1290 	{
1291 		setBoolValue(&var->value, false);
1292 	}
1293 	else if (is_an_int(var->svalue))
1294 	{
1295 		setIntValue(&var->value, strtoint64(var->svalue));
1296 	}
1297 	else						/* type should be double */
1298 	{
1299 		double		dv;
1300 		char		xs;
1301 
1302 		if (sscanf(var->svalue, "%lf%c", &dv, &xs) != 1)
1303 		{
1304 			fprintf(stderr,
1305 					"malformed variable \"%s\" value: \"%s\"\n",
1306 					var->name, var->svalue);
1307 			return false;
1308 		}
1309 		setDoubleValue(&var->value, dv);
1310 	}
1311 	return true;
1312 }
1313 
1314 /*
1315  * Check whether a variable's name is allowed.
1316  *
1317  * We allow any non-ASCII character, as well as ASCII letters, digits, and
1318  * underscore.
1319  *
1320  * Keep this in sync with the definitions of variable name characters in
1321  * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
1322  * "src/bin/pgbench/exprscan.l".  Also see parseVariable(), below.
1323  *
1324  * Note: this static function is copied from "src/bin/psql/variables.c"
1325  * but changed to disallow variable names starting with a digit.
1326  */
1327 static bool
valid_variable_name(const char * name)1328 valid_variable_name(const char *name)
1329 {
1330 	const unsigned char *ptr = (const unsigned char *) name;
1331 
1332 	/* Mustn't be zero-length */
1333 	if (*ptr == '\0')
1334 		return false;
1335 
1336 	/* must not start with [0-9] */
1337 	if (IS_HIGHBIT_SET(*ptr) ||
1338 		strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
1339 			   "_", *ptr) != NULL)
1340 		ptr++;
1341 	else
1342 		return false;
1343 
1344 	/* remaining characters can include [0-9] */
1345 	while (*ptr)
1346 	{
1347 		if (IS_HIGHBIT_SET(*ptr) ||
1348 			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
1349 				   "_0123456789", *ptr) != NULL)
1350 			ptr++;
1351 		else
1352 			return false;
1353 	}
1354 
1355 	return true;
1356 }
1357 
1358 /*
1359  * Lookup a variable by name, creating it if need be.
1360  * Caller is expected to assign a value to the variable.
1361  * Returns NULL on failure (bad name).
1362  */
1363 static Variable *
lookupCreateVariable(CState * st,const char * context,char * name)1364 lookupCreateVariable(CState *st, const char *context, char *name)
1365 {
1366 	Variable   *var;
1367 
1368 	var = lookupVariable(st, name);
1369 	if (var == NULL)
1370 	{
1371 		Variable   *newvars;
1372 
1373 		/*
1374 		 * Check for the name only when declaring a new variable to avoid
1375 		 * overhead.
1376 		 */
1377 		if (!valid_variable_name(name))
1378 		{
1379 			fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1380 					context, name);
1381 			return NULL;
1382 		}
1383 
1384 		/* Create variable at the end of the array */
1385 		if (st->variables)
1386 			newvars = (Variable *) pg_realloc(st->variables,
1387 											  (st->nvariables + 1) * sizeof(Variable));
1388 		else
1389 			newvars = (Variable *) pg_malloc(sizeof(Variable));
1390 
1391 		st->variables = newvars;
1392 
1393 		var = &newvars[st->nvariables];
1394 
1395 		var->name = pg_strdup(name);
1396 		var->svalue = NULL;
1397 		/* caller is expected to initialize remaining fields */
1398 
1399 		st->nvariables++;
1400 		/* we don't re-sort the array till we have to */
1401 		st->vars_sorted = false;
1402 	}
1403 
1404 	return var;
1405 }
1406 
1407 /* Assign a string value to a variable, creating it if need be */
1408 /* Returns false on failure (bad name) */
1409 static bool
putVariable(CState * st,const char * context,char * name,const char * value)1410 putVariable(CState *st, const char *context, char *name, const char *value)
1411 {
1412 	Variable   *var;
1413 	char	   *val;
1414 
1415 	var = lookupCreateVariable(st, context, name);
1416 	if (!var)
1417 		return false;
1418 
1419 	/* dup then free, in case value is pointing at this variable */
1420 	val = pg_strdup(value);
1421 
1422 	if (var->svalue)
1423 		free(var->svalue);
1424 	var->svalue = val;
1425 	var->value.type = PGBT_NO_VALUE;
1426 
1427 	return true;
1428 }
1429 
1430 /* Assign a value to a variable, creating it if need be */
1431 /* Returns false on failure (bad name) */
1432 static bool
putVariableValue(CState * st,const char * context,char * name,const PgBenchValue * value)1433 putVariableValue(CState *st, const char *context, char *name,
1434 				 const PgBenchValue *value)
1435 {
1436 	Variable   *var;
1437 
1438 	var = lookupCreateVariable(st, context, name);
1439 	if (!var)
1440 		return false;
1441 
1442 	if (var->svalue)
1443 		free(var->svalue);
1444 	var->svalue = NULL;
1445 	var->value = *value;
1446 
1447 	return true;
1448 }
1449 
1450 /* Assign an integer value to a variable, creating it if need be */
1451 /* Returns false on failure (bad name) */
1452 static bool
putVariableInt(CState * st,const char * context,char * name,int64 value)1453 putVariableInt(CState *st, const char *context, char *name, int64 value)
1454 {
1455 	PgBenchValue val;
1456 
1457 	setIntValue(&val, value);
1458 	return putVariableValue(st, context, name, &val);
1459 }
1460 
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (including the colon).
 * Otherwise, return NULL; *eaten is not set in that case.
 *
 * The end of the string always terminates the name.  This has to be tested
 * explicitly: strchr() treats the terminating NUL as part of the searched
 * string, so looking up '\0' in the set of allowed characters would succeed
 * and the scan would run past the end of "sql".
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 1;			/* starting at 1 skips the colon */
	char	   *name;

	/* keep this logic in sync with valid_variable_name() */

	/* first character: non-ASCII, letter or underscore, but no digit */
	if (sql[i] != '\0' &&
		(IS_HIGHBIT_SET(sql[i]) ||
		 strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				"_", sql[i]) != NULL))
		i++;
	else
		return NULL;

	/* remaining characters can include digits */
	while (sql[i] != '\0' &&
		   (IS_HIGHBIT_SET(sql[i]) ||
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", sql[i]) != NULL))
		i++;

	/* i - 1 name characters (the colon is dropped) plus the terminator */
	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1495 
/*
 * Replace the "len" characters starting at "param" inside the string *sql
 * with the text of "value".
 *
 * When the value is longer than the replaced span, *sql is enlarged with
 * pg_realloc() (and may therefore move).  Returns a pointer just past the
 * inserted value, within the possibly relocated string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			newlen = strlen(value);

	if (newlen > len)
	{
		/* remember the position, since the buffer may be relocated */
		size_t		pos = param - *sql;
		size_t		needed = strlen(*sql) - len + newlen + 1;

		*sql = pg_realloc(*sql, needed);
		param = *sql + pos;
	}

	if (newlen != len)
	{
		/* shift the rest of the string, terminator included */
		char	   *tail = param + len;

		memmove(param + newlen, tail, strlen(tail) + 1);
	}

	memcpy(param, value, newlen);

	return param + newlen;
}
1515 
1516 static char *
assignVariables(CState * st,char * sql)1517 assignVariables(CState *st, char *sql)
1518 {
1519 	char	   *p,
1520 			   *name,
1521 			   *val;
1522 
1523 	p = sql;
1524 	while ((p = strchr(p, ':')) != NULL)
1525 	{
1526 		int			eaten;
1527 
1528 		name = parseVariable(p, &eaten);
1529 		if (name == NULL)
1530 		{
1531 			while (*p == ':')
1532 			{
1533 				p++;
1534 			}
1535 			continue;
1536 		}
1537 
1538 		val = getVariable(st, name);
1539 		free(name);
1540 		if (val == NULL)
1541 		{
1542 			p++;
1543 			continue;
1544 		}
1545 
1546 		p = replaceVariable(&sql, p, eaten, val);
1547 	}
1548 
1549 	return sql;
1550 }
1551 
1552 static void
getQueryParams(CState * st,const Command * command,const char ** params)1553 getQueryParams(CState *st, const Command *command, const char **params)
1554 {
1555 	int			i;
1556 
1557 	for (i = 0; i < command->argc - 1; i++)
1558 		params[i] = getVariable(st, command->argv[i + 1]);
1559 }
1560 
1561 static char *
valueTypeName(PgBenchValue * pval)1562 valueTypeName(PgBenchValue *pval)
1563 {
1564 	if (pval->type == PGBT_NO_VALUE)
1565 		return "none";
1566 	else if (pval->type == PGBT_NULL)
1567 		return "null";
1568 	else if (pval->type == PGBT_INT)
1569 		return "int";
1570 	else if (pval->type == PGBT_DOUBLE)
1571 		return "double";
1572 	else if (pval->type == PGBT_BOOLEAN)
1573 		return "boolean";
1574 	else
1575 	{
1576 		/* internal error, should never get there */
1577 		Assert(false);
1578 		return NULL;
1579 	}
1580 }
1581 
1582 /* get a value as a boolean, or tell if there is a problem */
1583 static bool
coerceToBool(PgBenchValue * pval,bool * bval)1584 coerceToBool(PgBenchValue *pval, bool *bval)
1585 {
1586 	if (pval->type == PGBT_BOOLEAN)
1587 	{
1588 		*bval = pval->u.bval;
1589 		return true;
1590 	}
1591 	else						/* NULL, INT or DOUBLE */
1592 	{
1593 		fprintf(stderr, "cannot coerce %s to boolean\n", valueTypeName(pval));
1594 		*bval = false;			/* suppress uninitialized-variable warnings */
1595 		return false;
1596 	}
1597 }
1598 
1599 /*
1600  * Return true or false from an expression for conditional purposes.
1601  * Non zero numerical values are true, zero and NULL are false.
1602  */
1603 static bool
valueTruth(PgBenchValue * pval)1604 valueTruth(PgBenchValue *pval)
1605 {
1606 	switch (pval->type)
1607 	{
1608 		case PGBT_NULL:
1609 			return false;
1610 		case PGBT_BOOLEAN:
1611 			return pval->u.bval;
1612 		case PGBT_INT:
1613 			return pval->u.ival != 0;
1614 		case PGBT_DOUBLE:
1615 			return pval->u.dval != 0.0;
1616 		default:
1617 			/* internal error, unexpected type */
1618 			Assert(0);
1619 			return false;
1620 	}
1621 }
1622 
1623 /* get a value as an int, tell if there is a problem */
1624 static bool
coerceToInt(PgBenchValue * pval,int64 * ival)1625 coerceToInt(PgBenchValue *pval, int64 *ival)
1626 {
1627 	if (pval->type == PGBT_INT)
1628 	{
1629 		*ival = pval->u.ival;
1630 		return true;
1631 	}
1632 	else if (pval->type == PGBT_DOUBLE)
1633 	{
1634 		double		dval = rint(pval->u.dval);
1635 
1636 		if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
1637 		{
1638 			fprintf(stderr, "double to int overflow for %f\n", dval);
1639 			return false;
1640 		}
1641 		*ival = (int64) dval;
1642 		return true;
1643 	}
1644 	else						/* BOOLEAN or NULL */
1645 	{
1646 		fprintf(stderr, "cannot coerce %s to int\n", valueTypeName(pval));
1647 		return false;
1648 	}
1649 }
1650 
1651 /* get a value as a double, or tell if there is a problem */
1652 static bool
coerceToDouble(PgBenchValue * pval,double * dval)1653 coerceToDouble(PgBenchValue *pval, double *dval)
1654 {
1655 	if (pval->type == PGBT_DOUBLE)
1656 	{
1657 		*dval = pval->u.dval;
1658 		return true;
1659 	}
1660 	else if (pval->type == PGBT_INT)
1661 	{
1662 		*dval = (double) pval->u.ival;
1663 		return true;
1664 	}
1665 	else						/* BOOLEAN or NULL */
1666 	{
1667 		fprintf(stderr, "cannot coerce %s to double\n", valueTypeName(pval));
1668 		return false;
1669 	}
1670 }
1671 
1672 /* assign a null value */
1673 static void
setNullValue(PgBenchValue * pv)1674 setNullValue(PgBenchValue *pv)
1675 {
1676 	pv->type = PGBT_NULL;
1677 	pv->u.ival = 0;
1678 }
1679 
1680 /* assign a boolean value */
1681 static void
setBoolValue(PgBenchValue * pv,bool bval)1682 setBoolValue(PgBenchValue *pv, bool bval)
1683 {
1684 	pv->type = PGBT_BOOLEAN;
1685 	pv->u.bval = bval;
1686 }
1687 
1688 /* assign an integer value */
1689 static void
setIntValue(PgBenchValue * pv,int64 ival)1690 setIntValue(PgBenchValue *pv, int64 ival)
1691 {
1692 	pv->type = PGBT_INT;
1693 	pv->u.ival = ival;
1694 }
1695 
1696 /* assign a double value */
1697 static void
setDoubleValue(PgBenchValue * pv,double dval)1698 setDoubleValue(PgBenchValue *pv, double dval)
1699 {
1700 	pv->type = PGBT_DOUBLE;
1701 	pv->u.dval = dval;
1702 }
1703 
1704 static bool
isLazyFunc(PgBenchFunction func)1705 isLazyFunc(PgBenchFunction func)
1706 {
1707 	return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
1708 }
1709 
1710 /* lazy evaluation of some functions */
1711 static bool
evalLazyFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1712 evalLazyFunc(TState *thread, CState *st,
1713 			 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
1714 {
1715 	PgBenchValue a1,
1716 				a2;
1717 	bool		ba1,
1718 				ba2;
1719 
1720 	Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
1721 
1722 	/* args points to first condition */
1723 	if (!evaluateExpr(thread, st, args->expr, &a1))
1724 		return false;
1725 
1726 	/* second condition for AND/OR and corresponding branch for CASE */
1727 	args = args->next;
1728 
1729 	switch (func)
1730 	{
1731 		case PGBENCH_AND:
1732 			if (a1.type == PGBT_NULL)
1733 			{
1734 				setNullValue(retval);
1735 				return true;
1736 			}
1737 
1738 			if (!coerceToBool(&a1, &ba1))
1739 				return false;
1740 
1741 			if (!ba1)
1742 			{
1743 				setBoolValue(retval, false);
1744 				return true;
1745 			}
1746 
1747 			if (!evaluateExpr(thread, st, args->expr, &a2))
1748 				return false;
1749 
1750 			if (a2.type == PGBT_NULL)
1751 			{
1752 				setNullValue(retval);
1753 				return true;
1754 			}
1755 			else if (!coerceToBool(&a2, &ba2))
1756 				return false;
1757 			else
1758 			{
1759 				setBoolValue(retval, ba2);
1760 				return true;
1761 			}
1762 
1763 			return true;
1764 
1765 		case PGBENCH_OR:
1766 
1767 			if (a1.type == PGBT_NULL)
1768 			{
1769 				setNullValue(retval);
1770 				return true;
1771 			}
1772 
1773 			if (!coerceToBool(&a1, &ba1))
1774 				return false;
1775 
1776 			if (ba1)
1777 			{
1778 				setBoolValue(retval, true);
1779 				return true;
1780 			}
1781 
1782 			if (!evaluateExpr(thread, st, args->expr, &a2))
1783 				return false;
1784 
1785 			if (a2.type == PGBT_NULL)
1786 			{
1787 				setNullValue(retval);
1788 				return true;
1789 			}
1790 			else if (!coerceToBool(&a2, &ba2))
1791 				return false;
1792 			else
1793 			{
1794 				setBoolValue(retval, ba2);
1795 				return true;
1796 			}
1797 
1798 		case PGBENCH_CASE:
1799 			/* when true, execute branch */
1800 			if (valueTruth(&a1))
1801 				return evaluateExpr(thread, st, args->expr, retval);
1802 
1803 			/* now args contains next condition or final else expression */
1804 			args = args->next;
1805 
1806 			/* final else case? */
1807 			if (args->next == NULL)
1808 				return evaluateExpr(thread, st, args->expr, retval);
1809 
1810 			/* no, another when, proceed */
1811 			return evalLazyFunc(thread, st, PGBENCH_CASE, args, retval);
1812 
1813 		default:
1814 			/* internal error, cannot get here */
1815 			Assert(0);
1816 			break;
1817 	}
1818 	return false;
1819 }
1820 
1821 /* maximum number of function arguments */
1822 #define MAX_FARGS 16
1823 
1824 /*
1825  * Recursive evaluation of standard functions,
1826  * which do not require lazy evaluation.
1827  */
1828 static bool
evalStandardFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1829 evalStandardFunc(TState *thread, CState *st,
1830 				 PgBenchFunction func, PgBenchExprLink *args,
1831 				 PgBenchValue *retval)
1832 {
1833 	/* evaluate all function arguments */
1834 	int			nargs = 0;
1835 	PgBenchValue vargs[MAX_FARGS];
1836 	PgBenchExprLink *l = args;
1837 	bool		has_null = false;
1838 
1839 	for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1840 	{
1841 		if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1842 			return false;
1843 		has_null |= vargs[nargs].type == PGBT_NULL;
1844 	}
1845 
1846 	if (l != NULL)
1847 	{
1848 		fprintf(stderr,
1849 				"too many function arguments, maximum is %d\n", MAX_FARGS);
1850 		return false;
1851 	}
1852 
1853 	/* NULL arguments */
1854 	if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
1855 	{
1856 		setNullValue(retval);
1857 		return true;
1858 	}
1859 
1860 	/* then evaluate function */
1861 	switch (func)
1862 	{
1863 			/* overloaded operators */
1864 		case PGBENCH_ADD:
1865 		case PGBENCH_SUB:
1866 		case PGBENCH_MUL:
1867 		case PGBENCH_DIV:
1868 		case PGBENCH_MOD:
1869 		case PGBENCH_EQ:
1870 		case PGBENCH_NE:
1871 		case PGBENCH_LE:
1872 		case PGBENCH_LT:
1873 			{
1874 				PgBenchValue *lval = &vargs[0],
1875 						   *rval = &vargs[1];
1876 
1877 				Assert(nargs == 2);
1878 
1879 				/* overloaded type management, double if some double */
1880 				if ((lval->type == PGBT_DOUBLE ||
1881 					 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1882 				{
1883 					double		ld,
1884 								rd;
1885 
1886 					if (!coerceToDouble(lval, &ld) ||
1887 						!coerceToDouble(rval, &rd))
1888 						return false;
1889 
1890 					switch (func)
1891 					{
1892 						case PGBENCH_ADD:
1893 							setDoubleValue(retval, ld + rd);
1894 							return true;
1895 
1896 						case PGBENCH_SUB:
1897 							setDoubleValue(retval, ld - rd);
1898 							return true;
1899 
1900 						case PGBENCH_MUL:
1901 							setDoubleValue(retval, ld * rd);
1902 							return true;
1903 
1904 						case PGBENCH_DIV:
1905 							setDoubleValue(retval, ld / rd);
1906 							return true;
1907 
1908 						case PGBENCH_EQ:
1909 							setBoolValue(retval, ld == rd);
1910 							return true;
1911 
1912 						case PGBENCH_NE:
1913 							setBoolValue(retval, ld != rd);
1914 							return true;
1915 
1916 						case PGBENCH_LE:
1917 							setBoolValue(retval, ld <= rd);
1918 							return true;
1919 
1920 						case PGBENCH_LT:
1921 							setBoolValue(retval, ld < rd);
1922 							return true;
1923 
1924 						default:
1925 							/* cannot get here */
1926 							Assert(0);
1927 					}
1928 				}
1929 				else			/* we have integer operands, or % */
1930 				{
1931 					int64		li,
1932 								ri;
1933 
1934 					if (!coerceToInt(lval, &li) ||
1935 						!coerceToInt(rval, &ri))
1936 						return false;
1937 
1938 					switch (func)
1939 					{
1940 						case PGBENCH_ADD:
1941 							setIntValue(retval, li + ri);
1942 							return true;
1943 
1944 						case PGBENCH_SUB:
1945 							setIntValue(retval, li - ri);
1946 							return true;
1947 
1948 						case PGBENCH_MUL:
1949 							setIntValue(retval, li * ri);
1950 							return true;
1951 
1952 						case PGBENCH_EQ:
1953 							setBoolValue(retval, li == ri);
1954 							return true;
1955 
1956 						case PGBENCH_NE:
1957 							setBoolValue(retval, li != ri);
1958 							return true;
1959 
1960 						case PGBENCH_LE:
1961 							setBoolValue(retval, li <= ri);
1962 							return true;
1963 
1964 						case PGBENCH_LT:
1965 							setBoolValue(retval, li < ri);
1966 							return true;
1967 
1968 						case PGBENCH_DIV:
1969 						case PGBENCH_MOD:
1970 							if (ri == 0)
1971 							{
1972 								fprintf(stderr, "division by zero\n");
1973 								return false;
1974 							}
1975 							/* special handling of -1 divisor */
1976 							if (ri == -1)
1977 							{
1978 								if (func == PGBENCH_DIV)
1979 								{
1980 									/* overflow check (needed for INT64_MIN) */
1981 									if (li == PG_INT64_MIN)
1982 									{
1983 										fprintf(stderr, "bigint out of range\n");
1984 										return false;
1985 									}
1986 									else
1987 										setIntValue(retval, -li);
1988 								}
1989 								else
1990 									setIntValue(retval, 0);
1991 								return true;
1992 							}
1993 							/* else divisor is not -1 */
1994 							if (func == PGBENCH_DIV)
1995 								setIntValue(retval, li / ri);
1996 							else	/* func == PGBENCH_MOD */
1997 								setIntValue(retval, li % ri);
1998 
1999 							return true;
2000 
2001 						default:
2002 							/* cannot get here */
2003 							Assert(0);
2004 					}
2005 				}
2006 
2007 				Assert(0);
2008 				return false;	/* NOTREACHED */
2009 			}
2010 
2011 			/* integer bitwise operators */
2012 		case PGBENCH_BITAND:
2013 		case PGBENCH_BITOR:
2014 		case PGBENCH_BITXOR:
2015 		case PGBENCH_LSHIFT:
2016 		case PGBENCH_RSHIFT:
2017 			{
2018 				int64		li,
2019 							ri;
2020 
2021 				if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
2022 					return false;
2023 
2024 				if (func == PGBENCH_BITAND)
2025 					setIntValue(retval, li & ri);
2026 				else if (func == PGBENCH_BITOR)
2027 					setIntValue(retval, li | ri);
2028 				else if (func == PGBENCH_BITXOR)
2029 					setIntValue(retval, li ^ ri);
2030 				else if (func == PGBENCH_LSHIFT)
2031 					setIntValue(retval, li << ri);
2032 				else if (func == PGBENCH_RSHIFT)
2033 					setIntValue(retval, li >> ri);
2034 				else			/* cannot get here */
2035 					Assert(0);
2036 
2037 				return true;
2038 			}
2039 
2040 			/* logical operators */
2041 		case PGBENCH_NOT:
2042 			{
2043 				bool		b;
2044 
2045 				if (!coerceToBool(&vargs[0], &b))
2046 					return false;
2047 
2048 				setBoolValue(retval, !b);
2049 				return true;
2050 			}
2051 
2052 			/* no arguments */
2053 		case PGBENCH_PI:
2054 			setDoubleValue(retval, M_PI);
2055 			return true;
2056 
2057 			/* 1 overloaded argument */
2058 		case PGBENCH_ABS:
2059 			{
2060 				PgBenchValue *varg = &vargs[0];
2061 
2062 				Assert(nargs == 1);
2063 
2064 				if (varg->type == PGBT_INT)
2065 				{
2066 					int64		i = varg->u.ival;
2067 
2068 					setIntValue(retval, i < 0 ? -i : i);
2069 				}
2070 				else
2071 				{
2072 					double		d = varg->u.dval;
2073 
2074 					Assert(varg->type == PGBT_DOUBLE);
2075 					setDoubleValue(retval, d < 0.0 ? -d : d);
2076 				}
2077 
2078 				return true;
2079 			}
2080 
2081 		case PGBENCH_DEBUG:
2082 			{
2083 				PgBenchValue *varg = &vargs[0];
2084 
2085 				Assert(nargs == 1);
2086 
2087 				fprintf(stderr, "debug(script=%d,command=%d): ",
2088 						st->use_file, st->command + 1);
2089 
2090 				if (varg->type == PGBT_NULL)
2091 					fprintf(stderr, "null\n");
2092 				else if (varg->type == PGBT_BOOLEAN)
2093 					fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2094 				else if (varg->type == PGBT_INT)
2095 					fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2096 				else if (varg->type == PGBT_DOUBLE)
2097 					fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2098 				else			/* internal error, unexpected type */
2099 					Assert(0);
2100 
2101 				*retval = *varg;
2102 
2103 				return true;
2104 			}
2105 
2106 			/* 1 double argument */
2107 		case PGBENCH_DOUBLE:
2108 		case PGBENCH_SQRT:
2109 		case PGBENCH_LN:
2110 		case PGBENCH_EXP:
2111 			{
2112 				double		dval;
2113 
2114 				Assert(nargs == 1);
2115 
2116 				if (!coerceToDouble(&vargs[0], &dval))
2117 					return false;
2118 
2119 				if (func == PGBENCH_SQRT)
2120 					dval = sqrt(dval);
2121 				else if (func == PGBENCH_LN)
2122 					dval = log(dval);
2123 				else if (func == PGBENCH_EXP)
2124 					dval = exp(dval);
2125 				/* else is cast: do nothing */
2126 
2127 				setDoubleValue(retval, dval);
2128 				return true;
2129 			}
2130 
2131 			/* 1 int argument */
2132 		case PGBENCH_INT:
2133 			{
2134 				int64		ival;
2135 
2136 				Assert(nargs == 1);
2137 
2138 				if (!coerceToInt(&vargs[0], &ival))
2139 					return false;
2140 
2141 				setIntValue(retval, ival);
2142 				return true;
2143 			}
2144 
2145 			/* variable number of arguments */
2146 		case PGBENCH_LEAST:
2147 		case PGBENCH_GREATEST:
2148 			{
2149 				bool		havedouble;
2150 				int			i;
2151 
2152 				Assert(nargs >= 1);
2153 
2154 				/* need double result if any input is double */
2155 				havedouble = false;
2156 				for (i = 0; i < nargs; i++)
2157 				{
2158 					if (vargs[i].type == PGBT_DOUBLE)
2159 					{
2160 						havedouble = true;
2161 						break;
2162 					}
2163 				}
2164 				if (havedouble)
2165 				{
2166 					double		extremum;
2167 
2168 					if (!coerceToDouble(&vargs[0], &extremum))
2169 						return false;
2170 					for (i = 1; i < nargs; i++)
2171 					{
2172 						double		dval;
2173 
2174 						if (!coerceToDouble(&vargs[i], &dval))
2175 							return false;
2176 						if (func == PGBENCH_LEAST)
2177 							extremum = Min(extremum, dval);
2178 						else
2179 							extremum = Max(extremum, dval);
2180 					}
2181 					setDoubleValue(retval, extremum);
2182 				}
2183 				else
2184 				{
2185 					int64		extremum;
2186 
2187 					if (!coerceToInt(&vargs[0], &extremum))
2188 						return false;
2189 					for (i = 1; i < nargs; i++)
2190 					{
2191 						int64		ival;
2192 
2193 						if (!coerceToInt(&vargs[i], &ival))
2194 							return false;
2195 						if (func == PGBENCH_LEAST)
2196 							extremum = Min(extremum, ival);
2197 						else
2198 							extremum = Max(extremum, ival);
2199 					}
2200 					setIntValue(retval, extremum);
2201 				}
2202 				return true;
2203 			}
2204 
2205 			/* random functions */
2206 		case PGBENCH_RANDOM:
2207 		case PGBENCH_RANDOM_EXPONENTIAL:
2208 		case PGBENCH_RANDOM_GAUSSIAN:
2209 		case PGBENCH_RANDOM_ZIPFIAN:
2210 			{
2211 				int64		imin,
2212 							imax;
2213 
2214 				Assert(nargs >= 2);
2215 
2216 				if (!coerceToInt(&vargs[0], &imin) ||
2217 					!coerceToInt(&vargs[1], &imax))
2218 					return false;
2219 
2220 				/* check random range */
2221 				if (imin > imax)
2222 				{
2223 					fprintf(stderr, "empty range given to random\n");
2224 					return false;
2225 				}
2226 				else if (imax - imin < 0 || (imax - imin) + 1 < 0)
2227 				{
2228 					/* prevent int overflows in random functions */
2229 					fprintf(stderr, "random range is too large\n");
2230 					return false;
2231 				}
2232 
2233 				if (func == PGBENCH_RANDOM)
2234 				{
2235 					Assert(nargs == 2);
2236 					setIntValue(retval, getrand(thread, imin, imax));
2237 				}
2238 				else			/* gaussian & exponential */
2239 				{
2240 					double		param;
2241 
2242 					Assert(nargs == 3);
2243 
2244 					if (!coerceToDouble(&vargs[2], &param))
2245 						return false;
2246 
2247 					if (func == PGBENCH_RANDOM_GAUSSIAN)
2248 					{
2249 						if (param < MIN_GAUSSIAN_PARAM)
2250 						{
2251 							fprintf(stderr,
2252 									"gaussian parameter must be at least %f "
2253 									"(not %f)\n", MIN_GAUSSIAN_PARAM, param);
2254 							return false;
2255 						}
2256 
2257 						setIntValue(retval,
2258 									getGaussianRand(thread, imin, imax, param));
2259 					}
2260 					else if (func == PGBENCH_RANDOM_ZIPFIAN)
2261 					{
2262 						if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM)
2263 						{
2264 							fprintf(stderr,
2265 									"zipfian parameter must be in range (0, 1) U (1, %d]"
2266 									" (got %f)\n", MAX_ZIPFIAN_PARAM, param);
2267 							return false;
2268 						}
2269 						setIntValue(retval,
2270 									getZipfianRand(thread, imin, imax, param));
2271 					}
2272 					else		/* exponential */
2273 					{
2274 						if (param <= 0.0)
2275 						{
2276 							fprintf(stderr,
2277 									"exponential parameter must be greater than zero"
2278 									" (got %f)\n", param);
2279 							return false;
2280 						}
2281 
2282 						setIntValue(retval,
2283 									getExponentialRand(thread, imin, imax, param));
2284 					}
2285 				}
2286 
2287 				return true;
2288 			}
2289 
2290 		case PGBENCH_POW:
2291 			{
2292 				PgBenchValue *lval = &vargs[0];
2293 				PgBenchValue *rval = &vargs[1];
2294 				double		ld,
2295 							rd;
2296 
2297 				Assert(nargs == 2);
2298 
2299 				if (!coerceToDouble(lval, &ld) ||
2300 					!coerceToDouble(rval, &rd))
2301 					return false;
2302 
2303 				setDoubleValue(retval, pow(ld, rd));
2304 
2305 				return true;
2306 			}
2307 
2308 		case PGBENCH_IS:
2309 			{
2310 				Assert(nargs == 2);
2311 
2312 				/*
2313 				 * note: this simple implementation is more permissive than
2314 				 * SQL
2315 				 */
2316 				setBoolValue(retval,
2317 							 vargs[0].type == vargs[1].type &&
2318 							 vargs[0].u.bval == vargs[1].u.bval);
2319 				return true;
2320 			}
2321 
2322 			/* hashing */
2323 		case PGBENCH_HASH_FNV1A:
2324 		case PGBENCH_HASH_MURMUR2:
2325 			{
2326 				int64		val,
2327 							seed;
2328 
2329 				Assert(nargs == 2);
2330 
2331 				if (!coerceToInt(&vargs[0], &val) ||
2332 					!coerceToInt(&vargs[1], &seed))
2333 					return false;
2334 
2335 				if (func == PGBENCH_HASH_MURMUR2)
2336 					setIntValue(retval, getHashMurmur2(val, seed));
2337 				else if (func == PGBENCH_HASH_FNV1A)
2338 					setIntValue(retval, getHashFnv1a(val, seed));
2339 				else
2340 					/* cannot get here */
2341 					Assert(0);
2342 
2343 				return true;
2344 			}
2345 
2346 		default:
2347 			/* cannot get here */
2348 			Assert(0);
2349 			/* dead code to avoid a compiler warning */
2350 			return false;
2351 	}
2352 }
2353 
2354 /* evaluate some function */
2355 static bool
evalFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)2356 evalFunc(TState *thread, CState *st,
2357 		 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
2358 {
2359 	if (isLazyFunc(func))
2360 		return evalLazyFunc(thread, st, func, args, retval);
2361 	else
2362 		return evalStandardFunc(thread, st, func, args, retval);
2363 }
2364 
2365 /*
2366  * Recursive evaluation of an expression in a pgbench script
2367  * using the current state of variables.
2368  * Returns whether the evaluation was ok,
2369  * the value itself is returned through the retval pointer.
2370  */
2371 static bool
evaluateExpr(TState * thread,CState * st,PgBenchExpr * expr,PgBenchValue * retval)2372 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2373 {
2374 	switch (expr->etype)
2375 	{
2376 		case ENODE_CONSTANT:
2377 			{
2378 				*retval = expr->u.constant;
2379 				return true;
2380 			}
2381 
2382 		case ENODE_VARIABLE:
2383 			{
2384 				Variable   *var;
2385 
2386 				if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
2387 				{
2388 					fprintf(stderr, "undefined variable \"%s\"\n",
2389 							expr->u.variable.varname);
2390 					return false;
2391 				}
2392 
2393 				if (!makeVariableValue(var))
2394 					return false;
2395 
2396 				*retval = var->value;
2397 				return true;
2398 			}
2399 
2400 		case ENODE_FUNCTION:
2401 			return evalFunc(thread, st,
2402 							expr->u.function.function,
2403 							expr->u.function.args,
2404 							retval);
2405 
2406 		default:
2407 			/* internal error which should never occur */
2408 			fprintf(stderr, "unexpected enode type in evaluation: %d\n",
2409 					expr->etype);
2410 			exit(1);
2411 	}
2412 }
2413 
2414 /*
2415  * Convert command name to meta-command enum identifier
2416  */
2417 static MetaCommand
getMetaCommand(const char * cmd)2418 getMetaCommand(const char *cmd)
2419 {
2420 	MetaCommand mc;
2421 
2422 	if (cmd == NULL)
2423 		mc = META_NONE;
2424 	else if (pg_strcasecmp(cmd, "set") == 0)
2425 		mc = META_SET;
2426 	else if (pg_strcasecmp(cmd, "setshell") == 0)
2427 		mc = META_SETSHELL;
2428 	else if (pg_strcasecmp(cmd, "shell") == 0)
2429 		mc = META_SHELL;
2430 	else if (pg_strcasecmp(cmd, "sleep") == 0)
2431 		mc = META_SLEEP;
2432 	else if (pg_strcasecmp(cmd, "if") == 0)
2433 		mc = META_IF;
2434 	else if (pg_strcasecmp(cmd, "elif") == 0)
2435 		mc = META_ELIF;
2436 	else if (pg_strcasecmp(cmd, "else") == 0)
2437 		mc = META_ELSE;
2438 	else if (pg_strcasecmp(cmd, "endif") == 0)
2439 		mc = META_ENDIF;
2440 	else
2441 		mc = META_NONE;
2442 	return mc;
2443 }
2444 
2445 /*
2446  * Run a shell command. The result is assigned to the variable if not NULL.
2447  * Return true if succeeded, or false on error.
2448  */
2449 static bool
runShellCommand(CState * st,char * variable,char ** argv,int argc)2450 runShellCommand(CState *st, char *variable, char **argv, int argc)
2451 {
2452 	char		command[SHELL_COMMAND_SIZE];
2453 	int			i,
2454 				len = 0;
2455 	FILE	   *fp;
2456 	char		res[64];
2457 	char	   *endptr;
2458 	int			retval;
2459 
2460 	/*----------
2461 	 * Join arguments with whitespace separators. Arguments starting with
2462 	 * exactly one colon are treated as variables:
2463 	 *	name - append a string "name"
2464 	 *	:var - append a variable named 'var'
2465 	 *	::name - append a string ":name"
2466 	 *----------
2467 	 */
2468 	for (i = 0; i < argc; i++)
2469 	{
2470 		char	   *arg;
2471 		int			arglen;
2472 
2473 		if (argv[i][0] != ':')
2474 		{
2475 			arg = argv[i];		/* a string literal */
2476 		}
2477 		else if (argv[i][1] == ':')
2478 		{
2479 			arg = argv[i] + 1;	/* a string literal starting with colons */
2480 		}
2481 		else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
2482 		{
2483 			fprintf(stderr, "%s: undefined variable \"%s\"\n",
2484 					argv[0], argv[i]);
2485 			return false;
2486 		}
2487 
2488 		arglen = strlen(arg);
2489 		if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2490 		{
2491 			fprintf(stderr, "%s: shell command is too long\n", argv[0]);
2492 			return false;
2493 		}
2494 
2495 		if (i > 0)
2496 			command[len++] = ' ';
2497 		memcpy(command + len, arg, arglen);
2498 		len += arglen;
2499 	}
2500 
2501 	command[len] = '\0';
2502 
2503 	/* Fast path for non-assignment case */
2504 	if (variable == NULL)
2505 	{
2506 		if (system(command))
2507 		{
2508 			if (!timer_exceeded)
2509 				fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2510 			return false;
2511 		}
2512 		return true;
2513 	}
2514 
2515 	/* Execute the command with pipe and read the standard output. */
2516 	if ((fp = popen(command, "r")) == NULL)
2517 	{
2518 		fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2519 		return false;
2520 	}
2521 	if (fgets(res, sizeof(res), fp) == NULL)
2522 	{
2523 		if (!timer_exceeded)
2524 			fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
2525 		(void) pclose(fp);
2526 		return false;
2527 	}
2528 	if (pclose(fp) < 0)
2529 	{
2530 		fprintf(stderr, "%s: could not close shell command\n", argv[0]);
2531 		return false;
2532 	}
2533 
2534 	/* Check whether the result is an integer and assign it to the variable */
2535 	retval = (int) strtol(res, &endptr, 10);
2536 	while (*endptr != '\0' && isspace((unsigned char) *endptr))
2537 		endptr++;
2538 	if (*res == '\0' || *endptr != '\0')
2539 	{
2540 		fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
2541 				argv[0], res);
2542 		return false;
2543 	}
2544 	if (!putVariableInt(st, "setshell", variable, retval))
2545 		return false;
2546 
2547 #ifdef DEBUG
2548 	printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
2549 #endif
2550 	return true;
2551 }
2552 
/* size of the buffers used to hold a prepared statement name */
#define MAX_PREPARE_NAME		32

/*
 * Build the name of the prepared statement for command number "state" of
 * script number "file", in the form "P<file>_<state>".
 *
 * "buffer" must have room for at least MAX_PREPARE_NAME bytes; the write is
 * bounded by that size so it can never run past such a buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
2559 
2560 static void
commandFailed(CState * st,const char * cmd,const char * message)2561 commandFailed(CState *st, const char *cmd, const char *message)
2562 {
2563 	fprintf(stderr,
2564 			"client %d aborted in command %d (%s) of script %d; %s\n",
2565 			st->id, st->command, cmd, st->use_file, message);
2566 }
2567 
2568 /* return a script number with a weighted choice. */
2569 static int
chooseScript(TState * thread)2570 chooseScript(TState *thread)
2571 {
2572 	int			i = 0;
2573 	int64		w;
2574 
2575 	if (num_scripts == 1)
2576 		return 0;
2577 
2578 	w = getrand(thread, 0, total_weight - 1);
2579 	do
2580 	{
2581 		w -= sql_script[i++].weight;
2582 	} while (w >= 0);
2583 
2584 	return i - 1;
2585 }
2586 
2587 /* Send a SQL command, using the chosen querymode */
2588 static bool
sendCommand(CState * st,Command * command)2589 sendCommand(CState *st, Command *command)
2590 {
2591 	int			r;
2592 
2593 	if (querymode == QUERY_SIMPLE)
2594 	{
2595 		char	   *sql;
2596 
2597 		sql = pg_strdup(command->argv[0]);
2598 		sql = assignVariables(st, sql);
2599 
2600 		if (debug)
2601 			fprintf(stderr, "client %d sending %s\n", st->id, sql);
2602 		r = PQsendQuery(st->con, sql);
2603 		free(sql);
2604 	}
2605 	else if (querymode == QUERY_EXTENDED)
2606 	{
2607 		const char *sql = command->argv[0];
2608 		const char *params[MAX_ARGS];
2609 
2610 		getQueryParams(st, command, params);
2611 
2612 		if (debug)
2613 			fprintf(stderr, "client %d sending %s\n", st->id, sql);
2614 		r = PQsendQueryParams(st->con, sql, command->argc - 1,
2615 							  NULL, params, NULL, NULL, 0);
2616 	}
2617 	else if (querymode == QUERY_PREPARED)
2618 	{
2619 		char		name[MAX_PREPARE_NAME];
2620 		const char *params[MAX_ARGS];
2621 
2622 		if (!st->prepared[st->use_file])
2623 		{
2624 			int			j;
2625 			Command   **commands = sql_script[st->use_file].commands;
2626 
2627 			for (j = 0; commands[j] != NULL; j++)
2628 			{
2629 				PGresult   *res;
2630 				char		name[MAX_PREPARE_NAME];
2631 
2632 				if (commands[j]->type != SQL_COMMAND)
2633 					continue;
2634 				preparedStatementName(name, st->use_file, j);
2635 				res = PQprepare(st->con, name,
2636 								commands[j]->argv[0], commands[j]->argc - 1, NULL);
2637 				if (PQresultStatus(res) != PGRES_COMMAND_OK)
2638 					fprintf(stderr, "%s", PQerrorMessage(st->con));
2639 				PQclear(res);
2640 			}
2641 			st->prepared[st->use_file] = true;
2642 		}
2643 
2644 		getQueryParams(st, command, params);
2645 		preparedStatementName(name, st->use_file, st->command);
2646 
2647 		if (debug)
2648 			fprintf(stderr, "client %d sending %s\n", st->id, name);
2649 		r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2650 								params, NULL, NULL, 0);
2651 	}
2652 	else						/* unknown sql mode */
2653 		r = 0;
2654 
2655 	if (r == 0)
2656 	{
2657 		if (debug)
2658 			fprintf(stderr, "client %d could not send %s\n",
2659 					st->id, command->argv[0]);
2660 		st->ecnt++;
2661 		return false;
2662 	}
2663 	else
2664 		return true;
2665 }
2666 
2667 /*
2668  * Parse the argument to a \sleep command, and return the requested amount
2669  * of delay, in microseconds.  Returns true on success, false on error.
2670  */
2671 static bool
evaluateSleep(CState * st,int argc,char ** argv,int * usecs)2672 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
2673 {
2674 	char	   *var;
2675 	int			usec;
2676 
2677 	if (*argv[1] == ':')
2678 	{
2679 		if ((var = getVariable(st, argv[1] + 1)) == NULL)
2680 		{
2681 			fprintf(stderr, "%s: undefined variable \"%s\"\n",
2682 					argv[0], argv[1]);
2683 			return false;
2684 		}
2685 		usec = atoi(var);
2686 	}
2687 	else
2688 		usec = atoi(argv[1]);
2689 
2690 	if (argc > 2)
2691 	{
2692 		if (pg_strcasecmp(argv[2], "ms") == 0)
2693 			usec *= 1000;
2694 		else if (pg_strcasecmp(argv[2], "s") == 0)
2695 			usec *= 1000000;
2696 	}
2697 	else
2698 		usec *= 1000000;
2699 
2700 	*usecs = usec;
2701 	return true;
2702 }
2703 
2704 /*
2705  * Advance the state machine of a connection, if possible.
2706  */
2707 static void
doCustom(TState * thread,CState * st,StatsData * agg)2708 doCustom(TState *thread, CState *st, StatsData *agg)
2709 {
2710 	PGresult   *res;
2711 	Command    *command;
2712 	instr_time	now;
2713 	bool		end_tx_processed = false;
2714 	int64		wait;
2715 
2716 	/*
2717 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
2718 	 * first time it's needed, and reuse the same value throughout this
2719 	 * function after that.  This also ensures that e.g. the calculated
2720 	 * latency reported in the log file and in the totals are the same. Zero
2721 	 * means "not set yet".  Reset "now" when we execute shell commands or
2722 	 * expressions, which might take a non-negligible amount of time, though.
2723 	 */
2724 	INSTR_TIME_SET_ZERO(now);
2725 
2726 	/*
2727 	 * Loop in the state machine, until we have to wait for a result from the
2728 	 * server (or have to sleep, for throttling or for \sleep).
2729 	 *
2730 	 * Note: In the switch-statement below, 'break' will loop back here,
2731 	 * meaning "continue in the state machine".  Return is used to return to
2732 	 * the caller.
2733 	 */
2734 	for (;;)
2735 	{
2736 		switch (st->state)
2737 		{
2738 				/*
2739 				 * Select transaction to run.
2740 				 */
2741 			case CSTATE_CHOOSE_SCRIPT:
2742 
2743 				st->use_file = chooseScript(thread);
2744 
2745 				if (debug)
2746 					fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
2747 							sql_script[st->use_file].desc);
2748 
2749 				if (throttle_delay > 0)
2750 					st->state = CSTATE_START_THROTTLE;
2751 				else
2752 					st->state = CSTATE_START_TX;
2753 				/* check consistency */
2754 				Assert(conditional_stack_empty(st->cstack));
2755 				break;
2756 
2757 				/*
2758 				 * Handle throttling once per transaction by sleeping.
2759 				 */
2760 			case CSTATE_START_THROTTLE:
2761 
2762 				/*
2763 				 * Generate a delay such that the series of delays will
2764 				 * approximate a Poisson distribution centered on the
2765 				 * throttle_delay time.
2766 				 *
2767 				 * If transactions are too slow or a given wait is shorter
2768 				 * than a transaction, the next transaction will start right
2769 				 * away.
2770 				 */
2771 				Assert(throttle_delay > 0);
2772 				wait = getPoissonRand(thread, throttle_delay);
2773 
2774 				thread->throttle_trigger += wait;
2775 				st->txn_scheduled = thread->throttle_trigger;
2776 
2777 				/*
2778 				 * stop client if next transaction is beyond pgbench end of
2779 				 * execution
2780 				 */
2781 				if (duration > 0 && st->txn_scheduled > end_time)
2782 				{
2783 					st->state = CSTATE_FINISHED;
2784 					break;
2785 				}
2786 
2787 				/*
2788 				 * If --latency-limit is used, and this slot is already late
2789 				 * so that the transaction will miss the latency limit even if
2790 				 * it completed immediately, we skip this time slot and
2791 				 * iterate till the next slot that isn't late yet.  But don't
2792 				 * iterate beyond the -t limit, if one is given.
2793 				 */
2794 				if (latency_limit)
2795 				{
2796 					int64		now_us;
2797 
2798 					if (INSTR_TIME_IS_ZERO(now))
2799 						INSTR_TIME_SET_CURRENT(now);
2800 					now_us = INSTR_TIME_GET_MICROSEC(now);
2801 					while (thread->throttle_trigger < now_us - latency_limit &&
2802 						   (nxacts <= 0 || st->cnt < nxacts))
2803 					{
2804 						processXactStats(thread, st, &now, true, agg);
2805 						/* next rendez-vous */
2806 						wait = getPoissonRand(thread, throttle_delay);
2807 						thread->throttle_trigger += wait;
2808 						st->txn_scheduled = thread->throttle_trigger;
2809 					}
2810 					/* stop client if -t exceeded */
2811 					if (nxacts > 0 && st->cnt >= nxacts)
2812 					{
2813 						st->state = CSTATE_FINISHED;
2814 						break;
2815 					}
2816 				}
2817 
2818 				st->state = CSTATE_THROTTLE;
2819 				if (debug)
2820 					fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
2821 							st->id, wait);
2822 				break;
2823 
2824 				/*
2825 				 * Wait until it's time to start next transaction.
2826 				 */
2827 			case CSTATE_THROTTLE:
2828 				if (INSTR_TIME_IS_ZERO(now))
2829 					INSTR_TIME_SET_CURRENT(now);
2830 				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
2831 					return;		/* Still sleeping, nothing to do here */
2832 
2833 				/* Else done sleeping, start the transaction */
2834 				st->state = CSTATE_START_TX;
2835 				break;
2836 
2837 				/* Start new transaction */
2838 			case CSTATE_START_TX:
2839 
2840 				/*
2841 				 * Establish connection on first call, or if is_connect is
2842 				 * true.
2843 				 */
2844 				if (st->con == NULL)
2845 				{
2846 					instr_time	start;
2847 
2848 					if (INSTR_TIME_IS_ZERO(now))
2849 						INSTR_TIME_SET_CURRENT(now);
2850 					start = now;
2851 					if ((st->con = doConnect()) == NULL)
2852 					{
2853 						fprintf(stderr, "client %d aborted while establishing connection\n",
2854 								st->id);
2855 						st->state = CSTATE_ABORTED;
2856 						break;
2857 					}
2858 					INSTR_TIME_SET_CURRENT(now);
2859 					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
2860 
2861 					/* Reset session-local state */
2862 					memset(st->prepared, 0, sizeof(st->prepared));
2863 				}
2864 
2865 				/*
2866 				 * Record transaction start time under logging, progress or
2867 				 * throttling.
2868 				 */
2869 				if (use_log || progress || throttle_delay || latency_limit ||
2870 					per_script_stats)
2871 				{
2872 					if (INSTR_TIME_IS_ZERO(now))
2873 						INSTR_TIME_SET_CURRENT(now);
2874 					st->txn_begin = now;
2875 
2876 					/*
2877 					 * When not throttling, this is also the transaction's
2878 					 * scheduled start time.
2879 					 */
2880 					if (!throttle_delay)
2881 						st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
2882 				}
2883 
2884 				/* Begin with the first command */
2885 				st->command = 0;
2886 				st->state = CSTATE_START_COMMAND;
2887 				break;
2888 
2889 				/*
2890 				 * Send a command to server (or execute a meta-command)
2891 				 */
2892 			case CSTATE_START_COMMAND:
2893 				command = sql_script[st->use_file].commands[st->command];
2894 
2895 				/*
2896 				 * If we reached the end of the script, move to end-of-xact
2897 				 * processing.
2898 				 */
2899 				if (command == NULL)
2900 				{
2901 					st->state = CSTATE_END_TX;
2902 					break;
2903 				}
2904 
2905 				/*
2906 				 * Record statement start time if per-command latencies are
2907 				 * requested
2908 				 */
2909 				if (is_latencies)
2910 				{
2911 					if (INSTR_TIME_IS_ZERO(now))
2912 						INSTR_TIME_SET_CURRENT(now);
2913 					st->stmt_begin = now;
2914 				}
2915 
2916 				if (command->type == SQL_COMMAND)
2917 				{
2918 					if (!sendCommand(st, command))
2919 					{
2920 						commandFailed(st, "SQL", "SQL command send failed");
2921 						st->state = CSTATE_ABORTED;
2922 					}
2923 					else
2924 						st->state = CSTATE_WAIT_RESULT;
2925 				}
2926 				else if (command->type == META_COMMAND)
2927 				{
2928 					int			argc = command->argc,
2929 								i;
2930 					char	  **argv = command->argv;
2931 
2932 					if (debug)
2933 					{
2934 						fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
2935 						for (i = 1; i < argc; i++)
2936 							fprintf(stderr, " %s", argv[i]);
2937 						fprintf(stderr, "\n");
2938 					}
2939 
2940 					if (command->meta == META_SLEEP)
2941 					{
2942 						/*
2943 						 * A \sleep doesn't execute anything, we just get the
2944 						 * delay from the argument, and enter the CSTATE_SLEEP
2945 						 * state.  (The per-command latency will be recorded
2946 						 * in CSTATE_SLEEP state, not here, after the delay
2947 						 * has elapsed.)
2948 						 */
2949 						int			usec;
2950 
2951 						if (!evaluateSleep(st, argc, argv, &usec))
2952 						{
2953 							commandFailed(st, "sleep", "execution of meta-command failed");
2954 							st->state = CSTATE_ABORTED;
2955 							break;
2956 						}
2957 
2958 						if (INSTR_TIME_IS_ZERO(now))
2959 							INSTR_TIME_SET_CURRENT(now);
2960 						st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
2961 						st->state = CSTATE_SLEEP;
2962 						break;
2963 					}
2964 					else if (command->meta == META_SET ||
2965 							 command->meta == META_IF ||
2966 							 command->meta == META_ELIF)
2967 					{
2968 						/* backslash commands with an expression to evaluate */
2969 						PgBenchExpr *expr = command->expr;
2970 						PgBenchValue result;
2971 
2972 						if (command->meta == META_ELIF &&
2973 							conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
2974 						{
2975 							/*
2976 							 * elif after executed block, skip eval and wait
2977 							 * for endif
2978 							 */
2979 							conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
2980 							goto move_to_end_command;
2981 						}
2982 
2983 						if (!evaluateExpr(thread, st, expr, &result))
2984 						{
2985 							commandFailed(st, argv[0], "evaluation of meta-command failed");
2986 							st->state = CSTATE_ABORTED;
2987 							break;
2988 						}
2989 
2990 						if (command->meta == META_SET)
2991 						{
2992 							if (!putVariableValue(st, argv[0], argv[1], &result))
2993 							{
2994 								commandFailed(st, "set", "assignment of meta-command failed");
2995 								st->state = CSTATE_ABORTED;
2996 								break;
2997 							}
2998 						}
2999 						else	/* if and elif evaluated cases */
3000 						{
3001 							bool		cond = valueTruth(&result);
3002 
3003 							/* execute or not depending on evaluated condition */
3004 							if (command->meta == META_IF)
3005 							{
3006 								conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
3007 							}
3008 							else	/* elif */
3009 							{
3010 								/*
3011 								 * we should get here only if the "elif"
3012 								 * needed evaluation
3013 								 */
3014 								Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
3015 								conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
3016 							}
3017 						}
3018 					}
3019 					else if (command->meta == META_ELSE)
3020 					{
3021 						switch (conditional_stack_peek(st->cstack))
3022 						{
3023 							case IFSTATE_TRUE:
3024 								conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
3025 								break;
3026 							case IFSTATE_FALSE: /* inconsistent if active */
3027 							case IFSTATE_IGNORED:	/* inconsistent if active */
3028 							case IFSTATE_NONE:	/* else without if */
3029 							case IFSTATE_ELSE_TRUE: /* else after else */
3030 							case IFSTATE_ELSE_FALSE:	/* else after else */
3031 							default:
3032 								/* dead code if conditional check is ok */
3033 								Assert(false);
3034 						}
3035 						goto move_to_end_command;
3036 					}
3037 					else if (command->meta == META_ENDIF)
3038 					{
3039 						Assert(!conditional_stack_empty(st->cstack));
3040 						conditional_stack_pop(st->cstack);
3041 						goto move_to_end_command;
3042 					}
3043 					else if (command->meta == META_SETSHELL)
3044 					{
3045 						bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
3046 
3047 						if (timer_exceeded) /* timeout */
3048 						{
3049 							st->state = CSTATE_FINISHED;
3050 							break;
3051 						}
3052 						else if (!ret)	/* on error */
3053 						{
3054 							commandFailed(st, "setshell", "execution of meta-command failed");
3055 							st->state = CSTATE_ABORTED;
3056 							break;
3057 						}
3058 						else
3059 						{
3060 							/* succeeded */
3061 						}
3062 					}
3063 					else if (command->meta == META_SHELL)
3064 					{
3065 						bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);
3066 
3067 						if (timer_exceeded) /* timeout */
3068 						{
3069 							st->state = CSTATE_FINISHED;
3070 							break;
3071 						}
3072 						else if (!ret)	/* on error */
3073 						{
3074 							commandFailed(st, "shell", "execution of meta-command failed");
3075 							st->state = CSTATE_ABORTED;
3076 							break;
3077 						}
3078 						else
3079 						{
3080 							/* succeeded */
3081 						}
3082 					}
3083 
3084 			move_to_end_command:
3085 
3086 					/*
3087 					 * executing the expression or shell command might take a
3088 					 * non-negligible amount of time, so reset 'now'
3089 					 */
3090 					INSTR_TIME_SET_ZERO(now);
3091 
3092 					st->state = CSTATE_END_COMMAND;
3093 				}
3094 				break;
3095 
3096 				/*
3097 				 * non executed conditional branch
3098 				 */
3099 			case CSTATE_SKIP_COMMAND:
3100 				Assert(!conditional_active(st->cstack));
3101 				/* quickly skip commands until something to do... */
3102 				while (true)
3103 				{
3104 					command = sql_script[st->use_file].commands[st->command];
3105 
3106 					/* cannot reach end of script in that state */
3107 					Assert(command != NULL);
3108 
3109 					/*
3110 					 * if this is conditional related, update conditional
3111 					 * state
3112 					 */
3113 					if (command->type == META_COMMAND &&
3114 						(command->meta == META_IF ||
3115 						 command->meta == META_ELIF ||
3116 						 command->meta == META_ELSE ||
3117 						 command->meta == META_ENDIF))
3118 					{
3119 						switch (conditional_stack_peek(st->cstack))
3120 						{
3121 							case IFSTATE_FALSE:
3122 								if (command->meta == META_IF || command->meta == META_ELIF)
3123 								{
3124 									/* we must evaluate the condition */
3125 									st->state = CSTATE_START_COMMAND;
3126 								}
3127 								else if (command->meta == META_ELSE)
3128 								{
3129 									/* we must execute next command */
3130 									conditional_stack_poke(st->cstack, IFSTATE_ELSE_TRUE);
3131 									st->state = CSTATE_START_COMMAND;
3132 									st->command++;
3133 								}
3134 								else if (command->meta == META_ENDIF)
3135 								{
3136 									Assert(!conditional_stack_empty(st->cstack));
3137 									conditional_stack_pop(st->cstack);
3138 									if (conditional_active(st->cstack))
3139 										st->state = CSTATE_START_COMMAND;
3140 
3141 									/*
3142 									 * else state remains in
3143 									 * CSTATE_SKIP_COMMAND
3144 									 */
3145 									st->command++;
3146 								}
3147 								break;
3148 
3149 							case IFSTATE_IGNORED:
3150 							case IFSTATE_ELSE_FALSE:
3151 								if (command->meta == META_IF)
3152 									conditional_stack_push(st->cstack, IFSTATE_IGNORED);
3153 								else if (command->meta == META_ENDIF)
3154 								{
3155 									Assert(!conditional_stack_empty(st->cstack));
3156 									conditional_stack_pop(st->cstack);
3157 									if (conditional_active(st->cstack))
3158 										st->state = CSTATE_START_COMMAND;
3159 								}
3160 								/* could detect "else" & "elif" after "else" */
3161 								st->command++;
3162 								break;
3163 
3164 							case IFSTATE_NONE:
3165 							case IFSTATE_TRUE:
3166 							case IFSTATE_ELSE_TRUE:
3167 							default:
3168 
3169 								/*
3170 								 * inconsistent if inactive, unreachable dead
3171 								 * code
3172 								 */
3173 								Assert(false);
3174 						}
3175 					}
3176 					else
3177 					{
3178 						/* skip and consider next */
3179 						st->command++;
3180 					}
3181 
3182 					if (st->state != CSTATE_SKIP_COMMAND)
3183 						break;
3184 				}
3185 				break;
3186 
3187 				/*
3188 				 * Wait for the current SQL command to complete
3189 				 */
3190 			case CSTATE_WAIT_RESULT:
3191 				command = sql_script[st->use_file].commands[st->command];
3192 				if (debug)
3193 					fprintf(stderr, "client %d receiving\n", st->id);
3194 				if (!PQconsumeInput(st->con))
3195 				{				/* there's something wrong */
3196 					commandFailed(st, "SQL", "perhaps the backend died while processing");
3197 					st->state = CSTATE_ABORTED;
3198 					break;
3199 				}
3200 				if (PQisBusy(st->con))
3201 					return;		/* don't have the whole result yet */
3202 
3203 				/*
3204 				 * Read and discard the query result;
3205 				 */
3206 				res = PQgetResult(st->con);
3207 				switch (PQresultStatus(res))
3208 				{
3209 					case PGRES_COMMAND_OK:
3210 					case PGRES_TUPLES_OK:
3211 					case PGRES_EMPTY_QUERY:
3212 						/* OK */
3213 						PQclear(res);
3214 						discard_response(st);
3215 						st->state = CSTATE_END_COMMAND;
3216 						break;
3217 					default:
3218 						commandFailed(st, "SQL", PQerrorMessage(st->con));
3219 						PQclear(res);
3220 						st->state = CSTATE_ABORTED;
3221 						break;
3222 				}
3223 				break;
3224 
3225 				/*
3226 				 * Wait until sleep is done. This state is entered after a
3227 				 * \sleep metacommand. The behavior is similar to
3228 				 * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
3229 				 * instead of CSTATE_START_TX.
3230 				 */
3231 			case CSTATE_SLEEP:
3232 				if (INSTR_TIME_IS_ZERO(now))
3233 					INSTR_TIME_SET_CURRENT(now);
3234 				if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
3235 					return;		/* Still sleeping, nothing to do here */
3236 				/* Else done sleeping. */
3237 				st->state = CSTATE_END_COMMAND;
3238 				break;
3239 
3240 				/*
3241 				 * End of command: record stats and proceed to next command.
3242 				 */
3243 			case CSTATE_END_COMMAND:
3244 
3245 				/*
3246 				 * command completed: accumulate per-command execution times
3247 				 * in thread-local data structure, if per-command latencies
3248 				 * are requested.
3249 				 */
3250 				if (is_latencies)
3251 				{
3252 					if (INSTR_TIME_IS_ZERO(now))
3253 						INSTR_TIME_SET_CURRENT(now);
3254 
3255 					/* XXX could use a mutex here, but we choose not to */
3256 					command = sql_script[st->use_file].commands[st->command];
3257 					addToSimpleStats(&command->stats,
3258 									 INSTR_TIME_GET_DOUBLE(now) -
3259 									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
3260 				}
3261 
3262 				/* Go ahead with next command, to be executed or skipped */
3263 				st->command++;
3264 				st->state = conditional_active(st->cstack) ?
3265 					CSTATE_START_COMMAND : CSTATE_SKIP_COMMAND;
3266 				break;
3267 
3268 				/*
3269 				 * End of transaction.
3270 				 */
3271 			case CSTATE_END_TX:
3272 
3273 				/* transaction finished: calculate latency and do log */
3274 				processXactStats(thread, st, &now, false, agg);
3275 
3276 				/* conditional stack must be empty */
3277 				if (!conditional_stack_empty(st->cstack))
3278 				{
3279 					fprintf(stderr, "end of script reached within a conditional, missing \\endif\n");
3280 					exit(1);
3281 				}
3282 
3283 				if (is_connect)
3284 				{
3285 					finishCon(st);
3286 					INSTR_TIME_SET_ZERO(now);
3287 				}
3288 
3289 				if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
3290 				{
3291 					/* exit success */
3292 					st->state = CSTATE_FINISHED;
3293 					break;
3294 				}
3295 
3296 				/*
3297 				 * No transaction is underway anymore.
3298 				 */
3299 				st->state = CSTATE_CHOOSE_SCRIPT;
3300 
3301 				/*
3302 				 * If we paced through all commands in the script in this
3303 				 * loop, without returning to the caller even once, do it now.
3304 				 * This gives the thread a chance to process other
3305 				 * connections, and to do progress reporting.  This can
3306 				 * currently only happen if the script consists entirely of
3307 				 * meta-commands.
3308 				 */
3309 				if (end_tx_processed)
3310 					return;
3311 				else
3312 				{
3313 					end_tx_processed = true;
3314 					break;
3315 				}
3316 
3317 				/*
3318 				 * Final states.  Close the connection if it's still open.
3319 				 */
3320 			case CSTATE_ABORTED:
3321 			case CSTATE_FINISHED:
3322 				finishCon(st);
3323 				return;
3324 		}
3325 	}
3326 }
3327 
3328 /*
3329  * Print log entry after completing one transaction.
3330  *
3331  * We print Unix-epoch timestamps in the log, so that entries can be
3332  * correlated against other logs.  On some platforms this could be obtained
3333  * from the instr_time reading the caller has, but rather than get entangled
3334  * with that, we just eat the cost of an extra syscall in all cases.
3335  */
3336 static void
doLog(TState * thread,CState * st,StatsData * agg,bool skipped,double latency,double lag)3337 doLog(TState *thread, CState *st,
3338 	  StatsData *agg, bool skipped, double latency, double lag)
3339 {
3340 	FILE	   *logfile = thread->logfile;
3341 
3342 	Assert(use_log);
3343 
3344 	/*
3345 	 * Skip the log entry if sampling is enabled and this row doesn't belong
3346 	 * to the random sample.
3347 	 */
3348 	if (sample_rate != 0.0 &&
3349 		pg_erand48(thread->random_state) > sample_rate)
3350 		return;
3351 
3352 	/* should we aggregate the results or not? */
3353 	if (agg_interval > 0)
3354 	{
3355 		/*
3356 		 * Loop until we reach the interval of the current moment, and print
3357 		 * any empty intervals in between (this may happen with very low tps,
3358 		 * e.g. --rate=0.1).
3359 		 */
3360 		time_t		now = time(NULL);
3361 
3362 		while (agg->start_time + agg_interval <= now)
3363 		{
3364 			/* print aggregated report to logfile */
3365 			fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
3366 					(long) agg->start_time,
3367 					agg->cnt,
3368 					agg->latency.sum,
3369 					agg->latency.sum2,
3370 					agg->latency.min,
3371 					agg->latency.max);
3372 			if (throttle_delay)
3373 			{
3374 				fprintf(logfile, " %.0f %.0f %.0f %.0f",
3375 						agg->lag.sum,
3376 						agg->lag.sum2,
3377 						agg->lag.min,
3378 						agg->lag.max);
3379 				if (latency_limit)
3380 					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
3381 			}
3382 			fputc('\n', logfile);
3383 
3384 			/* reset data and move to next interval */
3385 			initStats(agg, agg->start_time + agg_interval);
3386 		}
3387 
3388 		/* accumulate the current transaction */
3389 		accumStats(agg, skipped, latency, lag);
3390 	}
3391 	else
3392 	{
3393 		/* no, print raw transactions */
3394 		struct timeval tv;
3395 
3396 		gettimeofday(&tv, NULL);
3397 		if (skipped)
3398 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
3399 					st->id, st->cnt, st->use_file,
3400 					(long) tv.tv_sec, (long) tv.tv_usec);
3401 		else
3402 			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
3403 					st->id, st->cnt, latency, st->use_file,
3404 					(long) tv.tv_sec, (long) tv.tv_usec);
3405 		if (throttle_delay)
3406 			fprintf(logfile, " %.0f", lag);
3407 		fputc('\n', logfile);
3408 	}
3409 }
3410 
3411 /*
3412  * Accumulate and report statistics at end of a transaction.
3413  *
3414  * (This is also called when a transaction is late and thus skipped.
3415  * Note that even skipped transactions are counted in the "cnt" fields.)
3416  */
3417 static void
processXactStats(TState * thread,CState * st,instr_time * now,bool skipped,StatsData * agg)3418 processXactStats(TState *thread, CState *st, instr_time *now,
3419 				 bool skipped, StatsData *agg)
3420 {
3421 	double		latency = 0.0,
3422 				lag = 0.0;
3423 	bool		thread_details = progress || throttle_delay || latency_limit,
3424 				detailed = thread_details || use_log || per_script_stats;
3425 
3426 	if (detailed && !skipped)
3427 	{
3428 		if (INSTR_TIME_IS_ZERO(*now))
3429 			INSTR_TIME_SET_CURRENT(*now);
3430 
3431 		/* compute latency & lag */
3432 		latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
3433 		lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
3434 	}
3435 
3436 	if (thread_details)
3437 	{
3438 		/* keep detailed thread stats */
3439 		accumStats(&thread->stats, skipped, latency, lag);
3440 
3441 		/* count transactions over the latency limit, if needed */
3442 		if (latency_limit && latency > latency_limit)
3443 			thread->latency_late++;
3444 	}
3445 	else
3446 	{
3447 		/* no detailed stats, just count */
3448 		thread->stats.cnt++;
3449 	}
3450 
3451 	/* client stat is just counting */
3452 	st->cnt++;
3453 
3454 	if (use_log)
3455 		doLog(thread, st, agg, skipped, latency, lag);
3456 
3457 	/* XXX could use a mutex here, but we choose not to */
3458 	if (per_script_stats)
3459 		accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
3460 }
3461 
3462 
3463 /* discard connections */
3464 static void
disconnect_all(CState * state,int length)3465 disconnect_all(CState *state, int length)
3466 {
3467 	int			i;
3468 
3469 	for (i = 0; i < length; i++)
3470 		finishCon(&state[i]);
3471 }
3472 
3473 /*
3474  * Remove old pgbench tables, if any exist
3475  */
3476 static void
initDropTables(PGconn * con)3477 initDropTables(PGconn *con)
3478 {
3479 	fprintf(stderr, "dropping old tables...\n");
3480 
3481 	/*
3482 	 * We drop all the tables in one command, so that whether there are
3483 	 * foreign key dependencies or not doesn't matter.
3484 	 */
3485 	executeStatement(con, "drop table if exists "
3486 					 "pgbench_accounts, "
3487 					 "pgbench_branches, "
3488 					 "pgbench_history, "
3489 					 "pgbench_tellers");
3490 }
3491 
3492 /*
3493  * Create pgbench's standard tables
3494  */
3495 static void
initCreateTables(PGconn * con)3496 initCreateTables(PGconn *con)
3497 {
3498 	/*
3499 	 * The scale factor at/beyond which 32-bit integers are insufficient for
3500 	 * storing TPC-B account IDs.
3501 	 *
3502 	 * Although the actual threshold is 21474, we use 20000 because it is
3503 	 * easier to document and remember, and isn't that far away from the real
3504 	 * threshold.
3505 	 */
3506 #define SCALE_32BIT_THRESHOLD 20000
3507 
3508 	/*
3509 	 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
3510 	 * fields in these table declarations were intended to comply with that.
3511 	 * The pgbench_accounts table complies with that because the "filler"
3512 	 * column is set to blank-padded empty string. But for all other tables
3513 	 * the columns default to NULL and so don't actually take any space.  We
3514 	 * could fix that by giving them non-null default values.  However, that
3515 	 * would completely break comparability of pgbench results with prior
3516 	 * versions. Since pgbench has never pretended to be fully TPC-B compliant
3517 	 * anyway, we stick with the historical behavior.
3518 	 */
3519 	struct ddlinfo
3520 	{
3521 		const char *table;		/* table name */
3522 		const char *smcols;		/* column decls if accountIDs are 32 bits */
3523 		const char *bigcols;	/* column decls if accountIDs are 64 bits */
3524 		int			declare_fillfactor;
3525 	};
3526 	static const struct ddlinfo DDLs[] = {
3527 		{
3528 			"pgbench_history",
3529 			"tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
3530 			"tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
3531 			0
3532 		},
3533 		{
3534 			"pgbench_tellers",
3535 			"tid int not null,bid int,tbalance int,filler char(84)",
3536 			"tid int not null,bid int,tbalance int,filler char(84)",
3537 			1
3538 		},
3539 		{
3540 			"pgbench_accounts",
3541 			"aid    int not null,bid int,abalance int,filler char(84)",
3542 			"aid bigint not null,bid int,abalance int,filler char(84)",
3543 			1
3544 		},
3545 		{
3546 			"pgbench_branches",
3547 			"bid int not null,bbalance int,filler char(88)",
3548 			"bid int not null,bbalance int,filler char(88)",
3549 			1
3550 		}
3551 	};
3552 	int			i;
3553 
3554 	fprintf(stderr, "creating tables...\n");
3555 
3556 	for (i = 0; i < lengthof(DDLs); i++)
3557 	{
3558 		char		opts[256];
3559 		char		buffer[256];
3560 		const struct ddlinfo *ddl = &DDLs[i];
3561 		const char *cols;
3562 
3563 		/* Construct new create table statement. */
3564 		opts[0] = '\0';
3565 		if (ddl->declare_fillfactor)
3566 			snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3567 					 " with (fillfactor=%d)", fillfactor);
3568 		if (tablespace != NULL)
3569 		{
3570 			char	   *escape_tablespace;
3571 
3572 			escape_tablespace = PQescapeIdentifier(con, tablespace,
3573 												   strlen(tablespace));
3574 			snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3575 					 " tablespace %s", escape_tablespace);
3576 			PQfreemem(escape_tablespace);
3577 		}
3578 
3579 		cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
3580 
3581 		snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
3582 				 unlogged_tables ? " unlogged" : "",
3583 				 ddl->table, cols, opts);
3584 
3585 		executeStatement(con, buffer);
3586 	}
3587 }
3588 
3589 /*
3590  * Fill the standard tables with some data
3591  */
3592 static void
initGenerateData(PGconn * con)3593 initGenerateData(PGconn *con)
3594 {
3595 	char		sql[256];
3596 	PGresult   *res;
3597 	int			i;
3598 	int64		k;
3599 
3600 	/* used to track elapsed time and estimate of the remaining time */
3601 	instr_time	start,
3602 				diff;
3603 	double		elapsed_sec,
3604 				remaining_sec;
3605 	int			log_interval = 1;
3606 
3607 	fprintf(stderr, "generating data...\n");
3608 
3609 	/*
3610 	 * we do all of this in one transaction to enable the backend's
3611 	 * data-loading optimizations
3612 	 */
3613 	executeStatement(con, "begin");
3614 
3615 	/*
3616 	 * truncate away any old data, in one command in case there are foreign
3617 	 * keys
3618 	 */
3619 	executeStatement(con, "truncate table "
3620 					 "pgbench_accounts, "
3621 					 "pgbench_branches, "
3622 					 "pgbench_history, "
3623 					 "pgbench_tellers");
3624 
3625 	/*
3626 	 * fill branches, tellers, accounts in that order in case foreign keys
3627 	 * already exist
3628 	 */
3629 	for (i = 0; i < nbranches * scale; i++)
3630 	{
3631 		/* "filler" column defaults to NULL */
3632 		snprintf(sql, sizeof(sql),
3633 				 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
3634 				 i + 1);
3635 		executeStatement(con, sql);
3636 	}
3637 
3638 	for (i = 0; i < ntellers * scale; i++)
3639 	{
3640 		/* "filler" column defaults to NULL */
3641 		snprintf(sql, sizeof(sql),
3642 				 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
3643 				 i + 1, i / ntellers + 1);
3644 		executeStatement(con, sql);
3645 	}
3646 
3647 	/*
3648 	 * accounts is big enough to be worth using COPY and tracking runtime
3649 	 */
3650 	res = PQexec(con, "copy pgbench_accounts from stdin");
3651 	if (PQresultStatus(res) != PGRES_COPY_IN)
3652 	{
3653 		fprintf(stderr, "%s", PQerrorMessage(con));
3654 		exit(1);
3655 	}
3656 	PQclear(res);
3657 
3658 	INSTR_TIME_SET_CURRENT(start);
3659 
3660 	for (k = 0; k < (int64) naccounts * scale; k++)
3661 	{
3662 		int64		j = k + 1;
3663 
3664 		/* "filler" column defaults to blank padded empty string */
3665 		snprintf(sql, sizeof(sql),
3666 				 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
3667 				 j, k / naccounts + 1, 0);
3668 		if (PQputline(con, sql))
3669 		{
3670 			fprintf(stderr, "PQputline failed\n");
3671 			exit(1);
3672 		}
3673 
3674 		/*
3675 		 * If we want to stick with the original logging, print a message each
3676 		 * 100k inserted rows.
3677 		 */
3678 		if ((!use_quiet) && (j % 100000 == 0))
3679 		{
3680 			INSTR_TIME_SET_CURRENT(diff);
3681 			INSTR_TIME_SUBTRACT(diff, start);
3682 
3683 			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3684 			remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3685 
3686 			fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3687 					j, (int64) naccounts * scale,
3688 					(int) (((int64) j * 100) / (naccounts * (int64) scale)),
3689 					elapsed_sec, remaining_sec);
3690 		}
3691 		/* let's not call the timing for each row, but only each 100 rows */
3692 		else if (use_quiet && (j % 100 == 0))
3693 		{
3694 			INSTR_TIME_SET_CURRENT(diff);
3695 			INSTR_TIME_SUBTRACT(diff, start);
3696 
3697 			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3698 			remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3699 
3700 			/* have we reached the next interval (or end)? */
3701 			if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
3702 			{
3703 				fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3704 						j, (int64) naccounts * scale,
3705 						(int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
3706 
3707 				/* skip to the next interval */
3708 				log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
3709 			}
3710 		}
3711 
3712 	}
3713 	if (PQputline(con, "\\.\n"))
3714 	{
3715 		fprintf(stderr, "very last PQputline failed\n");
3716 		exit(1);
3717 	}
3718 	if (PQendcopy(con))
3719 	{
3720 		fprintf(stderr, "PQendcopy failed\n");
3721 		exit(1);
3722 	}
3723 
3724 	executeStatement(con, "commit");
3725 }
3726 
3727 /*
3728  * Invoke vacuum on the standard tables
3729  */
3730 static void
initVacuum(PGconn * con)3731 initVacuum(PGconn *con)
3732 {
3733 	fprintf(stderr, "vacuuming...\n");
3734 	executeStatement(con, "vacuum analyze pgbench_branches");
3735 	executeStatement(con, "vacuum analyze pgbench_tellers");
3736 	executeStatement(con, "vacuum analyze pgbench_accounts");
3737 	executeStatement(con, "vacuum analyze pgbench_history");
3738 }
3739 
3740 /*
3741  * Create primary keys on the standard tables
3742  */
3743 static void
initCreatePKeys(PGconn * con)3744 initCreatePKeys(PGconn *con)
3745 {
3746 	static const char *const DDLINDEXes[] = {
3747 		"alter table pgbench_branches add primary key (bid)",
3748 		"alter table pgbench_tellers add primary key (tid)",
3749 		"alter table pgbench_accounts add primary key (aid)"
3750 	};
3751 	int			i;
3752 
3753 	fprintf(stderr, "creating primary keys...\n");
3754 	for (i = 0; i < lengthof(DDLINDEXes); i++)
3755 	{
3756 		char		buffer[256];
3757 
3758 		strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
3759 
3760 		if (index_tablespace != NULL)
3761 		{
3762 			char	   *escape_tablespace;
3763 
3764 			escape_tablespace = PQescapeIdentifier(con, index_tablespace,
3765 												   strlen(index_tablespace));
3766 			snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
3767 					 " using index tablespace %s", escape_tablespace);
3768 			PQfreemem(escape_tablespace);
3769 		}
3770 
3771 		executeStatement(con, buffer);
3772 	}
3773 }
3774 
3775 /*
3776  * Create foreign key constraints between the standard tables
3777  */
3778 static void
initCreateFKeys(PGconn * con)3779 initCreateFKeys(PGconn *con)
3780 {
3781 	static const char *const DDLKEYs[] = {
3782 		"alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
3783 		"alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
3784 		"alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
3785 		"alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
3786 		"alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
3787 	};
3788 	int			i;
3789 
3790 	fprintf(stderr, "creating foreign keys...\n");
3791 	for (i = 0; i < lengthof(DDLKEYs); i++)
3792 	{
3793 		executeStatement(con, DDLKEYs[i]);
3794 	}
3795 }
3796 
/*
 * Check that an initialization-steps string is usable
 *
 * The string must be non-empty and may only contain the step letters
 * d, t, g, v, p, f and spaces; otherwise a message is printed on stderr and
 * the program exits with status 1.
 *
 * (runInitSteps() would also fail on wrong characters, but since
 * initialization can take awhile, it seems friendlier to complain while the
 * options are being parsed.)
 */
static void
checkInitSteps(const char *initialize_steps)
{
	static const char allowed[] = "dtgvpf ";
	const char *cur = initialize_steps;

	if (*cur == '\0')
	{
		fprintf(stderr, "no initialization steps specified\n");
		exit(1);
	}

	while (*cur != '\0')
	{
		if (strchr(allowed, *cur) == NULL)
		{
			fprintf(stderr, "unrecognized initialization step \"%c\"\n",
					*cur);
			fprintf(stderr, "allowed steps are: \"d\", \"t\", \"g\", \"v\", \"p\", \"f\"\n");
			exit(1);
		}
		cur++;
	}
}
3826 
3827 /*
3828  * Invoke each initialization step in the given string
3829  */
3830 static void
runInitSteps(const char * initialize_steps)3831 runInitSteps(const char *initialize_steps)
3832 {
3833 	PGconn	   *con;
3834 	const char *step;
3835 
3836 	if ((con = doConnect()) == NULL)
3837 		exit(1);
3838 
3839 	for (step = initialize_steps; *step != '\0'; step++)
3840 	{
3841 		switch (*step)
3842 		{
3843 			case 'd':
3844 				initDropTables(con);
3845 				break;
3846 			case 't':
3847 				initCreateTables(con);
3848 				break;
3849 			case 'g':
3850 				initGenerateData(con);
3851 				break;
3852 			case 'v':
3853 				initVacuum(con);
3854 				break;
3855 			case 'p':
3856 				initCreatePKeys(con);
3857 				break;
3858 			case 'f':
3859 				initCreateFKeys(con);
3860 				break;
3861 			case ' ':
3862 				break;			/* ignore */
3863 			default:
3864 				fprintf(stderr, "unrecognized initialization step \"%c\"\n",
3865 						*step);
3866 				PQfinish(con);
3867 				exit(1);
3868 		}
3869 	}
3870 
3871 	fprintf(stderr, "done.\n");
3872 	PQfinish(con);
3873 }
3874 
3875 /*
3876  * Replace :param with $n throughout the command's SQL text, which
3877  * is a modifiable string in cmd->argv[0].
3878  */
3879 static bool
parseQuery(Command * cmd)3880 parseQuery(Command *cmd)
3881 {
3882 	char	   *sql,
3883 			   *p;
3884 
3885 	/* We don't want to scribble on cmd->argv[0] until done */
3886 	sql = pg_strdup(cmd->argv[0]);
3887 
3888 	cmd->argc = 1;
3889 
3890 	p = sql;
3891 	while ((p = strchr(p, ':')) != NULL)
3892 	{
3893 		char		var[13];
3894 		char	   *name;
3895 		int			eaten;
3896 
3897 		name = parseVariable(p, &eaten);
3898 		if (name == NULL)
3899 		{
3900 			while (*p == ':')
3901 			{
3902 				p++;
3903 			}
3904 			continue;
3905 		}
3906 
3907 		if (cmd->argc >= MAX_ARGS)
3908 		{
3909 			fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n",
3910 					MAX_ARGS - 1, cmd->argv[0]);
3911 			pg_free(name);
3912 			return false;
3913 		}
3914 
3915 		sprintf(var, "$%d", cmd->argc);
3916 		p = replaceVariable(&sql, p, eaten, var);
3917 
3918 		cmd->argv[cmd->argc] = name;
3919 		cmd->argc++;
3920 	}
3921 
3922 	pg_free(cmd->argv[0]);
3923 	cmd->argv[0] = sql;
3924 	return true;
3925 }
3926 
/*
 * Minimal printf-style error reporter; might be needed by the lexer.
 *
 * stdout is flushed first, then the format string is passed through _()
 * and the formatted message is written to stderr.
 */
static void
pgbench_error(const char *fmt,...)
{
	va_list		args;

	fflush(stdout);

	va_start(args, fmt);
	vfprintf(stderr, _(fmt), args);
	va_end(args);
}
3940 
/*
 * Report a syntax error found while parsing a script, then exit(1).
 * (In practice this happens while parsing a backslash command, because
 * syntax errors in SQL are not detected here.)
 *
 * source: source of script (filename or builtin-script ID)
 * lineno: line number within script (count from 1)
 * line: whole line of backslash command, if available
 * command: backslash command name, if available
 * msg: the actual error message
 * more: optional extra message
 * column: zero-based column number, or -1 if unknown
 *
 * When the line is available it is echoed, with a caret placed under the
 * offending column if that is known; otherwise a known column is printed as
 * a one-based number in the message itself.
 */
void
syntax_error(const char *source, int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, int column)
{
	/* first line: location, message and optional details */
	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);
	if (line == NULL && column >= 0)
		fprintf(stderr, " at column %d", column + 1);
	if (command != NULL)
		fprintf(stderr, " in command \"%s\"", command);
	fprintf(stderr, "\n");

	/* then the offending line and a marker, when we have them */
	if (line != NULL)
	{
		fprintf(stderr, "%s\n", line);
		if (column >= 0)
		{
			int			pad = column;

			while (pad > 0)
			{
				fprintf(stderr, " ");
				pad--;
			}
			fprintf(stderr, "^ error found here\n");
		}
	}

	exit(1);
}
3980 
3981 /*
3982  * Parse a SQL command; return a Command struct, or NULL if it's a comment
3983  *
3984  * On entry, psqlscan.l has collected the command into "buf", so we don't
3985  * really need to do much here except check for comment and set up a
3986  * Command struct.
3987  */
3988 static Command *
process_sql_command(PQExpBuffer buf,const char * source)3989 process_sql_command(PQExpBuffer buf, const char *source)
3990 {
3991 	Command    *my_command;
3992 	char	   *p;
3993 	char	   *nlpos;
3994 
3995 	/* Skip any leading whitespace, as well as "--" style comments */
3996 	p = buf->data;
3997 	for (;;)
3998 	{
3999 		if (isspace((unsigned char) *p))
4000 			p++;
4001 		else if (strncmp(p, "--", 2) == 0)
4002 		{
4003 			p = strchr(p, '\n');
4004 			if (p == NULL)
4005 				return NULL;
4006 			p++;
4007 		}
4008 		else
4009 			break;
4010 	}
4011 
4012 	/* If there's nothing but whitespace and comments, we're done */
4013 	if (*p == '\0')
4014 		return NULL;
4015 
4016 	/* Allocate and initialize Command structure */
4017 	my_command = (Command *) pg_malloc0(sizeof(Command));
4018 	my_command->command_num = num_commands++;
4019 	my_command->type = SQL_COMMAND;
4020 	my_command->meta = META_NONE;
4021 	initSimpleStats(&my_command->stats);
4022 
4023 	/*
4024 	 * Install query text as the sole argv string.  If we are using a
4025 	 * non-simple query mode, we'll extract parameters from it later.
4026 	 */
4027 	my_command->argv[0] = pg_strdup(p);
4028 	my_command->argc = 1;
4029 
4030 	/*
4031 	 * If SQL command is multi-line, we only want to save the first line as
4032 	 * the "line" label.
4033 	 */
4034 	nlpos = strchr(p, '\n');
4035 	if (nlpos)
4036 	{
4037 		my_command->line = pg_malloc(nlpos - p + 1);
4038 		memcpy(my_command->line, p, nlpos - p);
4039 		my_command->line[nlpos - p] = '\0';
4040 	}
4041 	else
4042 		my_command->line = pg_strdup(p);
4043 
4044 	return my_command;
4045 }
4046 
4047 /*
4048  * Parse a backslash command; return a Command struct, or NULL if comment
4049  *
4050  * At call, we have scanned only the initial backslash.
4051  */
4052 static Command *
process_backslash_command(PsqlScanState sstate,const char * source)4053 process_backslash_command(PsqlScanState sstate, const char *source)
4054 {
4055 	Command    *my_command;
4056 	PQExpBufferData word_buf;
4057 	int			word_offset;
4058 	int			offsets[MAX_ARGS];	/* offsets of argument words */
4059 	int			start_offset;
4060 	int			lineno;
4061 	int			j;
4062 
4063 	initPQExpBuffer(&word_buf);
4064 
4065 	/* Remember location of the backslash */
4066 	start_offset = expr_scanner_offset(sstate) - 1;
4067 	lineno = expr_scanner_get_lineno(sstate, start_offset);
4068 
4069 	/* Collect first word of command */
4070 	if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
4071 	{
4072 		termPQExpBuffer(&word_buf);
4073 		return NULL;
4074 	}
4075 
4076 	/* Allocate and initialize Command structure */
4077 	my_command = (Command *) pg_malloc0(sizeof(Command));
4078 	my_command->command_num = num_commands++;
4079 	my_command->type = META_COMMAND;
4080 	my_command->argc = 0;
4081 	initSimpleStats(&my_command->stats);
4082 
4083 	/* Save first word (command name) */
4084 	j = 0;
4085 	offsets[j] = word_offset;
4086 	my_command->argv[j++] = pg_strdup(word_buf.data);
4087 	my_command->argc++;
4088 
4089 	/* ... and convert it to enum form */
4090 	my_command->meta = getMetaCommand(my_command->argv[0]);
4091 
4092 	if (my_command->meta == META_SET ||
4093 		my_command->meta == META_IF ||
4094 		my_command->meta == META_ELIF)
4095 	{
4096 		yyscan_t	yyscanner;
4097 
4098 		/* For \set, collect var name */
4099 		if (my_command->meta == META_SET)
4100 		{
4101 			if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
4102 				syntax_error(source, lineno, my_command->line, my_command->argv[0],
4103 							 "missing argument", NULL, -1);
4104 
4105 			offsets[j] = word_offset;
4106 			my_command->argv[j++] = pg_strdup(word_buf.data);
4107 			my_command->argc++;
4108 		}
4109 
4110 		/* then for all parse the expression */
4111 		yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
4112 									  my_command->argv[0]);
4113 
4114 		if (expr_yyparse(yyscanner) != 0)
4115 		{
4116 			/* dead code: exit done from syntax_error called by yyerror */
4117 			exit(1);
4118 		}
4119 
4120 		my_command->expr = expr_parse_result;
4121 
4122 		/* Save line, trimming any trailing newline */
4123 		my_command->line = expr_scanner_get_substring(sstate,
4124 													  start_offset,
4125 													  expr_scanner_offset(sstate),
4126 													  true);
4127 
4128 		expr_scanner_finish(yyscanner);
4129 
4130 		termPQExpBuffer(&word_buf);
4131 
4132 		return my_command;
4133 	}
4134 
4135 	/* For all other commands, collect remaining words. */
4136 	while (expr_lex_one_word(sstate, &word_buf, &word_offset))
4137 	{
4138 		if (j >= MAX_ARGS)
4139 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
4140 						 "too many arguments", NULL, -1);
4141 
4142 		offsets[j] = word_offset;
4143 		my_command->argv[j++] = pg_strdup(word_buf.data);
4144 		my_command->argc++;
4145 	}
4146 
4147 	/* Save line, trimming any trailing newline */
4148 	my_command->line = expr_scanner_get_substring(sstate,
4149 												  start_offset,
4150 												  expr_scanner_offset(sstate),
4151 												  true);
4152 
4153 	if (my_command->meta == META_SLEEP)
4154 	{
4155 		if (my_command->argc < 2)
4156 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
4157 						 "missing argument", NULL, -1);
4158 
4159 		if (my_command->argc > 3)
4160 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
4161 						 "too many arguments", NULL,
4162 						 offsets[3] - start_offset);
4163 
4164 		/*
4165 		 * Split argument into number and unit to allow "sleep 1ms" etc. We
4166 		 * don't have to terminate the number argument with null because it
4167 		 * will be parsed with atoi, which ignores trailing non-digit
4168 		 * characters.
4169 		 */
4170 		if (my_command->argc == 2 && my_command->argv[1][0] != ':')
4171 		{
4172 			char	   *c = my_command->argv[1];
4173 
4174 			while (isdigit((unsigned char) *c))
4175 				c++;
4176 			if (*c)
4177 			{
4178 				my_command->argv[2] = c;
4179 				offsets[2] = offsets[1] + (c - my_command->argv[1]);
4180 				my_command->argc = 3;
4181 			}
4182 		}
4183 
4184 		if (my_command->argc == 3)
4185 		{
4186 			if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
4187 				pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
4188 				pg_strcasecmp(my_command->argv[2], "s") != 0)
4189 				syntax_error(source, lineno, my_command->line, my_command->argv[0],
4190 							 "unrecognized time unit, must be us, ms or s",
4191 							 my_command->argv[2], offsets[2] - start_offset);
4192 		}
4193 	}
4194 	else if (my_command->meta == META_SETSHELL)
4195 	{
4196 		if (my_command->argc < 3)
4197 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
4198 						 "missing argument", NULL, -1);
4199 	}
4200 	else if (my_command->meta == META_SHELL)
4201 	{
4202 		if (my_command->argc < 2)
4203 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
4204 						 "missing command", NULL, -1);
4205 	}
4206 	else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
4207 	{
4208 		if (my_command->argc != 1)
4209 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
4210 						 "unexpected argument", NULL, -1);
4211 	}
4212 	else
4213 	{
4214 		/* my_command->meta == META_NONE */
4215 		syntax_error(source, lineno, my_command->line, my_command->argv[0],
4216 					 "invalid command", NULL, -1);
4217 	}
4218 
4219 	termPQExpBuffer(&word_buf);
4220 
4221 	return my_command;
4222 }
4223 
/*
 * Report a problem in a script's conditional structure and terminate.
 *
 * desc is the script description, cmdn the command number to show in the
 * message, and msg the explanation.  The message goes to stderr and the
 * program exits with status 1; this function does not return.
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
	fprintf(stderr, "condition error in script \"%s\" command %d: %s\n",
			desc, cmdn, msg);
	exit(1);
}
4232 
4233 /*
4234  * Partial evaluation of conditionals before recording and running the script.
4235  */
4236 static void
CheckConditional(ParsedScript ps)4237 CheckConditional(ParsedScript ps)
4238 {
4239 	/* statically check conditional structure */
4240 	ConditionalStack cs = conditional_stack_create();
4241 	int			i;
4242 
4243 	for (i = 0; ps.commands[i] != NULL; i++)
4244 	{
4245 		Command    *cmd = ps.commands[i];
4246 
4247 		if (cmd->type == META_COMMAND)
4248 		{
4249 			switch (cmd->meta)
4250 			{
4251 				case META_IF:
4252 					conditional_stack_push(cs, IFSTATE_FALSE);
4253 					break;
4254 				case META_ELIF:
4255 					if (conditional_stack_empty(cs))
4256 						ConditionError(ps.desc, i + 1, "\\elif without matching \\if");
4257 					if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
4258 						ConditionError(ps.desc, i + 1, "\\elif after \\else");
4259 					break;
4260 				case META_ELSE:
4261 					if (conditional_stack_empty(cs))
4262 						ConditionError(ps.desc, i + 1, "\\else without matching \\if");
4263 					if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
4264 						ConditionError(ps.desc, i + 1, "\\else after \\else");
4265 					conditional_stack_poke(cs, IFSTATE_ELSE_FALSE);
4266 					break;
4267 				case META_ENDIF:
4268 					if (!conditional_stack_pop(cs))
4269 						ConditionError(ps.desc, i + 1, "\\endif without matching \\if");
4270 					break;
4271 				default:
4272 					/* ignore anything else... */
4273 					break;
4274 			}
4275 		}
4276 	}
4277 	if (!conditional_stack_empty(cs))
4278 		ConditionError(ps.desc, i + 1, "\\if without matching \\endif");
4279 	conditional_stack_destroy(cs);
4280 }
4281 
4282 /*
4283  * Parse a script (either the contents of a file, or a built-in script)
4284  * and add it to the list of scripts.
4285  */
4286 static void
ParseScript(const char * script,const char * desc,int weight)4287 ParseScript(const char *script, const char *desc, int weight)
4288 {
4289 	ParsedScript ps;
4290 	PsqlScanState sstate;
4291 	PQExpBufferData line_buf;
4292 	int			alloc_num;
4293 	int			index;
4294 
4295 #define COMMANDS_ALLOC_NUM 128
4296 	alloc_num = COMMANDS_ALLOC_NUM;
4297 
4298 	/* Initialize all fields of ps */
4299 	ps.desc = desc;
4300 	ps.weight = weight;
4301 	ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
4302 	initStats(&ps.stats, 0);
4303 
4304 	/* Prepare to parse script */
4305 	sstate = psql_scan_create(&pgbench_callbacks);
4306 
4307 	/*
4308 	 * Ideally, we'd scan scripts using the encoding and stdstrings settings
4309 	 * we get from a DB connection.  However, without major rearrangement of
4310 	 * pgbench's argument parsing, we can't have a DB connection at the time
4311 	 * we parse scripts.  Using SQL_ASCII (encoding 0) should work well enough
4312 	 * with any backend-safe encoding, though conceivably we could be fooled
4313 	 * if a script file uses a client-only encoding.  We also assume that
4314 	 * stdstrings should be true, which is a bit riskier.
4315 	 */
4316 	psql_scan_setup(sstate, script, strlen(script), 0, true);
4317 
4318 	initPQExpBuffer(&line_buf);
4319 
4320 	index = 0;
4321 
4322 	for (;;)
4323 	{
4324 		PsqlScanResult sr;
4325 		promptStatus_t prompt;
4326 		Command    *command;
4327 
4328 		resetPQExpBuffer(&line_buf);
4329 
4330 		sr = psql_scan(sstate, &line_buf, &prompt);
4331 
4332 		/* If we collected a SQL command, process that */
4333 		command = process_sql_command(&line_buf, desc);
4334 		if (command)
4335 		{
4336 			ps.commands[index] = command;
4337 			index++;
4338 
4339 			if (index >= alloc_num)
4340 			{
4341 				alloc_num += COMMANDS_ALLOC_NUM;
4342 				ps.commands = (Command **)
4343 					pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4344 			}
4345 		}
4346 
4347 		/* If we reached a backslash, process that */
4348 		if (sr == PSCAN_BACKSLASH)
4349 		{
4350 			command = process_backslash_command(sstate, desc);
4351 			if (command)
4352 			{
4353 				ps.commands[index] = command;
4354 				index++;
4355 
4356 				if (index >= alloc_num)
4357 				{
4358 					alloc_num += COMMANDS_ALLOC_NUM;
4359 					ps.commands = (Command **)
4360 						pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4361 				}
4362 			}
4363 		}
4364 
4365 		/* Done if we reached EOF */
4366 		if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
4367 			break;
4368 	}
4369 
4370 	ps.commands[index] = NULL;
4371 
4372 	addScript(ps);
4373 
4374 	termPQExpBuffer(&line_buf);
4375 	psql_scan_finish(sstate);
4376 	psql_scan_destroy(sstate);
4377 }
4378 
/*
 * Read everything that remains in stream fd and return it as a
 * NUL-terminated string in a buffer obtained from pg_malloc/pg_realloc.
 *
 * The buffer grows in BUFSIZ steps and is typically larger than needed;
 * that is fine here because it is freed as soon as the script is parsed.
 * A short read ends the loop whether it was caused by EOF or by an error;
 * this function does not tell the two apart.
 */
static char *
read_file_contents(FILE *fd)
{
	size_t		capacity = BUFSIZ;
	size_t		length = 0;
	size_t		got;
	char	   *contents = (char *) pg_malloc(capacity);

	/* A full BUFSIZ chunk means there may be more: grow and read again */
	while ((got = fread(contents + length, 1, BUFSIZ, fd)) == BUFSIZ)
	{
		length += got;
		capacity += BUFSIZ;
		contents = (char *) pg_realloc(contents, capacity);
	}

	/* Account for the final, short chunk */
	length += got;

	/* The last chunk was shorter than BUFSIZ, so the terminator fits */
	contents[length] = '\0';

	return contents;
}
4412 
/*
 * Read the script stored in the named file and add it to the script list
 * with the given weight.  The name "-" selects stdin.
 *
 * NB: filename must be storage that won't disappear, since it is handed to
 * ParseScript() as the script description.
 *
 * Failure to open or read the file is reported on stderr and terminates the
 * program with status 1.
 */
static void
process_file(const char *filename, int weight)
{
	FILE	   *fd;
	char	   *contents;

	/* Pick the input stream */
	if (strcmp(filename, "-") == 0)
		fd = stdin;
	else
	{
		fd = fopen(filename, "r");
		if (fd == NULL)
		{
			fprintf(stderr, "could not open file \"%s\": %s\n",
					filename, strerror(errno));
			exit(1);
		}
	}

	/* Slurp the whole stream, then find out whether reading went well */
	contents = read_file_contents(fd);

	if (ferror(fd))
	{
		fprintf(stderr, "could not read file \"%s\": %s\n",
				filename, strerror(errno));
		exit(1);
	}

	/* stdin is left open */
	if (fd != stdin)
		fclose(fd);

	ParseScript(contents, filename, weight);

	free(contents);
}
4450 
4451 /* Parse the given builtin script and add it to the list. */
4452 static void
process_builtin(const BuiltinScript * bi,int weight)4453 process_builtin(const BuiltinScript *bi, int weight)
4454 {
4455 	ParseScript(bi->script, bi->desc, weight);
4456 }
4457 
4458 /* show available builtin scripts */
4459 static void
listAvailableScripts(void)4460 listAvailableScripts(void)
4461 {
4462 	int			i;
4463 
4464 	fprintf(stderr, "Available builtin scripts:\n");
4465 	for (i = 0; i < lengthof(builtin_script); i++)
4466 		fprintf(stderr, "\t%s\n", builtin_script[i].name);
4467 	fprintf(stderr, "\n");
4468 }
4469 
4470 /* return builtin script "name" if unambiguous, fails if not found */
4471 static const BuiltinScript *
findBuiltin(const char * name)4472 findBuiltin(const char *name)
4473 {
4474 	int			i,
4475 				found = 0,
4476 				len = strlen(name);
4477 	const BuiltinScript *result = NULL;
4478 
4479 	for (i = 0; i < lengthof(builtin_script); i++)
4480 	{
4481 		if (strncmp(builtin_script[i].name, name, len) == 0)
4482 		{
4483 			result = &builtin_script[i];
4484 			found++;
4485 		}
4486 	}
4487 
4488 	/* ok, unambiguous result */
4489 	if (found == 1)
4490 		return result;
4491 
4492 	/* error cases */
4493 	if (found == 0)
4494 		fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
4495 	else						/* found > 1 */
4496 		fprintf(stderr,
4497 				"ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
4498 
4499 	listAvailableScripts();
4500 	exit(1);
4501 }
4502 
4503 /*
4504  * Determine the weight specification from a script option (-b, -f), if any,
4505  * and return it as an integer (1 is returned if there's no weight).  The
4506  * script name is returned in *script as a malloc'd string.
4507  */
4508 static int
parseScriptWeight(const char * option,char ** script)4509 parseScriptWeight(const char *option, char **script)
4510 {
4511 	char	   *sep;
4512 	int			weight;
4513 
4514 	if ((sep = strrchr(option, WSEP)))
4515 	{
4516 		int			namelen = sep - option;
4517 		long		wtmp;
4518 		char	   *badp;
4519 
4520 		/* generate the script name */
4521 		*script = pg_malloc(namelen + 1);
4522 		strncpy(*script, option, namelen);
4523 		(*script)[namelen] = '\0';
4524 
4525 		/* process digits of the weight spec */
4526 		errno = 0;
4527 		wtmp = strtol(sep + 1, &badp, 10);
4528 		if (errno != 0 || badp == sep + 1 || *badp != '\0')
4529 		{
4530 			fprintf(stderr, "invalid weight specification: %s\n", sep);
4531 			exit(1);
4532 		}
4533 		if (wtmp > INT_MAX || wtmp < 0)
4534 		{
4535 			fprintf(stderr,
4536 					"weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
4537 					INT_MAX, (int64) wtmp);
4538 			exit(1);
4539 		}
4540 		weight = wtmp;
4541 	}
4542 	else
4543 	{
4544 		*script = pg_strdup(option);
4545 		weight = 1;
4546 	}
4547 
4548 	return weight;
4549 }
4550 
4551 /* append a script to the list of scripts to process */
4552 static void
addScript(ParsedScript script)4553 addScript(ParsedScript script)
4554 {
4555 	if (script.commands == NULL || script.commands[0] == NULL)
4556 	{
4557 		fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
4558 		exit(1);
4559 	}
4560 
4561 	if (num_scripts >= MAX_SCRIPTS)
4562 	{
4563 		fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
4564 		exit(1);
4565 	}
4566 
4567 	CheckConditional(script);
4568 
4569 	sql_script[num_scripts] = script;
4570 	num_scripts++;
4571 }
4572 
4573 static void
printSimpleStats(const char * prefix,SimpleStats * ss)4574 printSimpleStats(const char *prefix, SimpleStats *ss)
4575 {
4576 	if (ss->count > 0)
4577 	{
4578 		double		latency = ss->sum / ss->count;
4579 		double		stddev = sqrt(ss->sum2 / ss->count - latency * latency);
4580 
4581 		printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
4582 		printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
4583 	}
4584 }
4585 
4586 /* print out results */
4587 static void
printResults(TState * threads,StatsData * total,instr_time total_time,instr_time conn_total_time,int64 latency_late)4588 printResults(TState *threads, StatsData *total, instr_time total_time,
4589 			 instr_time conn_total_time, int64 latency_late)
4590 {
4591 	double		time_include,
4592 				tps_include,
4593 				tps_exclude;
4594 	int64		ntx = total->cnt - total->skipped;
4595 	int			i,
4596 				totalCacheOverflows = 0;
4597 
4598 	time_include = INSTR_TIME_GET_DOUBLE(total_time);
4599 
4600 	/* tps is about actually executed transactions */
4601 	tps_include = ntx / time_include;
4602 	tps_exclude = ntx /
4603 		(time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
4604 
4605 	/* Report test parameters. */
4606 	printf("transaction type: %s\n",
4607 		   num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
4608 	printf("scaling factor: %d\n", scale);
4609 	printf("query mode: %s\n", QUERYMODE[querymode]);
4610 	printf("number of clients: %d\n", nclients);
4611 	printf("number of threads: %d\n", nthreads);
4612 	if (duration <= 0)
4613 	{
4614 		printf("number of transactions per client: %d\n", nxacts);
4615 		printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
4616 			   ntx, nxacts * nclients);
4617 	}
4618 	else
4619 	{
4620 		printf("duration: %d s\n", duration);
4621 		printf("number of transactions actually processed: " INT64_FORMAT "\n",
4622 			   ntx);
4623 	}
4624 	/* Report zipfian cache overflow */
4625 	for (i = 0; i < nthreads; i++)
4626 	{
4627 		totalCacheOverflows += threads[i].zipf_cache.overflowCount;
4628 	}
4629 	if (totalCacheOverflows > 0)
4630 	{
4631 		printf("zipfian cache array overflowed %d time(s)\n", totalCacheOverflows);
4632 	}
4633 
4634 	/* Remaining stats are nonsensical if we failed to execute any xacts */
4635 	if (total->cnt <= 0)
4636 		return;
4637 
4638 	if (throttle_delay && latency_limit)
4639 		printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
4640 			   total->skipped,
4641 			   100.0 * total->skipped / total->cnt);
4642 
4643 	if (latency_limit)
4644 		printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
4645 			   latency_limit / 1000.0, latency_late, ntx,
4646 			   (ntx > 0) ? 100.0 * latency_late / ntx : 0.0);
4647 
4648 	if (throttle_delay || progress || latency_limit)
4649 		printSimpleStats("latency", &total->latency);
4650 	else
4651 	{
4652 		/* no measurement, show average latency computed from run time */
4653 		printf("latency average = %.3f ms\n",
4654 			   1000.0 * time_include * nclients / total->cnt);
4655 	}
4656 
4657 	if (throttle_delay)
4658 	{
4659 		/*
4660 		 * Report average transaction lag under rate limit throttling.  This
4661 		 * is the delay between scheduled and actual start times for the
4662 		 * transaction.  The measured lag may be caused by thread/client load,
4663 		 * the database load, or the Poisson throttling process.
4664 		 */
4665 		printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
4666 			   0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
4667 	}
4668 
4669 	printf("tps = %f (including connections establishing)\n", tps_include);
4670 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
4671 
4672 	/* Report per-script/command statistics */
4673 	if (per_script_stats || is_latencies)
4674 	{
4675 		int			i;
4676 
4677 		for (i = 0; i < num_scripts; i++)
4678 		{
4679 			if (per_script_stats)
4680 			{
4681 				StatsData  *sstats = &sql_script[i].stats;
4682 
4683 				printf("SQL script %d: %s\n"
4684 					   " - weight: %d (targets %.1f%% of total)\n"
4685 					   " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
4686 					   i + 1, sql_script[i].desc,
4687 					   sql_script[i].weight,
4688 					   100.0 * sql_script[i].weight / total_weight,
4689 					   sstats->cnt,
4690 					   100.0 * sstats->cnt / total->cnt,
4691 					   (sstats->cnt - sstats->skipped) / time_include);
4692 
4693 				if (throttle_delay && latency_limit && sstats->cnt > 0)
4694 					printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
4695 						   sstats->skipped,
4696 						   100.0 * sstats->skipped / sstats->cnt);
4697 
4698 				printSimpleStats(" - latency", &sstats->latency);
4699 			}
4700 
4701 			/* Report per-command latencies */
4702 			if (is_latencies)
4703 			{
4704 				Command   **commands;
4705 
4706 				if (per_script_stats)
4707 					printf(" - statement latencies in milliseconds:\n");
4708 				else
4709 					printf("statement latencies in milliseconds:\n");
4710 
4711 				for (commands = sql_script[i].commands;
4712 					 *commands != NULL;
4713 					 commands++)
4714 				{
4715 					SimpleStats *cstats = &(*commands)->stats;
4716 
4717 					printf("   %11.3f  %s\n",
4718 						   (cstats->count > 0) ?
4719 						   1000.0 * cstats->sum / cstats->count : 0.0,
4720 						   (*commands)->line);
4721 				}
4722 			}
4723 		}
4724 	}
4725 }
4726 
4727 /*
4728  * Set up a random seed according to seed parameter (NULL means default),
4729  * and initialize base_random_sequence for use in initializing other sequences.
4730  */
4731 static bool
set_random_seed(const char * seed)4732 set_random_seed(const char *seed)
4733 {
4734 	uint64		iseed;
4735 
4736 	if (seed == NULL || strcmp(seed, "time") == 0)
4737 	{
4738 		/* rely on current time */
4739 		instr_time	now;
4740 
4741 		INSTR_TIME_SET_CURRENT(now);
4742 		iseed = (uint64) INSTR_TIME_GET_MICROSEC(now);
4743 	}
4744 	else if (strcmp(seed, "rand") == 0)
4745 	{
4746 		/* use some "strong" random source */
4747 #ifdef HAVE_STRONG_RANDOM
4748 		if (!pg_strong_random(&iseed, sizeof(iseed)))
4749 #endif
4750 		{
4751 			fprintf(stderr,
4752 					"cannot seed random from a strong source, none available: "
4753 					"use \"time\" or an unsigned integer value.\n");
4754 			return false;
4755 		}
4756 	}
4757 	else
4758 	{
4759 		/* parse unsigned-int seed value */
4760 		unsigned long ulseed;
4761 		char		garbage;
4762 
4763 		/* Don't try to use UINT64_FORMAT here; it might not work for sscanf */
4764 		if (sscanf(seed, "%lu%c", &ulseed, &garbage) != 1)
4765 		{
4766 			fprintf(stderr,
4767 					"unrecognized random seed option \"%s\": expecting an unsigned integer, \"time\" or \"rand\"\n",
4768 					seed);
4769 			return false;
4770 		}
4771 		iseed = (uint64) ulseed;
4772 	}
4773 
4774 	if (seed != NULL)
4775 		fprintf(stderr, "setting random seed to " UINT64_FORMAT "\n", iseed);
4776 	random_seed = iseed;
4777 
4778 	/* Fill base_random_sequence with low-order bits of seed */
4779 	base_random_sequence[0] = iseed & 0xFFFF;
4780 	base_random_sequence[1] = (iseed >> 16) & 0xFFFF;
4781 	base_random_sequence[2] = (iseed >> 32) & 0xFFFF;
4782 
4783 	return true;
4784 }
4785 
4786 
4787 int
main(int argc,char ** argv)4788 main(int argc, char **argv)
4789 {
4790 	static struct option long_options[] = {
4791 		/* systematic long/short named options */
4792 		{"builtin", required_argument, NULL, 'b'},
4793 		{"client", required_argument, NULL, 'c'},
4794 		{"connect", no_argument, NULL, 'C'},
4795 		{"debug", no_argument, NULL, 'd'},
4796 		{"define", required_argument, NULL, 'D'},
4797 		{"file", required_argument, NULL, 'f'},
4798 		{"fillfactor", required_argument, NULL, 'F'},
4799 		{"host", required_argument, NULL, 'h'},
4800 		{"initialize", no_argument, NULL, 'i'},
4801 		{"init-steps", required_argument, NULL, 'I'},
4802 		{"jobs", required_argument, NULL, 'j'},
4803 		{"log", no_argument, NULL, 'l'},
4804 		{"latency-limit", required_argument, NULL, 'L'},
4805 		{"no-vacuum", no_argument, NULL, 'n'},
4806 		{"port", required_argument, NULL, 'p'},
4807 		{"progress", required_argument, NULL, 'P'},
4808 		{"protocol", required_argument, NULL, 'M'},
4809 		{"quiet", no_argument, NULL, 'q'},
4810 		{"report-latencies", no_argument, NULL, 'r'},
4811 		{"rate", required_argument, NULL, 'R'},
4812 		{"scale", required_argument, NULL, 's'},
4813 		{"select-only", no_argument, NULL, 'S'},
4814 		{"skip-some-updates", no_argument, NULL, 'N'},
4815 		{"time", required_argument, NULL, 'T'},
4816 		{"transactions", required_argument, NULL, 't'},
4817 		{"username", required_argument, NULL, 'U'},
4818 		{"vacuum-all", no_argument, NULL, 'v'},
4819 		/* long-named only options */
4820 		{"unlogged-tables", no_argument, NULL, 1},
4821 		{"tablespace", required_argument, NULL, 2},
4822 		{"index-tablespace", required_argument, NULL, 3},
4823 		{"sampling-rate", required_argument, NULL, 4},
4824 		{"aggregate-interval", required_argument, NULL, 5},
4825 		{"progress-timestamp", no_argument, NULL, 6},
4826 		{"log-prefix", required_argument, NULL, 7},
4827 		{"foreign-keys", no_argument, NULL, 8},
4828 		{"random-seed", required_argument, NULL, 9},
4829 		{NULL, 0, NULL, 0}
4830 	};
4831 
4832 	int			c;
4833 	bool		is_init_mode = false;	/* initialize mode? */
4834 	char	   *initialize_steps = NULL;
4835 	bool		foreign_keys = false;
4836 	bool		is_no_vacuum = false;
4837 	bool		do_vacuum_accounts = false; /* vacuum accounts table? */
4838 	int			optindex;
4839 	bool		scale_given = false;
4840 
4841 	bool		benchmarking_option_set = false;
4842 	bool		initialization_option_set = false;
4843 	bool		internal_script_used = false;
4844 
4845 	CState	   *state;			/* status of clients */
4846 	TState	   *threads;		/* array of thread */
4847 
4848 	instr_time	start_time;		/* start up time */
4849 	instr_time	total_time;
4850 	instr_time	conn_total_time;
4851 	int64		latency_late = 0;
4852 	StatsData	stats;
4853 	int			weight;
4854 
4855 	int			i;
4856 	int			nclients_dealt;
4857 
4858 #ifdef HAVE_GETRLIMIT
4859 	struct rlimit rlim;
4860 #endif
4861 
4862 	PGconn	   *con;
4863 	PGresult   *res;
4864 	char	   *env;
4865 
4866 	progname = get_progname(argv[0]);
4867 
4868 	if (argc > 1)
4869 	{
4870 		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
4871 		{
4872 			usage();
4873 			exit(0);
4874 		}
4875 		if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
4876 		{
4877 			puts("pgbench (PostgreSQL) " PG_VERSION);
4878 			exit(0);
4879 		}
4880 	}
4881 
4882 #ifdef WIN32
4883 	/* stderr is buffered on Win32. */
4884 	setvbuf(stderr, NULL, _IONBF, 0);
4885 #endif
4886 
4887 	if ((env = getenv("PGHOST")) != NULL && *env != '\0')
4888 		pghost = env;
4889 	if ((env = getenv("PGPORT")) != NULL && *env != '\0')
4890 		pgport = env;
4891 	else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
4892 		login = env;
4893 
4894 	state = (CState *) pg_malloc(sizeof(CState));
4895 	memset(state, 0, sizeof(CState));
4896 
4897 	/* set random seed early, because it may be used while parsing scripts. */
4898 	if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
4899 	{
4900 		fprintf(stderr, "error while setting random seed from PGBENCH_RANDOM_SEED environment variable\n");
4901 		exit(1);
4902 	}
4903 
4904 	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
4905 	{
4906 		char	   *script;
4907 
4908 		switch (c)
4909 		{
4910 			case 'i':
4911 				is_init_mode = true;
4912 				break;
4913 			case 'I':
4914 				if (initialize_steps)
4915 					pg_free(initialize_steps);
4916 				initialize_steps = pg_strdup(optarg);
4917 				checkInitSteps(initialize_steps);
4918 				initialization_option_set = true;
4919 				break;
4920 			case 'h':
4921 				pghost = pg_strdup(optarg);
4922 				break;
4923 			case 'n':
4924 				is_no_vacuum = true;
4925 				break;
4926 			case 'v':
4927 				benchmarking_option_set = true;
4928 				do_vacuum_accounts = true;
4929 				break;
4930 			case 'p':
4931 				pgport = pg_strdup(optarg);
4932 				break;
4933 			case 'd':
4934 				debug++;
4935 				break;
4936 			case 'c':
4937 				benchmarking_option_set = true;
4938 				nclients = atoi(optarg);
4939 				if (nclients <= 0 || nclients > MAXCLIENTS)
4940 				{
4941 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
4942 							optarg);
4943 					exit(1);
4944 				}
4945 #ifdef HAVE_GETRLIMIT
4946 #ifdef RLIMIT_NOFILE			/* most platforms use RLIMIT_NOFILE */
4947 				if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
4948 #else							/* but BSD doesn't ... */
4949 				if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
4950 #endif							/* RLIMIT_NOFILE */
4951 				{
4952 					fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
4953 					exit(1);
4954 				}
4955 				if (rlim.rlim_cur < nclients + 3)
4956 				{
4957 					fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
4958 							nclients + 3, (long) rlim.rlim_cur);
4959 					fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
4960 					exit(1);
4961 				}
4962 #endif							/* HAVE_GETRLIMIT */
4963 				break;
4964 			case 'j':			/* jobs */
4965 				benchmarking_option_set = true;
4966 				nthreads = atoi(optarg);
4967 				if (nthreads <= 0)
4968 				{
4969 					fprintf(stderr, "invalid number of threads: \"%s\"\n",
4970 							optarg);
4971 					exit(1);
4972 				}
4973 #ifndef ENABLE_THREAD_SAFETY
4974 				if (nthreads != 1)
4975 				{
4976 					fprintf(stderr, "threads are not supported on this platform; use -j1\n");
4977 					exit(1);
4978 				}
4979 #endif							/* !ENABLE_THREAD_SAFETY */
4980 				break;
4981 			case 'C':
4982 				benchmarking_option_set = true;
4983 				is_connect = true;
4984 				break;
4985 			case 'r':
4986 				benchmarking_option_set = true;
4987 				is_latencies = true;
4988 				break;
4989 			case 's':
4990 				scale_given = true;
4991 				scale = atoi(optarg);
4992 				if (scale <= 0)
4993 				{
4994 					fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
4995 					exit(1);
4996 				}
4997 				break;
4998 			case 't':
4999 				benchmarking_option_set = true;
5000 				nxacts = atoi(optarg);
5001 				if (nxacts <= 0)
5002 				{
5003 					fprintf(stderr, "invalid number of transactions: \"%s\"\n",
5004 							optarg);
5005 					exit(1);
5006 				}
5007 				break;
5008 			case 'T':
5009 				benchmarking_option_set = true;
5010 				duration = atoi(optarg);
5011 				if (duration <= 0)
5012 				{
5013 					fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
5014 					exit(1);
5015 				}
5016 				break;
5017 			case 'U':
5018 				login = pg_strdup(optarg);
5019 				break;
5020 			case 'l':
5021 				benchmarking_option_set = true;
5022 				use_log = true;
5023 				break;
5024 			case 'q':
5025 				initialization_option_set = true;
5026 				use_quiet = true;
5027 				break;
5028 			case 'b':
5029 				if (strcmp(optarg, "list") == 0)
5030 				{
5031 					listAvailableScripts();
5032 					exit(0);
5033 				}
5034 				weight = parseScriptWeight(optarg, &script);
5035 				process_builtin(findBuiltin(script), weight);
5036 				benchmarking_option_set = true;
5037 				internal_script_used = true;
5038 				break;
5039 			case 'S':
5040 				process_builtin(findBuiltin("select-only"), 1);
5041 				benchmarking_option_set = true;
5042 				internal_script_used = true;
5043 				break;
5044 			case 'N':
5045 				process_builtin(findBuiltin("simple-update"), 1);
5046 				benchmarking_option_set = true;
5047 				internal_script_used = true;
5048 				break;
5049 			case 'f':
5050 				weight = parseScriptWeight(optarg, &script);
5051 				process_file(script, weight);
5052 				benchmarking_option_set = true;
5053 				break;
5054 			case 'D':
5055 				{
5056 					char	   *p;
5057 
5058 					benchmarking_option_set = true;
5059 
5060 					if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
5061 					{
5062 						fprintf(stderr, "invalid variable definition: \"%s\"\n",
5063 								optarg);
5064 						exit(1);
5065 					}
5066 
5067 					*p++ = '\0';
5068 					if (!putVariable(&state[0], "option", optarg, p))
5069 						exit(1);
5070 				}
5071 				break;
5072 			case 'F':
5073 				initialization_option_set = true;
5074 				fillfactor = atoi(optarg);
5075 				if (fillfactor < 10 || fillfactor > 100)
5076 				{
5077 					fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
5078 					exit(1);
5079 				}
5080 				break;
5081 			case 'M':
5082 				benchmarking_option_set = true;
5083 				for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
5084 					if (strcmp(optarg, QUERYMODE[querymode]) == 0)
5085 						break;
5086 				if (querymode >= NUM_QUERYMODE)
5087 				{
5088 					fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
5089 							optarg);
5090 					exit(1);
5091 				}
5092 				break;
5093 			case 'P':
5094 				benchmarking_option_set = true;
5095 				progress = atoi(optarg);
5096 				if (progress <= 0)
5097 				{
5098 					fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
5099 							optarg);
5100 					exit(1);
5101 				}
5102 				break;
5103 			case 'R':
5104 				{
5105 					/* get a double from the beginning of option value */
5106 					double		throttle_value = atof(optarg);
5107 
5108 					benchmarking_option_set = true;
5109 
5110 					if (throttle_value <= 0.0)
5111 					{
5112 						fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
5113 						exit(1);
5114 					}
5115 					/* Invert rate limit into a time offset */
5116 					throttle_delay = (int64) (1000000.0 / throttle_value);
5117 				}
5118 				break;
5119 			case 'L':
5120 				{
5121 					double		limit_ms = atof(optarg);
5122 
5123 					if (limit_ms <= 0.0)
5124 					{
5125 						fprintf(stderr, "invalid latency limit: \"%s\"\n",
5126 								optarg);
5127 						exit(1);
5128 					}
5129 					benchmarking_option_set = true;
5130 					latency_limit = (int64) (limit_ms * 1000);
5131 				}
5132 				break;
5133 			case 1:				/* unlogged-tables */
5134 				initialization_option_set = true;
5135 				unlogged_tables = true;
5136 				break;
5137 			case 2:				/* tablespace */
5138 				initialization_option_set = true;
5139 				tablespace = pg_strdup(optarg);
5140 				break;
5141 			case 3:				/* index-tablespace */
5142 				initialization_option_set = true;
5143 				index_tablespace = pg_strdup(optarg);
5144 				break;
5145 			case 4:				/* sampling-rate */
5146 				benchmarking_option_set = true;
5147 				sample_rate = atof(optarg);
5148 				if (sample_rate <= 0.0 || sample_rate > 1.0)
5149 				{
5150 					fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
5151 					exit(1);
5152 				}
5153 				break;
5154 			case 5:				/* aggregate-interval */
5155 				benchmarking_option_set = true;
5156 				agg_interval = atoi(optarg);
5157 				if (agg_interval <= 0)
5158 				{
5159 					fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
5160 							optarg);
5161 					exit(1);
5162 				}
5163 				break;
5164 			case 6:				/* progress-timestamp */
5165 				progress_timestamp = true;
5166 				benchmarking_option_set = true;
5167 				break;
5168 			case 7:				/* log-prefix */
5169 				benchmarking_option_set = true;
5170 				logfile_prefix = pg_strdup(optarg);
5171 				break;
5172 			case 8:				/* foreign-keys */
5173 				initialization_option_set = true;
5174 				foreign_keys = true;
5175 				break;
5176 			case 9:				/* random-seed */
5177 				benchmarking_option_set = true;
5178 				if (!set_random_seed(optarg))
5179 				{
5180 					fprintf(stderr, "error while setting random seed from --random-seed option\n");
5181 					exit(1);
5182 				}
5183 				break;
5184 			default:
5185 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
5186 				exit(1);
5187 				break;
5188 		}
5189 	}
5190 
5191 	/* set default script if none */
5192 	if (num_scripts == 0 && !is_init_mode)
5193 	{
5194 		process_builtin(findBuiltin("tpcb-like"), 1);
5195 		benchmarking_option_set = true;
5196 		internal_script_used = true;
5197 	}
5198 
5199 	/* if not simple query mode, parse the script(s) to find parameters */
5200 	if (querymode != QUERY_SIMPLE)
5201 	{
5202 		for (i = 0; i < num_scripts; i++)
5203 		{
5204 			Command   **commands = sql_script[i].commands;
5205 			int			j;
5206 
5207 			for (j = 0; commands[j] != NULL; j++)
5208 			{
5209 				if (commands[j]->type != SQL_COMMAND)
5210 					continue;
5211 				if (!parseQuery(commands[j]))
5212 					exit(1);
5213 			}
5214 		}
5215 	}
5216 
5217 	/* compute total_weight */
5218 	for (i = 0; i < num_scripts; i++)
5219 		/* cannot overflow: weight is 32b, total_weight 64b */
5220 		total_weight += sql_script[i].weight;
5221 
5222 	if (total_weight == 0 && !is_init_mode)
5223 	{
5224 		fprintf(stderr, "total script weight must not be zero\n");
5225 		exit(1);
5226 	}
5227 
5228 	/* show per script stats if several scripts are used */
5229 	if (num_scripts > 1)
5230 		per_script_stats = true;
5231 
5232 	/*
5233 	 * Don't need more threads than there are clients.  (This is not merely an
5234 	 * optimization; throttle_delay is calculated incorrectly below if some
5235 	 * threads have no clients assigned to them.)
5236 	 */
5237 	if (nthreads > nclients)
5238 		nthreads = nclients;
5239 
5240 	/* compute a per thread delay */
5241 	throttle_delay *= nthreads;
5242 
5243 	if (argc > optind)
5244 		dbName = argv[optind];
5245 	else
5246 	{
5247 		if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
5248 			dbName = env;
5249 		else if (login != NULL && *login != '\0')
5250 			dbName = login;
5251 		else
5252 			dbName = "";
5253 	}
5254 
5255 	if (is_init_mode)
5256 	{
5257 		if (benchmarking_option_set)
5258 		{
5259 			fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
5260 			exit(1);
5261 		}
5262 
5263 		if (initialize_steps == NULL)
5264 			initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
5265 
5266 		if (is_no_vacuum)
5267 		{
5268 			/* Remove any vacuum step in initialize_steps */
5269 			char	   *p;
5270 
5271 			while ((p = strchr(initialize_steps, 'v')) != NULL)
5272 				*p = ' ';
5273 		}
5274 
5275 		if (foreign_keys)
5276 		{
5277 			/* Add 'f' to end of initialize_steps, if not already there */
5278 			if (strchr(initialize_steps, 'f') == NULL)
5279 			{
5280 				initialize_steps = (char *)
5281 					pg_realloc(initialize_steps,
5282 							   strlen(initialize_steps) + 2);
5283 				strcat(initialize_steps, "f");
5284 			}
5285 		}
5286 
5287 		runInitSteps(initialize_steps);
5288 		exit(0);
5289 	}
5290 	else
5291 	{
5292 		if (initialization_option_set)
5293 		{
5294 			fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
5295 			exit(1);
5296 		}
5297 	}
5298 
5299 	if (nxacts > 0 && duration > 0)
5300 	{
5301 		fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
5302 		exit(1);
5303 	}
5304 
5305 	/* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
5306 	if (nxacts <= 0 && duration <= 0)
5307 		nxacts = DEFAULT_NXACTS;
5308 
5309 	/* --sampling-rate may be used only with -l */
5310 	if (sample_rate > 0.0 && !use_log)
5311 	{
5312 		fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
5313 		exit(1);
5314 	}
5315 
5316 	/* --sampling-rate may not be used with --aggregate-interval */
5317 	if (sample_rate > 0.0 && agg_interval > 0)
5318 	{
5319 		fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
5320 		exit(1);
5321 	}
5322 
5323 	if (agg_interval > 0 && !use_log)
5324 	{
5325 		fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
5326 		exit(1);
5327 	}
5328 
5329 	if (!use_log && logfile_prefix)
5330 	{
5331 		fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
5332 		exit(1);
5333 	}
5334 
5335 	if (duration > 0 && agg_interval > duration)
5336 	{
5337 		fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
5338 		exit(1);
5339 	}
5340 
5341 	if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
5342 	{
5343 		fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
5344 		exit(1);
5345 	}
5346 
5347 	if (progress_timestamp && progress == 0)
5348 	{
5349 		fprintf(stderr, "--progress-timestamp is allowed only under --progress\n");
5350 		exit(1);
5351 	}
5352 
5353 	/*
5354 	 * save main process id in the global variable because process id will be
5355 	 * changed after fork.
5356 	 */
5357 	main_pid = (int) getpid();
5358 
5359 	if (nclients > 1)
5360 	{
5361 		state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
5362 		memset(state + 1, 0, sizeof(CState) * (nclients - 1));
5363 
5364 		/* copy any -D switch values to all clients */
5365 		for (i = 1; i < nclients; i++)
5366 		{
5367 			int			j;
5368 
5369 			state[i].id = i;
5370 			for (j = 0; j < state[0].nvariables; j++)
5371 			{
5372 				Variable   *var = &state[0].variables[j];
5373 
5374 				if (var->value.type != PGBT_NO_VALUE)
5375 				{
5376 					if (!putVariableValue(&state[i], "startup",
5377 										  var->name, &var->value))
5378 						exit(1);
5379 				}
5380 				else
5381 				{
5382 					if (!putVariable(&state[i], "startup",
5383 									 var->name, var->svalue))
5384 						exit(1);
5385 				}
5386 			}
5387 		}
5388 	}
5389 
5390 	/* other CState initializations */
5391 	for (i = 0; i < nclients; i++)
5392 	{
5393 		state[i].cstack = conditional_stack_create();
5394 	}
5395 
5396 	if (debug)
5397 	{
5398 		if (duration <= 0)
5399 			printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
5400 				   pghost, pgport, nclients, nxacts, dbName);
5401 		else
5402 			printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
5403 				   pghost, pgport, nclients, duration, dbName);
5404 	}
5405 
5406 	/* opening connection... */
5407 	con = doConnect();
5408 	if (con == NULL)
5409 		exit(1);
5410 
5411 	if (internal_script_used)
5412 	{
5413 		/*
5414 		 * get the scaling factor that should be same as count(*) from
5415 		 * pgbench_branches if this is not a custom query
5416 		 */
5417 		res = PQexec(con, "select count(*) from pgbench_branches");
5418 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
5419 		{
5420 			char	   *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
5421 
5422 			fprintf(stderr, "%s", PQerrorMessage(con));
5423 			if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
5424 			{
5425 				fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
5426 			}
5427 
5428 			exit(1);
5429 		}
5430 		scale = atoi(PQgetvalue(res, 0, 0));
5431 		if (scale < 0)
5432 		{
5433 			fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
5434 					PQgetvalue(res, 0, 0));
5435 			exit(1);
5436 		}
5437 		PQclear(res);
5438 
5439 		/* warn if we override user-given -s switch */
5440 		if (scale_given)
5441 			fprintf(stderr,
5442 					"scale option ignored, using count from pgbench_branches table (%d)\n",
5443 					scale);
5444 	}
5445 
5446 	/*
5447 	 * :scale variables normally get -s or database scale, but don't override
5448 	 * an explicit -D switch
5449 	 */
5450 	if (lookupVariable(&state[0], "scale") == NULL)
5451 	{
5452 		for (i = 0; i < nclients; i++)
5453 		{
5454 			if (!putVariableInt(&state[i], "startup", "scale", scale))
5455 				exit(1);
5456 		}
5457 	}
5458 
5459 	/*
5460 	 * Define a :client_id variable that is unique per connection. But don't
5461 	 * override an explicit -D switch.
5462 	 */
5463 	if (lookupVariable(&state[0], "client_id") == NULL)
5464 	{
5465 		for (i = 0; i < nclients; i++)
5466 			if (!putVariableInt(&state[i], "startup", "client_id", i))
5467 				exit(1);
5468 	}
5469 
5470 	/* set default seed for hash functions */
5471 	if (lookupVariable(&state[0], "default_seed") == NULL)
5472 	{
5473 		uint64		seed =
5474 		((uint64) pg_jrand48(base_random_sequence) & 0xFFFFFFFF) |
5475 		(((uint64) pg_jrand48(base_random_sequence) & 0xFFFFFFFF) << 32);
5476 
5477 		for (i = 0; i < nclients; i++)
5478 			if (!putVariableInt(&state[i], "startup", "default_seed", (int64) seed))
5479 				exit(1);
5480 	}
5481 
5482 	/* set random seed unless overwritten */
5483 	if (lookupVariable(&state[0], "random_seed") == NULL)
5484 	{
5485 		for (i = 0; i < nclients; i++)
5486 			if (!putVariableInt(&state[i], "startup", "random_seed", random_seed))
5487 				exit(1);
5488 	}
5489 
5490 	if (!is_no_vacuum)
5491 	{
5492 		fprintf(stderr, "starting vacuum...");
5493 		tryExecuteStatement(con, "vacuum pgbench_branches");
5494 		tryExecuteStatement(con, "vacuum pgbench_tellers");
5495 		tryExecuteStatement(con, "truncate pgbench_history");
5496 		fprintf(stderr, "end.\n");
5497 
5498 		if (do_vacuum_accounts)
5499 		{
5500 			fprintf(stderr, "starting vacuum pgbench_accounts...");
5501 			tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
5502 			fprintf(stderr, "end.\n");
5503 		}
5504 	}
5505 	PQfinish(con);
5506 
5507 	/* set up thread data structures */
5508 	threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
5509 	nclients_dealt = 0;
5510 
5511 	for (i = 0; i < nthreads; i++)
5512 	{
5513 		TState	   *thread = &threads[i];
5514 
5515 		thread->tid = i;
5516 		thread->state = &state[nclients_dealt];
5517 		thread->nstate =
5518 			(nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
5519 		thread->random_state[0] = (unsigned short)
5520 			(pg_jrand48(base_random_sequence) & 0xFFFF);
5521 		thread->random_state[1] = (unsigned short)
5522 			(pg_jrand48(base_random_sequence) & 0xFFFF);
5523 		thread->random_state[2] = (unsigned short)
5524 			(pg_jrand48(base_random_sequence) & 0xFFFF);
5525 		thread->logfile = NULL; /* filled in later */
5526 		thread->latency_late = 0;
5527 		thread->zipf_cache.nb_cells = 0;
5528 		thread->zipf_cache.current = 0;
5529 		thread->zipf_cache.overflowCount = 0;
5530 		initStats(&thread->stats, 0);
5531 
5532 		nclients_dealt += thread->nstate;
5533 	}
5534 
5535 	/* all clients must be assigned to a thread */
5536 	Assert(nclients_dealt == nclients);
5537 
5538 	/* get start up time */
5539 	INSTR_TIME_SET_CURRENT(start_time);
5540 
5541 	/* set alarm if duration is specified. */
5542 	if (duration > 0)
5543 		setalarm(duration);
5544 
5545 	/* start threads */
5546 #ifdef ENABLE_THREAD_SAFETY
5547 	for (i = 0; i < nthreads; i++)
5548 	{
5549 		TState	   *thread = &threads[i];
5550 
5551 		INSTR_TIME_SET_CURRENT(thread->start_time);
5552 
5553 		/* compute when to stop */
5554 		if (duration > 0)
5555 			end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
5556 				(int64) 1000000 * duration;
5557 
5558 		/* the first thread (i = 0) is executed by main thread */
5559 		if (i > 0)
5560 		{
5561 			int			err = pthread_create(&thread->thread, NULL, threadRun, thread);
5562 
5563 			if (err != 0 || thread->thread == INVALID_THREAD)
5564 			{
5565 				fprintf(stderr, "could not create thread: %s\n", strerror(err));
5566 				exit(1);
5567 			}
5568 		}
5569 		else
5570 		{
5571 			thread->thread = INVALID_THREAD;
5572 		}
5573 	}
5574 #else
5575 	INSTR_TIME_SET_CURRENT(threads[0].start_time);
5576 	/* compute when to stop */
5577 	if (duration > 0)
5578 		end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
5579 			(int64) 1000000 * duration;
5580 	threads[0].thread = INVALID_THREAD;
5581 #endif							/* ENABLE_THREAD_SAFETY */
5582 
5583 	/* wait for threads and accumulate results */
5584 	initStats(&stats, 0);
5585 	INSTR_TIME_SET_ZERO(conn_total_time);
5586 	for (i = 0; i < nthreads; i++)
5587 	{
5588 		TState	   *thread = &threads[i];
5589 
5590 #ifdef ENABLE_THREAD_SAFETY
5591 		if (threads[i].thread == INVALID_THREAD)
5592 			/* actually run this thread directly in the main thread */
5593 			(void) threadRun(thread);
5594 		else
5595 			/* wait of other threads. should check that 0 is returned? */
5596 			pthread_join(thread->thread, NULL);
5597 #else
5598 		(void) threadRun(thread);
5599 #endif							/* ENABLE_THREAD_SAFETY */
5600 
5601 		/* aggregate thread level stats */
5602 		mergeSimpleStats(&stats.latency, &thread->stats.latency);
5603 		mergeSimpleStats(&stats.lag, &thread->stats.lag);
5604 		stats.cnt += thread->stats.cnt;
5605 		stats.skipped += thread->stats.skipped;
5606 		latency_late += thread->latency_late;
5607 		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
5608 	}
5609 	disconnect_all(state, nclients);
5610 
5611 	/*
5612 	 * XXX We compute results as though every client of every thread started
5613 	 * and finished at the same time.  That model can diverge noticeably from
5614 	 * reality for a short benchmark run involving relatively many threads.
5615 	 * The first thread may process notably many transactions before the last
5616 	 * thread begins.  Improving the model alone would bring limited benefit,
5617 	 * because performance during those periods of partial thread count can
5618 	 * easily exceed steady state performance.  This is one of the many ways
5619 	 * short runs convey deceptive performance figures.
5620 	 */
5621 	INSTR_TIME_SET_CURRENT(total_time);
5622 	INSTR_TIME_SUBTRACT(total_time, start_time);
5623 	printResults(threads, &stats, total_time, conn_total_time, latency_late);
5624 
5625 	return 0;
5626 }
5627 
5628 static void *
threadRun(void * arg)5629 threadRun(void *arg)
5630 {
5631 	TState	   *thread = (TState *) arg;
5632 	CState	   *state = thread->state;
5633 	instr_time	start,
5634 				end;
5635 	int			nstate = thread->nstate;
5636 	int			remains = nstate;	/* number of remaining clients */
5637 	int			i;
5638 
5639 	/* for reporting progress: */
5640 	int64		thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
5641 	int64		last_report = thread_start;
5642 	int64		next_report = last_report + (int64) progress * 1000000;
5643 	StatsData	last,
5644 				aggs;
5645 
5646 	/*
5647 	 * Initialize throttling rate target for all of the thread's clients.  It
5648 	 * might be a little more accurate to reset thread->start_time here too.
5649 	 * The possible drift seems too small relative to typical throttle delay
5650 	 * times to worry about it.
5651 	 */
5652 	INSTR_TIME_SET_CURRENT(start);
5653 	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
5654 
5655 	INSTR_TIME_SET_ZERO(thread->conn_time);
5656 
5657 	initStats(&aggs, time(NULL));
5658 	last = aggs;
5659 
5660 	/* open log file if requested */
5661 	if (use_log)
5662 	{
5663 		char		logpath[MAXPGPATH];
5664 		char	   *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
5665 
5666 		if (thread->tid == 0)
5667 			snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
5668 		else
5669 			snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
5670 
5671 		thread->logfile = fopen(logpath, "w");
5672 
5673 		if (thread->logfile == NULL)
5674 		{
5675 			fprintf(stderr, "could not open logfile \"%s\": %s\n",
5676 					logpath, strerror(errno));
5677 			goto done;
5678 		}
5679 	}
5680 
5681 	if (!is_connect)
5682 	{
5683 		/* make connections to the database */
5684 		for (i = 0; i < nstate; i++)
5685 		{
5686 			if ((state[i].con = doConnect()) == NULL)
5687 				goto done;
5688 		}
5689 	}
5690 
5691 	/* time after thread and connections set up */
5692 	INSTR_TIME_SET_CURRENT(thread->conn_time);
5693 	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
5694 
5695 	/* explicitly initialize the state machines */
5696 	for (i = 0; i < nstate; i++)
5697 	{
5698 		state[i].state = CSTATE_CHOOSE_SCRIPT;
5699 	}
5700 
5701 	/* loop till all clients have terminated */
5702 	while (remains > 0)
5703 	{
5704 		fd_set		input_mask;
5705 		int			maxsock;	/* max socket number to be waited for */
5706 		int64		min_usec;
5707 		int64		now_usec = 0;	/* set this only if needed */
5708 
5709 		/* identify which client sockets should be checked for input */
5710 		FD_ZERO(&input_mask);
5711 		maxsock = -1;
5712 		min_usec = PG_INT64_MAX;
5713 		for (i = 0; i < nstate; i++)
5714 		{
5715 			CState	   *st = &state[i];
5716 
5717 			if (st->state == CSTATE_THROTTLE && timer_exceeded)
5718 			{
5719 				/* interrupt client that has not started a transaction */
5720 				st->state = CSTATE_FINISHED;
5721 				finishCon(st);
5722 				remains--;
5723 			}
5724 			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
5725 			{
5726 				/* a nap from the script, or under throttling */
5727 				int64		this_usec;
5728 
5729 				/* get current time if needed */
5730 				if (now_usec == 0)
5731 				{
5732 					instr_time	now;
5733 
5734 					INSTR_TIME_SET_CURRENT(now);
5735 					now_usec = INSTR_TIME_GET_MICROSEC(now);
5736 				}
5737 
5738 				/* min_usec should be the minimum delay across all clients */
5739 				this_usec = (st->state == CSTATE_SLEEP ?
5740 							 st->sleep_until : st->txn_scheduled) - now_usec;
5741 				if (min_usec > this_usec)
5742 					min_usec = this_usec;
5743 			}
5744 			else if (st->state == CSTATE_WAIT_RESULT)
5745 			{
5746 				/*
5747 				 * waiting for result from server - nothing to do unless the
5748 				 * socket is readable
5749 				 */
5750 				int			sock = PQsocket(st->con);
5751 
5752 				if (sock < 0)
5753 				{
5754 					fprintf(stderr, "invalid socket: %s",
5755 							PQerrorMessage(st->con));
5756 					goto done;
5757 				}
5758 
5759 				FD_SET(sock, &input_mask);
5760 				if (maxsock < sock)
5761 					maxsock = sock;
5762 			}
5763 			else if (st->state != CSTATE_ABORTED &&
5764 					 st->state != CSTATE_FINISHED)
5765 			{
5766 				/*
5767 				 * This client thread is ready to do something, so we don't
5768 				 * want to wait.  No need to examine additional clients.
5769 				 */
5770 				min_usec = 0;
5771 				break;
5772 			}
5773 		}
5774 
5775 		/* under throttling we may have finished the last client above */
5776 		if (remains == 0)
5777 			break;
5778 
5779 		/* also wake up to print the next progress report on time */
5780 		if (progress && min_usec > 0 && thread->tid == 0)
5781 		{
5782 			/* get current time if needed */
5783 			if (now_usec == 0)
5784 			{
5785 				instr_time	now;
5786 
5787 				INSTR_TIME_SET_CURRENT(now);
5788 				now_usec = INSTR_TIME_GET_MICROSEC(now);
5789 			}
5790 
5791 			if (now_usec >= next_report)
5792 				min_usec = 0;
5793 			else if ((next_report - now_usec) < min_usec)
5794 				min_usec = next_report - now_usec;
5795 		}
5796 
5797 		/*
5798 		 * If no clients are ready to execute actions, sleep until we receive
5799 		 * data from the server, or a nap-time specified in the script ends,
5800 		 * or it's time to print a progress report.  Update input_mask to show
5801 		 * which client(s) received data.
5802 		 */
5803 		if (min_usec > 0)
5804 		{
5805 			int			nsocks = 0; /* return from select(2) if called */
5806 
5807 			if (min_usec != PG_INT64_MAX)
5808 			{
5809 				if (maxsock != -1)
5810 				{
5811 					struct timeval timeout;
5812 
5813 					timeout.tv_sec = min_usec / 1000000;
5814 					timeout.tv_usec = min_usec % 1000000;
5815 					nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
5816 				}
5817 				else			/* nothing active, simple sleep */
5818 				{
5819 					pg_usleep(min_usec);
5820 				}
5821 			}
5822 			else				/* no explicit delay, select without timeout */
5823 			{
5824 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
5825 			}
5826 
5827 			if (nsocks < 0)
5828 			{
5829 				if (errno == EINTR)
5830 				{
5831 					/* On EINTR, go back to top of loop */
5832 					continue;
5833 				}
5834 				/* must be something wrong */
5835 				fprintf(stderr, "select() failed: %s\n", strerror(errno));
5836 				goto done;
5837 			}
5838 		}
5839 		else
5840 		{
5841 			/* min_usec == 0, i.e. something needs to be executed */
5842 
5843 			/* If we didn't call select(), don't try to read any data */
5844 			FD_ZERO(&input_mask);
5845 		}
5846 
5847 		/* ok, advance the state machine of each connection */
5848 		for (i = 0; i < nstate; i++)
5849 		{
5850 			CState	   *st = &state[i];
5851 
5852 			if (st->state == CSTATE_WAIT_RESULT)
5853 			{
5854 				/* don't call doCustom unless data is available */
5855 				int			sock = PQsocket(st->con);
5856 
5857 				if (sock < 0)
5858 				{
5859 					fprintf(stderr, "invalid socket: %s",
5860 							PQerrorMessage(st->con));
5861 					goto done;
5862 				}
5863 
5864 				if (!FD_ISSET(sock, &input_mask))
5865 					continue;
5866 			}
5867 			else if (st->state == CSTATE_FINISHED ||
5868 					 st->state == CSTATE_ABORTED)
5869 			{
5870 				/* this client is done, no need to consider it anymore */
5871 				continue;
5872 			}
5873 
5874 			doCustom(thread, st, &aggs);
5875 
5876 			/* If doCustom changed client to finished state, reduce remains */
5877 			if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
5878 				remains--;
5879 		}
5880 
5881 		/* progress report is made by thread 0 for all threads */
5882 		if (progress && thread->tid == 0)
5883 		{
5884 			instr_time	now_time;
5885 			int64		now;
5886 
5887 			INSTR_TIME_SET_CURRENT(now_time);
5888 			now = INSTR_TIME_GET_MICROSEC(now_time);
5889 			if (now >= next_report)
5890 			{
5891 				/* generate and show report */
5892 				StatsData	cur;
5893 				int64		run = now - last_report,
5894 							ntx;
5895 				double		tps,
5896 							total_run,
5897 							latency,
5898 							sqlat,
5899 							lag,
5900 							stdev;
5901 				char		tbuf[315];
5902 
5903 				/*
5904 				 * Add up the statistics of all threads.
5905 				 *
5906 				 * XXX: No locking. There is no guarantee that we get an
5907 				 * atomic snapshot of the transaction count and latencies, so
5908 				 * these figures can well be off by a small amount. The
5909 				 * progress report's purpose is to give a quick overview of
5910 				 * how the test is going, so that shouldn't matter too much.
5911 				 * (If a read from a 64-bit integer is not atomic, you might
5912 				 * get a "torn" read and completely bogus latencies though!)
5913 				 */
5914 				initStats(&cur, 0);
5915 				for (i = 0; i < nthreads; i++)
5916 				{
5917 					mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
5918 					mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
5919 					cur.cnt += thread[i].stats.cnt;
5920 					cur.skipped += thread[i].stats.skipped;
5921 				}
5922 
5923 				/* we count only actually executed transactions */
5924 				ntx = (cur.cnt - cur.skipped) - (last.cnt - last.skipped);
5925 				total_run = (now - thread_start) / 1000000.0;
5926 				tps = 1000000.0 * ntx / run;
5927 				if (ntx > 0)
5928 				{
5929 					latency = 0.001 * (cur.latency.sum - last.latency.sum) / ntx;
5930 					sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) / ntx;
5931 					stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
5932 					lag = 0.001 * (cur.lag.sum - last.lag.sum) / ntx;
5933 				}
5934 				else
5935 				{
5936 					latency = sqlat = stdev = lag = 0;
5937 				}
5938 
5939 				if (progress_timestamp)
5940 				{
5941 					/*
5942 					 * On some platforms the current system timestamp is
5943 					 * available in now_time, but rather than get entangled
5944 					 * with that, we just eat the cost of an extra syscall in
5945 					 * all cases.
5946 					 */
5947 					struct timeval tv;
5948 
5949 					gettimeofday(&tv, NULL);
5950 					snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
5951 							 (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
5952 				}
5953 				else
5954 				{
5955 					/* round seconds are expected, but the thread may be late */
5956 					snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
5957 				}
5958 
5959 				fprintf(stderr,
5960 						"progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
5961 						tbuf, tps, latency, stdev);
5962 
5963 				if (throttle_delay)
5964 				{
5965 					fprintf(stderr, ", lag %.3f ms", lag);
5966 					if (latency_limit)
5967 						fprintf(stderr, ", " INT64_FORMAT " skipped",
5968 								cur.skipped - last.skipped);
5969 				}
5970 				fprintf(stderr, "\n");
5971 
5972 				last = cur;
5973 				last_report = now;
5974 
5975 				/*
5976 				 * Ensure that the next report is in the future, in case
5977 				 * pgbench/postgres got stuck somewhere.
5978 				 */
5979 				do
5980 				{
5981 					next_report += (int64) progress * 1000000;
5982 				} while (now >= next_report);
5983 			}
5984 		}
5985 	}
5986 
5987 done:
5988 	INSTR_TIME_SET_CURRENT(start);
5989 	disconnect_all(state, nstate);
5990 	INSTR_TIME_SET_CURRENT(end);
5991 	INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
5992 	if (thread->logfile)
5993 	{
5994 		if (agg_interval > 0)
5995 		{
5996 			/* log aggregated but not yet reported transactions */
5997 			doLog(thread, state, &aggs, false, 0, 0);
5998 		}
5999 		fclose(thread->logfile);
6000 		thread->logfile = NULL;
6001 	}
6002 	return NULL;
6003 }
6004 
6005 static void
finishCon(CState * st)6006 finishCon(CState *st)
6007 {
6008 	if (st->con != NULL)
6009 	{
6010 		PQfinish(st->con);
6011 		st->con = NULL;
6012 	}
6013 }
6014 
6015 /*
6016  * Support for duration option: set timer_exceeded after so many seconds.
6017  */
6018 
6019 #ifndef WIN32
6020 
6021 static void
handle_sig_alarm(SIGNAL_ARGS)6022 handle_sig_alarm(SIGNAL_ARGS)
6023 {
6024 	timer_exceeded = true;
6025 }
6026 
6027 static void
setalarm(int seconds)6028 setalarm(int seconds)
6029 {
6030 	pqsignal(SIGALRM, handle_sig_alarm);
6031 	alarm(seconds);
6032 }
6033 
6034 #else							/* WIN32 */
6035 
6036 static VOID CALLBACK
win32_timer_callback(PVOID lpParameter,BOOLEAN TimerOrWaitFired)6037 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
6038 {
6039 	timer_exceeded = true;
6040 }
6041 
6042 static void
setalarm(int seconds)6043 setalarm(int seconds)
6044 {
6045 	HANDLE		queue;
6046 	HANDLE		timer;
6047 
6048 	/* This function will be called at most once, so we can cheat a bit. */
6049 	queue = CreateTimerQueue();
6050 	if (seconds > ((DWORD) -1) / 1000 ||
6051 		!CreateTimerQueueTimer(&timer, queue,
6052 							   win32_timer_callback, NULL, seconds * 1000, 0,
6053 							   WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
6054 	{
6055 		fprintf(stderr, "failed to set timer\n");
6056 		exit(1);
6057 	}
6058 }
6059 
6060 /* partial pthread implementation for Windows */
6061 
/*
 * Bookkeeping for one thread created through the Windows pthread shim below.
 */
typedef struct win32_pthread
{
	HANDLE		handle;			/* thread handle from _beginthreadex() */
	void	   *(*routine) (void *);	/* caller's start routine */
	void	   *arg;			/* argument passed to routine */
	void	   *result;			/* routine's return value, for pthread_join */
} win32_pthread;
6069 
6070 static unsigned __stdcall
win32_pthread_run(void * arg)6071 win32_pthread_run(void *arg)
6072 {
6073 	win32_pthread *th = (win32_pthread *) arg;
6074 
6075 	th->result = th->routine(th->arg);
6076 
6077 	return 0;
6078 }
6079 
6080 static int
pthread_create(pthread_t * thread,pthread_attr_t * attr,void * (* start_routine)(void *),void * arg)6081 pthread_create(pthread_t *thread,
6082 			   pthread_attr_t *attr,
6083 			   void *(*start_routine) (void *),
6084 			   void *arg)
6085 {
6086 	int			save_errno;
6087 	win32_pthread *th;
6088 
6089 	th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
6090 	th->routine = start_routine;
6091 	th->arg = arg;
6092 	th->result = NULL;
6093 
6094 	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
6095 	if (th->handle == NULL)
6096 	{
6097 		save_errno = errno;
6098 		free(th);
6099 		return save_errno;
6100 	}
6101 
6102 	*thread = th;
6103 	return 0;
6104 }
6105 
6106 static int
pthread_join(pthread_t th,void ** thread_return)6107 pthread_join(pthread_t th, void **thread_return)
6108 {
6109 	if (th == NULL || th->handle == NULL)
6110 		return errno = EINVAL;
6111 
6112 	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
6113 	{
6114 		_dosmaperr(GetLastError());
6115 		return errno;
6116 	}
6117 
6118 	if (thread_return)
6119 		*thread_return = th->result;
6120 
6121 	CloseHandle(th->handle);
6122 	free(th);
6123 	return 0;
6124 }
6125 
6126 #endif							/* WIN32 */
6127