1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2016, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29 
/*
 * On Windows, FD_SETSIZE only takes effect if it is defined before
 * winsock2.h is included, so it must come before any other header.  It also
 * bounds MAXCLIENTS below.
 */
#ifdef WIN32
#define FD_SETSIZE 1024			/* set before winsock2.h is included */
#endif   /* WIN32 */
33 
34 #include "postgres_fe.h"
35 
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39 
40 #include <ctype.h>
41 #include <float.h>
42 #include <limits.h>
43 #include <math.h>
44 #include <signal.h>
45 #include <sys/time.h>
46 #ifdef HAVE_SYS_SELECT_H
47 #include <sys/select.h>
48 #endif
49 
50 #ifdef HAVE_SYS_RESOURCE_H
51 #include <sys/resource.h>		/* for getrlimit */
52 #endif
53 
54 #ifndef M_PI
55 #define M_PI 3.14159265358979323846
56 #endif
57 
58 #include "pgbench.h"
59 
/*
 * SQLSTATE code for "undefined table".  NOTE(review): its use is not visible
 * in this part of the file; presumably compared against server error codes.
 */
#define ERRCODE_UNDEFINED_TABLE  "42P01"

/*
 * Multi-platform pthread implementations
 *
 * The rest of the program is written against a minimal pthread-style API
 * (pthread_t, pthread_create, pthread_join).  Each branch below supplies it.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

/* static, so these emulations must be defined later in this file */
static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
#endif
80 
81 
82 /********************************************************************
83  * some configurable parameters */
84 
85 /* max number of clients allowed */
86 #ifdef FD_SETSIZE
87 #define MAXCLIENTS	(FD_SETSIZE - 10)
88 #else
89 #define MAXCLIENTS	1024
90 #endif
91 
92 #define LOG_STEP_SECONDS	5	/* seconds between log messages */
93 #define DEFAULT_NXACTS	10		/* default nxacts */
94 
95 #define MIN_GAUSSIAN_PARAM		2.0		/* minimum parameter for gauss */
96 
97 int			nxacts = 0;			/* number of transactions per client */
98 int			duration = 0;		/* duration in seconds */
99 int64		end_time = 0;		/* when to stop in micro seconds, under -T */
100 
101 /*
102  * scaling factor. for example, scale = 10 will make 1000000 tuples in
103  * pgbench_accounts table.
104  */
105 int			scale = 1;
106 
107 /*
108  * fillfactor. for example, fillfactor = 90 will use only 90 percent
109  * space during inserts and leave 10 percent free.
110  */
111 int			fillfactor = 100;
112 
113 /*
114  * create foreign key constraints on the tables?
115  */
116 int			foreign_keys = 0;
117 
118 /*
119  * use unlogged tables?
120  */
121 int			unlogged_tables = 0;
122 
123 /*
124  * log sampling rate (1.0 = log everything, 0.0 = option not given)
125  */
126 double		sample_rate = 0.0;
127 
128 /*
129  * When threads are throttled to a given rate limit, this is the target delay
130  * to reach that rate in usec.  0 is the default and means no throttling.
131  */
132 int64		throttle_delay = 0;
133 
134 /*
135  * Transactions which take longer than this limit (in usec) are counted as
136  * late, and reported as such, although they are completed anyway. When
137  * throttling is enabled, execution time slots that are more than this late
138  * are skipped altogether, and counted separately.
139  */
140 int64		latency_limit = 0;
141 
142 /*
143  * tablespace selection
144  */
145 char	   *tablespace = NULL;
146 char	   *index_tablespace = NULL;
147 
148 /*
149  * end of configurable parameters
150  *********************************************************************/
151 
152 #define nbranches	1			/* Makes little sense to change this.  Change
153 								 * -s instead */
154 #define ntellers	10
155 #define naccounts	100000
156 
157 /*
158  * The scale factor at/beyond which 32bit integers are incapable of storing
159  * 64bit values.
160  *
161  * Although the actual threshold is 21474, we use 20000 because it is easier to
162  * document and remember, and isn't that far away from the real threshold.
163  */
164 #define SCALE_32BIT_THRESHOLD 20000
165 
166 bool		use_log;			/* log transaction latencies to a file */
167 bool		use_quiet;			/* quiet logging onto stderr */
168 int			agg_interval;		/* log aggregates instead of individual
169 								 * transactions */
170 bool		per_script_stats = false;	/* whether to collect stats per script */
171 int			progress = 0;		/* thread progress report every this seconds */
172 bool		progress_timestamp = false; /* progress report with Unix time */
173 int			nclients = 1;		/* number of clients */
174 int			nthreads = 1;		/* number of threads */
175 bool		is_connect;			/* establish connection for each transaction */
176 bool		is_latencies;		/* report per-command latencies */
177 int			main_pid;			/* main process id used in log filename */
178 
179 char	   *pghost = "";
180 char	   *pgport = "";
181 char	   *login = NULL;
182 char	   *dbName;
183 const char *progname;
184 
185 #define WSEP '@'				/* weight separator */
186 
187 volatile bool timer_exceeded = false;	/* flag from signal handler */
188 
189 /*
190  * Variable definitions.  If a variable has a string value, "value" is that
191  * value, is_numeric is false, and num_value is undefined.  If the value is
192  * known to be numeric, is_numeric is true and num_value contains the value
193  * (in any permitted numeric variant).  In this case "value" contains the
194  * string equivalent of the number, if we've had occasion to compute that,
195  * or NULL if we haven't.
196  */
197 typedef struct
198 {
199 	char	   *name;			/* variable's name */
200 	char	   *value;			/* its value in string form, if known */
201 	bool		is_numeric;		/* is numeric value known? */
202 	PgBenchValue num_value;		/* variable's value in numeric form */
203 } Variable;
204 
205 #define MAX_SCRIPTS		128		/* max number of SQL scripts allowed */
206 #define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
207 
208 /*
209  * Simple data structure to keep stats about something.
210  *
211  * XXX probably the first value should be kept and used as an offset for
212  * better numerical stability...
213  */
214 typedef struct SimpleStats
215 {
216 	int64		count;			/* how many values were encountered */
217 	double		min;			/* the minimum seen */
218 	double		max;			/* the maximum seen */
219 	double		sum;			/* sum of values */
220 	double		sum2;			/* sum of squared values */
221 } SimpleStats;
222 
223 /*
224  * Data structure to hold various statistics: per-thread and per-script stats
225  * are maintained and merged together.
226  */
227 typedef struct StatsData
228 {
229 	long		start_time;		/* interval start time, for aggregates */
230 	int64		cnt;			/* number of transactions */
231 	int64		skipped;		/* number of transactions skipped under --rate
232 								 * and --latency-limit */
233 	SimpleStats latency;
234 	SimpleStats lag;
235 } StatsData;
236 
237 /*
238  * Connection state
239  */
240 typedef struct
241 {
242 	PGconn	   *con;			/* connection handle to DB */
243 	int			id;				/* client No. */
244 	int			state;			/* state No. */
245 	bool		listen;			/* whether an async query has been sent */
246 	bool		sleeping;		/* whether the client is napping */
247 	bool		throttling;		/* whether nap is for throttling */
248 	bool		is_throttled;	/* whether transaction throttling is done */
249 	Variable   *variables;		/* array of variable definitions */
250 	int			nvariables;		/* number of variables */
251 	bool		vars_sorted;	/* are variables sorted by name? */
252 	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
253 	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
254 	instr_time	txn_begin;		/* used for measuring schedule lag times */
255 	instr_time	stmt_begin;		/* used for measuring statement latencies */
256 	int			use_file;		/* index in sql_scripts for this client */
257 	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
258 
259 	/* per client collected stats */
260 	int64		cnt;			/* transaction count */
261 	int			ecnt;			/* error count */
262 } CState;
263 
264 /*
265  * Thread state
266  */
267 typedef struct
268 {
269 	int			tid;			/* thread id */
270 	pthread_t	thread;			/* thread handle */
271 	CState	   *state;			/* array of CState */
272 	int			nstate;			/* length of state[] */
273 	unsigned short random_state[3];		/* separate randomness for each thread */
274 	int64		throttle_trigger;		/* previous/next throttling (us) */
275 	FILE	   *logfile;		/* where to log, or NULL */
276 
277 	/* per thread collected stats */
278 	instr_time	start_time;		/* thread start time */
279 	instr_time	conn_time;
280 	StatsData	stats;
281 	int64		latency_late;	/* executed but late transactions */
282 } TState;
283 
284 #define INVALID_THREAD		((pthread_t) 0)
285 
286 /*
287  * queries read from files
288  */
289 #define SQL_COMMAND		1
290 #define META_COMMAND	2
291 #define MAX_ARGS		10
292 
293 typedef enum QueryMode
294 {
295 	QUERY_SIMPLE,				/* simple query */
296 	QUERY_EXTENDED,				/* extended query */
297 	QUERY_PREPARED,				/* extended query with prepared statements */
298 	NUM_QUERYMODE
299 } QueryMode;
300 
301 static QueryMode querymode = QUERY_SIMPLE;
302 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
303 
304 typedef struct
305 {
306 	char	   *line;			/* text of command line */
307 	int			command_num;	/* unique index of this Command struct */
308 	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
309 	int			argc;			/* number of command words */
310 	char	   *argv[MAX_ARGS]; /* command word list */
311 	PgBenchExpr *expr;			/* parsed expression, if needed */
312 	SimpleStats stats;			/* time spent in this command */
313 } Command;
314 
315 typedef struct ParsedScript
316 {
317 	const char *desc;			/* script descriptor (eg, file name) */
318 	int			weight;			/* selection weight */
319 	Command   **commands;		/* NULL-terminated array of Commands */
320 	StatsData	stats;			/* total time spent in script */
321 } ParsedScript;
322 
323 static ParsedScript sql_script[MAX_SCRIPTS];	/* SQL script files */
324 static int	num_scripts;		/* number of scripts in sql_script[] */
325 static int	num_commands = 0;	/* total number of Command structs */
326 static int64 total_weight = 0;
327 
328 static int	debug = 0;			/* debug flag */
329 
/* Builtin test scripts */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;

/*
 * The scripts choose random ids in proportion to :scale.  CppAsString2()
 * turns the naccounts/nbranches/ntellers macros into string literals so the
 * script text stays in sync with those constants.
 */
static const BuiltinScript builtin_script[] =
{
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	{
		/* like tpcb-like, but without the tellers/branches updates */
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	{
		/* a single point SELECT per transaction */
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
375 
376 
/* Function prototypes (for static functions used before their definition) */
static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
static void doLog(TState *thread, CState *st, instr_time *now,
	  StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, instr_time *now,
				 bool skipped, StatsData *agg);
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);


/* callback functions for our flex lexer */
static const PsqlScanCallbacks pgbench_callbacks = {
	NULL,						/* don't need get_variable functionality */
	pgbench_error				/* error reporting callback */
};
396 
397 
/*
 * usage -- print the command-line help text to stdout.
 *
 * Both "%s" placeholders are filled with progname; "%%" prints a literal '%'.
 */
static void
usage(void)
{
	printf("%s is a benchmarking tool for PostgreSQL.\n\n"
		   "Usage:\n"
		   "  %s [OPTION]... [DBNAME]\n"
		   "\nInitialization options:\n"
		   "  -i, --initialize         invokes initialization mode\n"
		   "  -F, --fillfactor=NUM     set fill factor\n"
		"  -n, --no-vacuum          do not run VACUUM after initialization\n"
	"  -q, --quiet              quiet logging (one message each 5 seconds)\n"
		   "  -s, --scale=NUM          scaling factor\n"
		   "  --foreign-keys           create foreign key constraints between tables\n"
		   "  --index-tablespace=TABLESPACE\n"
	"                           create indexes in the specified tablespace\n"
	 "  --tablespace=TABLESPACE  create tables in the specified tablespace\n"
		   "  --unlogged-tables        create tables as unlogged tables\n"
		   "\nOptions to select what to run:\n"
		   "  -b, --builtin=NAME[@W]   add builtin script NAME weighted at W (default: 1)\n"
	"                           (use \"-b list\" to list available scripts)\n"
		   "  -f, --file=FILENAME[@W]  add script FILENAME weighted at W (default: 1)\n"
		   "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
		   "                           (same as \"-b simple-update\")\n"
		   "  -S, --select-only        perform SELECT-only transactions\n"
		   "                           (same as \"-b select-only\")\n"
		   "\nBenchmarking options:\n"
		   "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
		   "  -C, --connect            establish new connection for each transaction\n"
		   "  -D, --define=VARNAME=VALUE\n"
	  "                           define variable for use by custom script\n"
		   "  -j, --jobs=NUM           number of threads (default: 1)\n"
		   "  -l, --log                write transaction times to log file\n"
		   "  -L, --latency-limit=NUM  count transactions lasting more than NUM ms as late\n"
		   "  -M, --protocol=simple|extended|prepared\n"
		   "                           protocol for submitting queries (default: simple)\n"
		   "  -n, --no-vacuum          do not run VACUUM before tests\n"
		   "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
		   "  -r, --report-latencies   report average latency per command\n"
		"  -R, --rate=NUM           target rate in transactions per second\n"
		   "  -s, --scale=NUM          report this scale factor in output\n"
		   "  -t, --transactions=NUM   number of transactions each client runs (default: 10)\n"
		 "  -T, --time=NUM           duration of benchmark test in seconds\n"
		   "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
		   "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
		"  --progress-timestamp     use Unix epoch timestamps for progress\n"
		   "  --sampling-rate=NUM      fraction of transactions to log (e.g., 0.01 for 1%%)\n"
		   "\nCommon options:\n"
		   "  -d, --debug              print debugging output\n"
	  "  -h, --host=HOSTNAME      database server host or socket directory\n"
		   "  -p, --port=PORT          database server port number\n"
		   "  -U, --username=USERNAME  connect as specified database user\n"
		 "  -V, --version            output version information, then exit\n"
		   "  -?, --help               show this help, then exit\n"
		   "\n"
		   "Report bugs to <pgsql-bugs@postgresql.org>.\n",
		   progname, progname);
}
455 
/*
 * Return whether str matches "^\s*[-+]?[0-9]+$": optional leading
 * whitespace, an optional sign, then at least one decimal digit, and nothing
 * after the digits.
 *
 * makeVariableNumeric() uses this to choose strtoint64() (true) or a double
 * parse (false).  An empty string, a lone sign, or a whitespace-only string
 * is not an integer.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit is required.  '\0' is not a digit, so this also
	 * rejects "", "+", "-" and all-blank input.  The old "*ptr &&" guard let
	 * those through as integers.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
481 
482 
483 /*
484  * strtoint64 -- convert a string to 64-bit integer
485  *
486  * This function is a modified version of scanint8() from
487  * src/backend/utils/adt/int8.c.
488  */
489 int64
strtoint64(const char * str)490 strtoint64(const char *str)
491 {
492 	const char *ptr = str;
493 	int64		result = 0;
494 	int			sign = 1;
495 
496 	/*
497 	 * Do our own scan, rather than relying on sscanf which might be broken
498 	 * for long long.
499 	 */
500 
501 	/* skip leading spaces */
502 	while (*ptr && isspace((unsigned char) *ptr))
503 		ptr++;
504 
505 	/* handle sign */
506 	if (*ptr == '-')
507 	{
508 		ptr++;
509 
510 		/*
511 		 * Do an explicit check for INT64_MIN.  Ugly though this is, it's
512 		 * cleaner than trying to get the loop below to handle it portably.
513 		 */
514 		if (strncmp(ptr, "9223372036854775808", 19) == 0)
515 		{
516 			result = PG_INT64_MIN;
517 			ptr += 19;
518 			goto gotdigits;
519 		}
520 		sign = -1;
521 	}
522 	else if (*ptr == '+')
523 		ptr++;
524 
525 	/* require at least one digit */
526 	if (!isdigit((unsigned char) *ptr))
527 		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
528 
529 	/* process digits */
530 	while (*ptr && isdigit((unsigned char) *ptr))
531 	{
532 		int64		tmp = result * 10 + (*ptr++ - '0');
533 
534 		if ((tmp / 10) != result)		/* overflow? */
535 			fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
536 		result = tmp;
537 	}
538 
539 gotdigits:
540 
541 	/* allow trailing whitespace, but not other trailing chars */
542 	while (*ptr != '\0' && isspace((unsigned char) *ptr))
543 		ptr++;
544 
545 	if (*ptr != '\0')
546 		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
547 
548 	return ((sign < 0) ? -result : result);
549 }
550 
551 /* random number generator: uniform distribution from min to max inclusive */
552 static int64
getrand(TState * thread,int64 min,int64 max)553 getrand(TState *thread, int64 min, int64 max)
554 {
555 	/*
556 	 * Odd coding is so that min and max have approximately the same chance of
557 	 * being selected as do numbers between them.
558 	 *
559 	 * pg_erand48() is thread-safe and concurrent, which is why we use it
560 	 * rather than random(), which in glibc is non-reentrant, and therefore
561 	 * protected by a mutex, and therefore a bottleneck on machines with many
562 	 * CPUs.
563 	 */
564 	return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
565 }
566 
567 /*
568  * random number generator: exponential distribution from min to max inclusive.
569  * the parameter is so that the density of probability for the last cut-off max
570  * value is exp(-parameter).
571  */
572 static int64
getExponentialRand(TState * thread,int64 min,int64 max,double parameter)573 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
574 {
575 	double		cut,
576 				uniform,
577 				rand;
578 
579 	/* abort if wrong parameter, but must really be checked beforehand */
580 	Assert(parameter > 0.0);
581 	cut = exp(-parameter);
582 	/* erand in [0, 1), uniform in (0, 1] */
583 	uniform = 1.0 - pg_erand48(thread->random_state);
584 
585 	/*
586 	 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
587 	 */
588 	Assert((1.0 - cut) != 0.0);
589 	rand = -log(cut + (1.0 - cut) * uniform) / parameter;
590 	/* return int64 random number within between min and max */
591 	return min + (int64) ((max - min + 1) * rand);
592 }
593 
594 /* random number generator: gaussian distribution from min to max inclusive */
595 static int64
getGaussianRand(TState * thread,int64 min,int64 max,double parameter)596 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
597 {
598 	double		stdev;
599 	double		rand;
600 
601 	/* abort if parameter is too low, but must really be checked beforehand */
602 	Assert(parameter >= MIN_GAUSSIAN_PARAM);
603 
604 	/*
605 	 * Get user specified random number from this loop, with -parameter <
606 	 * stdev <= parameter
607 	 *
608 	 * This loop is executed until the number is in the expected range.
609 	 *
610 	 * As the minimum parameter is 2.0, the probability of looping is low:
611 	 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
612 	 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
613 	 * the worst case. For a parameter value of 5.0, the looping probability
614 	 * is about e^{-5} * 2 / pi ~ 0.43%.
615 	 */
616 	do
617 	{
618 		/*
619 		 * pg_erand48 generates [0,1), but for the basic version of the
620 		 * Box-Muller transform the two uniformly distributed random numbers
621 		 * are expected in (0, 1] (see
622 		 * http://en.wikipedia.org/wiki/Box_muller)
623 		 */
624 		double		rand1 = 1.0 - pg_erand48(thread->random_state);
625 		double		rand2 = 1.0 - pg_erand48(thread->random_state);
626 
627 		/* Box-Muller basic form transform */
628 		double		var_sqrt = sqrt(-2.0 * log(rand1));
629 
630 		stdev = var_sqrt * sin(2.0 * M_PI * rand2);
631 
632 		/*
633 		 * we may try with cos, but there may be a bias induced if the
634 		 * previous value fails the test. To be on the safe side, let us try
635 		 * over.
636 		 */
637 	}
638 	while (stdev < -parameter || stdev >= parameter);
639 
640 	/* stdev is in [-parameter, parameter), normalization to [0,1) */
641 	rand = (stdev + parameter) / (parameter * 2.0);
642 
643 	/* return int64 random number within between min and max */
644 	return min + (int64) ((max - min + 1) * rand);
645 }
646 
647 /*
648  * random number generator: generate a value, such that the series of values
649  * will approximate a Poisson distribution centered on the given value.
650  */
651 static int64
getPoissonRand(TState * thread,int64 center)652 getPoissonRand(TState *thread, int64 center)
653 {
654 	/*
655 	 * Use inverse transform sampling to generate a value > 0, such that the
656 	 * expected (i.e. average) value is the given argument.
657 	 */
658 	double		uniform;
659 
660 	/* erand in [0, 1), uniform in (0, 1] */
661 	uniform = 1.0 - pg_erand48(thread->random_state);
662 
663 	return (int64) (-log(uniform) * ((double) center) + 0.5);
664 }
665 
666 /*
667  * Initialize the given SimpleStats struct to all zeroes
668  */
669 static void
initSimpleStats(SimpleStats * ss)670 initSimpleStats(SimpleStats *ss)
671 {
672 	memset(ss, 0, sizeof(SimpleStats));
673 }
674 
675 /*
676  * Accumulate one value into a SimpleStats struct.
677  */
678 static void
addToSimpleStats(SimpleStats * ss,double val)679 addToSimpleStats(SimpleStats *ss, double val)
680 {
681 	if (ss->count == 0 || val < ss->min)
682 		ss->min = val;
683 	if (ss->count == 0 || val > ss->max)
684 		ss->max = val;
685 	ss->count++;
686 	ss->sum += val;
687 	ss->sum2 += val * val;
688 }
689 
690 /*
691  * Merge two SimpleStats objects
692  */
693 static void
mergeSimpleStats(SimpleStats * acc,SimpleStats * ss)694 mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
695 {
696 	if (acc->count == 0 || ss->min < acc->min)
697 		acc->min = ss->min;
698 	if (acc->count == 0 || ss->max > acc->max)
699 		acc->max = ss->max;
700 	acc->count += ss->count;
701 	acc->sum += ss->sum;
702 	acc->sum2 += ss->sum2;
703 }
704 
705 /*
706  * Initialize a StatsData struct to mostly zeroes, with its start time set to
707  * the given value.
708  */
709 static void
initStats(StatsData * sd,double start_time)710 initStats(StatsData *sd, double start_time)
711 {
712 	sd->start_time = start_time;
713 	sd->cnt = 0;
714 	sd->skipped = 0;
715 	initSimpleStats(&sd->latency);
716 	initSimpleStats(&sd->lag);
717 }
718 
719 /*
720  * Accumulate one additional item into the given stats object.
721  */
722 static void
accumStats(StatsData * stats,bool skipped,double lat,double lag)723 accumStats(StatsData *stats, bool skipped, double lat, double lag)
724 {
725 	stats->cnt++;
726 
727 	if (skipped)
728 	{
729 		/* no latency to record on skipped transactions */
730 		stats->skipped++;
731 	}
732 	else
733 	{
734 		addToSimpleStats(&stats->latency, lat);
735 
736 		/* and possibly the same for schedule lag */
737 		if (throttle_delay)
738 			addToSimpleStats(&stats->lag, lag);
739 	}
740 }
741 
742 /* call PQexec() and exit() on failure */
743 static void
executeStatement(PGconn * con,const char * sql)744 executeStatement(PGconn *con, const char *sql)
745 {
746 	PGresult   *res;
747 
748 	res = PQexec(con, sql);
749 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
750 	{
751 		fprintf(stderr, "%s", PQerrorMessage(con));
752 		exit(1);
753 	}
754 	PQclear(res);
755 }
756 
757 /* call PQexec() and complain, but without exiting, on failure */
758 static void
tryExecuteStatement(PGconn * con,const char * sql)759 tryExecuteStatement(PGconn *con, const char *sql)
760 {
761 	PGresult   *res;
762 
763 	res = PQexec(con, sql);
764 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
765 	{
766 		fprintf(stderr, "%s", PQerrorMessage(con));
767 		fprintf(stderr, "(ignoring this error and continuing anyway)\n");
768 	}
769 	PQclear(res);
770 }
771 
772 /* set up a connection to the backend */
773 static PGconn *
doConnect(void)774 doConnect(void)
775 {
776 	PGconn	   *conn;
777 	static char *password = NULL;
778 	bool		new_pass;
779 
780 	/*
781 	 * Start the connection.  Loop until we have a password if requested by
782 	 * backend.
783 	 */
784 	do
785 	{
786 #define PARAMS_ARRAY_SIZE	7
787 
788 		const char *keywords[PARAMS_ARRAY_SIZE];
789 		const char *values[PARAMS_ARRAY_SIZE];
790 
791 		keywords[0] = "host";
792 		values[0] = pghost;
793 		keywords[1] = "port";
794 		values[1] = pgport;
795 		keywords[2] = "user";
796 		values[2] = login;
797 		keywords[3] = "password";
798 		values[3] = password;
799 		keywords[4] = "dbname";
800 		values[4] = dbName;
801 		keywords[5] = "fallback_application_name";
802 		values[5] = progname;
803 		keywords[6] = NULL;
804 		values[6] = NULL;
805 
806 		new_pass = false;
807 
808 		conn = PQconnectdbParams(keywords, values, true);
809 
810 		if (!conn)
811 		{
812 			fprintf(stderr, "connection to database \"%s\" failed\n",
813 					dbName);
814 			return NULL;
815 		}
816 
817 		if (PQstatus(conn) == CONNECTION_BAD &&
818 			PQconnectionNeedsPassword(conn) &&
819 			password == NULL)
820 		{
821 			PQfinish(conn);
822 			password = simple_prompt("Password: ", 100, false);
823 			new_pass = true;
824 		}
825 	} while (new_pass);
826 
827 	/* check to see that the backend connection was successfully made */
828 	if (PQstatus(conn) == CONNECTION_BAD)
829 	{
830 		fprintf(stderr, "connection to database \"%s\" failed:\n%s",
831 				PQdb(conn), PQerrorMessage(conn));
832 		PQfinish(conn);
833 		return NULL;
834 	}
835 
836 	return conn;
837 }
838 
839 /* throw away response from backend */
840 static void
discard_response(CState * state)841 discard_response(CState *state)
842 {
843 	PGresult   *res;
844 
845 	do
846 	{
847 		res = PQgetResult(state->con);
848 		if (res)
849 			PQclear(res);
850 	} while (res);
851 }
852 
853 /* qsort comparator for Variable array */
854 static int
compareVariableNames(const void * v1,const void * v2)855 compareVariableNames(const void *v1, const void *v2)
856 {
857 	return strcmp(((const Variable *) v1)->name,
858 				  ((const Variable *) v2)->name);
859 }
860 
861 /* Locate a variable by name; returns NULL if unknown */
862 static Variable *
lookupVariable(CState * st,char * name)863 lookupVariable(CState *st, char *name)
864 {
865 	Variable	key;
866 
867 	/* On some versions of Solaris, bsearch of zero items dumps core */
868 	if (st->nvariables <= 0)
869 		return NULL;
870 
871 	/* Sort if we have to */
872 	if (!st->vars_sorted)
873 	{
874 		qsort((void *) st->variables, st->nvariables, sizeof(Variable),
875 			  compareVariableNames);
876 		st->vars_sorted = true;
877 	}
878 
879 	/* Now we can search */
880 	key.name = name;
881 	return (Variable *) bsearch((void *) &key,
882 								(void *) st->variables,
883 								st->nvariables,
884 								sizeof(Variable),
885 								compareVariableNames);
886 }
887 
888 /* Get the value of a variable, in string form; returns NULL if unknown */
889 static char *
getVariable(CState * st,char * name)890 getVariable(CState *st, char *name)
891 {
892 	Variable   *var;
893 	char		stringform[64];
894 
895 	var = lookupVariable(st, name);
896 	if (var == NULL)
897 		return NULL;			/* not found */
898 
899 	if (var->value)
900 		return var->value;		/* we have it in string form */
901 
902 	/* We need to produce a string equivalent of the numeric value */
903 	Assert(var->is_numeric);
904 	if (var->num_value.type == PGBT_INT)
905 		snprintf(stringform, sizeof(stringform),
906 				 INT64_FORMAT, var->num_value.u.ival);
907 	else
908 	{
909 		Assert(var->num_value.type == PGBT_DOUBLE);
910 		snprintf(stringform, sizeof(stringform),
911 				 "%.*g", DBL_DIG, var->num_value.u.dval);
912 	}
913 	var->value = pg_strdup(stringform);
914 	return var->value;
915 }
916 
917 /* Try to convert variable to numeric form; return false on failure */
918 static bool
makeVariableNumeric(Variable * var)919 makeVariableNumeric(Variable *var)
920 {
921 	if (var->is_numeric)
922 		return true;			/* no work */
923 
924 	if (is_an_int(var->value))
925 	{
926 		setIntValue(&var->num_value, strtoint64(var->value));
927 		var->is_numeric = true;
928 	}
929 	else	/* type should be double */
930 	{
931 		double		dv;
932 		char		xs;
933 
934 		if (sscanf(var->value, "%lf%c", &dv, &xs) != 1)
935 		{
936 			fprintf(stderr,
937 					"malformed variable \"%s\" value: \"%s\"\n",
938 					var->name, var->value);
939 			return false;
940 		}
941 		setDoubleValue(&var->num_value, dv);
942 		var->is_numeric = true;
943 	}
944 	return true;
945 }
946 
/*
 * Check whether the name consists only of letters, digits and underscores
 * (per isalnum() in the current locale).  The empty string is not a legal
 * name.
 */
static bool
isLegalVariableName(const char *name)
{
	const char *cp = name;

	while (*cp != '\0')
	{
		unsigned char c = (unsigned char) *cp;

		if (!(isalnum(c) || c == '_'))
			return false;
		cp++;
	}

	/* reaching here with cp == name means the name was empty */
	return cp != name;
}
961 
962 /*
963  * Lookup a variable by name, creating it if need be.
964  * Caller is expected to assign a value to the variable.
965  * Returns NULL on failure (bad name).
966  */
967 static Variable *
lookupCreateVariable(CState * st,const char * context,char * name)968 lookupCreateVariable(CState *st, const char *context, char *name)
969 {
970 	Variable   *var;
971 
972 	var = lookupVariable(st, name);
973 	if (var == NULL)
974 	{
975 		Variable   *newvars;
976 
977 		/*
978 		 * Check for the name only when declaring a new variable to avoid
979 		 * overhead.
980 		 */
981 		if (!isLegalVariableName(name))
982 		{
983 			fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
984 					context, name);
985 			return NULL;
986 		}
987 
988 		/* Create variable at the end of the array */
989 		if (st->variables)
990 			newvars = (Variable *) pg_realloc(st->variables,
991 									(st->nvariables + 1) * sizeof(Variable));
992 		else
993 			newvars = (Variable *) pg_malloc(sizeof(Variable));
994 
995 		st->variables = newvars;
996 
997 		var = &newvars[st->nvariables];
998 
999 		var->name = pg_strdup(name);
1000 		var->value = NULL;
1001 		/* caller is expected to initialize remaining fields */
1002 
1003 		st->nvariables++;
1004 		/* we don't re-sort the array till we have to */
1005 		st->vars_sorted = false;
1006 	}
1007 
1008 	return var;
1009 }
1010 
1011 /* Assign a string value to a variable, creating it if need be */
1012 /* Returns false on failure (bad name) */
1013 static bool
putVariable(CState * st,const char * context,char * name,const char * value)1014 putVariable(CState *st, const char *context, char *name, const char *value)
1015 {
1016 	Variable   *var;
1017 	char	   *val;
1018 
1019 	var = lookupCreateVariable(st, context, name);
1020 	if (!var)
1021 		return false;
1022 
1023 	/* dup then free, in case value is pointing at this variable */
1024 	val = pg_strdup(value);
1025 
1026 	if (var->value)
1027 		free(var->value);
1028 	var->value = val;
1029 	var->is_numeric = false;
1030 
1031 	return true;
1032 }
1033 
1034 /* Assign a numeric value to a variable, creating it if need be */
1035 /* Returns false on failure (bad name) */
1036 static bool
putVariableNumber(CState * st,const char * context,char * name,const PgBenchValue * value)1037 putVariableNumber(CState *st, const char *context, char *name,
1038 				  const PgBenchValue *value)
1039 {
1040 	Variable   *var;
1041 
1042 	var = lookupCreateVariable(st, context, name);
1043 	if (!var)
1044 		return false;
1045 
1046 	if (var->value)
1047 		free(var->value);
1048 	var->value = NULL;
1049 	var->is_numeric = true;
1050 	var->num_value = *value;
1051 
1052 	return true;
1053 }
1054 
1055 /* Assign an integer value to a variable, creating it if need be */
1056 /* Returns false on failure (bad name) */
1057 static bool
putVariableInt(CState * st,const char * context,char * name,int64 value)1058 putVariableInt(CState *st, const char *context, char *name, int64 value)
1059 {
1060 	PgBenchValue val;
1061 
1062 	setIntValue(&val, value);
1063 	return putVariableNumber(st, context, name, &val);
1064 }
1065 
/*
 * Parse a variable reference at sql.  sql[0] is skipped (callers position it
 * on the ':'), and the following run of letters, digits and underscores is
 * returned as a freshly allocated name.  *eaten is set to the number of
 * characters consumed, including sql[0].  Returns NULL, leaving *eaten
 * untouched, if no name characters follow.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			n = 1;
	char	   *name;

	while (isalnum((unsigned char) sql[n]) || sql[n] == '_')
		n++;

	if (n == 1)
		return NULL;			/* nothing after the leading character */

	name = pg_malloc(n);
	memcpy(name, sql + 1, n - 1);
	name[n - 1] = '\0';

	*eaten = n;
	return name;
}
1086 
/*
 * Replace the len characters at param (which points into *sql) with value.
 * *sql is reallocated when the value is longer than the replaced text, so it
 * may move.  Returns a pointer just past the inserted value within the
 * (possibly new) buffer.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);

	/* make room first if the replacement is longer than the reference */
	if (len < vlen)
	{
		size_t		pos = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + vlen + 1);
		param = &(*sql)[pos];
	}

	/* shift the tail (including its terminator) into place */
	if (len != vlen)
		memmove(param + vlen, param + len, strlen(param + len) + 1);
	memcpy(param, value, vlen);

	return param + vlen;
}
1106 
1107 static char *
assignVariables(CState * st,char * sql)1108 assignVariables(CState *st, char *sql)
1109 {
1110 	char	   *p,
1111 			   *name,
1112 			   *val;
1113 
1114 	p = sql;
1115 	while ((p = strchr(p, ':')) != NULL)
1116 	{
1117 		int			eaten;
1118 
1119 		name = parseVariable(p, &eaten);
1120 		if (name == NULL)
1121 		{
1122 			while (*p == ':')
1123 			{
1124 				p++;
1125 			}
1126 			continue;
1127 		}
1128 
1129 		val = getVariable(st, name);
1130 		free(name);
1131 		if (val == NULL)
1132 		{
1133 			p++;
1134 			continue;
1135 		}
1136 
1137 		p = replaceVariable(&sql, p, eaten, val);
1138 	}
1139 
1140 	return sql;
1141 }
1142 
1143 static void
getQueryParams(CState * st,const Command * command,const char ** params)1144 getQueryParams(CState *st, const Command *command, const char **params)
1145 {
1146 	int			i;
1147 
1148 	for (i = 0; i < command->argc - 1; i++)
1149 		params[i] = getVariable(st, command->argv[i + 1]);
1150 }
1151 
1152 /* get a value as an int, tell if there is a problem */
1153 static bool
coerceToInt(PgBenchValue * pval,int64 * ival)1154 coerceToInt(PgBenchValue *pval, int64 *ival)
1155 {
1156 	if (pval->type == PGBT_INT)
1157 	{
1158 		*ival = pval->u.ival;
1159 		return true;
1160 	}
1161 	else
1162 	{
1163 		double		dval;
1164 
1165 		Assert(pval->type == PGBT_DOUBLE);
1166 		dval = rint(pval->u.dval);
1167 		if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
1168 		{
1169 			fprintf(stderr, "double to int overflow for %f\n", dval);
1170 			return false;
1171 		}
1172 		*ival = (int64) dval;
1173 		return true;
1174 	}
1175 }
1176 
1177 /* get a value as a double, or tell if there is a problem */
1178 static bool
coerceToDouble(PgBenchValue * pval,double * dval)1179 coerceToDouble(PgBenchValue *pval, double *dval)
1180 {
1181 	if (pval->type == PGBT_DOUBLE)
1182 	{
1183 		*dval = pval->u.dval;
1184 		return true;
1185 	}
1186 	else
1187 	{
1188 		Assert(pval->type == PGBT_INT);
1189 		*dval = (double) pval->u.ival;
1190 		return true;
1191 	}
1192 }
1193 
1194 /* assign an integer value */
1195 static void
setIntValue(PgBenchValue * pv,int64 ival)1196 setIntValue(PgBenchValue *pv, int64 ival)
1197 {
1198 	pv->type = PGBT_INT;
1199 	pv->u.ival = ival;
1200 }
1201 
1202 /* assign a double value */
1203 static void
setDoubleValue(PgBenchValue * pv,double dval)1204 setDoubleValue(PgBenchValue *pv, double dval)
1205 {
1206 	pv->type = PGBT_DOUBLE;
1207 	pv->u.dval = dval;
1208 }
1209 
1210 /* maximum number of function arguments */
1211 #define MAX_FARGS 16
1212 
1213 /*
1214  * Recursive evaluation of functions
1215  */
1216 static bool
evalFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1217 evalFunc(TState *thread, CState *st,
1218 		 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
1219 {
1220 	/* evaluate all function arguments */
1221 	int			nargs = 0;
1222 	PgBenchValue vargs[MAX_FARGS];
1223 	PgBenchExprLink *l = args;
1224 
1225 	for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1226 		if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1227 			return false;
1228 
1229 	if (l != NULL)
1230 	{
1231 		fprintf(stderr,
1232 				"too many function arguments, maximum is %d\n", MAX_FARGS);
1233 		return false;
1234 	}
1235 
1236 	/* then evaluate function */
1237 	switch (func)
1238 	{
1239 			/* overloaded operators */
1240 		case PGBENCH_ADD:
1241 		case PGBENCH_SUB:
1242 		case PGBENCH_MUL:
1243 		case PGBENCH_DIV:
1244 		case PGBENCH_MOD:
1245 			{
1246 				PgBenchValue *lval = &vargs[0],
1247 						   *rval = &vargs[1];
1248 
1249 				Assert(nargs == 2);
1250 
1251 				/* overloaded type management, double if some double */
1252 				if ((lval->type == PGBT_DOUBLE ||
1253 					 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1254 				{
1255 					double		ld,
1256 								rd;
1257 
1258 					if (!coerceToDouble(lval, &ld) ||
1259 						!coerceToDouble(rval, &rd))
1260 						return false;
1261 
1262 					switch (func)
1263 					{
1264 						case PGBENCH_ADD:
1265 							setDoubleValue(retval, ld + rd);
1266 							return true;
1267 
1268 						case PGBENCH_SUB:
1269 							setDoubleValue(retval, ld - rd);
1270 							return true;
1271 
1272 						case PGBENCH_MUL:
1273 							setDoubleValue(retval, ld * rd);
1274 							return true;
1275 
1276 						case PGBENCH_DIV:
1277 							setDoubleValue(retval, ld / rd);
1278 							return true;
1279 
1280 						default:
1281 							/* cannot get here */
1282 							Assert(0);
1283 					}
1284 				}
1285 				else	/* we have integer operands, or % */
1286 				{
1287 					int64		li,
1288 								ri;
1289 
1290 					if (!coerceToInt(lval, &li) ||
1291 						!coerceToInt(rval, &ri))
1292 						return false;
1293 
1294 					switch (func)
1295 					{
1296 						case PGBENCH_ADD:
1297 							setIntValue(retval, li + ri);
1298 							return true;
1299 
1300 						case PGBENCH_SUB:
1301 							setIntValue(retval, li - ri);
1302 							return true;
1303 
1304 						case PGBENCH_MUL:
1305 							setIntValue(retval, li * ri);
1306 							return true;
1307 
1308 						case PGBENCH_DIV:
1309 						case PGBENCH_MOD:
1310 							if (ri == 0)
1311 							{
1312 								fprintf(stderr, "division by zero\n");
1313 								return false;
1314 							}
1315 							/* special handling of -1 divisor */
1316 							if (ri == -1)
1317 							{
1318 								if (func == PGBENCH_DIV)
1319 								{
1320 									/* overflow check (needed for INT64_MIN) */
1321 									if (li == PG_INT64_MIN)
1322 									{
1323 										fprintf(stderr, "bigint out of range\n");
1324 										return false;
1325 									}
1326 									else
1327 										setIntValue(retval, -li);
1328 								}
1329 								else
1330 									setIntValue(retval, 0);
1331 								return true;
1332 							}
1333 							/* else divisor is not -1 */
1334 							if (func == PGBENCH_DIV)
1335 								setIntValue(retval, li / ri);
1336 							else	/* func == PGBENCH_MOD */
1337 								setIntValue(retval, li % ri);
1338 
1339 							return true;
1340 
1341 						default:
1342 							/* cannot get here */
1343 							Assert(0);
1344 					}
1345 				}
1346 			}
1347 
1348 			/* no arguments */
1349 		case PGBENCH_PI:
1350 			setDoubleValue(retval, M_PI);
1351 			return true;
1352 
1353 			/* 1 overloaded argument */
1354 		case PGBENCH_ABS:
1355 			{
1356 				PgBenchValue *varg = &vargs[0];
1357 
1358 				Assert(nargs == 1);
1359 
1360 				if (varg->type == PGBT_INT)
1361 				{
1362 					int64		i = varg->u.ival;
1363 
1364 					setIntValue(retval, i < 0 ? -i : i);
1365 				}
1366 				else
1367 				{
1368 					double		d = varg->u.dval;
1369 
1370 					Assert(varg->type == PGBT_DOUBLE);
1371 					setDoubleValue(retval, d < 0.0 ? -d : d);
1372 				}
1373 
1374 				return true;
1375 			}
1376 
1377 		case PGBENCH_DEBUG:
1378 			{
1379 				PgBenchValue *varg = &vargs[0];
1380 
1381 				Assert(nargs == 1);
1382 
1383 				fprintf(stderr, "debug(script=%d,command=%d): ",
1384 						st->use_file, st->state + 1);
1385 
1386 				if (varg->type == PGBT_INT)
1387 					fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
1388 				else
1389 				{
1390 					Assert(varg->type == PGBT_DOUBLE);
1391 					fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
1392 				}
1393 
1394 				*retval = *varg;
1395 
1396 				return true;
1397 			}
1398 
1399 			/* 1 double argument */
1400 		case PGBENCH_DOUBLE:
1401 		case PGBENCH_SQRT:
1402 			{
1403 				double		dval;
1404 
1405 				Assert(nargs == 1);
1406 
1407 				if (!coerceToDouble(&vargs[0], &dval))
1408 					return false;
1409 
1410 				if (func == PGBENCH_SQRT)
1411 					dval = sqrt(dval);
1412 
1413 				setDoubleValue(retval, dval);
1414 				return true;
1415 			}
1416 
1417 			/* 1 int argument */
1418 		case PGBENCH_INT:
1419 			{
1420 				int64		ival;
1421 
1422 				Assert(nargs == 1);
1423 
1424 				if (!coerceToInt(&vargs[0], &ival))
1425 					return false;
1426 
1427 				setIntValue(retval, ival);
1428 				return true;
1429 			}
1430 
1431 			/* variable number of arguments */
1432 		case PGBENCH_LEAST:
1433 		case PGBENCH_GREATEST:
1434 			{
1435 				bool		havedouble;
1436 				int			i;
1437 
1438 				Assert(nargs >= 1);
1439 
1440 				/* need double result if any input is double */
1441 				havedouble = false;
1442 				for (i = 0; i < nargs; i++)
1443 				{
1444 					if (vargs[i].type == PGBT_DOUBLE)
1445 					{
1446 						havedouble = true;
1447 						break;
1448 					}
1449 				}
1450 				if (havedouble)
1451 				{
1452 					double		extremum;
1453 
1454 					if (!coerceToDouble(&vargs[0], &extremum))
1455 						return false;
1456 					for (i = 1; i < nargs; i++)
1457 					{
1458 						double		dval;
1459 
1460 						if (!coerceToDouble(&vargs[i], &dval))
1461 							return false;
1462 						if (func == PGBENCH_LEAST)
1463 							extremum = Min(extremum, dval);
1464 						else
1465 							extremum = Max(extremum, dval);
1466 					}
1467 					setDoubleValue(retval, extremum);
1468 				}
1469 				else
1470 				{
1471 					int64		extremum;
1472 
1473 					if (!coerceToInt(&vargs[0], &extremum))
1474 						return false;
1475 					for (i = 1; i < nargs; i++)
1476 					{
1477 						int64		ival;
1478 
1479 						if (!coerceToInt(&vargs[i], &ival))
1480 							return false;
1481 						if (func == PGBENCH_LEAST)
1482 							extremum = Min(extremum, ival);
1483 						else
1484 							extremum = Max(extremum, ival);
1485 					}
1486 					setIntValue(retval, extremum);
1487 				}
1488 				return true;
1489 			}
1490 
1491 			/* random functions */
1492 		case PGBENCH_RANDOM:
1493 		case PGBENCH_RANDOM_EXPONENTIAL:
1494 		case PGBENCH_RANDOM_GAUSSIAN:
1495 			{
1496 				int64		imin,
1497 							imax;
1498 
1499 				Assert(nargs >= 2);
1500 
1501 				if (!coerceToInt(&vargs[0], &imin) ||
1502 					!coerceToInt(&vargs[1], &imax))
1503 					return false;
1504 
1505 				/* check random range */
1506 				if (imin > imax)
1507 				{
1508 					fprintf(stderr, "empty range given to random\n");
1509 					return false;
1510 				}
1511 				else if (imax - imin < 0 || (imax - imin) + 1 < 0)
1512 				{
1513 					/* prevent int overflows in random functions */
1514 					fprintf(stderr, "random range is too large\n");
1515 					return false;
1516 				}
1517 
1518 				if (func == PGBENCH_RANDOM)
1519 				{
1520 					Assert(nargs == 2);
1521 					setIntValue(retval, getrand(thread, imin, imax));
1522 				}
1523 				else	/* gaussian & exponential */
1524 				{
1525 					double		param;
1526 
1527 					Assert(nargs == 3);
1528 
1529 					if (!coerceToDouble(&vargs[2], &param))
1530 						return false;
1531 
1532 					if (func == PGBENCH_RANDOM_GAUSSIAN)
1533 					{
1534 						if (param < MIN_GAUSSIAN_PARAM)
1535 						{
1536 							fprintf(stderr,
1537 									"gaussian parameter must be at least %f "
1538 									"(not %f)\n", MIN_GAUSSIAN_PARAM, param);
1539 							return false;
1540 						}
1541 
1542 						setIntValue(retval,
1543 								 getGaussianRand(thread, imin, imax, param));
1544 					}
1545 					else	/* exponential */
1546 					{
1547 						if (param <= 0.0)
1548 						{
1549 							fprintf(stderr,
1550 							"exponential parameter must be greater than zero"
1551 									" (got %f)\n", param);
1552 							return false;
1553 						}
1554 
1555 						setIntValue(retval,
1556 							  getExponentialRand(thread, imin, imax, param));
1557 					}
1558 				}
1559 
1560 				return true;
1561 			}
1562 
1563 		default:
1564 			/* cannot get here */
1565 			Assert(0);
1566 			/* dead code to avoid a compiler warning */
1567 			return false;
1568 	}
1569 }
1570 
1571 /*
1572  * Recursive evaluation of an expression in a pgbench script
1573  * using the current state of variables.
1574  * Returns whether the evaluation was ok,
1575  * the value itself is returned through the retval pointer.
1576  */
1577 static bool
evaluateExpr(TState * thread,CState * st,PgBenchExpr * expr,PgBenchValue * retval)1578 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
1579 {
1580 	switch (expr->etype)
1581 	{
1582 		case ENODE_CONSTANT:
1583 			{
1584 				*retval = expr->u.constant;
1585 				return true;
1586 			}
1587 
1588 		case ENODE_VARIABLE:
1589 			{
1590 				Variable   *var;
1591 
1592 				if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
1593 				{
1594 					fprintf(stderr, "undefined variable \"%s\"\n",
1595 							expr->u.variable.varname);
1596 					return false;
1597 				}
1598 
1599 				if (!makeVariableNumeric(var))
1600 					return false;
1601 
1602 				*retval = var->num_value;
1603 				return true;
1604 			}
1605 
1606 		case ENODE_FUNCTION:
1607 			return evalFunc(thread, st,
1608 							expr->u.function.function,
1609 							expr->u.function.args,
1610 							retval);
1611 
1612 		default:
1613 			/* internal error which should never occur */
1614 			fprintf(stderr, "unexpected enode type in evaluation: %d\n",
1615 					expr->etype);
1616 			exit(1);
1617 	}
1618 }
1619 
1620 /*
1621  * Run a shell command. The result is assigned to the variable if not NULL.
1622  * Return true if succeeded, or false on error.
1623  */
1624 static bool
runShellCommand(CState * st,char * variable,char ** argv,int argc)1625 runShellCommand(CState *st, char *variable, char **argv, int argc)
1626 {
1627 	char		command[SHELL_COMMAND_SIZE];
1628 	int			i,
1629 				len = 0;
1630 	FILE	   *fp;
1631 	char		res[64];
1632 	char	   *endptr;
1633 	int			retval;
1634 
1635 	/*----------
1636 	 * Join arguments with whitespace separators. Arguments starting with
1637 	 * exactly one colon are treated as variables:
1638 	 *	name - append a string "name"
1639 	 *	:var - append a variable named 'var'
1640 	 *	::name - append a string ":name"
1641 	 *----------
1642 	 */
1643 	for (i = 0; i < argc; i++)
1644 	{
1645 		char	   *arg;
1646 		int			arglen;
1647 
1648 		if (argv[i][0] != ':')
1649 		{
1650 			arg = argv[i];		/* a string literal */
1651 		}
1652 		else if (argv[i][1] == ':')
1653 		{
1654 			arg = argv[i] + 1;	/* a string literal starting with colons */
1655 		}
1656 		else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1657 		{
1658 			fprintf(stderr, "%s: undefined variable \"%s\"\n",
1659 					argv[0], argv[i]);
1660 			return false;
1661 		}
1662 
1663 		arglen = strlen(arg);
1664 		if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1665 		{
1666 			fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1667 			return false;
1668 		}
1669 
1670 		if (i > 0)
1671 			command[len++] = ' ';
1672 		memcpy(command + len, arg, arglen);
1673 		len += arglen;
1674 	}
1675 
1676 	command[len] = '\0';
1677 
1678 	/* Fast path for non-assignment case */
1679 	if (variable == NULL)
1680 	{
1681 		if (system(command))
1682 		{
1683 			if (!timer_exceeded)
1684 				fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1685 			return false;
1686 		}
1687 		return true;
1688 	}
1689 
1690 	/* Execute the command with pipe and read the standard output. */
1691 	if ((fp = popen(command, "r")) == NULL)
1692 	{
1693 		fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1694 		return false;
1695 	}
1696 	if (fgets(res, sizeof(res), fp) == NULL)
1697 	{
1698 		if (!timer_exceeded)
1699 			fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1700 		(void) pclose(fp);
1701 		return false;
1702 	}
1703 	if (pclose(fp) < 0)
1704 	{
1705 		fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1706 		return false;
1707 	}
1708 
1709 	/* Check whether the result is an integer and assign it to the variable */
1710 	retval = (int) strtol(res, &endptr, 10);
1711 	while (*endptr != '\0' && isspace((unsigned char) *endptr))
1712 		endptr++;
1713 	if (*res == '\0' || *endptr != '\0')
1714 	{
1715 		fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1716 				argv[0], res);
1717 		return false;
1718 	}
1719 	if (!putVariableInt(st, "setshell", variable, retval))
1720 		return false;
1721 
1722 #ifdef DEBUG
1723 	printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
1724 #endif
1725 	return true;
1726 }
1727 
#define MAX_PREPARE_NAME		32

/*
 * Build the prepared-statement name "P<file>_<state>" for command "state"
 * of script "file" into buffer, which must have room for MAX_PREPARE_NAME
 * bytes.  The write is bounded so the buffer can never overflow, even though
 * two ints always fit.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1734 
1735 static bool
clientDone(CState * st)1736 clientDone(CState *st)
1737 {
1738 	if (st->con != NULL)
1739 	{
1740 		PQfinish(st->con);
1741 		st->con = NULL;
1742 	}
1743 	return false;				/* always false */
1744 }
1745 
1746 /* return a script number with a weighted choice. */
1747 static int
chooseScript(TState * thread)1748 chooseScript(TState *thread)
1749 {
1750 	int			i = 0;
1751 	int64		w;
1752 
1753 	if (num_scripts == 1)
1754 		return 0;
1755 
1756 	w = getrand(thread, 0, total_weight - 1);
1757 	do
1758 	{
1759 		w -= sql_script[i++].weight;
1760 	} while (w >= 0);
1761 
1762 	return i - 1;
1763 }
1764 
1765 /* return false iff client should be disconnected */
1766 static bool
doCustom(TState * thread,CState * st,StatsData * agg)1767 doCustom(TState *thread, CState *st, StatsData *agg)
1768 {
1769 	PGresult   *res;
1770 	Command   **commands;
1771 	bool		trans_needs_throttle = false;
1772 	instr_time	now;
1773 
1774 	/*
1775 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
1776 	 * first time it's needed, and reuse the same value throughout this
1777 	 * function after that. This also ensures that e.g. the calculated latency
1778 	 * reported in the log file and in the totals are the same. Zero means
1779 	 * "not set yet". Reset "now" when we step to the next command with "goto
1780 	 * top", though.
1781 	 */
1782 top:
1783 	INSTR_TIME_SET_ZERO(now);
1784 
1785 	commands = sql_script[st->use_file].commands;
1786 
1787 	/*
1788 	 * Handle throttling once per transaction by sleeping.  It is simpler to
1789 	 * do this here rather than at the end, because so much complicated logic
1790 	 * happens below when statements finish.
1791 	 */
1792 	if (throttle_delay && !st->is_throttled)
1793 	{
1794 		/*
1795 		 * Generate a delay such that the series of delays will approximate a
1796 		 * Poisson distribution centered on the throttle_delay time.
1797 		 *
1798 		 * If transactions are too slow or a given wait is shorter than a
1799 		 * transaction, the next transaction will start right away.
1800 		 */
1801 		int64		wait = getPoissonRand(thread, throttle_delay);
1802 
1803 		thread->throttle_trigger += wait;
1804 		st->txn_scheduled = thread->throttle_trigger;
1805 
1806 		/* stop client if next transaction is beyond pgbench end of execution */
1807 		if (duration > 0 && st->txn_scheduled > end_time)
1808 			return clientDone(st);
1809 
1810 		/*
1811 		 * If this --latency-limit is used, and this slot is already late so
1812 		 * that the transaction will miss the latency limit even if it
1813 		 * completed immediately, we skip this time slot and iterate till the
1814 		 * next slot that isn't late yet.
1815 		 */
1816 		if (latency_limit)
1817 		{
1818 			int64		now_us;
1819 
1820 			if (INSTR_TIME_IS_ZERO(now))
1821 				INSTR_TIME_SET_CURRENT(now);
1822 			now_us = INSTR_TIME_GET_MICROSEC(now);
1823 			while (thread->throttle_trigger < now_us - latency_limit)
1824 			{
1825 				processXactStats(thread, st, &now, true, agg);
1826 				/* next rendez-vous */
1827 				wait = getPoissonRand(thread, throttle_delay);
1828 				thread->throttle_trigger += wait;
1829 				st->txn_scheduled = thread->throttle_trigger;
1830 			}
1831 		}
1832 
1833 		st->sleep_until = st->txn_scheduled;
1834 		st->sleeping = true;
1835 		st->throttling = true;
1836 		st->is_throttled = true;
1837 		if (debug)
1838 			fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1839 					st->id, wait);
1840 	}
1841 
1842 	if (st->sleeping)
1843 	{							/* are we sleeping? */
1844 		if (INSTR_TIME_IS_ZERO(now))
1845 			INSTR_TIME_SET_CURRENT(now);
1846 		if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
1847 			return true;		/* Still sleeping, nothing to do here */
1848 		/* Else done sleeping, go ahead with next command */
1849 		st->sleeping = false;
1850 		st->throttling = false;
1851 	}
1852 
1853 	if (st->listen)
1854 	{							/* are we receiver? */
1855 		if (commands[st->state]->type == SQL_COMMAND)
1856 		{
1857 			if (debug)
1858 				fprintf(stderr, "client %d receiving\n", st->id);
1859 			if (!PQconsumeInput(st->con))
1860 			{					/* there's something wrong */
1861 				fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
1862 				return clientDone(st);
1863 			}
1864 			if (PQisBusy(st->con))
1865 				return true;	/* don't have the whole result yet */
1866 		}
1867 
1868 		/*
1869 		 * command finished: accumulate per-command execution times in
1870 		 * thread-local data structure, if per-command latencies are requested
1871 		 */
1872 		if (is_latencies)
1873 		{
1874 			if (INSTR_TIME_IS_ZERO(now))
1875 				INSTR_TIME_SET_CURRENT(now);
1876 
1877 			/* XXX could use a mutex here, but we choose not to */
1878 			addToSimpleStats(&commands[st->state]->stats,
1879 							 INSTR_TIME_GET_DOUBLE(now) -
1880 							 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
1881 		}
1882 
1883 		/* transaction finished: calculate latency and log the transaction */
1884 		if (commands[st->state + 1] == NULL)
1885 		{
1886 			if (progress || throttle_delay || latency_limit ||
1887 				per_script_stats || use_log)
1888 				processXactStats(thread, st, &now, false, agg);
1889 			else
1890 				thread->stats.cnt++;
1891 		}
1892 
1893 		if (commands[st->state]->type == SQL_COMMAND)
1894 		{
1895 			/*
1896 			 * Read and discard the query result; note this is not included in
1897 			 * the statement latency numbers.
1898 			 */
1899 			res = PQgetResult(st->con);
1900 			switch (PQresultStatus(res))
1901 			{
1902 				case PGRES_COMMAND_OK:
1903 				case PGRES_TUPLES_OK:
1904 					break;		/* OK */
1905 				default:
1906 					fprintf(stderr, "client %d aborted in state %d: %s",
1907 							st->id, st->state, PQerrorMessage(st->con));
1908 					PQclear(res);
1909 					return clientDone(st);
1910 			}
1911 			PQclear(res);
1912 			discard_response(st);
1913 		}
1914 
1915 		if (commands[st->state + 1] == NULL)
1916 		{
1917 			if (is_connect)
1918 			{
1919 				PQfinish(st->con);
1920 				st->con = NULL;
1921 			}
1922 
1923 			++st->cnt;
1924 			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1925 				return clientDone(st);	/* exit success */
1926 		}
1927 
1928 		/* increment state counter */
1929 		st->state++;
1930 		if (commands[st->state] == NULL)
1931 		{
1932 			st->state = 0;
1933 			st->use_file = chooseScript(thread);
1934 			commands = sql_script[st->use_file].commands;
1935 			if (debug)
1936 				fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
1937 						sql_script[st->use_file].desc);
1938 			st->is_throttled = false;
1939 
1940 			/*
1941 			 * No transaction is underway anymore, which means there is
1942 			 * nothing to listen to right now.  When throttling rate limits
1943 			 * are active, a sleep will happen next, as the next transaction
1944 			 * starts.  And then in any case the next SQL command will set
1945 			 * listen back to true.
1946 			 */
1947 			st->listen = false;
1948 			trans_needs_throttle = (throttle_delay > 0);
1949 		}
1950 	}
1951 
1952 	if (st->con == NULL)
1953 	{
1954 		instr_time	start,
1955 					end;
1956 
1957 		INSTR_TIME_SET_CURRENT(start);
1958 		if ((st->con = doConnect()) == NULL)
1959 		{
1960 			fprintf(stderr, "client %d aborted while establishing connection\n",
1961 					st->id);
1962 			return clientDone(st);
1963 		}
1964 		INSTR_TIME_SET_CURRENT(end);
1965 		INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
1966 
1967 		/* Reset session-local state */
1968 		st->listen = false;
1969 		st->sleeping = false;
1970 		st->throttling = false;
1971 		memset(st->prepared, 0, sizeof(st->prepared));
1972 	}
1973 
1974 	/*
1975 	 * This ensures that a throttling delay is inserted before proceeding with
1976 	 * sql commands, after the first transaction. The first transaction
1977 	 * throttling is performed when first entering doCustom.
1978 	 */
1979 	if (trans_needs_throttle)
1980 	{
1981 		trans_needs_throttle = false;
1982 		goto top;
1983 	}
1984 
1985 	/* Record transaction start time under logging, progress or throttling */
1986 	if ((use_log || progress || throttle_delay || latency_limit ||
1987 		 per_script_stats) && st->state == 0)
1988 	{
1989 		INSTR_TIME_SET_CURRENT(st->txn_begin);
1990 
1991 		/*
1992 		 * When not throttling, this is also the transaction's scheduled start
1993 		 * time.
1994 		 */
1995 		if (!throttle_delay)
1996 			st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1997 	}
1998 
1999 	/* Record statement start time if per-command latencies are requested */
2000 	if (is_latencies)
2001 		INSTR_TIME_SET_CURRENT(st->stmt_begin);
2002 
2003 	if (commands[st->state]->type == SQL_COMMAND)
2004 	{
2005 		const Command *command = commands[st->state];
2006 		int			r;
2007 
2008 		if (querymode == QUERY_SIMPLE)
2009 		{
2010 			char	   *sql;
2011 
2012 			sql = pg_strdup(command->argv[0]);
2013 			sql = assignVariables(st, sql);
2014 
2015 			if (debug)
2016 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
2017 			r = PQsendQuery(st->con, sql);
2018 			free(sql);
2019 		}
2020 		else if (querymode == QUERY_EXTENDED)
2021 		{
2022 			const char *sql = command->argv[0];
2023 			const char *params[MAX_ARGS];
2024 
2025 			getQueryParams(st, command, params);
2026 
2027 			if (debug)
2028 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
2029 			r = PQsendQueryParams(st->con, sql, command->argc - 1,
2030 								  NULL, params, NULL, NULL, 0);
2031 		}
2032 		else if (querymode == QUERY_PREPARED)
2033 		{
2034 			char		name[MAX_PREPARE_NAME];
2035 			const char *params[MAX_ARGS];
2036 
2037 			if (!st->prepared[st->use_file])
2038 			{
2039 				int			j;
2040 
2041 				for (j = 0; commands[j] != NULL; j++)
2042 				{
2043 					PGresult   *res;
2044 					char		name[MAX_PREPARE_NAME];
2045 
2046 					if (commands[j]->type != SQL_COMMAND)
2047 						continue;
2048 					preparedStatementName(name, st->use_file, j);
2049 					res = PQprepare(st->con, name,
2050 						  commands[j]->argv[0], commands[j]->argc - 1, NULL);
2051 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
2052 						fprintf(stderr, "%s", PQerrorMessage(st->con));
2053 					PQclear(res);
2054 				}
2055 				st->prepared[st->use_file] = true;
2056 			}
2057 
2058 			getQueryParams(st, command, params);
2059 			preparedStatementName(name, st->use_file, st->state);
2060 
2061 			if (debug)
2062 				fprintf(stderr, "client %d sending %s\n", st->id, name);
2063 			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2064 									params, NULL, NULL, 0);
2065 		}
2066 		else	/* unknown sql mode */
2067 			r = 0;
2068 
2069 		if (r == 0)
2070 		{
2071 			if (debug)
2072 				fprintf(stderr, "client %d could not send %s\n",
2073 						st->id, command->argv[0]);
2074 			st->ecnt++;
2075 		}
2076 		else
2077 			st->listen = true;	/* flags that should be listened */
2078 	}
2079 	else if (commands[st->state]->type == META_COMMAND)
2080 	{
2081 		int			argc = commands[st->state]->argc,
2082 					i;
2083 		char	  **argv = commands[st->state]->argv;
2084 
2085 		if (debug)
2086 		{
2087 			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
2088 			for (i = 1; i < argc; i++)
2089 				fprintf(stderr, " %s", argv[i]);
2090 			fprintf(stderr, "\n");
2091 		}
2092 
2093 		if (pg_strcasecmp(argv[0], "set") == 0)
2094 		{
2095 			PgBenchExpr *expr = commands[st->state]->expr;
2096 			PgBenchValue result;
2097 
2098 			if (!evaluateExpr(thread, st, expr, &result))
2099 			{
2100 				st->ecnt++;
2101 				return true;
2102 			}
2103 
2104 			if (!putVariableNumber(st, argv[0], argv[1], &result))
2105 			{
2106 				st->ecnt++;
2107 				return true;
2108 			}
2109 
2110 			st->listen = true;
2111 		}
2112 		else if (pg_strcasecmp(argv[0], "sleep") == 0)
2113 		{
2114 			char	   *var;
2115 			int			usec;
2116 			instr_time	now;
2117 
2118 			if (*argv[1] == ':')
2119 			{
2120 				if ((var = getVariable(st, argv[1] + 1)) == NULL)
2121 				{
2122 					fprintf(stderr, "%s: undefined variable \"%s\"\n",
2123 							argv[0], argv[1]);
2124 					st->ecnt++;
2125 					return true;
2126 				}
2127 				usec = atoi(var);
2128 			}
2129 			else
2130 				usec = atoi(argv[1]);
2131 
2132 			if (argc > 2)
2133 			{
2134 				if (pg_strcasecmp(argv[2], "ms") == 0)
2135 					usec *= 1000;
2136 				else if (pg_strcasecmp(argv[2], "s") == 0)
2137 					usec *= 1000000;
2138 			}
2139 			else
2140 				usec *= 1000000;
2141 
2142 			INSTR_TIME_SET_CURRENT(now);
2143 			st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
2144 			st->sleeping = true;
2145 
2146 			st->listen = true;
2147 		}
2148 		else if (pg_strcasecmp(argv[0], "setshell") == 0)
2149 		{
2150 			bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
2151 
2152 			if (timer_exceeded) /* timeout */
2153 				return clientDone(st);
2154 			else if (!ret)		/* on error */
2155 			{
2156 				st->ecnt++;
2157 				return true;
2158 			}
2159 			else	/* succeeded */
2160 				st->listen = true;
2161 		}
2162 		else if (pg_strcasecmp(argv[0], "shell") == 0)
2163 		{
2164 			bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);
2165 
2166 			if (timer_exceeded) /* timeout */
2167 				return clientDone(st);
2168 			else if (!ret)		/* on error */
2169 			{
2170 				st->ecnt++;
2171 				return true;
2172 			}
2173 			else	/* succeeded */
2174 				st->listen = true;
2175 		}
2176 
2177 		/* after a meta command, immediately proceed with next command */
2178 		goto top;
2179 	}
2180 
2181 	return true;
2182 }
2183 
2184 /*
2185  * print log entry after completing one transaction.
2186  */
2187 static void
doLog(TState * thread,CState * st,instr_time * now,StatsData * agg,bool skipped,double latency,double lag)2188 doLog(TState *thread, CState *st, instr_time *now,
2189 	  StatsData *agg, bool skipped, double latency, double lag)
2190 {
2191 	FILE	   *logfile = thread->logfile;
2192 
2193 	Assert(use_log);
2194 
2195 	/*
2196 	 * Skip the log entry if sampling is enabled and this row doesn't belong
2197 	 * to the random sample.
2198 	 */
2199 	if (sample_rate != 0.0 &&
2200 		pg_erand48(thread->random_state) > sample_rate)
2201 		return;
2202 
2203 	/* should we aggregate the results or not? */
2204 	if (agg_interval > 0)
2205 	{
2206 		/*
2207 		 * Loop until we reach the interval of the current transaction, and
2208 		 * print all the empty intervals in between (this may happen with very
2209 		 * low tps, e.g. --rate=0.1).
2210 		 */
2211 		while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
2212 		{
2213 			/* print aggregated report to logfile */
2214 			fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
2215 					agg->start_time,
2216 					agg->cnt,
2217 					agg->latency.sum,
2218 					agg->latency.sum2,
2219 					agg->latency.min,
2220 					agg->latency.max);
2221 			if (throttle_delay)
2222 			{
2223 				fprintf(logfile, " %.0f %.0f %.0f %.0f",
2224 						agg->lag.sum,
2225 						agg->lag.sum2,
2226 						agg->lag.min,
2227 						agg->lag.max);
2228 				if (latency_limit)
2229 					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
2230 			}
2231 			fputc('\n', logfile);
2232 
2233 			/* reset data and move to next interval */
2234 			initStats(agg, agg->start_time + agg_interval);
2235 		}
2236 
2237 		/* accumulate the current transaction */
2238 		accumStats(agg, skipped, latency, lag);
2239 	}
2240 	else
2241 	{
2242 		/* no, print raw transactions */
2243 #ifndef WIN32
2244 
2245 		/* This is more than we really ought to know about instr_time */
2246 		if (skipped)
2247 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
2248 					st->id, st->cnt, st->use_file,
2249 					(long) now->tv_sec, (long) now->tv_usec);
2250 		else
2251 			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
2252 					st->id, st->cnt, latency, st->use_file,
2253 					(long) now->tv_sec, (long) now->tv_usec);
2254 #else
2255 
2256 		/* On Windows, instr_time doesn't provide a timestamp anyway */
2257 		if (skipped)
2258 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
2259 					st->id, st->cnt, st->use_file);
2260 		else
2261 			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
2262 					st->id, st->cnt, latency, st->use_file);
2263 #endif
2264 		if (throttle_delay)
2265 			fprintf(logfile, " %.0f", lag);
2266 		fputc('\n', logfile);
2267 	}
2268 }
2269 
2270 /*
2271  * Accumulate and report statistics at end of a transaction.
2272  *
2273  * (This is also called when a transaction is late and thus skipped.)
2274  */
2275 static void
processXactStats(TState * thread,CState * st,instr_time * now,bool skipped,StatsData * agg)2276 processXactStats(TState *thread, CState *st, instr_time *now,
2277 				 bool skipped, StatsData *agg)
2278 {
2279 	double		latency = 0.0,
2280 				lag = 0.0;
2281 
2282 	if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
2283 		INSTR_TIME_SET_CURRENT(*now);
2284 
2285 	if (!skipped)
2286 	{
2287 		/* compute latency & lag */
2288 		latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
2289 		lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
2290 	}
2291 
2292 	if (progress || throttle_delay || latency_limit)
2293 	{
2294 		accumStats(&thread->stats, skipped, latency, lag);
2295 
2296 		/* count transactions over the latency limit, if needed */
2297 		if (latency_limit && latency > latency_limit)
2298 			thread->latency_late++;
2299 	}
2300 	else
2301 		thread->stats.cnt++;
2302 
2303 	if (use_log)
2304 		doLog(thread, st, now, agg, skipped, latency, lag);
2305 
2306 	/* XXX could use a mutex here, but we choose not to */
2307 	if (per_script_stats)
2308 		accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
2309 }
2310 
2311 
2312 /* discard connections */
2313 static void
disconnect_all(CState * state,int length)2314 disconnect_all(CState *state, int length)
2315 {
2316 	int			i;
2317 
2318 	for (i = 0; i < length; i++)
2319 	{
2320 		if (state[i].con)
2321 		{
2322 			PQfinish(state[i].con);
2323 			state[i].con = NULL;
2324 		}
2325 	}
2326 }
2327 
2328 /* create tables and setup data */
2329 static void
init(bool is_no_vacuum)2330 init(bool is_no_vacuum)
2331 {
2332 /*
2333  * The scale factor at/beyond which 32-bit integers are insufficient for
2334  * storing TPC-B account IDs.
2335  *
2336  * Although the actual threshold is 21474, we use 20000 because it is easier to
2337  * document and remember, and isn't that far away from the real threshold.
2338  */
2339 #define SCALE_32BIT_THRESHOLD 20000
2340 
2341 	/*
2342 	 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
2343 	 * fields in these table declarations were intended to comply with that.
2344 	 * The pgbench_accounts table complies with that because the "filler"
2345 	 * column is set to blank-padded empty string. But for all other tables
2346 	 * the columns default to NULL and so don't actually take any space.  We
2347 	 * could fix that by giving them non-null default values.  However, that
2348 	 * would completely break comparability of pgbench results with prior
2349 	 * versions. Since pgbench has never pretended to be fully TPC-B compliant
2350 	 * anyway, we stick with the historical behavior.
2351 	 */
2352 	struct ddlinfo
2353 	{
2354 		const char *table;		/* table name */
2355 		const char *smcols;		/* column decls if accountIDs are 32 bits */
2356 		const char *bigcols;	/* column decls if accountIDs are 64 bits */
2357 		int			declare_fillfactor;
2358 	};
2359 	static const struct ddlinfo DDLs[] = {
2360 		{
2361 			"pgbench_history",
2362 			"tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
2363 			"tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
2364 			0
2365 		},
2366 		{
2367 			"pgbench_tellers",
2368 			"tid int not null,bid int,tbalance int,filler char(84)",
2369 			"tid int not null,bid int,tbalance int,filler char(84)",
2370 			1
2371 		},
2372 		{
2373 			"pgbench_accounts",
2374 			"aid    int not null,bid int,abalance int,filler char(84)",
2375 			"aid bigint not null,bid int,abalance int,filler char(84)",
2376 			1
2377 		},
2378 		{
2379 			"pgbench_branches",
2380 			"bid int not null,bbalance int,filler char(88)",
2381 			"bid int not null,bbalance int,filler char(88)",
2382 			1
2383 		}
2384 	};
2385 	static const char *const DDLINDEXes[] = {
2386 		"alter table pgbench_branches add primary key (bid)",
2387 		"alter table pgbench_tellers add primary key (tid)",
2388 		"alter table pgbench_accounts add primary key (aid)"
2389 	};
2390 	static const char *const DDLKEYs[] = {
2391 		"alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
2392 		"alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
2393 		"alter table pgbench_history add foreign key (bid) references pgbench_branches",
2394 		"alter table pgbench_history add foreign key (tid) references pgbench_tellers",
2395 		"alter table pgbench_history add foreign key (aid) references pgbench_accounts"
2396 	};
2397 
2398 	PGconn	   *con;
2399 	PGresult   *res;
2400 	char		sql[256];
2401 	int			i;
2402 	int64		k;
2403 
2404 	/* used to track elapsed time and estimate of the remaining time */
2405 	instr_time	start,
2406 				diff;
2407 	double		elapsed_sec,
2408 				remaining_sec;
2409 	int			log_interval = 1;
2410 
2411 	if ((con = doConnect()) == NULL)
2412 		exit(1);
2413 
2414 	for (i = 0; i < lengthof(DDLs); i++)
2415 	{
2416 		char		opts[256];
2417 		char		buffer[256];
2418 		const struct ddlinfo *ddl = &DDLs[i];
2419 		const char *cols;
2420 
2421 		/* Remove old table, if it exists. */
2422 		snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
2423 		executeStatement(con, buffer);
2424 
2425 		/* Construct new create table statement. */
2426 		opts[0] = '\0';
2427 		if (ddl->declare_fillfactor)
2428 			snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2429 					 " with (fillfactor=%d)", fillfactor);
2430 		if (tablespace != NULL)
2431 		{
2432 			char	   *escape_tablespace;
2433 
2434 			escape_tablespace = PQescapeIdentifier(con, tablespace,
2435 												   strlen(tablespace));
2436 			snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2437 					 " tablespace %s", escape_tablespace);
2438 			PQfreemem(escape_tablespace);
2439 		}
2440 
2441 		cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
2442 
2443 		snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
2444 				 unlogged_tables ? " unlogged" : "",
2445 				 ddl->table, cols, opts);
2446 
2447 		executeStatement(con, buffer);
2448 	}
2449 
2450 	executeStatement(con, "begin");
2451 
2452 	for (i = 0; i < nbranches * scale; i++)
2453 	{
2454 		/* "filler" column defaults to NULL */
2455 		snprintf(sql, sizeof(sql),
2456 				 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2457 				 i + 1);
2458 		executeStatement(con, sql);
2459 	}
2460 
2461 	for (i = 0; i < ntellers * scale; i++)
2462 	{
2463 		/* "filler" column defaults to NULL */
2464 		snprintf(sql, sizeof(sql),
2465 			"insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2466 				 i + 1, i / ntellers + 1);
2467 		executeStatement(con, sql);
2468 	}
2469 
2470 	executeStatement(con, "commit");
2471 
2472 	/*
2473 	 * fill the pgbench_accounts table with some data
2474 	 */
2475 	fprintf(stderr, "creating tables...\n");
2476 
2477 	executeStatement(con, "begin");
2478 	executeStatement(con, "truncate pgbench_accounts");
2479 
2480 	res = PQexec(con, "copy pgbench_accounts from stdin");
2481 	if (PQresultStatus(res) != PGRES_COPY_IN)
2482 	{
2483 		fprintf(stderr, "%s", PQerrorMessage(con));
2484 		exit(1);
2485 	}
2486 	PQclear(res);
2487 
2488 	INSTR_TIME_SET_CURRENT(start);
2489 
2490 	for (k = 0; k < (int64) naccounts * scale; k++)
2491 	{
2492 		int64		j = k + 1;
2493 
2494 		/* "filler" column defaults to blank padded empty string */
2495 		snprintf(sql, sizeof(sql),
2496 				 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2497 				 j, k / naccounts + 1, 0);
2498 		if (PQputline(con, sql))
2499 		{
2500 			fprintf(stderr, "PQputline failed\n");
2501 			exit(1);
2502 		}
2503 
2504 		/*
2505 		 * If we want to stick with the original logging, print a message each
2506 		 * 100k inserted rows.
2507 		 */
2508 		if ((!use_quiet) && (j % 100000 == 0))
2509 		{
2510 			INSTR_TIME_SET_CURRENT(diff);
2511 			INSTR_TIME_SUBTRACT(diff, start);
2512 
2513 			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2514 			remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2515 
2516 			fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2517 					j, (int64) naccounts * scale,
2518 					(int) (((int64) j * 100) / (naccounts * (int64) scale)),
2519 					elapsed_sec, remaining_sec);
2520 		}
2521 		/* let's not call the timing for each row, but only each 100 rows */
2522 		else if (use_quiet && (j % 100 == 0))
2523 		{
2524 			INSTR_TIME_SET_CURRENT(diff);
2525 			INSTR_TIME_SUBTRACT(diff, start);
2526 
2527 			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2528 			remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2529 
2530 			/* have we reached the next interval (or end)? */
2531 			if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2532 			{
2533 				fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2534 						j, (int64) naccounts * scale,
2535 						(int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2536 
2537 				/* skip to the next interval */
2538 				log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2539 			}
2540 		}
2541 
2542 	}
2543 	if (PQputline(con, "\\.\n"))
2544 	{
2545 		fprintf(stderr, "very last PQputline failed\n");
2546 		exit(1);
2547 	}
2548 	if (PQendcopy(con))
2549 	{
2550 		fprintf(stderr, "PQendcopy failed\n");
2551 		exit(1);
2552 	}
2553 	executeStatement(con, "commit");
2554 
2555 	/* vacuum */
2556 	if (!is_no_vacuum)
2557 	{
2558 		fprintf(stderr, "vacuum...\n");
2559 		executeStatement(con, "vacuum analyze pgbench_branches");
2560 		executeStatement(con, "vacuum analyze pgbench_tellers");
2561 		executeStatement(con, "vacuum analyze pgbench_accounts");
2562 		executeStatement(con, "vacuum analyze pgbench_history");
2563 	}
2564 
2565 	/*
2566 	 * create indexes
2567 	 */
2568 	fprintf(stderr, "set primary keys...\n");
2569 	for (i = 0; i < lengthof(DDLINDEXes); i++)
2570 	{
2571 		char		buffer[256];
2572 
2573 		strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2574 
2575 		if (index_tablespace != NULL)
2576 		{
2577 			char	   *escape_tablespace;
2578 
2579 			escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2580 												   strlen(index_tablespace));
2581 			snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2582 					 " using index tablespace %s", escape_tablespace);
2583 			PQfreemem(escape_tablespace);
2584 		}
2585 
2586 		executeStatement(con, buffer);
2587 	}
2588 
2589 	/*
2590 	 * create foreign keys
2591 	 */
2592 	if (foreign_keys)
2593 	{
2594 		fprintf(stderr, "set foreign keys...\n");
2595 		for (i = 0; i < lengthof(DDLKEYs); i++)
2596 		{
2597 			executeStatement(con, DDLKEYs[i]);
2598 		}
2599 	}
2600 
2601 	fprintf(stderr, "done.\n");
2602 	PQfinish(con);
2603 }
2604 
2605 /*
2606  * Parse the raw sql and replace :param to $n.
2607  */
2608 static bool
parseQuery(Command * cmd,const char * raw_sql)2609 parseQuery(Command *cmd, const char *raw_sql)
2610 {
2611 	char	   *sql,
2612 			   *p;
2613 
2614 	sql = pg_strdup(raw_sql);
2615 	cmd->argc = 1;
2616 
2617 	p = sql;
2618 	while ((p = strchr(p, ':')) != NULL)
2619 	{
2620 		char		var[13];
2621 		char	   *name;
2622 		int			eaten;
2623 
2624 		name = parseVariable(p, &eaten);
2625 		if (name == NULL)
2626 		{
2627 			while (*p == ':')
2628 			{
2629 				p++;
2630 			}
2631 			continue;
2632 		}
2633 
2634 		if (cmd->argc >= MAX_ARGS)
2635 		{
2636 			fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2637 			pg_free(name);
2638 			return false;
2639 		}
2640 
2641 		sprintf(var, "$%d", cmd->argc);
2642 		p = replaceVariable(&sql, p, eaten, var);
2643 
2644 		cmd->argv[cmd->argc] = name;
2645 		cmd->argc++;
2646 	}
2647 
2648 	cmd->argv[0] = sql;
2649 	return true;
2650 }
2651 
2652 /*
2653  * Simple error-printing function, might be needed by lexer
2654  */
2655 static void
pgbench_error(const char * fmt,...)2656 pgbench_error(const char *fmt,...)
2657 {
2658 	va_list		ap;
2659 
2660 	fflush(stdout);
2661 	va_start(ap, fmt);
2662 	vfprintf(stderr, _(fmt), ap);
2663 	va_end(ap);
2664 }
2665 
2666 /*
2667  * syntax error while parsing a script (in practice, while parsing a
2668  * backslash command, because we don't detect syntax errors in SQL)
2669  *
2670  * source: source of script (filename or builtin-script ID)
2671  * lineno: line number within script (count from 1)
2672  * line: whole line of backslash command, if available
2673  * command: backslash command name, if available
2674  * msg: the actual error message
2675  * more: optional extra message
2676  * column: zero-based column number, or -1 if unknown
2677  */
2678 void
syntax_error(const char * source,int lineno,const char * line,const char * command,const char * msg,const char * more,int column)2679 syntax_error(const char *source, int lineno,
2680 			 const char *line, const char *command,
2681 			 const char *msg, const char *more, int column)
2682 {
2683 	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
2684 	if (more != NULL)
2685 		fprintf(stderr, " (%s)", more);
2686 	if (column >= 0 && line == NULL)
2687 		fprintf(stderr, " at column %d", column + 1);
2688 	if (command != NULL)
2689 		fprintf(stderr, " in command \"%s\"", command);
2690 	fprintf(stderr, "\n");
2691 	if (line != NULL)
2692 	{
2693 		fprintf(stderr, "%s\n", line);
2694 		if (column >= 0)
2695 		{
2696 			int			i;
2697 
2698 			for (i = 0; i < column; i++)
2699 				fprintf(stderr, " ");
2700 			fprintf(stderr, "^ error found here\n");
2701 		}
2702 	}
2703 	exit(1);
2704 }
2705 
2706 /*
2707  * Parse a SQL command; return a Command struct, or NULL if it's a comment
2708  *
2709  * On entry, psqlscan.l has collected the command into "buf", so we don't
2710  * really need to do much here except check for comment and set up a
2711  * Command struct.
2712  */
2713 static Command *
process_sql_command(PQExpBuffer buf,const char * source)2714 process_sql_command(PQExpBuffer buf, const char *source)
2715 {
2716 	Command    *my_command;
2717 	char	   *p;
2718 	char	   *nlpos;
2719 
2720 	/* Skip any leading whitespace, as well as "--" style comments */
2721 	p = buf->data;
2722 	for (;;)
2723 	{
2724 		if (isspace((unsigned char) *p))
2725 			p++;
2726 		else if (strncmp(p, "--", 2) == 0)
2727 		{
2728 			p = strchr(p, '\n');
2729 			if (p == NULL)
2730 				return NULL;
2731 			p++;
2732 		}
2733 		else
2734 			break;
2735 	}
2736 
2737 	/* If there's nothing but whitespace and comments, we're done */
2738 	if (*p == '\0')
2739 		return NULL;
2740 
2741 	/* Allocate and initialize Command structure */
2742 	my_command = (Command *) pg_malloc0(sizeof(Command));
2743 	my_command->command_num = num_commands++;
2744 	my_command->type = SQL_COMMAND;
2745 	my_command->argc = 0;
2746 	initSimpleStats(&my_command->stats);
2747 
2748 	/*
2749 	 * If SQL command is multi-line, we only want to save the first line as
2750 	 * the "line" label.
2751 	 */
2752 	nlpos = strchr(p, '\n');
2753 	if (nlpos)
2754 	{
2755 		my_command->line = pg_malloc(nlpos - p + 1);
2756 		memcpy(my_command->line, p, nlpos - p);
2757 		my_command->line[nlpos - p] = '\0';
2758 	}
2759 	else
2760 		my_command->line = pg_strdup(p);
2761 
2762 	switch (querymode)
2763 	{
2764 		case QUERY_SIMPLE:
2765 			my_command->argv[0] = pg_strdup(p);
2766 			my_command->argc++;
2767 			break;
2768 		case QUERY_EXTENDED:
2769 		case QUERY_PREPARED:
2770 			if (!parseQuery(my_command, p))
2771 				exit(1);
2772 			break;
2773 		default:
2774 			exit(1);
2775 	}
2776 
2777 	return my_command;
2778 }
2779 
2780 /*
2781  * Parse a backslash command; return a Command struct, or NULL if comment
2782  *
2783  * At call, we have scanned only the initial backslash.
2784  */
2785 static Command *
process_backslash_command(PsqlScanState sstate,const char * source)2786 process_backslash_command(PsqlScanState sstate, const char *source)
2787 {
2788 	Command    *my_command;
2789 	PQExpBufferData word_buf;
2790 	int			word_offset;
2791 	int			offsets[MAX_ARGS];		/* offsets of argument words */
2792 	int			start_offset,
2793 				end_offset;
2794 	int			lineno;
2795 	int			j;
2796 
2797 	initPQExpBuffer(&word_buf);
2798 
2799 	/* Remember location of the backslash */
2800 	start_offset = expr_scanner_offset(sstate) - 1;
2801 	lineno = expr_scanner_get_lineno(sstate, start_offset);
2802 
2803 	/* Collect first word of command */
2804 	if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
2805 	{
2806 		termPQExpBuffer(&word_buf);
2807 		return NULL;
2808 	}
2809 
2810 	/* Allocate and initialize Command structure */
2811 	my_command = (Command *) pg_malloc0(sizeof(Command));
2812 	my_command->command_num = num_commands++;
2813 	my_command->type = META_COMMAND;
2814 	my_command->argc = 0;
2815 	initSimpleStats(&my_command->stats);
2816 
2817 	/* Save first word (command name) */
2818 	j = 0;
2819 	offsets[j] = word_offset;
2820 	my_command->argv[j++] = pg_strdup(word_buf.data);
2821 	my_command->argc++;
2822 
2823 	if (pg_strcasecmp(my_command->argv[0], "set") == 0)
2824 	{
2825 		/* For \set, collect var name, then lex the expression. */
2826 		yyscan_t	yyscanner;
2827 
2828 		if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
2829 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
2830 						 "missing argument", NULL, -1);
2831 
2832 		offsets[j] = word_offset;
2833 		my_command->argv[j++] = pg_strdup(word_buf.data);
2834 		my_command->argc++;
2835 
2836 		yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
2837 									  my_command->argv[0]);
2838 
2839 		if (expr_yyparse(yyscanner) != 0)
2840 		{
2841 			/* dead code: exit done from syntax_error called by yyerror */
2842 			exit(1);
2843 		}
2844 
2845 		my_command->expr = expr_parse_result;
2846 
2847 		/* Get location of the ending newline */
2848 		end_offset = expr_scanner_offset(sstate) - 1;
2849 
2850 		/* Save line */
2851 		my_command->line = expr_scanner_get_substring(sstate,
2852 													  start_offset,
2853 													  end_offset);
2854 
2855 		expr_scanner_finish(yyscanner);
2856 
2857 		termPQExpBuffer(&word_buf);
2858 
2859 		return my_command;
2860 	}
2861 
2862 	/* For all other commands, collect remaining words. */
2863 	while (expr_lex_one_word(sstate, &word_buf, &word_offset))
2864 	{
2865 		if (j >= MAX_ARGS)
2866 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
2867 						 "too many arguments", NULL, -1);
2868 
2869 		offsets[j] = word_offset;
2870 		my_command->argv[j++] = pg_strdup(word_buf.data);
2871 		my_command->argc++;
2872 	}
2873 
2874 	/* Get location of the ending newline */
2875 	end_offset = expr_scanner_offset(sstate) - 1;
2876 
2877 	/* Save line */
2878 	my_command->line = expr_scanner_get_substring(sstate,
2879 												  start_offset,
2880 												  end_offset);
2881 
2882 	if (pg_strcasecmp(my_command->argv[0], "sleep") == 0)
2883 	{
2884 		if (my_command->argc < 2)
2885 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
2886 						 "missing argument", NULL, -1);
2887 
2888 		if (my_command->argc > 3)
2889 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
2890 						 "too many arguments", NULL,
2891 						 offsets[3] - start_offset);
2892 
2893 		/*
2894 		 * Split argument into number and unit to allow "sleep 1ms" etc. We
2895 		 * don't have to terminate the number argument with null because it
2896 		 * will be parsed with atoi, which ignores trailing non-digit
2897 		 * characters.
2898 		 */
2899 		if (my_command->argc == 2 && my_command->argv[1][0] != ':')
2900 		{
2901 			char	   *c = my_command->argv[1];
2902 
2903 			while (isdigit((unsigned char) *c))
2904 				c++;
2905 			if (*c)
2906 			{
2907 				my_command->argv[2] = c;
2908 				offsets[2] = offsets[1] + (c - my_command->argv[1]);
2909 				my_command->argc = 3;
2910 			}
2911 		}
2912 
2913 		if (my_command->argc == 3)
2914 		{
2915 			if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
2916 				pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
2917 				pg_strcasecmp(my_command->argv[2], "s") != 0)
2918 				syntax_error(source, lineno, my_command->line, my_command->argv[0],
2919 							 "unrecognized time unit, must be us, ms or s",
2920 							 my_command->argv[2], offsets[2] - start_offset);
2921 		}
2922 	}
2923 	else if (pg_strcasecmp(my_command->argv[0], "setshell") == 0)
2924 	{
2925 		if (my_command->argc < 3)
2926 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
2927 						 "missing argument", NULL, -1);
2928 	}
2929 	else if (pg_strcasecmp(my_command->argv[0], "shell") == 0)
2930 	{
2931 		if (my_command->argc < 2)
2932 			syntax_error(source, lineno, my_command->line, my_command->argv[0],
2933 						 "missing command", NULL, -1);
2934 	}
2935 	else
2936 	{
2937 		syntax_error(source, lineno, my_command->line, my_command->argv[0],
2938 					 "invalid command", NULL, -1);
2939 	}
2940 
2941 	termPQExpBuffer(&word_buf);
2942 
2943 	return my_command;
2944 }
2945 
2946 /*
2947  * Parse a script (either the contents of a file, or a built-in script)
2948  * and add it to the list of scripts.
2949  */
2950 static void
ParseScript(const char * script,const char * desc,int weight)2951 ParseScript(const char *script, const char *desc, int weight)
2952 {
2953 	ParsedScript ps;
2954 	PsqlScanState sstate;
2955 	PQExpBufferData line_buf;
2956 	int			alloc_num;
2957 	int			index;
2958 
2959 #define COMMANDS_ALLOC_NUM 128
2960 	alloc_num = COMMANDS_ALLOC_NUM;
2961 
2962 	/* Initialize all fields of ps */
2963 	ps.desc = desc;
2964 	ps.weight = weight;
2965 	ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2966 	initStats(&ps.stats, 0.0);
2967 
2968 	/* Prepare to parse script */
2969 	sstate = psql_scan_create(&pgbench_callbacks);
2970 
2971 	/*
2972 	 * Ideally, we'd scan scripts using the encoding and stdstrings settings
2973 	 * we get from a DB connection.  However, without major rearrangement of
2974 	 * pgbench's argument parsing, we can't have a DB connection at the time
2975 	 * we parse scripts.  Using SQL_ASCII (encoding 0) should work well enough
2976 	 * with any backend-safe encoding, though conceivably we could be fooled
2977 	 * if a script file uses a client-only encoding.  We also assume that
2978 	 * stdstrings should be true, which is a bit riskier.
2979 	 */
2980 	psql_scan_setup(sstate, script, strlen(script), 0, true);
2981 
2982 	initPQExpBuffer(&line_buf);
2983 
2984 	index = 0;
2985 
2986 	for (;;)
2987 	{
2988 		PsqlScanResult sr;
2989 		promptStatus_t prompt;
2990 		Command    *command;
2991 
2992 		resetPQExpBuffer(&line_buf);
2993 
2994 		sr = psql_scan(sstate, &line_buf, &prompt);
2995 
2996 		/* If we collected a SQL command, process that */
2997 		command = process_sql_command(&line_buf, desc);
2998 		if (command)
2999 		{
3000 			ps.commands[index] = command;
3001 			index++;
3002 
3003 			if (index >= alloc_num)
3004 			{
3005 				alloc_num += COMMANDS_ALLOC_NUM;
3006 				ps.commands = (Command **)
3007 					pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3008 			}
3009 		}
3010 
3011 		/* If we reached a backslash, process that */
3012 		if (sr == PSCAN_BACKSLASH)
3013 		{
3014 			command = process_backslash_command(sstate, desc);
3015 			if (command)
3016 			{
3017 				ps.commands[index] = command;
3018 				index++;
3019 
3020 				if (index >= alloc_num)
3021 				{
3022 					alloc_num += COMMANDS_ALLOC_NUM;
3023 					ps.commands = (Command **)
3024 						pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3025 				}
3026 			}
3027 		}
3028 
3029 		/* Done if we reached EOF */
3030 		if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
3031 			break;
3032 	}
3033 
3034 	ps.commands[index] = NULL;
3035 
3036 	addScript(ps);
3037 
3038 	termPQExpBuffer(&line_buf);
3039 	psql_scan_finish(sstate);
3040 	psql_scan_destroy(sstate);
3041 }
3042 
3043 /*
3044  * Read the entire contents of file fd, and return it in a malloc'd buffer.
3045  *
3046  * The buffer will typically be larger than necessary, but we don't care
3047  * in this program, because we'll free it as soon as we've parsed the script.
3048  */
3049 static char *
read_file_contents(FILE * fd)3050 read_file_contents(FILE *fd)
3051 {
3052 	char	   *buf;
3053 	size_t		buflen = BUFSIZ;
3054 	size_t		used = 0;
3055 
3056 	buf = (char *) pg_malloc(buflen);
3057 
3058 	for (;;)
3059 	{
3060 		size_t		nread;
3061 
3062 		nread = fread(buf + used, 1, BUFSIZ, fd);
3063 		used += nread;
3064 		/* If fread() read less than requested, must be EOF or error */
3065 		if (nread < BUFSIZ)
3066 			break;
3067 		/* Enlarge buf so we can read some more */
3068 		buflen += BUFSIZ;
3069 		buf = (char *) pg_realloc(buf, buflen);
3070 	}
3071 	/* There is surely room for a terminator */
3072 	buf[used] = '\0';
3073 
3074 	return buf;
3075 }
3076 
3077 /*
3078  * Given a file name, read it and add its script to the list.
3079  * "-" means to read stdin.
3080  * NB: filename must be storage that won't disappear.
3081  */
3082 static void
process_file(const char * filename,int weight)3083 process_file(const char *filename, int weight)
3084 {
3085 	FILE	   *fd;
3086 	char	   *buf;
3087 
3088 	/* Slurp the file contents into "buf" */
3089 	if (strcmp(filename, "-") == 0)
3090 		fd = stdin;
3091 	else if ((fd = fopen(filename, "r")) == NULL)
3092 	{
3093 		fprintf(stderr, "could not open file \"%s\": %s\n",
3094 				filename, strerror(errno));
3095 		exit(1);
3096 	}
3097 
3098 	buf = read_file_contents(fd);
3099 
3100 	if (ferror(fd))
3101 	{
3102 		fprintf(stderr, "could not read file \"%s\": %s\n",
3103 				filename, strerror(errno));
3104 		exit(1);
3105 	}
3106 
3107 	if (fd != stdin)
3108 		fclose(fd);
3109 
3110 	ParseScript(buf, filename, weight);
3111 
3112 	free(buf);
3113 }
3114 
3115 /* Parse the given builtin script and add it to the list. */
3116 static void
process_builtin(const BuiltinScript * bi,int weight)3117 process_builtin(const BuiltinScript *bi, int weight)
3118 {
3119 	ParseScript(bi->script, bi->desc, weight);
3120 }
3121 
3122 /* show available builtin scripts */
3123 static void
listAvailableScripts(void)3124 listAvailableScripts(void)
3125 {
3126 	int			i;
3127 
3128 	fprintf(stderr, "Available builtin scripts:\n");
3129 	for (i = 0; i < lengthof(builtin_script); i++)
3130 		fprintf(stderr, "\t%s\n", builtin_script[i].name);
3131 	fprintf(stderr, "\n");
3132 }
3133 
3134 /* return builtin script "name" if unambiguous, fails if not found */
3135 static const BuiltinScript *
findBuiltin(const char * name)3136 findBuiltin(const char *name)
3137 {
3138 	int			i,
3139 				found = 0,
3140 				len = strlen(name);
3141 	const BuiltinScript *result = NULL;
3142 
3143 	for (i = 0; i < lengthof(builtin_script); i++)
3144 	{
3145 		if (strncmp(builtin_script[i].name, name, len) == 0)
3146 		{
3147 			result = &builtin_script[i];
3148 			found++;
3149 		}
3150 	}
3151 
3152 	/* ok, unambiguous result */
3153 	if (found == 1)
3154 		return result;
3155 
3156 	/* error cases */
3157 	if (found == 0)
3158 		fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
3159 	else	/* found > 1 */
3160 		fprintf(stderr,
3161 				"ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
3162 
3163 	listAvailableScripts();
3164 	exit(1);
3165 }
3166 
3167 /*
3168  * Determine the weight specification from a script option (-b, -f), if any,
3169  * and return it as an integer (1 is returned if there's no weight).  The
3170  * script name is returned in *script as a malloc'd string.
3171  */
3172 static int
parseScriptWeight(const char * option,char ** script)3173 parseScriptWeight(const char *option, char **script)
3174 {
3175 	char	   *sep;
3176 	int			weight;
3177 
3178 	if ((sep = strrchr(option, WSEP)))
3179 	{
3180 		int			namelen = sep - option;
3181 		long		wtmp;
3182 		char	   *badp;
3183 
3184 		/* generate the script name */
3185 		*script = pg_malloc(namelen + 1);
3186 		strncpy(*script, option, namelen);
3187 		(*script)[namelen] = '\0';
3188 
3189 		/* process digits of the weight spec */
3190 		errno = 0;
3191 		wtmp = strtol(sep + 1, &badp, 10);
3192 		if (errno != 0 || badp == sep + 1 || *badp != '\0')
3193 		{
3194 			fprintf(stderr, "invalid weight specification: %s\n", sep);
3195 			exit(1);
3196 		}
3197 		if (wtmp > INT_MAX || wtmp < 0)
3198 		{
3199 			fprintf(stderr,
3200 			"weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
3201 					INT_MAX, (int64) wtmp);
3202 			exit(1);
3203 		}
3204 		weight = wtmp;
3205 	}
3206 	else
3207 	{
3208 		*script = pg_strdup(option);
3209 		weight = 1;
3210 	}
3211 
3212 	return weight;
3213 }
3214 
3215 /* append a script to the list of scripts to process */
3216 static void
addScript(ParsedScript script)3217 addScript(ParsedScript script)
3218 {
3219 	if (script.commands == NULL || script.commands[0] == NULL)
3220 	{
3221 		fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
3222 		exit(1);
3223 	}
3224 
3225 	if (num_scripts >= MAX_SCRIPTS)
3226 	{
3227 		fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
3228 		exit(1);
3229 	}
3230 
3231 	sql_script[num_scripts] = script;
3232 	num_scripts++;
3233 }
3234 
3235 static void
printSimpleStats(char * prefix,SimpleStats * ss)3236 printSimpleStats(char *prefix, SimpleStats *ss)
3237 {
3238 	/* print NaN if no transactions where executed */
3239 	double		latency = ss->sum / ss->count;
3240 	double		stddev = sqrt(ss->sum2 / ss->count - latency * latency);
3241 
3242 	printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
3243 	printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
3244 }
3245 
3246 /* print out results */
3247 static void
printResults(TState * threads,StatsData * total,instr_time total_time,instr_time conn_total_time,int latency_late)3248 printResults(TState *threads, StatsData *total, instr_time total_time,
3249 			 instr_time conn_total_time, int latency_late)
3250 {
3251 	double		time_include,
3252 				tps_include,
3253 				tps_exclude;
3254 
3255 	time_include = INSTR_TIME_GET_DOUBLE(total_time);
3256 	tps_include = total->cnt / time_include;
3257 	tps_exclude = total->cnt / (time_include -
3258 						(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
3259 
3260 	/* Report test parameters. */
3261 	printf("transaction type: %s\n",
3262 		   num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
3263 	printf("scaling factor: %d\n", scale);
3264 	printf("query mode: %s\n", QUERYMODE[querymode]);
3265 	printf("number of clients: %d\n", nclients);
3266 	printf("number of threads: %d\n", nthreads);
3267 	if (duration <= 0)
3268 	{
3269 		printf("number of transactions per client: %d\n", nxacts);
3270 		printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
3271 			   total->cnt, nxacts * nclients);
3272 	}
3273 	else
3274 	{
3275 		printf("duration: %d s\n", duration);
3276 		printf("number of transactions actually processed: " INT64_FORMAT "\n",
3277 			   total->cnt);
3278 	}
3279 
3280 	/* Remaining stats are nonsensical if we failed to execute any xacts */
3281 	if (total->cnt <= 0)
3282 		return;
3283 
3284 	if (throttle_delay && latency_limit)
3285 		printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
3286 			   total->skipped,
3287 			   100.0 * total->skipped / (total->skipped + total->cnt));
3288 
3289 	if (latency_limit)
3290 		printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
3291 			   latency_limit / 1000.0, latency_late,
3292 			   100.0 * latency_late / (total->skipped + total->cnt));
3293 
3294 	if (throttle_delay || progress || latency_limit)
3295 		printSimpleStats("latency", &total->latency);
3296 	else
3297 	{
3298 		/* no measurement, show average latency computed from run time */
3299 		printf("latency average = %.3f ms\n",
3300 			   1000.0 * time_include * nclients / total->cnt);
3301 	}
3302 
3303 	if (throttle_delay)
3304 	{
3305 		/*
3306 		 * Report average transaction lag under rate limit throttling.  This
3307 		 * is the delay between scheduled and actual start times for the
3308 		 * transaction.  The measured lag may be caused by thread/client load,
3309 		 * the database load, or the Poisson throttling process.
3310 		 */
3311 		printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
3312 			   0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
3313 	}
3314 
3315 	printf("tps = %f (including connections establishing)\n", tps_include);
3316 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
3317 
3318 	/* Report per-script/command statistics */
3319 	if (per_script_stats || latency_limit || is_latencies)
3320 	{
3321 		int			i;
3322 
3323 		for (i = 0; i < num_scripts; i++)
3324 		{
3325 			if (num_scripts > 1)
3326 				printf("SQL script %d: %s\n"
3327 					   " - weight: %d (targets %.1f%% of total)\n"
3328 					   " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
3329 					   i + 1, sql_script[i].desc,
3330 					   sql_script[i].weight,
3331 					   100.0 * sql_script[i].weight / total_weight,
3332 					   sql_script[i].stats.cnt,
3333 					   100.0 * sql_script[i].stats.cnt / total->cnt,
3334 					   sql_script[i].stats.cnt / time_include);
3335 			else
3336 				printf("script statistics:\n");
3337 
3338 			if (latency_limit)
3339 				printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
3340 					   sql_script[i].stats.skipped,
3341 					   100.0 * sql_script[i].stats.skipped /
3342 					(sql_script[i].stats.skipped + sql_script[i].stats.cnt));
3343 
3344 			if (num_scripts > 1)
3345 				printSimpleStats(" - latency", &sql_script[i].stats.latency);
3346 
3347 			/* Report per-command latencies */
3348 			if (is_latencies)
3349 			{
3350 				Command   **commands;
3351 
3352 				printf(" - statement latencies in milliseconds:\n");
3353 
3354 				for (commands = sql_script[i].commands;
3355 					 *commands != NULL;
3356 					 commands++)
3357 					printf("   %11.3f  %s\n",
3358 						   1000.0 * (*commands)->stats.sum /
3359 						   (*commands)->stats.count,
3360 						   (*commands)->line);
3361 			}
3362 		}
3363 	}
3364 }
3365 
3366 
3367 int
main(int argc,char ** argv)3368 main(int argc, char **argv)
3369 {
3370 	static struct option long_options[] = {
3371 		/* systematic long/short named options */
3372 		{"builtin", required_argument, NULL, 'b'},
3373 		{"client", required_argument, NULL, 'c'},
3374 		{"connect", no_argument, NULL, 'C'},
3375 		{"debug", no_argument, NULL, 'd'},
3376 		{"define", required_argument, NULL, 'D'},
3377 		{"file", required_argument, NULL, 'f'},
3378 		{"fillfactor", required_argument, NULL, 'F'},
3379 		{"host", required_argument, NULL, 'h'},
3380 		{"initialize", no_argument, NULL, 'i'},
3381 		{"jobs", required_argument, NULL, 'j'},
3382 		{"log", no_argument, NULL, 'l'},
3383 		{"latency-limit", required_argument, NULL, 'L'},
3384 		{"no-vacuum", no_argument, NULL, 'n'},
3385 		{"port", required_argument, NULL, 'p'},
3386 		{"progress", required_argument, NULL, 'P'},
3387 		{"protocol", required_argument, NULL, 'M'},
3388 		{"quiet", no_argument, NULL, 'q'},
3389 		{"report-latencies", no_argument, NULL, 'r'},
3390 		{"rate", required_argument, NULL, 'R'},
3391 		{"scale", required_argument, NULL, 's'},
3392 		{"select-only", no_argument, NULL, 'S'},
3393 		{"skip-some-updates", no_argument, NULL, 'N'},
3394 		{"time", required_argument, NULL, 'T'},
3395 		{"transactions", required_argument, NULL, 't'},
3396 		{"username", required_argument, NULL, 'U'},
3397 		{"vacuum-all", no_argument, NULL, 'v'},
3398 		/* long-named only options */
3399 		{"foreign-keys", no_argument, &foreign_keys, 1},
3400 		{"index-tablespace", required_argument, NULL, 3},
3401 		{"tablespace", required_argument, NULL, 2},
3402 		{"unlogged-tables", no_argument, &unlogged_tables, 1},
3403 		{"sampling-rate", required_argument, NULL, 4},
3404 		{"aggregate-interval", required_argument, NULL, 5},
3405 		{"progress-timestamp", no_argument, NULL, 6},
3406 		{NULL, 0, NULL, 0}
3407 	};
3408 
3409 	int			c;
3410 	int			is_init_mode = 0;		/* initialize mode? */
3411 	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
3412 	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
3413 	int			optindex;
3414 	bool		scale_given = false;
3415 
3416 	bool		benchmarking_option_set = false;
3417 	bool		initialization_option_set = false;
3418 	bool		internal_script_used = false;
3419 
3420 	CState	   *state;			/* status of clients */
3421 	TState	   *threads;		/* array of thread */
3422 
3423 	instr_time	start_time;		/* start up time */
3424 	instr_time	total_time;
3425 	instr_time	conn_total_time;
3426 	int64		latency_late = 0;
3427 	StatsData	stats;
3428 	int			weight;
3429 
3430 	int			i;
3431 	int			nclients_dealt;
3432 
3433 #ifdef HAVE_GETRLIMIT
3434 	struct rlimit rlim;
3435 #endif
3436 
3437 	PGconn	   *con;
3438 	PGresult   *res;
3439 	char	   *env;
3440 
3441 	progname = get_progname(argv[0]);
3442 
3443 	if (argc > 1)
3444 	{
3445 		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
3446 		{
3447 			usage();
3448 			exit(0);
3449 		}
3450 		if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
3451 		{
3452 			puts("pgbench (PostgreSQL) " PG_VERSION);
3453 			exit(0);
3454 		}
3455 	}
3456 
3457 #ifdef WIN32
3458 	/* stderr is buffered on Win32. */
3459 	setvbuf(stderr, NULL, _IONBF, 0);
3460 #endif
3461 
3462 	if ((env = getenv("PGHOST")) != NULL && *env != '\0')
3463 		pghost = env;
3464 	if ((env = getenv("PGPORT")) != NULL && *env != '\0')
3465 		pgport = env;
3466 	else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
3467 		login = env;
3468 
3469 	state = (CState *) pg_malloc(sizeof(CState));
3470 	memset(state, 0, sizeof(CState));
3471 
3472 	while ((c = getopt_long(argc, argv, "ih:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
3473 	{
3474 		char	   *script;
3475 
3476 		switch (c)
3477 		{
3478 			case 'i':
3479 				is_init_mode++;
3480 				break;
3481 			case 'h':
3482 				pghost = pg_strdup(optarg);
3483 				break;
3484 			case 'n':
3485 				is_no_vacuum++;
3486 				break;
3487 			case 'v':
3488 				do_vacuum_accounts++;
3489 				break;
3490 			case 'p':
3491 				pgport = pg_strdup(optarg);
3492 				break;
3493 			case 'd':
3494 				debug++;
3495 				break;
3496 			case 'c':
3497 				benchmarking_option_set = true;
3498 				nclients = atoi(optarg);
3499 				if (nclients <= 0 || nclients > MAXCLIENTS)
3500 				{
3501 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
3502 							optarg);
3503 					exit(1);
3504 				}
3505 #ifdef HAVE_GETRLIMIT
3506 #ifdef RLIMIT_NOFILE			/* most platforms use RLIMIT_NOFILE */
3507 				if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
3508 #else							/* but BSD doesn't ... */
3509 				if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
3510 #endif   /* RLIMIT_NOFILE */
3511 				{
3512 					fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
3513 					exit(1);
3514 				}
3515 				if (rlim.rlim_cur < nclients + 3)
3516 				{
3517 					fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
3518 							nclients + 3, (long) rlim.rlim_cur);
3519 					fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
3520 					exit(1);
3521 				}
3522 #endif   /* HAVE_GETRLIMIT */
3523 				break;
3524 			case 'j':			/* jobs */
3525 				benchmarking_option_set = true;
3526 				nthreads = atoi(optarg);
3527 				if (nthreads <= 0)
3528 				{
3529 					fprintf(stderr, "invalid number of threads: \"%s\"\n",
3530 							optarg);
3531 					exit(1);
3532 				}
3533 #ifndef ENABLE_THREAD_SAFETY
3534 				if (nthreads != 1)
3535 				{
3536 					fprintf(stderr, "threads are not supported on this platform; use -j1\n");
3537 					exit(1);
3538 				}
3539 #endif   /* !ENABLE_THREAD_SAFETY */
3540 				break;
3541 			case 'C':
3542 				benchmarking_option_set = true;
3543 				is_connect = true;
3544 				break;
3545 			case 'r':
3546 				benchmarking_option_set = true;
3547 				per_script_stats = true;
3548 				is_latencies = true;
3549 				break;
3550 			case 's':
3551 				scale_given = true;
3552 				scale = atoi(optarg);
3553 				if (scale <= 0)
3554 				{
3555 					fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
3556 					exit(1);
3557 				}
3558 				break;
3559 			case 't':
3560 				benchmarking_option_set = true;
3561 				if (duration > 0)
3562 				{
3563 					fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3564 					exit(1);
3565 				}
3566 				nxacts = atoi(optarg);
3567 				if (nxacts <= 0)
3568 				{
3569 					fprintf(stderr, "invalid number of transactions: \"%s\"\n",
3570 							optarg);
3571 					exit(1);
3572 				}
3573 				break;
3574 			case 'T':
3575 				benchmarking_option_set = true;
3576 				if (nxacts > 0)
3577 				{
3578 					fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3579 					exit(1);
3580 				}
3581 				duration = atoi(optarg);
3582 				if (duration <= 0)
3583 				{
3584 					fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
3585 					exit(1);
3586 				}
3587 				break;
3588 			case 'U':
3589 				login = pg_strdup(optarg);
3590 				break;
3591 			case 'l':
3592 				benchmarking_option_set = true;
3593 				use_log = true;
3594 				break;
3595 			case 'q':
3596 				initialization_option_set = true;
3597 				use_quiet = true;
3598 				break;
3599 
3600 			case 'b':
3601 				if (strcmp(optarg, "list") == 0)
3602 				{
3603 					listAvailableScripts();
3604 					exit(0);
3605 				}
3606 
3607 				weight = parseScriptWeight(optarg, &script);
3608 				process_builtin(findBuiltin(script), weight);
3609 				benchmarking_option_set = true;
3610 				internal_script_used = true;
3611 				break;
3612 
3613 			case 'S':
3614 				process_builtin(findBuiltin("select-only"), 1);
3615 				benchmarking_option_set = true;
3616 				internal_script_used = true;
3617 				break;
3618 			case 'N':
3619 				process_builtin(findBuiltin("simple-update"), 1);
3620 				benchmarking_option_set = true;
3621 				internal_script_used = true;
3622 				break;
3623 			case 'f':
3624 				weight = parseScriptWeight(optarg, &script);
3625 				process_file(script, weight);
3626 				benchmarking_option_set = true;
3627 				break;
3628 			case 'D':
3629 				{
3630 					char	   *p;
3631 
3632 					benchmarking_option_set = true;
3633 
3634 					if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
3635 					{
3636 						fprintf(stderr, "invalid variable definition: \"%s\"\n",
3637 								optarg);
3638 						exit(1);
3639 					}
3640 
3641 					*p++ = '\0';
3642 					if (!putVariable(&state[0], "option", optarg, p))
3643 						exit(1);
3644 				}
3645 				break;
3646 			case 'F':
3647 				initialization_option_set = true;
3648 				fillfactor = atoi(optarg);
3649 				if (fillfactor < 10 || fillfactor > 100)
3650 				{
3651 					fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3652 					exit(1);
3653 				}
3654 				break;
3655 			case 'M':
3656 				benchmarking_option_set = true;
3657 				if (num_scripts > 0)
3658 				{
3659 					fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f or -b)\n");
3660 					exit(1);
3661 				}
3662 				for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3663 					if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3664 						break;
3665 				if (querymode >= NUM_QUERYMODE)
3666 				{
3667 					fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3668 							optarg);
3669 					exit(1);
3670 				}
3671 				break;
3672 			case 'P':
3673 				benchmarking_option_set = true;
3674 				progress = atoi(optarg);
3675 				if (progress <= 0)
3676 				{
3677 					fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3678 							optarg);
3679 					exit(1);
3680 				}
3681 				break;
3682 			case 'R':
3683 				{
3684 					/* get a double from the beginning of option value */
3685 					double		throttle_value = atof(optarg);
3686 
3687 					benchmarking_option_set = true;
3688 
3689 					if (throttle_value <= 0.0)
3690 					{
3691 						fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3692 						exit(1);
3693 					}
3694 					/* Invert rate limit into a time offset */
3695 					throttle_delay = (int64) (1000000.0 / throttle_value);
3696 				}
3697 				break;
3698 			case 'L':
3699 				{
3700 					double		limit_ms = atof(optarg);
3701 
3702 					if (limit_ms <= 0.0)
3703 					{
3704 						fprintf(stderr, "invalid latency limit: \"%s\"\n",
3705 								optarg);
3706 						exit(1);
3707 					}
3708 					benchmarking_option_set = true;
3709 					latency_limit = (int64) (limit_ms * 1000);
3710 				}
3711 				break;
3712 			case 0:
3713 				/* This covers long options which take no argument. */
3714 				if (foreign_keys || unlogged_tables)
3715 					initialization_option_set = true;
3716 				break;
3717 			case 2:				/* tablespace */
3718 				initialization_option_set = true;
3719 				tablespace = pg_strdup(optarg);
3720 				break;
3721 			case 3:				/* index-tablespace */
3722 				initialization_option_set = true;
3723 				index_tablespace = pg_strdup(optarg);
3724 				break;
3725 			case 4:
3726 				benchmarking_option_set = true;
3727 				sample_rate = atof(optarg);
3728 				if (sample_rate <= 0.0 || sample_rate > 1.0)
3729 				{
3730 					fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3731 					exit(1);
3732 				}
3733 				break;
3734 			case 5:
3735 #ifdef WIN32
3736 				fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3737 				exit(1);
3738 #else
3739 				benchmarking_option_set = true;
3740 				agg_interval = atoi(optarg);
3741 				if (agg_interval <= 0)
3742 				{
3743 					fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3744 							optarg);
3745 					exit(1);
3746 				}
3747 #endif
3748 				break;
3749 			case 6:
3750 				progress_timestamp = true;
3751 				benchmarking_option_set = true;
3752 				break;
3753 			default:
3754 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3755 				exit(1);
3756 				break;
3757 		}
3758 	}
3759 
3760 	/* set default script if none */
3761 	if (num_scripts == 0 && !is_init_mode)
3762 	{
3763 		process_builtin(findBuiltin("tpcb-like"), 1);
3764 		benchmarking_option_set = true;
3765 		internal_script_used = true;
3766 	}
3767 
3768 	/* compute total_weight */
3769 	for (i = 0; i < num_scripts; i++)
3770 		/* cannot overflow: weight is 32b, total_weight 64b */
3771 		total_weight += sql_script[i].weight;
3772 
3773 	if (total_weight == 0 && !is_init_mode)
3774 	{
3775 		fprintf(stderr, "total script weight must not be zero\n");
3776 		exit(1);
3777 	}
3778 
3779 	/* show per script stats if several scripts are used */
3780 	if (num_scripts > 1)
3781 		per_script_stats = true;
3782 
3783 	/*
3784 	 * Don't need more threads than there are clients.  (This is not merely an
3785 	 * optimization; throttle_delay is calculated incorrectly below if some
3786 	 * threads have no clients assigned to them.)
3787 	 */
3788 	if (nthreads > nclients)
3789 		nthreads = nclients;
3790 
3791 	/* compute a per thread delay */
3792 	throttle_delay *= nthreads;
3793 
3794 	if (argc > optind)
3795 		dbName = argv[optind];
3796 	else
3797 	{
3798 		if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3799 			dbName = env;
3800 		else if (login != NULL && *login != '\0')
3801 			dbName = login;
3802 		else
3803 			dbName = "";
3804 	}
3805 
3806 	if (is_init_mode)
3807 	{
3808 		if (benchmarking_option_set)
3809 		{
3810 			fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3811 			exit(1);
3812 		}
3813 
3814 		init(is_no_vacuum);
3815 		exit(0);
3816 	}
3817 	else
3818 	{
3819 		if (initialization_option_set)
3820 		{
3821 			fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3822 			exit(1);
3823 		}
3824 	}
3825 
3826 	/* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3827 	if (nxacts <= 0 && duration <= 0)
3828 		nxacts = DEFAULT_NXACTS;
3829 
3830 	/* --sampling-rate may be used only with -l */
3831 	if (sample_rate > 0.0 && !use_log)
3832 	{
3833 		fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3834 		exit(1);
3835 	}
3836 
3837 	/* --sampling-rate may not be used with --aggregate-interval */
3838 	if (sample_rate > 0.0 && agg_interval > 0)
3839 	{
3840 		fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3841 		exit(1);
3842 	}
3843 
3844 	if (agg_interval > 0 && !use_log)
3845 	{
3846 		fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3847 		exit(1);
3848 	}
3849 
3850 	if (duration > 0 && agg_interval > duration)
3851 	{
3852 		fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3853 		exit(1);
3854 	}
3855 
3856 	if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3857 	{
3858 		fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3859 		exit(1);
3860 	}
3861 
3862 	/*
3863 	 * save main process id in the global variable because process id will be
3864 	 * changed after fork.
3865 	 */
3866 	main_pid = (int) getpid();
3867 
3868 	if (nclients > 1)
3869 	{
3870 		state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3871 		memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3872 
3873 		/* copy any -D switch values to all clients */
3874 		for (i = 1; i < nclients; i++)
3875 		{
3876 			int			j;
3877 
3878 			state[i].id = i;
3879 			for (j = 0; j < state[0].nvariables; j++)
3880 			{
3881 				Variable   *var = &state[0].variables[j];
3882 
3883 				if (var->is_numeric)
3884 				{
3885 					if (!putVariableNumber(&state[i], "startup",
3886 										   var->name, &var->num_value))
3887 						exit(1);
3888 				}
3889 				else
3890 				{
3891 					if (!putVariable(&state[i], "startup",
3892 									 var->name, var->value))
3893 						exit(1);
3894 				}
3895 			}
3896 		}
3897 	}
3898 
3899 	if (debug)
3900 	{
3901 		if (duration <= 0)
3902 			printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3903 				   pghost, pgport, nclients, nxacts, dbName);
3904 		else
3905 			printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3906 				   pghost, pgport, nclients, duration, dbName);
3907 	}
3908 
3909 	/* opening connection... */
3910 	con = doConnect();
3911 	if (con == NULL)
3912 		exit(1);
3913 
3914 	if (internal_script_used)
3915 	{
3916 		/*
3917 		 * get the scaling factor that should be same as count(*) from
3918 		 * pgbench_branches if this is not a custom query
3919 		 */
3920 		res = PQexec(con, "select count(*) from pgbench_branches");
3921 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
3922 		{
3923 			char	   *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
3924 
3925 			fprintf(stderr, "%s", PQerrorMessage(con));
3926 			if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
3927 			{
3928 				fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
3929 			}
3930 
3931 			exit(1);
3932 		}
3933 		scale = atoi(PQgetvalue(res, 0, 0));
3934 		if (scale < 0)
3935 		{
3936 			fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3937 					PQgetvalue(res, 0, 0));
3938 			exit(1);
3939 		}
3940 		PQclear(res);
3941 
3942 		/* warn if we override user-given -s switch */
3943 		if (scale_given)
3944 			fprintf(stderr,
3945 					"scale option ignored, using count from pgbench_branches table (%d)\n",
3946 					scale);
3947 	}
3948 
3949 	/*
3950 	 * :scale variables normally get -s or database scale, but don't override
3951 	 * an explicit -D switch
3952 	 */
3953 	if (lookupVariable(&state[0], "scale") == NULL)
3954 	{
3955 		for (i = 0; i < nclients; i++)
3956 		{
3957 			if (!putVariableInt(&state[i], "startup", "scale", scale))
3958 				exit(1);
3959 		}
3960 	}
3961 
3962 	/*
3963 	 * Define a :client_id variable that is unique per connection. But don't
3964 	 * override an explicit -D switch.
3965 	 */
3966 	if (lookupVariable(&state[0], "client_id") == NULL)
3967 	{
3968 		for (i = 0; i < nclients; i++)
3969 		{
3970 			if (!putVariableInt(&state[i], "startup", "client_id", i))
3971 				exit(1);
3972 		}
3973 	}
3974 
3975 	if (!is_no_vacuum)
3976 	{
3977 		fprintf(stderr, "starting vacuum...");
3978 		tryExecuteStatement(con, "vacuum pgbench_branches");
3979 		tryExecuteStatement(con, "vacuum pgbench_tellers");
3980 		tryExecuteStatement(con, "truncate pgbench_history");
3981 		fprintf(stderr, "end.\n");
3982 
3983 		if (do_vacuum_accounts)
3984 		{
3985 			fprintf(stderr, "starting vacuum pgbench_accounts...");
3986 			tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3987 			fprintf(stderr, "end.\n");
3988 		}
3989 	}
3990 	PQfinish(con);
3991 
3992 	/* set random seed */
3993 	INSTR_TIME_SET_CURRENT(start_time);
3994 	srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3995 
3996 	/* set up thread data structures */
3997 	threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3998 	nclients_dealt = 0;
3999 
4000 	for (i = 0; i < nthreads; i++)
4001 	{
4002 		TState	   *thread = &threads[i];
4003 
4004 		thread->tid = i;
4005 		thread->state = &state[nclients_dealt];
4006 		thread->nstate =
4007 			(nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
4008 		thread->random_state[0] = random();
4009 		thread->random_state[1] = random();
4010 		thread->random_state[2] = random();
4011 		thread->logfile = NULL; /* filled in later */
4012 		thread->latency_late = 0;
4013 		initStats(&thread->stats, 0.0);
4014 
4015 		nclients_dealt += thread->nstate;
4016 	}
4017 
4018 	/* all clients must be assigned to a thread */
4019 	Assert(nclients_dealt == nclients);
4020 
4021 	/* get start up time */
4022 	INSTR_TIME_SET_CURRENT(start_time);
4023 
4024 	/* set alarm if duration is specified. */
4025 	if (duration > 0)
4026 		setalarm(duration);
4027 
4028 	/* start threads */
4029 #ifdef ENABLE_THREAD_SAFETY
4030 	for (i = 0; i < nthreads; i++)
4031 	{
4032 		TState	   *thread = &threads[i];
4033 
4034 		INSTR_TIME_SET_CURRENT(thread->start_time);
4035 
4036 		/* compute when to stop */
4037 		if (duration > 0)
4038 			end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
4039 				(int64) 1000000 *duration;
4040 
4041 		/* the first thread (i = 0) is executed by main thread */
4042 		if (i > 0)
4043 		{
4044 			int			err = pthread_create(&thread->thread, NULL, threadRun, thread);
4045 
4046 			if (err != 0 || thread->thread == INVALID_THREAD)
4047 			{
4048 				fprintf(stderr, "could not create thread: %s\n", strerror(err));
4049 				exit(1);
4050 			}
4051 		}
4052 		else
4053 		{
4054 			thread->thread = INVALID_THREAD;
4055 		}
4056 	}
4057 #else
4058 	INSTR_TIME_SET_CURRENT(threads[0].start_time);
4059 	/* compute when to stop */
4060 	if (duration > 0)
4061 		end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
4062 			(int64) 1000000 *duration;
4063 	threads[0].thread = INVALID_THREAD;
4064 #endif   /* ENABLE_THREAD_SAFETY */
4065 
4066 	/* wait for threads and accumulate results */
4067 	initStats(&stats, 0.0);
4068 	INSTR_TIME_SET_ZERO(conn_total_time);
4069 	for (i = 0; i < nthreads; i++)
4070 	{
4071 		TState	   *thread = &threads[i];
4072 
4073 #ifdef ENABLE_THREAD_SAFETY
4074 		if (threads[i].thread == INVALID_THREAD)
4075 			/* actually run this thread directly in the main thread */
4076 			(void) threadRun(thread);
4077 		else
4078 			/* wait of other threads. should check that 0 is returned? */
4079 			pthread_join(thread->thread, NULL);
4080 #else
4081 		(void) threadRun(thread);
4082 #endif   /* ENABLE_THREAD_SAFETY */
4083 
4084 		/* aggregate thread level stats */
4085 		mergeSimpleStats(&stats.latency, &thread->stats.latency);
4086 		mergeSimpleStats(&stats.lag, &thread->stats.lag);
4087 		stats.cnt += thread->stats.cnt;
4088 		stats.skipped += thread->stats.skipped;
4089 		latency_late += thread->latency_late;
4090 		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
4091 	}
4092 	disconnect_all(state, nclients);
4093 
4094 	/*
4095 	 * XXX We compute results as though every client of every thread started
4096 	 * and finished at the same time.  That model can diverge noticeably from
4097 	 * reality for a short benchmark run involving relatively many threads.
4098 	 * The first thread may process notably many transactions before the last
4099 	 * thread begins.  Improving the model alone would bring limited benefit,
4100 	 * because performance during those periods of partial thread count can
4101 	 * easily exceed steady state performance.  This is one of the many ways
4102 	 * short runs convey deceptive performance figures.
4103 	 */
4104 	INSTR_TIME_SET_CURRENT(total_time);
4105 	INSTR_TIME_SUBTRACT(total_time, start_time);
4106 	printResults(threads, &stats, total_time, conn_total_time, latency_late);
4107 
4108 	return 0;
4109 }
4110 
4111 static void *
threadRun(void * arg)4112 threadRun(void *arg)
4113 {
4114 	TState	   *thread = (TState *) arg;
4115 	CState	   *state = thread->state;
4116 	instr_time	start,
4117 				end;
4118 	int			nstate = thread->nstate;
4119 	int			remains = nstate;		/* number of remaining clients */
4120 	int			i;
4121 
4122 	/* for reporting progress: */
4123 	int64		thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
4124 	int64		last_report = thread_start;
4125 	int64		next_report = last_report + (int64) progress * 1000000;
4126 	StatsData	last,
4127 				aggs;
4128 
4129 	/*
4130 	 * Initialize throttling rate target for all of the thread's clients.  It
4131 	 * might be a little more accurate to reset thread->start_time here too.
4132 	 * The possible drift seems too small relative to typical throttle delay
4133 	 * times to worry about it.
4134 	 */
4135 	INSTR_TIME_SET_CURRENT(start);
4136 	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
4137 
4138 	INSTR_TIME_SET_ZERO(thread->conn_time);
4139 
4140 	/* open log file if requested */
4141 	if (use_log)
4142 	{
4143 		char		logpath[64];
4144 
4145 		if (thread->tid == 0)
4146 			snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
4147 		else
4148 			snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
4149 		thread->logfile = fopen(logpath, "w");
4150 
4151 		if (thread->logfile == NULL)
4152 		{
4153 			fprintf(stderr, "could not open logfile \"%s\": %s\n",
4154 					logpath, strerror(errno));
4155 			goto done;
4156 		}
4157 	}
4158 
4159 	if (!is_connect)
4160 	{
4161 		/* make connections to the database */
4162 		for (i = 0; i < nstate; i++)
4163 		{
4164 			if ((state[i].con = doConnect()) == NULL)
4165 				goto done;
4166 		}
4167 	}
4168 
4169 	/* time after thread and connections set up */
4170 	INSTR_TIME_SET_CURRENT(thread->conn_time);
4171 	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
4172 
4173 	initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
4174 	last = aggs;
4175 
4176 	/* send start up queries in async manner */
4177 	for (i = 0; i < nstate; i++)
4178 	{
4179 		CState	   *st = &state[i];
4180 		int			prev_ecnt = st->ecnt;
4181 		Command   **commands;
4182 
4183 		st->use_file = chooseScript(thread);
4184 		commands = sql_script[st->use_file].commands;
4185 		if (debug)
4186 			fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
4187 					sql_script[st->use_file].desc);
4188 		if (!doCustom(thread, st, &aggs))
4189 			remains--;			/* I've aborted */
4190 
4191 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
4192 		{
4193 			fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
4194 					i, st->state);
4195 			remains--;			/* I've aborted */
4196 			PQfinish(st->con);
4197 			st->con = NULL;
4198 		}
4199 	}
4200 
4201 	while (remains > 0)
4202 	{
4203 		fd_set		input_mask;
4204 		int			maxsock;	/* max socket number to be waited */
4205 		int64		now_usec = 0;
4206 		int64		min_usec;
4207 
4208 		FD_ZERO(&input_mask);
4209 
4210 		maxsock = -1;
4211 		min_usec = PG_INT64_MAX;
4212 		for (i = 0; i < nstate; i++)
4213 		{
4214 			CState	   *st = &state[i];
4215 			Command   **commands = sql_script[st->use_file].commands;
4216 			int			sock;
4217 
4218 			if (st->con == NULL)
4219 			{
4220 				continue;
4221 			}
4222 			else if (st->sleeping)
4223 			{
4224 				if (st->throttling && timer_exceeded)
4225 				{
4226 					/* interrupt client which has not started a transaction */
4227 					remains--;
4228 					st->sleeping = false;
4229 					st->throttling = false;
4230 					PQfinish(st->con);
4231 					st->con = NULL;
4232 					continue;
4233 				}
4234 				else	/* just a nap from the script */
4235 				{
4236 					int			this_usec;
4237 
4238 					if (min_usec == PG_INT64_MAX)
4239 					{
4240 						instr_time	now;
4241 
4242 						INSTR_TIME_SET_CURRENT(now);
4243 						now_usec = INSTR_TIME_GET_MICROSEC(now);
4244 					}
4245 
4246 					this_usec = st->txn_scheduled - now_usec;
4247 					if (min_usec > this_usec)
4248 						min_usec = this_usec;
4249 				}
4250 			}
4251 			else if (commands[st->state]->type == META_COMMAND)
4252 			{
4253 				min_usec = 0;	/* the connection is ready to run */
4254 				break;
4255 			}
4256 
4257 			sock = PQsocket(st->con);
4258 			if (sock < 0)
4259 			{
4260 				fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
4261 				goto done;
4262 			}
4263 
4264 			FD_SET(sock, &input_mask);
4265 
4266 			if (maxsock < sock)
4267 				maxsock = sock;
4268 		}
4269 
4270 		/* also wake up to print the next progress report on time */
4271 		if (progress && min_usec > 0 && thread->tid == 0)
4272 		{
4273 			/* get current time if needed */
4274 			if (now_usec == 0)
4275 			{
4276 				instr_time	now;
4277 
4278 				INSTR_TIME_SET_CURRENT(now);
4279 				now_usec = INSTR_TIME_GET_MICROSEC(now);
4280 			}
4281 
4282 			if (now_usec >= next_report)
4283 				min_usec = 0;
4284 			else if ((next_report - now_usec) < min_usec)
4285 				min_usec = next_report - now_usec;
4286 		}
4287 
4288 		/*
4289 		 * Sleep until we receive data from the server, or a nap-time
4290 		 * specified in the script ends, or it's time to print a progress
4291 		 * report.
4292 		 */
4293 		if (min_usec > 0 && maxsock != -1)
4294 		{
4295 			int			nsocks; /* return from select(2) */
4296 
4297 			if (min_usec != PG_INT64_MAX)
4298 			{
4299 				struct timeval timeout;
4300 
4301 				timeout.tv_sec = min_usec / 1000000;
4302 				timeout.tv_usec = min_usec % 1000000;
4303 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
4304 			}
4305 			else
4306 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
4307 			if (nsocks < 0)
4308 			{
4309 				if (errno == EINTR)
4310 					continue;
4311 				/* must be something wrong */
4312 				fprintf(stderr, "select() failed: %s\n", strerror(errno));
4313 				goto done;
4314 			}
4315 		}
4316 
4317 		/* ok, backend returns reply */
4318 		for (i = 0; i < nstate; i++)
4319 		{
4320 			CState	   *st = &state[i];
4321 			Command   **commands = sql_script[st->use_file].commands;
4322 			int			prev_ecnt = st->ecnt;
4323 
4324 			if (st->con)
4325 			{
4326 				int			sock = PQsocket(st->con);
4327 
4328 				if (sock < 0)
4329 				{
4330 					fprintf(stderr, "invalid socket: %s",
4331 							PQerrorMessage(st->con));
4332 					goto done;
4333 				}
4334 				if (FD_ISSET(sock, &input_mask) ||
4335 					commands[st->state]->type == META_COMMAND)
4336 				{
4337 					if (!doCustom(thread, st, &aggs))
4338 						remains--;		/* I've aborted */
4339 				}
4340 			}
4341 			else if (is_connect && st->sleeping)
4342 			{
4343 				/* it is sleeping for throttling, maybe it is done, let us try */
4344 				if (!doCustom(thread, st, &aggs))
4345 					remains--;
4346 			}
4347 
4348 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
4349 			{
4350 				fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
4351 						i, st->state);
4352 				remains--;		/* I've aborted */
4353 				PQfinish(st->con);
4354 				st->con = NULL;
4355 			}
4356 		}
4357 
4358 		/* progress report by thread 0 for all threads */
4359 		if (progress && thread->tid == 0)
4360 		{
4361 			instr_time	now_time;
4362 			int64		now;
4363 
4364 			INSTR_TIME_SET_CURRENT(now_time);
4365 			now = INSTR_TIME_GET_MICROSEC(now_time);
4366 			if (now >= next_report)
4367 			{
4368 				/* generate and show report */
4369 				StatsData	cur;
4370 				int64		run = now - last_report;
4371 				double		tps,
4372 							total_run,
4373 							latency,
4374 							sqlat,
4375 							lag,
4376 							stdev;
4377 				char		tbuf[64];
4378 
4379 				/*
4380 				 * Add up the statistics of all threads.
4381 				 *
4382 				 * XXX: No locking. There is no guarantee that we get an
4383 				 * atomic snapshot of the transaction count and latencies, so
4384 				 * these figures can well be off by a small amount. The
4385 				 * progress is report's purpose is to give a quick overview of
4386 				 * how the test is going, so that shouldn't matter too much.
4387 				 * (If a read from a 64-bit integer is not atomic, you might
4388 				 * get a "torn" read and completely bogus latencies though!)
4389 				 */
4390 				initStats(&cur, 0.0);
4391 				for (i = 0; i < nthreads; i++)
4392 				{
4393 					mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
4394 					mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
4395 					cur.cnt += thread[i].stats.cnt;
4396 					cur.skipped += thread[i].stats.skipped;
4397 				}
4398 
4399 				total_run = (now - thread_start) / 1000000.0;
4400 				tps = 1000000.0 * (cur.cnt - last.cnt) / run;
4401 				latency = 0.001 * (cur.latency.sum - last.latency.sum) /
4402 					(cur.cnt - last.cnt);
4403 				sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
4404 					/ (cur.cnt - last.cnt);
4405 				stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
4406 				lag = 0.001 * (cur.lag.sum - last.lag.sum) /
4407 					(cur.cnt - last.cnt);
4408 
4409 				if (progress_timestamp)
4410 					sprintf(tbuf, "%.03f s",
4411 							INSTR_TIME_GET_MILLISEC(now_time) / 1000.0);
4412 				else
4413 					sprintf(tbuf, "%.1f s", total_run);
4414 
4415 				fprintf(stderr,
4416 						"progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
4417 						tbuf, tps, latency, stdev);
4418 
4419 				if (throttle_delay)
4420 				{
4421 					fprintf(stderr, ", lag %.3f ms", lag);
4422 					if (latency_limit)
4423 						fprintf(stderr, ", " INT64_FORMAT " skipped",
4424 								cur.skipped - last.skipped);
4425 				}
4426 				fprintf(stderr, "\n");
4427 
4428 				last = cur;
4429 				last_report = now;
4430 
4431 				/*
4432 				 * Ensure that the next report is in the future, in case
4433 				 * pgbench/postgres got stuck somewhere.
4434 				 */
4435 				do
4436 				{
4437 					next_report += (int64) progress *1000000;
4438 				} while (now >= next_report);
4439 			}
4440 		}
4441 	}
4442 
4443 done:
4444 	INSTR_TIME_SET_CURRENT(start);
4445 	disconnect_all(state, nstate);
4446 	INSTR_TIME_SET_CURRENT(end);
4447 	INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
4448 	if (thread->logfile)
4449 	{
4450 		if (agg_interval)
4451 		{
4452 			/* log aggregated but not yet reported transactions */
4453 			doLog(thread, state, &end, &aggs, false, 0, 0);
4454 		}
4455 		fclose(thread->logfile);
4456 	}
4457 	return NULL;
4458 }
4459 
4460 /*
4461  * Support for duration option: set timer_exceeded after so many seconds.
4462  */
4463 
4464 #ifndef WIN32
4465 
4466 static void
handle_sig_alarm(SIGNAL_ARGS)4467 handle_sig_alarm(SIGNAL_ARGS)
4468 {
4469 	timer_exceeded = true;
4470 }
4471 
4472 static void
setalarm(int seconds)4473 setalarm(int seconds)
4474 {
4475 	pqsignal(SIGALRM, handle_sig_alarm);
4476 	alarm(seconds);
4477 }
4478 
4479 #else							/* WIN32 */
4480 
4481 static VOID CALLBACK
win32_timer_callback(PVOID lpParameter,BOOLEAN TimerOrWaitFired)4482 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
4483 {
4484 	timer_exceeded = true;
4485 }
4486 
4487 static void
setalarm(int seconds)4488 setalarm(int seconds)
4489 {
4490 	HANDLE		queue;
4491 	HANDLE		timer;
4492 
4493 	/* This function will be called at most once, so we can cheat a bit. */
4494 	queue = CreateTimerQueue();
4495 	if (seconds > ((DWORD) -1) / 1000 ||
4496 		!CreateTimerQueueTimer(&timer, queue,
4497 							   win32_timer_callback, NULL, seconds * 1000, 0,
4498 							   WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
4499 	{
4500 		fprintf(stderr, "failed to set timer\n");
4501 		exit(1);
4502 	}
4503 }
4504 
4505 /* partial pthread implementation for Windows */
4506 
/*
 * Bookkeeping for one emulated pthread on Windows: the native thread handle,
 * the pthread-style start routine and its argument, and the routine's return
 * value (stored by win32_pthread_run, handed back by pthread_join).
 */
typedef struct win32_pthread
{
	HANDLE		handle;			/* thread handle from _beginthreadex() */
	void	   *(*routine) (void *);	/* pthread-style start routine */
	void	   *arg;			/* argument passed to routine */
	void	   *result;			/* routine's return value */
} win32_pthread;
4514 
4515 static unsigned __stdcall
win32_pthread_run(void * arg)4516 win32_pthread_run(void *arg)
4517 {
4518 	win32_pthread *th = (win32_pthread *) arg;
4519 
4520 	th->result = th->routine(th->arg);
4521 
4522 	return 0;
4523 }
4524 
4525 static int
pthread_create(pthread_t * thread,pthread_attr_t * attr,void * (* start_routine)(void *),void * arg)4526 pthread_create(pthread_t *thread,
4527 			   pthread_attr_t *attr,
4528 			   void *(*start_routine) (void *),
4529 			   void *arg)
4530 {
4531 	int			save_errno;
4532 	win32_pthread *th;
4533 
4534 	th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
4535 	th->routine = start_routine;
4536 	th->arg = arg;
4537 	th->result = NULL;
4538 
4539 	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
4540 	if (th->handle == NULL)
4541 	{
4542 		save_errno = errno;
4543 		free(th);
4544 		return save_errno;
4545 	}
4546 
4547 	*thread = th;
4548 	return 0;
4549 }
4550 
4551 static int
pthread_join(pthread_t th,void ** thread_return)4552 pthread_join(pthread_t th, void **thread_return)
4553 {
4554 	if (th == NULL || th->handle == NULL)
4555 		return errno = EINVAL;
4556 
4557 	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
4558 	{
4559 		_dosmaperr(GetLastError());
4560 		return errno;
4561 	}
4562 
4563 	if (thread_return)
4564 		*thread_return = th->result;
4565 
4566 	CloseHandle(th->handle);
4567 	free(th);
4568 	return 0;
4569 }
4570 
4571 #endif   /* WIN32 */
4572