1 /*
2 * pgbench.c
3 *
4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
6 *
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2018, PostgreSQL Global Development Group
9 * ALL RIGHTS RESERVED;
10 *
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
15 *
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
21 *
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27 *
28 */
29
30 #ifdef WIN32
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
32 #endif /* ! WIN32 */
33
34 #include "postgres_fe.h"
35 #include "fe_utils/conditional.h"
36
37 #include "getopt_long.h"
38 #include "libpq-fe.h"
39 #include "portability/instr_time.h"
40
41 #include <ctype.h>
42 #include <float.h>
43 #include <limits.h>
44 #include <math.h>
45 #include <signal.h>
46 #include <time.h>
47 #include <sys/time.h>
48 #ifdef HAVE_SYS_SELECT_H
49 #include <sys/select.h>
50 #endif
51
52 #ifdef HAVE_SYS_RESOURCE_H
53 #include <sys/resource.h> /* for getrlimit */
54 #endif
55
56 #ifndef M_PI
57 #define M_PI 3.14159265358979323846
58 #endif
59
60 #include "pgbench.h"
61
/* SQLSTATE code "42P01", which PostgreSQL uses for an undefined table */
#define ERRCODE_UNDEFINED_TABLE "42P01"

/*
 * Hashing constants
 *
 * FNV_PRIME and FNV_OFFSET_BASIS are the parameters of getHashFnv1a().
 * The MM2_* values are the parameters of getHashMurmur2(): MM2_MUL is the
 * multiplier, MM2_ROT the right-shift distance, and MM2_MUL_TIMES_8 is the
 * value getHashMurmur2() mixes with the seed (annotated there as
 * sizeof(int64)).
 */
#define FNV_PRIME			UINT64CONST(0x100000001b3)
#define FNV_OFFSET_BASIS	UINT64CONST(0xcbf29ce484222325)
#define MM2_MUL				UINT64CONST(0xc6a4a7935bd1e995)
#define MM2_MUL_TIMES_8		UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT				47
72
/*
 * Multi-platform pthread implementations
 *
 * Exactly one of three variants is selected at compile time.  Whichever is
 * chosen, the rest of the file can use the name pthread_t (for instance for
 * the "thread" member of TState).
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

/* file-local replacements with the same signatures as the POSIX calls */
static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
/* pthread_t is then only a placeholder pointer type */
#define pthread_t void *
#endif
91
92
93 /********************************************************************
94 * some configurable parameters */
95
/*
 * max number of clients allowed
 *
 * When FD_SETSIZE is defined the limit stays 10 below it (presumably to
 * keep a few descriptors free for non-client use -- TODO confirm);
 * otherwise a fixed limit of 1024 is used.
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS	(FD_SETSIZE - 10)
#else
#define MAXCLIENTS	1024
#endif

#define DEFAULT_INIT_STEPS "dtgvp"	/* default -I setting */

#define LOG_STEP_SECONDS	5	/* seconds between log messages */
#define DEFAULT_NXACTS	10		/* default nxacts */

#define ZIPF_CACHE_SIZE 15		/* cache cells number, i.e. the size of
								 * ZipfCache.cells[] */

#define MIN_GAUSSIAN_PARAM		2.0 /* minimum parameter for gauss; asserted
									 * by getGaussianRand() */
#define MAX_ZIPFIAN_PARAM		1000	/* maximum parameter for zipfian;
										 * asserted by getZipfianRand() */
112
int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */
int64		end_time = 0;		/* when to stop in microseconds, under -T */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples in
 * pgbench_accounts table.
 */
int			scale = 1;

/*
 * fillfactor. for example, fillfactor = 90 will use only 90 percent
 * space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * use unlogged tables?
 */
bool		unlogged_tables = false;

/*
 * log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * When threads are throttled to a given rate limit, this is the target delay
 * to reach that rate in usec.  0 is the default and means no throttling.
 */
int64		throttle_delay = 0;

/*
 * Transactions which take longer than this limit (in usec) are counted as
 * late, and reported as such, although they are completed anyway. When
 * throttling is enabled, execution time slots that are more than this late
 * are skipped altogether, and counted separately.
 */
int64		latency_limit = 0;

/*
 * tablespace selection
 *
 * Both default to NULL, i.e. no tablespace named.
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/*
 * random seed used to initialize base_random_sequence
 *
 * NOTE(review): the initial -1 presumably means "not chosen yet" -- confirm
 * against the code that sets it.
 */
int64		random_seed = -1;
161
162 /*
163 * end of configurable parameters
164 *********************************************************************/
165
/*
 * Per-unit-of-scale multipliers.  The builtin scripts draw their ids from
 * 1 .. (multiplier * :scale).
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which account ids (naccounts * scale) can no
 * longer be stored in a 32-bit integer and need 64 bits instead.
 *
 * Although the actual threshold is 21474 (21474 * 100000 still fits below
 * 2^31, 21475 * 100000 does not), we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000
179
/*
 * Run-time options and process-wide state.
 */
bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */
bool		per_script_stats = false;	/* whether to collect stats per script */
int			progress = 0;		/* thread progress report every this seconds */
bool		progress_timestamp = false; /* progress report with Unix time */
int			nclients = 1;		/* number of clients */
int			nthreads = 1;		/* number of threads */
bool		is_connect;			/* establish connection for each transaction */
bool		is_latencies;		/* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/*
 * Connection parameters and naming.
 *
 * NOTE(review): these are presumably filled in from the -h/-p/-U options and
 * the DBNAME argument listed by usage() -- confirm in main().
 */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
char	   *logfile_prefix = NULL;
const char *progname;			/* program name, printed by usage() */

#define WSEP '@'				/* weight separator */

volatile bool timer_exceeded = false;	/* flag from signal handler */
203
/*
 * Variable definitions.
 *
 * A variable can be known in string form, in value form, or in both:
 *
 * - If a variable only has a string value, "svalue" is that value, and
 *   "value" is "not set".
 *
 * - If the value is known, "value" contains the value (in any variant).  In
 *   this case "svalue" contains the string equivalent of the value, if we've
 *   had occasion to compute that, or NULL if we haven't.
 */
typedef struct
{
	char	   *name;			/* variable's name */
	char	   *svalue;			/* its value in string form, if known */
	PgBenchValue value;			/* actual variable's value */
} Variable;

#define MAX_SCRIPTS		128		/* max number of SQL scripts allowed; also the
								 * size of sql_script[] and CState.prepared[] */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
223
/*
 * Simple data structure to keep stats about something.
 *
 * Values are folded in one at a time by addToSimpleStats().  "min" and "max"
 * are only meaningful once "count" is non-zero.
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
	int64		count;			/* how many values were encountered */
	double		min;			/* the minimum seen */
	double		max;			/* the maximum seen */
	double		sum;			/* sum of values */
	double		sum2;			/* sum of squared values */
} SimpleStats;
238
/*
 * Data structure to hold various statistics: per-thread and per-script stats
 * are maintained and merged together.
 *
 * An instance is embedded in each TState (per thread) and in each
 * ParsedScript (per script).
 */
typedef struct StatsData
{
	time_t		start_time;		/* interval start time, for aggregates */
	int64		cnt;			/* number of transactions, including skipped */
	int64		skipped;		/* number of transactions skipped under --rate
								 * and --latency-limit */
	SimpleStats latency;		/* accumulated latency values */
	SimpleStats lag;			/* accumulated lag values */
} StatsData;
252
/*
 * Various random sequences are initialized from this one.
 *
 * It has the same three-word shape as TState.random_state, the state array
 * handed to pg_erand48().
 */
static unsigned short base_random_sequence[3];
255
/*
 * Connection state machine states.
 *
 * Each client (CState) is in exactly one of these states at any time; the
 * current one is kept in CState.state.
 */
typedef enum
{
	/*
	 * The client must first choose a script to execute.  Once chosen, it can
	 * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
	 * right away (state CSTATE_START_TX).
	 */
	CSTATE_CHOOSE_SCRIPT,

	/*
	 * In CSTATE_START_THROTTLE state, we calculate when to begin the next
	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
	 * sleeps until that moment.  (If throttling is not enabled, doCustom()
	 * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
	 */
	CSTATE_START_THROTTLE,
	CSTATE_THROTTLE,

	/*
	 * CSTATE_START_TX performs start-of-transaction processing.  Establishes
	 * a new connection for the transaction, in --connect mode, and records
	 * the transaction start time.
	 */
	CSTATE_START_TX,

	/*
	 * We loop through these states, to process each command in the script:
	 *
	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
	 * command, the command is sent to the server, and we move to
	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
	 * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
	 * meta-commands are executed immediately.
	 *
	 * CSTATE_SKIP_COMMAND for conditional branches which are not executed,
	 * quickly skip commands that do not need any evaluation.
	 *
	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
	 * for the current command.
	 *
	 * CSTATE_SLEEP waits until the end of \sleep.
	 *
	 * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
	 * command counter, and loops back to CSTATE_START_COMMAND state.
	 */
	CSTATE_START_COMMAND,
	CSTATE_SKIP_COMMAND,
	CSTATE_WAIT_RESULT,
	CSTATE_SLEEP,
	CSTATE_END_COMMAND,

	/*
	 * CSTATE_END_TX performs end-of-transaction processing.  Calculates
	 * latency, and logs the transaction.  In --connect mode, closes the
	 * current connection.  Chooses the next script to execute and starts over
	 * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
	 * more work to do.
	 *
	 * NOTE(review): the CSTATE_CHOOSE_SCRIPT description above says script
	 * selection happens in that state, which suggests the restart point is
	 * CSTATE_CHOOSE_SCRIPT rather than CSTATE_START_THROTTLE -- confirm
	 * against doCustom().
	 */
	CSTATE_END_TX,

	/*
	 * Final states.  CSTATE_ABORTED means that the script execution was
	 * aborted because a command failed, CSTATE_FINISHED means success.
	 */
	CSTATE_ABORTED,
	CSTATE_FINISHED
} ConnectionStateEnum;
326
/*
 * Connection state.
 *
 * One CState exists per client; each thread owns an array of them through
 * TState.state.
 */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB */
	int			id;				/* client No. */
	ConnectionStateEnum state;	/* state machine's current state. */
	ConditionalStack cstack;	/* enclosing conditionals state */

	int			use_file;		/* index in sql_script for this client */
	int			command;		/* command number in script */

	/* client variables */
	Variable   *variables;		/* array of variable definitions */
	int			nvariables;		/* number of variables */
	bool		vars_sorted;	/* are variables sorted by name? */

	/* various times about current transaction */
	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
	instr_time	txn_begin;		/* used for measuring schedule lag times */
	instr_time	stmt_begin;		/* used for measuring statement latencies */

	/* one flag per possible script, hence sized by MAX_SCRIPTS */
	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */

	/* per client collected stats */
	int64		cnt;			/* client transaction count, for -t */
	int			ecnt;			/* error count */
} CState;
357
/*
 * Cache cell for random_zipfian call
 *
 * A cell is identified by the pair (n, s).  The derived members are filled
 * in by zipfSetCacheCell() and read by computeHarmonicZipfian().
 */
typedef struct
{
	/* cell keys */
	double		s;				/* s - parameter of random_zipfian function */
	int64		n;				/* number of elements in range (max - min + 1) */

	double		harmonicn;		/* generalizedHarmonicNumber(n, s) */
	double		alpha;			/* 1 / (1 - s) */
	double		beta;			/* 0.5 ^ s */
	double		eta;			/* (1 - (2/n)^(1-s)) / (1 - H(2,s) / H(n,s)),
								 * where H is generalizedHarmonicNumber */

	uint64		last_used;		/* last used logical time, taken from
								 * ZipfCache.current */
} ZipfCell;
374
/*
 * Zipf cache for zeta values
 *
 * A fixed-size array of cells, searched linearly by
 * zipfFindOrCreateCacheCell().  Each TState embeds its own cache.
 */
typedef struct
{
	uint64		current;		/* counter for LRU cache replacement algorithm */

	int			nb_cells;		/* number of filled cells */
	int			overflowCount;	/* number of cache overflows, i.e. how many
								 * times a filled cell had to be replaced */
	ZipfCell	cells[ZIPF_CACHE_SIZE];
} ZipfCache;
386
/*
 * Thread state
 *
 * Holds a thread's slice of the clients together with that thread's private
 * random state, log file, zipfian cache and statistics.
 */
typedef struct
{
	int			tid;			/* thread id */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */
	unsigned short random_state[3]; /* separate randomness for each thread */
	int64		throttle_trigger;	/* previous/next throttling (us) */
	FILE	   *logfile;		/* where to log, or NULL */
	ZipfCache	zipf_cache;		/* for thread-safe zipfian random number
								 * generation */

	/* per thread collected stats */
	instr_time	start_time;		/* thread start time */
	instr_time	conn_time;		/* NOTE(review): presumably time spent
								 * establishing connections -- confirm */
	StatsData	stats;
	int64		latency_late;	/* executed but late transactions */
} TState;

/* thread handle value meaning "no thread" */
#define INVALID_THREAD	((pthread_t) 0)
410
/*
 * queries read from files
 *
 * SQL_COMMAND and META_COMMAND are the possible values of Command.type;
 * MAX_ARGS is the size of Command.argv[].
 */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10
417
/*
 * Identifies which backslash meta-command a Command holds (Command.meta).
 */
typedef enum MetaCommand
{
	META_NONE,					/* not a known meta-command */
	META_SET,					/* \set */
	META_SETSHELL,				/* \setshell */
	META_SHELL,					/* \shell */
	META_SLEEP,					/* \sleep */
	META_IF,					/* \if */
	META_ELIF,					/* \elif */
	META_ELSE,					/* \else */
	META_ENDIF					/* \endif */
} MetaCommand;
430
/*
 * Protocol used for submitting queries.  NUM_QUERYMODE is not a mode itself;
 * being last, it equals the number of modes.
 */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

/* currently selected mode; simple is the default */
static QueryMode querymode = QUERY_SIMPLE;

/* mode names, listed in the same order as the QueryMode values */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
441
/*
 * One parsed command of a script: either a SQL statement or a backslash
 * meta-command.
 */
typedef struct
{
	char	   *line;			/* text of command line */
	int			command_num;	/* unique index of this Command struct */
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	MetaCommand meta;			/* meta command identifier, or META_NONE */
	int			argc;			/* number of command words */
	char	   *argv[MAX_ARGS]; /* command word list */
	PgBenchExpr *expr;			/* parsed expression, if needed */
	SimpleStats stats;			/* time spent in this command */
} Command;
453
/*
 * A script after parsing, as stored in sql_script[].
 */
typedef struct ParsedScript
{
	const char *desc;			/* script descriptor (eg, file name) */
	int			weight;			/* selection weight */
	Command   **commands;		/* NULL-terminated array of Commands */
	StatsData	stats;			/* total time spent in script */
} ParsedScript;
461
static ParsedScript sql_script[MAX_SCRIPTS];	/* SQL script files */
static int	num_scripts;		/* number of scripts in sql_script[] */
static int	num_commands = 0;	/* total number of Command structs */
static int64 total_weight = 0;	/* NOTE(review): presumably the sum of the
								 * weights of all scripts in sql_script[] --
								 * confirm where it is updated */

static int	debug = 0;			/* debug flag */
468
/*
 * Builtin test scripts
 *
 * Describes one entry of the builtin_script[] table.
 */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;
476
/*
 * The table of builtin scripts.
 *
 * Each script draws its ids from 1 .. (naccounts | nbranches | ntellers) *
 * :scale, with the multipliers pasted into the script text at compile time
 * through CppAsString2().
 */
static const BuiltinScript builtin_script[] =
{
	/* full transaction: updates accounts, tellers and branches, logs history */
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* same as above minus the pgbench_tellers and pgbench_branches updates */
	{
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* a single SELECT on a random account, with no explicit transaction */
	{
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
514
515
/*
 * Function prototypes
 *
 * Forward declarations of file-local functions so that they can be used
 * before their definitions appear.
 */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
static void doLog(TState *thread, CState *st,
	  StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, instr_time *now,
				 bool skipped, StatsData *agg);
/* printf-style: argument 1 is the format, variadic arguments start at 2 */
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);
static void finishCon(CState *st);
531
532
/*
 * callback functions for our flex lexer
 *
 * Only the second slot is filled in, with pgbench_error(); the first one is
 * left NULL.
 */
static const PsqlScanCallbacks pgbench_callbacks = {
	NULL,						/* don't need get_variable functionality */
	pgbench_error
};
538
539
540 static void
usage(void)541 usage(void)
542 {
543 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
544 "Usage:\n"
545 " %s [OPTION]... [DBNAME]\n"
546 "\nInitialization options:\n"
547 " -i, --initialize invokes initialization mode\n"
548 " -I, --init-steps=[dtgvpf]+ (default \"dtgvp\")\n"
549 " run selected initialization steps\n"
550 " -F, --fillfactor=NUM set fill factor\n"
551 " -n, --no-vacuum do not run VACUUM during initialization\n"
552 " -q, --quiet quiet logging (one message each 5 seconds)\n"
553 " -s, --scale=NUM scaling factor\n"
554 " --foreign-keys create foreign key constraints between tables\n"
555 " --index-tablespace=TABLESPACE\n"
556 " create indexes in the specified tablespace\n"
557 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
558 " --unlogged-tables create tables as unlogged tables\n"
559 "\nOptions to select what to run:\n"
560 " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
561 " (use \"-b list\" to list available scripts)\n"
562 " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
563 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
564 " (same as \"-b simple-update\")\n"
565 " -S, --select-only perform SELECT-only transactions\n"
566 " (same as \"-b select-only\")\n"
567 "\nBenchmarking options:\n"
568 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
569 " -C, --connect establish new connection for each transaction\n"
570 " -D, --define=VARNAME=VALUE\n"
571 " define variable for use by custom script\n"
572 " -j, --jobs=NUM number of threads (default: 1)\n"
573 " -l, --log write transaction times to log file\n"
574 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
575 " -M, --protocol=simple|extended|prepared\n"
576 " protocol for submitting queries (default: simple)\n"
577 " -n, --no-vacuum do not run VACUUM before tests\n"
578 " -P, --progress=NUM show thread progress report every NUM seconds\n"
579 " -r, --report-latencies report average latency per command\n"
580 " -R, --rate=NUM target rate in transactions per second\n"
581 " -s, --scale=NUM report this scale factor in output\n"
582 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
583 " -T, --time=NUM duration of benchmark test in seconds\n"
584 " -v, --vacuum-all vacuum all four standard tables before tests\n"
585 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
586 " --log-prefix=PREFIX prefix for transaction time log file\n"
587 " (default: \"pgbench_log\")\n"
588 " --progress-timestamp use Unix epoch timestamps for progress\n"
589 " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n"
590 " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
591 "\nCommon options:\n"
592 " -d, --debug print debugging output\n"
593 " -h, --host=HOSTNAME database server host or socket directory\n"
594 " -p, --port=PORT database server port number\n"
595 " -U, --username=USERNAME connect as specified database user\n"
596 " -V, --version output version information, then exit\n"
597 " -?, --help show this help, then exit\n"
598 "\n"
599 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
600 progname, progname);
601 }
602
/*
 * Return whether str matches "^\s*[-+]?[0-9]+$", that is: optional leading
 * whitespace, an optional sign, then one or more decimal digits and nothing
 * after them.
 *
 * The empty string, an all-blank string and a bare sign are rejected, since
 * none of them contains a digit.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit.  The terminating NUL is not a digit either, so a
	 * string that ends here is refused as well.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
628
629
630 /*
631 * strtoint64 -- convert a string to 64-bit integer
632 *
633 * This function is a modified version of scanint8() from
634 * src/backend/utils/adt/int8.c.
635 */
636 int64
strtoint64(const char * str)637 strtoint64(const char *str)
638 {
639 const char *ptr = str;
640 int64 result = 0;
641 int sign = 1;
642
643 /*
644 * Do our own scan, rather than relying on sscanf which might be broken
645 * for long long.
646 */
647
648 /* skip leading spaces */
649 while (*ptr && isspace((unsigned char) *ptr))
650 ptr++;
651
652 /* handle sign */
653 if (*ptr == '-')
654 {
655 ptr++;
656
657 /*
658 * Do an explicit check for INT64_MIN. Ugly though this is, it's
659 * cleaner than trying to get the loop below to handle it portably.
660 */
661 if (strncmp(ptr, "9223372036854775808", 19) == 0)
662 {
663 result = PG_INT64_MIN;
664 ptr += 19;
665 goto gotdigits;
666 }
667 sign = -1;
668 }
669 else if (*ptr == '+')
670 ptr++;
671
672 /* require at least one digit */
673 if (!isdigit((unsigned char) *ptr))
674 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
675
676 /* process digits */
677 while (*ptr && isdigit((unsigned char) *ptr))
678 {
679 int64 tmp = result * 10 + (*ptr++ - '0');
680
681 if ((tmp / 10) != result) /* overflow? */
682 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
683 result = tmp;
684 }
685
686 gotdigits:
687
688 /* allow trailing whitespace, but not other trailing chars */
689 while (*ptr != '\0' && isspace((unsigned char) *ptr))
690 ptr++;
691
692 if (*ptr != '\0')
693 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
694
695 return ((sign < 0) ? -result : result);
696 }
697
698 /*
699 * Random number generator: uniform distribution from min to max inclusive.
700 *
701 * Although the limits are expressed as int64, you can't generate the full
702 * int64 range in one call, because the difference of the limits mustn't
703 * overflow int64. In practice it's unwise to ask for more than an int32
704 * range, because of the limited precision of pg_erand48().
705 */
706 static int64
getrand(TState * thread,int64 min,int64 max)707 getrand(TState *thread, int64 min, int64 max)
708 {
709 /*
710 * Odd coding is so that min and max have approximately the same chance of
711 * being selected as do numbers between them.
712 *
713 * pg_erand48() is thread-safe and concurrent, which is why we use it
714 * rather than random(), which in glibc is non-reentrant, and therefore
715 * protected by a mutex, and therefore a bottleneck on machines with many
716 * CPUs.
717 */
718 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
719 }
720
721 /*
722 * random number generator: exponential distribution from min to max inclusive.
723 * the parameter is so that the density of probability for the last cut-off max
724 * value is exp(-parameter).
725 */
726 static int64
getExponentialRand(TState * thread,int64 min,int64 max,double parameter)727 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
728 {
729 double cut,
730 uniform,
731 rand;
732
733 /* abort if wrong parameter, but must really be checked beforehand */
734 Assert(parameter > 0.0);
735 cut = exp(-parameter);
736 /* erand in [0, 1), uniform in (0, 1] */
737 uniform = 1.0 - pg_erand48(thread->random_state);
738
739 /*
740 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
741 */
742 Assert((1.0 - cut) != 0.0);
743 rand = -log(cut + (1.0 - cut) * uniform) / parameter;
744 /* return int64 random number within between min and max */
745 return min + (int64) ((max - min + 1) * rand);
746 }
747
748 /* random number generator: gaussian distribution from min to max inclusive */
749 static int64
getGaussianRand(TState * thread,int64 min,int64 max,double parameter)750 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
751 {
752 double stdev;
753 double rand;
754
755 /* abort if parameter is too low, but must really be checked beforehand */
756 Assert(parameter >= MIN_GAUSSIAN_PARAM);
757
758 /*
759 * Get user specified random number from this loop, with -parameter <
760 * stdev <= parameter
761 *
762 * This loop is executed until the number is in the expected range.
763 *
764 * As the minimum parameter is 2.0, the probability of looping is low:
765 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
766 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
767 * the worst case. For a parameter value of 5.0, the looping probability
768 * is about e^{-5} * 2 / pi ~ 0.43%.
769 */
770 do
771 {
772 /*
773 * pg_erand48 generates [0,1), but for the basic version of the
774 * Box-Muller transform the two uniformly distributed random numbers
775 * are expected in (0, 1] (see
776 * https://en.wikipedia.org/wiki/Box-Muller_transform)
777 */
778 double rand1 = 1.0 - pg_erand48(thread->random_state);
779 double rand2 = 1.0 - pg_erand48(thread->random_state);
780
781 /* Box-Muller basic form transform */
782 double var_sqrt = sqrt(-2.0 * log(rand1));
783
784 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
785
786 /*
787 * we may try with cos, but there may be a bias induced if the
788 * previous value fails the test. To be on the safe side, let us try
789 * over.
790 */
791 }
792 while (stdev < -parameter || stdev >= parameter);
793
794 /* stdev is in [-parameter, parameter), normalization to [0,1) */
795 rand = (stdev + parameter) / (parameter * 2.0);
796
797 /* return int64 random number within between min and max */
798 return min + (int64) ((max - min + 1) * rand);
799 }
800
801 /*
802 * random number generator: generate a value, such that the series of values
803 * will approximate a Poisson distribution centered on the given value.
804 */
805 static int64
getPoissonRand(TState * thread,int64 center)806 getPoissonRand(TState *thread, int64 center)
807 {
808 /*
809 * Use inverse transform sampling to generate a value > 0, such that the
810 * expected (i.e. average) value is the given argument.
811 */
812 double uniform;
813
814 /* erand in [0, 1), uniform in (0, 1] */
815 uniform = 1.0 - pg_erand48(thread->random_state);
816
817 return (int64) (-log(uniform) * ((double) center) + 0.5);
818 }
819
820 /* helper function for getZipfianRand */
821 static double
generalizedHarmonicNumber(int64 n,double s)822 generalizedHarmonicNumber(int64 n, double s)
823 {
824 int i;
825 double ans = 0.0;
826
827 for (i = n; i > 1; i--)
828 ans += pow(i, -s);
829 return ans + 1.0;
830 }
831
832 /* set harmonicn and other parameters to cache cell */
833 static void
zipfSetCacheCell(ZipfCell * cell,int64 n,double s)834 zipfSetCacheCell(ZipfCell *cell, int64 n, double s)
835 {
836 double harmonic2;
837
838 cell->n = n;
839 cell->s = s;
840
841 harmonic2 = generalizedHarmonicNumber(2, s);
842 cell->harmonicn = generalizedHarmonicNumber(n, s);
843
844 cell->alpha = 1.0 / (1.0 - s);
845 cell->beta = pow(0.5, s);
846 cell->eta = (1.0 - pow(2.0 / n, 1.0 - s)) / (1.0 - harmonic2 / cell->harmonicn);
847 }
848
849 /*
850 * search for cache cell with keys (n, s)
851 * and create new cell if it does not exist
852 */
853 static ZipfCell *
zipfFindOrCreateCacheCell(ZipfCache * cache,int64 n,double s)854 zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
855 {
856 int i,
857 least_recently_used = 0;
858 ZipfCell *cell;
859
860 /* search cached cell for given parameters */
861 for (i = 0; i < cache->nb_cells; i++)
862 {
863 cell = &cache->cells[i];
864 if (cell->n == n && cell->s == s)
865 return &cache->cells[i];
866
867 if (cell->last_used < cache->cells[least_recently_used].last_used)
868 least_recently_used = i;
869 }
870
871 /* create new one if it does not exist */
872 if (cache->nb_cells < ZIPF_CACHE_SIZE)
873 i = cache->nb_cells++;
874 else
875 {
876 /* replace LRU cell if cache is full */
877 i = least_recently_used;
878 cache->overflowCount++;
879 }
880
881 zipfSetCacheCell(&cache->cells[i], n, s);
882
883 cache->cells[i].last_used = cache->current++;
884 return &cache->cells[i];
885 }
886
887 /*
888 * Computing zipfian using rejection method, based on
889 * "Non-Uniform Random Variate Generation",
890 * Luc Devroye, p. 550-551, Springer 1986.
891 */
892 static int64
computeIterativeZipfian(TState * thread,int64 n,double s)893 computeIterativeZipfian(TState *thread, int64 n, double s)
894 {
895 double b = pow(2.0, s - 1.0);
896 double x,
897 t,
898 u,
899 v;
900
901 while (true)
902 {
903 /* random variates */
904 u = pg_erand48(thread->random_state);
905 v = pg_erand48(thread->random_state);
906
907 x = floor(pow(u, -1.0 / (s - 1.0)));
908
909 t = pow(1.0 + 1.0 / x, s - 1.0);
910 /* reject if too large or out of bound */
911 if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
912 break;
913 }
914 return (int64) x;
915 }
916
917 /*
918 * Computing zipfian using harmonic numbers, based on algorithm described in
919 * "Quickly Generating Billion-Record Synthetic Databases",
920 * Jim Gray et al, SIGMOD 1994
921 */
922 static int64
computeHarmonicZipfian(TState * thread,int64 n,double s)923 computeHarmonicZipfian(TState *thread, int64 n, double s)
924 {
925 ZipfCell *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
926 double uniform = pg_erand48(thread->random_state);
927 double uz = uniform * cell->harmonicn;
928
929 if (uz < 1.0)
930 return 1;
931 if (uz < 1.0 + cell->beta)
932 return 2;
933 return 1 + (int64) (cell->n * pow(cell->eta * uniform - cell->eta + 1.0, cell->alpha));
934 }
935
936 /* random number generator: zipfian distribution from min to max inclusive */
937 static int64
getZipfianRand(TState * thread,int64 min,int64 max,double s)938 getZipfianRand(TState *thread, int64 min, int64 max, double s)
939 {
940 int64 n = max - min + 1;
941
942 /* abort if parameter is invalid */
943 Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
944
945
946 return min - 1 + ((s > 1)
947 ? computeIterativeZipfian(thread, n, s)
948 : computeHarmonicZipfian(thread, n, s));
949 }
950
951 /*
952 * FNV-1a hash function
953 */
954 static int64
getHashFnv1a(int64 val,uint64 seed)955 getHashFnv1a(int64 val, uint64 seed)
956 {
957 int64 result;
958 int i;
959
960 result = FNV_OFFSET_BASIS ^ seed;
961 for (i = 0; i < 8; ++i)
962 {
963 int32 octet = val & 0xff;
964
965 val = val >> 8;
966 result = result ^ octet;
967 result = result * FNV_PRIME;
968 }
969
970 return result;
971 }
972
973 /*
974 * Murmur2 hash function
975 *
976 * Based on original work of Austin Appleby
977 * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
978 */
979 static int64
getHashMurmur2(int64 val,uint64 seed)980 getHashMurmur2(int64 val, uint64 seed)
981 {
982 uint64 result = seed ^ MM2_MUL_TIMES_8; /* sizeof(int64) */
983 uint64 k = (uint64) val;
984
985 k *= MM2_MUL;
986 k ^= k >> MM2_ROT;
987 k *= MM2_MUL;
988
989 result ^= k;
990 result *= MM2_MUL;
991
992 result ^= result >> MM2_ROT;
993 result *= MM2_MUL;
994 result ^= result >> MM2_ROT;
995
996 return (int64) result;
997 }
998
999 /*
1000 * Initialize the given SimpleStats struct to all zeroes
1001 */
1002 static void
initSimpleStats(SimpleStats * ss)1003 initSimpleStats(SimpleStats *ss)
1004 {
1005 memset(ss, 0, sizeof(SimpleStats));
1006 }
1007
1008 /*
1009 * Accumulate one value into a SimpleStats struct.
1010 */
1011 static void
addToSimpleStats(SimpleStats * ss,double val)1012 addToSimpleStats(SimpleStats *ss, double val)
1013 {
1014 if (ss->count == 0 || val < ss->min)
1015 ss->min = val;
1016 if (ss->count == 0 || val > ss->max)
1017 ss->max = val;
1018 ss->count++;
1019 ss->sum += val;
1020 ss->sum2 += val * val;
1021 }
1022
1023 /*
1024 * Merge two SimpleStats objects
1025 */
1026 static void
mergeSimpleStats(SimpleStats * acc,SimpleStats * ss)1027 mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
1028 {
1029 if (acc->count == 0 || ss->min < acc->min)
1030 acc->min = ss->min;
1031 if (acc->count == 0 || ss->max > acc->max)
1032 acc->max = ss->max;
1033 acc->count += ss->count;
1034 acc->sum += ss->sum;
1035 acc->sum2 += ss->sum2;
1036 }
1037
1038 /*
1039 * Initialize a StatsData struct to mostly zeroes, with its start time set to
1040 * the given value.
1041 */
1042 static void
initStats(StatsData * sd,time_t start_time)1043 initStats(StatsData *sd, time_t start_time)
1044 {
1045 sd->start_time = start_time;
1046 sd->cnt = 0;
1047 sd->skipped = 0;
1048 initSimpleStats(&sd->latency);
1049 initSimpleStats(&sd->lag);
1050 }
1051
1052 /*
1053 * Accumulate one additional item into the given stats object.
1054 */
1055 static void
accumStats(StatsData * stats,bool skipped,double lat,double lag)1056 accumStats(StatsData *stats, bool skipped, double lat, double lag)
1057 {
1058 stats->cnt++;
1059
1060 if (skipped)
1061 {
1062 /* no latency to record on skipped transactions */
1063 stats->skipped++;
1064 }
1065 else
1066 {
1067 addToSimpleStats(&stats->latency, lat);
1068
1069 /* and possibly the same for schedule lag */
1070 if (throttle_delay)
1071 addToSimpleStats(&stats->lag, lag);
1072 }
1073 }
1074
1075 /* call PQexec() and exit() on failure */
1076 static void
executeStatement(PGconn * con,const char * sql)1077 executeStatement(PGconn *con, const char *sql)
1078 {
1079 PGresult *res;
1080
1081 res = PQexec(con, sql);
1082 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1083 {
1084 fprintf(stderr, "%s", PQerrorMessage(con));
1085 exit(1);
1086 }
1087 PQclear(res);
1088 }
1089
1090 /* call PQexec() and complain, but without exiting, on failure */
1091 static void
tryExecuteStatement(PGconn * con,const char * sql)1092 tryExecuteStatement(PGconn *con, const char *sql)
1093 {
1094 PGresult *res;
1095
1096 res = PQexec(con, sql);
1097 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1098 {
1099 fprintf(stderr, "%s", PQerrorMessage(con));
1100 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
1101 }
1102 PQclear(res);
1103 }
1104
1105 /* set up a connection to the backend */
1106 static PGconn *
doConnect(void)1107 doConnect(void)
1108 {
1109 PGconn *conn;
1110 bool new_pass;
1111 static bool have_password = false;
1112 static char password[100];
1113
1114 /*
1115 * Start the connection. Loop until we have a password if requested by
1116 * backend.
1117 */
1118 do
1119 {
1120 #define PARAMS_ARRAY_SIZE 7
1121
1122 const char *keywords[PARAMS_ARRAY_SIZE];
1123 const char *values[PARAMS_ARRAY_SIZE];
1124
1125 keywords[0] = "host";
1126 values[0] = pghost;
1127 keywords[1] = "port";
1128 values[1] = pgport;
1129 keywords[2] = "user";
1130 values[2] = login;
1131 keywords[3] = "password";
1132 values[3] = have_password ? password : NULL;
1133 keywords[4] = "dbname";
1134 values[4] = dbName;
1135 keywords[5] = "fallback_application_name";
1136 values[5] = progname;
1137 keywords[6] = NULL;
1138 values[6] = NULL;
1139
1140 new_pass = false;
1141
1142 conn = PQconnectdbParams(keywords, values, true);
1143
1144 if (!conn)
1145 {
1146 fprintf(stderr, "connection to database \"%s\" failed\n",
1147 dbName);
1148 return NULL;
1149 }
1150
1151 if (PQstatus(conn) == CONNECTION_BAD &&
1152 PQconnectionNeedsPassword(conn) &&
1153 !have_password)
1154 {
1155 PQfinish(conn);
1156 simple_prompt("Password: ", password, sizeof(password), false);
1157 have_password = true;
1158 new_pass = true;
1159 }
1160 } while (new_pass);
1161
1162 /* check to see that the backend connection was successfully made */
1163 if (PQstatus(conn) == CONNECTION_BAD)
1164 {
1165 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
1166 PQdb(conn), PQerrorMessage(conn));
1167 PQfinish(conn);
1168 return NULL;
1169 }
1170
1171 return conn;
1172 }
1173
1174 /* throw away response from backend */
1175 static void
discard_response(CState * state)1176 discard_response(CState *state)
1177 {
1178 PGresult *res;
1179
1180 do
1181 {
1182 res = PQgetResult(state->con);
1183 if (res)
1184 PQclear(res);
1185 } while (res);
1186 }
1187
1188 /* qsort comparator for Variable array */
1189 static int
compareVariableNames(const void * v1,const void * v2)1190 compareVariableNames(const void *v1, const void *v2)
1191 {
1192 return strcmp(((const Variable *) v1)->name,
1193 ((const Variable *) v2)->name);
1194 }
1195
1196 /* Locate a variable by name; returns NULL if unknown */
1197 static Variable *
lookupVariable(CState * st,char * name)1198 lookupVariable(CState *st, char *name)
1199 {
1200 Variable key;
1201
1202 /* On some versions of Solaris, bsearch of zero items dumps core */
1203 if (st->nvariables <= 0)
1204 return NULL;
1205
1206 /* Sort if we have to */
1207 if (!st->vars_sorted)
1208 {
1209 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
1210 compareVariableNames);
1211 st->vars_sorted = true;
1212 }
1213
1214 /* Now we can search */
1215 key.name = name;
1216 return (Variable *) bsearch((void *) &key,
1217 (void *) st->variables,
1218 st->nvariables,
1219 sizeof(Variable),
1220 compareVariableNames);
1221 }
1222
1223 /* Get the value of a variable, in string form; returns NULL if unknown */
1224 static char *
getVariable(CState * st,char * name)1225 getVariable(CState *st, char *name)
1226 {
1227 Variable *var;
1228 char stringform[64];
1229
1230 var = lookupVariable(st, name);
1231 if (var == NULL)
1232 return NULL; /* not found */
1233
1234 if (var->svalue)
1235 return var->svalue; /* we have it in string form */
1236
1237 /* We need to produce a string equivalent of the value */
1238 Assert(var->value.type != PGBT_NO_VALUE);
1239 if (var->value.type == PGBT_NULL)
1240 snprintf(stringform, sizeof(stringform), "NULL");
1241 else if (var->value.type == PGBT_BOOLEAN)
1242 snprintf(stringform, sizeof(stringform),
1243 "%s", var->value.u.bval ? "true" : "false");
1244 else if (var->value.type == PGBT_INT)
1245 snprintf(stringform, sizeof(stringform),
1246 INT64_FORMAT, var->value.u.ival);
1247 else if (var->value.type == PGBT_DOUBLE)
1248 snprintf(stringform, sizeof(stringform),
1249 "%.*g", DBL_DIG, var->value.u.dval);
1250 else /* internal error, unexpected type */
1251 Assert(0);
1252 var->svalue = pg_strdup(stringform);
1253 return var->svalue;
1254 }
1255
1256 /* Try to convert variable to a value; return false on failure */
1257 static bool
makeVariableValue(Variable * var)1258 makeVariableValue(Variable *var)
1259 {
1260 size_t slen;
1261
1262 if (var->value.type != PGBT_NO_VALUE)
1263 return true; /* no work */
1264
1265 slen = strlen(var->svalue);
1266
1267 if (slen == 0)
1268 /* what should it do on ""? */
1269 return false;
1270
1271 if (pg_strcasecmp(var->svalue, "null") == 0)
1272 {
1273 setNullValue(&var->value);
1274 }
1275
1276 /*
1277 * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1278 * recognized later as an int, which is converted to bool if needed.
1279 */
1280 else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1281 pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1282 pg_strcasecmp(var->svalue, "on") == 0)
1283 {
1284 setBoolValue(&var->value, true);
1285 }
1286 else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1287 pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1288 pg_strcasecmp(var->svalue, "off") == 0 ||
1289 pg_strcasecmp(var->svalue, "of") == 0)
1290 {
1291 setBoolValue(&var->value, false);
1292 }
1293 else if (is_an_int(var->svalue))
1294 {
1295 setIntValue(&var->value, strtoint64(var->svalue));
1296 }
1297 else /* type should be double */
1298 {
1299 double dv;
1300 char xs;
1301
1302 if (sscanf(var->svalue, "%lf%c", &dv, &xs) != 1)
1303 {
1304 fprintf(stderr,
1305 "malformed variable \"%s\" value: \"%s\"\n",
1306 var->name, var->svalue);
1307 return false;
1308 }
1309 setDoubleValue(&var->value, dv);
1310 }
1311 return true;
1312 }
1313
/*
 * Check whether a variable's name is allowed.
 *
 * Any non-ASCII (high-bit-set) character is accepted, as well as ASCII
 * letters, digits, and underscore; the first character must not be a
 * digit, and the name must not be empty.
 *
 * Keep this in sync with the definitions of variable name characters in
 * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
 * "src/bin/pgbench/exprscan.l".  Also see parseVariable(), below.
 *
 * Note: this static function is copied from "src/bin/psql/variables.c"
 * but changed to disallow variable names starting with a digit.
 */
static bool
valid_variable_name(const char *name)
{
	static const char first_set[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
		"_";
	static const char later_set[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
		"_0123456789";
	const unsigned char *ptr = (const unsigned char *) name;
	const char *allowed = first_set;	/* must not start with [0-9] */

	/* Mustn't be zero-length */
	if (*ptr == '\0')
		return false;

	for (; *ptr != '\0'; ptr++)
	{
		if (!IS_HIGHBIT_SET(*ptr) && strchr(allowed, *ptr) == NULL)
			return false;

		/* remaining characters can include [0-9] */
		allowed = later_set;
	}

	return true;
}
1357
1358 /*
1359 * Lookup a variable by name, creating it if need be.
1360 * Caller is expected to assign a value to the variable.
1361 * Returns NULL on failure (bad name).
1362 */
1363 static Variable *
lookupCreateVariable(CState * st,const char * context,char * name)1364 lookupCreateVariable(CState *st, const char *context, char *name)
1365 {
1366 Variable *var;
1367
1368 var = lookupVariable(st, name);
1369 if (var == NULL)
1370 {
1371 Variable *newvars;
1372
1373 /*
1374 * Check for the name only when declaring a new variable to avoid
1375 * overhead.
1376 */
1377 if (!valid_variable_name(name))
1378 {
1379 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1380 context, name);
1381 return NULL;
1382 }
1383
1384 /* Create variable at the end of the array */
1385 if (st->variables)
1386 newvars = (Variable *) pg_realloc(st->variables,
1387 (st->nvariables + 1) * sizeof(Variable));
1388 else
1389 newvars = (Variable *) pg_malloc(sizeof(Variable));
1390
1391 st->variables = newvars;
1392
1393 var = &newvars[st->nvariables];
1394
1395 var->name = pg_strdup(name);
1396 var->svalue = NULL;
1397 /* caller is expected to initialize remaining fields */
1398
1399 st->nvariables++;
1400 /* we don't re-sort the array till we have to */
1401 st->vars_sorted = false;
1402 }
1403
1404 return var;
1405 }
1406
1407 /* Assign a string value to a variable, creating it if need be */
1408 /* Returns false on failure (bad name) */
1409 static bool
putVariable(CState * st,const char * context,char * name,const char * value)1410 putVariable(CState *st, const char *context, char *name, const char *value)
1411 {
1412 Variable *var;
1413 char *val;
1414
1415 var = lookupCreateVariable(st, context, name);
1416 if (!var)
1417 return false;
1418
1419 /* dup then free, in case value is pointing at this variable */
1420 val = pg_strdup(value);
1421
1422 if (var->svalue)
1423 free(var->svalue);
1424 var->svalue = val;
1425 var->value.type = PGBT_NO_VALUE;
1426
1427 return true;
1428 }
1429
1430 /* Assign a value to a variable, creating it if need be */
1431 /* Returns false on failure (bad name) */
1432 static bool
putVariableValue(CState * st,const char * context,char * name,const PgBenchValue * value)1433 putVariableValue(CState *st, const char *context, char *name,
1434 const PgBenchValue *value)
1435 {
1436 Variable *var;
1437
1438 var = lookupCreateVariable(st, context, name);
1439 if (!var)
1440 return false;
1441
1442 if (var->svalue)
1443 free(var->svalue);
1444 var->svalue = NULL;
1445 var->value = *value;
1446
1447 return true;
1448 }
1449
1450 /* Assign an integer value to a variable, creating it if need be */
1451 /* Returns false on failure (bad name) */
1452 static bool
putVariableInt(CState * st,const char * context,char * name,int64 value)1453 putVariableInt(CState *st, const char *context, char *name, int64 value)
1454 {
1455 PgBenchValue val;
1456
1457 setIntValue(&val, value);
1458 return putVariableValue(st, context, name, &val);
1459 }
1460
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (including the colon).
 * Otherwise, return NULL (and leave *eaten untouched).
 *
 * The end of the string is tested explicitly: strchr() treats the
 * terminating NUL as part of the searched set, so strchr(set, '\0') is
 * never NULL and cannot be used by itself to stop the scan.  Without the
 * explicit test a colon or a variable name at the very end of "sql" would
 * make the scan run past the terminator.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	/* keep this logic in sync with valid_variable_name() */
	static const char first_set[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
		"_";
	static const char later_set[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
		"_0123456789";
	int			i = 1;			/* starting at 1 skips the colon */
	char	   *name;

	/* first character: must exist and must not be a digit */
	if (sql[i] == '\0')
		return NULL;
	if (!IS_HIGHBIT_SET(sql[i]) && strchr(first_set, sql[i]) == NULL)
		return NULL;
	i++;

	/* remaining characters, stopping at the end of the string */
	while (sql[i] != '\0' &&
		   (IS_HIGHBIT_SET(sql[i]) || strchr(later_set, sql[i]) != NULL))
		i++;

	/* i counts the colon, which leaves room for the terminator */
	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1495
/*
 * Replace the "len" characters starting at "param" (which points inside
 * the string *sql) with the text of "value".
 *
 * When the value is longer than the text it replaces, *sql is enlarged
 * with pg_realloc and may therefore move.  Returns a pointer just past the
 * inserted value, within the (possibly moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			valueln = strlen(value);

	if (valueln > len)
	{
		/* remember where we are, since the buffer may be relocated */
		size_t		offset = param - *sql;
		size_t		newsize = strlen(*sql) - len + valueln + 1;

		*sql = pg_realloc(*sql, newsize);
		param = *sql + offset;
	}

	/* shift the tail (including its terminator) unless it is already in place */
	if (valueln != len)
	{
		char	   *tail = param + len;

		memmove(param + valueln, tail, strlen(tail) + 1);
	}

	memcpy(param, value, valueln);

	return param + valueln;
}
1515
1516 static char *
assignVariables(CState * st,char * sql)1517 assignVariables(CState *st, char *sql)
1518 {
1519 char *p,
1520 *name,
1521 *val;
1522
1523 p = sql;
1524 while ((p = strchr(p, ':')) != NULL)
1525 {
1526 int eaten;
1527
1528 name = parseVariable(p, &eaten);
1529 if (name == NULL)
1530 {
1531 while (*p == ':')
1532 {
1533 p++;
1534 }
1535 continue;
1536 }
1537
1538 val = getVariable(st, name);
1539 free(name);
1540 if (val == NULL)
1541 {
1542 p++;
1543 continue;
1544 }
1545
1546 p = replaceVariable(&sql, p, eaten, val);
1547 }
1548
1549 return sql;
1550 }
1551
1552 static void
getQueryParams(CState * st,const Command * command,const char ** params)1553 getQueryParams(CState *st, const Command *command, const char **params)
1554 {
1555 int i;
1556
1557 for (i = 0; i < command->argc - 1; i++)
1558 params[i] = getVariable(st, command->argv[i + 1]);
1559 }
1560
1561 static char *
valueTypeName(PgBenchValue * pval)1562 valueTypeName(PgBenchValue *pval)
1563 {
1564 if (pval->type == PGBT_NO_VALUE)
1565 return "none";
1566 else if (pval->type == PGBT_NULL)
1567 return "null";
1568 else if (pval->type == PGBT_INT)
1569 return "int";
1570 else if (pval->type == PGBT_DOUBLE)
1571 return "double";
1572 else if (pval->type == PGBT_BOOLEAN)
1573 return "boolean";
1574 else
1575 {
1576 /* internal error, should never get there */
1577 Assert(false);
1578 return NULL;
1579 }
1580 }
1581
1582 /* get a value as a boolean, or tell if there is a problem */
1583 static bool
coerceToBool(PgBenchValue * pval,bool * bval)1584 coerceToBool(PgBenchValue *pval, bool *bval)
1585 {
1586 if (pval->type == PGBT_BOOLEAN)
1587 {
1588 *bval = pval->u.bval;
1589 return true;
1590 }
1591 else /* NULL, INT or DOUBLE */
1592 {
1593 fprintf(stderr, "cannot coerce %s to boolean\n", valueTypeName(pval));
1594 *bval = false; /* suppress uninitialized-variable warnings */
1595 return false;
1596 }
1597 }
1598
1599 /*
1600 * Return true or false from an expression for conditional purposes.
1601 * Non zero numerical values are true, zero and NULL are false.
1602 */
1603 static bool
valueTruth(PgBenchValue * pval)1604 valueTruth(PgBenchValue *pval)
1605 {
1606 switch (pval->type)
1607 {
1608 case PGBT_NULL:
1609 return false;
1610 case PGBT_BOOLEAN:
1611 return pval->u.bval;
1612 case PGBT_INT:
1613 return pval->u.ival != 0;
1614 case PGBT_DOUBLE:
1615 return pval->u.dval != 0.0;
1616 default:
1617 /* internal error, unexpected type */
1618 Assert(0);
1619 return false;
1620 }
1621 }
1622
1623 /* get a value as an int, tell if there is a problem */
1624 static bool
coerceToInt(PgBenchValue * pval,int64 * ival)1625 coerceToInt(PgBenchValue *pval, int64 *ival)
1626 {
1627 if (pval->type == PGBT_INT)
1628 {
1629 *ival = pval->u.ival;
1630 return true;
1631 }
1632 else if (pval->type == PGBT_DOUBLE)
1633 {
1634 double dval = rint(pval->u.dval);
1635
1636 if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
1637 {
1638 fprintf(stderr, "double to int overflow for %f\n", dval);
1639 return false;
1640 }
1641 *ival = (int64) dval;
1642 return true;
1643 }
1644 else /* BOOLEAN or NULL */
1645 {
1646 fprintf(stderr, "cannot coerce %s to int\n", valueTypeName(pval));
1647 return false;
1648 }
1649 }
1650
1651 /* get a value as a double, or tell if there is a problem */
1652 static bool
coerceToDouble(PgBenchValue * pval,double * dval)1653 coerceToDouble(PgBenchValue *pval, double *dval)
1654 {
1655 if (pval->type == PGBT_DOUBLE)
1656 {
1657 *dval = pval->u.dval;
1658 return true;
1659 }
1660 else if (pval->type == PGBT_INT)
1661 {
1662 *dval = (double) pval->u.ival;
1663 return true;
1664 }
1665 else /* BOOLEAN or NULL */
1666 {
1667 fprintf(stderr, "cannot coerce %s to double\n", valueTypeName(pval));
1668 return false;
1669 }
1670 }
1671
1672 /* assign a null value */
1673 static void
setNullValue(PgBenchValue * pv)1674 setNullValue(PgBenchValue *pv)
1675 {
1676 pv->type = PGBT_NULL;
1677 pv->u.ival = 0;
1678 }
1679
1680 /* assign a boolean value */
1681 static void
setBoolValue(PgBenchValue * pv,bool bval)1682 setBoolValue(PgBenchValue *pv, bool bval)
1683 {
1684 pv->type = PGBT_BOOLEAN;
1685 pv->u.bval = bval;
1686 }
1687
1688 /* assign an integer value */
1689 static void
setIntValue(PgBenchValue * pv,int64 ival)1690 setIntValue(PgBenchValue *pv, int64 ival)
1691 {
1692 pv->type = PGBT_INT;
1693 pv->u.ival = ival;
1694 }
1695
1696 /* assign a double value */
1697 static void
setDoubleValue(PgBenchValue * pv,double dval)1698 setDoubleValue(PgBenchValue *pv, double dval)
1699 {
1700 pv->type = PGBT_DOUBLE;
1701 pv->u.dval = dval;
1702 }
1703
1704 static bool
isLazyFunc(PgBenchFunction func)1705 isLazyFunc(PgBenchFunction func)
1706 {
1707 return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
1708 }
1709
1710 /* lazy evaluation of some functions */
1711 static bool
evalLazyFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1712 evalLazyFunc(TState *thread, CState *st,
1713 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
1714 {
1715 PgBenchValue a1,
1716 a2;
1717 bool ba1,
1718 ba2;
1719
1720 Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
1721
1722 /* args points to first condition */
1723 if (!evaluateExpr(thread, st, args->expr, &a1))
1724 return false;
1725
1726 /* second condition for AND/OR and corresponding branch for CASE */
1727 args = args->next;
1728
1729 switch (func)
1730 {
1731 case PGBENCH_AND:
1732 if (a1.type == PGBT_NULL)
1733 {
1734 setNullValue(retval);
1735 return true;
1736 }
1737
1738 if (!coerceToBool(&a1, &ba1))
1739 return false;
1740
1741 if (!ba1)
1742 {
1743 setBoolValue(retval, false);
1744 return true;
1745 }
1746
1747 if (!evaluateExpr(thread, st, args->expr, &a2))
1748 return false;
1749
1750 if (a2.type == PGBT_NULL)
1751 {
1752 setNullValue(retval);
1753 return true;
1754 }
1755 else if (!coerceToBool(&a2, &ba2))
1756 return false;
1757 else
1758 {
1759 setBoolValue(retval, ba2);
1760 return true;
1761 }
1762
1763 return true;
1764
1765 case PGBENCH_OR:
1766
1767 if (a1.type == PGBT_NULL)
1768 {
1769 setNullValue(retval);
1770 return true;
1771 }
1772
1773 if (!coerceToBool(&a1, &ba1))
1774 return false;
1775
1776 if (ba1)
1777 {
1778 setBoolValue(retval, true);
1779 return true;
1780 }
1781
1782 if (!evaluateExpr(thread, st, args->expr, &a2))
1783 return false;
1784
1785 if (a2.type == PGBT_NULL)
1786 {
1787 setNullValue(retval);
1788 return true;
1789 }
1790 else if (!coerceToBool(&a2, &ba2))
1791 return false;
1792 else
1793 {
1794 setBoolValue(retval, ba2);
1795 return true;
1796 }
1797
1798 case PGBENCH_CASE:
1799 /* when true, execute branch */
1800 if (valueTruth(&a1))
1801 return evaluateExpr(thread, st, args->expr, retval);
1802
1803 /* now args contains next condition or final else expression */
1804 args = args->next;
1805
1806 /* final else case? */
1807 if (args->next == NULL)
1808 return evaluateExpr(thread, st, args->expr, retval);
1809
1810 /* no, another when, proceed */
1811 return evalLazyFunc(thread, st, PGBENCH_CASE, args, retval);
1812
1813 default:
1814 /* internal error, cannot get here */
1815 Assert(0);
1816 break;
1817 }
1818 return false;
1819 }
1820
1821 /* maximum number of function arguments */
1822 #define MAX_FARGS 16
1823
1824 /*
1825 * Recursive evaluation of standard functions,
1826 * which do not require lazy evaluation.
1827 */
1828 static bool
evalStandardFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1829 evalStandardFunc(TState *thread, CState *st,
1830 PgBenchFunction func, PgBenchExprLink *args,
1831 PgBenchValue *retval)
1832 {
1833 /* evaluate all function arguments */
1834 int nargs = 0;
1835 PgBenchValue vargs[MAX_FARGS];
1836 PgBenchExprLink *l = args;
1837 bool has_null = false;
1838
1839 for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1840 {
1841 if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1842 return false;
1843 has_null |= vargs[nargs].type == PGBT_NULL;
1844 }
1845
1846 if (l != NULL)
1847 {
1848 fprintf(stderr,
1849 "too many function arguments, maximum is %d\n", MAX_FARGS);
1850 return false;
1851 }
1852
1853 /* NULL arguments */
1854 if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
1855 {
1856 setNullValue(retval);
1857 return true;
1858 }
1859
1860 /* then evaluate function */
1861 switch (func)
1862 {
1863 /* overloaded operators */
1864 case PGBENCH_ADD:
1865 case PGBENCH_SUB:
1866 case PGBENCH_MUL:
1867 case PGBENCH_DIV:
1868 case PGBENCH_MOD:
1869 case PGBENCH_EQ:
1870 case PGBENCH_NE:
1871 case PGBENCH_LE:
1872 case PGBENCH_LT:
1873 {
1874 PgBenchValue *lval = &vargs[0],
1875 *rval = &vargs[1];
1876
1877 Assert(nargs == 2);
1878
1879 /* overloaded type management, double if some double */
1880 if ((lval->type == PGBT_DOUBLE ||
1881 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1882 {
1883 double ld,
1884 rd;
1885
1886 if (!coerceToDouble(lval, &ld) ||
1887 !coerceToDouble(rval, &rd))
1888 return false;
1889
1890 switch (func)
1891 {
1892 case PGBENCH_ADD:
1893 setDoubleValue(retval, ld + rd);
1894 return true;
1895
1896 case PGBENCH_SUB:
1897 setDoubleValue(retval, ld - rd);
1898 return true;
1899
1900 case PGBENCH_MUL:
1901 setDoubleValue(retval, ld * rd);
1902 return true;
1903
1904 case PGBENCH_DIV:
1905 setDoubleValue(retval, ld / rd);
1906 return true;
1907
1908 case PGBENCH_EQ:
1909 setBoolValue(retval, ld == rd);
1910 return true;
1911
1912 case PGBENCH_NE:
1913 setBoolValue(retval, ld != rd);
1914 return true;
1915
1916 case PGBENCH_LE:
1917 setBoolValue(retval, ld <= rd);
1918 return true;
1919
1920 case PGBENCH_LT:
1921 setBoolValue(retval, ld < rd);
1922 return true;
1923
1924 default:
1925 /* cannot get here */
1926 Assert(0);
1927 }
1928 }
1929 else /* we have integer operands, or % */
1930 {
1931 int64 li,
1932 ri;
1933
1934 if (!coerceToInt(lval, &li) ||
1935 !coerceToInt(rval, &ri))
1936 return false;
1937
1938 switch (func)
1939 {
1940 case PGBENCH_ADD:
1941 setIntValue(retval, li + ri);
1942 return true;
1943
1944 case PGBENCH_SUB:
1945 setIntValue(retval, li - ri);
1946 return true;
1947
1948 case PGBENCH_MUL:
1949 setIntValue(retval, li * ri);
1950 return true;
1951
1952 case PGBENCH_EQ:
1953 setBoolValue(retval, li == ri);
1954 return true;
1955
1956 case PGBENCH_NE:
1957 setBoolValue(retval, li != ri);
1958 return true;
1959
1960 case PGBENCH_LE:
1961 setBoolValue(retval, li <= ri);
1962 return true;
1963
1964 case PGBENCH_LT:
1965 setBoolValue(retval, li < ri);
1966 return true;
1967
1968 case PGBENCH_DIV:
1969 case PGBENCH_MOD:
1970 if (ri == 0)
1971 {
1972 fprintf(stderr, "division by zero\n");
1973 return false;
1974 }
1975 /* special handling of -1 divisor */
1976 if (ri == -1)
1977 {
1978 if (func == PGBENCH_DIV)
1979 {
1980 /* overflow check (needed for INT64_MIN) */
1981 if (li == PG_INT64_MIN)
1982 {
1983 fprintf(stderr, "bigint out of range\n");
1984 return false;
1985 }
1986 else
1987 setIntValue(retval, -li);
1988 }
1989 else
1990 setIntValue(retval, 0);
1991 return true;
1992 }
1993 /* else divisor is not -1 */
1994 if (func == PGBENCH_DIV)
1995 setIntValue(retval, li / ri);
1996 else /* func == PGBENCH_MOD */
1997 setIntValue(retval, li % ri);
1998
1999 return true;
2000
2001 default:
2002 /* cannot get here */
2003 Assert(0);
2004 }
2005 }
2006
2007 Assert(0);
2008 return false; /* NOTREACHED */
2009 }
2010
2011 /* integer bitwise operators */
2012 case PGBENCH_BITAND:
2013 case PGBENCH_BITOR:
2014 case PGBENCH_BITXOR:
2015 case PGBENCH_LSHIFT:
2016 case PGBENCH_RSHIFT:
2017 {
2018 int64 li,
2019 ri;
2020
2021 if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
2022 return false;
2023
2024 if (func == PGBENCH_BITAND)
2025 setIntValue(retval, li & ri);
2026 else if (func == PGBENCH_BITOR)
2027 setIntValue(retval, li | ri);
2028 else if (func == PGBENCH_BITXOR)
2029 setIntValue(retval, li ^ ri);
2030 else if (func == PGBENCH_LSHIFT)
2031 setIntValue(retval, li << ri);
2032 else if (func == PGBENCH_RSHIFT)
2033 setIntValue(retval, li >> ri);
2034 else /* cannot get here */
2035 Assert(0);
2036
2037 return true;
2038 }
2039
2040 /* logical operators */
2041 case PGBENCH_NOT:
2042 {
2043 bool b;
2044
2045 if (!coerceToBool(&vargs[0], &b))
2046 return false;
2047
2048 setBoolValue(retval, !b);
2049 return true;
2050 }
2051
2052 /* no arguments */
2053 case PGBENCH_PI:
2054 setDoubleValue(retval, M_PI);
2055 return true;
2056
2057 /* 1 overloaded argument */
2058 case PGBENCH_ABS:
2059 {
2060 PgBenchValue *varg = &vargs[0];
2061
2062 Assert(nargs == 1);
2063
2064 if (varg->type == PGBT_INT)
2065 {
2066 int64 i = varg->u.ival;
2067
2068 setIntValue(retval, i < 0 ? -i : i);
2069 }
2070 else
2071 {
2072 double d = varg->u.dval;
2073
2074 Assert(varg->type == PGBT_DOUBLE);
2075 setDoubleValue(retval, d < 0.0 ? -d : d);
2076 }
2077
2078 return true;
2079 }
2080
2081 case PGBENCH_DEBUG:
2082 {
2083 PgBenchValue *varg = &vargs[0];
2084
2085 Assert(nargs == 1);
2086
2087 fprintf(stderr, "debug(script=%d,command=%d): ",
2088 st->use_file, st->command + 1);
2089
2090 if (varg->type == PGBT_NULL)
2091 fprintf(stderr, "null\n");
2092 else if (varg->type == PGBT_BOOLEAN)
2093 fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2094 else if (varg->type == PGBT_INT)
2095 fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2096 else if (varg->type == PGBT_DOUBLE)
2097 fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2098 else /* internal error, unexpected type */
2099 Assert(0);
2100
2101 *retval = *varg;
2102
2103 return true;
2104 }
2105
2106 /* 1 double argument */
2107 case PGBENCH_DOUBLE:
2108 case PGBENCH_SQRT:
2109 case PGBENCH_LN:
2110 case PGBENCH_EXP:
2111 {
2112 double dval;
2113
2114 Assert(nargs == 1);
2115
2116 if (!coerceToDouble(&vargs[0], &dval))
2117 return false;
2118
2119 if (func == PGBENCH_SQRT)
2120 dval = sqrt(dval);
2121 else if (func == PGBENCH_LN)
2122 dval = log(dval);
2123 else if (func == PGBENCH_EXP)
2124 dval = exp(dval);
2125 /* else is cast: do nothing */
2126
2127 setDoubleValue(retval, dval);
2128 return true;
2129 }
2130
2131 /* 1 int argument */
2132 case PGBENCH_INT:
2133 {
2134 int64 ival;
2135
2136 Assert(nargs == 1);
2137
2138 if (!coerceToInt(&vargs[0], &ival))
2139 return false;
2140
2141 setIntValue(retval, ival);
2142 return true;
2143 }
2144
2145 /* variable number of arguments */
2146 case PGBENCH_LEAST:
2147 case PGBENCH_GREATEST:
2148 {
2149 bool havedouble;
2150 int i;
2151
2152 Assert(nargs >= 1);
2153
2154 /* need double result if any input is double */
2155 havedouble = false;
2156 for (i = 0; i < nargs; i++)
2157 {
2158 if (vargs[i].type == PGBT_DOUBLE)
2159 {
2160 havedouble = true;
2161 break;
2162 }
2163 }
2164 if (havedouble)
2165 {
2166 double extremum;
2167
2168 if (!coerceToDouble(&vargs[0], &extremum))
2169 return false;
2170 for (i = 1; i < nargs; i++)
2171 {
2172 double dval;
2173
2174 if (!coerceToDouble(&vargs[i], &dval))
2175 return false;
2176 if (func == PGBENCH_LEAST)
2177 extremum = Min(extremum, dval);
2178 else
2179 extremum = Max(extremum, dval);
2180 }
2181 setDoubleValue(retval, extremum);
2182 }
2183 else
2184 {
2185 int64 extremum;
2186
2187 if (!coerceToInt(&vargs[0], &extremum))
2188 return false;
2189 for (i = 1; i < nargs; i++)
2190 {
2191 int64 ival;
2192
2193 if (!coerceToInt(&vargs[i], &ival))
2194 return false;
2195 if (func == PGBENCH_LEAST)
2196 extremum = Min(extremum, ival);
2197 else
2198 extremum = Max(extremum, ival);
2199 }
2200 setIntValue(retval, extremum);
2201 }
2202 return true;
2203 }
2204
2205 /* random functions */
2206 case PGBENCH_RANDOM:
2207 case PGBENCH_RANDOM_EXPONENTIAL:
2208 case PGBENCH_RANDOM_GAUSSIAN:
2209 case PGBENCH_RANDOM_ZIPFIAN:
2210 {
2211 int64 imin,
2212 imax;
2213
2214 Assert(nargs >= 2);
2215
2216 if (!coerceToInt(&vargs[0], &imin) ||
2217 !coerceToInt(&vargs[1], &imax))
2218 return false;
2219
2220 /* check random range */
2221 if (imin > imax)
2222 {
2223 fprintf(stderr, "empty range given to random\n");
2224 return false;
2225 }
2226 else if (imax - imin < 0 || (imax - imin) + 1 < 0)
2227 {
2228 /* prevent int overflows in random functions */
2229 fprintf(stderr, "random range is too large\n");
2230 return false;
2231 }
2232
2233 if (func == PGBENCH_RANDOM)
2234 {
2235 Assert(nargs == 2);
2236 setIntValue(retval, getrand(thread, imin, imax));
2237 }
2238 else /* gaussian & exponential */
2239 {
2240 double param;
2241
2242 Assert(nargs == 3);
2243
2244 if (!coerceToDouble(&vargs[2], ¶m))
2245 return false;
2246
2247 if (func == PGBENCH_RANDOM_GAUSSIAN)
2248 {
2249 if (param < MIN_GAUSSIAN_PARAM)
2250 {
2251 fprintf(stderr,
2252 "gaussian parameter must be at least %f "
2253 "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
2254 return false;
2255 }
2256
2257 setIntValue(retval,
2258 getGaussianRand(thread, imin, imax, param));
2259 }
2260 else if (func == PGBENCH_RANDOM_ZIPFIAN)
2261 {
2262 if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM)
2263 {
2264 fprintf(stderr,
2265 "zipfian parameter must be in range (0, 1) U (1, %d]"
2266 " (got %f)\n", MAX_ZIPFIAN_PARAM, param);
2267 return false;
2268 }
2269 setIntValue(retval,
2270 getZipfianRand(thread, imin, imax, param));
2271 }
2272 else /* exponential */
2273 {
2274 if (param <= 0.0)
2275 {
2276 fprintf(stderr,
2277 "exponential parameter must be greater than zero"
2278 " (got %f)\n", param);
2279 return false;
2280 }
2281
2282 setIntValue(retval,
2283 getExponentialRand(thread, imin, imax, param));
2284 }
2285 }
2286
2287 return true;
2288 }
2289
2290 case PGBENCH_POW:
2291 {
2292 PgBenchValue *lval = &vargs[0];
2293 PgBenchValue *rval = &vargs[1];
2294 double ld,
2295 rd;
2296
2297 Assert(nargs == 2);
2298
2299 if (!coerceToDouble(lval, &ld) ||
2300 !coerceToDouble(rval, &rd))
2301 return false;
2302
2303 setDoubleValue(retval, pow(ld, rd));
2304
2305 return true;
2306 }
2307
2308 case PGBENCH_IS:
2309 {
2310 Assert(nargs == 2);
2311
2312 /*
2313 * note: this simple implementation is more permissive than
2314 * SQL
2315 */
2316 setBoolValue(retval,
2317 vargs[0].type == vargs[1].type &&
2318 vargs[0].u.bval == vargs[1].u.bval);
2319 return true;
2320 }
2321
2322 /* hashing */
2323 case PGBENCH_HASH_FNV1A:
2324 case PGBENCH_HASH_MURMUR2:
2325 {
2326 int64 val,
2327 seed;
2328
2329 Assert(nargs == 2);
2330
2331 if (!coerceToInt(&vargs[0], &val) ||
2332 !coerceToInt(&vargs[1], &seed))
2333 return false;
2334
2335 if (func == PGBENCH_HASH_MURMUR2)
2336 setIntValue(retval, getHashMurmur2(val, seed));
2337 else if (func == PGBENCH_HASH_FNV1A)
2338 setIntValue(retval, getHashFnv1a(val, seed));
2339 else
2340 /* cannot get here */
2341 Assert(0);
2342
2343 return true;
2344 }
2345
2346 default:
2347 /* cannot get here */
2348 Assert(0);
2349 /* dead code to avoid a compiler warning */
2350 return false;
2351 }
2352 }
2353
2354 /* evaluate some function */
2355 static bool
evalFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)2356 evalFunc(TState *thread, CState *st,
2357 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
2358 {
2359 if (isLazyFunc(func))
2360 return evalLazyFunc(thread, st, func, args, retval);
2361 else
2362 return evalStandardFunc(thread, st, func, args, retval);
2363 }
2364
2365 /*
2366 * Recursive evaluation of an expression in a pgbench script
2367 * using the current state of variables.
2368 * Returns whether the evaluation was ok,
2369 * the value itself is returned through the retval pointer.
2370 */
2371 static bool
evaluateExpr(TState * thread,CState * st,PgBenchExpr * expr,PgBenchValue * retval)2372 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2373 {
2374 switch (expr->etype)
2375 {
2376 case ENODE_CONSTANT:
2377 {
2378 *retval = expr->u.constant;
2379 return true;
2380 }
2381
2382 case ENODE_VARIABLE:
2383 {
2384 Variable *var;
2385
2386 if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
2387 {
2388 fprintf(stderr, "undefined variable \"%s\"\n",
2389 expr->u.variable.varname);
2390 return false;
2391 }
2392
2393 if (!makeVariableValue(var))
2394 return false;
2395
2396 *retval = var->value;
2397 return true;
2398 }
2399
2400 case ENODE_FUNCTION:
2401 return evalFunc(thread, st,
2402 expr->u.function.function,
2403 expr->u.function.args,
2404 retval);
2405
2406 default:
2407 /* internal error which should never occur */
2408 fprintf(stderr, "unexpected enode type in evaluation: %d\n",
2409 expr->etype);
2410 exit(1);
2411 }
2412 }
2413
2414 /*
2415 * Convert command name to meta-command enum identifier
2416 */
2417 static MetaCommand
getMetaCommand(const char * cmd)2418 getMetaCommand(const char *cmd)
2419 {
2420 MetaCommand mc;
2421
2422 if (cmd == NULL)
2423 mc = META_NONE;
2424 else if (pg_strcasecmp(cmd, "set") == 0)
2425 mc = META_SET;
2426 else if (pg_strcasecmp(cmd, "setshell") == 0)
2427 mc = META_SETSHELL;
2428 else if (pg_strcasecmp(cmd, "shell") == 0)
2429 mc = META_SHELL;
2430 else if (pg_strcasecmp(cmd, "sleep") == 0)
2431 mc = META_SLEEP;
2432 else if (pg_strcasecmp(cmd, "if") == 0)
2433 mc = META_IF;
2434 else if (pg_strcasecmp(cmd, "elif") == 0)
2435 mc = META_ELIF;
2436 else if (pg_strcasecmp(cmd, "else") == 0)
2437 mc = META_ELSE;
2438 else if (pg_strcasecmp(cmd, "endif") == 0)
2439 mc = META_ENDIF;
2440 else
2441 mc = META_NONE;
2442 return mc;
2443 }
2444
2445 /*
2446 * Run a shell command. The result is assigned to the variable if not NULL.
2447 * Return true if succeeded, or false on error.
2448 */
2449 static bool
runShellCommand(CState * st,char * variable,char ** argv,int argc)2450 runShellCommand(CState *st, char *variable, char **argv, int argc)
2451 {
2452 char command[SHELL_COMMAND_SIZE];
2453 int i,
2454 len = 0;
2455 FILE *fp;
2456 char res[64];
2457 char *endptr;
2458 int retval;
2459
2460 /*----------
2461 * Join arguments with whitespace separators. Arguments starting with
2462 * exactly one colon are treated as variables:
2463 * name - append a string "name"
2464 * :var - append a variable named 'var'
2465 * ::name - append a string ":name"
2466 *----------
2467 */
2468 for (i = 0; i < argc; i++)
2469 {
2470 char *arg;
2471 int arglen;
2472
2473 if (argv[i][0] != ':')
2474 {
2475 arg = argv[i]; /* a string literal */
2476 }
2477 else if (argv[i][1] == ':')
2478 {
2479 arg = argv[i] + 1; /* a string literal starting with colons */
2480 }
2481 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
2482 {
2483 fprintf(stderr, "%s: undefined variable \"%s\"\n",
2484 argv[0], argv[i]);
2485 return false;
2486 }
2487
2488 arglen = strlen(arg);
2489 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2490 {
2491 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
2492 return false;
2493 }
2494
2495 if (i > 0)
2496 command[len++] = ' ';
2497 memcpy(command + len, arg, arglen);
2498 len += arglen;
2499 }
2500
2501 command[len] = '\0';
2502
2503 /* Fast path for non-assignment case */
2504 if (variable == NULL)
2505 {
2506 if (system(command))
2507 {
2508 if (!timer_exceeded)
2509 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2510 return false;
2511 }
2512 return true;
2513 }
2514
2515 /* Execute the command with pipe and read the standard output. */
2516 if ((fp = popen(command, "r")) == NULL)
2517 {
2518 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2519 return false;
2520 }
2521 if (fgets(res, sizeof(res), fp) == NULL)
2522 {
2523 if (!timer_exceeded)
2524 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
2525 (void) pclose(fp);
2526 return false;
2527 }
2528 if (pclose(fp) < 0)
2529 {
2530 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
2531 return false;
2532 }
2533
2534 /* Check whether the result is an integer and assign it to the variable */
2535 retval = (int) strtol(res, &endptr, 10);
2536 while (*endptr != '\0' && isspace((unsigned char) *endptr))
2537 endptr++;
2538 if (*res == '\0' || *endptr != '\0')
2539 {
2540 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
2541 argv[0], res);
2542 return false;
2543 }
2544 if (!putVariableInt(st, "setshell", variable, retval))
2545 return false;
2546
2547 #ifdef DEBUG
2548 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
2549 #endif
2550 return true;
2551 }
2552
#define MAX_PREPARE_NAME		32

/*
 * Build the name of a prepared statement from a script number ("file") and
 * a command number ("state"), in the form "P<file>_<state>".
 *
 * buffer must provide room for MAX_PREPARE_NAME bytes; the write is bounded
 * by that size, so it can never run past such a buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
2559
2560 static void
commandFailed(CState * st,const char * cmd,const char * message)2561 commandFailed(CState *st, const char *cmd, const char *message)
2562 {
2563 fprintf(stderr,
2564 "client %d aborted in command %d (%s) of script %d; %s\n",
2565 st->id, st->command, cmd, st->use_file, message);
2566 }
2567
2568 /* return a script number with a weighted choice. */
2569 static int
chooseScript(TState * thread)2570 chooseScript(TState *thread)
2571 {
2572 int i = 0;
2573 int64 w;
2574
2575 if (num_scripts == 1)
2576 return 0;
2577
2578 w = getrand(thread, 0, total_weight - 1);
2579 do
2580 {
2581 w -= sql_script[i++].weight;
2582 } while (w >= 0);
2583
2584 return i - 1;
2585 }
2586
2587 /* Send a SQL command, using the chosen querymode */
2588 static bool
sendCommand(CState * st,Command * command)2589 sendCommand(CState *st, Command *command)
2590 {
2591 int r;
2592
2593 if (querymode == QUERY_SIMPLE)
2594 {
2595 char *sql;
2596
2597 sql = pg_strdup(command->argv[0]);
2598 sql = assignVariables(st, sql);
2599
2600 if (debug)
2601 fprintf(stderr, "client %d sending %s\n", st->id, sql);
2602 r = PQsendQuery(st->con, sql);
2603 free(sql);
2604 }
2605 else if (querymode == QUERY_EXTENDED)
2606 {
2607 const char *sql = command->argv[0];
2608 const char *params[MAX_ARGS];
2609
2610 getQueryParams(st, command, params);
2611
2612 if (debug)
2613 fprintf(stderr, "client %d sending %s\n", st->id, sql);
2614 r = PQsendQueryParams(st->con, sql, command->argc - 1,
2615 NULL, params, NULL, NULL, 0);
2616 }
2617 else if (querymode == QUERY_PREPARED)
2618 {
2619 char name[MAX_PREPARE_NAME];
2620 const char *params[MAX_ARGS];
2621
2622 if (!st->prepared[st->use_file])
2623 {
2624 int j;
2625 Command **commands = sql_script[st->use_file].commands;
2626
2627 for (j = 0; commands[j] != NULL; j++)
2628 {
2629 PGresult *res;
2630 char name[MAX_PREPARE_NAME];
2631
2632 if (commands[j]->type != SQL_COMMAND)
2633 continue;
2634 preparedStatementName(name, st->use_file, j);
2635 res = PQprepare(st->con, name,
2636 commands[j]->argv[0], commands[j]->argc - 1, NULL);
2637 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2638 fprintf(stderr, "%s", PQerrorMessage(st->con));
2639 PQclear(res);
2640 }
2641 st->prepared[st->use_file] = true;
2642 }
2643
2644 getQueryParams(st, command, params);
2645 preparedStatementName(name, st->use_file, st->command);
2646
2647 if (debug)
2648 fprintf(stderr, "client %d sending %s\n", st->id, name);
2649 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2650 params, NULL, NULL, 0);
2651 }
2652 else /* unknown sql mode */
2653 r = 0;
2654
2655 if (r == 0)
2656 {
2657 if (debug)
2658 fprintf(stderr, "client %d could not send %s\n",
2659 st->id, command->argv[0]);
2660 st->ecnt++;
2661 return false;
2662 }
2663 else
2664 return true;
2665 }
2666
2667 /*
2668 * Parse the argument to a \sleep command, and return the requested amount
2669 * of delay, in microseconds. Returns true on success, false on error.
2670 */
2671 static bool
evaluateSleep(CState * st,int argc,char ** argv,int * usecs)2672 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
2673 {
2674 char *var;
2675 int usec;
2676
2677 if (*argv[1] == ':')
2678 {
2679 if ((var = getVariable(st, argv[1] + 1)) == NULL)
2680 {
2681 fprintf(stderr, "%s: undefined variable \"%s\"\n",
2682 argv[0], argv[1]);
2683 return false;
2684 }
2685 usec = atoi(var);
2686 }
2687 else
2688 usec = atoi(argv[1]);
2689
2690 if (argc > 2)
2691 {
2692 if (pg_strcasecmp(argv[2], "ms") == 0)
2693 usec *= 1000;
2694 else if (pg_strcasecmp(argv[2], "s") == 0)
2695 usec *= 1000000;
2696 }
2697 else
2698 usec *= 1000000;
2699
2700 *usecs = usec;
2701 return true;
2702 }
2703
/*
 * Advance the state machine of a connection, if possible.
 *
 * thread: state of the thread running this client; used for random draws
 *		(chooseScript, getPoissonRand), for the throttling schedule
 *		(throttle_trigger) and for accumulating connection time (conn_time).
 * st: the client whose state (st->state) is advanced.
 * agg: handed on to processXactStats().
 *
 * The function keeps stepping through states and returns when:
 * - the client has to wait: the scheduled start time of a throttled
 *	 transaction or the end of a \sleep has not been reached yet, or the
 *	 result of the SQL command in progress is not completely available;
 * - a second end of transaction is reached within a single call;
 * - the client is in a final state (CSTATE_ABORTED or CSTATE_FINISHED), in
 *	 which case finishCon() is called on it first.
 */
static void
doCustom(TState *thread, CState *st, StatsData *agg)
{
	PGresult   *res;
	Command    *command;
	instr_time	now;
	bool		end_tx_processed = false;
	int64		wait;

	/*
	 * gettimeofday() isn't free, so we get the current timestamp lazily the
	 * first time it's needed, and reuse the same value throughout this
	 * function after that.  This also ensures that e.g. the calculated
	 * latency reported in the log file and in the totals are the same. Zero
	 * means "not set yet".  Reset "now" when we execute shell commands or
	 * expressions, which might take a non-negligible amount of time, though.
	 */
	INSTR_TIME_SET_ZERO(now);

	/*
	 * Loop in the state machine, until we have to wait for a result from the
	 * server (or have to sleep, for throttling or for \sleep).
	 *
	 * Note: In the switch-statement below, 'break' will loop back here,
	 * meaning "continue in the state machine".  Return is used to return to
	 * the caller.
	 */
	for (;;)
	{
		switch (st->state)
		{
				/*
				 * Select transaction to run.
				 */
			case CSTATE_CHOOSE_SCRIPT:

				st->use_file = chooseScript(thread);

				if (debug)
					fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
							sql_script[st->use_file].desc);

				if (throttle_delay > 0)
					st->state = CSTATE_START_THROTTLE;
				else
					st->state = CSTATE_START_TX;
				/* check consistency */
				Assert(conditional_stack_empty(st->cstack));
				break;

				/*
				 * Handle throttling once per transaction by sleeping.
				 */
			case CSTATE_START_THROTTLE:

				/*
				 * Generate a delay such that the series of delays will
				 * approximate a Poisson distribution centered on the
				 * throttle_delay time.
				 *
				 * If transactions are too slow or a given wait is shorter
				 * than a transaction, the next transaction will start right
				 * away.
				 */
				Assert(throttle_delay > 0);
				wait = getPoissonRand(thread, throttle_delay);

				/* the schedule is kept per thread, not per client */
				thread->throttle_trigger += wait;
				st->txn_scheduled = thread->throttle_trigger;

				/*
				 * stop client if next transaction is beyond pgbench end of
				 * execution
				 */
				if (duration > 0 && st->txn_scheduled > end_time)
				{
					st->state = CSTATE_FINISHED;
					break;
				}

				/*
				 * If --latency-limit is used, and this slot is already late
				 * so that the transaction will miss the latency limit even if
				 * it completed immediately, we skip this time slot and
				 * iterate till the next slot that isn't late yet.  But don't
				 * iterate beyond the -t limit, if one is given.
				 */
				if (latency_limit)
				{
					int64		now_us;

					if (INSTR_TIME_IS_ZERO(now))
						INSTR_TIME_SET_CURRENT(now);
					now_us = INSTR_TIME_GET_MICROSEC(now);
					while (thread->throttle_trigger < now_us - latency_limit &&
						   (nxacts <= 0 || st->cnt < nxacts))
					{
						/* record the slot as a skipped transaction */
						processXactStats(thread, st, &now, true, agg);
						/* next rendez-vous */
						wait = getPoissonRand(thread, throttle_delay);
						thread->throttle_trigger += wait;
						st->txn_scheduled = thread->throttle_trigger;
					}
					/* stop client if -t exceeded */
					if (nxacts > 0 && st->cnt >= nxacts)
					{
						st->state = CSTATE_FINISHED;
						break;
					}
				}

				st->state = CSTATE_THROTTLE;
				if (debug)
					fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
							st->id, wait);
				break;

				/*
				 * Wait until it's time to start next transaction.
				 */
			case CSTATE_THROTTLE:
				if (INSTR_TIME_IS_ZERO(now))
					INSTR_TIME_SET_CURRENT(now);
				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
					return;		/* Still sleeping, nothing to do here */

				/* Else done sleeping, start the transaction */
				st->state = CSTATE_START_TX;
				break;

				/* Start new transaction */
			case CSTATE_START_TX:

				/*
				 * Establish connection on first call, or if is_connect is
				 * true.
				 */
				if (st->con == NULL)
				{
					instr_time	start;

					if (INSTR_TIME_IS_ZERO(now))
						INSTR_TIME_SET_CURRENT(now);
					start = now;
					if ((st->con = doConnect()) == NULL)
					{
						fprintf(stderr, "client %d aborted while establishing connection\n",
								st->id);
						st->state = CSTATE_ABORTED;
						break;
					}
					/* charge the time spent connecting to the thread */
					INSTR_TIME_SET_CURRENT(now);
					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);

					/* Reset session-local state */
					memset(st->prepared, 0, sizeof(st->prepared));
				}

				/*
				 * Record transaction start time under logging, progress or
				 * throttling.
				 */
				if (use_log || progress || throttle_delay || latency_limit ||
					per_script_stats)
				{
					if (INSTR_TIME_IS_ZERO(now))
						INSTR_TIME_SET_CURRENT(now);
					st->txn_begin = now;

					/*
					 * When not throttling, this is also the transaction's
					 * scheduled start time.
					 */
					if (!throttle_delay)
						st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
				}

				/* Begin with the first command */
				st->command = 0;
				st->state = CSTATE_START_COMMAND;
				break;

				/*
				 * Send a command to server (or execute a meta-command)
				 */
			case CSTATE_START_COMMAND:
				command = sql_script[st->use_file].commands[st->command];

				/*
				 * If we reached the end of the script, move to end-of-xact
				 * processing.
				 */
				if (command == NULL)
				{
					st->state = CSTATE_END_TX;
					break;
				}

				/*
				 * Record statement start time if per-command latencies are
				 * requested
				 */
				if (is_latencies)
				{
					if (INSTR_TIME_IS_ZERO(now))
						INSTR_TIME_SET_CURRENT(now);
					st->stmt_begin = now;
				}

				if (command->type == SQL_COMMAND)
				{
					if (!sendCommand(st, command))
					{
						commandFailed(st, "SQL", "SQL command send failed");
						st->state = CSTATE_ABORTED;
					}
					else
						st->state = CSTATE_WAIT_RESULT;
				}
				else if (command->type == META_COMMAND)
				{
					int			argc = command->argc,
								i;
					char	  **argv = command->argv;

					if (debug)
					{
						fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
						for (i = 1; i < argc; i++)
							fprintf(stderr, " %s", argv[i]);
						fprintf(stderr, "\n");
					}

					if (command->meta == META_SLEEP)
					{
						/*
						 * A \sleep doesn't execute anything, we just get the
						 * delay from the argument, and enter the CSTATE_SLEEP
						 * state.  (The per-command latency will be recorded
						 * in CSTATE_SLEEP state, not here, after the delay
						 * has elapsed.)
						 */
						int			usec;

						if (!evaluateSleep(st, argc, argv, &usec))
						{
							commandFailed(st, "sleep", "execution of meta-command failed");
							st->state = CSTATE_ABORTED;
							break;
						}

						if (INSTR_TIME_IS_ZERO(now))
							INSTR_TIME_SET_CURRENT(now);
						st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
						st->state = CSTATE_SLEEP;
						break;
					}
					else if (command->meta == META_SET ||
							 command->meta == META_IF ||
							 command->meta == META_ELIF)
					{
						/* backslash commands with an expression to evaluate */
						PgBenchExpr *expr = command->expr;
						PgBenchValue result;

						if (command->meta == META_ELIF &&
							conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
						{
							/*
							 * elif after executed block, skip eval and wait
							 * for endif
							 */
							conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
							goto move_to_end_command;
						}

						if (!evaluateExpr(thread, st, expr, &result))
						{
							commandFailed(st, argv[0], "evaluation of meta-command failed");
							st->state = CSTATE_ABORTED;
							break;
						}

						if (command->meta == META_SET)
						{
							/* argv[1] names the variable to assign */
							if (!putVariableValue(st, argv[0], argv[1], &result))
							{
								commandFailed(st, "set", "assignment of meta-command failed");
								st->state = CSTATE_ABORTED;
								break;
							}
						}
						else	/* if and elif evaluated cases */
						{
							bool		cond = valueTruth(&result);

							/* execute or not depending on evaluated condition */
							if (command->meta == META_IF)
							{
								conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
							}
							else	/* elif */
							{
								/*
								 * we should get here only if the "elif"
								 * needed evaluation
								 */
								Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
								conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
							}
						}
					}
					else if (command->meta == META_ELSE)
					{
						/*
						 * Commands are only executed here while the
						 * conditional stack is active, so the branch that
						 * just ran must have been a true one; turn the
						 * "else" part off.
						 */
						switch (conditional_stack_peek(st->cstack))
						{
							case IFSTATE_TRUE:
								conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
								break;
							case IFSTATE_FALSE: /* inconsistent if active */
							case IFSTATE_IGNORED:	/* inconsistent if active */
							case IFSTATE_NONE:	/* else without if */
							case IFSTATE_ELSE_TRUE: /* else after else */
							case IFSTATE_ELSE_FALSE:	/* else after else */
							default:
								/* dead code if conditional check is ok */
								Assert(false);
						}
						goto move_to_end_command;
					}
					else if (command->meta == META_ENDIF)
					{
						Assert(!conditional_stack_empty(st->cstack));
						conditional_stack_pop(st->cstack);
						goto move_to_end_command;
					}
					else if (command->meta == META_SETSHELL)
					{
						/* argv[1] is the variable, the command starts at argv[2] */
						bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);

						if (timer_exceeded) /* timeout */
						{
							st->state = CSTATE_FINISHED;
							break;
						}
						else if (!ret)	/* on error */
						{
							commandFailed(st, "setshell", "execution of meta-command failed");
							st->state = CSTATE_ABORTED;
							break;
						}
						else
						{
							/* succeeded */
						}
					}
					else if (command->meta == META_SHELL)
					{
						/* no variable to set, the command starts at argv[1] */
						bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);

						if (timer_exceeded) /* timeout */
						{
							st->state = CSTATE_FINISHED;
							break;
						}
						else if (!ret)	/* on error */
						{
							commandFailed(st, "shell", "execution of meta-command failed");
							st->state = CSTATE_ABORTED;
							break;
						}
						else
						{
							/* succeeded */
						}
					}

			move_to_end_command:

					/*
					 * executing the expression or shell command might take a
					 * non-negligible amount of time, so reset 'now'
					 */
					INSTR_TIME_SET_ZERO(now);

					st->state = CSTATE_END_COMMAND;
				}
				break;

				/*
				 * non executed conditional branch
				 */
			case CSTATE_SKIP_COMMAND:
				Assert(!conditional_active(st->cstack));
				/* quickly skip commands until something to do... */
				while (true)
				{
					command = sql_script[st->use_file].commands[st->command];

					/* cannot reach end of script in that state */
					Assert(command != NULL);

					/*
					 * if this is conditional related, update conditional
					 * state
					 */
					if (command->type == META_COMMAND &&
						(command->meta == META_IF ||
						 command->meta == META_ELIF ||
						 command->meta == META_ELSE ||
						 command->meta == META_ENDIF))
					{
						switch (conditional_stack_peek(st->cstack))
						{
							case IFSTATE_FALSE:
								if (command->meta == META_IF || command->meta == META_ELIF)
								{
									/*
									 * we must evaluate the condition; the
									 * command index is left alone so that
									 * CSTATE_START_COMMAND runs this command
									 */
									st->state = CSTATE_START_COMMAND;
								}
								else if (command->meta == META_ELSE)
								{
									/* we must execute next command */
									conditional_stack_poke(st->cstack, IFSTATE_ELSE_TRUE);
									st->state = CSTATE_START_COMMAND;
									st->command++;
								}
								else if (command->meta == META_ENDIF)
								{
									Assert(!conditional_stack_empty(st->cstack));
									conditional_stack_pop(st->cstack);
									if (conditional_active(st->cstack))
										st->state = CSTATE_START_COMMAND;

									/*
									 * else state remains in
									 * CSTATE_SKIP_COMMAND
									 */
									st->command++;
								}
								break;

							case IFSTATE_IGNORED:
							case IFSTATE_ELSE_FALSE:
								/* a nested "if" is tracked but never evaluated */
								if (command->meta == META_IF)
									conditional_stack_push(st->cstack, IFSTATE_IGNORED);
								else if (command->meta == META_ENDIF)
								{
									Assert(!conditional_stack_empty(st->cstack));
									conditional_stack_pop(st->cstack);
									if (conditional_active(st->cstack))
										st->state = CSTATE_START_COMMAND;
								}
								/* could detect "else" & "elif" after "else" */
								st->command++;
								break;

							case IFSTATE_NONE:
							case IFSTATE_TRUE:
							case IFSTATE_ELSE_TRUE:
							default:

								/*
								 * inconsistent if inactive, unreachable dead
								 * code
								 */
								Assert(false);
						}
					}
					else
					{
						/* skip and consider next */
						st->command++;
					}

					/* leave the skipping loop once the state has changed */
					if (st->state != CSTATE_SKIP_COMMAND)
						break;
				}
				break;

				/*
				 * Wait for the current SQL command to complete
				 */
			case CSTATE_WAIT_RESULT:
				command = sql_script[st->use_file].commands[st->command];
				if (debug)
					fprintf(stderr, "client %d receiving\n", st->id);
				if (!PQconsumeInput(st->con))
				{				/* there's something wrong */
					commandFailed(st, "SQL", "perhaps the backend died while processing");
					st->state = CSTATE_ABORTED;
					break;
				}
				if (PQisBusy(st->con))
					return;		/* don't have the whole result yet */

				/*
				 * Read and discard the query result;
				 */
				res = PQgetResult(st->con);
				switch (PQresultStatus(res))
				{
					case PGRES_COMMAND_OK:
					case PGRES_TUPLES_OK:
					case PGRES_EMPTY_QUERY:
						/* OK */
						PQclear(res);
						discard_response(st);
						st->state = CSTATE_END_COMMAND;
						break;
					default:
						/* any other status aborts the client */
						commandFailed(st, "SQL", PQerrorMessage(st->con));
						PQclear(res);
						st->state = CSTATE_ABORTED;
						break;
				}
				break;

				/*
				 * Wait until sleep is done. This state is entered after a
				 * \sleep metacommand. The behavior is similar to
				 * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
				 * instead of CSTATE_START_TX.
				 */
			case CSTATE_SLEEP:
				if (INSTR_TIME_IS_ZERO(now))
					INSTR_TIME_SET_CURRENT(now);
				if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
					return;		/* Still sleeping, nothing to do here */
				/* Else done sleeping. */
				st->state = CSTATE_END_COMMAND;
				break;

				/*
				 * End of command: record stats and proceed to next command.
				 */
			case CSTATE_END_COMMAND:

				/*
				 * command completed: accumulate per-command execution times
				 * in thread-local data structure, if per-command latencies
				 * are requested.
				 */
				if (is_latencies)
				{
					if (INSTR_TIME_IS_ZERO(now))
						INSTR_TIME_SET_CURRENT(now);

					/* XXX could use a mutex here, but we choose not to */
					command = sql_script[st->use_file].commands[st->command];
					addToSimpleStats(&command->stats,
									 INSTR_TIME_GET_DOUBLE(now) -
									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
				}

				/* Go ahead with next command, to be executed or skipped */
				st->command++;
				st->state = conditional_active(st->cstack) ?
					CSTATE_START_COMMAND : CSTATE_SKIP_COMMAND;
				break;

				/*
				 * End of transaction.
				 */
			case CSTATE_END_TX:

				/* transaction finished: calculate latency and do log */
				processXactStats(thread, st, &now, false, agg);

				/* conditional stack must be empty */
				if (!conditional_stack_empty(st->cstack))
				{
					fprintf(stderr, "end of script reached within a conditional, missing \\endif\n");
					exit(1);
				}

				/* with is_connect, each transaction gets its own connection */
				if (is_connect)
				{
					finishCon(st);
					INSTR_TIME_SET_ZERO(now);
				}

				if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
				{
					/* exit success */
					st->state = CSTATE_FINISHED;
					break;
				}

				/*
				 * No transaction is underway anymore.
				 */
				st->state = CSTATE_CHOOSE_SCRIPT;

				/*
				 * If we paced through all commands in the script in this
				 * loop, without returning to the caller even once, do it now.
				 * This gives the thread a chance to process other
				 * connections, and to do progress reporting.  This can
				 * currently only happen if the script consists entirely of
				 * meta-commands.
				 */
				if (end_tx_processed)
					return;
				else
				{
					end_tx_processed = true;
					break;
				}

				/*
				 * Final states.  Close the connection if it's still open.
				 */
			case CSTATE_ABORTED:
			case CSTATE_FINISHED:
				finishCon(st);
				return;
		}
	}
}
3327
3328 /*
3329 * Print log entry after completing one transaction.
3330 *
3331 * We print Unix-epoch timestamps in the log, so that entries can be
3332 * correlated against other logs. On some platforms this could be obtained
3333 * from the instr_time reading the caller has, but rather than get entangled
3334 * with that, we just eat the cost of an extra syscall in all cases.
3335 */
3336 static void
doLog(TState * thread,CState * st,StatsData * agg,bool skipped,double latency,double lag)3337 doLog(TState *thread, CState *st,
3338 StatsData *agg, bool skipped, double latency, double lag)
3339 {
3340 FILE *logfile = thread->logfile;
3341
3342 Assert(use_log);
3343
3344 /*
3345 * Skip the log entry if sampling is enabled and this row doesn't belong
3346 * to the random sample.
3347 */
3348 if (sample_rate != 0.0 &&
3349 pg_erand48(thread->random_state) > sample_rate)
3350 return;
3351
3352 /* should we aggregate the results or not? */
3353 if (agg_interval > 0)
3354 {
3355 /*
3356 * Loop until we reach the interval of the current moment, and print
3357 * any empty intervals in between (this may happen with very low tps,
3358 * e.g. --rate=0.1).
3359 */
3360 time_t now = time(NULL);
3361
3362 while (agg->start_time + agg_interval <= now)
3363 {
3364 /* print aggregated report to logfile */
3365 fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
3366 (long) agg->start_time,
3367 agg->cnt,
3368 agg->latency.sum,
3369 agg->latency.sum2,
3370 agg->latency.min,
3371 agg->latency.max);
3372 if (throttle_delay)
3373 {
3374 fprintf(logfile, " %.0f %.0f %.0f %.0f",
3375 agg->lag.sum,
3376 agg->lag.sum2,
3377 agg->lag.min,
3378 agg->lag.max);
3379 if (latency_limit)
3380 fprintf(logfile, " " INT64_FORMAT, agg->skipped);
3381 }
3382 fputc('\n', logfile);
3383
3384 /* reset data and move to next interval */
3385 initStats(agg, agg->start_time + agg_interval);
3386 }
3387
3388 /* accumulate the current transaction */
3389 accumStats(agg, skipped, latency, lag);
3390 }
3391 else
3392 {
3393 /* no, print raw transactions */
3394 struct timeval tv;
3395
3396 gettimeofday(&tv, NULL);
3397 if (skipped)
3398 fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
3399 st->id, st->cnt, st->use_file,
3400 (long) tv.tv_sec, (long) tv.tv_usec);
3401 else
3402 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
3403 st->id, st->cnt, latency, st->use_file,
3404 (long) tv.tv_sec, (long) tv.tv_usec);
3405 if (throttle_delay)
3406 fprintf(logfile, " %.0f", lag);
3407 fputc('\n', logfile);
3408 }
3409 }
3410
3411 /*
3412 * Accumulate and report statistics at end of a transaction.
3413 *
3414 * (This is also called when a transaction is late and thus skipped.
3415 * Note that even skipped transactions are counted in the "cnt" fields.)
3416 */
3417 static void
processXactStats(TState * thread,CState * st,instr_time * now,bool skipped,StatsData * agg)3418 processXactStats(TState *thread, CState *st, instr_time *now,
3419 bool skipped, StatsData *agg)
3420 {
3421 double latency = 0.0,
3422 lag = 0.0;
3423 bool thread_details = progress || throttle_delay || latency_limit,
3424 detailed = thread_details || use_log || per_script_stats;
3425
3426 if (detailed && !skipped)
3427 {
3428 if (INSTR_TIME_IS_ZERO(*now))
3429 INSTR_TIME_SET_CURRENT(*now);
3430
3431 /* compute latency & lag */
3432 latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
3433 lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
3434 }
3435
3436 if (thread_details)
3437 {
3438 /* keep detailed thread stats */
3439 accumStats(&thread->stats, skipped, latency, lag);
3440
3441 /* count transactions over the latency limit, if needed */
3442 if (latency_limit && latency > latency_limit)
3443 thread->latency_late++;
3444 }
3445 else
3446 {
3447 /* no detailed stats, just count */
3448 thread->stats.cnt++;
3449 }
3450
3451 /* client stat is just counting */
3452 st->cnt++;
3453
3454 if (use_log)
3455 doLog(thread, st, agg, skipped, latency, lag);
3456
3457 /* XXX could use a mutex here, but we choose not to */
3458 if (per_script_stats)
3459 accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
3460 }
3461
3462
3463 /* discard connections */
3464 static void
disconnect_all(CState * state,int length)3465 disconnect_all(CState *state, int length)
3466 {
3467 int i;
3468
3469 for (i = 0; i < length; i++)
3470 finishCon(&state[i]);
3471 }
3472
3473 /*
3474 * Remove old pgbench tables, if any exist
3475 */
3476 static void
initDropTables(PGconn * con)3477 initDropTables(PGconn *con)
3478 {
3479 fprintf(stderr, "dropping old tables...\n");
3480
3481 /*
3482 * We drop all the tables in one command, so that whether there are
3483 * foreign key dependencies or not doesn't matter.
3484 */
3485 executeStatement(con, "drop table if exists "
3486 "pgbench_accounts, "
3487 "pgbench_branches, "
3488 "pgbench_history, "
3489 "pgbench_tellers");
3490 }
3491
3492 /*
3493 * Create pgbench's standard tables
3494 */
3495 static void
initCreateTables(PGconn * con)3496 initCreateTables(PGconn *con)
3497 {
3498 /*
3499 * The scale factor at/beyond which 32-bit integers are insufficient for
3500 * storing TPC-B account IDs.
3501 *
3502 * Although the actual threshold is 21474, we use 20000 because it is
3503 * easier to document and remember, and isn't that far away from the real
3504 * threshold.
3505 */
3506 #define SCALE_32BIT_THRESHOLD 20000
3507
3508 /*
3509 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
3510 * fields in these table declarations were intended to comply with that.
3511 * The pgbench_accounts table complies with that because the "filler"
3512 * column is set to blank-padded empty string. But for all other tables
3513 * the columns default to NULL and so don't actually take any space. We
3514 * could fix that by giving them non-null default values. However, that
3515 * would completely break comparability of pgbench results with prior
3516 * versions. Since pgbench has never pretended to be fully TPC-B compliant
3517 * anyway, we stick with the historical behavior.
3518 */
3519 struct ddlinfo
3520 {
3521 const char *table; /* table name */
3522 const char *smcols; /* column decls if accountIDs are 32 bits */
3523 const char *bigcols; /* column decls if accountIDs are 64 bits */
3524 int declare_fillfactor;
3525 };
3526 static const struct ddlinfo DDLs[] = {
3527 {
3528 "pgbench_history",
3529 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
3530 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
3531 0
3532 },
3533 {
3534 "pgbench_tellers",
3535 "tid int not null,bid int,tbalance int,filler char(84)",
3536 "tid int not null,bid int,tbalance int,filler char(84)",
3537 1
3538 },
3539 {
3540 "pgbench_accounts",
3541 "aid int not null,bid int,abalance int,filler char(84)",
3542 "aid bigint not null,bid int,abalance int,filler char(84)",
3543 1
3544 },
3545 {
3546 "pgbench_branches",
3547 "bid int not null,bbalance int,filler char(88)",
3548 "bid int not null,bbalance int,filler char(88)",
3549 1
3550 }
3551 };
3552 int i;
3553
3554 fprintf(stderr, "creating tables...\n");
3555
3556 for (i = 0; i < lengthof(DDLs); i++)
3557 {
3558 char opts[256];
3559 char buffer[256];
3560 const struct ddlinfo *ddl = &DDLs[i];
3561 const char *cols;
3562
3563 /* Construct new create table statement. */
3564 opts[0] = '\0';
3565 if (ddl->declare_fillfactor)
3566 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3567 " with (fillfactor=%d)", fillfactor);
3568 if (tablespace != NULL)
3569 {
3570 char *escape_tablespace;
3571
3572 escape_tablespace = PQescapeIdentifier(con, tablespace,
3573 strlen(tablespace));
3574 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3575 " tablespace %s", escape_tablespace);
3576 PQfreemem(escape_tablespace);
3577 }
3578
3579 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
3580
3581 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
3582 unlogged_tables ? " unlogged" : "",
3583 ddl->table, cols, opts);
3584
3585 executeStatement(con, buffer);
3586 }
3587 }
3588
3589 /*
3590 * Fill the standard tables with some data
3591 */
3592 static void
initGenerateData(PGconn * con)3593 initGenerateData(PGconn *con)
3594 {
3595 char sql[256];
3596 PGresult *res;
3597 int i;
3598 int64 k;
3599
3600 /* used to track elapsed time and estimate of the remaining time */
3601 instr_time start,
3602 diff;
3603 double elapsed_sec,
3604 remaining_sec;
3605 int log_interval = 1;
3606
3607 fprintf(stderr, "generating data...\n");
3608
3609 /*
3610 * we do all of this in one transaction to enable the backend's
3611 * data-loading optimizations
3612 */
3613 executeStatement(con, "begin");
3614
3615 /*
3616 * truncate away any old data, in one command in case there are foreign
3617 * keys
3618 */
3619 executeStatement(con, "truncate table "
3620 "pgbench_accounts, "
3621 "pgbench_branches, "
3622 "pgbench_history, "
3623 "pgbench_tellers");
3624
3625 /*
3626 * fill branches, tellers, accounts in that order in case foreign keys
3627 * already exist
3628 */
3629 for (i = 0; i < nbranches * scale; i++)
3630 {
3631 /* "filler" column defaults to NULL */
3632 snprintf(sql, sizeof(sql),
3633 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
3634 i + 1);
3635 executeStatement(con, sql);
3636 }
3637
3638 for (i = 0; i < ntellers * scale; i++)
3639 {
3640 /* "filler" column defaults to NULL */
3641 snprintf(sql, sizeof(sql),
3642 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
3643 i + 1, i / ntellers + 1);
3644 executeStatement(con, sql);
3645 }
3646
3647 /*
3648 * accounts is big enough to be worth using COPY and tracking runtime
3649 */
3650 res = PQexec(con, "copy pgbench_accounts from stdin");
3651 if (PQresultStatus(res) != PGRES_COPY_IN)
3652 {
3653 fprintf(stderr, "%s", PQerrorMessage(con));
3654 exit(1);
3655 }
3656 PQclear(res);
3657
3658 INSTR_TIME_SET_CURRENT(start);
3659
3660 for (k = 0; k < (int64) naccounts * scale; k++)
3661 {
3662 int64 j = k + 1;
3663
3664 /* "filler" column defaults to blank padded empty string */
3665 snprintf(sql, sizeof(sql),
3666 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
3667 j, k / naccounts + 1, 0);
3668 if (PQputline(con, sql))
3669 {
3670 fprintf(stderr, "PQputline failed\n");
3671 exit(1);
3672 }
3673
3674 /*
3675 * If we want to stick with the original logging, print a message each
3676 * 100k inserted rows.
3677 */
3678 if ((!use_quiet) && (j % 100000 == 0))
3679 {
3680 INSTR_TIME_SET_CURRENT(diff);
3681 INSTR_TIME_SUBTRACT(diff, start);
3682
3683 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3684 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3685
3686 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3687 j, (int64) naccounts * scale,
3688 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
3689 elapsed_sec, remaining_sec);
3690 }
3691 /* let's not call the timing for each row, but only each 100 rows */
3692 else if (use_quiet && (j % 100 == 0))
3693 {
3694 INSTR_TIME_SET_CURRENT(diff);
3695 INSTR_TIME_SUBTRACT(diff, start);
3696
3697 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3698 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3699
3700 /* have we reached the next interval (or end)? */
3701 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
3702 {
3703 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3704 j, (int64) naccounts * scale,
3705 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
3706
3707 /* skip to the next interval */
3708 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
3709 }
3710 }
3711
3712 }
3713 if (PQputline(con, "\\.\n"))
3714 {
3715 fprintf(stderr, "very last PQputline failed\n");
3716 exit(1);
3717 }
3718 if (PQendcopy(con))
3719 {
3720 fprintf(stderr, "PQendcopy failed\n");
3721 exit(1);
3722 }
3723
3724 executeStatement(con, "commit");
3725 }
3726
3727 /*
3728 * Invoke vacuum on the standard tables
3729 */
3730 static void
initVacuum(PGconn * con)3731 initVacuum(PGconn *con)
3732 {
3733 fprintf(stderr, "vacuuming...\n");
3734 executeStatement(con, "vacuum analyze pgbench_branches");
3735 executeStatement(con, "vacuum analyze pgbench_tellers");
3736 executeStatement(con, "vacuum analyze pgbench_accounts");
3737 executeStatement(con, "vacuum analyze pgbench_history");
3738 }
3739
3740 /*
3741 * Create primary keys on the standard tables
3742 */
3743 static void
initCreatePKeys(PGconn * con)3744 initCreatePKeys(PGconn *con)
3745 {
3746 static const char *const DDLINDEXes[] = {
3747 "alter table pgbench_branches add primary key (bid)",
3748 "alter table pgbench_tellers add primary key (tid)",
3749 "alter table pgbench_accounts add primary key (aid)"
3750 };
3751 int i;
3752
3753 fprintf(stderr, "creating primary keys...\n");
3754 for (i = 0; i < lengthof(DDLINDEXes); i++)
3755 {
3756 char buffer[256];
3757
3758 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
3759
3760 if (index_tablespace != NULL)
3761 {
3762 char *escape_tablespace;
3763
3764 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
3765 strlen(index_tablespace));
3766 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
3767 " using index tablespace %s", escape_tablespace);
3768 PQfreemem(escape_tablespace);
3769 }
3770
3771 executeStatement(con, buffer);
3772 }
3773 }
3774
3775 /*
3776 * Create foreign key constraints between the standard tables
3777 */
3778 static void
initCreateFKeys(PGconn * con)3779 initCreateFKeys(PGconn *con)
3780 {
3781 static const char *const DDLKEYs[] = {
3782 "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
3783 "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
3784 "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
3785 "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
3786 "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
3787 };
3788 int i;
3789
3790 fprintf(stderr, "creating foreign keys...\n");
3791 for (i = 0; i < lengthof(DDLKEYs); i++)
3792 {
3793 executeStatement(con, DDLKEYs[i]);
3794 }
3795 }
3796
/*
 * Validate an initialization-steps string.
 *
 * initialize_steps: the string of step characters to check
 *
 * The string must be non-empty and consist only of the step letters
 * d, t, g, v, p, f and spaces.  On the first violation an error is printed
 * to stderr and the program exits with status 1; otherwise the function
 * simply returns.
 *
 * (runInitSteps() would also fail on wrong characters, but since
 * initialization can take a while, it is friendlier to check during option
 * parsing.)
 */
static void
checkInitSteps(const char *initialize_steps)
{
	const char *bad;

	if (*initialize_steps == '\0')
	{
		fprintf(stderr, "no initialization steps specified\n");
		exit(1);
	}

	/* skip the leading run of acceptable characters */
	bad = initialize_steps + strspn(initialize_steps, "dtgvpf ");

	/* anything left over starts with the first unacceptable character */
	if (*bad != '\0')
	{
		fprintf(stderr, "unrecognized initialization step \"%c\"\n",
				*bad);
		fprintf(stderr, "allowed steps are: \"d\", \"t\", \"g\", \"v\", \"p\", \"f\"\n");
		exit(1);
	}
}
3826
3827 /*
3828 * Invoke each initialization step in the given string
3829 */
3830 static void
runInitSteps(const char * initialize_steps)3831 runInitSteps(const char *initialize_steps)
3832 {
3833 PGconn *con;
3834 const char *step;
3835
3836 if ((con = doConnect()) == NULL)
3837 exit(1);
3838
3839 for (step = initialize_steps; *step != '\0'; step++)
3840 {
3841 switch (*step)
3842 {
3843 case 'd':
3844 initDropTables(con);
3845 break;
3846 case 't':
3847 initCreateTables(con);
3848 break;
3849 case 'g':
3850 initGenerateData(con);
3851 break;
3852 case 'v':
3853 initVacuum(con);
3854 break;
3855 case 'p':
3856 initCreatePKeys(con);
3857 break;
3858 case 'f':
3859 initCreateFKeys(con);
3860 break;
3861 case ' ':
3862 break; /* ignore */
3863 default:
3864 fprintf(stderr, "unrecognized initialization step \"%c\"\n",
3865 *step);
3866 PQfinish(con);
3867 exit(1);
3868 }
3869 }
3870
3871 fprintf(stderr, "done.\n");
3872 PQfinish(con);
3873 }
3874
3875 /*
3876 * Replace :param with $n throughout the command's SQL text, which
3877 * is a modifiable string in cmd->argv[0].
3878 */
3879 static bool
parseQuery(Command * cmd)3880 parseQuery(Command *cmd)
3881 {
3882 char *sql,
3883 *p;
3884
3885 /* We don't want to scribble on cmd->argv[0] until done */
3886 sql = pg_strdup(cmd->argv[0]);
3887
3888 cmd->argc = 1;
3889
3890 p = sql;
3891 while ((p = strchr(p, ':')) != NULL)
3892 {
3893 char var[13];
3894 char *name;
3895 int eaten;
3896
3897 name = parseVariable(p, &eaten);
3898 if (name == NULL)
3899 {
3900 while (*p == ':')
3901 {
3902 p++;
3903 }
3904 continue;
3905 }
3906
3907 if (cmd->argc >= MAX_ARGS)
3908 {
3909 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n",
3910 MAX_ARGS - 1, cmd->argv[0]);
3911 pg_free(name);
3912 return false;
3913 }
3914
3915 sprintf(var, "$%d", cmd->argc);
3916 p = replaceVariable(&sql, p, eaten, var);
3917
3918 cmd->argv[cmd->argc] = name;
3919 cmd->argc++;
3920 }
3921
3922 pg_free(cmd->argv[0]);
3923 cmd->argv[0] = sql;
3924 return true;
3925 }
3926
/*
 * Simple error-printing function, might be needed by lexer.
 *
 * fmt: printf-style format string; it is passed through _() before use
 * ...: arguments consumed by the format
 *
 * stdout is flushed first, then the formatted message is written to stderr.
 */
static void
pgbench_error(const char *fmt,...)
{
	va_list		args;

	fflush(stdout);

	va_start(args, fmt);
	vfprintf(stderr, _(fmt), args);
	va_end(args);
}
3940
/*
 * Report a syntax error found while parsing a script and exit.
 *
 * (In practice this happens while parsing a backslash command, because
 * syntax errors in SQL are not detected here.)
 *
 * source:	source of script (filename or builtin-script ID)
 * lineno:	line number within script (count from 1)
 * line:	whole line of backslash command, or NULL if not available
 * command:	backslash command name, or NULL if not available
 * msg:		the actual error message
 * more:	optional extra message, or NULL
 * column:	zero-based column number, or -1 if unknown
 *
 * When the line text is available it is echoed with a caret under the
 * offending column; otherwise a known column is reported as a 1-based
 * number in the message itself.  This function does not return: it always
 * ends with exit(1).
 */
void
syntax_error(const char *source, int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, int column)
{
	const bool	have_column = (column >= 0);
	const bool	have_line = (line != NULL);

	/* first line: location, message and optional details */
	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);
	if (have_column && !have_line)
		fprintf(stderr, " at column %d", column + 1);
	if (command != NULL)
		fprintf(stderr, " in command \"%s\"", command);
	fprintf(stderr, "\n");

	/* then the offending line, with a marker when the column is known */
	if (have_line)
	{
		fprintf(stderr, "%s\n", line);
		if (have_column)
		{
			int			pad = column;

			while (pad-- > 0)
				fputc(' ', stderr);
			fprintf(stderr, "^ error found here\n");
		}
	}

	exit(1);
}
3980
3981 /*
3982 * Parse a SQL command; return a Command struct, or NULL if it's a comment
3983 *
3984 * On entry, psqlscan.l has collected the command into "buf", so we don't
3985 * really need to do much here except check for comment and set up a
3986 * Command struct.
3987 */
3988 static Command *
process_sql_command(PQExpBuffer buf,const char * source)3989 process_sql_command(PQExpBuffer buf, const char *source)
3990 {
3991 Command *my_command;
3992 char *p;
3993 char *nlpos;
3994
3995 /* Skip any leading whitespace, as well as "--" style comments */
3996 p = buf->data;
3997 for (;;)
3998 {
3999 if (isspace((unsigned char) *p))
4000 p++;
4001 else if (strncmp(p, "--", 2) == 0)
4002 {
4003 p = strchr(p, '\n');
4004 if (p == NULL)
4005 return NULL;
4006 p++;
4007 }
4008 else
4009 break;
4010 }
4011
4012 /* If there's nothing but whitespace and comments, we're done */
4013 if (*p == '\0')
4014 return NULL;
4015
4016 /* Allocate and initialize Command structure */
4017 my_command = (Command *) pg_malloc0(sizeof(Command));
4018 my_command->command_num = num_commands++;
4019 my_command->type = SQL_COMMAND;
4020 my_command->meta = META_NONE;
4021 initSimpleStats(&my_command->stats);
4022
4023 /*
4024 * Install query text as the sole argv string. If we are using a
4025 * non-simple query mode, we'll extract parameters from it later.
4026 */
4027 my_command->argv[0] = pg_strdup(p);
4028 my_command->argc = 1;
4029
4030 /*
4031 * If SQL command is multi-line, we only want to save the first line as
4032 * the "line" label.
4033 */
4034 nlpos = strchr(p, '\n');
4035 if (nlpos)
4036 {
4037 my_command->line = pg_malloc(nlpos - p + 1);
4038 memcpy(my_command->line, p, nlpos - p);
4039 my_command->line[nlpos - p] = '\0';
4040 }
4041 else
4042 my_command->line = pg_strdup(p);
4043
4044 return my_command;
4045 }
4046
4047 /*
4048 * Parse a backslash command; return a Command struct, or NULL if comment
4049 *
4050 * At call, we have scanned only the initial backslash.
4051 */
4052 static Command *
process_backslash_command(PsqlScanState sstate,const char * source)4053 process_backslash_command(PsqlScanState sstate, const char *source)
4054 {
4055 Command *my_command;
4056 PQExpBufferData word_buf;
4057 int word_offset;
4058 int offsets[MAX_ARGS]; /* offsets of argument words */
4059 int start_offset;
4060 int lineno;
4061 int j;
4062
4063 initPQExpBuffer(&word_buf);
4064
4065 /* Remember location of the backslash */
4066 start_offset = expr_scanner_offset(sstate) - 1;
4067 lineno = expr_scanner_get_lineno(sstate, start_offset);
4068
4069 /* Collect first word of command */
4070 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
4071 {
4072 termPQExpBuffer(&word_buf);
4073 return NULL;
4074 }
4075
4076 /* Allocate and initialize Command structure */
4077 my_command = (Command *) pg_malloc0(sizeof(Command));
4078 my_command->command_num = num_commands++;
4079 my_command->type = META_COMMAND;
4080 my_command->argc = 0;
4081 initSimpleStats(&my_command->stats);
4082
4083 /* Save first word (command name) */
4084 j = 0;
4085 offsets[j] = word_offset;
4086 my_command->argv[j++] = pg_strdup(word_buf.data);
4087 my_command->argc++;
4088
4089 /* ... and convert it to enum form */
4090 my_command->meta = getMetaCommand(my_command->argv[0]);
4091
4092 if (my_command->meta == META_SET ||
4093 my_command->meta == META_IF ||
4094 my_command->meta == META_ELIF)
4095 {
4096 yyscan_t yyscanner;
4097
4098 /* For \set, collect var name */
4099 if (my_command->meta == META_SET)
4100 {
4101 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
4102 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4103 "missing argument", NULL, -1);
4104
4105 offsets[j] = word_offset;
4106 my_command->argv[j++] = pg_strdup(word_buf.data);
4107 my_command->argc++;
4108 }
4109
4110 /* then for all parse the expression */
4111 yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
4112 my_command->argv[0]);
4113
4114 if (expr_yyparse(yyscanner) != 0)
4115 {
4116 /* dead code: exit done from syntax_error called by yyerror */
4117 exit(1);
4118 }
4119
4120 my_command->expr = expr_parse_result;
4121
4122 /* Save line, trimming any trailing newline */
4123 my_command->line = expr_scanner_get_substring(sstate,
4124 start_offset,
4125 expr_scanner_offset(sstate),
4126 true);
4127
4128 expr_scanner_finish(yyscanner);
4129
4130 termPQExpBuffer(&word_buf);
4131
4132 return my_command;
4133 }
4134
4135 /* For all other commands, collect remaining words. */
4136 while (expr_lex_one_word(sstate, &word_buf, &word_offset))
4137 {
4138 if (j >= MAX_ARGS)
4139 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4140 "too many arguments", NULL, -1);
4141
4142 offsets[j] = word_offset;
4143 my_command->argv[j++] = pg_strdup(word_buf.data);
4144 my_command->argc++;
4145 }
4146
4147 /* Save line, trimming any trailing newline */
4148 my_command->line = expr_scanner_get_substring(sstate,
4149 start_offset,
4150 expr_scanner_offset(sstate),
4151 true);
4152
4153 if (my_command->meta == META_SLEEP)
4154 {
4155 if (my_command->argc < 2)
4156 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4157 "missing argument", NULL, -1);
4158
4159 if (my_command->argc > 3)
4160 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4161 "too many arguments", NULL,
4162 offsets[3] - start_offset);
4163
4164 /*
4165 * Split argument into number and unit to allow "sleep 1ms" etc. We
4166 * don't have to terminate the number argument with null because it
4167 * will be parsed with atoi, which ignores trailing non-digit
4168 * characters.
4169 */
4170 if (my_command->argc == 2 && my_command->argv[1][0] != ':')
4171 {
4172 char *c = my_command->argv[1];
4173
4174 while (isdigit((unsigned char) *c))
4175 c++;
4176 if (*c)
4177 {
4178 my_command->argv[2] = c;
4179 offsets[2] = offsets[1] + (c - my_command->argv[1]);
4180 my_command->argc = 3;
4181 }
4182 }
4183
4184 if (my_command->argc == 3)
4185 {
4186 if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
4187 pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
4188 pg_strcasecmp(my_command->argv[2], "s") != 0)
4189 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4190 "unrecognized time unit, must be us, ms or s",
4191 my_command->argv[2], offsets[2] - start_offset);
4192 }
4193 }
4194 else if (my_command->meta == META_SETSHELL)
4195 {
4196 if (my_command->argc < 3)
4197 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4198 "missing argument", NULL, -1);
4199 }
4200 else if (my_command->meta == META_SHELL)
4201 {
4202 if (my_command->argc < 2)
4203 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4204 "missing command", NULL, -1);
4205 }
4206 else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
4207 {
4208 if (my_command->argc != 1)
4209 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4210 "unexpected argument", NULL, -1);
4211 }
4212 else
4213 {
4214 /* my_command->meta == META_NONE */
4215 syntax_error(source, lineno, my_command->line, my_command->argv[0],
4216 "invalid command", NULL, -1);
4217 }
4218
4219 termPQExpBuffer(&word_buf);
4220
4221 return my_command;
4222 }
4223
/*
 * Report a problem with a script's conditional structure and exit.
 *
 * desc:	script description shown in the message
 * cmdn:	command number shown in the message
 * msg:		text describing the problem
 *
 * This function does not return: it always ends with exit(1).
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
	fprintf(stderr, "condition error in script \"%s\" command %d: %s\n",
			desc, cmdn, msg);

	exit(1);
}
4232
4233 /*
4234 * Partial evaluation of conditionals before recording and running the script.
4235 */
4236 static void
CheckConditional(ParsedScript ps)4237 CheckConditional(ParsedScript ps)
4238 {
4239 /* statically check conditional structure */
4240 ConditionalStack cs = conditional_stack_create();
4241 int i;
4242
4243 for (i = 0; ps.commands[i] != NULL; i++)
4244 {
4245 Command *cmd = ps.commands[i];
4246
4247 if (cmd->type == META_COMMAND)
4248 {
4249 switch (cmd->meta)
4250 {
4251 case META_IF:
4252 conditional_stack_push(cs, IFSTATE_FALSE);
4253 break;
4254 case META_ELIF:
4255 if (conditional_stack_empty(cs))
4256 ConditionError(ps.desc, i + 1, "\\elif without matching \\if");
4257 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
4258 ConditionError(ps.desc, i + 1, "\\elif after \\else");
4259 break;
4260 case META_ELSE:
4261 if (conditional_stack_empty(cs))
4262 ConditionError(ps.desc, i + 1, "\\else without matching \\if");
4263 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
4264 ConditionError(ps.desc, i + 1, "\\else after \\else");
4265 conditional_stack_poke(cs, IFSTATE_ELSE_FALSE);
4266 break;
4267 case META_ENDIF:
4268 if (!conditional_stack_pop(cs))
4269 ConditionError(ps.desc, i + 1, "\\endif without matching \\if");
4270 break;
4271 default:
4272 /* ignore anything else... */
4273 break;
4274 }
4275 }
4276 }
4277 if (!conditional_stack_empty(cs))
4278 ConditionError(ps.desc, i + 1, "\\if without matching \\endif");
4279 conditional_stack_destroy(cs);
4280 }
4281
4282 /*
4283 * Parse a script (either the contents of a file, or a built-in script)
4284 * and add it to the list of scripts.
4285 */
4286 static void
ParseScript(const char * script,const char * desc,int weight)4287 ParseScript(const char *script, const char *desc, int weight)
4288 {
4289 ParsedScript ps;
4290 PsqlScanState sstate;
4291 PQExpBufferData line_buf;
4292 int alloc_num;
4293 int index;
4294
4295 #define COMMANDS_ALLOC_NUM 128
4296 alloc_num = COMMANDS_ALLOC_NUM;
4297
4298 /* Initialize all fields of ps */
4299 ps.desc = desc;
4300 ps.weight = weight;
4301 ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
4302 initStats(&ps.stats, 0);
4303
4304 /* Prepare to parse script */
4305 sstate = psql_scan_create(&pgbench_callbacks);
4306
4307 /*
4308 * Ideally, we'd scan scripts using the encoding and stdstrings settings
4309 * we get from a DB connection. However, without major rearrangement of
4310 * pgbench's argument parsing, we can't have a DB connection at the time
4311 * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
4312 * with any backend-safe encoding, though conceivably we could be fooled
4313 * if a script file uses a client-only encoding. We also assume that
4314 * stdstrings should be true, which is a bit riskier.
4315 */
4316 psql_scan_setup(sstate, script, strlen(script), 0, true);
4317
4318 initPQExpBuffer(&line_buf);
4319
4320 index = 0;
4321
4322 for (;;)
4323 {
4324 PsqlScanResult sr;
4325 promptStatus_t prompt;
4326 Command *command;
4327
4328 resetPQExpBuffer(&line_buf);
4329
4330 sr = psql_scan(sstate, &line_buf, &prompt);
4331
4332 /* If we collected a SQL command, process that */
4333 command = process_sql_command(&line_buf, desc);
4334 if (command)
4335 {
4336 ps.commands[index] = command;
4337 index++;
4338
4339 if (index >= alloc_num)
4340 {
4341 alloc_num += COMMANDS_ALLOC_NUM;
4342 ps.commands = (Command **)
4343 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4344 }
4345 }
4346
4347 /* If we reached a backslash, process that */
4348 if (sr == PSCAN_BACKSLASH)
4349 {
4350 command = process_backslash_command(sstate, desc);
4351 if (command)
4352 {
4353 ps.commands[index] = command;
4354 index++;
4355
4356 if (index >= alloc_num)
4357 {
4358 alloc_num += COMMANDS_ALLOC_NUM;
4359 ps.commands = (Command **)
4360 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4361 }
4362 }
4363 }
4364
4365 /* Done if we reached EOF */
4366 if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
4367 break;
4368 }
4369
4370 ps.commands[index] = NULL;
4371
4372 addScript(ps);
4373
4374 termPQExpBuffer(&line_buf);
4375 psql_scan_finish(sstate);
4376 psql_scan_destroy(sstate);
4377 }
4378
/*
 * Read the entire contents of file fd, and return it in a malloc'd buffer.
 *
 * fd: stream to read from, up to EOF or a read error
 *
 * The returned buffer is NUL-terminated.  Reading stops at the first short
 * read, so the caller should check ferror() to tell an error from EOF.
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the script.
 */
static char *
read_file_contents(FILE *fd)
{
	size_t		capacity = BUFSIZ;
	size_t		filled = 0;
	size_t		got;
	char	   *data = (char *) pg_malloc(capacity);

	/* keep going as long as fread() delivers full chunks */
	while ((got = fread(data + filled, 1, BUFSIZ, fd)) == BUFSIZ)
	{
		filled += got;

		/* Enlarge the buffer so we can read some more */
		capacity += BUFSIZ;
		data = (char *) pg_realloc(data, capacity);
	}

	/* account for the final, short chunk: EOF or error */
	filled += got;

	/* the last chunk was short, so there is surely room for a terminator */
	data[filled] = '\0';

	return data;
}
4412
/*
 * Given a file name, read it and add its script to the list.
 *
 * filename:	file to read; "-" means to read stdin
 * weight:		weight passed on to ParseScript()
 *
 * The program exits with an error message if the file cannot be opened or
 * read.
 *
 * NB: filename must be storage that won't disappear.
 */
static void
process_file(const char *filename, int weight)
{
	const bool	from_stdin = (strcmp(filename, "-") == 0);
	FILE	   *fp;
	char	   *contents;

	if (from_stdin)
		fp = stdin;
	else
	{
		fp = fopen(filename, "r");
		if (fp == NULL)
		{
			fprintf(stderr, "could not open file \"%s\": %s\n",
					filename, strerror(errno));
			exit(1);
		}
	}

	/* Slurp the file contents */
	contents = read_file_contents(fp);

	if (ferror(fp))
	{
		fprintf(stderr, "could not read file \"%s\": %s\n",
				filename, strerror(errno));
		exit(1);
	}

	/* only close what was opened here */
	if (!from_stdin)
		fclose(fp);

	ParseScript(contents, filename, weight);

	free(contents);
}
4450
4451 /* Parse the given builtin script and add it to the list. */
4452 static void
process_builtin(const BuiltinScript * bi,int weight)4453 process_builtin(const BuiltinScript *bi, int weight)
4454 {
4455 ParseScript(bi->script, bi->desc, weight);
4456 }
4457
4458 /* show available builtin scripts */
4459 static void
listAvailableScripts(void)4460 listAvailableScripts(void)
4461 {
4462 int i;
4463
4464 fprintf(stderr, "Available builtin scripts:\n");
4465 for (i = 0; i < lengthof(builtin_script); i++)
4466 fprintf(stderr, "\t%s\n", builtin_script[i].name);
4467 fprintf(stderr, "\n");
4468 }
4469
4470 /* return builtin script "name" if unambiguous, fails if not found */
4471 static const BuiltinScript *
findBuiltin(const char * name)4472 findBuiltin(const char *name)
4473 {
4474 int i,
4475 found = 0,
4476 len = strlen(name);
4477 const BuiltinScript *result = NULL;
4478
4479 for (i = 0; i < lengthof(builtin_script); i++)
4480 {
4481 if (strncmp(builtin_script[i].name, name, len) == 0)
4482 {
4483 result = &builtin_script[i];
4484 found++;
4485 }
4486 }
4487
4488 /* ok, unambiguous result */
4489 if (found == 1)
4490 return result;
4491
4492 /* error cases */
4493 if (found == 0)
4494 fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
4495 else /* found > 1 */
4496 fprintf(stderr,
4497 "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
4498
4499 listAvailableScripts();
4500 exit(1);
4501 }
4502
4503 /*
4504 * Determine the weight specification from a script option (-b, -f), if any,
4505 * and return it as an integer (1 is returned if there's no weight). The
4506 * script name is returned in *script as a malloc'd string.
4507 */
4508 static int
parseScriptWeight(const char * option,char ** script)4509 parseScriptWeight(const char *option, char **script)
4510 {
4511 char *sep;
4512 int weight;
4513
4514 if ((sep = strrchr(option, WSEP)))
4515 {
4516 int namelen = sep - option;
4517 long wtmp;
4518 char *badp;
4519
4520 /* generate the script name */
4521 *script = pg_malloc(namelen + 1);
4522 strncpy(*script, option, namelen);
4523 (*script)[namelen] = '\0';
4524
4525 /* process digits of the weight spec */
4526 errno = 0;
4527 wtmp = strtol(sep + 1, &badp, 10);
4528 if (errno != 0 || badp == sep + 1 || *badp != '\0')
4529 {
4530 fprintf(stderr, "invalid weight specification: %s\n", sep);
4531 exit(1);
4532 }
4533 if (wtmp > INT_MAX || wtmp < 0)
4534 {
4535 fprintf(stderr,
4536 "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
4537 INT_MAX, (int64) wtmp);
4538 exit(1);
4539 }
4540 weight = wtmp;
4541 }
4542 else
4543 {
4544 *script = pg_strdup(option);
4545 weight = 1;
4546 }
4547
4548 return weight;
4549 }
4550
4551 /* append a script to the list of scripts to process */
4552 static void
addScript(ParsedScript script)4553 addScript(ParsedScript script)
4554 {
4555 if (script.commands == NULL || script.commands[0] == NULL)
4556 {
4557 fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
4558 exit(1);
4559 }
4560
4561 if (num_scripts >= MAX_SCRIPTS)
4562 {
4563 fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
4564 exit(1);
4565 }
4566
4567 CheckConditional(script);
4568
4569 sql_script[num_scripts] = script;
4570 num_scripts++;
4571 }
4572
4573 static void
printSimpleStats(const char * prefix,SimpleStats * ss)4574 printSimpleStats(const char *prefix, SimpleStats *ss)
4575 {
4576 if (ss->count > 0)
4577 {
4578 double latency = ss->sum / ss->count;
4579 double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
4580
4581 printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
4582 printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
4583 }
4584 }
4585
4586 /* print out results */
4587 static void
printResults(TState * threads,StatsData * total,instr_time total_time,instr_time conn_total_time,int64 latency_late)4588 printResults(TState *threads, StatsData *total, instr_time total_time,
4589 instr_time conn_total_time, int64 latency_late)
4590 {
4591 double time_include,
4592 tps_include,
4593 tps_exclude;
4594 int64 ntx = total->cnt - total->skipped;
4595 int i,
4596 totalCacheOverflows = 0;
4597
4598 time_include = INSTR_TIME_GET_DOUBLE(total_time);
4599
4600 /* tps is about actually executed transactions */
4601 tps_include = ntx / time_include;
4602 tps_exclude = ntx /
4603 (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
4604
4605 /* Report test parameters. */
4606 printf("transaction type: %s\n",
4607 num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
4608 printf("scaling factor: %d\n", scale);
4609 printf("query mode: %s\n", QUERYMODE[querymode]);
4610 printf("number of clients: %d\n", nclients);
4611 printf("number of threads: %d\n", nthreads);
4612 if (duration <= 0)
4613 {
4614 printf("number of transactions per client: %d\n", nxacts);
4615 printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
4616 ntx, nxacts * nclients);
4617 }
4618 else
4619 {
4620 printf("duration: %d s\n", duration);
4621 printf("number of transactions actually processed: " INT64_FORMAT "\n",
4622 ntx);
4623 }
4624 /* Report zipfian cache overflow */
4625 for (i = 0; i < nthreads; i++)
4626 {
4627 totalCacheOverflows += threads[i].zipf_cache.overflowCount;
4628 }
4629 if (totalCacheOverflows > 0)
4630 {
4631 printf("zipfian cache array overflowed %d time(s)\n", totalCacheOverflows);
4632 }
4633
4634 /* Remaining stats are nonsensical if we failed to execute any xacts */
4635 if (total->cnt <= 0)
4636 return;
4637
4638 if (throttle_delay && latency_limit)
4639 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
4640 total->skipped,
4641 100.0 * total->skipped / total->cnt);
4642
4643 if (latency_limit)
4644 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
4645 latency_limit / 1000.0, latency_late, ntx,
4646 (ntx > 0) ? 100.0 * latency_late / ntx : 0.0);
4647
4648 if (throttle_delay || progress || latency_limit)
4649 printSimpleStats("latency", &total->latency);
4650 else
4651 {
4652 /* no measurement, show average latency computed from run time */
4653 printf("latency average = %.3f ms\n",
4654 1000.0 * time_include * nclients / total->cnt);
4655 }
4656
4657 if (throttle_delay)
4658 {
4659 /*
4660 * Report average transaction lag under rate limit throttling. This
4661 * is the delay between scheduled and actual start times for the
4662 * transaction. The measured lag may be caused by thread/client load,
4663 * the database load, or the Poisson throttling process.
4664 */
4665 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
4666 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
4667 }
4668
4669 printf("tps = %f (including connections establishing)\n", tps_include);
4670 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
4671
4672 /* Report per-script/command statistics */
4673 if (per_script_stats || is_latencies)
4674 {
4675 int i;
4676
4677 for (i = 0; i < num_scripts; i++)
4678 {
4679 if (per_script_stats)
4680 {
4681 StatsData *sstats = &sql_script[i].stats;
4682
4683 printf("SQL script %d: %s\n"
4684 " - weight: %d (targets %.1f%% of total)\n"
4685 " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
4686 i + 1, sql_script[i].desc,
4687 sql_script[i].weight,
4688 100.0 * sql_script[i].weight / total_weight,
4689 sstats->cnt,
4690 100.0 * sstats->cnt / total->cnt,
4691 (sstats->cnt - sstats->skipped) / time_include);
4692
4693 if (throttle_delay && latency_limit && sstats->cnt > 0)
4694 printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
4695 sstats->skipped,
4696 100.0 * sstats->skipped / sstats->cnt);
4697
4698 printSimpleStats(" - latency", &sstats->latency);
4699 }
4700
4701 /* Report per-command latencies */
4702 if (is_latencies)
4703 {
4704 Command **commands;
4705
4706 if (per_script_stats)
4707 printf(" - statement latencies in milliseconds:\n");
4708 else
4709 printf("statement latencies in milliseconds:\n");
4710
4711 for (commands = sql_script[i].commands;
4712 *commands != NULL;
4713 commands++)
4714 {
4715 SimpleStats *cstats = &(*commands)->stats;
4716
4717 printf(" %11.3f %s\n",
4718 (cstats->count > 0) ?
4719 1000.0 * cstats->sum / cstats->count : 0.0,
4720 (*commands)->line);
4721 }
4722 }
4723 }
4724 }
4725 }
4726
4727 /*
4728 * Set up a random seed according to seed parameter (NULL means default),
4729 * and initialize base_random_sequence for use in initializing other sequences.
4730 */
4731 static bool
set_random_seed(const char * seed)4732 set_random_seed(const char *seed)
4733 {
4734 uint64 iseed;
4735
4736 if (seed == NULL || strcmp(seed, "time") == 0)
4737 {
4738 /* rely on current time */
4739 instr_time now;
4740
4741 INSTR_TIME_SET_CURRENT(now);
4742 iseed = (uint64) INSTR_TIME_GET_MICROSEC(now);
4743 }
4744 else if (strcmp(seed, "rand") == 0)
4745 {
4746 /* use some "strong" random source */
4747 #ifdef HAVE_STRONG_RANDOM
4748 if (!pg_strong_random(&iseed, sizeof(iseed)))
4749 #endif
4750 {
4751 fprintf(stderr,
4752 "cannot seed random from a strong source, none available: "
4753 "use \"time\" or an unsigned integer value.\n");
4754 return false;
4755 }
4756 }
4757 else
4758 {
4759 /* parse unsigned-int seed value */
4760 unsigned long ulseed;
4761 char garbage;
4762
4763 /* Don't try to use UINT64_FORMAT here; it might not work for sscanf */
4764 if (sscanf(seed, "%lu%c", &ulseed, &garbage) != 1)
4765 {
4766 fprintf(stderr,
4767 "unrecognized random seed option \"%s\": expecting an unsigned integer, \"time\" or \"rand\"\n",
4768 seed);
4769 return false;
4770 }
4771 iseed = (uint64) ulseed;
4772 }
4773
4774 if (seed != NULL)
4775 fprintf(stderr, "setting random seed to " UINT64_FORMAT "\n", iseed);
4776 random_seed = iseed;
4777
4778 /* Fill base_random_sequence with low-order bits of seed */
4779 base_random_sequence[0] = iseed & 0xFFFF;
4780 base_random_sequence[1] = (iseed >> 16) & 0xFFFF;
4781 base_random_sequence[2] = (iseed >> 32) & 0xFFFF;
4782
4783 return true;
4784 }
4785
4786
4787 int
main(int argc,char ** argv)4788 main(int argc, char **argv)
4789 {
4790 static struct option long_options[] = {
4791 /* systematic long/short named options */
4792 {"builtin", required_argument, NULL, 'b'},
4793 {"client", required_argument, NULL, 'c'},
4794 {"connect", no_argument, NULL, 'C'},
4795 {"debug", no_argument, NULL, 'd'},
4796 {"define", required_argument, NULL, 'D'},
4797 {"file", required_argument, NULL, 'f'},
4798 {"fillfactor", required_argument, NULL, 'F'},
4799 {"host", required_argument, NULL, 'h'},
4800 {"initialize", no_argument, NULL, 'i'},
4801 {"init-steps", required_argument, NULL, 'I'},
4802 {"jobs", required_argument, NULL, 'j'},
4803 {"log", no_argument, NULL, 'l'},
4804 {"latency-limit", required_argument, NULL, 'L'},
4805 {"no-vacuum", no_argument, NULL, 'n'},
4806 {"port", required_argument, NULL, 'p'},
4807 {"progress", required_argument, NULL, 'P'},
4808 {"protocol", required_argument, NULL, 'M'},
4809 {"quiet", no_argument, NULL, 'q'},
4810 {"report-latencies", no_argument, NULL, 'r'},
4811 {"rate", required_argument, NULL, 'R'},
4812 {"scale", required_argument, NULL, 's'},
4813 {"select-only", no_argument, NULL, 'S'},
4814 {"skip-some-updates", no_argument, NULL, 'N'},
4815 {"time", required_argument, NULL, 'T'},
4816 {"transactions", required_argument, NULL, 't'},
4817 {"username", required_argument, NULL, 'U'},
4818 {"vacuum-all", no_argument, NULL, 'v'},
4819 /* long-named only options */
4820 {"unlogged-tables", no_argument, NULL, 1},
4821 {"tablespace", required_argument, NULL, 2},
4822 {"index-tablespace", required_argument, NULL, 3},
4823 {"sampling-rate", required_argument, NULL, 4},
4824 {"aggregate-interval", required_argument, NULL, 5},
4825 {"progress-timestamp", no_argument, NULL, 6},
4826 {"log-prefix", required_argument, NULL, 7},
4827 {"foreign-keys", no_argument, NULL, 8},
4828 {"random-seed", required_argument, NULL, 9},
4829 {NULL, 0, NULL, 0}
4830 };
4831
4832 int c;
4833 bool is_init_mode = false; /* initialize mode? */
4834 char *initialize_steps = NULL;
4835 bool foreign_keys = false;
4836 bool is_no_vacuum = false;
4837 bool do_vacuum_accounts = false; /* vacuum accounts table? */
4838 int optindex;
4839 bool scale_given = false;
4840
4841 bool benchmarking_option_set = false;
4842 bool initialization_option_set = false;
4843 bool internal_script_used = false;
4844
4845 CState *state; /* status of clients */
4846 TState *threads; /* array of thread */
4847
4848 instr_time start_time; /* start up time */
4849 instr_time total_time;
4850 instr_time conn_total_time;
4851 int64 latency_late = 0;
4852 StatsData stats;
4853 int weight;
4854
4855 int i;
4856 int nclients_dealt;
4857
4858 #ifdef HAVE_GETRLIMIT
4859 struct rlimit rlim;
4860 #endif
4861
4862 PGconn *con;
4863 PGresult *res;
4864 char *env;
4865
4866 progname = get_progname(argv[0]);
4867
4868 if (argc > 1)
4869 {
4870 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
4871 {
4872 usage();
4873 exit(0);
4874 }
4875 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
4876 {
4877 puts("pgbench (PostgreSQL) " PG_VERSION);
4878 exit(0);
4879 }
4880 }
4881
4882 #ifdef WIN32
4883 /* stderr is buffered on Win32. */
4884 setvbuf(stderr, NULL, _IONBF, 0);
4885 #endif
4886
4887 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
4888 pghost = env;
4889 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
4890 pgport = env;
4891 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
4892 login = env;
4893
4894 state = (CState *) pg_malloc(sizeof(CState));
4895 memset(state, 0, sizeof(CState));
4896
4897 /* set random seed early, because it may be used while parsing scripts. */
4898 if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
4899 {
4900 fprintf(stderr, "error while setting random seed from PGBENCH_RANDOM_SEED environment variable\n");
4901 exit(1);
4902 }
4903
4904 while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
4905 {
4906 char *script;
4907
4908 switch (c)
4909 {
4910 case 'i':
4911 is_init_mode = true;
4912 break;
4913 case 'I':
4914 if (initialize_steps)
4915 pg_free(initialize_steps);
4916 initialize_steps = pg_strdup(optarg);
4917 checkInitSteps(initialize_steps);
4918 initialization_option_set = true;
4919 break;
4920 case 'h':
4921 pghost = pg_strdup(optarg);
4922 break;
4923 case 'n':
4924 is_no_vacuum = true;
4925 break;
4926 case 'v':
4927 benchmarking_option_set = true;
4928 do_vacuum_accounts = true;
4929 break;
4930 case 'p':
4931 pgport = pg_strdup(optarg);
4932 break;
4933 case 'd':
4934 debug++;
4935 break;
4936 case 'c':
4937 benchmarking_option_set = true;
4938 nclients = atoi(optarg);
4939 if (nclients <= 0 || nclients > MAXCLIENTS)
4940 {
4941 fprintf(stderr, "invalid number of clients: \"%s\"\n",
4942 optarg);
4943 exit(1);
4944 }
4945 #ifdef HAVE_GETRLIMIT
4946 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
4947 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
4948 #else /* but BSD doesn't ... */
4949 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
4950 #endif /* RLIMIT_NOFILE */
4951 {
4952 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
4953 exit(1);
4954 }
4955 if (rlim.rlim_cur < nclients + 3)
4956 {
4957 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
4958 nclients + 3, (long) rlim.rlim_cur);
4959 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
4960 exit(1);
4961 }
4962 #endif /* HAVE_GETRLIMIT */
4963 break;
4964 case 'j': /* jobs */
4965 benchmarking_option_set = true;
4966 nthreads = atoi(optarg);
4967 if (nthreads <= 0)
4968 {
4969 fprintf(stderr, "invalid number of threads: \"%s\"\n",
4970 optarg);
4971 exit(1);
4972 }
4973 #ifndef ENABLE_THREAD_SAFETY
4974 if (nthreads != 1)
4975 {
4976 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
4977 exit(1);
4978 }
4979 #endif /* !ENABLE_THREAD_SAFETY */
4980 break;
4981 case 'C':
4982 benchmarking_option_set = true;
4983 is_connect = true;
4984 break;
4985 case 'r':
4986 benchmarking_option_set = true;
4987 is_latencies = true;
4988 break;
4989 case 's':
4990 scale_given = true;
4991 scale = atoi(optarg);
4992 if (scale <= 0)
4993 {
4994 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
4995 exit(1);
4996 }
4997 break;
4998 case 't':
4999 benchmarking_option_set = true;
5000 nxacts = atoi(optarg);
5001 if (nxacts <= 0)
5002 {
5003 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
5004 optarg);
5005 exit(1);
5006 }
5007 break;
5008 case 'T':
5009 benchmarking_option_set = true;
5010 duration = atoi(optarg);
5011 if (duration <= 0)
5012 {
5013 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
5014 exit(1);
5015 }
5016 break;
5017 case 'U':
5018 login = pg_strdup(optarg);
5019 break;
5020 case 'l':
5021 benchmarking_option_set = true;
5022 use_log = true;
5023 break;
5024 case 'q':
5025 initialization_option_set = true;
5026 use_quiet = true;
5027 break;
5028 case 'b':
5029 if (strcmp(optarg, "list") == 0)
5030 {
5031 listAvailableScripts();
5032 exit(0);
5033 }
5034 weight = parseScriptWeight(optarg, &script);
5035 process_builtin(findBuiltin(script), weight);
5036 benchmarking_option_set = true;
5037 internal_script_used = true;
5038 break;
5039 case 'S':
5040 process_builtin(findBuiltin("select-only"), 1);
5041 benchmarking_option_set = true;
5042 internal_script_used = true;
5043 break;
5044 case 'N':
5045 process_builtin(findBuiltin("simple-update"), 1);
5046 benchmarking_option_set = true;
5047 internal_script_used = true;
5048 break;
5049 case 'f':
5050 weight = parseScriptWeight(optarg, &script);
5051 process_file(script, weight);
5052 benchmarking_option_set = true;
5053 break;
5054 case 'D':
5055 {
5056 char *p;
5057
5058 benchmarking_option_set = true;
5059
5060 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
5061 {
5062 fprintf(stderr, "invalid variable definition: \"%s\"\n",
5063 optarg);
5064 exit(1);
5065 }
5066
5067 *p++ = '\0';
5068 if (!putVariable(&state[0], "option", optarg, p))
5069 exit(1);
5070 }
5071 break;
5072 case 'F':
5073 initialization_option_set = true;
5074 fillfactor = atoi(optarg);
5075 if (fillfactor < 10 || fillfactor > 100)
5076 {
5077 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
5078 exit(1);
5079 }
5080 break;
5081 case 'M':
5082 benchmarking_option_set = true;
5083 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
5084 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
5085 break;
5086 if (querymode >= NUM_QUERYMODE)
5087 {
5088 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
5089 optarg);
5090 exit(1);
5091 }
5092 break;
5093 case 'P':
5094 benchmarking_option_set = true;
5095 progress = atoi(optarg);
5096 if (progress <= 0)
5097 {
5098 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
5099 optarg);
5100 exit(1);
5101 }
5102 break;
5103 case 'R':
5104 {
5105 /* get a double from the beginning of option value */
5106 double throttle_value = atof(optarg);
5107
5108 benchmarking_option_set = true;
5109
5110 if (throttle_value <= 0.0)
5111 {
5112 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
5113 exit(1);
5114 }
5115 /* Invert rate limit into a time offset */
5116 throttle_delay = (int64) (1000000.0 / throttle_value);
5117 }
5118 break;
5119 case 'L':
5120 {
5121 double limit_ms = atof(optarg);
5122
5123 if (limit_ms <= 0.0)
5124 {
5125 fprintf(stderr, "invalid latency limit: \"%s\"\n",
5126 optarg);
5127 exit(1);
5128 }
5129 benchmarking_option_set = true;
5130 latency_limit = (int64) (limit_ms * 1000);
5131 }
5132 break;
5133 case 1: /* unlogged-tables */
5134 initialization_option_set = true;
5135 unlogged_tables = true;
5136 break;
5137 case 2: /* tablespace */
5138 initialization_option_set = true;
5139 tablespace = pg_strdup(optarg);
5140 break;
5141 case 3: /* index-tablespace */
5142 initialization_option_set = true;
5143 index_tablespace = pg_strdup(optarg);
5144 break;
5145 case 4: /* sampling-rate */
5146 benchmarking_option_set = true;
5147 sample_rate = atof(optarg);
5148 if (sample_rate <= 0.0 || sample_rate > 1.0)
5149 {
5150 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
5151 exit(1);
5152 }
5153 break;
5154 case 5: /* aggregate-interval */
5155 benchmarking_option_set = true;
5156 agg_interval = atoi(optarg);
5157 if (agg_interval <= 0)
5158 {
5159 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
5160 optarg);
5161 exit(1);
5162 }
5163 break;
5164 case 6: /* progress-timestamp */
5165 progress_timestamp = true;
5166 benchmarking_option_set = true;
5167 break;
5168 case 7: /* log-prefix */
5169 benchmarking_option_set = true;
5170 logfile_prefix = pg_strdup(optarg);
5171 break;
5172 case 8: /* foreign-keys */
5173 initialization_option_set = true;
5174 foreign_keys = true;
5175 break;
5176 case 9: /* random-seed */
5177 benchmarking_option_set = true;
5178 if (!set_random_seed(optarg))
5179 {
5180 fprintf(stderr, "error while setting random seed from --random-seed option\n");
5181 exit(1);
5182 }
5183 break;
5184 default:
5185 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
5186 exit(1);
5187 break;
5188 }
5189 }
5190
5191 /* set default script if none */
5192 if (num_scripts == 0 && !is_init_mode)
5193 {
5194 process_builtin(findBuiltin("tpcb-like"), 1);
5195 benchmarking_option_set = true;
5196 internal_script_used = true;
5197 }
5198
5199 /* if not simple query mode, parse the script(s) to find parameters */
5200 if (querymode != QUERY_SIMPLE)
5201 {
5202 for (i = 0; i < num_scripts; i++)
5203 {
5204 Command **commands = sql_script[i].commands;
5205 int j;
5206
5207 for (j = 0; commands[j] != NULL; j++)
5208 {
5209 if (commands[j]->type != SQL_COMMAND)
5210 continue;
5211 if (!parseQuery(commands[j]))
5212 exit(1);
5213 }
5214 }
5215 }
5216
5217 /* compute total_weight */
5218 for (i = 0; i < num_scripts; i++)
5219 /* cannot overflow: weight is 32b, total_weight 64b */
5220 total_weight += sql_script[i].weight;
5221
5222 if (total_weight == 0 && !is_init_mode)
5223 {
5224 fprintf(stderr, "total script weight must not be zero\n");
5225 exit(1);
5226 }
5227
5228 /* show per script stats if several scripts are used */
5229 if (num_scripts > 1)
5230 per_script_stats = true;
5231
5232 /*
5233 * Don't need more threads than there are clients. (This is not merely an
5234 * optimization; throttle_delay is calculated incorrectly below if some
5235 * threads have no clients assigned to them.)
5236 */
5237 if (nthreads > nclients)
5238 nthreads = nclients;
5239
5240 /* compute a per thread delay */
5241 throttle_delay *= nthreads;
5242
5243 if (argc > optind)
5244 dbName = argv[optind];
5245 else
5246 {
5247 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
5248 dbName = env;
5249 else if (login != NULL && *login != '\0')
5250 dbName = login;
5251 else
5252 dbName = "";
5253 }
5254
5255 if (is_init_mode)
5256 {
5257 if (benchmarking_option_set)
5258 {
5259 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
5260 exit(1);
5261 }
5262
5263 if (initialize_steps == NULL)
5264 initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
5265
5266 if (is_no_vacuum)
5267 {
5268 /* Remove any vacuum step in initialize_steps */
5269 char *p;
5270
5271 while ((p = strchr(initialize_steps, 'v')) != NULL)
5272 *p = ' ';
5273 }
5274
5275 if (foreign_keys)
5276 {
5277 /* Add 'f' to end of initialize_steps, if not already there */
5278 if (strchr(initialize_steps, 'f') == NULL)
5279 {
5280 initialize_steps = (char *)
5281 pg_realloc(initialize_steps,
5282 strlen(initialize_steps) + 2);
5283 strcat(initialize_steps, "f");
5284 }
5285 }
5286
5287 runInitSteps(initialize_steps);
5288 exit(0);
5289 }
5290 else
5291 {
5292 if (initialization_option_set)
5293 {
5294 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
5295 exit(1);
5296 }
5297 }
5298
5299 if (nxacts > 0 && duration > 0)
5300 {
5301 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
5302 exit(1);
5303 }
5304
5305 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
5306 if (nxacts <= 0 && duration <= 0)
5307 nxacts = DEFAULT_NXACTS;
5308
5309 /* --sampling-rate may be used only with -l */
5310 if (sample_rate > 0.0 && !use_log)
5311 {
5312 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
5313 exit(1);
5314 }
5315
5316 /* --sampling-rate may not be used with --aggregate-interval */
5317 if (sample_rate > 0.0 && agg_interval > 0)
5318 {
5319 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
5320 exit(1);
5321 }
5322
5323 if (agg_interval > 0 && !use_log)
5324 {
5325 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
5326 exit(1);
5327 }
5328
5329 if (!use_log && logfile_prefix)
5330 {
5331 fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
5332 exit(1);
5333 }
5334
5335 if (duration > 0 && agg_interval > duration)
5336 {
5337 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
5338 exit(1);
5339 }
5340
5341 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
5342 {
5343 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
5344 exit(1);
5345 }
5346
5347 if (progress_timestamp && progress == 0)
5348 {
5349 fprintf(stderr, "--progress-timestamp is allowed only under --progress\n");
5350 exit(1);
5351 }
5352
5353 /*
5354 * save main process id in the global variable because process id will be
5355 * changed after fork.
5356 */
5357 main_pid = (int) getpid();
5358
5359 if (nclients > 1)
5360 {
5361 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
5362 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
5363
5364 /* copy any -D switch values to all clients */
5365 for (i = 1; i < nclients; i++)
5366 {
5367 int j;
5368
5369 state[i].id = i;
5370 for (j = 0; j < state[0].nvariables; j++)
5371 {
5372 Variable *var = &state[0].variables[j];
5373
5374 if (var->value.type != PGBT_NO_VALUE)
5375 {
5376 if (!putVariableValue(&state[i], "startup",
5377 var->name, &var->value))
5378 exit(1);
5379 }
5380 else
5381 {
5382 if (!putVariable(&state[i], "startup",
5383 var->name, var->svalue))
5384 exit(1);
5385 }
5386 }
5387 }
5388 }
5389
5390 /* other CState initializations */
5391 for (i = 0; i < nclients; i++)
5392 {
5393 state[i].cstack = conditional_stack_create();
5394 }
5395
5396 if (debug)
5397 {
5398 if (duration <= 0)
5399 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
5400 pghost, pgport, nclients, nxacts, dbName);
5401 else
5402 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
5403 pghost, pgport, nclients, duration, dbName);
5404 }
5405
5406 /* opening connection... */
5407 con = doConnect();
5408 if (con == NULL)
5409 exit(1);
5410
5411 if (internal_script_used)
5412 {
5413 /*
5414 * get the scaling factor that should be same as count(*) from
5415 * pgbench_branches if this is not a custom query
5416 */
5417 res = PQexec(con, "select count(*) from pgbench_branches");
5418 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5419 {
5420 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
5421
5422 fprintf(stderr, "%s", PQerrorMessage(con));
5423 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
5424 {
5425 fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
5426 }
5427
5428 exit(1);
5429 }
5430 scale = atoi(PQgetvalue(res, 0, 0));
5431 if (scale < 0)
5432 {
5433 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
5434 PQgetvalue(res, 0, 0));
5435 exit(1);
5436 }
5437 PQclear(res);
5438
5439 /* warn if we override user-given -s switch */
5440 if (scale_given)
5441 fprintf(stderr,
5442 "scale option ignored, using count from pgbench_branches table (%d)\n",
5443 scale);
5444 }
5445
5446 /*
5447 * :scale variables normally get -s or database scale, but don't override
5448 * an explicit -D switch
5449 */
5450 if (lookupVariable(&state[0], "scale") == NULL)
5451 {
5452 for (i = 0; i < nclients; i++)
5453 {
5454 if (!putVariableInt(&state[i], "startup", "scale", scale))
5455 exit(1);
5456 }
5457 }
5458
5459 /*
5460 * Define a :client_id variable that is unique per connection. But don't
5461 * override an explicit -D switch.
5462 */
5463 if (lookupVariable(&state[0], "client_id") == NULL)
5464 {
5465 for (i = 0; i < nclients; i++)
5466 if (!putVariableInt(&state[i], "startup", "client_id", i))
5467 exit(1);
5468 }
5469
5470 /* set default seed for hash functions */
5471 if (lookupVariable(&state[0], "default_seed") == NULL)
5472 {
5473 uint64 seed =
5474 ((uint64) pg_jrand48(base_random_sequence) & 0xFFFFFFFF) |
5475 (((uint64) pg_jrand48(base_random_sequence) & 0xFFFFFFFF) << 32);
5476
5477 for (i = 0; i < nclients; i++)
5478 if (!putVariableInt(&state[i], "startup", "default_seed", (int64) seed))
5479 exit(1);
5480 }
5481
5482 /* set random seed unless overwritten */
5483 if (lookupVariable(&state[0], "random_seed") == NULL)
5484 {
5485 for (i = 0; i < nclients; i++)
5486 if (!putVariableInt(&state[i], "startup", "random_seed", random_seed))
5487 exit(1);
5488 }
5489
5490 if (!is_no_vacuum)
5491 {
5492 fprintf(stderr, "starting vacuum...");
5493 tryExecuteStatement(con, "vacuum pgbench_branches");
5494 tryExecuteStatement(con, "vacuum pgbench_tellers");
5495 tryExecuteStatement(con, "truncate pgbench_history");
5496 fprintf(stderr, "end.\n");
5497
5498 if (do_vacuum_accounts)
5499 {
5500 fprintf(stderr, "starting vacuum pgbench_accounts...");
5501 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
5502 fprintf(stderr, "end.\n");
5503 }
5504 }
5505 PQfinish(con);
5506
5507 /* set up thread data structures */
5508 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
5509 nclients_dealt = 0;
5510
5511 for (i = 0; i < nthreads; i++)
5512 {
5513 TState *thread = &threads[i];
5514
5515 thread->tid = i;
5516 thread->state = &state[nclients_dealt];
5517 thread->nstate =
5518 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
5519 thread->random_state[0] = (unsigned short)
5520 (pg_jrand48(base_random_sequence) & 0xFFFF);
5521 thread->random_state[1] = (unsigned short)
5522 (pg_jrand48(base_random_sequence) & 0xFFFF);
5523 thread->random_state[2] = (unsigned short)
5524 (pg_jrand48(base_random_sequence) & 0xFFFF);
5525 thread->logfile = NULL; /* filled in later */
5526 thread->latency_late = 0;
5527 thread->zipf_cache.nb_cells = 0;
5528 thread->zipf_cache.current = 0;
5529 thread->zipf_cache.overflowCount = 0;
5530 initStats(&thread->stats, 0);
5531
5532 nclients_dealt += thread->nstate;
5533 }
5534
5535 /* all clients must be assigned to a thread */
5536 Assert(nclients_dealt == nclients);
5537
5538 /* get start up time */
5539 INSTR_TIME_SET_CURRENT(start_time);
5540
5541 /* set alarm if duration is specified. */
5542 if (duration > 0)
5543 setalarm(duration);
5544
5545 /* start threads */
5546 #ifdef ENABLE_THREAD_SAFETY
5547 for (i = 0; i < nthreads; i++)
5548 {
5549 TState *thread = &threads[i];
5550
5551 INSTR_TIME_SET_CURRENT(thread->start_time);
5552
5553 /* compute when to stop */
5554 if (duration > 0)
5555 end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
5556 (int64) 1000000 * duration;
5557
5558 /* the first thread (i = 0) is executed by main thread */
5559 if (i > 0)
5560 {
5561 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
5562
5563 if (err != 0 || thread->thread == INVALID_THREAD)
5564 {
5565 fprintf(stderr, "could not create thread: %s\n", strerror(err));
5566 exit(1);
5567 }
5568 }
5569 else
5570 {
5571 thread->thread = INVALID_THREAD;
5572 }
5573 }
5574 #else
5575 INSTR_TIME_SET_CURRENT(threads[0].start_time);
5576 /* compute when to stop */
5577 if (duration > 0)
5578 end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
5579 (int64) 1000000 * duration;
5580 threads[0].thread = INVALID_THREAD;
5581 #endif /* ENABLE_THREAD_SAFETY */
5582
5583 /* wait for threads and accumulate results */
5584 initStats(&stats, 0);
5585 INSTR_TIME_SET_ZERO(conn_total_time);
5586 for (i = 0; i < nthreads; i++)
5587 {
5588 TState *thread = &threads[i];
5589
5590 #ifdef ENABLE_THREAD_SAFETY
5591 if (threads[i].thread == INVALID_THREAD)
5592 /* actually run this thread directly in the main thread */
5593 (void) threadRun(thread);
5594 else
5595 /* wait of other threads. should check that 0 is returned? */
5596 pthread_join(thread->thread, NULL);
5597 #else
5598 (void) threadRun(thread);
5599 #endif /* ENABLE_THREAD_SAFETY */
5600
5601 /* aggregate thread level stats */
5602 mergeSimpleStats(&stats.latency, &thread->stats.latency);
5603 mergeSimpleStats(&stats.lag, &thread->stats.lag);
5604 stats.cnt += thread->stats.cnt;
5605 stats.skipped += thread->stats.skipped;
5606 latency_late += thread->latency_late;
5607 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
5608 }
5609 disconnect_all(state, nclients);
5610
5611 /*
5612 * XXX We compute results as though every client of every thread started
5613 * and finished at the same time. That model can diverge noticeably from
5614 * reality for a short benchmark run involving relatively many threads.
5615 * The first thread may process notably many transactions before the last
5616 * thread begins. Improving the model alone would bring limited benefit,
5617 * because performance during those periods of partial thread count can
5618 * easily exceed steady state performance. This is one of the many ways
5619 * short runs convey deceptive performance figures.
5620 */
5621 INSTR_TIME_SET_CURRENT(total_time);
5622 INSTR_TIME_SUBTRACT(total_time, start_time);
5623 printResults(threads, &stats, total_time, conn_total_time, latency_late);
5624
5625 return 0;
5626 }
5627
5628 static void *
threadRun(void * arg)5629 threadRun(void *arg)
5630 {
5631 TState *thread = (TState *) arg;
5632 CState *state = thread->state;
5633 instr_time start,
5634 end;
5635 int nstate = thread->nstate;
5636 int remains = nstate; /* number of remaining clients */
5637 int i;
5638
5639 /* for reporting progress: */
5640 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
5641 int64 last_report = thread_start;
5642 int64 next_report = last_report + (int64) progress * 1000000;
5643 StatsData last,
5644 aggs;
5645
5646 /*
5647 * Initialize throttling rate target for all of the thread's clients. It
5648 * might be a little more accurate to reset thread->start_time here too.
5649 * The possible drift seems too small relative to typical throttle delay
5650 * times to worry about it.
5651 */
5652 INSTR_TIME_SET_CURRENT(start);
5653 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
5654
5655 INSTR_TIME_SET_ZERO(thread->conn_time);
5656
5657 initStats(&aggs, time(NULL));
5658 last = aggs;
5659
5660 /* open log file if requested */
5661 if (use_log)
5662 {
5663 char logpath[MAXPGPATH];
5664 char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
5665
5666 if (thread->tid == 0)
5667 snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
5668 else
5669 snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
5670
5671 thread->logfile = fopen(logpath, "w");
5672
5673 if (thread->logfile == NULL)
5674 {
5675 fprintf(stderr, "could not open logfile \"%s\": %s\n",
5676 logpath, strerror(errno));
5677 goto done;
5678 }
5679 }
5680
5681 if (!is_connect)
5682 {
5683 /* make connections to the database */
5684 for (i = 0; i < nstate; i++)
5685 {
5686 if ((state[i].con = doConnect()) == NULL)
5687 goto done;
5688 }
5689 }
5690
5691 /* time after thread and connections set up */
5692 INSTR_TIME_SET_CURRENT(thread->conn_time);
5693 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
5694
5695 /* explicitly initialize the state machines */
5696 for (i = 0; i < nstate; i++)
5697 {
5698 state[i].state = CSTATE_CHOOSE_SCRIPT;
5699 }
5700
5701 /* loop till all clients have terminated */
5702 while (remains > 0)
5703 {
5704 fd_set input_mask;
5705 int maxsock; /* max socket number to be waited for */
5706 int64 min_usec;
5707 int64 now_usec = 0; /* set this only if needed */
5708
5709 /* identify which client sockets should be checked for input */
5710 FD_ZERO(&input_mask);
5711 maxsock = -1;
5712 min_usec = PG_INT64_MAX;
5713 for (i = 0; i < nstate; i++)
5714 {
5715 CState *st = &state[i];
5716
5717 if (st->state == CSTATE_THROTTLE && timer_exceeded)
5718 {
5719 /* interrupt client that has not started a transaction */
5720 st->state = CSTATE_FINISHED;
5721 finishCon(st);
5722 remains--;
5723 }
5724 else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
5725 {
5726 /* a nap from the script, or under throttling */
5727 int64 this_usec;
5728
5729 /* get current time if needed */
5730 if (now_usec == 0)
5731 {
5732 instr_time now;
5733
5734 INSTR_TIME_SET_CURRENT(now);
5735 now_usec = INSTR_TIME_GET_MICROSEC(now);
5736 }
5737
5738 /* min_usec should be the minimum delay across all clients */
5739 this_usec = (st->state == CSTATE_SLEEP ?
5740 st->sleep_until : st->txn_scheduled) - now_usec;
5741 if (min_usec > this_usec)
5742 min_usec = this_usec;
5743 }
5744 else if (st->state == CSTATE_WAIT_RESULT)
5745 {
5746 /*
5747 * waiting for result from server - nothing to do unless the
5748 * socket is readable
5749 */
5750 int sock = PQsocket(st->con);
5751
5752 if (sock < 0)
5753 {
5754 fprintf(stderr, "invalid socket: %s",
5755 PQerrorMessage(st->con));
5756 goto done;
5757 }
5758
5759 FD_SET(sock, &input_mask);
5760 if (maxsock < sock)
5761 maxsock = sock;
5762 }
5763 else if (st->state != CSTATE_ABORTED &&
5764 st->state != CSTATE_FINISHED)
5765 {
5766 /*
5767 * This client thread is ready to do something, so we don't
5768 * want to wait. No need to examine additional clients.
5769 */
5770 min_usec = 0;
5771 break;
5772 }
5773 }
5774
5775 /* under throttling we may have finished the last client above */
5776 if (remains == 0)
5777 break;
5778
5779 /* also wake up to print the next progress report on time */
5780 if (progress && min_usec > 0 && thread->tid == 0)
5781 {
5782 /* get current time if needed */
5783 if (now_usec == 0)
5784 {
5785 instr_time now;
5786
5787 INSTR_TIME_SET_CURRENT(now);
5788 now_usec = INSTR_TIME_GET_MICROSEC(now);
5789 }
5790
5791 if (now_usec >= next_report)
5792 min_usec = 0;
5793 else if ((next_report - now_usec) < min_usec)
5794 min_usec = next_report - now_usec;
5795 }
5796
5797 /*
5798 * If no clients are ready to execute actions, sleep until we receive
5799 * data from the server, or a nap-time specified in the script ends,
5800 * or it's time to print a progress report. Update input_mask to show
5801 * which client(s) received data.
5802 */
5803 if (min_usec > 0)
5804 {
5805 int nsocks = 0; /* return from select(2) if called */
5806
5807 if (min_usec != PG_INT64_MAX)
5808 {
5809 if (maxsock != -1)
5810 {
5811 struct timeval timeout;
5812
5813 timeout.tv_sec = min_usec / 1000000;
5814 timeout.tv_usec = min_usec % 1000000;
5815 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
5816 }
5817 else /* nothing active, simple sleep */
5818 {
5819 pg_usleep(min_usec);
5820 }
5821 }
5822 else /* no explicit delay, select without timeout */
5823 {
5824 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
5825 }
5826
5827 if (nsocks < 0)
5828 {
5829 if (errno == EINTR)
5830 {
5831 /* On EINTR, go back to top of loop */
5832 continue;
5833 }
5834 /* must be something wrong */
5835 fprintf(stderr, "select() failed: %s\n", strerror(errno));
5836 goto done;
5837 }
5838 }
5839 else
5840 {
5841 /* min_usec == 0, i.e. something needs to be executed */
5842
5843 /* If we didn't call select(), don't try to read any data */
5844 FD_ZERO(&input_mask);
5845 }
5846
5847 /* ok, advance the state machine of each connection */
5848 for (i = 0; i < nstate; i++)
5849 {
5850 CState *st = &state[i];
5851
5852 if (st->state == CSTATE_WAIT_RESULT)
5853 {
5854 /* don't call doCustom unless data is available */
5855 int sock = PQsocket(st->con);
5856
5857 if (sock < 0)
5858 {
5859 fprintf(stderr, "invalid socket: %s",
5860 PQerrorMessage(st->con));
5861 goto done;
5862 }
5863
5864 if (!FD_ISSET(sock, &input_mask))
5865 continue;
5866 }
5867 else if (st->state == CSTATE_FINISHED ||
5868 st->state == CSTATE_ABORTED)
5869 {
5870 /* this client is done, no need to consider it anymore */
5871 continue;
5872 }
5873
5874 doCustom(thread, st, &aggs);
5875
5876 /* If doCustom changed client to finished state, reduce remains */
5877 if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
5878 remains--;
5879 }
5880
5881 /* progress report is made by thread 0 for all threads */
5882 if (progress && thread->tid == 0)
5883 {
5884 instr_time now_time;
5885 int64 now;
5886
5887 INSTR_TIME_SET_CURRENT(now_time);
5888 now = INSTR_TIME_GET_MICROSEC(now_time);
5889 if (now >= next_report)
5890 {
5891 /* generate and show report */
5892 StatsData cur;
5893 int64 run = now - last_report,
5894 ntx;
5895 double tps,
5896 total_run,
5897 latency,
5898 sqlat,
5899 lag,
5900 stdev;
5901 char tbuf[315];
5902
5903 /*
5904 * Add up the statistics of all threads.
5905 *
5906 * XXX: No locking. There is no guarantee that we get an
5907 * atomic snapshot of the transaction count and latencies, so
5908 * these figures can well be off by a small amount. The
5909 * progress report's purpose is to give a quick overview of
5910 * how the test is going, so that shouldn't matter too much.
5911 * (If a read from a 64-bit integer is not atomic, you might
5912 * get a "torn" read and completely bogus latencies though!)
5913 */
5914 initStats(&cur, 0);
5915 for (i = 0; i < nthreads; i++)
5916 {
5917 mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
5918 mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
5919 cur.cnt += thread[i].stats.cnt;
5920 cur.skipped += thread[i].stats.skipped;
5921 }
5922
5923 /* we count only actually executed transactions */
5924 ntx = (cur.cnt - cur.skipped) - (last.cnt - last.skipped);
5925 total_run = (now - thread_start) / 1000000.0;
5926 tps = 1000000.0 * ntx / run;
5927 if (ntx > 0)
5928 {
5929 latency = 0.001 * (cur.latency.sum - last.latency.sum) / ntx;
5930 sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) / ntx;
5931 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
5932 lag = 0.001 * (cur.lag.sum - last.lag.sum) / ntx;
5933 }
5934 else
5935 {
5936 latency = sqlat = stdev = lag = 0;
5937 }
5938
5939 if (progress_timestamp)
5940 {
5941 /*
5942 * On some platforms the current system timestamp is
5943 * available in now_time, but rather than get entangled
5944 * with that, we just eat the cost of an extra syscall in
5945 * all cases.
5946 */
5947 struct timeval tv;
5948
5949 gettimeofday(&tv, NULL);
5950 snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
5951 (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
5952 }
5953 else
5954 {
5955 /* round seconds are expected, but the thread may be late */
5956 snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
5957 }
5958
5959 fprintf(stderr,
5960 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
5961 tbuf, tps, latency, stdev);
5962
5963 if (throttle_delay)
5964 {
5965 fprintf(stderr, ", lag %.3f ms", lag);
5966 if (latency_limit)
5967 fprintf(stderr, ", " INT64_FORMAT " skipped",
5968 cur.skipped - last.skipped);
5969 }
5970 fprintf(stderr, "\n");
5971
5972 last = cur;
5973 last_report = now;
5974
5975 /*
5976 * Ensure that the next report is in the future, in case
5977 * pgbench/postgres got stuck somewhere.
5978 */
5979 do
5980 {
5981 next_report += (int64) progress * 1000000;
5982 } while (now >= next_report);
5983 }
5984 }
5985 }
5986
5987 done:
5988 INSTR_TIME_SET_CURRENT(start);
5989 disconnect_all(state, nstate);
5990 INSTR_TIME_SET_CURRENT(end);
5991 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
5992 if (thread->logfile)
5993 {
5994 if (agg_interval > 0)
5995 {
5996 /* log aggregated but not yet reported transactions */
5997 doLog(thread, state, &aggs, false, 0, 0);
5998 }
5999 fclose(thread->logfile);
6000 thread->logfile = NULL;
6001 }
6002 return NULL;
6003 }
6004
6005 static void
finishCon(CState * st)6006 finishCon(CState *st)
6007 {
6008 if (st->con != NULL)
6009 {
6010 PQfinish(st->con);
6011 st->con = NULL;
6012 }
6013 }
6014
6015 /*
6016 * Support for duration option: set timer_exceeded after so many seconds.
6017 */
6018
6019 #ifndef WIN32
6020
/*
 * Signal handler installed for SIGALRM by setalarm(): records that the
 * requested duration has elapsed by setting timer_exceeded.  It does
 * nothing else.
 */
static void
handle_sig_alarm(SIGNAL_ARGS)
{
    timer_exceeded = true;
}
6026
6027 static void
setalarm(int seconds)6028 setalarm(int seconds)
6029 {
6030 pqsignal(SIGALRM, handle_sig_alarm);
6031 alarm(seconds);
6032 }
6033
6034 #else /* WIN32 */
6035
/*
 * Timer-queue callback registered by the Windows setalarm(): records that
 * the requested duration has elapsed by setting timer_exceeded.  Neither
 * argument is used.
 */
static VOID CALLBACK
win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
    timer_exceeded = true;
}
6041
6042 static void
setalarm(int seconds)6043 setalarm(int seconds)
6044 {
6045 HANDLE queue;
6046 HANDLE timer;
6047
6048 /* This function will be called at most once, so we can cheat a bit. */
6049 queue = CreateTimerQueue();
6050 if (seconds > ((DWORD) -1) / 1000 ||
6051 !CreateTimerQueueTimer(&timer, queue,
6052 win32_timer_callback, NULL, seconds * 1000, 0,
6053 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
6054 {
6055 fprintf(stderr, "failed to set timer\n");
6056 exit(1);
6057 }
6058 }
6059
6060 /* partial pthread implementation for Windows */
6061
6062 typedef struct win32_pthread
6063 {
6064 HANDLE handle;
6065 void *(*routine) (void *);
6066 void *arg;
6067 void *result;
6068 } win32_pthread;
6069
6070 static unsigned __stdcall
win32_pthread_run(void * arg)6071 win32_pthread_run(void *arg)
6072 {
6073 win32_pthread *th = (win32_pthread *) arg;
6074
6075 th->result = th->routine(th->arg);
6076
6077 return 0;
6078 }
6079
6080 static int
pthread_create(pthread_t * thread,pthread_attr_t * attr,void * (* start_routine)(void *),void * arg)6081 pthread_create(pthread_t *thread,
6082 pthread_attr_t *attr,
6083 void *(*start_routine) (void *),
6084 void *arg)
6085 {
6086 int save_errno;
6087 win32_pthread *th;
6088
6089 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
6090 th->routine = start_routine;
6091 th->arg = arg;
6092 th->result = NULL;
6093
6094 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
6095 if (th->handle == NULL)
6096 {
6097 save_errno = errno;
6098 free(th);
6099 return save_errno;
6100 }
6101
6102 *thread = th;
6103 return 0;
6104 }
6105
6106 static int
pthread_join(pthread_t th,void ** thread_return)6107 pthread_join(pthread_t th, void **thread_return)
6108 {
6109 if (th == NULL || th->handle == NULL)
6110 return errno = EINVAL;
6111
6112 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
6113 {
6114 _dosmaperr(GetLastError());
6115 return errno;
6116 }
6117
6118 if (thread_return)
6119 *thread_return = th->result;
6120
6121 CloseHandle(th->handle);
6122 free(th);
6123 return 0;
6124 }
6125
6126 #endif /* WIN32 */
6127