1 /*
2 * pgbench.c
3 *
4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
6 *
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2021, PostgreSQL Global Development Group
9 * ALL RIGHTS RESERVED;
10 *
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
15 *
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
21 *
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27 *
28 */
29
30 #ifdef WIN32
31 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
32 #endif
33
34 #include "postgres_fe.h"
35
36 #include <ctype.h>
37 #include <float.h>
38 #include <limits.h>
39 #include <math.h>
40 #include <signal.h>
41 #include <time.h>
42 #include <sys/time.h>
43 #ifdef HAVE_SYS_RESOURCE_H
44 #include <sys/resource.h> /* for getrlimit */
45 #endif
46
47 /* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */
48 #if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
49 #define POLL_USING_PPOLL
50 #ifdef HAVE_POLL_H
51 #include <poll.h>
52 #endif
53 #else /* no ppoll(), so use select() */
54 #define POLL_USING_SELECT
55 #ifdef HAVE_SYS_SELECT_H
56 #include <sys/select.h>
57 #endif
58 #endif
59
60 #include "common/int.h"
61 #include "common/logging.h"
62 #include "common/string.h"
63 #include "common/username.h"
64 #include "fe_utils/cancel.h"
65 #include "fe_utils/conditional.h"
66 #include "fe_utils/string_utils.h"
67 #include "getopt_long.h"
68 #include "libpq-fe.h"
69 #include "pgbench.h"
70 #include "port/pg_bitutils.h"
71 #include "portability/instr_time.h"
72
/*
 * M_PI is not required by ISO C's <math.h>, so provide a fallback value for
 * platforms that do not define it.
 */
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif

/* SQLSTATE reported by the server for a missing table ("undefined_table") */
#define ERRCODE_UNDEFINED_TABLE "42P01"

/*
 * Hashing constants
 *
 * FNV_PRIME / FNV_OFFSET_BASIS are the 64-bit FNV prime and offset basis.
 * MM2_MUL / MM2_ROT are the 64-bit MurmurHash2 multiplier and shift.
 * MM2_MUL_TIMES_8 equals MM2_MUL * 8 truncated to 64 bits.
 */
#define FNV_PRIME UINT64CONST(0x100000001b3)
#define FNV_OFFSET_BASIS UINT64CONST(0xcbf29ce484222325)
#define MM2_MUL UINT64CONST(0xc6a4a7935bd1e995)
#define MM2_MUL_TIMES_8 UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT 47
87
/*
 * Multi-platform socket set implementations
 *
 * Exactly one of POLL_USING_PPOLL / POLL_USING_SELECT is defined by the
 * preprocessor logic near the top of this file, so exactly one definition of
 * socket_set is compiled.  SOCKET_WAIT_METHOD names the wait primitive in use.
 */

#ifdef POLL_USING_PPOLL
#define SOCKET_WAIT_METHOD "ppoll"

/* ppoll() variant: a growable array of pollfd entries */
typedef struct socket_set
{
	int			maxfds;			/* allocated length of pollfds[] array */
	int			curfds;			/* number currently in use */
	struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];
} socket_set;

#endif							/* POLL_USING_PPOLL */

#ifdef POLL_USING_SELECT
#define SOCKET_WAIT_METHOD "select"

/*
 * select() variant: an fd_set plus the highest descriptor in it.  Capacity is
 * bounded by FD_SETSIZE.
 */
typedef struct socket_set
{
	int			maxfd;			/* largest FD currently set in fds */
	fd_set		fds;
} socket_set;

#endif							/* POLL_USING_SELECT */
114
/*
 * Multi-platform thread implementations
 *
 * The status-returning wrappers (THREAD_CREATE, THREAD_JOIN,
 * THREAD_BARRIER_INIT) yield 0 on success and an errno-style code on failure
 * in both the Windows and POSIX branches.
 */

#ifdef WIN32
/* Use Windows threads */
#include <windows.h>
/* map the last Win32 error into errno and return it */
#define GETERRNO() (_dosmaperr(GetLastError()), errno)
#define THREAD_T HANDLE
#define THREAD_FUNC_RETURN_TYPE unsigned
#define THREAD_FUNC_RETURN return 0
#define THREAD_FUNC_CC __stdcall
/* _beginthreadex() returns 0 on failure and sets errno */
#define THREAD_CREATE(handle, function, arg) \
	((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
/* wait for the thread, then release its handle */
#define THREAD_JOIN(handle) \
	(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
	GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
#define THREAD_BARRIER_INIT(barrier, n) \
	(InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
#define THREAD_BARRIER_WAIT(barrier) \
	EnterSynchronizationBarrier((barrier), \
								SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
/* nothing to release for a Windows synchronization barrier */
#define THREAD_BARRIER_DESTROY(barrier)
#elif defined(ENABLE_THREAD_SAFETY)
/* Use POSIX threads */
#include "port/pg_pthread.h"
#define THREAD_T pthread_t
#define THREAD_FUNC_RETURN_TYPE void *
#define THREAD_FUNC_RETURN return NULL
#define THREAD_FUNC_CC
#define THREAD_CREATE(handle, function, arg) \
	pthread_create((handle), NULL, (function), (arg))
#define THREAD_JOIN(handle) \
	pthread_join((handle), NULL)
#define THREAD_BARRIER_T pthread_barrier_t
#define THREAD_BARRIER_INIT(barrier, n) \
	pthread_barrier_init((barrier), NULL, (n))
#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
#else
/*
 * No threads implementation, use none (-j 1).
 *
 * The barrier macros become no-ops (INIT just stores and yields 0).
 * THREAD_CREATE and THREAD_JOIN are intentionally not defined here.
 * NOTE(review): presumably every caller of those is compiled out in this
 * configuration -- confirm.
 */
#define THREAD_T void *
#define THREAD_FUNC_RETURN_TYPE void *
#define THREAD_FUNC_RETURN return NULL
#define THREAD_FUNC_CC
#define THREAD_BARRIER_T int
#define THREAD_BARRIER_INIT(barrier, n) (*(barrier) = 0)
#define THREAD_BARRIER_WAIT(barrier)
#define THREAD_BARRIER_DESTROY(barrier)
#endif
166
167
/********************************************************************
 * some configurable parameters */

/* DEFAULT_INIT_STEPS is a subset of the step letters in ALL_INIT_STEPS */
#define DEFAULT_INIT_STEPS "dtgvp"	/* default -I setting */
#define ALL_INIT_STEPS "dtgGvpf"	/* all possible steps */

#define LOG_STEP_SECONDS 5		/* seconds between log messages */
#define DEFAULT_NXACTS 10		/* default nxacts */

#define MIN_GAUSSIAN_PARAM 2.0	/* minimum parameter for gauss */

#define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */
#define MAX_ZIPFIAN_PARAM 1000.0	/* maximum parameter for zipfian */

int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */
int64		end_time = 0;		/* when to stop in micro seconds, under -T */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples in
 * pgbench_accounts table.
 */
int			scale = 1;

/*
 * fillfactor. for example, fillfactor = 90 will use only 90 percent
 * space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * use unlogged tables?
 */
bool		unlogged_tables = false;

/*
 * log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * When threads are throttled to a given rate limit, this is the target delay
 * to reach that rate in usec. 0 is the default and means no throttling.
 */
double		throttle_delay = 0;

/*
 * Transactions which take longer than this limit (in usec) are counted as
 * late, and reported as such, although they are completed anyway. When
 * throttling is enabled, execution time slots that are more than this late
 * are skipped altogether, and counted separately.
 */
int64		latency_limit = 0;

/*
 * tablespace selection (NULL means not specified)
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/*
 * Number of "pgbench_accounts" partitions. 0 is the default and means no
 * partitioning.
 */
static int	partitions = 0;

/* partitioning strategy for "pgbench_accounts" */
typedef enum
{
	PART_NONE,					/* no partitioning */
	PART_RANGE,					/* range partitioning */
	PART_HASH					/* hash partitioning */
} partition_method_t;

static partition_method_t partition_method = PART_NONE;
/* display names, indexed by partition_method_t; keep in enum order */
static const char *PARTITION_METHOD[] = {"none", "range", "hash"};

/* random seed used to initialize base_random_sequence */
int64		random_seed = -1;

/*
 * end of configurable parameters
 *********************************************************************/
251
/*
 * Row counts per unit of scale factor.  The builtin scripts draw ids from
 * 1 .. n * scale for each of these.
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which the largest pgbench_accounts id
 * (naccounts * scale) no longer fits in a signed 32-bit integer, so 64-bit
 * integers are needed to hold account ids.
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000

bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */
bool		per_script_stats = false;	/* whether to collect stats per script */
int			progress = 0;		/* thread progress report every this seconds */
bool		progress_timestamp = false; /* progress report with Unix time */
int			nclients = 1;		/* number of clients */
int			nthreads = 1;		/* number of threads */
bool		is_connect;			/* establish connection for each transaction */
bool		report_per_command; /* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/* connection parameters; NULL means use the libpq default */
const char *pghost = NULL;
const char *pgport = NULL;
const char *username = NULL;
const char *dbName = NULL;
char	   *logfile_prefix = NULL;
const char *progname;

#define WSEP '@'				/* weight separator */

/*
 * Flag from signal handler.
 * NOTE(review): ISO C only guarantees async-signal safety for writes to
 * "volatile sig_atomic_t" objects; volatile bool relies on platform behavior.
 */
volatile bool timer_exceeded = false;	/* flag from signal handler */
289
/*
 * Variable definitions.
 *
 * If a variable only has a string value, "svalue" is that value, and "value"
 * is "not set".  If the value is known, "value" contains the value (in any
 * variant).
 *
 * In that case "svalue" contains the string equivalent of the value, if we've
 * had occasion to compute it, or NULL if we haven't.
 */
typedef struct
{
	char	   *name;			/* variable's name */
	char	   *svalue;			/* its value in string form, if known */
	PgBenchValue value;			/* actual variable's value */
} Variable;
306
#define MAX_SCRIPTS		128		/* max number of SQL scripts allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */

/*
 * Simple data structure to keep stats about something.
 *
 * Keeping count, sum and sum of squares is enough to derive the mean and
 * variance later without storing individual samples.
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
	int64		count;			/* how many values were encountered */
	double		min;			/* the minimum seen */
	double		max;			/* the maximum seen */
	double		sum;			/* sum of values */
	double		sum2;			/* sum of squared values */
} SimpleStats;
324
/*
 * The instr_time type is expensive when dealing with time arithmetic. Define
 * a type to hold microseconds instead.  A signed int64 count of microseconds
 * covers about 292,000 years on either side of zero (a total span of about
 * 584,500 years), which is ample.
 */
typedef int64 pg_time_usec_t;

/*
 * Data structure to hold various statistics: per-thread and per-script stats
 * are maintained and merged together.
 */
typedef struct StatsData
{
	pg_time_usec_t start_time;	/* interval start time, for aggregates */
	int64		cnt;			/* number of transactions, including skipped */
	int64		skipped;		/* number of transactions skipped under --rate
								 * and --latency-limit */
	SimpleStats latency;
	SimpleStats lag;
} StatsData;

/*
 * For displaying Unix epoch timestamps, as some time functions may have
 * another reference.  Presumably the offset (in microseconds) between the
 * pg_time_now() clock and the Unix epoch -- NOTE(review): confirm where it is
 * computed.
 */
pg_time_usec_t epoch_shift;
351
/*
 * Struct to keep random state.
 *
 * Holds a 48-bit generator state as three 16-bit words, in the xseed[3] form
 * passed to pg_erand48() / pg_jrand48().
 */
typedef struct RandomState
{
	unsigned short xseed[3];
} RandomState;

/* Various random sequences are initialized from this one. */
static RandomState base_random_sequence;

/* Synchronization barrier for start and connection */
static THREAD_BARRIER_T barrier;
365
/*
 * Connection state machine states.
 */
typedef enum
{
	/*
	 * The client must first choose a script to execute. Once chosen, it can
	 * either be throttled (state CSTATE_PREPARE_THROTTLE under --rate), start
	 * right away (state CSTATE_START_TX) or not start at all if the timer was
	 * exceeded (state CSTATE_FINISHED).
	 */
	CSTATE_CHOOSE_SCRIPT,

	/*
	 * CSTATE_START_TX performs start-of-transaction processing. It
	 * establishes a new connection for the transaction in --connect mode,
	 * records the transaction start time, and proceeds to the first command.
	 *
	 * Note: once a script is started, it will either error or run till its
	 * end, where it may be interrupted. It is not interrupted while running,
	 * so pgbench --time is to be understood as: transactions are allowed to
	 * start within that time, and will finish when their work is completed.
	 */
	CSTATE_START_TX,

	/*
	 * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
	 * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
	 * sleeps until that moment, then advances to CSTATE_START_TX, or
	 * CSTATE_FINISHED if the next transaction would start beyond the end of
	 * the run.
	 */
	CSTATE_PREPARE_THROTTLE,
	CSTATE_THROTTLE,

	/*
	 * We loop through these states, to process each command in the script:
	 *
	 * CSTATE_START_COMMAND starts the execution of a command. On a SQL
	 * command, the command is sent to the server, and we move to
	 * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
	 * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
	 * wait for it to expire. Other meta-commands are executed immediately. If
	 * the command about to start is actually beyond the end of the script,
	 * advance to CSTATE_END_TX.
	 *
	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
	 * for the current command.
	 *
	 * CSTATE_SLEEP waits until the end of \sleep.
	 *
	 * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
	 * command counter, and loops back to CSTATE_START_COMMAND state.
	 *
	 * CSTATE_SKIP_COMMAND is used by conditional branches which are not
	 * executed. It quickly skips commands that do not need any evaluation.
	 * This state can move forward several commands, till there is something
	 * to do or the end of the script.
	 */
	CSTATE_START_COMMAND,
	CSTATE_WAIT_RESULT,
	CSTATE_SLEEP,
	CSTATE_END_COMMAND,
	CSTATE_SKIP_COMMAND,

	/*
	 * CSTATE_END_TX performs end-of-transaction processing. It calculates
	 * latency, and logs the transaction. In --connect mode, it closes the
	 * current connection.
	 *
	 * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters
	 * CSTATE_FINISHED if we have no more work to do.
	 */
	CSTATE_END_TX,

	/*
	 * Final states. CSTATE_ABORTED means that the script execution was
	 * aborted because a command failed, CSTATE_FINISHED means success.
	 */
	CSTATE_ABORTED,
	CSTATE_FINISHED
} ConnectionStateEnum;
448
/*
 * Connection state.
 *
 * One of these exists per simulated client; a thread (TState) owns an array
 * of them.
 */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB */
	int			id;				/* client No. */
	ConnectionStateEnum state;	/* state machine's current state. */
	ConditionalStack cstack;	/* enclosing conditionals state */

	/*
	 * Separate randomness for each client. This is used for random functions
	 * PGBENCH_RANDOM_* during the execution of the script.
	 */
	RandomState cs_func_rs;

	int			use_file;		/* index in sql_script for this client */
	int			command;		/* command number in script */

	/* client variables */
	Variable   *variables;		/* array of variable definitions */
	int			nvariables;		/* number of variables */
	bool		vars_sorted;	/* are variables sorted by name? */

	/* various times about current transaction in microseconds */
	pg_time_usec_t txn_scheduled;	/* scheduled start time of transaction */
	pg_time_usec_t sleep_until; /* scheduled start time of next cmd */
	pg_time_usec_t txn_begin;	/* used for measuring schedule lag times */
	pg_time_usec_t stmt_begin;	/* used for measuring statement latencies */

	/*
	 * whether client prepared the script; presumably indexed like sql_script[]
	 * (same MAX_SCRIPTS bound) -- NOTE(review): confirm
	 */
	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */

	/* per client collected stats */
	int64		cnt;			/* client transaction count, for -t */
} CState;
484
/*
 * Thread state
 *
 * Each benchmark thread drives the nstate clients in its state[] array.
 */
typedef struct
{
	int			tid;			/* thread id */
	THREAD_T	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */

	/*
	 * Separate randomness for each thread. Each thread option uses its own
	 * random state to make all of them independent of each other and
	 * therefore deterministic at the thread level.
	 */
	RandomState ts_choose_rs;	/* random state for selecting a script */
	RandomState ts_throttle_rs; /* random state for transaction throttling */
	RandomState ts_sample_rs;	/* random state for log sampling */

	int64		throttle_trigger;	/* previous/next throttling (us) */
	FILE	   *logfile;		/* where to log, or NULL */

	/* per thread collected stats in microseconds */
	pg_time_usec_t create_time; /* thread creation time */
	pg_time_usec_t started_time;	/* thread is running */
	pg_time_usec_t bench_start; /* thread is benchmarking */
	pg_time_usec_t conn_duration;	/* cumulative connection and
									 * disconnection delays */

	StatsData	stats;
	int64		latency_late;	/* count executed but late transactions */
} TState;
517
/*
 * queries read from files: values for Command.type
 */
#define SQL_COMMAND		1
#define META_COMMAND	2

/*
 * max number of backslash command arguments or SQL variables,
 * including the command or SQL statement itself
 */
#define MAX_ARGS		256

/* Kind of backslash meta-command; also tags SQL ending in \gset / \aset */
typedef enum MetaCommand
{
	META_NONE,					/* not a known meta-command */
	META_SET,					/* \set */
	META_SETSHELL,				/* \setshell */
	META_SHELL,					/* \shell */
	META_SLEEP,					/* \sleep */
	META_GSET,					/* \gset */
	META_ASET,					/* \aset */
	META_IF,					/* \if */
	META_ELIF,					/* \elif */
	META_ELSE,					/* \else */
	META_ENDIF,					/* \endif */
	META_STARTPIPELINE,			/* \startpipeline */
	META_ENDPIPELINE			/* \endpipeline */
} MetaCommand;

/* Protocol used to submit SQL commands (-M) */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
/* -M option names, indexed by QueryMode; keep in enum order */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
557
/*
 * struct Command represents one command in a script.
 *
 * lines		The raw, possibly multi-line command text.  Variable substitution
 *				not applied.
 * first_line	A short, single-line extract of 'lines', for error reporting.
 * type			SQL_COMMAND or META_COMMAND
 * meta			The type of meta-command, with META_NONE/GSET/ASET if command
 *				is SQL.  There is no separate \aset flag: META_ASET means do a
 *				gset on all possible queries of a combined query (\;).
 * argc			Number of arguments of the command, 0 if not yet processed.
 * argv			Command arguments, the first of which is the command or SQL
 *				string itself.  For SQL commands, after post-processing
 *				argv[0] is the same as 'lines' with variables substituted.
 * varprefix	SQL commands terminated with \gset or \aset have this set
 *				to a non NULL value.  If nonempty, it's used to prefix the
 *				variable name that receives the value.
 * expr			Parsed expression, if needed.
 * stats		Time spent in this command.
 */
typedef struct Command
{
	PQExpBufferData lines;
	char	   *first_line;
	int			type;
	MetaCommand meta;
	int			argc;
	char	   *argv[MAX_ARGS];
	char	   *varprefix;
	PgBenchExpr *expr;
	SimpleStats stats;
} Command;

typedef struct ParsedScript
{
	const char *desc;			/* script descriptor (eg, file name) */
	int			weight;			/* selection weight */
	Command   **commands;		/* NULL-terminated array of Commands */
	StatsData	stats;			/* total time spent in script */
} ParsedScript;

static ParsedScript sql_script[MAX_SCRIPTS];	/* SQL script files */
static int	num_scripts;		/* number of scripts in sql_script[] */
/* presumably the sum of sql_script[].weight -- NOTE(review): confirm */
static int64 total_weight = 0;
602
/* Builtin test scripts */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;

/*
 * The id ranges are built from nbranches/ntellers/naccounts at compile time
 * via CppAsString2, multiplied at run time by the :scale variable.
 */
static const BuiltinScript builtin_script[] =
{
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	{
		/* like tpcb-like, minus the tellers and branches updates */
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	{
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
648
649
/* Function prototypes */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(CState *st, PgBenchExpr *expr,
						 PgBenchValue *retval);
static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
static void doLog(TState *thread, CState *st,
				  StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
							 bool skipped, StatsData *agg);
static void addScript(ParsedScript script);
static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC threadRun(void *arg);
static void finishCon(CState *st);
static void setalarm(int seconds);
/* socket-set helpers; implemented per SOCKET_WAIT_METHOD */
static socket_set *alloc_socket_set(int count);
static void free_socket_set(socket_set *sa);
static void clear_socket_set(socket_set *sa);
static void add_socket_to_set(socket_set *sa, int fd, int idx);
static int	wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);


/* callback functions for our flex lexer */
static const PsqlScanCallbacks pgbench_callbacks = {
	NULL,						/* don't need get_variable functionality */
};
678
679 static inline pg_time_usec_t
pg_time_now(void)680 pg_time_now(void)
681 {
682 instr_time now;
683
684 INSTR_TIME_SET_CURRENT(now);
685
686 return (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(now);
687 }
688
689 static inline void
pg_time_now_lazy(pg_time_usec_t * now)690 pg_time_now_lazy(pg_time_usec_t *now)
691 {
692 if ((*now) == 0)
693 (*now) = pg_time_now();
694 }
695
/* Convert a pg_time_usec_t microsecond count to seconds, as a double */
#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t))
697
/*
 * Print the command-line help text to stdout.
 *
 * The whole text is one format string.  Its conversions are, in order: %s for
 * progname (twice), PACKAGE_BUGREPORT, PACKAGE_NAME and PACKAGE_URL.  The
 * --sampling-rate line also contains a literal "%%".
 *
 * NOTE(review): "(default: 10)" for -t and "5 seconds" for -q duplicate
 * DEFAULT_NXACTS and LOG_STEP_SECONDS by hand; keep them in sync.
 */
static void
usage(void)
{
	printf("%s is a benchmarking tool for PostgreSQL.\n\n"
		   "Usage:\n"
		   " %s [OPTION]... [DBNAME]\n"
		   "\nInitialization options:\n"
		   " -i, --initialize invokes initialization mode\n"
		   " -I, --init-steps=[" ALL_INIT_STEPS "]+ (default \"" DEFAULT_INIT_STEPS "\")\n"
		   " run selected initialization steps\n"
		   " -F, --fillfactor=NUM set fill factor\n"
		   " -n, --no-vacuum do not run VACUUM during initialization\n"
		   " -q, --quiet quiet logging (one message each 5 seconds)\n"
		   " -s, --scale=NUM scaling factor\n"
		   " --foreign-keys create foreign key constraints between tables\n"
		   " --index-tablespace=TABLESPACE\n"
		   " create indexes in the specified tablespace\n"
		   " --partition-method=(range|hash)\n"
		   " partition pgbench_accounts with this method (default: range)\n"
		   " --partitions=NUM partition pgbench_accounts into NUM parts (default: 0)\n"
		   " --tablespace=TABLESPACE create tables in the specified tablespace\n"
		   " --unlogged-tables create tables as unlogged tables\n"
		   "\nOptions to select what to run:\n"
		   " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
		   " (use \"-b list\" to list available scripts)\n"
		   " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
		   " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
		   " (same as \"-b simple-update\")\n"
		   " -S, --select-only perform SELECT-only transactions\n"
		   " (same as \"-b select-only\")\n"
		   "\nBenchmarking options:\n"
		   " -c, --client=NUM number of concurrent database clients (default: 1)\n"
		   " -C, --connect establish new connection for each transaction\n"
		   " -D, --define=VARNAME=VALUE\n"
		   " define variable for use by custom script\n"
		   " -j, --jobs=NUM number of threads (default: 1)\n"
		   " -l, --log write transaction times to log file\n"
		   " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
		   " -M, --protocol=simple|extended|prepared\n"
		   " protocol for submitting queries (default: simple)\n"
		   " -n, --no-vacuum do not run VACUUM before tests\n"
		   " -P, --progress=NUM show thread progress report every NUM seconds\n"
		   " -r, --report-latencies report average latency per command\n"
		   " -R, --rate=NUM target rate in transactions per second\n"
		   " -s, --scale=NUM report this scale factor in output\n"
		   " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
		   " -T, --time=NUM duration of benchmark test in seconds\n"
		   " -v, --vacuum-all vacuum all four standard tables before tests\n"
		   " --aggregate-interval=NUM aggregate data over NUM seconds\n"
		   " --log-prefix=PREFIX prefix for transaction time log file\n"
		   " (default: \"pgbench_log\")\n"
		   " --progress-timestamp use Unix epoch timestamps for progress\n"
		   " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n"
		   " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
		   " --show-script=NAME show builtin script code, then exit\n"
		   "\nCommon options:\n"
		   " -d, --debug print debugging output\n"
		   " -h, --host=HOSTNAME database server host or socket directory\n"
		   " -p, --port=PORT database server port number\n"
		   " -U, --username=USERNAME connect as specified database user\n"
		   " -V, --version output version information, then exit\n"
		   " -?, --help show this help, then exit\n"
		   "\n"
		   "Report bugs to <%s>.\n"
		   "%s home page: <%s>\n",
		   progname, progname, PACKAGE_BUGREPORT, PACKAGE_NAME, PACKAGE_URL);
}
765
/*
 * Return whether str matches "^\s*[-+]?[0-9]+$".  That is: optional leading
 * whitespace, an optional sign, then one or more decimal digits, and nothing
 * after them.
 *
 * Only the shape of the string is checked; whether the value fits in 64 bits
 * is not.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * Require at least one digit.  This must not be guarded by "*ptr &&":
	 * that would let "", "+", "-" and all-blank strings through as integers.
	 * isdigit('\0') is false, so end-of-string is rejected here too.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
791
792
793 /*
794 * strtoint64 -- convert a string to 64-bit integer
795 *
796 * This function is a slightly modified version of scanint8() from
797 * src/backend/utils/adt/int8.c.
798 *
799 * The function returns whether the conversion worked, and if so
800 * "*result" is set to the result.
801 *
802 * If not errorOK, an error message is also printed out on errors.
803 */
804 bool
strtoint64(const char * str,bool errorOK,int64 * result)805 strtoint64(const char *str, bool errorOK, int64 *result)
806 {
807 const char *ptr = str;
808 int64 tmp = 0;
809 bool neg = false;
810
811 /*
812 * Do our own scan, rather than relying on sscanf which might be broken
813 * for long long.
814 *
815 * As INT64_MIN can't be stored as a positive 64 bit integer, accumulate
816 * value as a negative number.
817 */
818
819 /* skip leading spaces */
820 while (*ptr && isspace((unsigned char) *ptr))
821 ptr++;
822
823 /* handle sign */
824 if (*ptr == '-')
825 {
826 ptr++;
827 neg = true;
828 }
829 else if (*ptr == '+')
830 ptr++;
831
832 /* require at least one digit */
833 if (unlikely(!isdigit((unsigned char) *ptr)))
834 goto invalid_syntax;
835
836 /* process digits */
837 while (*ptr && isdigit((unsigned char) *ptr))
838 {
839 int8 digit = (*ptr++ - '0');
840
841 if (unlikely(pg_mul_s64_overflow(tmp, 10, &tmp)) ||
842 unlikely(pg_sub_s64_overflow(tmp, digit, &tmp)))
843 goto out_of_range;
844 }
845
846 /* allow trailing whitespace, but not other trailing chars */
847 while (*ptr != '\0' && isspace((unsigned char) *ptr))
848 ptr++;
849
850 if (unlikely(*ptr != '\0'))
851 goto invalid_syntax;
852
853 if (!neg)
854 {
855 if (unlikely(tmp == PG_INT64_MIN))
856 goto out_of_range;
857 tmp = -tmp;
858 }
859
860 *result = tmp;
861 return true;
862
863 out_of_range:
864 if (!errorOK)
865 pg_log_error("value \"%s\" is out of range for type bigint", str);
866 return false;
867
868 invalid_syntax:
869 if (!errorOK)
870 pg_log_error("invalid input syntax for type bigint: \"%s\"", str);
871 return false;
872 }
873
/*
 * strtodouble -- convert a string to double, detecting overflows/underflows
 *
 * strtod() must consume the entire string.  It skips leading whitespace
 * itself; any trailing character is rejected.
 *
 * Returns true on success.  On failure it returns false, and unless errorOK
 * is set it also logs an error.  *dv is written with strtod()'s result in
 * every case.
 */
bool
strtodouble(const char *str, bool errorOK, double *dv)
{
	char	   *end;

	errno = 0;
	*dv = strtod(str, &end);

	/*
	 * Check the syntax before errno.  POSIX allows strtod() to set EINVAL when
	 * no conversion could be performed.  Such input must be reported as
	 * invalid syntax, not as an out-of-range value.
	 */
	if (unlikely(end == str || *end != '\0'))
	{
		if (!errorOK)
			pg_log_error("invalid input syntax for type double: \"%s\"", str);
		return false;
	}

	/*
	 * Well-formed number that does not fit: ERANGE on overflow, and on
	 * underflow where the implementation reports it.
	 */
	if (unlikely(errno != 0))
	{
		if (!errorOK)
			pg_log_error("value \"%s\" is out of range for type double", str);
		return false;
	}

	return true;
}
898
899 /*
900 * Initialize a random state struct.
901 *
902 * We derive the seed from base_random_sequence, which must be set up already.
903 */
904 static void
initRandomState(RandomState * random_state)905 initRandomState(RandomState *random_state)
906 {
907 random_state->xseed[0] = (unsigned short)
908 (pg_jrand48(base_random_sequence.xseed) & 0xFFFF);
909 random_state->xseed[1] = (unsigned short)
910 (pg_jrand48(base_random_sequence.xseed) & 0xFFFF);
911 random_state->xseed[2] = (unsigned short)
912 (pg_jrand48(base_random_sequence.xseed) & 0xFFFF);
913 }
914
915 /*
916 * Random number generator: uniform distribution from min to max inclusive.
917 *
918 * Although the limits are expressed as int64, you can't generate the full
919 * int64 range in one call, because the difference of the limits mustn't
920 * overflow int64. In practice it's unwise to ask for more than an int32
921 * range, because of the limited precision of pg_erand48().
922 */
923 static int64
getrand(RandomState * random_state,int64 min,int64 max)924 getrand(RandomState *random_state, int64 min, int64 max)
925 {
926 /*
927 * Odd coding is so that min and max have approximately the same chance of
928 * being selected as do numbers between them.
929 *
930 * pg_erand48() is thread-safe and concurrent, which is why we use it
931 * rather than random(), which in glibc is non-reentrant, and therefore
932 * protected by a mutex, and therefore a bottleneck on machines with many
933 * CPUs.
934 */
935 return min + (int64) ((max - min + 1) * pg_erand48(random_state->xseed));
936 }
937
938 /*
939 * random number generator: exponential distribution from min to max inclusive.
940 * the parameter is so that the density of probability for the last cut-off max
941 * value is exp(-parameter).
942 */
943 static int64
getExponentialRand(RandomState * random_state,int64 min,int64 max,double parameter)944 getExponentialRand(RandomState *random_state, int64 min, int64 max,
945 double parameter)
946 {
947 double cut,
948 uniform,
949 rand;
950
951 /* abort if wrong parameter, but must really be checked beforehand */
952 Assert(parameter > 0.0);
953 cut = exp(-parameter);
954 /* erand in [0, 1), uniform in (0, 1] */
955 uniform = 1.0 - pg_erand48(random_state->xseed);
956
957 /*
958 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
959 */
960 Assert((1.0 - cut) != 0.0);
961 rand = -log(cut + (1.0 - cut) * uniform) / parameter;
962 /* return int64 random number within between min and max */
963 return min + (int64) ((max - min + 1) * rand);
964 }
965
966 /* random number generator: gaussian distribution from min to max inclusive */
967 static int64
getGaussianRand(RandomState * random_state,int64 min,int64 max,double parameter)968 getGaussianRand(RandomState *random_state, int64 min, int64 max,
969 double parameter)
970 {
971 double stdev;
972 double rand;
973
974 /* abort if parameter is too low, but must really be checked beforehand */
975 Assert(parameter >= MIN_GAUSSIAN_PARAM);
976
977 /*
978 * Get user specified random number from this loop, with -parameter <
979 * stdev <= parameter
980 *
981 * This loop is executed until the number is in the expected range.
982 *
983 * As the minimum parameter is 2.0, the probability of looping is low:
984 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
985 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
986 * the worst case. For a parameter value of 5.0, the looping probability
987 * is about e^{-5} * 2 / pi ~ 0.43%.
988 */
989 do
990 {
991 /*
992 * pg_erand48 generates [0,1), but for the basic version of the
993 * Box-Muller transform the two uniformly distributed random numbers
994 * are expected in (0, 1] (see
995 * https://en.wikipedia.org/wiki/Box-Muller_transform)
996 */
997 double rand1 = 1.0 - pg_erand48(random_state->xseed);
998 double rand2 = 1.0 - pg_erand48(random_state->xseed);
999
1000 /* Box-Muller basic form transform */
1001 double var_sqrt = sqrt(-2.0 * log(rand1));
1002
1003 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
1004
1005 /*
1006 * we may try with cos, but there may be a bias induced if the
1007 * previous value fails the test. To be on the safe side, let us try
1008 * over.
1009 */
1010 }
1011 while (stdev < -parameter || stdev >= parameter);
1012
1013 /* stdev is in [-parameter, parameter), normalization to [0,1) */
1014 rand = (stdev + parameter) / (parameter * 2.0);
1015
1016 /* return int64 random number within between min and max */
1017 return min + (int64) ((max - min + 1) * rand);
1018 }
1019
1020 /*
1021 * random number generator: generate a value, such that the series of values
1022 * will approximate a Poisson distribution centered on the given value.
1023 *
1024 * Individual results are rounded to integers, though the center value need
1025 * not be one.
1026 */
1027 static int64
getPoissonRand(RandomState * random_state,double center)1028 getPoissonRand(RandomState *random_state, double center)
1029 {
1030 /*
1031 * Use inverse transform sampling to generate a value > 0, such that the
1032 * expected (i.e. average) value is the given argument.
1033 */
1034 double uniform;
1035
1036 /* erand in [0, 1), uniform in (0, 1] */
1037 uniform = 1.0 - pg_erand48(random_state->xseed);
1038
1039 return (int64) (-log(uniform) * center + 0.5);
1040 }
1041
1042 /*
1043 * Computing zipfian using rejection method, based on
1044 * "Non-Uniform Random Variate Generation",
1045 * Luc Devroye, p. 550-551, Springer 1986.
1046 *
1047 * This works for s > 1.0, but may perform badly for s very close to 1.0.
1048 */
1049 static int64
computeIterativeZipfian(RandomState * random_state,int64 n,double s)1050 computeIterativeZipfian(RandomState *random_state, int64 n, double s)
1051 {
1052 double b = pow(2.0, s - 1.0);
1053 double x,
1054 t,
1055 u,
1056 v;
1057
1058 /* Ensure n is sane */
1059 if (n <= 1)
1060 return 1;
1061
1062 while (true)
1063 {
1064 /* random variates */
1065 u = pg_erand48(random_state->xseed);
1066 v = pg_erand48(random_state->xseed);
1067
1068 x = floor(pow(u, -1.0 / (s - 1.0)));
1069
1070 t = pow(1.0 + 1.0 / x, s - 1.0);
1071 /* reject if too large or out of bound */
1072 if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
1073 break;
1074 }
1075 return (int64) x;
1076 }
1077
1078 /* random number generator: zipfian distribution from min to max inclusive */
1079 static int64
getZipfianRand(RandomState * random_state,int64 min,int64 max,double s)1080 getZipfianRand(RandomState *random_state, int64 min, int64 max, double s)
1081 {
1082 int64 n = max - min + 1;
1083
1084 /* abort if parameter is invalid */
1085 Assert(MIN_ZIPFIAN_PARAM <= s && s <= MAX_ZIPFIAN_PARAM);
1086
1087 return min - 1 + computeIterativeZipfian(random_state, n, s);
1088 }
1089
1090 /*
1091 * FNV-1a hash function
1092 */
1093 static int64
getHashFnv1a(int64 val,uint64 seed)1094 getHashFnv1a(int64 val, uint64 seed)
1095 {
1096 int64 result;
1097 int i;
1098
1099 result = FNV_OFFSET_BASIS ^ seed;
1100 for (i = 0; i < 8; ++i)
1101 {
1102 int32 octet = val & 0xff;
1103
1104 val = val >> 8;
1105 result = result ^ octet;
1106 result = result * FNV_PRIME;
1107 }
1108
1109 return result;
1110 }
1111
1112 /*
1113 * Murmur2 hash function
1114 *
1115 * Based on original work of Austin Appleby
1116 * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
1117 */
1118 static int64
getHashMurmur2(int64 val,uint64 seed)1119 getHashMurmur2(int64 val, uint64 seed)
1120 {
1121 uint64 result = seed ^ MM2_MUL_TIMES_8; /* sizeof(int64) */
1122 uint64 k = (uint64) val;
1123
1124 k *= MM2_MUL;
1125 k ^= k >> MM2_ROT;
1126 k *= MM2_MUL;
1127
1128 result ^= k;
1129 result *= MM2_MUL;
1130
1131 result ^= result >> MM2_ROT;
1132 result *= MM2_MUL;
1133 result ^= result >> MM2_ROT;
1134
1135 return (int64) result;
1136 }
1137
/*
 * Pseudorandom permutation function
 *
 * For small sizes, this generates each of the (size!) possible permutations
 * of integers in the range [0, size) with roughly equal probability. Once
 * the size is larger than 20, the number of possible permutations exceeds the
 * number of distinct states of the internal pseudorandom number generators,
 * and so not all possible permutations can be generated, but the permutations
 * chosen should continue to give the appearance of being random.
 *
 * Returns the image of val under a permutation of [0, isize) that depends
 * only on seed.  val is first reduced modulo isize, treating it as unsigned.
 * When isize < 2, the result is always 0.
 *
 * THIS FUNCTION IS NOT CRYPTOGRAPHICALLY SECURE.
 * DO NOT USE FOR SUCH PURPOSE.
 */
static int64
permute(const int64 val, const int64 isize, const int64 seed)
{
	RandomState random_state1;
	RandomState random_state2;
	uint64		size;
	uint64		v;
	int			masklen;
	uint64		mask;
	int			i;

	if (isize < 2)
		return 0;				/* nothing to permute */

	/*
	 * Initialize a pair of random states using the seed.
	 *
	 * Both states are derived only from seed, so a given seed always selects
	 * the same permutation.  The exact sequence of getrand() calls below is
	 * part of that mapping: reordering them would change which permutation a
	 * seed selects.
	 *
	 * seed is signed, so the right shifts of a negative seed are
	 * implementation-defined.  The 0xFFFF masks keep only bits that are the
	 * same under arithmetic and logical shifts.
	 */
	random_state1.xseed[0] = seed & 0xFFFF;
	random_state1.xseed[1] = (seed >> 16) & 0xFFFF;
	random_state1.xseed[2] = (seed >> 32) & 0xFFFF;

	random_state2.xseed[0] = (((uint64) seed) >> 48) & 0xFFFF;
	random_state2.xseed[1] = seed & 0xFFFF;
	random_state2.xseed[2] = (seed >> 16) & 0xFFFF;

	/* Computations are performed on unsigned values */
	size = (uint64) isize;
	v = (uint64) val % size;

	/*
	 * Mask to work modulo largest power of 2 less than or equal to size.
	 * Because mask + 1 > size / 2, the "lower half" [0, mask] and the "upper
	 * half" [size - 1 - mask, size - 1] used below overlap and together cover
	 * the whole range.
	 */
	masklen = pg_leftmost_one_pos64(size);
	mask = (((uint64) 1) << masklen) - 1;

	/*
	 * Permute the input value by applying several rounds of pseudorandom
	 * bijective transformations. The intention here is to distribute each
	 * input uniformly randomly across the range, and separate adjacent inputs
	 * approximately uniformly randomly from each other, leading to a fairly
	 * random overall choice of permutation.
	 *
	 * To separate adjacent inputs, we multiply by a random number modulo
	 * (mask + 1), which is a power of 2. For this to be a bijection, the
	 * multiplier must be odd. Since this is known to lead to less randomness
	 * in the lower bits, we also apply a rotation that shifts the topmost bit
	 * into the least significant bit. In the special cases where size <= 3,
	 * mask = 1 and each of these operations is actually a no-op, so we also
	 * XOR the value with a different random number to inject additional
	 * randomness. Since the size is generally not a power of 2, we apply
	 * this bijection on overlapping upper and lower halves of the input.
	 *
	 * To distribute the inputs uniformly across the range, we then also apply
	 * a random offset modulo the full range.
	 *
	 * Taken together, these operations resemble a modified linear
	 * congruential generator, as is commonly used in pseudorandom number
	 * generators. The number of rounds is fairly arbitrary, but six has been
	 * found empirically to give a fairly good tradeoff between performance
	 * and uniform randomness. For small sizes it selects each of the (size!)
	 * possible permutations with roughly equal probability. For larger
	 * sizes, not all permutations can be generated, but the intended random
	 * spread is still produced.
	 *
	 * NOTE(review): getrand() draws from pg_erand48(), whose precision is
	 * limited (see getrand's own comment), so for very large masks the
	 * low-order bits of m and r are not fully random.
	 */
	for (i = 0; i < 6; i++)
	{
		uint64		m,
					r,
					t;

		/* Random multiply (by an odd number), XOR and rotate of lower half */
		m = (uint64) getrand(&random_state1, 0, mask) | 1;
		r = (uint64) getrand(&random_state2, 0, mask);
		if (v <= mask)
		{
			/* unsigned multiply wraps mod 2^64; "& mask" reduces mod mask+1 */
			v = ((v * m) ^ r) & mask;
			/* rotate left by one bit within the low masklen bits */
			v = ((v << 1) & mask) | (v >> (masklen - 1));
		}

		/* Random multiply (by an odd number), XOR and rotate of upper half */
		m = (uint64) getrand(&random_state1, 0, mask) | 1;
		r = (uint64) getrand(&random_state2, 0, mask);
		/* mirror the value so the upper half maps onto [0, mask] */
		t = size - 1 - v;
		if (t <= mask)
		{
			t = ((t * m) ^ r) & mask;
			t = ((t << 1) & mask) | (t >> (masklen - 1));
			v = size - 1 - t;
		}

		/* Random offset */
		r = (uint64) getrand(&random_state2, 0, size - 1);
		v = (v + r) % size;
	}

	return (int64) v;
}
1244
1245 /*
1246 * Initialize the given SimpleStats struct to all zeroes
1247 */
1248 static void
initSimpleStats(SimpleStats * ss)1249 initSimpleStats(SimpleStats *ss)
1250 {
1251 memset(ss, 0, sizeof(SimpleStats));
1252 }
1253
1254 /*
1255 * Accumulate one value into a SimpleStats struct.
1256 */
1257 static void
addToSimpleStats(SimpleStats * ss,double val)1258 addToSimpleStats(SimpleStats *ss, double val)
1259 {
1260 if (ss->count == 0 || val < ss->min)
1261 ss->min = val;
1262 if (ss->count == 0 || val > ss->max)
1263 ss->max = val;
1264 ss->count++;
1265 ss->sum += val;
1266 ss->sum2 += val * val;
1267 }
1268
1269 /*
1270 * Merge two SimpleStats objects
1271 */
1272 static void
mergeSimpleStats(SimpleStats * acc,SimpleStats * ss)1273 mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
1274 {
1275 if (acc->count == 0 || ss->min < acc->min)
1276 acc->min = ss->min;
1277 if (acc->count == 0 || ss->max > acc->max)
1278 acc->max = ss->max;
1279 acc->count += ss->count;
1280 acc->sum += ss->sum;
1281 acc->sum2 += ss->sum2;
1282 }
1283
1284 /*
1285 * Initialize a StatsData struct to mostly zeroes, with its start time set to
1286 * the given value.
1287 */
1288 static void
initStats(StatsData * sd,pg_time_usec_t start)1289 initStats(StatsData *sd, pg_time_usec_t start)
1290 {
1291 sd->start_time = start;
1292 sd->cnt = 0;
1293 sd->skipped = 0;
1294 initSimpleStats(&sd->latency);
1295 initSimpleStats(&sd->lag);
1296 }
1297
1298 /*
1299 * Accumulate one additional item into the given stats object.
1300 */
1301 static void
accumStats(StatsData * stats,bool skipped,double lat,double lag)1302 accumStats(StatsData *stats, bool skipped, double lat, double lag)
1303 {
1304 stats->cnt++;
1305
1306 if (skipped)
1307 {
1308 /* no latency to record on skipped transactions */
1309 stats->skipped++;
1310 }
1311 else
1312 {
1313 addToSimpleStats(&stats->latency, lat);
1314
1315 /* and possibly the same for schedule lag */
1316 if (throttle_delay)
1317 addToSimpleStats(&stats->lag, lag);
1318 }
1319 }
1320
1321 /* call PQexec() and exit() on failure */
1322 static void
executeStatement(PGconn * con,const char * sql)1323 executeStatement(PGconn *con, const char *sql)
1324 {
1325 PGresult *res;
1326
1327 res = PQexec(con, sql);
1328 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1329 {
1330 pg_log_fatal("query failed: %s", PQerrorMessage(con));
1331 pg_log_info("query was: %s", sql);
1332 exit(1);
1333 }
1334 PQclear(res);
1335 }
1336
1337 /* call PQexec() and complain, but without exiting, on failure */
1338 static void
tryExecuteStatement(PGconn * con,const char * sql)1339 tryExecuteStatement(PGconn *con, const char *sql)
1340 {
1341 PGresult *res;
1342
1343 res = PQexec(con, sql);
1344 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1345 {
1346 pg_log_error("%s", PQerrorMessage(con));
1347 pg_log_info("(ignoring this error and continuing anyway)");
1348 }
1349 PQclear(res);
1350 }
1351
1352 /* set up a connection to the backend */
1353 static PGconn *
doConnect(void)1354 doConnect(void)
1355 {
1356 PGconn *conn;
1357 bool new_pass;
1358 static char *password = NULL;
1359
1360 /*
1361 * Start the connection. Loop until we have a password if requested by
1362 * backend.
1363 */
1364 do
1365 {
1366 #define PARAMS_ARRAY_SIZE 7
1367
1368 const char *keywords[PARAMS_ARRAY_SIZE];
1369 const char *values[PARAMS_ARRAY_SIZE];
1370
1371 keywords[0] = "host";
1372 values[0] = pghost;
1373 keywords[1] = "port";
1374 values[1] = pgport;
1375 keywords[2] = "user";
1376 values[2] = username;
1377 keywords[3] = "password";
1378 values[3] = password;
1379 keywords[4] = "dbname";
1380 values[4] = dbName;
1381 keywords[5] = "fallback_application_name";
1382 values[5] = progname;
1383 keywords[6] = NULL;
1384 values[6] = NULL;
1385
1386 new_pass = false;
1387
1388 conn = PQconnectdbParams(keywords, values, true);
1389
1390 if (!conn)
1391 {
1392 pg_log_error("connection to database \"%s\" failed", dbName);
1393 return NULL;
1394 }
1395
1396 if (PQstatus(conn) == CONNECTION_BAD &&
1397 PQconnectionNeedsPassword(conn) &&
1398 !password)
1399 {
1400 PQfinish(conn);
1401 password = simple_prompt("Password: ", false);
1402 new_pass = true;
1403 }
1404 } while (new_pass);
1405
1406 /* check to see that the backend connection was successfully made */
1407 if (PQstatus(conn) == CONNECTION_BAD)
1408 {
1409 pg_log_error("%s", PQerrorMessage(conn));
1410 PQfinish(conn);
1411 return NULL;
1412 }
1413
1414 return conn;
1415 }
1416
1417 /* qsort comparator for Variable array */
1418 static int
compareVariableNames(const void * v1,const void * v2)1419 compareVariableNames(const void *v1, const void *v2)
1420 {
1421 return strcmp(((const Variable *) v1)->name,
1422 ((const Variable *) v2)->name);
1423 }
1424
1425 /* Locate a variable by name; returns NULL if unknown */
1426 static Variable *
lookupVariable(CState * st,char * name)1427 lookupVariable(CState *st, char *name)
1428 {
1429 Variable key;
1430
1431 /* On some versions of Solaris, bsearch of zero items dumps core */
1432 if (st->nvariables <= 0)
1433 return NULL;
1434
1435 /* Sort if we have to */
1436 if (!st->vars_sorted)
1437 {
1438 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
1439 compareVariableNames);
1440 st->vars_sorted = true;
1441 }
1442
1443 /* Now we can search */
1444 key.name = name;
1445 return (Variable *) bsearch((void *) &key,
1446 (void *) st->variables,
1447 st->nvariables,
1448 sizeof(Variable),
1449 compareVariableNames);
1450 }
1451
1452 /* Get the value of a variable, in string form; returns NULL if unknown */
1453 static char *
getVariable(CState * st,char * name)1454 getVariable(CState *st, char *name)
1455 {
1456 Variable *var;
1457 char stringform[64];
1458
1459 var = lookupVariable(st, name);
1460 if (var == NULL)
1461 return NULL; /* not found */
1462
1463 if (var->svalue)
1464 return var->svalue; /* we have it in string form */
1465
1466 /* We need to produce a string equivalent of the value */
1467 Assert(var->value.type != PGBT_NO_VALUE);
1468 if (var->value.type == PGBT_NULL)
1469 snprintf(stringform, sizeof(stringform), "NULL");
1470 else if (var->value.type == PGBT_BOOLEAN)
1471 snprintf(stringform, sizeof(stringform),
1472 "%s", var->value.u.bval ? "true" : "false");
1473 else if (var->value.type == PGBT_INT)
1474 snprintf(stringform, sizeof(stringform),
1475 INT64_FORMAT, var->value.u.ival);
1476 else if (var->value.type == PGBT_DOUBLE)
1477 snprintf(stringform, sizeof(stringform),
1478 "%.*g", DBL_DIG, var->value.u.dval);
1479 else /* internal error, unexpected type */
1480 Assert(0);
1481 var->svalue = pg_strdup(stringform);
1482 return var->svalue;
1483 }
1484
1485 /* Try to convert variable to a value; return false on failure */
1486 static bool
makeVariableValue(Variable * var)1487 makeVariableValue(Variable *var)
1488 {
1489 size_t slen;
1490
1491 if (var->value.type != PGBT_NO_VALUE)
1492 return true; /* no work */
1493
1494 slen = strlen(var->svalue);
1495
1496 if (slen == 0)
1497 /* what should it do on ""? */
1498 return false;
1499
1500 if (pg_strcasecmp(var->svalue, "null") == 0)
1501 {
1502 setNullValue(&var->value);
1503 }
1504
1505 /*
1506 * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1507 * recognized later as an int, which is converted to bool if needed.
1508 */
1509 else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1510 pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1511 pg_strcasecmp(var->svalue, "on") == 0)
1512 {
1513 setBoolValue(&var->value, true);
1514 }
1515 else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1516 pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1517 pg_strcasecmp(var->svalue, "off") == 0 ||
1518 pg_strcasecmp(var->svalue, "of") == 0)
1519 {
1520 setBoolValue(&var->value, false);
1521 }
1522 else if (is_an_int(var->svalue))
1523 {
1524 /* if it looks like an int, it must be an int without overflow */
1525 int64 iv;
1526
1527 if (!strtoint64(var->svalue, false, &iv))
1528 return false;
1529
1530 setIntValue(&var->value, iv);
1531 }
1532 else /* type should be double */
1533 {
1534 double dv;
1535
1536 if (!strtodouble(var->svalue, true, &dv))
1537 {
1538 pg_log_error("malformed variable \"%s\" value: \"%s\"",
1539 var->name, var->svalue);
1540 return false;
1541 }
1542 setDoubleValue(&var->value, dv);
1543 }
1544 return true;
1545 }
1546
/*
 * Check whether a variable's name is allowed.
 *
 * We allow any non-ASCII character, as well as ASCII letters, digits, and
 * underscore, except that the name must be non-empty and must not start
 * with a digit.
 *
 * Keep this in sync with the definitions of variable name characters in
 * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
 * "src/bin/pgbench/exprscan.l".  Also see parseVariable(), below.
 *
 * Note: this static function is copied from "src/bin/psql/variables.c"
 * but changed to disallow variable names starting with a digit.
 */
static bool
valid_variable_name(const char *name)
{
	const unsigned char *p = (const unsigned char *) name;

	/* Mustn't be zero-length */
	if (*p == '\0')
		return false;

	/* first character: no [0-9] */
	if (!IS_HIGHBIT_SET(*p) &&
		strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
			   "_", *p) == NULL)
		return false;

	/* remaining characters may also be digits */
	for (p++; *p != '\0'; p++)
	{
		if (!IS_HIGHBIT_SET(*p) &&
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", *p) == NULL)
			return false;
	}

	return true;
}
1590
1591 /*
1592 * Lookup a variable by name, creating it if need be.
1593 * Caller is expected to assign a value to the variable.
1594 * Returns NULL on failure (bad name).
1595 */
1596 static Variable *
lookupCreateVariable(CState * st,const char * context,char * name)1597 lookupCreateVariable(CState *st, const char *context, char *name)
1598 {
1599 Variable *var;
1600
1601 var = lookupVariable(st, name);
1602 if (var == NULL)
1603 {
1604 Variable *newvars;
1605
1606 /*
1607 * Check for the name only when declaring a new variable to avoid
1608 * overhead.
1609 */
1610 if (!valid_variable_name(name))
1611 {
1612 pg_log_error("%s: invalid variable name: \"%s\"", context, name);
1613 return NULL;
1614 }
1615
1616 /* Create variable at the end of the array */
1617 if (st->variables)
1618 newvars = (Variable *) pg_realloc(st->variables,
1619 (st->nvariables + 1) * sizeof(Variable));
1620 else
1621 newvars = (Variable *) pg_malloc(sizeof(Variable));
1622
1623 st->variables = newvars;
1624
1625 var = &newvars[st->nvariables];
1626
1627 var->name = pg_strdup(name);
1628 var->svalue = NULL;
1629 /* caller is expected to initialize remaining fields */
1630
1631 st->nvariables++;
1632 /* we don't re-sort the array till we have to */
1633 st->vars_sorted = false;
1634 }
1635
1636 return var;
1637 }
1638
1639 /* Assign a string value to a variable, creating it if need be */
1640 /* Returns false on failure (bad name) */
1641 static bool
putVariable(CState * st,const char * context,char * name,const char * value)1642 putVariable(CState *st, const char *context, char *name, const char *value)
1643 {
1644 Variable *var;
1645 char *val;
1646
1647 var = lookupCreateVariable(st, context, name);
1648 if (!var)
1649 return false;
1650
1651 /* dup then free, in case value is pointing at this variable */
1652 val = pg_strdup(value);
1653
1654 if (var->svalue)
1655 free(var->svalue);
1656 var->svalue = val;
1657 var->value.type = PGBT_NO_VALUE;
1658
1659 return true;
1660 }
1661
1662 /* Assign a value to a variable, creating it if need be */
1663 /* Returns false on failure (bad name) */
1664 static bool
putVariableValue(CState * st,const char * context,char * name,const PgBenchValue * value)1665 putVariableValue(CState *st, const char *context, char *name,
1666 const PgBenchValue *value)
1667 {
1668 Variable *var;
1669
1670 var = lookupCreateVariable(st, context, name);
1671 if (!var)
1672 return false;
1673
1674 if (var->svalue)
1675 free(var->svalue);
1676 var->svalue = NULL;
1677 var->value = *value;
1678
1679 return true;
1680 }
1681
1682 /* Assign an integer value to a variable, creating it if need be */
1683 /* Returns false on failure (bad name) */
1684 static bool
putVariableInt(CState * st,const char * context,char * name,int64 value)1685 putVariableInt(CState *st, const char *context, char *name, int64 value)
1686 {
1687 PgBenchValue val;
1688
1689 setIntValue(&val, value);
1690 return putVariableValue(st, context, name, &val);
1691 }
1692
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid variable
 * name, return a malloc'd string containing the variable name, and set
 * *eaten to the number of characters consumed (including the colon).
 * Otherwise, return NULL.
 *
 * Every character is checked against '\0' before the strchr() membership
 * test.  strchr() considers the terminating NUL part of the string it
 * searches, so strchr(set, '\0') is never NULL.  Without the explicit check,
 * a reference at the very end of "sql" (e.g. "SELECT :x", or a lone trailing
 * ":") would be scanned past the terminator.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 1;			/* starting at 1 skips the colon */
	char	   *name;

	/* keep this logic in sync with valid_variable_name() */
	if (sql[i] != '\0' &&
		(IS_HIGHBIT_SET(sql[i]) ||
		 strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				"_", sql[i]) != NULL))
		i++;
	else
		return NULL;

	while (sql[i] != '\0' &&
		   (IS_HIGHBIT_SET(sql[i]) ||
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", sql[i]) != NULL))
		i++;

	/* i - 1 name characters plus the terminating NUL */
	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1727
/*
 * Replace the "len" characters starting at "param" inside the malloc'd
 * string *sql with "value".
 *
 * *sql is reallocated when the result is longer, so both *sql and the
 * position of param may change.  Returns a pointer just past the inserted
 * value within the (possibly moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);
	size_t		pos = param - *sql;	/* survives a buffer move */
	char	   *at;

	if (vlen > len)
		*sql = pg_realloc(*sql, strlen(*sql) - len + vlen + 1);
	at = *sql + pos;

	/* slide the tail, including its NUL, to its new position */
	if (vlen != len)
		memmove(at + vlen, at + len, strlen(at + len) + 1);
	memcpy(at, value, vlen);

	return at + vlen;
}
1747
1748 static char *
assignVariables(CState * st,char * sql)1749 assignVariables(CState *st, char *sql)
1750 {
1751 char *p,
1752 *name,
1753 *val;
1754
1755 p = sql;
1756 while ((p = strchr(p, ':')) != NULL)
1757 {
1758 int eaten;
1759
1760 name = parseVariable(p, &eaten);
1761 if (name == NULL)
1762 {
1763 while (*p == ':')
1764 {
1765 p++;
1766 }
1767 continue;
1768 }
1769
1770 val = getVariable(st, name);
1771 free(name);
1772 if (val == NULL)
1773 {
1774 p++;
1775 continue;
1776 }
1777
1778 p = replaceVariable(&sql, p, eaten, val);
1779 }
1780
1781 return sql;
1782 }
1783
1784 static void
getQueryParams(CState * st,const Command * command,const char ** params)1785 getQueryParams(CState *st, const Command *command, const char **params)
1786 {
1787 int i;
1788
1789 for (i = 0; i < command->argc - 1; i++)
1790 params[i] = getVariable(st, command->argv[i + 1]);
1791 }
1792
1793 static char *
valueTypeName(PgBenchValue * pval)1794 valueTypeName(PgBenchValue *pval)
1795 {
1796 if (pval->type == PGBT_NO_VALUE)
1797 return "none";
1798 else if (pval->type == PGBT_NULL)
1799 return "null";
1800 else if (pval->type == PGBT_INT)
1801 return "int";
1802 else if (pval->type == PGBT_DOUBLE)
1803 return "double";
1804 else if (pval->type == PGBT_BOOLEAN)
1805 return "boolean";
1806 else
1807 {
1808 /* internal error, should never get there */
1809 Assert(false);
1810 return NULL;
1811 }
1812 }
1813
1814 /* get a value as a boolean, or tell if there is a problem */
1815 static bool
coerceToBool(PgBenchValue * pval,bool * bval)1816 coerceToBool(PgBenchValue *pval, bool *bval)
1817 {
1818 if (pval->type == PGBT_BOOLEAN)
1819 {
1820 *bval = pval->u.bval;
1821 return true;
1822 }
1823 else /* NULL, INT or DOUBLE */
1824 {
1825 pg_log_error("cannot coerce %s to boolean", valueTypeName(pval));
1826 *bval = false; /* suppress uninitialized-variable warnings */
1827 return false;
1828 }
1829 }
1830
1831 /*
1832 * Return true or false from an expression for conditional purposes.
1833 * Non zero numerical values are true, zero and NULL are false.
1834 */
1835 static bool
valueTruth(PgBenchValue * pval)1836 valueTruth(PgBenchValue *pval)
1837 {
1838 switch (pval->type)
1839 {
1840 case PGBT_NULL:
1841 return false;
1842 case PGBT_BOOLEAN:
1843 return pval->u.bval;
1844 case PGBT_INT:
1845 return pval->u.ival != 0;
1846 case PGBT_DOUBLE:
1847 return pval->u.dval != 0.0;
1848 default:
1849 /* internal error, unexpected type */
1850 Assert(0);
1851 return false;
1852 }
1853 }
1854
1855 /* get a value as an int, tell if there is a problem */
1856 static bool
coerceToInt(PgBenchValue * pval,int64 * ival)1857 coerceToInt(PgBenchValue *pval, int64 *ival)
1858 {
1859 if (pval->type == PGBT_INT)
1860 {
1861 *ival = pval->u.ival;
1862 return true;
1863 }
1864 else if (pval->type == PGBT_DOUBLE)
1865 {
1866 double dval = rint(pval->u.dval);
1867
1868 if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
1869 {
1870 pg_log_error("double to int overflow for %f", dval);
1871 return false;
1872 }
1873 *ival = (int64) dval;
1874 return true;
1875 }
1876 else /* BOOLEAN or NULL */
1877 {
1878 pg_log_error("cannot coerce %s to int", valueTypeName(pval));
1879 return false;
1880 }
1881 }
1882
1883 /* get a value as a double, or tell if there is a problem */
1884 static bool
coerceToDouble(PgBenchValue * pval,double * dval)1885 coerceToDouble(PgBenchValue *pval, double *dval)
1886 {
1887 if (pval->type == PGBT_DOUBLE)
1888 {
1889 *dval = pval->u.dval;
1890 return true;
1891 }
1892 else if (pval->type == PGBT_INT)
1893 {
1894 *dval = (double) pval->u.ival;
1895 return true;
1896 }
1897 else /* BOOLEAN or NULL */
1898 {
1899 pg_log_error("cannot coerce %s to double", valueTypeName(pval));
1900 return false;
1901 }
1902 }
1903
1904 /* assign a null value */
1905 static void
setNullValue(PgBenchValue * pv)1906 setNullValue(PgBenchValue *pv)
1907 {
1908 pv->type = PGBT_NULL;
1909 pv->u.ival = 0;
1910 }
1911
1912 /* assign a boolean value */
1913 static void
setBoolValue(PgBenchValue * pv,bool bval)1914 setBoolValue(PgBenchValue *pv, bool bval)
1915 {
1916 pv->type = PGBT_BOOLEAN;
1917 pv->u.bval = bval;
1918 }
1919
1920 /* assign an integer value */
1921 static void
setIntValue(PgBenchValue * pv,int64 ival)1922 setIntValue(PgBenchValue *pv, int64 ival)
1923 {
1924 pv->type = PGBT_INT;
1925 pv->u.ival = ival;
1926 }
1927
1928 /* assign a double value */
1929 static void
setDoubleValue(PgBenchValue * pv,double dval)1930 setDoubleValue(PgBenchValue *pv, double dval)
1931 {
1932 pv->type = PGBT_DOUBLE;
1933 pv->u.dval = dval;
1934 }
1935
1936 static bool
isLazyFunc(PgBenchFunction func)1937 isLazyFunc(PgBenchFunction func)
1938 {
1939 return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
1940 }
1941
1942 /* lazy evaluation of some functions */
1943 static bool
evalLazyFunc(CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1944 evalLazyFunc(CState *st,
1945 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
1946 {
1947 PgBenchValue a1,
1948 a2;
1949 bool ba1,
1950 ba2;
1951
1952 Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
1953
1954 /* args points to first condition */
1955 if (!evaluateExpr(st, args->expr, &a1))
1956 return false;
1957
1958 /* second condition for AND/OR and corresponding branch for CASE */
1959 args = args->next;
1960
1961 switch (func)
1962 {
1963 case PGBENCH_AND:
1964 if (a1.type == PGBT_NULL)
1965 {
1966 setNullValue(retval);
1967 return true;
1968 }
1969
1970 if (!coerceToBool(&a1, &ba1))
1971 return false;
1972
1973 if (!ba1)
1974 {
1975 setBoolValue(retval, false);
1976 return true;
1977 }
1978
1979 if (!evaluateExpr(st, args->expr, &a2))
1980 return false;
1981
1982 if (a2.type == PGBT_NULL)
1983 {
1984 setNullValue(retval);
1985 return true;
1986 }
1987 else if (!coerceToBool(&a2, &ba2))
1988 return false;
1989 else
1990 {
1991 setBoolValue(retval, ba2);
1992 return true;
1993 }
1994
1995 return true;
1996
1997 case PGBENCH_OR:
1998
1999 if (a1.type == PGBT_NULL)
2000 {
2001 setNullValue(retval);
2002 return true;
2003 }
2004
2005 if (!coerceToBool(&a1, &ba1))
2006 return false;
2007
2008 if (ba1)
2009 {
2010 setBoolValue(retval, true);
2011 return true;
2012 }
2013
2014 if (!evaluateExpr(st, args->expr, &a2))
2015 return false;
2016
2017 if (a2.type == PGBT_NULL)
2018 {
2019 setNullValue(retval);
2020 return true;
2021 }
2022 else if (!coerceToBool(&a2, &ba2))
2023 return false;
2024 else
2025 {
2026 setBoolValue(retval, ba2);
2027 return true;
2028 }
2029
2030 case PGBENCH_CASE:
2031 /* when true, execute branch */
2032 if (valueTruth(&a1))
2033 return evaluateExpr(st, args->expr, retval);
2034
2035 /* now args contains next condition or final else expression */
2036 args = args->next;
2037
2038 /* final else case? */
2039 if (args->next == NULL)
2040 return evaluateExpr(st, args->expr, retval);
2041
2042 /* no, another when, proceed */
2043 return evalLazyFunc(st, PGBENCH_CASE, args, retval);
2044
2045 default:
2046 /* internal error, cannot get here */
2047 Assert(0);
2048 break;
2049 }
2050 return false;
2051 }
2052
/*
 * Maximum number of arguments of a standard (non-lazy) function call:
 * evalStandardFunc() stores the evaluated arguments in a fixed-size array of
 * this length and reports an error for calls with more arguments.
 */
#define MAX_FARGS 16
2055
2056 /*
2057 * Recursive evaluation of standard functions,
2058 * which do not require lazy evaluation.
2059 */
2060 static bool
evalStandardFunc(CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)2061 evalStandardFunc(CState *st,
2062 PgBenchFunction func, PgBenchExprLink *args,
2063 PgBenchValue *retval)
2064 {
2065 /* evaluate all function arguments */
2066 int nargs = 0;
2067 PgBenchValue vargs[MAX_FARGS];
2068 PgBenchExprLink *l = args;
2069 bool has_null = false;
2070
2071 for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
2072 {
2073 if (!evaluateExpr(st, l->expr, &vargs[nargs]))
2074 return false;
2075 has_null |= vargs[nargs].type == PGBT_NULL;
2076 }
2077
2078 if (l != NULL)
2079 {
2080 pg_log_error("too many function arguments, maximum is %d", MAX_FARGS);
2081 return false;
2082 }
2083
2084 /* NULL arguments */
2085 if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
2086 {
2087 setNullValue(retval);
2088 return true;
2089 }
2090
2091 /* then evaluate function */
2092 switch (func)
2093 {
2094 /* overloaded operators */
2095 case PGBENCH_ADD:
2096 case PGBENCH_SUB:
2097 case PGBENCH_MUL:
2098 case PGBENCH_DIV:
2099 case PGBENCH_MOD:
2100 case PGBENCH_EQ:
2101 case PGBENCH_NE:
2102 case PGBENCH_LE:
2103 case PGBENCH_LT:
2104 {
2105 PgBenchValue *lval = &vargs[0],
2106 *rval = &vargs[1];
2107
2108 Assert(nargs == 2);
2109
2110 /* overloaded type management, double if some double */
2111 if ((lval->type == PGBT_DOUBLE ||
2112 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
2113 {
2114 double ld,
2115 rd;
2116
2117 if (!coerceToDouble(lval, &ld) ||
2118 !coerceToDouble(rval, &rd))
2119 return false;
2120
2121 switch (func)
2122 {
2123 case PGBENCH_ADD:
2124 setDoubleValue(retval, ld + rd);
2125 return true;
2126
2127 case PGBENCH_SUB:
2128 setDoubleValue(retval, ld - rd);
2129 return true;
2130
2131 case PGBENCH_MUL:
2132 setDoubleValue(retval, ld * rd);
2133 return true;
2134
2135 case PGBENCH_DIV:
2136 setDoubleValue(retval, ld / rd);
2137 return true;
2138
2139 case PGBENCH_EQ:
2140 setBoolValue(retval, ld == rd);
2141 return true;
2142
2143 case PGBENCH_NE:
2144 setBoolValue(retval, ld != rd);
2145 return true;
2146
2147 case PGBENCH_LE:
2148 setBoolValue(retval, ld <= rd);
2149 return true;
2150
2151 case PGBENCH_LT:
2152 setBoolValue(retval, ld < rd);
2153 return true;
2154
2155 default:
2156 /* cannot get here */
2157 Assert(0);
2158 }
2159 }
2160 else /* we have integer operands, or % */
2161 {
2162 int64 li,
2163 ri,
2164 res;
2165
2166 if (!coerceToInt(lval, &li) ||
2167 !coerceToInt(rval, &ri))
2168 return false;
2169
2170 switch (func)
2171 {
2172 case PGBENCH_ADD:
2173 if (pg_add_s64_overflow(li, ri, &res))
2174 {
2175 pg_log_error("bigint add out of range");
2176 return false;
2177 }
2178 setIntValue(retval, res);
2179 return true;
2180
2181 case PGBENCH_SUB:
2182 if (pg_sub_s64_overflow(li, ri, &res))
2183 {
2184 pg_log_error("bigint sub out of range");
2185 return false;
2186 }
2187 setIntValue(retval, res);
2188 return true;
2189
2190 case PGBENCH_MUL:
2191 if (pg_mul_s64_overflow(li, ri, &res))
2192 {
2193 pg_log_error("bigint mul out of range");
2194 return false;
2195 }
2196 setIntValue(retval, res);
2197 return true;
2198
2199 case PGBENCH_EQ:
2200 setBoolValue(retval, li == ri);
2201 return true;
2202
2203 case PGBENCH_NE:
2204 setBoolValue(retval, li != ri);
2205 return true;
2206
2207 case PGBENCH_LE:
2208 setBoolValue(retval, li <= ri);
2209 return true;
2210
2211 case PGBENCH_LT:
2212 setBoolValue(retval, li < ri);
2213 return true;
2214
2215 case PGBENCH_DIV:
2216 case PGBENCH_MOD:
2217 if (ri == 0)
2218 {
2219 pg_log_error("division by zero");
2220 return false;
2221 }
2222 /* special handling of -1 divisor */
2223 if (ri == -1)
2224 {
2225 if (func == PGBENCH_DIV)
2226 {
2227 /* overflow check (needed for INT64_MIN) */
2228 if (li == PG_INT64_MIN)
2229 {
2230 pg_log_error("bigint div out of range");
2231 return false;
2232 }
2233 else
2234 setIntValue(retval, -li);
2235 }
2236 else
2237 setIntValue(retval, 0);
2238 return true;
2239 }
2240 /* else divisor is not -1 */
2241 if (func == PGBENCH_DIV)
2242 setIntValue(retval, li / ri);
2243 else /* func == PGBENCH_MOD */
2244 setIntValue(retval, li % ri);
2245
2246 return true;
2247
2248 default:
2249 /* cannot get here */
2250 Assert(0);
2251 }
2252 }
2253
2254 Assert(0);
2255 return false; /* NOTREACHED */
2256 }
2257
2258 /* integer bitwise operators */
2259 case PGBENCH_BITAND:
2260 case PGBENCH_BITOR:
2261 case PGBENCH_BITXOR:
2262 case PGBENCH_LSHIFT:
2263 case PGBENCH_RSHIFT:
2264 {
2265 int64 li,
2266 ri;
2267
2268 if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
2269 return false;
2270
2271 if (func == PGBENCH_BITAND)
2272 setIntValue(retval, li & ri);
2273 else if (func == PGBENCH_BITOR)
2274 setIntValue(retval, li | ri);
2275 else if (func == PGBENCH_BITXOR)
2276 setIntValue(retval, li ^ ri);
2277 else if (func == PGBENCH_LSHIFT)
2278 setIntValue(retval, li << ri);
2279 else if (func == PGBENCH_RSHIFT)
2280 setIntValue(retval, li >> ri);
2281 else /* cannot get here */
2282 Assert(0);
2283
2284 return true;
2285 }
2286
2287 /* logical operators */
2288 case PGBENCH_NOT:
2289 {
2290 bool b;
2291
2292 if (!coerceToBool(&vargs[0], &b))
2293 return false;
2294
2295 setBoolValue(retval, !b);
2296 return true;
2297 }
2298
2299 /* no arguments */
2300 case PGBENCH_PI:
2301 setDoubleValue(retval, M_PI);
2302 return true;
2303
2304 /* 1 overloaded argument */
2305 case PGBENCH_ABS:
2306 {
2307 PgBenchValue *varg = &vargs[0];
2308
2309 Assert(nargs == 1);
2310
2311 if (varg->type == PGBT_INT)
2312 {
2313 int64 i = varg->u.ival;
2314
2315 setIntValue(retval, i < 0 ? -i : i);
2316 }
2317 else
2318 {
2319 double d = varg->u.dval;
2320
2321 Assert(varg->type == PGBT_DOUBLE);
2322 setDoubleValue(retval, d < 0.0 ? -d : d);
2323 }
2324
2325 return true;
2326 }
2327
2328 case PGBENCH_DEBUG:
2329 {
2330 PgBenchValue *varg = &vargs[0];
2331
2332 Assert(nargs == 1);
2333
2334 fprintf(stderr, "debug(script=%d,command=%d): ",
2335 st->use_file, st->command + 1);
2336
2337 if (varg->type == PGBT_NULL)
2338 fprintf(stderr, "null\n");
2339 else if (varg->type == PGBT_BOOLEAN)
2340 fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2341 else if (varg->type == PGBT_INT)
2342 fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2343 else if (varg->type == PGBT_DOUBLE)
2344 fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2345 else /* internal error, unexpected type */
2346 Assert(0);
2347
2348 *retval = *varg;
2349
2350 return true;
2351 }
2352
2353 /* 1 double argument */
2354 case PGBENCH_DOUBLE:
2355 case PGBENCH_SQRT:
2356 case PGBENCH_LN:
2357 case PGBENCH_EXP:
2358 {
2359 double dval;
2360
2361 Assert(nargs == 1);
2362
2363 if (!coerceToDouble(&vargs[0], &dval))
2364 return false;
2365
2366 if (func == PGBENCH_SQRT)
2367 dval = sqrt(dval);
2368 else if (func == PGBENCH_LN)
2369 dval = log(dval);
2370 else if (func == PGBENCH_EXP)
2371 dval = exp(dval);
2372 /* else is cast: do nothing */
2373
2374 setDoubleValue(retval, dval);
2375 return true;
2376 }
2377
2378 /* 1 int argument */
2379 case PGBENCH_INT:
2380 {
2381 int64 ival;
2382
2383 Assert(nargs == 1);
2384
2385 if (!coerceToInt(&vargs[0], &ival))
2386 return false;
2387
2388 setIntValue(retval, ival);
2389 return true;
2390 }
2391
2392 /* variable number of arguments */
2393 case PGBENCH_LEAST:
2394 case PGBENCH_GREATEST:
2395 {
2396 bool havedouble;
2397 int i;
2398
2399 Assert(nargs >= 1);
2400
2401 /* need double result if any input is double */
2402 havedouble = false;
2403 for (i = 0; i < nargs; i++)
2404 {
2405 if (vargs[i].type == PGBT_DOUBLE)
2406 {
2407 havedouble = true;
2408 break;
2409 }
2410 }
2411 if (havedouble)
2412 {
2413 double extremum;
2414
2415 if (!coerceToDouble(&vargs[0], &extremum))
2416 return false;
2417 for (i = 1; i < nargs; i++)
2418 {
2419 double dval;
2420
2421 if (!coerceToDouble(&vargs[i], &dval))
2422 return false;
2423 if (func == PGBENCH_LEAST)
2424 extremum = Min(extremum, dval);
2425 else
2426 extremum = Max(extremum, dval);
2427 }
2428 setDoubleValue(retval, extremum);
2429 }
2430 else
2431 {
2432 int64 extremum;
2433
2434 if (!coerceToInt(&vargs[0], &extremum))
2435 return false;
2436 for (i = 1; i < nargs; i++)
2437 {
2438 int64 ival;
2439
2440 if (!coerceToInt(&vargs[i], &ival))
2441 return false;
2442 if (func == PGBENCH_LEAST)
2443 extremum = Min(extremum, ival);
2444 else
2445 extremum = Max(extremum, ival);
2446 }
2447 setIntValue(retval, extremum);
2448 }
2449 return true;
2450 }
2451
2452 /* random functions */
2453 case PGBENCH_RANDOM:
2454 case PGBENCH_RANDOM_EXPONENTIAL:
2455 case PGBENCH_RANDOM_GAUSSIAN:
2456 case PGBENCH_RANDOM_ZIPFIAN:
2457 {
2458 int64 imin,
2459 imax,
2460 delta;
2461
2462 Assert(nargs >= 2);
2463
2464 if (!coerceToInt(&vargs[0], &imin) ||
2465 !coerceToInt(&vargs[1], &imax))
2466 return false;
2467
2468 /* check random range */
2469 if (unlikely(imin > imax))
2470 {
2471 pg_log_error("empty range given to random");
2472 return false;
2473 }
2474 else if (unlikely(pg_sub_s64_overflow(imax, imin, &delta) ||
2475 pg_add_s64_overflow(delta, 1, &delta)))
2476 {
2477 /* prevent int overflows in random functions */
2478 pg_log_error("random range is too large");
2479 return false;
2480 }
2481
2482 if (func == PGBENCH_RANDOM)
2483 {
2484 Assert(nargs == 2);
2485 setIntValue(retval, getrand(&st->cs_func_rs, imin, imax));
2486 }
2487 else /* gaussian & exponential */
2488 {
2489 double param;
2490
2491 Assert(nargs == 3);
2492
2493 if (!coerceToDouble(&vargs[2], ¶m))
2494 return false;
2495
2496 if (func == PGBENCH_RANDOM_GAUSSIAN)
2497 {
2498 if (param < MIN_GAUSSIAN_PARAM)
2499 {
2500 pg_log_error("gaussian parameter must be at least %f (not %f)",
2501 MIN_GAUSSIAN_PARAM, param);
2502 return false;
2503 }
2504
2505 setIntValue(retval,
2506 getGaussianRand(&st->cs_func_rs,
2507 imin, imax, param));
2508 }
2509 else if (func == PGBENCH_RANDOM_ZIPFIAN)
2510 {
2511 if (param < MIN_ZIPFIAN_PARAM || param > MAX_ZIPFIAN_PARAM)
2512 {
2513 pg_log_error("zipfian parameter must be in range [%.3f, %.0f] (not %f)",
2514 MIN_ZIPFIAN_PARAM, MAX_ZIPFIAN_PARAM, param);
2515 return false;
2516 }
2517
2518 setIntValue(retval,
2519 getZipfianRand(&st->cs_func_rs, imin, imax, param));
2520 }
2521 else /* exponential */
2522 {
2523 if (param <= 0.0)
2524 {
2525 pg_log_error("exponential parameter must be greater than zero (not %f)",
2526 param);
2527 return false;
2528 }
2529
2530 setIntValue(retval,
2531 getExponentialRand(&st->cs_func_rs,
2532 imin, imax, param));
2533 }
2534 }
2535
2536 return true;
2537 }
2538
2539 case PGBENCH_POW:
2540 {
2541 PgBenchValue *lval = &vargs[0];
2542 PgBenchValue *rval = &vargs[1];
2543 double ld,
2544 rd;
2545
2546 Assert(nargs == 2);
2547
2548 if (!coerceToDouble(lval, &ld) ||
2549 !coerceToDouble(rval, &rd))
2550 return false;
2551
2552 setDoubleValue(retval, pow(ld, rd));
2553
2554 return true;
2555 }
2556
2557 case PGBENCH_IS:
2558 {
2559 Assert(nargs == 2);
2560
2561 /*
2562 * note: this simple implementation is more permissive than
2563 * SQL
2564 */
2565 setBoolValue(retval,
2566 vargs[0].type == vargs[1].type &&
2567 vargs[0].u.bval == vargs[1].u.bval);
2568 return true;
2569 }
2570
2571 /* hashing */
2572 case PGBENCH_HASH_FNV1A:
2573 case PGBENCH_HASH_MURMUR2:
2574 {
2575 int64 val,
2576 seed;
2577
2578 Assert(nargs == 2);
2579
2580 if (!coerceToInt(&vargs[0], &val) ||
2581 !coerceToInt(&vargs[1], &seed))
2582 return false;
2583
2584 if (func == PGBENCH_HASH_MURMUR2)
2585 setIntValue(retval, getHashMurmur2(val, seed));
2586 else if (func == PGBENCH_HASH_FNV1A)
2587 setIntValue(retval, getHashFnv1a(val, seed));
2588 else
2589 /* cannot get here */
2590 Assert(0);
2591
2592 return true;
2593 }
2594
2595 case PGBENCH_PERMUTE:
2596 {
2597 int64 val,
2598 size,
2599 seed;
2600
2601 Assert(nargs == 3);
2602
2603 if (!coerceToInt(&vargs[0], &val) ||
2604 !coerceToInt(&vargs[1], &size) ||
2605 !coerceToInt(&vargs[2], &seed))
2606 return false;
2607
2608 if (size <= 0)
2609 {
2610 pg_log_error("permute size parameter must be greater than zero");
2611 return false;
2612 }
2613
2614 setIntValue(retval, permute(val, size, seed));
2615 return true;
2616 }
2617
2618 default:
2619 /* cannot get here */
2620 Assert(0);
2621 /* dead code to avoid a compiler warning */
2622 return false;
2623 }
2624 }
2625
2626 /* evaluate some function */
2627 static bool
evalFunc(CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)2628 evalFunc(CState *st,
2629 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
2630 {
2631 if (isLazyFunc(func))
2632 return evalLazyFunc(st, func, args, retval);
2633 else
2634 return evalStandardFunc(st, func, args, retval);
2635 }
2636
2637 /*
2638 * Recursive evaluation of an expression in a pgbench script
2639 * using the current state of variables.
2640 * Returns whether the evaluation was ok,
2641 * the value itself is returned through the retval pointer.
2642 */
2643 static bool
evaluateExpr(CState * st,PgBenchExpr * expr,PgBenchValue * retval)2644 evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2645 {
2646 switch (expr->etype)
2647 {
2648 case ENODE_CONSTANT:
2649 {
2650 *retval = expr->u.constant;
2651 return true;
2652 }
2653
2654 case ENODE_VARIABLE:
2655 {
2656 Variable *var;
2657
2658 if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
2659 {
2660 pg_log_error("undefined variable \"%s\"", expr->u.variable.varname);
2661 return false;
2662 }
2663
2664 if (!makeVariableValue(var))
2665 return false;
2666
2667 *retval = var->value;
2668 return true;
2669 }
2670
2671 case ENODE_FUNCTION:
2672 return evalFunc(st,
2673 expr->u.function.function,
2674 expr->u.function.args,
2675 retval);
2676
2677 default:
2678 /* internal error which should never occur */
2679 pg_log_fatal("unexpected enode type in evaluation: %d", expr->etype);
2680 exit(1);
2681 }
2682 }
2683
2684 /*
2685 * Convert command name to meta-command enum identifier
2686 */
2687 static MetaCommand
getMetaCommand(const char * cmd)2688 getMetaCommand(const char *cmd)
2689 {
2690 MetaCommand mc;
2691
2692 if (cmd == NULL)
2693 mc = META_NONE;
2694 else if (pg_strcasecmp(cmd, "set") == 0)
2695 mc = META_SET;
2696 else if (pg_strcasecmp(cmd, "setshell") == 0)
2697 mc = META_SETSHELL;
2698 else if (pg_strcasecmp(cmd, "shell") == 0)
2699 mc = META_SHELL;
2700 else if (pg_strcasecmp(cmd, "sleep") == 0)
2701 mc = META_SLEEP;
2702 else if (pg_strcasecmp(cmd, "if") == 0)
2703 mc = META_IF;
2704 else if (pg_strcasecmp(cmd, "elif") == 0)
2705 mc = META_ELIF;
2706 else if (pg_strcasecmp(cmd, "else") == 0)
2707 mc = META_ELSE;
2708 else if (pg_strcasecmp(cmd, "endif") == 0)
2709 mc = META_ENDIF;
2710 else if (pg_strcasecmp(cmd, "gset") == 0)
2711 mc = META_GSET;
2712 else if (pg_strcasecmp(cmd, "aset") == 0)
2713 mc = META_ASET;
2714 else if (pg_strcasecmp(cmd, "startpipeline") == 0)
2715 mc = META_STARTPIPELINE;
2716 else if (pg_strcasecmp(cmd, "endpipeline") == 0)
2717 mc = META_ENDPIPELINE;
2718 else
2719 mc = META_NONE;
2720 return mc;
2721 }
2722
2723 /*
2724 * Run a shell command. The result is assigned to the variable if not NULL.
2725 * Return true if succeeded, or false on error.
2726 */
2727 static bool
runShellCommand(CState * st,char * variable,char ** argv,int argc)2728 runShellCommand(CState *st, char *variable, char **argv, int argc)
2729 {
2730 char command[SHELL_COMMAND_SIZE];
2731 int i,
2732 len = 0;
2733 FILE *fp;
2734 char res[64];
2735 char *endptr;
2736 int retval;
2737
2738 /*----------
2739 * Join arguments with whitespace separators. Arguments starting with
2740 * exactly one colon are treated as variables:
2741 * name - append a string "name"
2742 * :var - append a variable named 'var'
2743 * ::name - append a string ":name"
2744 *----------
2745 */
2746 for (i = 0; i < argc; i++)
2747 {
2748 char *arg;
2749 int arglen;
2750
2751 if (argv[i][0] != ':')
2752 {
2753 arg = argv[i]; /* a string literal */
2754 }
2755 else if (argv[i][1] == ':')
2756 {
2757 arg = argv[i] + 1; /* a string literal starting with colons */
2758 }
2759 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
2760 {
2761 pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[i]);
2762 return false;
2763 }
2764
2765 arglen = strlen(arg);
2766 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2767 {
2768 pg_log_error("%s: shell command is too long", argv[0]);
2769 return false;
2770 }
2771
2772 if (i > 0)
2773 command[len++] = ' ';
2774 memcpy(command + len, arg, arglen);
2775 len += arglen;
2776 }
2777
2778 command[len] = '\0';
2779
2780 /* Fast path for non-assignment case */
2781 if (variable == NULL)
2782 {
2783 if (system(command))
2784 {
2785 if (!timer_exceeded)
2786 pg_log_error("%s: could not launch shell command", argv[0]);
2787 return false;
2788 }
2789 return true;
2790 }
2791
2792 /* Execute the command with pipe and read the standard output. */
2793 if ((fp = popen(command, "r")) == NULL)
2794 {
2795 pg_log_error("%s: could not launch shell command", argv[0]);
2796 return false;
2797 }
2798 if (fgets(res, sizeof(res), fp) == NULL)
2799 {
2800 if (!timer_exceeded)
2801 pg_log_error("%s: could not read result of shell command", argv[0]);
2802 (void) pclose(fp);
2803 return false;
2804 }
2805 if (pclose(fp) < 0)
2806 {
2807 pg_log_error("%s: could not close shell command", argv[0]);
2808 return false;
2809 }
2810
2811 /* Check whether the result is an integer and assign it to the variable */
2812 retval = (int) strtol(res, &endptr, 10);
2813 while (*endptr != '\0' && isspace((unsigned char) *endptr))
2814 endptr++;
2815 if (*res == '\0' || *endptr != '\0')
2816 {
2817 pg_log_error("%s: shell command must return an integer (not \"%s\")", argv[0], res);
2818 return false;
2819 }
2820 if (!putVariableInt(st, "setshell", variable, retval))
2821 return false;
2822
2823 pg_log_debug("%s: shell parameter name: \"%s\", value: \"%s\"", argv[0], argv[1], res);
2824
2825 return true;
2826 }
2827
/* Size of a buffer able to hold any name built by preparedStatementName() */
#define MAX_PREPARE_NAME 32

/*
 * Store into "buffer" the name of the prepared statement for command number
 * "state" of script number "file", in the form "P<file>_<state>".
 * "buffer" must have room for MAX_PREPARE_NAME bytes.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	/*
	 * At most 1 + 11 + 1 + 11 + 1 = 25 bytes are needed ("P", two ints, "_"
	 * and the terminator), so this never truncates.
	 */
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
2834
2835 static void
commandFailed(CState * st,const char * cmd,const char * message)2836 commandFailed(CState *st, const char *cmd, const char *message)
2837 {
2838 pg_log_error("client %d aborted in command %d (%s) of script %d; %s",
2839 st->id, st->command, cmd, st->use_file, message);
2840 }
2841
2842 /* return a script number with a weighted choice. */
2843 static int
chooseScript(TState * thread)2844 chooseScript(TState *thread)
2845 {
2846 int i = 0;
2847 int64 w;
2848
2849 if (num_scripts == 1)
2850 return 0;
2851
2852 w = getrand(&thread->ts_choose_rs, 0, total_weight - 1);
2853 do
2854 {
2855 w -= sql_script[i++].weight;
2856 } while (w >= 0);
2857
2858 return i - 1;
2859 }
2860
2861 /* Send a SQL command, using the chosen querymode */
2862 static bool
sendCommand(CState * st,Command * command)2863 sendCommand(CState *st, Command *command)
2864 {
2865 int r;
2866
2867 if (querymode == QUERY_SIMPLE)
2868 {
2869 char *sql;
2870
2871 sql = pg_strdup(command->argv[0]);
2872 sql = assignVariables(st, sql);
2873
2874 pg_log_debug("client %d sending %s", st->id, sql);
2875 r = PQsendQuery(st->con, sql);
2876 free(sql);
2877 }
2878 else if (querymode == QUERY_EXTENDED)
2879 {
2880 const char *sql = command->argv[0];
2881 const char *params[MAX_ARGS];
2882
2883 getQueryParams(st, command, params);
2884
2885 pg_log_debug("client %d sending %s", st->id, sql);
2886 r = PQsendQueryParams(st->con, sql, command->argc - 1,
2887 NULL, params, NULL, NULL, 0);
2888 }
2889 else if (querymode == QUERY_PREPARED)
2890 {
2891 char name[MAX_PREPARE_NAME];
2892 const char *params[MAX_ARGS];
2893
2894 if (!st->prepared[st->use_file])
2895 {
2896 int j;
2897 Command **commands = sql_script[st->use_file].commands;
2898
2899 for (j = 0; commands[j] != NULL; j++)
2900 {
2901 PGresult *res;
2902 char name[MAX_PREPARE_NAME];
2903
2904 if (commands[j]->type != SQL_COMMAND)
2905 continue;
2906 preparedStatementName(name, st->use_file, j);
2907 if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
2908 {
2909 res = PQprepare(st->con, name,
2910 commands[j]->argv[0], commands[j]->argc - 1, NULL);
2911 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2912 pg_log_error("%s", PQerrorMessage(st->con));
2913 PQclear(res);
2914 }
2915 else
2916 {
2917 /*
2918 * In pipeline mode, we use asynchronous functions. If a
2919 * server-side error occurs, it will be processed later
2920 * among the other results.
2921 */
2922 if (!PQsendPrepare(st->con, name,
2923 commands[j]->argv[0], commands[j]->argc - 1, NULL))
2924 pg_log_error("%s", PQerrorMessage(st->con));
2925 }
2926 }
2927 st->prepared[st->use_file] = true;
2928 }
2929
2930 getQueryParams(st, command, params);
2931 preparedStatementName(name, st->use_file, st->command);
2932
2933 pg_log_debug("client %d sending %s", st->id, name);
2934 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2935 params, NULL, NULL, 0);
2936 }
2937 else /* unknown sql mode */
2938 r = 0;
2939
2940 if (r == 0)
2941 {
2942 pg_log_debug("client %d could not send %s", st->id, command->argv[0]);
2943 return false;
2944 }
2945 else
2946 return true;
2947 }
2948
2949 /*
2950 * Process query response from the backend.
2951 *
2952 * If varprefix is not NULL, it's the variable name prefix where to store
2953 * the results of the *last* command (META_GSET) or *all* commands
2954 * (META_ASET).
2955 *
2956 * Returns true if everything is A-OK, false if any error occurs.
2957 */
2958 static bool
readCommandResponse(CState * st,MetaCommand meta,char * varprefix)2959 readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
2960 {
2961 PGresult *res;
2962 PGresult *next_res;
2963 int qrynum = 0;
2964
2965 /*
2966 * varprefix should be set only with \gset or \aset, and \endpipeline and
2967 * SQL commands do not need it.
2968 */
2969 Assert((meta == META_NONE && varprefix == NULL) ||
2970 ((meta == META_ENDPIPELINE) && varprefix == NULL) ||
2971 ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
2972
2973 res = PQgetResult(st->con);
2974
2975 while (res != NULL)
2976 {
2977 bool is_last;
2978
2979 /* peek at the next result to know whether the current is last */
2980 next_res = PQgetResult(st->con);
2981 is_last = (next_res == NULL);
2982
2983 switch (PQresultStatus(res))
2984 {
2985 case PGRES_COMMAND_OK: /* non-SELECT commands */
2986 case PGRES_EMPTY_QUERY: /* may be used for testing no-op overhead */
2987 if (is_last && meta == META_GSET)
2988 {
2989 pg_log_error("client %d script %d command %d query %d: expected one row, got %d",
2990 st->id, st->use_file, st->command, qrynum, 0);
2991 goto error;
2992 }
2993 break;
2994
2995 case PGRES_TUPLES_OK:
2996 if ((is_last && meta == META_GSET) || meta == META_ASET)
2997 {
2998 int ntuples = PQntuples(res);
2999
3000 if (meta == META_GSET && ntuples != 1)
3001 {
3002 /* under \gset, report the error */
3003 pg_log_error("client %d script %d command %d query %d: expected one row, got %d",
3004 st->id, st->use_file, st->command, qrynum, PQntuples(res));
3005 goto error;
3006 }
3007 else if (meta == META_ASET && ntuples <= 0)
3008 {
3009 /* coldly skip empty result under \aset */
3010 break;
3011 }
3012
3013 /* store results into variables */
3014 for (int fld = 0; fld < PQnfields(res); fld++)
3015 {
3016 char *varname = PQfname(res, fld);
3017
3018 /* allocate varname only if necessary, freed below */
3019 if (*varprefix != '\0')
3020 varname = psprintf("%s%s", varprefix, varname);
3021
3022 /* store last row result as a string */
3023 if (!putVariable(st, meta == META_ASET ? "aset" : "gset", varname,
3024 PQgetvalue(res, ntuples - 1, fld)))
3025 {
3026 /* internal error */
3027 pg_log_error("client %d script %d command %d query %d: error storing into variable %s",
3028 st->id, st->use_file, st->command, qrynum, varname);
3029 goto error;
3030 }
3031
3032 if (*varprefix != '\0')
3033 pg_free(varname);
3034 }
3035 }
3036 /* otherwise the result is simply thrown away by PQclear below */
3037 break;
3038
3039 case PGRES_PIPELINE_SYNC:
3040 pg_log_debug("client %d pipeline ending", st->id);
3041 if (PQexitPipelineMode(st->con) != 1)
3042 pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
3043 PQerrorMessage(st->con));
3044 break;
3045
3046 default:
3047 /* anything else is unexpected */
3048 pg_log_error("client %d script %d aborted in command %d query %d: %s",
3049 st->id, st->use_file, st->command, qrynum,
3050 PQerrorMessage(st->con));
3051 goto error;
3052 }
3053
3054 PQclear(res);
3055 qrynum++;
3056 res = next_res;
3057 }
3058
3059 if (qrynum == 0)
3060 {
3061 pg_log_error("client %d command %d: no results", st->id, st->command);
3062 return false;
3063 }
3064
3065 return true;
3066
3067 error:
3068 PQclear(res);
3069 PQclear(next_res);
3070 do
3071 {
3072 res = PQgetResult(st->con);
3073 PQclear(res);
3074 } while (res);
3075
3076 return false;
3077 }
3078
3079 /*
3080 * Parse the argument to a \sleep command, and return the requested amount
3081 * of delay, in microseconds. Returns true on success, false on error.
3082 */
3083 static bool
evaluateSleep(CState * st,int argc,char ** argv,int * usecs)3084 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
3085 {
3086 char *var;
3087 int usec;
3088
3089 if (*argv[1] == ':')
3090 {
3091 if ((var = getVariable(st, argv[1] + 1)) == NULL)
3092 {
3093 pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[1] + 1);
3094 return false;
3095 }
3096
3097 usec = atoi(var);
3098
3099 /* Raise an error if the value of a variable is not a number */
3100 if (usec == 0 && !isdigit((unsigned char) *var))
3101 {
3102 pg_log_error("%s: invalid sleep time \"%s\" for variable \"%s\"",
3103 argv[0], var, argv[1] + 1);
3104 return false;
3105 }
3106 }
3107 else
3108 usec = atoi(argv[1]);
3109
3110 if (argc > 2)
3111 {
3112 if (pg_strcasecmp(argv[2], "ms") == 0)
3113 usec *= 1000;
3114 else if (pg_strcasecmp(argv[2], "s") == 0)
3115 usec *= 1000000;
3116 }
3117 else
3118 usec *= 1000000;
3119
3120 *usecs = usec;
3121 return true;
3122 }
3123
3124 /*
3125 * Advance the state machine of a connection.
3126 */
3127 static void
advanceConnectionState(TState * thread,CState * st,StatsData * agg)3128 advanceConnectionState(TState *thread, CState *st, StatsData *agg)
3129 {
3130
3131 /*
3132 * gettimeofday() isn't free, so we get the current timestamp lazily the
3133 * first time it's needed, and reuse the same value throughout this
3134 * function after that. This also ensures that e.g. the calculated
3135 * latency reported in the log file and in the totals are the same. Zero
3136 * means "not set yet". Reset "now" when we execute shell commands or
3137 * expressions, which might take a non-negligible amount of time, though.
3138 */
3139 pg_time_usec_t now = 0;
3140
3141 /*
3142 * Loop in the state machine, until we have to wait for a result from the
3143 * server or have to sleep for throttling or \sleep.
3144 *
3145 * Note: In the switch-statement below, 'break' will loop back here,
3146 * meaning "continue in the state machine". Return is used to return to
3147 * the caller, giving the thread the opportunity to advance another
3148 * client.
3149 */
3150 for (;;)
3151 {
3152 Command *command;
3153
3154 switch (st->state)
3155 {
3156 /* Select transaction (script) to run. */
3157 case CSTATE_CHOOSE_SCRIPT:
3158 st->use_file = chooseScript(thread);
3159 Assert(conditional_stack_empty(st->cstack));
3160
3161 pg_log_debug("client %d executing script \"%s\"",
3162 st->id, sql_script[st->use_file].desc);
3163
3164 /*
3165 * If time is over, we're done; otherwise, get ready to start
3166 * a new transaction, or to get throttled if that's requested.
3167 */
3168 st->state = timer_exceeded ? CSTATE_FINISHED :
3169 throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE : CSTATE_START_TX;
3170 break;
3171
3172 /* Start new transaction (script) */
3173 case CSTATE_START_TX:
3174 pg_time_now_lazy(&now);
3175
3176 /* establish connection if needed, i.e. under --connect */
3177 if (st->con == NULL)
3178 {
3179 pg_time_usec_t start = now;
3180
3181 if ((st->con = doConnect()) == NULL)
3182 {
3183 pg_log_error("client %d aborted while establishing connection", st->id);
3184 st->state = CSTATE_ABORTED;
3185 break;
3186 }
3187
3188 /* reset now after connection */
3189 now = pg_time_now();
3190
3191 thread->conn_duration += now - start;
3192
3193 /* Reset session-local state */
3194 memset(st->prepared, 0, sizeof(st->prepared));
3195 }
3196
3197 /* record transaction start time */
3198 st->txn_begin = now;
3199
3200 /*
3201 * When not throttling, this is also the transaction's
3202 * scheduled start time.
3203 */
3204 if (!throttle_delay)
3205 st->txn_scheduled = now;
3206
3207 /* Begin with the first command */
3208 st->state = CSTATE_START_COMMAND;
3209 st->command = 0;
3210 break;
3211
3212 /*
3213 * Handle throttling once per transaction by sleeping.
3214 */
3215 case CSTATE_PREPARE_THROTTLE:
3216
3217 /*
3218 * Generate a delay such that the series of delays will
3219 * approximate a Poisson distribution centered on the
3220 * throttle_delay time.
3221 *
3222 * If transactions are too slow or a given wait is shorter
3223 * than a transaction, the next transaction will start right
3224 * away.
3225 */
3226 Assert(throttle_delay > 0);
3227
3228 thread->throttle_trigger +=
3229 getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
3230 st->txn_scheduled = thread->throttle_trigger;
3231
3232 /*
3233 * If --latency-limit is used, and this slot is already late
3234 * so that the transaction will miss the latency limit even if
3235 * it completed immediately, skip this time slot and loop to
3236 * reschedule.
3237 */
3238 if (latency_limit)
3239 {
3240 pg_time_now_lazy(&now);
3241
3242 if (thread->throttle_trigger < now - latency_limit)
3243 {
3244 processXactStats(thread, st, &now, true, agg);
3245
3246 /*
3247 * Finish client if -T or -t was exceeded.
3248 *
3249 * Stop counting skipped transactions under -T as soon
3250 * as the timer is exceeded. Because otherwise it can
3251 * take a very long time to count all of them
3252 * especially when quite a lot of them happen with
3253 * unrealistically high rate setting in -R, which
3254 * would prevent pgbench from ending immediately.
3255 * Because of this behavior, note that there is no
3256 * guarantee that all skipped transactions are counted
3257 * under -T though there is under -t. This is OK in
3258 * practice because it's very unlikely to happen with
3259 * realistic setting.
3260 */
3261 if (timer_exceeded || (nxacts > 0 && st->cnt >= nxacts))
3262 st->state = CSTATE_FINISHED;
3263
3264 /* Go back to top of loop with CSTATE_PREPARE_THROTTLE */
3265 break;
3266 }
3267 }
3268
3269 /*
3270 * stop client if next transaction is beyond pgbench end of
3271 * execution; otherwise, throttle it.
3272 */
3273 st->state = end_time > 0 && st->txn_scheduled > end_time ?
3274 CSTATE_FINISHED : CSTATE_THROTTLE;
3275 break;
3276
3277 /*
3278 * Wait until it's time to start next transaction.
3279 */
3280 case CSTATE_THROTTLE:
3281 pg_time_now_lazy(&now);
3282
3283 if (now < st->txn_scheduled)
3284 return; /* still sleeping, nothing to do here */
3285
3286 /* done sleeping, but don't start transaction if we're done */
3287 st->state = timer_exceeded ? CSTATE_FINISHED : CSTATE_START_TX;
3288 break;
3289
3290 /*
3291 * Send a command to server (or execute a meta-command)
3292 */
3293 case CSTATE_START_COMMAND:
3294 command = sql_script[st->use_file].commands[st->command];
3295
3296 /* Transition to script end processing if done */
3297 if (command == NULL)
3298 {
3299 st->state = CSTATE_END_TX;
3300 break;
3301 }
3302
3303 /* record begin time of next command, and initiate it */
3304 if (report_per_command)
3305 {
3306 pg_time_now_lazy(&now);
3307 st->stmt_begin = now;
3308 }
3309
3310 /* Execute the command */
3311 if (command->type == SQL_COMMAND)
3312 {
3313 /* disallow \aset and \gset in pipeline mode */
3314 if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
3315 {
3316 if (command->meta == META_GSET)
3317 {
3318 commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
3319 st->state = CSTATE_ABORTED;
3320 break;
3321 }
3322 else if (command->meta == META_ASET)
3323 {
3324 commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
3325 st->state = CSTATE_ABORTED;
3326 break;
3327 }
3328 }
3329
3330 if (!sendCommand(st, command))
3331 {
3332 commandFailed(st, "SQL", "SQL command send failed");
3333 st->state = CSTATE_ABORTED;
3334 }
3335 else
3336 {
3337 /* Wait for results, unless in pipeline mode */
3338 if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
3339 st->state = CSTATE_WAIT_RESULT;
3340 else
3341 st->state = CSTATE_END_COMMAND;
3342 }
3343 }
3344 else if (command->type == META_COMMAND)
3345 {
3346 /*-----
3347 * Possible state changes when executing meta commands:
3348 * - on errors CSTATE_ABORTED
3349 * - on sleep CSTATE_SLEEP
3350 * - else CSTATE_END_COMMAND
3351 */
3352 st->state = executeMetaCommand(st, &now);
3353 }
3354
3355 /*
3356 * We're now waiting for an SQL command to complete, or
3357 * finished processing a metacommand, or need to sleep, or
3358 * something bad happened.
3359 */
3360 Assert(st->state == CSTATE_WAIT_RESULT ||
3361 st->state == CSTATE_END_COMMAND ||
3362 st->state == CSTATE_SLEEP ||
3363 st->state == CSTATE_ABORTED);
3364 break;
3365
3366 /*
3367 * non executed conditional branch
3368 */
3369 case CSTATE_SKIP_COMMAND:
3370 Assert(!conditional_active(st->cstack));
3371 /* quickly skip commands until something to do... */
3372 while (true)
3373 {
3374 Command *command;
3375
3376 command = sql_script[st->use_file].commands[st->command];
3377
3378 /* cannot reach end of script in that state */
3379 Assert(command != NULL);
3380
3381 /*
3382 * if this is conditional related, update conditional
3383 * state
3384 */
3385 if (command->type == META_COMMAND &&
3386 (command->meta == META_IF ||
3387 command->meta == META_ELIF ||
3388 command->meta == META_ELSE ||
3389 command->meta == META_ENDIF))
3390 {
3391 switch (conditional_stack_peek(st->cstack))
3392 {
3393 case IFSTATE_FALSE:
3394 if (command->meta == META_IF ||
3395 command->meta == META_ELIF)
3396 {
3397 /* we must evaluate the condition */
3398 st->state = CSTATE_START_COMMAND;
3399 }
3400 else if (command->meta == META_ELSE)
3401 {
3402 /* we must execute next command */
3403 conditional_stack_poke(st->cstack,
3404 IFSTATE_ELSE_TRUE);
3405 st->state = CSTATE_START_COMMAND;
3406 st->command++;
3407 }
3408 else if (command->meta == META_ENDIF)
3409 {
3410 Assert(!conditional_stack_empty(st->cstack));
3411 conditional_stack_pop(st->cstack);
3412 if (conditional_active(st->cstack))
3413 st->state = CSTATE_START_COMMAND;
3414
3415 /*
3416 * else state remains in
3417 * CSTATE_SKIP_COMMAND
3418 */
3419 st->command++;
3420 }
3421 break;
3422
3423 case IFSTATE_IGNORED:
3424 case IFSTATE_ELSE_FALSE:
3425 if (command->meta == META_IF)
3426 conditional_stack_push(st->cstack,
3427 IFSTATE_IGNORED);
3428 else if (command->meta == META_ENDIF)
3429 {
3430 Assert(!conditional_stack_empty(st->cstack));
3431 conditional_stack_pop(st->cstack);
3432 if (conditional_active(st->cstack))
3433 st->state = CSTATE_START_COMMAND;
3434 }
3435 /* could detect "else" & "elif" after "else" */
3436 st->command++;
3437 break;
3438
3439 case IFSTATE_NONE:
3440 case IFSTATE_TRUE:
3441 case IFSTATE_ELSE_TRUE:
3442 default:
3443
3444 /*
3445 * inconsistent if inactive, unreachable dead
3446 * code
3447 */
3448 Assert(false);
3449 }
3450 }
3451 else
3452 {
3453 /* skip and consider next */
3454 st->command++;
3455 }
3456
3457 if (st->state != CSTATE_SKIP_COMMAND)
3458 /* out of quick skip command loop */
3459 break;
3460 }
3461 break;
3462
3463 /*
3464 * Wait for the current SQL command to complete
3465 */
3466 case CSTATE_WAIT_RESULT:
3467 pg_log_debug("client %d receiving", st->id);
3468
3469 /*
3470 * Only check for new network data if we processed all data
3471 * fetched prior. Otherwise we end up doing a syscall for each
3472 * individual pipelined query, which has a measurable
3473 * performance impact.
3474 */
3475 if (PQisBusy(st->con) && !PQconsumeInput(st->con))
3476 {
3477 /* there's something wrong */
3478 commandFailed(st, "SQL", "perhaps the backend died while processing");
3479 st->state = CSTATE_ABORTED;
3480 break;
3481 }
3482 if (PQisBusy(st->con))
3483 return; /* don't have the whole result yet */
3484
3485 /* store or discard the query results */
3486 if (readCommandResponse(st,
3487 sql_script[st->use_file].commands[st->command]->meta,
3488 sql_script[st->use_file].commands[st->command]->varprefix))
3489 {
3490 /*
3491 * outside of pipeline mode: stop reading results.
3492 * pipeline mode: continue reading results until an
3493 * end-of-pipeline response.
3494 */
3495 if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
3496 st->state = CSTATE_END_COMMAND;
3497 }
3498 else
3499 st->state = CSTATE_ABORTED;
3500 break;
3501
3502 /*
3503 * Wait until sleep is done. This state is entered after a
3504 * \sleep metacommand. The behavior is similar to
3505 * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
3506 * instead of CSTATE_START_TX.
3507 */
3508 case CSTATE_SLEEP:
3509 pg_time_now_lazy(&now);
3510 if (now < st->sleep_until)
3511 return; /* still sleeping, nothing to do here */
3512 /* Else done sleeping. */
3513 st->state = CSTATE_END_COMMAND;
3514 break;
3515
3516 /*
3517 * End of command: record stats and proceed to next command.
3518 */
3519 case CSTATE_END_COMMAND:
3520
3521 /*
3522 * command completed: accumulate per-command execution times
3523 * in thread-local data structure, if per-command latencies
3524 * are requested.
3525 */
3526 if (report_per_command)
3527 {
3528 Command *command;
3529
3530 pg_time_now_lazy(&now);
3531
3532 command = sql_script[st->use_file].commands[st->command];
3533 /* XXX could use a mutex here, but we choose not to */
3534 addToSimpleStats(&command->stats,
3535 PG_TIME_GET_DOUBLE(now - st->stmt_begin));
3536 }
3537
3538 /* Go ahead with next command, to be executed or skipped */
3539 st->command++;
3540 st->state = conditional_active(st->cstack) ?
3541 CSTATE_START_COMMAND : CSTATE_SKIP_COMMAND;
3542 break;
3543
3544 /*
3545 * End of transaction (end of script, really).
3546 */
3547 case CSTATE_END_TX:
3548
3549 /* transaction finished: calculate latency and do log */
3550 processXactStats(thread, st, &now, false, agg);
3551
3552 /*
3553 * missing \endif... cannot happen if CheckConditional was
3554 * okay
3555 */
3556 Assert(conditional_stack_empty(st->cstack));
3557
3558 if (is_connect)
3559 {
3560 pg_time_usec_t start = now;
3561
3562 pg_time_now_lazy(&start);
3563 finishCon(st);
3564 now = pg_time_now();
3565 thread->conn_duration += now - start;
3566 }
3567
3568 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
3569 {
3570 /* script completed */
3571 st->state = CSTATE_FINISHED;
3572 break;
3573 }
3574
3575 /* next transaction (script) */
3576 st->state = CSTATE_CHOOSE_SCRIPT;
3577
3578 /*
3579 * Ensure that we always return on this point, so as to avoid
3580 * an infinite loop if the script only contains meta commands.
3581 */
3582 return;
3583
3584 /*
3585 * Final states. Close the connection if it's still open.
3586 */
3587 case CSTATE_ABORTED:
3588 case CSTATE_FINISHED:
3589
3590 /*
3591 * Don't measure the disconnection delays here even if in
3592 * CSTATE_FINISHED and -C/--connect option is specified.
3593 * Because in this case all the connections that this thread
3594 * established are closed at the end of transactions and the
3595 * disconnection delays should have already been measured at
3596 * that moment.
3597 *
3598 * In CSTATE_ABORTED state, the measurement is no longer
3599 * necessary because we cannot report complete results anyways
3600 * in this case.
3601 */
3602 finishCon(st);
3603 return;
3604 }
3605 }
3606 }
3607
/*
 * Subroutine for advanceConnectionState -- initiate or execute the current
 * meta command, and return the next state to set.
 *
 * st:  client whose current command, sql_script[st->use_file].commands[st->command],
 *      must be a META_COMMAND (asserted below).
 * now: the caller's lazily-read current time.  *now is updated to the
 *      current time, unless the command is expected to take no time to
 *      execute: \sleep fills it via pg_time_now_lazy(), and commands that
 *      fall through to the end of this function reset it to 0 because they
 *      may have taken a non-negligible amount of time.
 *
 * Returns CSTATE_ABORTED on any failure (after reporting it through
 * commandFailed()), CSTATE_SLEEP for \sleep, CSTATE_WAIT_RESULT for
 * \endpipeline, and CSTATE_END_COMMAND otherwise.
 */
static ConnectionStateEnum
executeMetaCommand(CState *st, pg_time_usec_t *now)
{
	Command    *command = sql_script[st->use_file].commands[st->command];
	int			argc;
	char	  **argv;

	Assert(command != NULL && command->type == META_COMMAND);

	argc = command->argc;
	argv = command->argv;

	/* Only pay for building the full command line when debug logging is on. */
	if (unlikely(__pg_log_level <= PG_LOG_DEBUG))
	{
		PQExpBufferData buf;

		initPQExpBuffer(&buf);

		printfPQExpBuffer(&buf, "client %d executing \\%s", st->id, argv[0]);
		for (int i = 1; i < argc; i++)
			appendPQExpBuffer(&buf, " %s", argv[i]);

		pg_log_debug("%s", buf.data);

		termPQExpBuffer(&buf);
	}

	if (command->meta == META_SLEEP)
	{
		int			usec;

		/*
		 * A \sleep doesn't execute anything, we just get the delay from the
		 * argument, and enter the CSTATE_SLEEP state.  (The per-command
		 * latency is not recorded here: once the delay has elapsed,
		 * CSTATE_SLEEP moves on to CSTATE_END_COMMAND, and that state records
		 * it.)
		 */
		if (!evaluateSleep(st, argc, argv, &usec))
		{
			commandFailed(st, "sleep", "execution of meta-command failed");
			return CSTATE_ABORTED;
		}

		/* the wake-up time is measured from the current time */
		pg_time_now_lazy(now);
		st->sleep_until = (*now) + usec;
		return CSTATE_SLEEP;
	}
	else if (command->meta == META_SET)
	{
		/*
		 * \set: evaluate the expression, then assign the result
		 * (presumably to the variable named by argv[1] -- see
		 * putVariableValue).
		 */
		PgBenchExpr *expr = command->expr;
		PgBenchValue result;

		if (!evaluateExpr(st, expr, &result))
		{
			commandFailed(st, argv[0], "evaluation of meta-command failed");
			return CSTATE_ABORTED;
		}

		if (!putVariableValue(st, argv[0], argv[1], &result))
		{
			commandFailed(st, "set", "assignment of meta-command failed");
			return CSTATE_ABORTED;
		}
	}
	else if (command->meta == META_IF)
	{
		/* backslash commands with an expression to evaluate */
		PgBenchExpr *expr = command->expr;
		PgBenchValue result;
		bool		cond;

		if (!evaluateExpr(st, expr, &result))
		{
			commandFailed(st, argv[0], "evaluation of meta-command failed");
			return CSTATE_ABORTED;
		}

		/* open a new conditional level, active only if the condition holds */
		cond = valueTruth(&result);
		conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
	}
	else if (command->meta == META_ELIF)
	{
		/* backslash commands with an expression to evaluate */
		PgBenchExpr *expr = command->expr;
		PgBenchValue result;
		bool		cond;

		if (conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
		{
			/* elif after executed block, skip eval and wait for endif. */
			conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
			return CSTATE_END_COMMAND;
		}

		if (!evaluateExpr(st, expr, &result))
		{
			commandFailed(st, argv[0], "evaluation of meta-command failed");
			return CSTATE_ABORTED;
		}

		/*
		 * The only other way to get here is from the skip loop, which hands
		 * an \elif back for evaluation while the current level is
		 * IFSTATE_FALSE.
		 */
		cond = valueTruth(&result);
		Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
		conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
	}
	else if (command->meta == META_ELSE)
	{
		/*
		 * An \else is executed (rather than skipped) only when the branch
		 * before it ran, so the else-branch itself must now be skipped.
		 */
		switch (conditional_stack_peek(st->cstack))
		{
			case IFSTATE_TRUE:
				conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
				break;
			case IFSTATE_FALSE: /* inconsistent if active */
			case IFSTATE_IGNORED:	/* inconsistent if active */
			case IFSTATE_NONE:	/* else without if */
			case IFSTATE_ELSE_TRUE: /* else after else */
			case IFSTATE_ELSE_FALSE:	/* else after else */
			default:
				/* dead code if conditional check is ok */
				Assert(false);
		}
	}
	else if (command->meta == META_ENDIF)
	{
		/* close the innermost conditional level */
		Assert(!conditional_stack_empty(st->cstack));
		conditional_stack_pop(st->cstack);
	}
	else if (command->meta == META_SETSHELL)
	{
		/* argv[1] is passed as the output target; argv[2..] is the command */
		if (!runShellCommand(st, argv[1], argv + 2, argc - 2))
		{
			commandFailed(st, "setshell", "execution of meta-command failed");
			return CSTATE_ABORTED;
		}
	}
	else if (command->meta == META_SHELL)
	{
		/* no output target: argv[1..] is the command */
		if (!runShellCommand(st, NULL, argv + 1, argc - 1))
		{
			commandFailed(st, "shell", "execution of meta-command failed");
			return CSTATE_ABORTED;
		}
	}
	else if (command->meta == META_STARTPIPELINE)
	{
		/*
		 * In pipeline mode, we use a workflow based on libpq pipeline
		 * functions.
		 */
		if (querymode == QUERY_SIMPLE)
		{
			commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
			return CSTATE_ABORTED;
		}

		/* nested \startpipeline is an error */
		if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
		{
			commandFailed(st, "startpipeline", "already in pipeline mode");
			return CSTATE_ABORTED;
		}
		if (PQenterPipelineMode(st->con) == 0)
		{
			commandFailed(st, "startpipeline", "failed to enter pipeline mode");
			return CSTATE_ABORTED;
		}
	}
	else if (command->meta == META_ENDPIPELINE)
	{
		if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
		{
			commandFailed(st, "endpipeline", "not in pipeline mode");
			return CSTATE_ABORTED;
		}
		if (!PQpipelineSync(st->con))
		{
			commandFailed(st, "endpipeline", "failed to send a pipeline sync");
			return CSTATE_ABORTED;
		}
		/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
		/* collect pending results before getting out of pipeline mode */
		return CSTATE_WAIT_RESULT;
	}

	/*
	 * executing the expression or shell command might have taken a
	 * non-negligible amount of time, so reset 'now'
	 */
	*now = 0;

	return CSTATE_END_COMMAND;
}
3805
3806 /*
3807 * Print log entry after completing one transaction.
3808 *
3809 * We print Unix-epoch timestamps in the log, so that entries can be
3810 * correlated against other logs.
3811 *
3812 * XXX We could obtain the time from the caller and just shift it here, to
3813 * avoid the cost of an extra call to pg_time_now().
3814 */
3815 static void
doLog(TState * thread,CState * st,StatsData * agg,bool skipped,double latency,double lag)3816 doLog(TState *thread, CState *st,
3817 StatsData *agg, bool skipped, double latency, double lag)
3818 {
3819 FILE *logfile = thread->logfile;
3820 pg_time_usec_t now = pg_time_now() + epoch_shift;
3821
3822 Assert(use_log);
3823
3824 /*
3825 * Skip the log entry if sampling is enabled and this row doesn't belong
3826 * to the random sample.
3827 */
3828 if (sample_rate != 0.0 &&
3829 pg_erand48(thread->ts_sample_rs.xseed) > sample_rate)
3830 return;
3831
3832 /* should we aggregate the results or not? */
3833 if (agg_interval > 0)
3834 {
3835 pg_time_usec_t next;
3836
3837 /*
3838 * Loop until we reach the interval of the current moment, and print
3839 * any empty intervals in between (this may happen with very low tps,
3840 * e.g. --rate=0.1).
3841 */
3842
3843 while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
3844 {
3845 /* print aggregated report to logfile */
3846 fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
3847 agg->start_time / 1000000, /* seconds since Unix epoch */
3848 agg->cnt,
3849 agg->latency.sum,
3850 agg->latency.sum2,
3851 agg->latency.min,
3852 agg->latency.max);
3853 if (throttle_delay)
3854 {
3855 fprintf(logfile, " %.0f %.0f %.0f %.0f",
3856 agg->lag.sum,
3857 agg->lag.sum2,
3858 agg->lag.min,
3859 agg->lag.max);
3860 if (latency_limit)
3861 fprintf(logfile, " " INT64_FORMAT, agg->skipped);
3862 }
3863 fputc('\n', logfile);
3864
3865 /* reset data and move to next interval */
3866 initStats(agg, next);
3867 }
3868
3869 /* accumulate the current transaction */
3870 accumStats(agg, skipped, latency, lag);
3871 }
3872 else
3873 {
3874 /* no, print raw transactions */
3875 if (skipped)
3876 fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
3877 INT64_FORMAT,
3878 st->id, st->cnt, st->use_file, now / 1000000, now % 1000000);
3879 else
3880 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d " INT64_FORMAT " "
3881 INT64_FORMAT,
3882 st->id, st->cnt, latency, st->use_file,
3883 now / 1000000, now % 1000000);
3884 if (throttle_delay)
3885 fprintf(logfile, " %.0f", lag);
3886 fputc('\n', logfile);
3887 }
3888 }
3889
3890 /*
3891 * Accumulate and report statistics at end of a transaction.
3892 *
3893 * (This is also called when a transaction is late and thus skipped.
3894 * Note that even skipped transactions are counted in the "cnt" fields.)
3895 */
3896 static void
processXactStats(TState * thread,CState * st,pg_time_usec_t * now,bool skipped,StatsData * agg)3897 processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
3898 bool skipped, StatsData *agg)
3899 {
3900 double latency = 0.0,
3901 lag = 0.0;
3902 bool thread_details = progress || throttle_delay || latency_limit,
3903 detailed = thread_details || use_log || per_script_stats;
3904
3905 if (detailed && !skipped)
3906 {
3907 pg_time_now_lazy(now);
3908
3909 /* compute latency & lag */
3910 latency = (*now) - st->txn_scheduled;
3911 lag = st->txn_begin - st->txn_scheduled;
3912 }
3913
3914 if (thread_details)
3915 {
3916 /* keep detailed thread stats */
3917 accumStats(&thread->stats, skipped, latency, lag);
3918
3919 /* count transactions over the latency limit, if needed */
3920 if (latency_limit && latency > latency_limit)
3921 thread->latency_late++;
3922 }
3923 else
3924 {
3925 /* no detailed stats, just count */
3926 thread->stats.cnt++;
3927 }
3928
3929 /* client stat is just counting */
3930 st->cnt++;
3931
3932 if (use_log)
3933 doLog(thread, st, agg, skipped, latency, lag);
3934
3935 /* XXX could use a mutex here, but we choose not to */
3936 if (per_script_stats)
3937 accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
3938 }
3939
3940
3941 /* discard connections */
3942 static void
disconnect_all(CState * state,int length)3943 disconnect_all(CState *state, int length)
3944 {
3945 int i;
3946
3947 for (i = 0; i < length; i++)
3948 finishCon(&state[i]);
3949 }
3950
3951 /*
3952 * Remove old pgbench tables, if any exist
3953 */
3954 static void
initDropTables(PGconn * con)3955 initDropTables(PGconn *con)
3956 {
3957 fprintf(stderr, "dropping old tables...\n");
3958
3959 /*
3960 * We drop all the tables in one command, so that whether there are
3961 * foreign key dependencies or not doesn't matter.
3962 */
3963 executeStatement(con, "drop table if exists "
3964 "pgbench_accounts, "
3965 "pgbench_branches, "
3966 "pgbench_history, "
3967 "pgbench_tellers");
3968 }
3969
3970 /*
3971 * Create "pgbench_accounts" partitions if needed.
3972 *
3973 * This is the larger table of pgbench default tpc-b like schema
3974 * with a known size, so we choose to partition it.
3975 */
3976 static void
createPartitions(PGconn * con)3977 createPartitions(PGconn *con)
3978 {
3979 PQExpBufferData query;
3980
3981 /* we must have to create some partitions */
3982 Assert(partitions > 0);
3983
3984 fprintf(stderr, "creating %d partitions...\n", partitions);
3985
3986 initPQExpBuffer(&query);
3987
3988 for (int p = 1; p <= partitions; p++)
3989 {
3990 if (partition_method == PART_RANGE)
3991 {
3992 int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
3993
3994 printfPQExpBuffer(&query,
3995 "create%s table pgbench_accounts_%d\n"
3996 " partition of pgbench_accounts\n"
3997 " for values from (",
3998 unlogged_tables ? " unlogged" : "", p);
3999
4000 /*
4001 * For RANGE, we use open-ended partitions at the beginning and
4002 * end to allow any valid value for the primary key. Although the
4003 * actual minimum and maximum values can be derived from the
4004 * scale, it is more generic and the performance is better.
4005 */
4006 if (p == 1)
4007 appendPQExpBufferStr(&query, "minvalue");
4008 else
4009 appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
4010
4011 appendPQExpBufferStr(&query, ") to (");
4012
4013 if (p < partitions)
4014 appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
4015 else
4016 appendPQExpBufferStr(&query, "maxvalue");
4017
4018 appendPQExpBufferChar(&query, ')');
4019 }
4020 else if (partition_method == PART_HASH)
4021 printfPQExpBuffer(&query,
4022 "create%s table pgbench_accounts_%d\n"
4023 " partition of pgbench_accounts\n"
4024 " for values with (modulus %d, remainder %d)",
4025 unlogged_tables ? " unlogged" : "", p,
4026 partitions, p - 1);
4027 else /* cannot get there */
4028 Assert(0);
4029
4030 /*
4031 * Per ddlinfo in initCreateTables, fillfactor is needed on table
4032 * pgbench_accounts.
4033 */
4034 appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
4035
4036 executeStatement(con, query.data);
4037 }
4038
4039 termPQExpBuffer(&query);
4040 }
4041
4042 /*
4043 * Create pgbench's standard tables
4044 */
4045 static void
initCreateTables(PGconn * con)4046 initCreateTables(PGconn *con)
4047 {
4048 /*
4049 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
4050 * fields in these table declarations were intended to comply with that.
4051 * The pgbench_accounts table complies with that because the "filler"
4052 * column is set to blank-padded empty string. But for all other tables
4053 * the columns default to NULL and so don't actually take any space. We
4054 * could fix that by giving them non-null default values. However, that
4055 * would completely break comparability of pgbench results with prior
4056 * versions. Since pgbench has never pretended to be fully TPC-B compliant
4057 * anyway, we stick with the historical behavior.
4058 */
4059 struct ddlinfo
4060 {
4061 const char *table; /* table name */
4062 const char *smcols; /* column decls if accountIDs are 32 bits */
4063 const char *bigcols; /* column decls if accountIDs are 64 bits */
4064 int declare_fillfactor;
4065 };
4066 static const struct ddlinfo DDLs[] = {
4067 {
4068 "pgbench_history",
4069 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
4070 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
4071 0
4072 },
4073 {
4074 "pgbench_tellers",
4075 "tid int not null,bid int,tbalance int,filler char(84)",
4076 "tid int not null,bid int,tbalance int,filler char(84)",
4077 1
4078 },
4079 {
4080 "pgbench_accounts",
4081 "aid int not null,bid int,abalance int,filler char(84)",
4082 "aid bigint not null,bid int,abalance int,filler char(84)",
4083 1
4084 },
4085 {
4086 "pgbench_branches",
4087 "bid int not null,bbalance int,filler char(88)",
4088 "bid int not null,bbalance int,filler char(88)",
4089 1
4090 }
4091 };
4092 int i;
4093 PQExpBufferData query;
4094
4095 fprintf(stderr, "creating tables...\n");
4096
4097 initPQExpBuffer(&query);
4098
4099 for (i = 0; i < lengthof(DDLs); i++)
4100 {
4101 const struct ddlinfo *ddl = &DDLs[i];
4102
4103 /* Construct new create table statement. */
4104 printfPQExpBuffer(&query, "create%s table %s(%s)",
4105 unlogged_tables ? " unlogged" : "",
4106 ddl->table,
4107 (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols);
4108
4109 /* Partition pgbench_accounts table */
4110 if (partition_method != PART_NONE && strcmp(ddl->table, "pgbench_accounts") == 0)
4111 appendPQExpBuffer(&query,
4112 " partition by %s (aid)", PARTITION_METHOD[partition_method]);
4113 else if (ddl->declare_fillfactor)
4114 {
4115 /* fillfactor is only expected on actual tables */
4116 appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
4117 }
4118
4119 if (tablespace != NULL)
4120 {
4121 char *escape_tablespace;
4122
4123 escape_tablespace = PQescapeIdentifier(con, tablespace, strlen(tablespace));
4124 appendPQExpBuffer(&query, " tablespace %s", escape_tablespace);
4125 PQfreemem(escape_tablespace);
4126 }
4127
4128 executeStatement(con, query.data);
4129 }
4130
4131 termPQExpBuffer(&query);
4132
4133 if (partition_method != PART_NONE)
4134 createPartitions(con);
4135 }
4136
4137 /*
4138 * Truncate away any old data, in one command in case there are foreign keys
4139 */
4140 static void
initTruncateTables(PGconn * con)4141 initTruncateTables(PGconn *con)
4142 {
4143 executeStatement(con, "truncate table "
4144 "pgbench_accounts, "
4145 "pgbench_branches, "
4146 "pgbench_history, "
4147 "pgbench_tellers");
4148 }
4149
4150 /*
4151 * Fill the standard tables with some data generated and sent from the client
4152 */
4153 static void
initGenerateDataClientSide(PGconn * con)4154 initGenerateDataClientSide(PGconn *con)
4155 {
4156 PQExpBufferData sql;
4157 PGresult *res;
4158 int i;
4159 int64 k;
4160
4161 /* used to track elapsed time and estimate of the remaining time */
4162 pg_time_usec_t start;
4163 int log_interval = 1;
4164
4165 /* Stay on the same line if reporting to a terminal */
4166 char eol = isatty(fileno(stderr)) ? '\r' : '\n';
4167
4168 fprintf(stderr, "generating data (client-side)...\n");
4169
4170 /*
4171 * we do all of this in one transaction to enable the backend's
4172 * data-loading optimizations
4173 */
4174 executeStatement(con, "begin");
4175
4176 /* truncate away any old data */
4177 initTruncateTables(con);
4178
4179 initPQExpBuffer(&sql);
4180
4181 /*
4182 * fill branches, tellers, accounts in that order in case foreign keys
4183 * already exist
4184 */
4185 for (i = 0; i < nbranches * scale; i++)
4186 {
4187 /* "filler" column defaults to NULL */
4188 printfPQExpBuffer(&sql,
4189 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
4190 i + 1);
4191 executeStatement(con, sql.data);
4192 }
4193
4194 for (i = 0; i < ntellers * scale; i++)
4195 {
4196 /* "filler" column defaults to NULL */
4197 printfPQExpBuffer(&sql,
4198 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
4199 i + 1, i / ntellers + 1);
4200 executeStatement(con, sql.data);
4201 }
4202
4203 /*
4204 * accounts is big enough to be worth using COPY and tracking runtime
4205 */
4206 res = PQexec(con, "copy pgbench_accounts from stdin");
4207 if (PQresultStatus(res) != PGRES_COPY_IN)
4208 {
4209 pg_log_fatal("unexpected copy in result: %s", PQerrorMessage(con));
4210 exit(1);
4211 }
4212 PQclear(res);
4213
4214 start = pg_time_now();
4215
4216 for (k = 0; k < (int64) naccounts * scale; k++)
4217 {
4218 int64 j = k + 1;
4219
4220 /* "filler" column defaults to blank padded empty string */
4221 printfPQExpBuffer(&sql,
4222 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
4223 j, k / naccounts + 1, 0);
4224 if (PQputline(con, sql.data))
4225 {
4226 pg_log_fatal("PQputline failed");
4227 exit(1);
4228 }
4229
4230 if (CancelRequested)
4231 break;
4232
4233 /*
4234 * If we want to stick with the original logging, print a message each
4235 * 100k inserted rows.
4236 */
4237 if ((!use_quiet) && (j % 100000 == 0))
4238 {
4239 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
4240 double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
4241
4242 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
4243 j, (int64) naccounts * scale,
4244 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
4245 elapsed_sec, remaining_sec, eol);
4246 }
4247 /* let's not call the timing for each row, but only each 100 rows */
4248 else if (use_quiet && (j % 100 == 0))
4249 {
4250 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
4251 double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
4252
4253 /* have we reached the next interval (or end)? */
4254 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
4255 {
4256 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
4257 j, (int64) naccounts * scale,
4258 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec, eol);
4259
4260 /* skip to the next interval */
4261 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
4262 }
4263 }
4264 }
4265
4266 if (eol != '\n')
4267 fputc('\n', stderr); /* Need to move to next line */
4268
4269 if (PQputline(con, "\\.\n"))
4270 {
4271 pg_log_fatal("very last PQputline failed");
4272 exit(1);
4273 }
4274 if (PQendcopy(con))
4275 {
4276 pg_log_fatal("PQendcopy failed");
4277 exit(1);
4278 }
4279
4280 termPQExpBuffer(&sql);
4281
4282 executeStatement(con, "commit");
4283 }
4284
4285 /*
4286 * Fill the standard tables with some data generated on the server
4287 *
4288 * As already the case with the client-side data generation, the filler
4289 * column defaults to NULL in pgbench_branches and pgbench_tellers,
4290 * and is a blank-padded string in pgbench_accounts.
4291 */
4292 static void
initGenerateDataServerSide(PGconn * con)4293 initGenerateDataServerSide(PGconn *con)
4294 {
4295 PQExpBufferData sql;
4296
4297 fprintf(stderr, "generating data (server-side)...\n");
4298
4299 /*
4300 * we do all of this in one transaction to enable the backend's
4301 * data-loading optimizations
4302 */
4303 executeStatement(con, "begin");
4304
4305 /* truncate away any old data */
4306 initTruncateTables(con);
4307
4308 initPQExpBuffer(&sql);
4309
4310 printfPQExpBuffer(&sql,
4311 "insert into pgbench_branches(bid,bbalance) "
4312 "select bid, 0 "
4313 "from generate_series(1, %d) as bid", nbranches * scale);
4314 executeStatement(con, sql.data);
4315
4316 printfPQExpBuffer(&sql,
4317 "insert into pgbench_tellers(tid,bid,tbalance) "
4318 "select tid, (tid - 1) / %d + 1, 0 "
4319 "from generate_series(1, %d) as tid", ntellers, ntellers * scale);
4320 executeStatement(con, sql.data);
4321
4322 printfPQExpBuffer(&sql,
4323 "insert into pgbench_accounts(aid,bid,abalance,filler) "
4324 "select aid, (aid - 1) / %d + 1, 0, '' "
4325 "from generate_series(1, " INT64_FORMAT ") as aid",
4326 naccounts, (int64) naccounts * scale);
4327 executeStatement(con, sql.data);
4328
4329 termPQExpBuffer(&sql);
4330
4331 executeStatement(con, "commit");
4332 }
4333
4334 /*
4335 * Invoke vacuum on the standard tables
4336 */
4337 static void
initVacuum(PGconn * con)4338 initVacuum(PGconn *con)
4339 {
4340 fprintf(stderr, "vacuuming...\n");
4341 executeStatement(con, "vacuum analyze pgbench_branches");
4342 executeStatement(con, "vacuum analyze pgbench_tellers");
4343 executeStatement(con, "vacuum analyze pgbench_accounts");
4344 executeStatement(con, "vacuum analyze pgbench_history");
4345 }
4346
4347 /*
4348 * Create primary keys on the standard tables
4349 */
4350 static void
initCreatePKeys(PGconn * con)4351 initCreatePKeys(PGconn *con)
4352 {
4353 static const char *const DDLINDEXes[] = {
4354 "alter table pgbench_branches add primary key (bid)",
4355 "alter table pgbench_tellers add primary key (tid)",
4356 "alter table pgbench_accounts add primary key (aid)"
4357 };
4358 int i;
4359 PQExpBufferData query;
4360
4361 fprintf(stderr, "creating primary keys...\n");
4362 initPQExpBuffer(&query);
4363
4364 for (i = 0; i < lengthof(DDLINDEXes); i++)
4365 {
4366 resetPQExpBuffer(&query);
4367 appendPQExpBufferStr(&query, DDLINDEXes[i]);
4368
4369 if (index_tablespace != NULL)
4370 {
4371 char *escape_tablespace;
4372
4373 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
4374 strlen(index_tablespace));
4375 appendPQExpBuffer(&query, " using index tablespace %s", escape_tablespace);
4376 PQfreemem(escape_tablespace);
4377 }
4378
4379 executeStatement(con, query.data);
4380 }
4381
4382 termPQExpBuffer(&query);
4383 }
4384
4385 /*
4386 * Create foreign key constraints between the standard tables
4387 */
4388 static void
initCreateFKeys(PGconn * con)4389 initCreateFKeys(PGconn *con)
4390 {
4391 static const char *const DDLKEYs[] = {
4392 "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
4393 "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
4394 "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
4395 "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
4396 "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
4397 };
4398 int i;
4399
4400 fprintf(stderr, "creating foreign keys...\n");
4401 for (i = 0; i < lengthof(DDLKEYs); i++)
4402 {
4403 executeStatement(con, DDLKEYs[i]);
4404 }
4405 }
4406
4407 /*
4408 * Validate an initialization-steps string
4409 *
4410 * (We could just leave it to runInitSteps() to fail if there are wrong
4411 * characters, but since initialization can take awhile, it seems friendlier
4412 * to check during option parsing.)
4413 */
4414 static void
checkInitSteps(const char * initialize_steps)4415 checkInitSteps(const char *initialize_steps)
4416 {
4417 if (initialize_steps[0] == '\0')
4418 {
4419 pg_log_fatal("no initialization steps specified");
4420 exit(1);
4421 }
4422
4423 for (const char *step = initialize_steps; *step != '\0'; step++)
4424 {
4425 if (strchr(ALL_INIT_STEPS " ", *step) == NULL)
4426 {
4427 pg_log_fatal("unrecognized initialization step \"%c\"", *step);
4428 pg_log_info("Allowed step characters are: \"" ALL_INIT_STEPS "\".");
4429 exit(1);
4430 }
4431 }
4432 }
4433
4434 /*
4435 * Invoke each initialization step in the given string
4436 */
4437 static void
runInitSteps(const char * initialize_steps)4438 runInitSteps(const char *initialize_steps)
4439 {
4440 PQExpBufferData stats;
4441 PGconn *con;
4442 const char *step;
4443 double run_time = 0.0;
4444 bool first = true;
4445
4446 initPQExpBuffer(&stats);
4447
4448 if ((con = doConnect()) == NULL)
4449 exit(1);
4450
4451 setup_cancel_handler(NULL);
4452 SetCancelConn(con);
4453
4454 for (step = initialize_steps; *step != '\0'; step++)
4455 {
4456 char *op = NULL;
4457 pg_time_usec_t start = pg_time_now();
4458
4459 switch (*step)
4460 {
4461 case 'd':
4462 op = "drop tables";
4463 initDropTables(con);
4464 break;
4465 case 't':
4466 op = "create tables";
4467 initCreateTables(con);
4468 break;
4469 case 'g':
4470 op = "client-side generate";
4471 initGenerateDataClientSide(con);
4472 break;
4473 case 'G':
4474 op = "server-side generate";
4475 initGenerateDataServerSide(con);
4476 break;
4477 case 'v':
4478 op = "vacuum";
4479 initVacuum(con);
4480 break;
4481 case 'p':
4482 op = "primary keys";
4483 initCreatePKeys(con);
4484 break;
4485 case 'f':
4486 op = "foreign keys";
4487 initCreateFKeys(con);
4488 break;
4489 case ' ':
4490 break; /* ignore */
4491 default:
4492 pg_log_fatal("unrecognized initialization step \"%c\"", *step);
4493 PQfinish(con);
4494 exit(1);
4495 }
4496
4497 if (op != NULL)
4498 {
4499 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
4500
4501 if (!first)
4502 appendPQExpBufferStr(&stats, ", ");
4503 else
4504 first = false;
4505
4506 appendPQExpBuffer(&stats, "%s %.2f s", op, elapsed_sec);
4507
4508 run_time += elapsed_sec;
4509 }
4510 }
4511
4512 fprintf(stderr, "done in %.2f s (%s).\n", run_time, stats.data);
4513 ResetCancelConn();
4514 PQfinish(con);
4515 termPQExpBuffer(&stats);
4516 }
4517
4518 /*
4519 * Extract pgbench table information into global variables scale,
4520 * partition_method and partitions.
4521 */
4522 static void
GetTableInfo(PGconn * con,bool scale_given)4523 GetTableInfo(PGconn *con, bool scale_given)
4524 {
4525 PGresult *res;
4526
4527 /*
4528 * get the scaling factor that should be same as count(*) from
4529 * pgbench_branches if this is not a custom query
4530 */
4531 res = PQexec(con, "select count(*) from pgbench_branches");
4532 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4533 {
4534 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
4535
4536 pg_log_fatal("could not count number of branches: %s", PQerrorMessage(con));
4537
4538 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
4539 pg_log_info("Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"",
4540 PQdb(con));
4541
4542 exit(1);
4543 }
4544 scale = atoi(PQgetvalue(res, 0, 0));
4545 if (scale < 0)
4546 {
4547 pg_log_fatal("invalid count(*) from pgbench_branches: \"%s\"",
4548 PQgetvalue(res, 0, 0));
4549 exit(1);
4550 }
4551 PQclear(res);
4552
4553 /* warn if we override user-given -s switch */
4554 if (scale_given)
4555 pg_log_warning("scale option ignored, using count from pgbench_branches table (%d)",
4556 scale);
4557
4558 /*
4559 * Get the partition information for the first "pgbench_accounts" table
4560 * found in search_path.
4561 *
4562 * The result is empty if no "pgbench_accounts" is found.
4563 *
4564 * Otherwise, it always returns one row even if the table is not
4565 * partitioned (in which case the partition strategy is NULL).
4566 *
4567 * The number of partitions can be 0 even for partitioned tables, if no
4568 * partition is attached.
4569 *
4570 * We assume no partitioning on any failure, so as to avoid failing on an
4571 * old version without "pg_partitioned_table".
4572 */
4573 res = PQexec(con,
4574 "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
4575 "from pg_catalog.pg_class as c "
4576 "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
4577 "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
4578 "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
4579 "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
4580 "where c.relname = 'pgbench_accounts' and o.n is not null "
4581 "group by 1, 2 "
4582 "order by 1 asc "
4583 "limit 1");
4584
4585 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4586 {
4587 /* probably an older version, coldly assume no partitioning */
4588 partition_method = PART_NONE;
4589 partitions = 0;
4590 }
4591 else if (PQntuples(res) == 0)
4592 {
4593 /*
4594 * This case is unlikely as pgbench already found "pgbench_branches"
4595 * above to compute the scale.
4596 */
4597 pg_log_fatal("no pgbench_accounts table found in search_path");
4598 pg_log_info("Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\".", PQdb(con));
4599 exit(1);
4600 }
4601 else /* PQntupes(res) == 1 */
4602 {
4603 /* normal case, extract partition information */
4604 if (PQgetisnull(res, 0, 1))
4605 partition_method = PART_NONE;
4606 else
4607 {
4608 char *ps = PQgetvalue(res, 0, 1);
4609
4610 /* column must be there */
4611 Assert(ps != NULL);
4612
4613 if (strcmp(ps, "r") == 0)
4614 partition_method = PART_RANGE;
4615 else if (strcmp(ps, "h") == 0)
4616 partition_method = PART_HASH;
4617 else
4618 {
4619 /* possibly a newer version with new partition method */
4620 pg_log_fatal("unexpected partition method: \"%s\"", ps);
4621 exit(1);
4622 }
4623 }
4624
4625 partitions = atoi(PQgetvalue(res, 0, 2));
4626 }
4627
4628 PQclear(res);
4629 }
4630
4631 /*
4632 * Replace :param with $n throughout the command's SQL text, which
4633 * is a modifiable string in cmd->lines.
4634 */
4635 static bool
parseQuery(Command * cmd)4636 parseQuery(Command *cmd)
4637 {
4638 char *sql,
4639 *p;
4640
4641 cmd->argc = 1;
4642
4643 p = sql = pg_strdup(cmd->lines.data);
4644 while ((p = strchr(p, ':')) != NULL)
4645 {
4646 char var[13];
4647 char *name;
4648 int eaten;
4649
4650 name = parseVariable(p, &eaten);
4651 if (name == NULL)
4652 {
4653 while (*p == ':')
4654 {
4655 p++;
4656 }
4657 continue;
4658 }
4659
4660 /*
4661 * cmd->argv[0] is the SQL statement itself, so the max number of
4662 * arguments is one less than MAX_ARGS
4663 */
4664 if (cmd->argc >= MAX_ARGS)
4665 {
4666 pg_log_error("statement has too many arguments (maximum is %d): %s",
4667 MAX_ARGS - 1, cmd->lines.data);
4668 pg_free(name);
4669 return false;
4670 }
4671
4672 sprintf(var, "$%d", cmd->argc);
4673 p = replaceVariable(&sql, p, eaten, var);
4674
4675 cmd->argv[cmd->argc] = name;
4676 cmd->argc++;
4677 }
4678
4679 Assert(cmd->argv[0] == NULL);
4680 cmd->argv[0] = sql;
4681 return true;
4682 }
4683
4684 /*
4685 * syntax error while parsing a script (in practice, while parsing a
4686 * backslash command, because we don't detect syntax errors in SQL)
4687 *
4688 * source: source of script (filename or builtin-script ID)
4689 * lineno: line number within script (count from 1)
4690 * line: whole line of backslash command, if available
4691 * command: backslash command name, if available
4692 * msg: the actual error message
4693 * more: optional extra message
4694 * column: zero-based column number, or -1 if unknown
4695 */
4696 void
syntax_error(const char * source,int lineno,const char * line,const char * command,const char * msg,const char * more,int column)4697 syntax_error(const char *source, int lineno,
4698 const char *line, const char *command,
4699 const char *msg, const char *more, int column)
4700 {
4701 PQExpBufferData buf;
4702
4703 initPQExpBuffer(&buf);
4704
4705 printfPQExpBuffer(&buf, "%s:%d: %s", source, lineno, msg);
4706 if (more != NULL)
4707 appendPQExpBuffer(&buf, " (%s)", more);
4708 if (column >= 0 && line == NULL)
4709 appendPQExpBuffer(&buf, " at column %d", column + 1);
4710 if (command != NULL)
4711 appendPQExpBuffer(&buf, " in command \"%s\"", command);
4712
4713 pg_log_fatal("%s", buf.data);
4714
4715 termPQExpBuffer(&buf);
4716
4717 if (line != NULL)
4718 {
4719 fprintf(stderr, "%s\n", line);
4720 if (column >= 0)
4721 fprintf(stderr, "%*c error found here\n", column + 1, '^');
4722 }
4723
4724 exit(1);
4725 }
4726
/*
 * Find where the real SQL text starts in sql_command by stepping over
 * leading whitespace and "--" line comments.
 *
 * Returns a pointer into sql_command, or NULL if only whitespace and
 * comments remain (including a final "--" comment with no newline).
 */
static char *
skip_sql_comments(char *sql_command)
{
	char	   *cursor = sql_command;

	while (*cursor != '\0')
	{
		if (isspace((unsigned char) *cursor))
		{
			cursor++;
			continue;
		}

		/* anything other than "--" is the start of the command */
		if (cursor[0] != '-' || cursor[1] != '-')
			return cursor;

		/* "--" comment: resume right after the end of this line */
		cursor = strchr(cursor, '\n');
		if (cursor == NULL)
			return NULL;
		cursor++;
	}

	/* hit the terminator: nothing but whitespace and comments */
	return NULL;
}
4759
4760 /*
4761 * Parse a SQL command; return a Command struct, or NULL if it's a comment
4762 *
4763 * On entry, psqlscan.l has collected the command into "buf", so we don't
4764 * really need to do much here except check for comments and set up a Command
4765 * struct.
4766 */
4767 static Command *
create_sql_command(PQExpBuffer buf,const char * source)4768 create_sql_command(PQExpBuffer buf, const char *source)
4769 {
4770 Command *my_command;
4771 char *p = skip_sql_comments(buf->data);
4772
4773 if (p == NULL)
4774 return NULL;
4775
4776 /* Allocate and initialize Command structure */
4777 my_command = (Command *) pg_malloc(sizeof(Command));
4778 initPQExpBuffer(&my_command->lines);
4779 appendPQExpBufferStr(&my_command->lines, p);
4780 my_command->first_line = NULL; /* this is set later */
4781 my_command->type = SQL_COMMAND;
4782 my_command->meta = META_NONE;
4783 my_command->argc = 0;
4784 memset(my_command->argv, 0, sizeof(my_command->argv));
4785 my_command->varprefix = NULL; /* allocated later, if needed */
4786 my_command->expr = NULL;
4787 initSimpleStats(&my_command->stats);
4788
4789 return my_command;
4790 }
4791
4792 /* Free a Command structure and associated data */
4793 static void
free_command(Command * command)4794 free_command(Command *command)
4795 {
4796 termPQExpBuffer(&command->lines);
4797 if (command->first_line)
4798 pg_free(command->first_line);
4799 for (int i = 0; i < command->argc; i++)
4800 pg_free(command->argv[i]);
4801 if (command->varprefix)
4802 pg_free(command->varprefix);
4803
4804 /*
4805 * It should also free expr recursively, but this is currently not needed
4806 * as only gset commands (which do not have an expression) are freed.
4807 */
4808 pg_free(command);
4809 }
4810
4811 /*
4812 * Once an SQL command is fully parsed, possibly by accumulating several
4813 * parts, complete other fields of the Command structure.
4814 */
4815 static void
postprocess_sql_command(Command * my_command)4816 postprocess_sql_command(Command *my_command)
4817 {
4818 char buffer[128];
4819
4820 Assert(my_command->type == SQL_COMMAND);
4821
4822 /* Save the first line for error display. */
4823 strlcpy(buffer, my_command->lines.data, sizeof(buffer));
4824 buffer[strcspn(buffer, "\n\r")] = '\0';
4825 my_command->first_line = pg_strdup(buffer);
4826
4827 /* parse query if necessary */
4828 switch (querymode)
4829 {
4830 case QUERY_SIMPLE:
4831 my_command->argv[0] = my_command->lines.data;
4832 my_command->argc++;
4833 break;
4834 case QUERY_EXTENDED:
4835 case QUERY_PREPARED:
4836 if (!parseQuery(my_command))
4837 exit(1);
4838 break;
4839 default:
4840 exit(1);
4841 }
4842 }
4843
/*
 * Parse a backslash (meta) command; return a Command struct, or NULL when
 * expr_lex_one_word() finds no command word after the backslash.
 *
 * At call, we have scanned only the initial backslash (so the backslash is
 * at the current scanner offset minus one).  "source" names the script and
 * is used only in error messages.
 *
 * \set, \if and \elif get their expression parsed into my_command->expr;
 * for \set the variable name is also stored in argv[1].  Every other
 * command stores its remaining words in argv[1..argc-1] and is then checked
 * for the arguments it accepts.  All errors go through syntax_error(), which
 * exits.
 */
static Command *
process_backslash_command(PsqlScanState sstate, const char *source)
{
	Command    *my_command;
	PQExpBufferData word_buf;
	int			word_offset;
	int			offsets[MAX_ARGS];	/* offsets of argument words */
	int			start_offset;
	int			lineno;
	int			j;

	initPQExpBuffer(&word_buf);

	/* Remember location of the backslash */
	start_offset = expr_scanner_offset(sstate) - 1;
	lineno = expr_scanner_get_lineno(sstate, start_offset);

	/* Collect first word of command */
	if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
	{
		termPQExpBuffer(&word_buf);
		return NULL;
	}

	/* Allocate and initialize Command structure */
	my_command = (Command *) pg_malloc0(sizeof(Command));
	my_command->type = META_COMMAND;
	my_command->argc = 0;
	initSimpleStats(&my_command->stats);

	/* Save first word (command name) */
	j = 0;
	offsets[j] = word_offset;
	my_command->argv[j++] = pg_strdup(word_buf.data);
	my_command->argc++;

	/* ... and convert it to enum form */
	my_command->meta = getMetaCommand(my_command->argv[0]);

	if (my_command->meta == META_SET ||
		my_command->meta == META_IF ||
		my_command->meta == META_ELIF)
	{
		yyscan_t	yyscanner;

		/* For \set, collect var name */
		if (my_command->meta == META_SET)
		{
			/* first_line is still NULL here, so no source line is echoed */
			if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
				syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
							 "missing argument", NULL, -1);

			offsets[j] = word_offset;
			my_command->argv[j++] = pg_strdup(word_buf.data);
			my_command->argc++;
		}

		/* then for all parse the expression */
		yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
									  my_command->argv[0]);

		if (expr_yyparse(yyscanner) != 0)
		{
			/* dead code: exit done from syntax_error called by yyerror */
			exit(1);
		}

		/* the parsed expression tree is returned through this global */
		my_command->expr = expr_parse_result;

		/* Save line, trimming any trailing newline */
		my_command->first_line =
			expr_scanner_get_substring(sstate,
									   start_offset,
									   expr_scanner_offset(sstate),
									   true);

		expr_scanner_finish(yyscanner);

		termPQExpBuffer(&word_buf);

		return my_command;
	}

	/* For all other commands, collect remaining words. */
	while (expr_lex_one_word(sstate, &word_buf, &word_offset))
	{
		/*
		 * my_command->argv[0] is the command itself, so the max number of
		 * arguments is one less than MAX_ARGS
		 */
		if (j >= MAX_ARGS)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "too many arguments", NULL, -1);

		offsets[j] = word_offset;
		my_command->argv[j++] = pg_strdup(word_buf.data);
		my_command->argc++;
	}

	/* Save line, trimming any trailing newline */
	my_command->first_line =
		expr_scanner_get_substring(sstate,
								   start_offset,
								   expr_scanner_offset(sstate),
								   true);

	if (my_command->meta == META_SLEEP)
	{
		/* \sleep takes a time and an optional unit: argc must be 2 or 3 */
		if (my_command->argc < 2)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "missing argument", NULL, -1);

		if (my_command->argc > 3)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "too many arguments", NULL,
						 offsets[3] - start_offset);

		/*
		 * Split argument into number and unit to allow "sleep 1ms" etc. We
		 * don't have to terminate the number argument with null because it
		 * will be parsed with atoi, which ignores trailing non-digit
		 * characters.
		 */
		if (my_command->argv[1][0] != ':')
		{
			char	   *c = my_command->argv[1];
			bool		have_digit = false;

			/* Skip sign */
			if (*c == '+' || *c == '-')
				c++;

			/* Require at least one digit */
			if (*c && isdigit((unsigned char) *c))
				have_digit = true;

			/* Eat all digits */
			while (*c && isdigit((unsigned char) *c))
				c++;

			if (*c)
			{
				if (my_command->argc == 2 && have_digit)
				{
					/*
					 * argv[2] points into argv[1]'s allocation rather than
					 * being a separate pg_strdup() copy.  NOTE(review):
					 * free_command() frees every argv[] entry, so such a
					 * command must not be passed to it (currently only
					 * \gset/\aset commands are freed).
					 */
					my_command->argv[2] = c;
					offsets[2] = offsets[1] + (c - my_command->argv[1]);
					my_command->argc = 3;
				}
				else
				{
					/*
					 * Raise an error if argument starts with non-digit
					 * character (after sign).
					 */
					syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
								 "invalid sleep time, must be an integer",
								 my_command->argv[1], offsets[1] - start_offset);
				}
			}
		}

		if (my_command->argc == 3)
		{
			if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
				pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
				pg_strcasecmp(my_command->argv[2], "s") != 0)
				syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
							 "unrecognized time unit, must be us, ms or s",
							 my_command->argv[2], offsets[2] - start_offset);
		}
	}
	else if (my_command->meta == META_SETSHELL)
	{
		/* needs a variable name plus a command */
		if (my_command->argc < 3)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "missing argument", NULL, -1);
	}
	else if (my_command->meta == META_SHELL)
	{
		if (my_command->argc < 2)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "missing command", NULL, -1);
	}
	else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
			 my_command->meta == META_STARTPIPELINE ||
			 my_command->meta == META_ENDPIPELINE)
	{
		/* these take no arguments at all */
		if (my_command->argc != 1)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "unexpected argument", NULL, -1);
	}
	else if (my_command->meta == META_GSET || my_command->meta == META_ASET)
	{
		/* at most one optional argument (the variable prefix) */
		if (my_command->argc > 2)
			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
						 "too many arguments", NULL, -1);
	}
	else
	{
		/* my_command->meta == META_NONE */
		syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
					 "invalid command", NULL, -1);
	}

	termPQExpBuffer(&word_buf);

	return my_command;
}
5057
/*
 * Fatal error for a malformed \if/\elif/\else/\endif structure.  Names the
 * script (desc) and the 1-based command number (cmdn), then exits.
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
	pg_log_fatal("condition error in script \"%s\" command %d: %s", desc, cmdn, msg);
	exit(1);
}
5065
5066 /*
5067 * Partial evaluation of conditionals before recording and running the script.
5068 */
5069 static void
CheckConditional(ParsedScript ps)5070 CheckConditional(ParsedScript ps)
5071 {
5072 /* statically check conditional structure */
5073 ConditionalStack cs = conditional_stack_create();
5074 int i;
5075
5076 for (i = 0; ps.commands[i] != NULL; i++)
5077 {
5078 Command *cmd = ps.commands[i];
5079
5080 if (cmd->type == META_COMMAND)
5081 {
5082 switch (cmd->meta)
5083 {
5084 case META_IF:
5085 conditional_stack_push(cs, IFSTATE_FALSE);
5086 break;
5087 case META_ELIF:
5088 if (conditional_stack_empty(cs))
5089 ConditionError(ps.desc, i + 1, "\\elif without matching \\if");
5090 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
5091 ConditionError(ps.desc, i + 1, "\\elif after \\else");
5092 break;
5093 case META_ELSE:
5094 if (conditional_stack_empty(cs))
5095 ConditionError(ps.desc, i + 1, "\\else without matching \\if");
5096 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
5097 ConditionError(ps.desc, i + 1, "\\else after \\else");
5098 conditional_stack_poke(cs, IFSTATE_ELSE_FALSE);
5099 break;
5100 case META_ENDIF:
5101 if (!conditional_stack_pop(cs))
5102 ConditionError(ps.desc, i + 1, "\\endif without matching \\if");
5103 break;
5104 default:
5105 /* ignore anything else... */
5106 break;
5107 }
5108 }
5109 }
5110 if (!conditional_stack_empty(cs))
5111 ConditionError(ps.desc, i + 1, "\\if without matching \\endif");
5112 conditional_stack_destroy(cs);
5113 }
5114
5115 /*
5116 * Parse a script (either the contents of a file, or a built-in script)
5117 * and add it to the list of scripts.
5118 */
5119 static void
ParseScript(const char * script,const char * desc,int weight)5120 ParseScript(const char *script, const char *desc, int weight)
5121 {
5122 ParsedScript ps;
5123 PsqlScanState sstate;
5124 PQExpBufferData line_buf;
5125 int alloc_num;
5126 int index;
5127 int lineno;
5128 int start_offset;
5129
5130 #define COMMANDS_ALLOC_NUM 128
5131 alloc_num = COMMANDS_ALLOC_NUM;
5132
5133 /* Initialize all fields of ps */
5134 ps.desc = desc;
5135 ps.weight = weight;
5136 ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
5137 initStats(&ps.stats, 0);
5138
5139 /* Prepare to parse script */
5140 sstate = psql_scan_create(&pgbench_callbacks);
5141
5142 /*
5143 * Ideally, we'd scan scripts using the encoding and stdstrings settings
5144 * we get from a DB connection. However, without major rearrangement of
5145 * pgbench's argument parsing, we can't have a DB connection at the time
5146 * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
5147 * with any backend-safe encoding, though conceivably we could be fooled
5148 * if a script file uses a client-only encoding. We also assume that
5149 * stdstrings should be true, which is a bit riskier.
5150 */
5151 psql_scan_setup(sstate, script, strlen(script), 0, true);
5152 start_offset = expr_scanner_offset(sstate) - 1;
5153
5154 initPQExpBuffer(&line_buf);
5155
5156 index = 0;
5157
5158 for (;;)
5159 {
5160 PsqlScanResult sr;
5161 promptStatus_t prompt;
5162 Command *command = NULL;
5163
5164 resetPQExpBuffer(&line_buf);
5165 lineno = expr_scanner_get_lineno(sstate, start_offset);
5166
5167 sr = psql_scan(sstate, &line_buf, &prompt);
5168
5169 /* If we collected a new SQL command, process that */
5170 command = create_sql_command(&line_buf, desc);
5171
5172 /* store new command */
5173 if (command)
5174 ps.commands[index++] = command;
5175
5176 /* If we reached a backslash, process that */
5177 if (sr == PSCAN_BACKSLASH)
5178 {
5179 command = process_backslash_command(sstate, desc);
5180
5181 if (command)
5182 {
5183 /*
5184 * If this is gset or aset, merge into the preceding command.
5185 * (We don't use a command slot in this case).
5186 */
5187 if (command->meta == META_GSET || command->meta == META_ASET)
5188 {
5189 Command *cmd;
5190
5191 if (index == 0)
5192 syntax_error(desc, lineno, NULL, NULL,
5193 "\\gset must follow an SQL command",
5194 NULL, -1);
5195
5196 cmd = ps.commands[index - 1];
5197
5198 if (cmd->type != SQL_COMMAND ||
5199 cmd->varprefix != NULL)
5200 syntax_error(desc, lineno, NULL, NULL,
5201 "\\gset must follow an SQL command",
5202 cmd->first_line, -1);
5203
5204 /* get variable prefix */
5205 if (command->argc <= 1 || command->argv[1][0] == '\0')
5206 cmd->varprefix = pg_strdup("");
5207 else
5208 cmd->varprefix = pg_strdup(command->argv[1]);
5209
5210 /* update the sql command meta */
5211 cmd->meta = command->meta;
5212
5213 /* cleanup unused command */
5214 free_command(command);
5215
5216 continue;
5217 }
5218
5219 /* Attach any other backslash command as a new command */
5220 ps.commands[index++] = command;
5221 }
5222 }
5223
5224 /*
5225 * Since we used a command slot, allocate more if needed. Note we
5226 * always allocate one more in order to accommodate the NULL
5227 * terminator below.
5228 */
5229 if (index >= alloc_num)
5230 {
5231 alloc_num += COMMANDS_ALLOC_NUM;
5232 ps.commands = (Command **)
5233 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
5234 }
5235
5236 /* Done if we reached EOF */
5237 if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
5238 break;
5239 }
5240
5241 ps.commands[index] = NULL;
5242
5243 addScript(ps);
5244
5245 termPQExpBuffer(&line_buf);
5246 psql_scan_finish(sstate);
5247 psql_scan_destroy(sstate);
5248 }
5249
/*
 * Read all of fd into a malloc'd, NUL-terminated buffer and return it.
 *
 * The buffer grows in BUFSIZ steps and is usually larger than needed,
 * which is fine here: it is freed as soon as the script has been parsed.
 * A short read ends the loop whether it was EOF or an error, so the caller
 * must check ferror(fd).
 */
static char *
read_file_contents(FILE *fd)
{
	size_t		capacity = BUFSIZ;
	size_t		filled = 0;
	char	   *contents = (char *) pg_malloc(capacity);
	size_t		got;

	/* a full chunk means there may be more: make room for another one */
	while ((got = fread(contents + filled, 1, BUFSIZ, fd)) == BUFSIZ)
	{
		filled += got;
		capacity += BUFSIZ;
		contents = (char *) pg_realloc(contents, capacity);
	}
	filled += got;

	/* the final read was short, so at least one byte is free for the NUL */
	contents[filled] = '\0';

	return contents;
}
5283
/*
 * Read script file filename ("-" means stdin), parse it, and add the
 * resulting script to the list with the given weight.  Open/read failures
 * are fatal.
 *
 * NB: filename must be storage that won't disappear; it is kept as the
 * script's description.
 */
static void
process_file(const char *filename, int weight)
{
	bool		use_stdin = (strcmp(filename, "-") == 0);
	FILE	   *fd = use_stdin ? stdin : fopen(filename, "r");
	char	   *buf;

	if (fd == NULL)
	{
		pg_log_fatal("could not open file \"%s\": %m", filename);
		exit(1);
	}

	buf = read_file_contents(fd);

	/* read_file_contents() stops on error as well as EOF; tell them apart */
	if (ferror(fd))
	{
		pg_log_fatal("could not read file \"%s\": %m", filename);
		exit(1);
	}

	if (!use_stdin)
		fclose(fd);

	ParseScript(buf, filename, weight);
	free(buf);
}
5319
5320 /* Parse the given builtin script and add it to the list. */
5321 static void
process_builtin(const BuiltinScript * bi,int weight)5322 process_builtin(const BuiltinScript *bi, int weight)
5323 {
5324 ParseScript(bi->script, bi->desc, weight);
5325 }
5326
5327 /* show available builtin scripts */
5328 static void
listAvailableScripts(void)5329 listAvailableScripts(void)
5330 {
5331 int i;
5332
5333 fprintf(stderr, "Available builtin scripts:\n");
5334 for (i = 0; i < lengthof(builtin_script); i++)
5335 fprintf(stderr, " %13s: %s\n", builtin_script[i].name, builtin_script[i].desc);
5336 fprintf(stderr, "\n");
5337 }
5338
5339 /* return builtin script "name" if unambiguous, fails if not found */
5340 static const BuiltinScript *
findBuiltin(const char * name)5341 findBuiltin(const char *name)
5342 {
5343 int i,
5344 found = 0,
5345 len = strlen(name);
5346 const BuiltinScript *result = NULL;
5347
5348 for (i = 0; i < lengthof(builtin_script); i++)
5349 {
5350 if (strncmp(builtin_script[i].name, name, len) == 0)
5351 {
5352 result = &builtin_script[i];
5353 found++;
5354 }
5355 }
5356
5357 /* ok, unambiguous result */
5358 if (found == 1)
5359 return result;
5360
5361 /* error cases */
5362 if (found == 0)
5363 pg_log_fatal("no builtin script found for name \"%s\"", name);
5364 else /* found > 1 */
5365 pg_log_fatal("ambiguous builtin name: %d builtin scripts found for prefix \"%s\"", found, name);
5366
5367 listAvailableScripts();
5368 exit(1);
5369 }
5370
5371 /*
5372 * Determine the weight specification from a script option (-b, -f), if any,
5373 * and return it as an integer (1 is returned if there's no weight). The
5374 * script name is returned in *script as a malloc'd string.
5375 */
5376 static int
parseScriptWeight(const char * option,char ** script)5377 parseScriptWeight(const char *option, char **script)
5378 {
5379 char *sep;
5380 int weight;
5381
5382 if ((sep = strrchr(option, WSEP)))
5383 {
5384 int namelen = sep - option;
5385 long wtmp;
5386 char *badp;
5387
5388 /* generate the script name */
5389 *script = pg_malloc(namelen + 1);
5390 strncpy(*script, option, namelen);
5391 (*script)[namelen] = '\0';
5392
5393 /* process digits of the weight spec */
5394 errno = 0;
5395 wtmp = strtol(sep + 1, &badp, 10);
5396 if (errno != 0 || badp == sep + 1 || *badp != '\0')
5397 {
5398 pg_log_fatal("invalid weight specification: %s", sep);
5399 exit(1);
5400 }
5401 if (wtmp > INT_MAX || wtmp < 0)
5402 {
5403 pg_log_fatal("weight specification out of range (0 .. %u): %lld",
5404 INT_MAX, (long long) wtmp);
5405 exit(1);
5406 }
5407 weight = wtmp;
5408 }
5409 else
5410 {
5411 *script = pg_strdup(option);
5412 weight = 1;
5413 }
5414
5415 return weight;
5416 }
5417
5418 /* append a script to the list of scripts to process */
5419 static void
addScript(ParsedScript script)5420 addScript(ParsedScript script)
5421 {
5422 if (script.commands == NULL || script.commands[0] == NULL)
5423 {
5424 pg_log_fatal("empty command list for script \"%s\"", script.desc);
5425 exit(1);
5426 }
5427
5428 if (num_scripts >= MAX_SCRIPTS)
5429 {
5430 pg_log_fatal("at most %d SQL scripts are allowed", MAX_SCRIPTS);
5431 exit(1);
5432 }
5433
5434 CheckConditional(script);
5435
5436 sql_script[num_scripts] = script;
5437 num_scripts++;
5438 }
5439
5440 /*
5441 * Print progress report.
5442 *
5443 * On entry, *last and *last_report contain the statistics and time of last
5444 * progress report. On exit, they are updated with the new stats.
5445 */
5446 static void
printProgressReport(TState * threads,int64 test_start,pg_time_usec_t now,StatsData * last,int64 * last_report)5447 printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
5448 StatsData *last, int64 *last_report)
5449 {
5450 /* generate and show report */
5451 pg_time_usec_t run = now - *last_report;
5452 int64 ntx;
5453 double tps,
5454 total_run,
5455 latency,
5456 sqlat,
5457 lag,
5458 stdev;
5459 char tbuf[315];
5460 StatsData cur;
5461
5462 /*
5463 * Add up the statistics of all threads.
5464 *
5465 * XXX: No locking. There is no guarantee that we get an atomic snapshot
5466 * of the transaction count and latencies, so these figures can well be
5467 * off by a small amount. The progress report's purpose is to give a
5468 * quick overview of how the test is going, so that shouldn't matter too
5469 * much. (If a read from a 64-bit integer is not atomic, you might get a
5470 * "torn" read and completely bogus latencies though!)
5471 */
5472 initStats(&cur, 0);
5473 for (int i = 0; i < nthreads; i++)
5474 {
5475 mergeSimpleStats(&cur.latency, &threads[i].stats.latency);
5476 mergeSimpleStats(&cur.lag, &threads[i].stats.lag);
5477 cur.cnt += threads[i].stats.cnt;
5478 cur.skipped += threads[i].stats.skipped;
5479 }
5480
5481 /* we count only actually executed transactions */
5482 ntx = (cur.cnt - cur.skipped) - (last->cnt - last->skipped);
5483 total_run = (now - test_start) / 1000000.0;
5484 tps = 1000000.0 * ntx / run;
5485 if (ntx > 0)
5486 {
5487 latency = 0.001 * (cur.latency.sum - last->latency.sum) / ntx;
5488 sqlat = 1.0 * (cur.latency.sum2 - last->latency.sum2) / ntx;
5489 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
5490 lag = 0.001 * (cur.lag.sum - last->lag.sum) / ntx;
5491 }
5492 else
5493 {
5494 latency = sqlat = stdev = lag = 0;
5495 }
5496
5497 if (progress_timestamp)
5498 {
5499 snprintf(tbuf, sizeof(tbuf), "%.3f s",
5500 PG_TIME_GET_DOUBLE(now + epoch_shift));
5501 }
5502 else
5503 {
5504 /* round seconds are expected, but the thread may be late */
5505 snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
5506 }
5507
5508 fprintf(stderr,
5509 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
5510 tbuf, tps, latency, stdev);
5511
5512 if (throttle_delay)
5513 {
5514 fprintf(stderr, ", lag %.3f ms", lag);
5515 if (latency_limit)
5516 fprintf(stderr, ", " INT64_FORMAT " skipped",
5517 cur.skipped - last->skipped);
5518 }
5519 fprintf(stderr, "\n");
5520
5521 *last = cur;
5522 *last_report = now;
5523 }
5524
5525 static void
printSimpleStats(const char * prefix,SimpleStats * ss)5526 printSimpleStats(const char *prefix, SimpleStats *ss)
5527 {
5528 if (ss->count > 0)
5529 {
5530 double latency = ss->sum / ss->count;
5531 double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
5532
5533 printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
5534 printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
5535 }
5536 }
5537
5538 /* print version banner */
5539 static void
printVersion(PGconn * con)5540 printVersion(PGconn *con)
5541 {
5542 int server_ver = PQserverVersion(con);
5543 int client_ver = PG_VERSION_NUM;
5544
5545 if (server_ver != client_ver)
5546 {
5547 const char *server_version;
5548 char sverbuf[32];
5549
5550 /* Try to get full text form, might include "devel" etc */
5551 server_version = PQparameterStatus(con, "server_version");
5552 /* Otherwise fall back on server_ver */
5553 if (!server_version)
5554 {
5555 formatPGVersionNumber(server_ver, true,
5556 sverbuf, sizeof(sverbuf));
5557 server_version = sverbuf;
5558 }
5559
5560 printf(_("%s (%s, server %s)\n"),
5561 "pgbench", PG_VERSION, server_version);
5562 }
5563 /* For version match, only print pgbench version */
5564 else
5565 printf("%s (%s)\n", "pgbench", PG_VERSION);
5566 fflush(stdout);
5567 }
5568
5569 /* print out results */
5570 static void
printResults(StatsData * total,pg_time_usec_t total_duration,pg_time_usec_t conn_total_duration,pg_time_usec_t conn_elapsed_duration,int64 latency_late)5571 printResults(StatsData *total,
5572 pg_time_usec_t total_duration, /* benchmarking time */
5573 pg_time_usec_t conn_total_duration, /* is_connect */
5574 pg_time_usec_t conn_elapsed_duration, /* !is_connect */
5575 int64 latency_late)
5576 {
5577 /* tps is about actually executed transactions during benchmarking */
5578 int64 ntx = total->cnt - total->skipped;
5579 double bench_duration = PG_TIME_GET_DOUBLE(total_duration);
5580 double tps = ntx / bench_duration;
5581
5582 /* Report test parameters. */
5583 printf("transaction type: %s\n",
5584 num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
5585 printf("scaling factor: %d\n", scale);
5586 /* only print partitioning information if some partitioning was detected */
5587 if (partition_method != PART_NONE)
5588 printf("partition method: %s\npartitions: %d\n",
5589 PARTITION_METHOD[partition_method], partitions);
5590 printf("query mode: %s\n", QUERYMODE[querymode]);
5591 printf("number of clients: %d\n", nclients);
5592 printf("number of threads: %d\n", nthreads);
5593 if (duration <= 0)
5594 {
5595 printf("number of transactions per client: %d\n", nxacts);
5596 printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
5597 ntx, nxacts * nclients);
5598 }
5599 else
5600 {
5601 printf("duration: %d s\n", duration);
5602 printf("number of transactions actually processed: " INT64_FORMAT "\n",
5603 ntx);
5604 }
5605
5606 /* Remaining stats are nonsensical if we failed to execute any xacts */
5607 if (total->cnt <= 0)
5608 return;
5609
5610 if (throttle_delay && latency_limit)
5611 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
5612 total->skipped, 100.0 * total->skipped / total->cnt);
5613
5614 if (latency_limit)
5615 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
5616 latency_limit / 1000.0, latency_late, ntx,
5617 (ntx > 0) ? 100.0 * latency_late / ntx : 0.0);
5618
5619 if (throttle_delay || progress || latency_limit)
5620 printSimpleStats("latency", &total->latency);
5621 else
5622 {
5623 /* no measurement, show average latency computed from run time */
5624 printf("latency average = %.3f ms\n",
5625 0.001 * total_duration * nclients / total->cnt);
5626 }
5627
5628 if (throttle_delay)
5629 {
5630 /*
5631 * Report average transaction lag under rate limit throttling. This
5632 * is the delay between scheduled and actual start times for the
5633 * transaction. The measured lag may be caused by thread/client load,
5634 * the database load, or the Poisson throttling process.
5635 */
5636 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
5637 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
5638 }
5639
5640 /*
5641 * Under -C/--connect, each transaction incurs a significant connection
5642 * cost, it would not make much sense to ignore it in tps, and it would
5643 * not be tps anyway.
5644 *
5645 * Otherwise connections are made just once at the beginning of the run
5646 * and should not impact performance but for very short run, so they are
5647 * (right)fully ignored in tps.
5648 */
5649 if (is_connect)
5650 {
5651 printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / total->cnt);
5652 printf("tps = %f (including reconnection times)\n", tps);
5653 }
5654 else
5655 {
5656 printf("initial connection time = %.3f ms\n", 0.001 * conn_elapsed_duration);
5657 printf("tps = %f (without initial connection time)\n", tps);
5658 }
5659
5660 /* Report per-script/command statistics */
5661 if (per_script_stats || report_per_command)
5662 {
5663 int i;
5664
5665 for (i = 0; i < num_scripts; i++)
5666 {
5667 if (per_script_stats)
5668 {
5669 StatsData *sstats = &sql_script[i].stats;
5670
5671 printf("SQL script %d: %s\n"
5672 " - weight: %d (targets %.1f%% of total)\n"
5673 " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
5674 i + 1, sql_script[i].desc,
5675 sql_script[i].weight,
5676 100.0 * sql_script[i].weight / total_weight,
5677 sstats->cnt,
5678 100.0 * sstats->cnt / total->cnt,
5679 (sstats->cnt - sstats->skipped) / bench_duration);
5680
5681 if (throttle_delay && latency_limit && sstats->cnt > 0)
5682 printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
5683 sstats->skipped,
5684 100.0 * sstats->skipped / sstats->cnt);
5685
5686 printSimpleStats(" - latency", &sstats->latency);
5687 }
5688
5689 /* Report per-command latencies */
5690 if (report_per_command)
5691 {
5692 Command **commands;
5693
5694 if (per_script_stats)
5695 printf(" - statement latencies in milliseconds:\n");
5696 else
5697 printf("statement latencies in milliseconds:\n");
5698
5699 for (commands = sql_script[i].commands;
5700 *commands != NULL;
5701 commands++)
5702 {
5703 SimpleStats *cstats = &(*commands)->stats;
5704
5705 printf(" %11.3f %s\n",
5706 (cstats->count > 0) ?
5707 1000.0 * cstats->sum / cstats->count : 0.0,
5708 (*commands)->first_line);
5709 }
5710 }
5711 }
5712 }
5713 }
5714
5715 /*
5716 * Set up a random seed according to seed parameter (NULL means default),
5717 * and initialize base_random_sequence for use in initializing other sequences.
5718 */
5719 static bool
set_random_seed(const char * seed)5720 set_random_seed(const char *seed)
5721 {
5722 uint64 iseed;
5723
5724 if (seed == NULL || strcmp(seed, "time") == 0)
5725 {
5726 /* rely on current time */
5727 iseed = pg_time_now();
5728 }
5729 else if (strcmp(seed, "rand") == 0)
5730 {
5731 /* use some "strong" random source */
5732 if (!pg_strong_random(&iseed, sizeof(iseed)))
5733 {
5734 pg_log_error("could not generate random seed");
5735 return false;
5736 }
5737 }
5738 else
5739 {
5740 /* parse unsigned-int seed value */
5741 unsigned long ulseed;
5742 char garbage;
5743
5744 /* Don't try to use UINT64_FORMAT here; it might not work for sscanf */
5745 if (sscanf(seed, "%lu%c", &ulseed, &garbage) != 1)
5746 {
5747 pg_log_error("unrecognized random seed option \"%s\"", seed);
5748 pg_log_info("Expecting an unsigned integer, \"time\" or \"rand\"");
5749 return false;
5750 }
5751 iseed = (uint64) ulseed;
5752 }
5753
5754 if (seed != NULL)
5755 pg_log_info("setting random seed to %llu", (unsigned long long) iseed);
5756 random_seed = iseed;
5757
5758 /* Fill base_random_sequence with low-order bits of seed */
5759 base_random_sequence.xseed[0] = iseed & 0xFFFF;
5760 base_random_sequence.xseed[1] = (iseed >> 16) & 0xFFFF;
5761 base_random_sequence.xseed[2] = (iseed >> 32) & 0xFFFF;
5762
5763 return true;
5764 }
5765
5766 int
main(int argc,char ** argv)5767 main(int argc, char **argv)
5768 {
5769 static struct option long_options[] = {
5770 /* systematic long/short named options */
5771 {"builtin", required_argument, NULL, 'b'},
5772 {"client", required_argument, NULL, 'c'},
5773 {"connect", no_argument, NULL, 'C'},
5774 {"debug", no_argument, NULL, 'd'},
5775 {"define", required_argument, NULL, 'D'},
5776 {"file", required_argument, NULL, 'f'},
5777 {"fillfactor", required_argument, NULL, 'F'},
5778 {"host", required_argument, NULL, 'h'},
5779 {"initialize", no_argument, NULL, 'i'},
5780 {"init-steps", required_argument, NULL, 'I'},
5781 {"jobs", required_argument, NULL, 'j'},
5782 {"log", no_argument, NULL, 'l'},
5783 {"latency-limit", required_argument, NULL, 'L'},
5784 {"no-vacuum", no_argument, NULL, 'n'},
5785 {"port", required_argument, NULL, 'p'},
5786 {"progress", required_argument, NULL, 'P'},
5787 {"protocol", required_argument, NULL, 'M'},
5788 {"quiet", no_argument, NULL, 'q'},
5789 {"report-latencies", no_argument, NULL, 'r'},
5790 {"rate", required_argument, NULL, 'R'},
5791 {"scale", required_argument, NULL, 's'},
5792 {"select-only", no_argument, NULL, 'S'},
5793 {"skip-some-updates", no_argument, NULL, 'N'},
5794 {"time", required_argument, NULL, 'T'},
5795 {"transactions", required_argument, NULL, 't'},
5796 {"username", required_argument, NULL, 'U'},
5797 {"vacuum-all", no_argument, NULL, 'v'},
5798 /* long-named only options */
5799 {"unlogged-tables", no_argument, NULL, 1},
5800 {"tablespace", required_argument, NULL, 2},
5801 {"index-tablespace", required_argument, NULL, 3},
5802 {"sampling-rate", required_argument, NULL, 4},
5803 {"aggregate-interval", required_argument, NULL, 5},
5804 {"progress-timestamp", no_argument, NULL, 6},
5805 {"log-prefix", required_argument, NULL, 7},
5806 {"foreign-keys", no_argument, NULL, 8},
5807 {"random-seed", required_argument, NULL, 9},
5808 {"show-script", required_argument, NULL, 10},
5809 {"partitions", required_argument, NULL, 11},
5810 {"partition-method", required_argument, NULL, 12},
5811 {NULL, 0, NULL, 0}
5812 };
5813
5814 int c;
5815 bool is_init_mode = false; /* initialize mode? */
5816 char *initialize_steps = NULL;
5817 bool foreign_keys = false;
5818 bool is_no_vacuum = false;
5819 bool do_vacuum_accounts = false; /* vacuum accounts table? */
5820 int optindex;
5821 bool scale_given = false;
5822
5823 bool benchmarking_option_set = false;
5824 bool initialization_option_set = false;
5825 bool internal_script_used = false;
5826
5827 CState *state; /* status of clients */
5828 TState *threads; /* array of thread */
5829
5830 pg_time_usec_t
5831 start_time, /* start up time */
5832 bench_start = 0, /* first recorded benchmarking time */
5833 conn_total_duration; /* cumulated connection time in
5834 * threads */
5835 int64 latency_late = 0;
5836 StatsData stats;
5837 int weight;
5838
5839 int i;
5840 int nclients_dealt;
5841
5842 #ifdef HAVE_GETRLIMIT
5843 struct rlimit rlim;
5844 #endif
5845
5846 PGconn *con;
5847 char *env;
5848
5849 int exit_code = 0;
5850 struct timeval tv;
5851
5852 /*
5853 * Record difference between Unix time and instr_time time. We'll use
5854 * this for logging and aggregation.
5855 */
5856 gettimeofday(&tv, NULL);
5857 epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
5858
5859 pg_logging_init(argv[0]);
5860 progname = get_progname(argv[0]);
5861
5862 if (argc > 1)
5863 {
5864 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
5865 {
5866 usage();
5867 exit(0);
5868 }
5869 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
5870 {
5871 puts("pgbench (PostgreSQL) " PG_VERSION);
5872 exit(0);
5873 }
5874 }
5875
5876 state = (CState *) pg_malloc0(sizeof(CState));
5877
5878 /* set random seed early, because it may be used while parsing scripts. */
5879 if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
5880 {
5881 pg_log_fatal("error while setting random seed from PGBENCH_RANDOM_SEED environment variable");
5882 exit(1);
5883 }
5884
5885 while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
5886 {
5887 char *script;
5888
5889 switch (c)
5890 {
5891 case 'i':
5892 is_init_mode = true;
5893 break;
5894 case 'I':
5895 if (initialize_steps)
5896 pg_free(initialize_steps);
5897 initialize_steps = pg_strdup(optarg);
5898 checkInitSteps(initialize_steps);
5899 initialization_option_set = true;
5900 break;
5901 case 'h':
5902 pghost = pg_strdup(optarg);
5903 break;
5904 case 'n':
5905 is_no_vacuum = true;
5906 break;
5907 case 'v':
5908 benchmarking_option_set = true;
5909 do_vacuum_accounts = true;
5910 break;
5911 case 'p':
5912 pgport = pg_strdup(optarg);
5913 break;
5914 case 'd':
5915 pg_logging_increase_verbosity();
5916 break;
5917 case 'c':
5918 benchmarking_option_set = true;
5919 nclients = atoi(optarg);
5920 if (nclients <= 0)
5921 {
5922 pg_log_fatal("invalid number of clients: \"%s\"", optarg);
5923 exit(1);
5924 }
5925 #ifdef HAVE_GETRLIMIT
5926 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
5927 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
5928 #else /* but BSD doesn't ... */
5929 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
5930 #endif /* RLIMIT_NOFILE */
5931 {
5932 pg_log_fatal("getrlimit failed: %m");
5933 exit(1);
5934 }
5935 if (rlim.rlim_cur < nclients + 3)
5936 {
5937 pg_log_fatal("need at least %d open files, but system limit is %ld",
5938 nclients + 3, (long) rlim.rlim_cur);
5939 pg_log_info("Reduce number of clients, or use limit/ulimit to increase the system limit.");
5940 exit(1);
5941 }
5942 #endif /* HAVE_GETRLIMIT */
5943 break;
5944 case 'j': /* jobs */
5945 benchmarking_option_set = true;
5946 nthreads = atoi(optarg);
5947 if (nthreads <= 0)
5948 {
5949 pg_log_fatal("invalid number of threads: \"%s\"", optarg);
5950 exit(1);
5951 }
5952 #ifndef ENABLE_THREAD_SAFETY
5953 if (nthreads != 1)
5954 {
5955 pg_log_fatal("threads are not supported on this platform; use -j1");
5956 exit(1);
5957 }
5958 #endif /* !ENABLE_THREAD_SAFETY */
5959 break;
5960 case 'C':
5961 benchmarking_option_set = true;
5962 is_connect = true;
5963 break;
5964 case 'r':
5965 benchmarking_option_set = true;
5966 report_per_command = true;
5967 break;
5968 case 's':
5969 scale_given = true;
5970 scale = atoi(optarg);
5971 if (scale <= 0)
5972 {
5973 pg_log_fatal("invalid scaling factor: \"%s\"", optarg);
5974 exit(1);
5975 }
5976 break;
5977 case 't':
5978 benchmarking_option_set = true;
5979 nxacts = atoi(optarg);
5980 if (nxacts <= 0)
5981 {
5982 pg_log_fatal("invalid number of transactions: \"%s\"", optarg);
5983 exit(1);
5984 }
5985 break;
5986 case 'T':
5987 benchmarking_option_set = true;
5988 duration = atoi(optarg);
5989 if (duration <= 0)
5990 {
5991 pg_log_fatal("invalid duration: \"%s\"", optarg);
5992 exit(1);
5993 }
5994 break;
5995 case 'U':
5996 username = pg_strdup(optarg);
5997 break;
5998 case 'l':
5999 benchmarking_option_set = true;
6000 use_log = true;
6001 break;
6002 case 'q':
6003 initialization_option_set = true;
6004 use_quiet = true;
6005 break;
6006 case 'b':
6007 if (strcmp(optarg, "list") == 0)
6008 {
6009 listAvailableScripts();
6010 exit(0);
6011 }
6012 weight = parseScriptWeight(optarg, &script);
6013 process_builtin(findBuiltin(script), weight);
6014 benchmarking_option_set = true;
6015 internal_script_used = true;
6016 break;
6017 case 'S':
6018 process_builtin(findBuiltin("select-only"), 1);
6019 benchmarking_option_set = true;
6020 internal_script_used = true;
6021 break;
6022 case 'N':
6023 process_builtin(findBuiltin("simple-update"), 1);
6024 benchmarking_option_set = true;
6025 internal_script_used = true;
6026 break;
6027 case 'f':
6028 weight = parseScriptWeight(optarg, &script);
6029 process_file(script, weight);
6030 benchmarking_option_set = true;
6031 break;
6032 case 'D':
6033 {
6034 char *p;
6035
6036 benchmarking_option_set = true;
6037
6038 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
6039 {
6040 pg_log_fatal("invalid variable definition: \"%s\"", optarg);
6041 exit(1);
6042 }
6043
6044 *p++ = '\0';
6045 if (!putVariable(&state[0], "option", optarg, p))
6046 exit(1);
6047 }
6048 break;
6049 case 'F':
6050 initialization_option_set = true;
6051 fillfactor = atoi(optarg);
6052 if (fillfactor < 10 || fillfactor > 100)
6053 {
6054 pg_log_fatal("invalid fillfactor: \"%s\"", optarg);
6055 exit(1);
6056 }
6057 break;
6058 case 'M':
6059 benchmarking_option_set = true;
6060 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
6061 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
6062 break;
6063 if (querymode >= NUM_QUERYMODE)
6064 {
6065 pg_log_fatal("invalid query mode (-M): \"%s\"", optarg);
6066 exit(1);
6067 }
6068 break;
6069 case 'P':
6070 benchmarking_option_set = true;
6071 progress = atoi(optarg);
6072 if (progress <= 0)
6073 {
6074 pg_log_fatal("invalid thread progress delay: \"%s\"", optarg);
6075 exit(1);
6076 }
6077 break;
6078 case 'R':
6079 {
6080 /* get a double from the beginning of option value */
6081 double throttle_value = atof(optarg);
6082
6083 benchmarking_option_set = true;
6084
6085 if (throttle_value <= 0.0)
6086 {
6087 pg_log_fatal("invalid rate limit: \"%s\"", optarg);
6088 exit(1);
6089 }
6090 /* Invert rate limit into per-transaction delay in usec */
6091 throttle_delay = 1000000.0 / throttle_value;
6092 }
6093 break;
6094 case 'L':
6095 {
6096 double limit_ms = atof(optarg);
6097
6098 if (limit_ms <= 0.0)
6099 {
6100 pg_log_fatal("invalid latency limit: \"%s\"", optarg);
6101 exit(1);
6102 }
6103 benchmarking_option_set = true;
6104 latency_limit = (int64) (limit_ms * 1000);
6105 }
6106 break;
6107 case 1: /* unlogged-tables */
6108 initialization_option_set = true;
6109 unlogged_tables = true;
6110 break;
6111 case 2: /* tablespace */
6112 initialization_option_set = true;
6113 tablespace = pg_strdup(optarg);
6114 break;
6115 case 3: /* index-tablespace */
6116 initialization_option_set = true;
6117 index_tablespace = pg_strdup(optarg);
6118 break;
6119 case 4: /* sampling-rate */
6120 benchmarking_option_set = true;
6121 sample_rate = atof(optarg);
6122 if (sample_rate <= 0.0 || sample_rate > 1.0)
6123 {
6124 pg_log_fatal("invalid sampling rate: \"%s\"", optarg);
6125 exit(1);
6126 }
6127 break;
6128 case 5: /* aggregate-interval */
6129 benchmarking_option_set = true;
6130 agg_interval = atoi(optarg);
6131 if (agg_interval <= 0)
6132 {
6133 pg_log_fatal("invalid number of seconds for aggregation: \"%s\"", optarg);
6134 exit(1);
6135 }
6136 break;
6137 case 6: /* progress-timestamp */
6138 progress_timestamp = true;
6139 benchmarking_option_set = true;
6140 break;
6141 case 7: /* log-prefix */
6142 benchmarking_option_set = true;
6143 logfile_prefix = pg_strdup(optarg);
6144 break;
6145 case 8: /* foreign-keys */
6146 initialization_option_set = true;
6147 foreign_keys = true;
6148 break;
6149 case 9: /* random-seed */
6150 benchmarking_option_set = true;
6151 if (!set_random_seed(optarg))
6152 {
6153 pg_log_fatal("error while setting random seed from --random-seed option");
6154 exit(1);
6155 }
6156 break;
6157 case 10: /* list */
6158 {
6159 const BuiltinScript *s = findBuiltin(optarg);
6160
6161 fprintf(stderr, "-- %s: %s\n%s\n", s->name, s->desc, s->script);
6162 exit(0);
6163 }
6164 break;
6165 case 11: /* partitions */
6166 initialization_option_set = true;
6167 partitions = atoi(optarg);
6168 if (partitions < 0)
6169 {
6170 pg_log_fatal("invalid number of partitions: \"%s\"", optarg);
6171 exit(1);
6172 }
6173 break;
6174 case 12: /* partition-method */
6175 initialization_option_set = true;
6176 if (pg_strcasecmp(optarg, "range") == 0)
6177 partition_method = PART_RANGE;
6178 else if (pg_strcasecmp(optarg, "hash") == 0)
6179 partition_method = PART_HASH;
6180 else
6181 {
6182 pg_log_fatal("invalid partition method, expecting \"range\" or \"hash\", got: \"%s\"",
6183 optarg);
6184 exit(1);
6185 }
6186 break;
6187 default:
6188 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
6189 exit(1);
6190 break;
6191 }
6192 }
6193
6194 /* set default script if none */
6195 if (num_scripts == 0 && !is_init_mode)
6196 {
6197 process_builtin(findBuiltin("tpcb-like"), 1);
6198 benchmarking_option_set = true;
6199 internal_script_used = true;
6200 }
6201
6202 /* complete SQL command initialization and compute total weight */
6203 for (i = 0; i < num_scripts; i++)
6204 {
6205 Command **commands = sql_script[i].commands;
6206
6207 for (int j = 0; commands[j] != NULL; j++)
6208 if (commands[j]->type == SQL_COMMAND)
6209 postprocess_sql_command(commands[j]);
6210
6211 /* cannot overflow: weight is 32b, total_weight 64b */
6212 total_weight += sql_script[i].weight;
6213 }
6214
6215 if (total_weight == 0 && !is_init_mode)
6216 {
6217 pg_log_fatal("total script weight must not be zero");
6218 exit(1);
6219 }
6220
6221 /* show per script stats if several scripts are used */
6222 if (num_scripts > 1)
6223 per_script_stats = true;
6224
6225 /*
6226 * Don't need more threads than there are clients. (This is not merely an
6227 * optimization; throttle_delay is calculated incorrectly below if some
6228 * threads have no clients assigned to them.)
6229 */
6230 if (nthreads > nclients)
6231 nthreads = nclients;
6232
6233 /*
6234 * Convert throttle_delay to a per-thread delay time. Note that this
6235 * might be a fractional number of usec, but that's OK, since it's just
6236 * the center of a Poisson distribution of delays.
6237 */
6238 throttle_delay *= nthreads;
6239
6240 if (argc > optind)
6241 dbName = argv[optind++];
6242 else
6243 {
6244 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
6245 dbName = env;
6246 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
6247 dbName = env;
6248 else
6249 dbName = get_user_name_or_exit(progname);
6250 }
6251
6252 if (optind < argc)
6253 {
6254 pg_log_fatal("too many command-line arguments (first is \"%s\")",
6255 argv[optind]);
6256 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
6257 exit(1);
6258 }
6259
6260 if (is_init_mode)
6261 {
6262 if (benchmarking_option_set)
6263 {
6264 pg_log_fatal("some of the specified options cannot be used in initialization (-i) mode");
6265 exit(1);
6266 }
6267
6268 if (partitions == 0 && partition_method != PART_NONE)
6269 {
6270 pg_log_fatal("--partition-method requires greater than zero --partitions");
6271 exit(1);
6272 }
6273
6274 /* set default method */
6275 if (partitions > 0 && partition_method == PART_NONE)
6276 partition_method = PART_RANGE;
6277
6278 if (initialize_steps == NULL)
6279 initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
6280
6281 if (is_no_vacuum)
6282 {
6283 /* Remove any vacuum step in initialize_steps */
6284 char *p;
6285
6286 while ((p = strchr(initialize_steps, 'v')) != NULL)
6287 *p = ' ';
6288 }
6289
6290 if (foreign_keys)
6291 {
6292 /* Add 'f' to end of initialize_steps, if not already there */
6293 if (strchr(initialize_steps, 'f') == NULL)
6294 {
6295 initialize_steps = (char *)
6296 pg_realloc(initialize_steps,
6297 strlen(initialize_steps) + 2);
6298 strcat(initialize_steps, "f");
6299 }
6300 }
6301
6302 runInitSteps(initialize_steps);
6303 exit(0);
6304 }
6305 else
6306 {
6307 if (initialization_option_set)
6308 {
6309 pg_log_fatal("some of the specified options cannot be used in benchmarking mode");
6310 exit(1);
6311 }
6312 }
6313
6314 if (nxacts > 0 && duration > 0)
6315 {
6316 pg_log_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
6317 exit(1);
6318 }
6319
6320 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
6321 if (nxacts <= 0 && duration <= 0)
6322 nxacts = DEFAULT_NXACTS;
6323
6324 /* --sampling-rate may be used only with -l */
6325 if (sample_rate > 0.0 && !use_log)
6326 {
6327 pg_log_fatal("log sampling (--sampling-rate) is allowed only when logging transactions (-l)");
6328 exit(1);
6329 }
6330
6331 /* --sampling-rate may not be used with --aggregate-interval */
6332 if (sample_rate > 0.0 && agg_interval > 0)
6333 {
6334 pg_log_fatal("log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time");
6335 exit(1);
6336 }
6337
6338 if (agg_interval > 0 && !use_log)
6339 {
6340 pg_log_fatal("log aggregation is allowed only when actually logging transactions");
6341 exit(1);
6342 }
6343
6344 if (!use_log && logfile_prefix)
6345 {
6346 pg_log_fatal("log file prefix (--log-prefix) is allowed only when logging transactions (-l)");
6347 exit(1);
6348 }
6349
6350 if (duration > 0 && agg_interval > duration)
6351 {
6352 pg_log_fatal("number of seconds for aggregation (%d) must not be higher than test duration (%d)", agg_interval, duration);
6353 exit(1);
6354 }
6355
6356 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
6357 {
6358 pg_log_fatal("duration (%d) must be a multiple of aggregation interval (%d)", duration, agg_interval);
6359 exit(1);
6360 }
6361
6362 if (progress_timestamp && progress == 0)
6363 {
6364 pg_log_fatal("--progress-timestamp is allowed only under --progress");
6365 exit(1);
6366 }
6367
6368 /*
6369 * save main process id in the global variable because process id will be
6370 * changed after fork.
6371 */
6372 main_pid = (int) getpid();
6373
6374 if (nclients > 1)
6375 {
6376 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
6377 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
6378
6379 /* copy any -D switch values to all clients */
6380 for (i = 1; i < nclients; i++)
6381 {
6382 int j;
6383
6384 state[i].id = i;
6385 for (j = 0; j < state[0].nvariables; j++)
6386 {
6387 Variable *var = &state[0].variables[j];
6388
6389 if (var->value.type != PGBT_NO_VALUE)
6390 {
6391 if (!putVariableValue(&state[i], "startup",
6392 var->name, &var->value))
6393 exit(1);
6394 }
6395 else
6396 {
6397 if (!putVariable(&state[i], "startup",
6398 var->name, var->svalue))
6399 exit(1);
6400 }
6401 }
6402 }
6403 }
6404
6405 /* other CState initializations */
6406 for (i = 0; i < nclients; i++)
6407 {
6408 state[i].cstack = conditional_stack_create();
6409 initRandomState(&state[i].cs_func_rs);
6410 }
6411
6412 /* opening connection... */
6413 con = doConnect();
6414 if (con == NULL)
6415 exit(1);
6416
6417 /* report pgbench and server versions */
6418 printVersion(con);
6419
6420 pg_log_debug("pghost: %s pgport: %s nclients: %d %s: %d dbName: %s",
6421 PQhost(con), PQport(con), nclients,
6422 duration <= 0 ? "nxacts" : "duration",
6423 duration <= 0 ? nxacts : duration, PQdb(con));
6424
6425 if (internal_script_used)
6426 GetTableInfo(con, scale_given);
6427
6428 /*
6429 * :scale variables normally get -s or database scale, but don't override
6430 * an explicit -D switch
6431 */
6432 if (lookupVariable(&state[0], "scale") == NULL)
6433 {
6434 for (i = 0; i < nclients; i++)
6435 {
6436 if (!putVariableInt(&state[i], "startup", "scale", scale))
6437 exit(1);
6438 }
6439 }
6440
6441 /*
6442 * Define a :client_id variable that is unique per connection. But don't
6443 * override an explicit -D switch.
6444 */
6445 if (lookupVariable(&state[0], "client_id") == NULL)
6446 {
6447 for (i = 0; i < nclients; i++)
6448 if (!putVariableInt(&state[i], "startup", "client_id", i))
6449 exit(1);
6450 }
6451
6452 /* set default seed for hash functions */
6453 if (lookupVariable(&state[0], "default_seed") == NULL)
6454 {
6455 uint64 seed =
6456 ((uint64) pg_jrand48(base_random_sequence.xseed) & 0xFFFFFFFF) |
6457 (((uint64) pg_jrand48(base_random_sequence.xseed) & 0xFFFFFFFF) << 32);
6458
6459 for (i = 0; i < nclients; i++)
6460 if (!putVariableInt(&state[i], "startup", "default_seed", (int64) seed))
6461 exit(1);
6462 }
6463
6464 /* set random seed unless overwritten */
6465 if (lookupVariable(&state[0], "random_seed") == NULL)
6466 {
6467 for (i = 0; i < nclients; i++)
6468 if (!putVariableInt(&state[i], "startup", "random_seed", random_seed))
6469 exit(1);
6470 }
6471
6472 if (!is_no_vacuum)
6473 {
6474 fprintf(stderr, "starting vacuum...");
6475 tryExecuteStatement(con, "vacuum pgbench_branches");
6476 tryExecuteStatement(con, "vacuum pgbench_tellers");
6477 tryExecuteStatement(con, "truncate pgbench_history");
6478 fprintf(stderr, "end.\n");
6479
6480 if (do_vacuum_accounts)
6481 {
6482 fprintf(stderr, "starting vacuum pgbench_accounts...");
6483 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
6484 fprintf(stderr, "end.\n");
6485 }
6486 }
6487 PQfinish(con);
6488
6489 /* set up thread data structures */
6490 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
6491 nclients_dealt = 0;
6492
6493 for (i = 0; i < nthreads; i++)
6494 {
6495 TState *thread = &threads[i];
6496
6497 thread->tid = i;
6498 thread->state = &state[nclients_dealt];
6499 thread->nstate =
6500 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
6501 initRandomState(&thread->ts_choose_rs);
6502 initRandomState(&thread->ts_throttle_rs);
6503 initRandomState(&thread->ts_sample_rs);
6504 thread->logfile = NULL; /* filled in later */
6505 thread->latency_late = 0;
6506 initStats(&thread->stats, 0);
6507
6508 nclients_dealt += thread->nstate;
6509 }
6510
6511 /* all clients must be assigned to a thread */
6512 Assert(nclients_dealt == nclients);
6513
6514 /* get start up time for the whole computation */
6515 start_time = pg_time_now();
6516
6517 /* set alarm if duration is specified. */
6518 if (duration > 0)
6519 setalarm(duration);
6520
6521 errno = THREAD_BARRIER_INIT(&barrier, nthreads);
6522 if (errno != 0)
6523 pg_log_fatal("could not initialize barrier: %m");
6524
6525 #ifdef ENABLE_THREAD_SAFETY
6526 /* start all threads but thread 0 which is executed directly later */
6527 for (i = 1; i < nthreads; i++)
6528 {
6529 TState *thread = &threads[i];
6530
6531 thread->create_time = pg_time_now();
6532 errno = THREAD_CREATE(&thread->thread, threadRun, thread);
6533
6534 if (errno != 0)
6535 {
6536 pg_log_fatal("could not create thread: %m");
6537 exit(1);
6538 }
6539 }
6540 #else
6541 Assert(nthreads == 1);
6542 #endif /* ENABLE_THREAD_SAFETY */
6543
6544 /* compute when to stop */
6545 threads[0].create_time = pg_time_now();
6546 if (duration > 0)
6547 end_time = threads[0].create_time + (int64) 1000000 * duration;
6548
6549 /* run thread 0 directly */
6550 (void) threadRun(&threads[0]);
6551
6552 /* wait for other threads and accumulate results */
6553 initStats(&stats, 0);
6554 conn_total_duration = 0;
6555
6556 for (i = 0; i < nthreads; i++)
6557 {
6558 TState *thread = &threads[i];
6559
6560 #ifdef ENABLE_THREAD_SAFETY
6561 if (i > 0)
6562 THREAD_JOIN(thread->thread);
6563 #endif /* ENABLE_THREAD_SAFETY */
6564
6565 for (int j = 0; j < thread->nstate; j++)
6566 if (thread->state[j].state != CSTATE_FINISHED)
6567 exit_code = 2;
6568
6569 /* aggregate thread level stats */
6570 mergeSimpleStats(&stats.latency, &thread->stats.latency);
6571 mergeSimpleStats(&stats.lag, &thread->stats.lag);
6572 stats.cnt += thread->stats.cnt;
6573 stats.skipped += thread->stats.skipped;
6574 latency_late += thread->latency_late;
6575 conn_total_duration += thread->conn_duration;
6576
6577 /* first recorded benchmarking start time */
6578 if (bench_start == 0 || thread->bench_start < bench_start)
6579 bench_start = thread->bench_start;
6580 }
6581
6582 /*
6583 * All connections should be already closed in threadRun(), so this
6584 * disconnect_all() will be a no-op, but clean up the connecions just to
6585 * be sure. We don't need to measure the disconnection delays here.
6586 */
6587 disconnect_all(state, nclients);
6588
6589 /*
6590 * Beware that performance of short benchmarks with many threads and
6591 * possibly long transactions can be deceptive because threads do not
6592 * start and finish at the exact same time. The total duration computed
6593 * here encompasses all transactions so that tps shown is somehow slightly
6594 * underestimated.
6595 */
6596 printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
6597 bench_start - start_time, latency_late);
6598
6599 THREAD_BARRIER_DESTROY(&barrier);
6600
6601 if (exit_code != 0)
6602 pg_log_fatal("Run was aborted; the above results are incomplete.");
6603
6604 return exit_code;
6605 }
6606
6607 static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
threadRun(void * arg)6608 threadRun(void *arg)
6609 {
6610 TState *thread = (TState *) arg;
6611 CState *state = thread->state;
6612 pg_time_usec_t start;
6613 int nstate = thread->nstate;
6614 int remains = nstate; /* number of remaining clients */
6615 socket_set *sockets = alloc_socket_set(nstate);
6616 int64 thread_start,
6617 last_report,
6618 next_report;
6619 StatsData last,
6620 aggs;
6621
6622 /* open log file if requested */
6623 if (use_log)
6624 {
6625 char logpath[MAXPGPATH];
6626 char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
6627
6628 if (thread->tid == 0)
6629 snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
6630 else
6631 snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
6632
6633 thread->logfile = fopen(logpath, "w");
6634
6635 if (thread->logfile == NULL)
6636 {
6637 pg_log_fatal("could not open logfile \"%s\": %m", logpath);
6638 goto done;
6639 }
6640 }
6641
6642 /* explicitly initialize the state machines */
6643 for (int i = 0; i < nstate; i++)
6644 state[i].state = CSTATE_CHOOSE_SCRIPT;
6645
6646 /* READY */
6647 THREAD_BARRIER_WAIT(&barrier);
6648
6649 thread_start = pg_time_now();
6650 thread->started_time = thread_start;
6651 thread->conn_duration = 0;
6652 last_report = thread_start;
6653 next_report = last_report + (int64) 1000000 * progress;
6654
6655 /* STEADY */
6656 if (!is_connect)
6657 {
6658 /* make connections to the database before starting */
6659 for (int i = 0; i < nstate; i++)
6660 {
6661 if ((state[i].con = doConnect()) == NULL)
6662 {
6663 /*
6664 * On connection failure, we meet the barrier here in place of
6665 * GO before proceeding to the "done" path which will cleanup,
6666 * so as to avoid locking the process.
6667 *
6668 * It is unclear whether it is worth doing anything rather
6669 * than coldly exiting with an error message.
6670 */
6671 THREAD_BARRIER_WAIT(&barrier);
6672 goto done;
6673 }
6674 }
6675 }
6676
6677 /* GO */
6678 THREAD_BARRIER_WAIT(&barrier);
6679
6680 start = pg_time_now();
6681 thread->bench_start = start;
6682 thread->throttle_trigger = start;
6683
6684 /*
6685 * The log format currently has Unix epoch timestamps with whole numbers
6686 * of seconds. Round the first aggregate's start time down to the nearest
6687 * Unix epoch second (the very first aggregate might really have started a
6688 * fraction of a second later, but later aggregates are measured from the
6689 * whole number time that is actually logged).
6690 */
6691 initStats(&aggs, (start + epoch_shift) / 1000000 * 1000000);
6692 last = aggs;
6693
6694 /* loop till all clients have terminated */
6695 while (remains > 0)
6696 {
6697 int nsocks; /* number of sockets to be waited for */
6698 pg_time_usec_t min_usec;
6699 pg_time_usec_t now = 0; /* set this only if needed */
6700
6701 /*
6702 * identify which client sockets should be checked for input, and
6703 * compute the nearest time (if any) at which we need to wake up.
6704 */
6705 clear_socket_set(sockets);
6706 nsocks = 0;
6707 min_usec = PG_INT64_MAX;
6708 for (int i = 0; i < nstate; i++)
6709 {
6710 CState *st = &state[i];
6711
6712 if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
6713 {
6714 /* a nap from the script, or under throttling */
6715 pg_time_usec_t this_usec;
6716
6717 /* get current time if needed */
6718 pg_time_now_lazy(&now);
6719
6720 /* min_usec should be the minimum delay across all clients */
6721 this_usec = (st->state == CSTATE_SLEEP ?
6722 st->sleep_until : st->txn_scheduled) - now;
6723 if (min_usec > this_usec)
6724 min_usec = this_usec;
6725 }
6726 else if (st->state == CSTATE_WAIT_RESULT)
6727 {
6728 /*
6729 * waiting for result from server - nothing to do unless the
6730 * socket is readable
6731 */
6732 int sock = PQsocket(st->con);
6733
6734 if (sock < 0)
6735 {
6736 pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
6737 goto done;
6738 }
6739
6740 add_socket_to_set(sockets, sock, nsocks++);
6741 }
6742 else if (st->state != CSTATE_ABORTED &&
6743 st->state != CSTATE_FINISHED)
6744 {
6745 /*
6746 * This client thread is ready to do something, so we don't
6747 * want to wait. No need to examine additional clients.
6748 */
6749 min_usec = 0;
6750 break;
6751 }
6752 }
6753
6754 /* also wake up to print the next progress report on time */
6755 if (progress && min_usec > 0 && thread->tid == 0)
6756 {
6757 pg_time_now_lazy(&now);
6758
6759 if (now >= next_report)
6760 min_usec = 0;
6761 else if ((next_report - now) < min_usec)
6762 min_usec = next_report - now;
6763 }
6764
6765 /*
6766 * If no clients are ready to execute actions, sleep until we receive
6767 * data on some client socket or the timeout (if any) elapses.
6768 */
6769 if (min_usec > 0)
6770 {
6771 int rc = 0;
6772
6773 if (min_usec != PG_INT64_MAX)
6774 {
6775 if (nsocks > 0)
6776 {
6777 rc = wait_on_socket_set(sockets, min_usec);
6778 }
6779 else /* nothing active, simple sleep */
6780 {
6781 pg_usleep(min_usec);
6782 }
6783 }
6784 else /* no explicit delay, wait without timeout */
6785 {
6786 rc = wait_on_socket_set(sockets, 0);
6787 }
6788
6789 if (rc < 0)
6790 {
6791 if (errno == EINTR)
6792 {
6793 /* On EINTR, go back to top of loop */
6794 continue;
6795 }
6796 /* must be something wrong */
6797 pg_log_error("%s() failed: %m", SOCKET_WAIT_METHOD);
6798 goto done;
6799 }
6800 }
6801 else
6802 {
6803 /* min_usec <= 0, i.e. something needs to be executed now */
6804
6805 /* If we didn't wait, don't try to read any data */
6806 clear_socket_set(sockets);
6807 }
6808
6809 /* ok, advance the state machine of each connection */
6810 nsocks = 0;
6811 for (int i = 0; i < nstate; i++)
6812 {
6813 CState *st = &state[i];
6814
6815 if (st->state == CSTATE_WAIT_RESULT)
6816 {
6817 /* don't call advanceConnectionState unless data is available */
6818 int sock = PQsocket(st->con);
6819
6820 if (sock < 0)
6821 {
6822 pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
6823 goto done;
6824 }
6825
6826 if (!socket_has_input(sockets, sock, nsocks++))
6827 continue;
6828 }
6829 else if (st->state == CSTATE_FINISHED ||
6830 st->state == CSTATE_ABORTED)
6831 {
6832 /* this client is done, no need to consider it anymore */
6833 continue;
6834 }
6835
6836 advanceConnectionState(thread, st, &aggs);
6837
6838 /*
6839 * If advanceConnectionState changed client to finished state,
6840 * that's one fewer client that remains.
6841 */
6842 if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
6843 remains--;
6844 }
6845
6846 /* progress report is made by thread 0 for all threads */
6847 if (progress && thread->tid == 0)
6848 {
6849 pg_time_usec_t now = pg_time_now();
6850
6851 if (now >= next_report)
6852 {
6853 /*
6854 * Horrible hack: this relies on the thread pointer we are
6855 * passed to be equivalent to threads[0], that is the first
6856 * entry of the threads array. That is why this MUST be done
6857 * by thread 0 and not any other.
6858 */
6859 printProgressReport(thread, thread_start, now,
6860 &last, &last_report);
6861
6862 /*
6863 * Ensure that the next report is in the future, in case
6864 * pgbench/postgres got stuck somewhere.
6865 */
6866 do
6867 {
6868 next_report += (int64) 1000000 * progress;
6869 } while (now >= next_report);
6870 }
6871 }
6872 }
6873
6874 done:
6875 disconnect_all(state, nstate);
6876
6877 if (thread->logfile)
6878 {
6879 if (agg_interval > 0)
6880 {
6881 /* log aggregated but not yet reported transactions */
6882 doLog(thread, state, &aggs, false, 0, 0);
6883 }
6884 fclose(thread->logfile);
6885 thread->logfile = NULL;
6886 }
6887 free_socket_set(sockets);
6888 THREAD_FUNC_RETURN;
6889 }
6890
6891 static void
finishCon(CState * st)6892 finishCon(CState *st)
6893 {
6894 if (st->con != NULL)
6895 {
6896 PQfinish(st->con);
6897 st->con = NULL;
6898 }
6899 }
6900
6901 /*
6902 * Support for duration option: set timer_exceeded after so many seconds.
6903 */
6904
6905 #ifndef WIN32
6906
/*
 * SIGALRM handler installed by setalarm(): records that the requested run
 * duration has elapsed by setting timer_exceeded.  It does nothing else.
 */
static void
handle_sig_alarm(SIGNAL_ARGS)
{
	timer_exceeded = true;
}
6912
6913 static void
setalarm(int seconds)6914 setalarm(int seconds)
6915 {
6916 pqsignal(SIGALRM, handle_sig_alarm);
6917 alarm(seconds);
6918 }
6919
6920 #else /* WIN32 */
6921
/*
 * Timer-queue callback registered by the Windows setalarm(): marks the
 * requested duration as elapsed by setting timer_exceeded.  Both parameters
 * are ignored.
 */
static VOID CALLBACK
win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
	timer_exceeded = true;
}
6927
6928 static void
setalarm(int seconds)6929 setalarm(int seconds)
6930 {
6931 HANDLE queue;
6932 HANDLE timer;
6933
6934 /* This function will be called at most once, so we can cheat a bit. */
6935 queue = CreateTimerQueue();
6936 if (seconds > ((DWORD) -1) / 1000 ||
6937 !CreateTimerQueueTimer(&timer, queue,
6938 win32_timer_callback, NULL, seconds * 1000, 0,
6939 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
6940 {
6941 pg_log_fatal("failed to set timer");
6942 exit(1);
6943 }
6944 }
6945
6946 #endif /* WIN32 */
6947
6948
6949 /*
6950 * These functions provide an abstraction layer that hides the syscall
6951 * we use to wait for input on a set of sockets.
6952 *
6953 * Currently there are two implementations, based on ppoll(2) and select(2).
6954 * ppoll() is preferred where available due to its typically higher ceiling
6955 * on the number of usable sockets. We do not use the more-widely-available
6956 * poll(2) because it only offers millisecond timeout resolution, which could
6957 * be problematic with high --rate settings.
6958 *
6959 * Function APIs:
6960 *
6961 * alloc_socket_set: allocate an empty socket set with room for up to
6962 * "count" sockets.
6963 *
6964 * free_socket_set: deallocate a socket set.
6965 *
6966 * clear_socket_set: reset a socket set to empty.
6967 *
6968 * add_socket_to_set: add socket with indicated FD to slot "idx" in the
6969 * socket set. Slots must be filled in order, starting with 0.
6970 *
 * wait_on_socket_set: wait for input on any socket in set, or for timeout
 * to expire.  timeout is measured in microseconds; 0 (or any negative
 * value) means wait forever.
 * Returns result code of underlying syscall (>=0 if OK, else see errno).
6974 *
6975 * socket_has_input: after waiting, call this to see if given socket has
6976 * input. fd and idx parameters should match some previous call to
6977 * add_socket_to_set.
6978 *
6979 * Note that wait_on_socket_set destructively modifies the state of the
6980 * socket set. After checking for input, caller must apply clear_socket_set
6981 * and add_socket_to_set again before waiting again.
6982 */
6983
6984 #ifdef POLL_USING_PPOLL
6985
6986 static socket_set *
alloc_socket_set(int count)6987 alloc_socket_set(int count)
6988 {
6989 socket_set *sa;
6990
6991 sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) +
6992 sizeof(struct pollfd) * count);
6993 sa->maxfds = count;
6994 sa->curfds = 0;
6995 return sa;
6996 }
6997
/*
 * ppoll() version: release a socket set obtained from alloc_socket_set().
 * The set (including its pollfds array) is a single allocation, so one
 * pg_free() releases it all.
 */
static void
free_socket_set(socket_set *sa)
{
	pg_free(sa);
}
7003
/*
 * ppoll() version: empty the set by resetting the count of filled slots.
 * The pollfd entries themselves are left as-is; add_socket_to_set()
 * overwrites every field it uses when a slot is refilled.
 */
static void
clear_socket_set(socket_set *sa)
{
	sa->curfds = 0;
}
7009
7010 static void
add_socket_to_set(socket_set * sa,int fd,int idx)7011 add_socket_to_set(socket_set *sa, int fd, int idx)
7012 {
7013 Assert(idx < sa->maxfds && idx == sa->curfds);
7014 sa->pollfds[idx].fd = fd;
7015 sa->pollfds[idx].events = POLLIN;
7016 sa->pollfds[idx].revents = 0;
7017 sa->curfds++;
7018 }
7019
7020 static int
wait_on_socket_set(socket_set * sa,int64 usecs)7021 wait_on_socket_set(socket_set *sa, int64 usecs)
7022 {
7023 if (usecs > 0)
7024 {
7025 struct timespec timeout;
7026
7027 timeout.tv_sec = usecs / 1000000;
7028 timeout.tv_nsec = (usecs % 1000000) * 1000;
7029 return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
7030 }
7031 else
7032 {
7033 return ppoll(sa->pollfds, sa->curfds, NULL, NULL);
7034 }
7035 }
7036
7037 static bool
socket_has_input(socket_set * sa,int fd,int idx)7038 socket_has_input(socket_set *sa, int fd, int idx)
7039 {
7040 /*
7041 * In some cases, threadRun will apply clear_socket_set and then try to
7042 * apply socket_has_input anyway with arguments that it used before that,
7043 * or might've used before that except that it exited its setup loop
7044 * early. Hence, if the socket set is empty, silently return false
7045 * regardless of the parameters. If it's not empty, we can Assert that
7046 * the parameters match a previous call.
7047 */
7048 if (sa->curfds == 0)
7049 return false;
7050
7051 Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
7052 return (sa->pollfds[idx].revents & POLLIN) != 0;
7053 }
7054
7055 #endif /* POLL_USING_PPOLL */
7056
7057 #ifdef POLL_USING_SELECT
7058
/*
 * select() version: allocate a zero-filled socket set.  The embedded fd_set
 * has a fixed size, so "count" is not needed and is ignored.  Note that the
 * zero fill leaves maxfd at 0; clear_socket_set() is what sets it to -1.
 */
static socket_set *
alloc_socket_set(int count)
{
	return (socket_set *) pg_malloc0(sizeof(socket_set));
}
7064
/*
 * select() version: release a socket set obtained from alloc_socket_set().
 */
static void
free_socket_set(socket_set *sa)
{
	pg_free(sa);
}
7070
7071 static void
clear_socket_set(socket_set * sa)7072 clear_socket_set(socket_set *sa)
7073 {
7074 FD_ZERO(&sa->fds);
7075 sa->maxfd = -1;
7076 }
7077
7078 static void
add_socket_to_set(socket_set * sa,int fd,int idx)7079 add_socket_to_set(socket_set *sa, int fd, int idx)
7080 {
7081 if (fd < 0 || fd >= FD_SETSIZE)
7082 {
7083 /*
7084 * Doing a hard exit here is a bit grotty, but it doesn't seem worth
7085 * complicating the API to make it less grotty.
7086 */
7087 pg_log_fatal("too many client connections for select()");
7088 exit(1);
7089 }
7090 FD_SET(fd, &sa->fds);
7091 if (fd > sa->maxfd)
7092 sa->maxfd = fd;
7093 }
7094
7095 static int
wait_on_socket_set(socket_set * sa,int64 usecs)7096 wait_on_socket_set(socket_set *sa, int64 usecs)
7097 {
7098 if (usecs > 0)
7099 {
7100 struct timeval timeout;
7101
7102 timeout.tv_sec = usecs / 1000000;
7103 timeout.tv_usec = usecs % 1000000;
7104 return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
7105 }
7106 else
7107 {
7108 return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
7109 }
7110 }
7111
/*
 * select() version: after wait_on_socket_set(), report whether select()
 * left "fd" marked in the set.  "idx" is unused in this implementation.
 */
static bool
socket_has_input(socket_set *sa, int fd, int idx)
{
	return (FD_ISSET(fd, &sa->fds) != 0);
}
7117
7118 #endif /* POLL_USING_SELECT */
7119