1 /*
2 * pgbench.c
3 *
4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
6 *
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2016, PostgreSQL Global Development Group
9 * ALL RIGHTS RESERVED;
10 *
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
15 *
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
21 *
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27 *
28 */
29
/*
 * On Windows, FD_SETSIZE is set to 1024 here, ahead of the #include lines
 * below, because it has to be in place before winsock2.h is included.
 */
#ifdef WIN32
#define FD_SETSIZE 1024			/* set before winsock2.h is included */
#endif   /* WIN32 */
33
34 #include "postgres_fe.h"
35
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39
40 #include <ctype.h>
41 #include <float.h>
42 #include <limits.h>
43 #include <math.h>
44 #include <signal.h>
45 #include <sys/time.h>
46 #ifdef HAVE_SYS_SELECT_H
47 #include <sys/select.h>
48 #endif
49
50 #ifdef HAVE_SYS_RESOURCE_H
51 #include <sys/resource.h> /* for getrlimit */
52 #endif
53
/* Supply our own value of pi if the included headers did not define M_PI */
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif
57
58 #include "pgbench.h"
59
/*
 * SQLSTATE code, in string form, for the "undefined_table" error class
 * (its users are not visible in this part of the file).
 */
#define ERRCODE_UNDEFINED_TABLE  "42P01"
61
/*
 * Multi-platform pthread implementations
 *
 * Exactly one of the three branches below is compiled in:
 *	- WIN32: pthread_t / pthread_attr_t are declared locally, together with
 *	  prototypes for file-local pthread_create() and pthread_join();
 *	- ENABLE_THREAD_SAFETY: the platform's <pthread.h> is used as is;
 *	- otherwise: no thread support, pthread_t is only a placeholder type.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
#endif
80
81
/********************************************************************
 * some configurable parameters */

/*
 * max number of clients allowed: derived from FD_SETSIZE when that macro is
 * defined, leaving 10 descriptors spare; 1024 otherwise
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS	(FD_SETSIZE - 10)
#else
#define MAXCLIENTS	1024
#endif

#define LOG_STEP_SECONDS	5	/* seconds between log messages */
#define DEFAULT_NXACTS	10		/* default nxacts */

#define MIN_GAUSSIAN_PARAM		2.0		/* minimum parameter for gauss */

int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */
int64		end_time = 0;		/* when to stop, in microseconds, under -T */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples in
 * pgbench_accounts table.
 */
int			scale = 1;

/*
 * fillfactor. for example, fillfactor = 90 will use only 90 percent
 * space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * create foreign key constraints on the tables?
 */
int			foreign_keys = 0;

/*
 * use unlogged tables?
 */
int			unlogged_tables = 0;

/*
 * log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * When threads are throttled to a given rate limit, this is the target delay
 * to reach that rate in usec.  0 is the default and means no throttling.
 */
int64		throttle_delay = 0;

/*
 * Transactions which take longer than this limit (in usec) are counted as
 * late, and reported as such, although they are completed anyway. When
 * throttling is enabled, execution time slots that are more than this late
 * are skipped altogether, and counted separately.
 */
int64		latency_limit = 0;

/*
 * tablespace selection
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/*
 * end of configurable parameters
 *********************************************************************/
151
/*
 * Row-count multipliers for the standard tables; the builtin scripts
 * multiply each of them by :scale when picking random ids.
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which values proportional to the scale
 * (presumably account ids, i.e. naccounts * scale) no longer fit in a 32-bit
 * integer and 64-bit values are needed instead.
 *
 * Although the actual threshold is 21474 (21474 * naccounts is the last
 * product below 2^31), we use 20000 because it is easier to document and
 * remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000
165
/* Run-time settings shared by the whole program */
bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */
bool		per_script_stats = false;	/* whether to collect stats per script */
int			progress = 0;		/* thread progress report every this many
								 * seconds */
bool		progress_timestamp = false; /* progress report with Unix time */
int			nclients = 1;		/* number of clients */
int			nthreads = 1;		/* number of threads */
bool		is_connect;			/* establish connection for each transaction */
bool		is_latencies;		/* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/* Connection parameters; doConnect() hands them to libpq */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
const char *progname;

#define WSEP '@'				/* weight separator */

volatile bool timer_exceeded = false;	/* flag from signal handler */
188
/*
 * Variable definitions.  If a variable has a string value, "value" is that
 * value, is_numeric is false, and num_value is undefined.  If the value is
 * known to be numeric, is_numeric is true and num_value contains the value
 * (in any permitted numeric variant).  In this case "value" contains the
 * string equivalent of the number, if we've had occasion to compute that,
 * or NULL if we haven't.
 *
 * Both "name" and "value" are heap strings owned by the variable: they are
 * created with pg_strdup() and "value" is free()d when it is replaced.
 */
typedef struct
{
	char	   *name;			/* variable's name */
	char	   *value;			/* its value in string form, if known */
	bool		is_numeric;		/* is numeric value known? */
	PgBenchValue num_value;		/* variable's value in numeric form */
} Variable;

#define MAX_SCRIPTS		128		/* max number of SQL scripts allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
207
/*
 * Simple data structure to keep stats about something.
 *
 * min and max are only meaningful once count > 0: a freshly initialized
 * struct is all zeroes, and the first accumulated value overwrites both.
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
	int64		count;			/* how many values were encountered */
	double		min;			/* the minimum seen */
	double		max;			/* the maximum seen */
	double		sum;			/* sum of values */
	double		sum2;			/* sum of squared values */
} SimpleStats;
222
/*
 * Data structure to hold various statistics: per-thread and per-script stats
 * are maintained and merged together.
 *
 * cnt counts every transaction, skipped ones included; latency (and lag,
 * when throttling is active) are only accumulated for transactions that
 * were not skipped.
 */
typedef struct StatsData
{
	long		start_time;		/* interval start time, for aggregates */
	int64		cnt;			/* number of transactions */
	int64		skipped;		/* number of transactions skipped under --rate
								 * and --latency-limit */
	SimpleStats latency;		/* transaction latencies */
	SimpleStats lag;			/* schedule lag, collected when throttling */
} StatsData;
236
/*
 * Connection state
 *
 * One of these exists per simulated client; each thread works on an array
 * of them (see TState.state).
 */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB */
	int			id;				/* client No. */
	int			state;			/* state No. */
	bool		listen;			/* whether an async query has been sent */
	bool		sleeping;		/* whether the client is napping */
	bool		throttling;		/* whether nap is for throttling */
	bool		is_throttled;	/* whether transaction throttling is done */
	Variable   *variables;		/* array of variable definitions */
	int			nvariables;		/* number of variables */
	bool		vars_sorted;	/* are variables sorted by name? */
	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
	instr_time	txn_begin;		/* used for measuring schedule lag times */
	instr_time	stmt_begin;		/* used for measuring statement latencies */
	int			use_file;		/* index in sql_scripts for this client */
	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */

	/* per client collected stats */
	int64		cnt;			/* transaction count */
	int			ecnt;			/* error count */
} CState;
263
/*
 * Thread state
 *
 * Each thread has its own random_state, which is what the random number
 * helpers in this file pass to pg_erand48().
 */
typedef struct
{
	int			tid;			/* thread id */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */
	unsigned short random_state[3];		/* separate randomness for each thread */
	int64		throttle_trigger;		/* previous/next throttling (us) */
	FILE	   *logfile;		/* where to log, or NULL */

	/* per thread collected stats */
	instr_time	start_time;		/* thread start time */
	instr_time	conn_time;
	StatsData	stats;
	int64		latency_late;	/* executed but late transactions */
} TState;

/* pthread_t value standing for "no thread" */
#define INVALID_THREAD	((pthread_t) 0)
285
/*
 * queries read from files
 */
#define SQL_COMMAND		1		/* Command.type: an SQL statement */
#define META_COMMAND	2		/* Command.type: a meta command */
#define MAX_ARGS		10		/* size of Command.argv[] */

/* How queries are submitted; NUM_QUERYMODE counts the real modes */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
/* mode names, listed in the same order as the QueryMode values */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
303
/*
 * One command of a script, either an SQL statement or a meta command.
 */
typedef struct
{
	char	   *line;			/* text of command line */
	int			command_num;	/* unique index of this Command struct */
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	int			argc;			/* number of command words */
	char	   *argv[MAX_ARGS]; /* command word list */
	PgBenchExpr *expr;			/* parsed expression, if needed */
	SimpleStats stats;			/* time spent in this command */
} Command;
314
/*
 * A script ready to be run, with its selection weight and collected stats.
 */
typedef struct ParsedScript
{
	const char *desc;			/* script descriptor (eg, file name) */
	int			weight;			/* selection weight */
	Command   **commands;		/* NULL-terminated array of Commands */
	StatsData	stats;			/* total time spent in script */
} ParsedScript;

static ParsedScript sql_script[MAX_SCRIPTS];	/* SQL script files */
static int	num_scripts;		/* number of scripts in sql_script[] */
static int	num_commands = 0;	/* total number of Command structs */
static int64 total_weight = 0;	/* presumably the sum of the script weights;
								 * it is maintained outside this part of the
								 * file */

static int	debug = 0;			/* debug flag */
329
/* Builtin test scripts */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;

/*
 * The scripts themselves.  The random id ranges are built at compile time
 * from naccounts / nbranches / ntellers, each multiplied by the :scale
 * script variable.
 */
static const BuiltinScript builtin_script[] =
{
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	{
		/* same as tpcb-like, minus the tellers and branches updates */
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	{
		/* a single read of one random account */
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
375
376
/*
 * Function prototypes, for functions that are referenced ahead of where
 * they are defined in this file
 */
static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
static void doLog(TState *thread, CState *st, instr_time *now,
	  StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, instr_time *now,
				 bool skipped, StatsData *agg);
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);


/* callback functions for our flex lexer */
static const PsqlScanCallbacks pgbench_callbacks = {
	NULL,						/* don't need get_variable functionality */
	pgbench_error
};
396
397
398 static void
usage(void)399 usage(void)
400 {
401 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
402 "Usage:\n"
403 " %s [OPTION]... [DBNAME]\n"
404 "\nInitialization options:\n"
405 " -i, --initialize invokes initialization mode\n"
406 " -F, --fillfactor=NUM set fill factor\n"
407 " -n, --no-vacuum do not run VACUUM after initialization\n"
408 " -q, --quiet quiet logging (one message each 5 seconds)\n"
409 " -s, --scale=NUM scaling factor\n"
410 " --foreign-keys create foreign key constraints between tables\n"
411 " --index-tablespace=TABLESPACE\n"
412 " create indexes in the specified tablespace\n"
413 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
414 " --unlogged-tables create tables as unlogged tables\n"
415 "\nOptions to select what to run:\n"
416 " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
417 " (use \"-b list\" to list available scripts)\n"
418 " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
419 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
420 " (same as \"-b simple-update\")\n"
421 " -S, --select-only perform SELECT-only transactions\n"
422 " (same as \"-b select-only\")\n"
423 "\nBenchmarking options:\n"
424 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
425 " -C, --connect establish new connection for each transaction\n"
426 " -D, --define=VARNAME=VALUE\n"
427 " define variable for use by custom script\n"
428 " -j, --jobs=NUM number of threads (default: 1)\n"
429 " -l, --log write transaction times to log file\n"
430 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
431 " -M, --protocol=simple|extended|prepared\n"
432 " protocol for submitting queries (default: simple)\n"
433 " -n, --no-vacuum do not run VACUUM before tests\n"
434 " -P, --progress=NUM show thread progress report every NUM seconds\n"
435 " -r, --report-latencies report average latency per command\n"
436 " -R, --rate=NUM target rate in transactions per second\n"
437 " -s, --scale=NUM report this scale factor in output\n"
438 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
439 " -T, --time=NUM duration of benchmark test in seconds\n"
440 " -v, --vacuum-all vacuum all four standard tables before tests\n"
441 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
442 " --progress-timestamp use Unix epoch timestamps for progress\n"
443 " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
444 "\nCommon options:\n"
445 " -d, --debug print debugging output\n"
446 " -h, --host=HOSTNAME database server host or socket directory\n"
447 " -p, --port=PORT database server port number\n"
448 " -U, --username=USERNAME connect as specified database user\n"
449 " -V, --version output version information, then exit\n"
450 " -?, --help show this help, then exit\n"
451 "\n"
452 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
453 progname, progname);
454 }
455
/*
 * is_an_int -- return whether str matches "^\s*[-+]?[0-9]+$"
 *
 * That is: optional leading whitespace, an optional sign, then one or more
 * decimal digits running up to the end of the string.  Strings with no digit
 * at all (empty, blank, or a lone sign) are rejected, as are strings with
 * anything after the digits, trailing whitespace included.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit is required.  The test must also fail on the
	 * terminating NUL, otherwise "" and a bare sign would pass.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits (the NUL terminator is not a digit, so this stops) */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
481
482
483 /*
484 * strtoint64 -- convert a string to 64-bit integer
485 *
486 * This function is a modified version of scanint8() from
487 * src/backend/utils/adt/int8.c.
488 */
489 int64
strtoint64(const char * str)490 strtoint64(const char *str)
491 {
492 const char *ptr = str;
493 int64 result = 0;
494 int sign = 1;
495
496 /*
497 * Do our own scan, rather than relying on sscanf which might be broken
498 * for long long.
499 */
500
501 /* skip leading spaces */
502 while (*ptr && isspace((unsigned char) *ptr))
503 ptr++;
504
505 /* handle sign */
506 if (*ptr == '-')
507 {
508 ptr++;
509
510 /*
511 * Do an explicit check for INT64_MIN. Ugly though this is, it's
512 * cleaner than trying to get the loop below to handle it portably.
513 */
514 if (strncmp(ptr, "9223372036854775808", 19) == 0)
515 {
516 result = PG_INT64_MIN;
517 ptr += 19;
518 goto gotdigits;
519 }
520 sign = -1;
521 }
522 else if (*ptr == '+')
523 ptr++;
524
525 /* require at least one digit */
526 if (!isdigit((unsigned char) *ptr))
527 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
528
529 /* process digits */
530 while (*ptr && isdigit((unsigned char) *ptr))
531 {
532 int64 tmp = result * 10 + (*ptr++ - '0');
533
534 if ((tmp / 10) != result) /* overflow? */
535 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
536 result = tmp;
537 }
538
539 gotdigits:
540
541 /* allow trailing whitespace, but not other trailing chars */
542 while (*ptr != '\0' && isspace((unsigned char) *ptr))
543 ptr++;
544
545 if (*ptr != '\0')
546 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
547
548 return ((sign < 0) ? -result : result);
549 }
550
551 /* random number generator: uniform distribution from min to max inclusive */
552 static int64
getrand(TState * thread,int64 min,int64 max)553 getrand(TState *thread, int64 min, int64 max)
554 {
555 /*
556 * Odd coding is so that min and max have approximately the same chance of
557 * being selected as do numbers between them.
558 *
559 * pg_erand48() is thread-safe and concurrent, which is why we use it
560 * rather than random(), which in glibc is non-reentrant, and therefore
561 * protected by a mutex, and therefore a bottleneck on machines with many
562 * CPUs.
563 */
564 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
565 }
566
567 /*
568 * random number generator: exponential distribution from min to max inclusive.
569 * the parameter is so that the density of probability for the last cut-off max
570 * value is exp(-parameter).
571 */
572 static int64
getExponentialRand(TState * thread,int64 min,int64 max,double parameter)573 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
574 {
575 double cut,
576 uniform,
577 rand;
578
579 /* abort if wrong parameter, but must really be checked beforehand */
580 Assert(parameter > 0.0);
581 cut = exp(-parameter);
582 /* erand in [0, 1), uniform in (0, 1] */
583 uniform = 1.0 - pg_erand48(thread->random_state);
584
585 /*
586 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
587 */
588 Assert((1.0 - cut) != 0.0);
589 rand = -log(cut + (1.0 - cut) * uniform) / parameter;
590 /* return int64 random number within between min and max */
591 return min + (int64) ((max - min + 1) * rand);
592 }
593
594 /* random number generator: gaussian distribution from min to max inclusive */
595 static int64
getGaussianRand(TState * thread,int64 min,int64 max,double parameter)596 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
597 {
598 double stdev;
599 double rand;
600
601 /* abort if parameter is too low, but must really be checked beforehand */
602 Assert(parameter >= MIN_GAUSSIAN_PARAM);
603
604 /*
605 * Get user specified random number from this loop, with -parameter <
606 * stdev <= parameter
607 *
608 * This loop is executed until the number is in the expected range.
609 *
610 * As the minimum parameter is 2.0, the probability of looping is low:
611 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
612 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
613 * the worst case. For a parameter value of 5.0, the looping probability
614 * is about e^{-5} * 2 / pi ~ 0.43%.
615 */
616 do
617 {
618 /*
619 * pg_erand48 generates [0,1), but for the basic version of the
620 * Box-Muller transform the two uniformly distributed random numbers
621 * are expected in (0, 1] (see
622 * http://en.wikipedia.org/wiki/Box_muller)
623 */
624 double rand1 = 1.0 - pg_erand48(thread->random_state);
625 double rand2 = 1.0 - pg_erand48(thread->random_state);
626
627 /* Box-Muller basic form transform */
628 double var_sqrt = sqrt(-2.0 * log(rand1));
629
630 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
631
632 /*
633 * we may try with cos, but there may be a bias induced if the
634 * previous value fails the test. To be on the safe side, let us try
635 * over.
636 */
637 }
638 while (stdev < -parameter || stdev >= parameter);
639
640 /* stdev is in [-parameter, parameter), normalization to [0,1) */
641 rand = (stdev + parameter) / (parameter * 2.0);
642
643 /* return int64 random number within between min and max */
644 return min + (int64) ((max - min + 1) * rand);
645 }
646
647 /*
648 * random number generator: generate a value, such that the series of values
649 * will approximate a Poisson distribution centered on the given value.
650 */
651 static int64
getPoissonRand(TState * thread,int64 center)652 getPoissonRand(TState *thread, int64 center)
653 {
654 /*
655 * Use inverse transform sampling to generate a value > 0, such that the
656 * expected (i.e. average) value is the given argument.
657 */
658 double uniform;
659
660 /* erand in [0, 1), uniform in (0, 1] */
661 uniform = 1.0 - pg_erand48(thread->random_state);
662
663 return (int64) (-log(uniform) * ((double) center) + 0.5);
664 }
665
666 /*
667 * Initialize the given SimpleStats struct to all zeroes
668 */
669 static void
initSimpleStats(SimpleStats * ss)670 initSimpleStats(SimpleStats *ss)
671 {
672 memset(ss, 0, sizeof(SimpleStats));
673 }
674
675 /*
676 * Accumulate one value into a SimpleStats struct.
677 */
678 static void
addToSimpleStats(SimpleStats * ss,double val)679 addToSimpleStats(SimpleStats *ss, double val)
680 {
681 if (ss->count == 0 || val < ss->min)
682 ss->min = val;
683 if (ss->count == 0 || val > ss->max)
684 ss->max = val;
685 ss->count++;
686 ss->sum += val;
687 ss->sum2 += val * val;
688 }
689
690 /*
691 * Merge two SimpleStats objects
692 */
693 static void
mergeSimpleStats(SimpleStats * acc,SimpleStats * ss)694 mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
695 {
696 if (acc->count == 0 || ss->min < acc->min)
697 acc->min = ss->min;
698 if (acc->count == 0 || ss->max > acc->max)
699 acc->max = ss->max;
700 acc->count += ss->count;
701 acc->sum += ss->sum;
702 acc->sum2 += ss->sum2;
703 }
704
705 /*
706 * Initialize a StatsData struct to mostly zeroes, with its start time set to
707 * the given value.
708 */
709 static void
initStats(StatsData * sd,double start_time)710 initStats(StatsData *sd, double start_time)
711 {
712 sd->start_time = start_time;
713 sd->cnt = 0;
714 sd->skipped = 0;
715 initSimpleStats(&sd->latency);
716 initSimpleStats(&sd->lag);
717 }
718
719 /*
720 * Accumulate one additional item into the given stats object.
721 */
722 static void
accumStats(StatsData * stats,bool skipped,double lat,double lag)723 accumStats(StatsData *stats, bool skipped, double lat, double lag)
724 {
725 stats->cnt++;
726
727 if (skipped)
728 {
729 /* no latency to record on skipped transactions */
730 stats->skipped++;
731 }
732 else
733 {
734 addToSimpleStats(&stats->latency, lat);
735
736 /* and possibly the same for schedule lag */
737 if (throttle_delay)
738 addToSimpleStats(&stats->lag, lag);
739 }
740 }
741
742 /* call PQexec() and exit() on failure */
743 static void
executeStatement(PGconn * con,const char * sql)744 executeStatement(PGconn *con, const char *sql)
745 {
746 PGresult *res;
747
748 res = PQexec(con, sql);
749 if (PQresultStatus(res) != PGRES_COMMAND_OK)
750 {
751 fprintf(stderr, "%s", PQerrorMessage(con));
752 exit(1);
753 }
754 PQclear(res);
755 }
756
757 /* call PQexec() and complain, but without exiting, on failure */
758 static void
tryExecuteStatement(PGconn * con,const char * sql)759 tryExecuteStatement(PGconn *con, const char *sql)
760 {
761 PGresult *res;
762
763 res = PQexec(con, sql);
764 if (PQresultStatus(res) != PGRES_COMMAND_OK)
765 {
766 fprintf(stderr, "%s", PQerrorMessage(con));
767 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
768 }
769 PQclear(res);
770 }
771
772 /* set up a connection to the backend */
773 static PGconn *
doConnect(void)774 doConnect(void)
775 {
776 PGconn *conn;
777 static char *password = NULL;
778 bool new_pass;
779
780 /*
781 * Start the connection. Loop until we have a password if requested by
782 * backend.
783 */
784 do
785 {
786 #define PARAMS_ARRAY_SIZE 7
787
788 const char *keywords[PARAMS_ARRAY_SIZE];
789 const char *values[PARAMS_ARRAY_SIZE];
790
791 keywords[0] = "host";
792 values[0] = pghost;
793 keywords[1] = "port";
794 values[1] = pgport;
795 keywords[2] = "user";
796 values[2] = login;
797 keywords[3] = "password";
798 values[3] = password;
799 keywords[4] = "dbname";
800 values[4] = dbName;
801 keywords[5] = "fallback_application_name";
802 values[5] = progname;
803 keywords[6] = NULL;
804 values[6] = NULL;
805
806 new_pass = false;
807
808 conn = PQconnectdbParams(keywords, values, true);
809
810 if (!conn)
811 {
812 fprintf(stderr, "connection to database \"%s\" failed\n",
813 dbName);
814 return NULL;
815 }
816
817 if (PQstatus(conn) == CONNECTION_BAD &&
818 PQconnectionNeedsPassword(conn) &&
819 password == NULL)
820 {
821 PQfinish(conn);
822 password = simple_prompt("Password: ", 100, false);
823 new_pass = true;
824 }
825 } while (new_pass);
826
827 /* check to see that the backend connection was successfully made */
828 if (PQstatus(conn) == CONNECTION_BAD)
829 {
830 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
831 PQdb(conn), PQerrorMessage(conn));
832 PQfinish(conn);
833 return NULL;
834 }
835
836 return conn;
837 }
838
839 /* throw away response from backend */
840 static void
discard_response(CState * state)841 discard_response(CState *state)
842 {
843 PGresult *res;
844
845 do
846 {
847 res = PQgetResult(state->con);
848 if (res)
849 PQclear(res);
850 } while (res);
851 }
852
853 /* qsort comparator for Variable array */
854 static int
compareVariableNames(const void * v1,const void * v2)855 compareVariableNames(const void *v1, const void *v2)
856 {
857 return strcmp(((const Variable *) v1)->name,
858 ((const Variable *) v2)->name);
859 }
860
861 /* Locate a variable by name; returns NULL if unknown */
862 static Variable *
lookupVariable(CState * st,char * name)863 lookupVariable(CState *st, char *name)
864 {
865 Variable key;
866
867 /* On some versions of Solaris, bsearch of zero items dumps core */
868 if (st->nvariables <= 0)
869 return NULL;
870
871 /* Sort if we have to */
872 if (!st->vars_sorted)
873 {
874 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
875 compareVariableNames);
876 st->vars_sorted = true;
877 }
878
879 /* Now we can search */
880 key.name = name;
881 return (Variable *) bsearch((void *) &key,
882 (void *) st->variables,
883 st->nvariables,
884 sizeof(Variable),
885 compareVariableNames);
886 }
887
888 /* Get the value of a variable, in string form; returns NULL if unknown */
889 static char *
getVariable(CState * st,char * name)890 getVariable(CState *st, char *name)
891 {
892 Variable *var;
893 char stringform[64];
894
895 var = lookupVariable(st, name);
896 if (var == NULL)
897 return NULL; /* not found */
898
899 if (var->value)
900 return var->value; /* we have it in string form */
901
902 /* We need to produce a string equivalent of the numeric value */
903 Assert(var->is_numeric);
904 if (var->num_value.type == PGBT_INT)
905 snprintf(stringform, sizeof(stringform),
906 INT64_FORMAT, var->num_value.u.ival);
907 else
908 {
909 Assert(var->num_value.type == PGBT_DOUBLE);
910 snprintf(stringform, sizeof(stringform),
911 "%.*g", DBL_DIG, var->num_value.u.dval);
912 }
913 var->value = pg_strdup(stringform);
914 return var->value;
915 }
916
917 /* Try to convert variable to numeric form; return false on failure */
918 static bool
makeVariableNumeric(Variable * var)919 makeVariableNumeric(Variable *var)
920 {
921 if (var->is_numeric)
922 return true; /* no work */
923
924 if (is_an_int(var->value))
925 {
926 setIntValue(&var->num_value, strtoint64(var->value));
927 var->is_numeric = true;
928 }
929 else /* type should be double */
930 {
931 double dv;
932 char xs;
933
934 if (sscanf(var->value, "%lf%c", &dv, &xs) != 1)
935 {
936 fprintf(stderr,
937 "malformed variable \"%s\" value: \"%s\"\n",
938 var->name, var->value);
939 return false;
940 }
941 setDoubleValue(&var->num_value, dv);
942 var->is_numeric = true;
943 }
944 return true;
945 }
946
/*
 * Check whether name is a legal variable name: non-empty and made up only of
 * letters, digits and underscores.
 */
static bool
isLegalVariableName(const char *name)
{
	const char *p;

	/* must be non-empty */
	if (*name == '\0')
		return false;

	for (p = name; *p != '\0'; p++)
	{
		if (!(isalnum((unsigned char) *p) || *p == '_'))
			return false;
	}

	return true;
}
961
962 /*
963 * Lookup a variable by name, creating it if need be.
964 * Caller is expected to assign a value to the variable.
965 * Returns NULL on failure (bad name).
966 */
967 static Variable *
lookupCreateVariable(CState * st,const char * context,char * name)968 lookupCreateVariable(CState *st, const char *context, char *name)
969 {
970 Variable *var;
971
972 var = lookupVariable(st, name);
973 if (var == NULL)
974 {
975 Variable *newvars;
976
977 /*
978 * Check for the name only when declaring a new variable to avoid
979 * overhead.
980 */
981 if (!isLegalVariableName(name))
982 {
983 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
984 context, name);
985 return NULL;
986 }
987
988 /* Create variable at the end of the array */
989 if (st->variables)
990 newvars = (Variable *) pg_realloc(st->variables,
991 (st->nvariables + 1) * sizeof(Variable));
992 else
993 newvars = (Variable *) pg_malloc(sizeof(Variable));
994
995 st->variables = newvars;
996
997 var = &newvars[st->nvariables];
998
999 var->name = pg_strdup(name);
1000 var->value = NULL;
1001 /* caller is expected to initialize remaining fields */
1002
1003 st->nvariables++;
1004 /* we don't re-sort the array till we have to */
1005 st->vars_sorted = false;
1006 }
1007
1008 return var;
1009 }
1010
1011 /* Assign a string value to a variable, creating it if need be */
1012 /* Returns false on failure (bad name) */
1013 static bool
putVariable(CState * st,const char * context,char * name,const char * value)1014 putVariable(CState *st, const char *context, char *name, const char *value)
1015 {
1016 Variable *var;
1017 char *val;
1018
1019 var = lookupCreateVariable(st, context, name);
1020 if (!var)
1021 return false;
1022
1023 /* dup then free, in case value is pointing at this variable */
1024 val = pg_strdup(value);
1025
1026 if (var->value)
1027 free(var->value);
1028 var->value = val;
1029 var->is_numeric = false;
1030
1031 return true;
1032 }
1033
1034 /* Assign a numeric value to a variable, creating it if need be */
1035 /* Returns false on failure (bad name) */
1036 static bool
putVariableNumber(CState * st,const char * context,char * name,const PgBenchValue * value)1037 putVariableNumber(CState *st, const char *context, char *name,
1038 const PgBenchValue *value)
1039 {
1040 Variable *var;
1041
1042 var = lookupCreateVariable(st, context, name);
1043 if (!var)
1044 return false;
1045
1046 if (var->value)
1047 free(var->value);
1048 var->value = NULL;
1049 var->is_numeric = true;
1050 var->num_value = *value;
1051
1052 return true;
1053 }
1054
1055 /* Assign an integer value to a variable, creating it if need be */
1056 /* Returns false on failure (bad name) */
1057 static bool
putVariableInt(CState * st,const char * context,char * name,int64 value)1058 putVariableInt(CState *st, const char *context, char *name, int64 value)
1059 {
1060 PgBenchValue val;
1061
1062 setIntValue(&val, value);
1063 return putVariableNumber(st, context, name, &val);
1064 }
1065
/*
 * Extract the variable name that follows the first character of "sql"
 * (callers pass a pointer to the introducing ':').
 *
 * The name is the longest run of alphanumeric characters and underscores
 * starting at sql[1].  On success a newly allocated copy of the name is
 * returned and *eaten is set to the number of characters covered, including
 * the leading one.  If the run is empty, NULL is returned and *eaten is left
 * untouched.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	const char *start = sql + 1;	/* skip the introducing character */
	int			namelen = 0;
	char	   *result;

	while (isalnum((unsigned char) start[namelen]) || start[namelen] == '_')
		namelen++;

	if (namelen == 0)
		return NULL;

	result = pg_malloc(namelen + 1);
	memcpy(result, start, namelen);
	result[namelen] = '\0';

	*eaten = namelen + 1;
	return result;
}
1086
/*
 * Replace the "len" characters starting at "param" inside the string *sql
 * with the text "value".
 *
 * When the replacement is longer than the replaced span, *sql is enlarged
 * with pg_realloc() and may therefore change.  Returns a pointer just past
 * the inserted text, inside the (possibly moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			newlen = strlen(value);
	char	   *tail;

	if (newlen > len)
	{
		/* the buffer may move, so remember where "param" sits in it */
		size_t		pos = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + newlen + 1);
		param = *sql + pos;
	}

	/* shift the remainder (terminator included) when the lengths differ */
	tail = param + len;
	if (newlen != len)
		memmove(param + newlen, tail, strlen(tail) + 1);
	memcpy(param, value, newlen);

	return param + newlen;
}
1106
1107 static char *
assignVariables(CState * st,char * sql)1108 assignVariables(CState *st, char *sql)
1109 {
1110 char *p,
1111 *name,
1112 *val;
1113
1114 p = sql;
1115 while ((p = strchr(p, ':')) != NULL)
1116 {
1117 int eaten;
1118
1119 name = parseVariable(p, &eaten);
1120 if (name == NULL)
1121 {
1122 while (*p == ':')
1123 {
1124 p++;
1125 }
1126 continue;
1127 }
1128
1129 val = getVariable(st, name);
1130 free(name);
1131 if (val == NULL)
1132 {
1133 p++;
1134 continue;
1135 }
1136
1137 p = replaceVariable(&sql, p, eaten, val);
1138 }
1139
1140 return sql;
1141 }
1142
1143 static void
getQueryParams(CState * st,const Command * command,const char ** params)1144 getQueryParams(CState *st, const Command *command, const char **params)
1145 {
1146 int i;
1147
1148 for (i = 0; i < command->argc - 1; i++)
1149 params[i] = getVariable(st, command->argv[i + 1]);
1150 }
1151
1152 /* get a value as an int, tell if there is a problem */
1153 static bool
coerceToInt(PgBenchValue * pval,int64 * ival)1154 coerceToInt(PgBenchValue *pval, int64 *ival)
1155 {
1156 if (pval->type == PGBT_INT)
1157 {
1158 *ival = pval->u.ival;
1159 return true;
1160 }
1161 else
1162 {
1163 double dval;
1164
1165 Assert(pval->type == PGBT_DOUBLE);
1166 dval = rint(pval->u.dval);
1167 if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
1168 {
1169 fprintf(stderr, "double to int overflow for %f\n", dval);
1170 return false;
1171 }
1172 *ival = (int64) dval;
1173 return true;
1174 }
1175 }
1176
1177 /* get a value as a double, or tell if there is a problem */
1178 static bool
coerceToDouble(PgBenchValue * pval,double * dval)1179 coerceToDouble(PgBenchValue *pval, double *dval)
1180 {
1181 if (pval->type == PGBT_DOUBLE)
1182 {
1183 *dval = pval->u.dval;
1184 return true;
1185 }
1186 else
1187 {
1188 Assert(pval->type == PGBT_INT);
1189 *dval = (double) pval->u.ival;
1190 return true;
1191 }
1192 }
1193
1194 /* assign an integer value */
1195 static void
setIntValue(PgBenchValue * pv,int64 ival)1196 setIntValue(PgBenchValue *pv, int64 ival)
1197 {
1198 pv->type = PGBT_INT;
1199 pv->u.ival = ival;
1200 }
1201
1202 /* assign a double value */
1203 static void
setDoubleValue(PgBenchValue * pv,double dval)1204 setDoubleValue(PgBenchValue *pv, double dval)
1205 {
1206 pv->type = PGBT_DOUBLE;
1207 pv->u.dval = dval;
1208 }
1209
1210 /* maximum number of function arguments */
1211 #define MAX_FARGS 16
1212
1213 /*
1214 * Recursive evaluation of functions
1215 */
1216 static bool
evalFunc(TState * thread,CState * st,PgBenchFunction func,PgBenchExprLink * args,PgBenchValue * retval)1217 evalFunc(TState *thread, CState *st,
1218 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
1219 {
1220 /* evaluate all function arguments */
1221 int nargs = 0;
1222 PgBenchValue vargs[MAX_FARGS];
1223 PgBenchExprLink *l = args;
1224
1225 for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1226 if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1227 return false;
1228
1229 if (l != NULL)
1230 {
1231 fprintf(stderr,
1232 "too many function arguments, maximum is %d\n", MAX_FARGS);
1233 return false;
1234 }
1235
1236 /* then evaluate function */
1237 switch (func)
1238 {
1239 /* overloaded operators */
1240 case PGBENCH_ADD:
1241 case PGBENCH_SUB:
1242 case PGBENCH_MUL:
1243 case PGBENCH_DIV:
1244 case PGBENCH_MOD:
1245 {
1246 PgBenchValue *lval = &vargs[0],
1247 *rval = &vargs[1];
1248
1249 Assert(nargs == 2);
1250
1251 /* overloaded type management, double if some double */
1252 if ((lval->type == PGBT_DOUBLE ||
1253 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1254 {
1255 double ld,
1256 rd;
1257
1258 if (!coerceToDouble(lval, &ld) ||
1259 !coerceToDouble(rval, &rd))
1260 return false;
1261
1262 switch (func)
1263 {
1264 case PGBENCH_ADD:
1265 setDoubleValue(retval, ld + rd);
1266 return true;
1267
1268 case PGBENCH_SUB:
1269 setDoubleValue(retval, ld - rd);
1270 return true;
1271
1272 case PGBENCH_MUL:
1273 setDoubleValue(retval, ld * rd);
1274 return true;
1275
1276 case PGBENCH_DIV:
1277 setDoubleValue(retval, ld / rd);
1278 return true;
1279
1280 default:
1281 /* cannot get here */
1282 Assert(0);
1283 }
1284 }
1285 else /* we have integer operands, or % */
1286 {
1287 int64 li,
1288 ri;
1289
1290 if (!coerceToInt(lval, &li) ||
1291 !coerceToInt(rval, &ri))
1292 return false;
1293
1294 switch (func)
1295 {
1296 case PGBENCH_ADD:
1297 setIntValue(retval, li + ri);
1298 return true;
1299
1300 case PGBENCH_SUB:
1301 setIntValue(retval, li - ri);
1302 return true;
1303
1304 case PGBENCH_MUL:
1305 setIntValue(retval, li * ri);
1306 return true;
1307
1308 case PGBENCH_DIV:
1309 case PGBENCH_MOD:
1310 if (ri == 0)
1311 {
1312 fprintf(stderr, "division by zero\n");
1313 return false;
1314 }
1315 /* special handling of -1 divisor */
1316 if (ri == -1)
1317 {
1318 if (func == PGBENCH_DIV)
1319 {
1320 /* overflow check (needed for INT64_MIN) */
1321 if (li == PG_INT64_MIN)
1322 {
1323 fprintf(stderr, "bigint out of range\n");
1324 return false;
1325 }
1326 else
1327 setIntValue(retval, -li);
1328 }
1329 else
1330 setIntValue(retval, 0);
1331 return true;
1332 }
1333 /* else divisor is not -1 */
1334 if (func == PGBENCH_DIV)
1335 setIntValue(retval, li / ri);
1336 else /* func == PGBENCH_MOD */
1337 setIntValue(retval, li % ri);
1338
1339 return true;
1340
1341 default:
1342 /* cannot get here */
1343 Assert(0);
1344 }
1345 }
1346 }
1347
1348 /* no arguments */
1349 case PGBENCH_PI:
1350 setDoubleValue(retval, M_PI);
1351 return true;
1352
1353 /* 1 overloaded argument */
1354 case PGBENCH_ABS:
1355 {
1356 PgBenchValue *varg = &vargs[0];
1357
1358 Assert(nargs == 1);
1359
1360 if (varg->type == PGBT_INT)
1361 {
1362 int64 i = varg->u.ival;
1363
1364 setIntValue(retval, i < 0 ? -i : i);
1365 }
1366 else
1367 {
1368 double d = varg->u.dval;
1369
1370 Assert(varg->type == PGBT_DOUBLE);
1371 setDoubleValue(retval, d < 0.0 ? -d : d);
1372 }
1373
1374 return true;
1375 }
1376
1377 case PGBENCH_DEBUG:
1378 {
1379 PgBenchValue *varg = &vargs[0];
1380
1381 Assert(nargs == 1);
1382
1383 fprintf(stderr, "debug(script=%d,command=%d): ",
1384 st->use_file, st->state + 1);
1385
1386 if (varg->type == PGBT_INT)
1387 fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
1388 else
1389 {
1390 Assert(varg->type == PGBT_DOUBLE);
1391 fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
1392 }
1393
1394 *retval = *varg;
1395
1396 return true;
1397 }
1398
1399 /* 1 double argument */
1400 case PGBENCH_DOUBLE:
1401 case PGBENCH_SQRT:
1402 {
1403 double dval;
1404
1405 Assert(nargs == 1);
1406
1407 if (!coerceToDouble(&vargs[0], &dval))
1408 return false;
1409
1410 if (func == PGBENCH_SQRT)
1411 dval = sqrt(dval);
1412
1413 setDoubleValue(retval, dval);
1414 return true;
1415 }
1416
1417 /* 1 int argument */
1418 case PGBENCH_INT:
1419 {
1420 int64 ival;
1421
1422 Assert(nargs == 1);
1423
1424 if (!coerceToInt(&vargs[0], &ival))
1425 return false;
1426
1427 setIntValue(retval, ival);
1428 return true;
1429 }
1430
1431 /* variable number of arguments */
1432 case PGBENCH_LEAST:
1433 case PGBENCH_GREATEST:
1434 {
1435 bool havedouble;
1436 int i;
1437
1438 Assert(nargs >= 1);
1439
1440 /* need double result if any input is double */
1441 havedouble = false;
1442 for (i = 0; i < nargs; i++)
1443 {
1444 if (vargs[i].type == PGBT_DOUBLE)
1445 {
1446 havedouble = true;
1447 break;
1448 }
1449 }
1450 if (havedouble)
1451 {
1452 double extremum;
1453
1454 if (!coerceToDouble(&vargs[0], &extremum))
1455 return false;
1456 for (i = 1; i < nargs; i++)
1457 {
1458 double dval;
1459
1460 if (!coerceToDouble(&vargs[i], &dval))
1461 return false;
1462 if (func == PGBENCH_LEAST)
1463 extremum = Min(extremum, dval);
1464 else
1465 extremum = Max(extremum, dval);
1466 }
1467 setDoubleValue(retval, extremum);
1468 }
1469 else
1470 {
1471 int64 extremum;
1472
1473 if (!coerceToInt(&vargs[0], &extremum))
1474 return false;
1475 for (i = 1; i < nargs; i++)
1476 {
1477 int64 ival;
1478
1479 if (!coerceToInt(&vargs[i], &ival))
1480 return false;
1481 if (func == PGBENCH_LEAST)
1482 extremum = Min(extremum, ival);
1483 else
1484 extremum = Max(extremum, ival);
1485 }
1486 setIntValue(retval, extremum);
1487 }
1488 return true;
1489 }
1490
1491 /* random functions */
1492 case PGBENCH_RANDOM:
1493 case PGBENCH_RANDOM_EXPONENTIAL:
1494 case PGBENCH_RANDOM_GAUSSIAN:
1495 {
1496 int64 imin,
1497 imax;
1498
1499 Assert(nargs >= 2);
1500
1501 if (!coerceToInt(&vargs[0], &imin) ||
1502 !coerceToInt(&vargs[1], &imax))
1503 return false;
1504
1505 /* check random range */
1506 if (imin > imax)
1507 {
1508 fprintf(stderr, "empty range given to random\n");
1509 return false;
1510 }
1511 else if (imax - imin < 0 || (imax - imin) + 1 < 0)
1512 {
1513 /* prevent int overflows in random functions */
1514 fprintf(stderr, "random range is too large\n");
1515 return false;
1516 }
1517
1518 if (func == PGBENCH_RANDOM)
1519 {
1520 Assert(nargs == 2);
1521 setIntValue(retval, getrand(thread, imin, imax));
1522 }
1523 else /* gaussian & exponential */
1524 {
1525 double param;
1526
1527 Assert(nargs == 3);
1528
1529 if (!coerceToDouble(&vargs[2], ¶m))
1530 return false;
1531
1532 if (func == PGBENCH_RANDOM_GAUSSIAN)
1533 {
1534 if (param < MIN_GAUSSIAN_PARAM)
1535 {
1536 fprintf(stderr,
1537 "gaussian parameter must be at least %f "
1538 "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
1539 return false;
1540 }
1541
1542 setIntValue(retval,
1543 getGaussianRand(thread, imin, imax, param));
1544 }
1545 else /* exponential */
1546 {
1547 if (param <= 0.0)
1548 {
1549 fprintf(stderr,
1550 "exponential parameter must be greater than zero"
1551 " (got %f)\n", param);
1552 return false;
1553 }
1554
1555 setIntValue(retval,
1556 getExponentialRand(thread, imin, imax, param));
1557 }
1558 }
1559
1560 return true;
1561 }
1562
1563 default:
1564 /* cannot get here */
1565 Assert(0);
1566 /* dead code to avoid a compiler warning */
1567 return false;
1568 }
1569 }
1570
1571 /*
1572 * Recursive evaluation of an expression in a pgbench script
1573 * using the current state of variables.
1574 * Returns whether the evaluation was ok,
1575 * the value itself is returned through the retval pointer.
1576 */
1577 static bool
evaluateExpr(TState * thread,CState * st,PgBenchExpr * expr,PgBenchValue * retval)1578 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
1579 {
1580 switch (expr->etype)
1581 {
1582 case ENODE_CONSTANT:
1583 {
1584 *retval = expr->u.constant;
1585 return true;
1586 }
1587
1588 case ENODE_VARIABLE:
1589 {
1590 Variable *var;
1591
1592 if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
1593 {
1594 fprintf(stderr, "undefined variable \"%s\"\n",
1595 expr->u.variable.varname);
1596 return false;
1597 }
1598
1599 if (!makeVariableNumeric(var))
1600 return false;
1601
1602 *retval = var->num_value;
1603 return true;
1604 }
1605
1606 case ENODE_FUNCTION:
1607 return evalFunc(thread, st,
1608 expr->u.function.function,
1609 expr->u.function.args,
1610 retval);
1611
1612 default:
1613 /* internal error which should never occur */
1614 fprintf(stderr, "unexpected enode type in evaluation: %d\n",
1615 expr->etype);
1616 exit(1);
1617 }
1618 }
1619
1620 /*
1621 * Run a shell command. The result is assigned to the variable if not NULL.
1622 * Return true if succeeded, or false on error.
1623 */
1624 static bool
runShellCommand(CState * st,char * variable,char ** argv,int argc)1625 runShellCommand(CState *st, char *variable, char **argv, int argc)
1626 {
1627 char command[SHELL_COMMAND_SIZE];
1628 int i,
1629 len = 0;
1630 FILE *fp;
1631 char res[64];
1632 char *endptr;
1633 int retval;
1634
1635 /*----------
1636 * Join arguments with whitespace separators. Arguments starting with
1637 * exactly one colon are treated as variables:
1638 * name - append a string "name"
1639 * :var - append a variable named 'var'
1640 * ::name - append a string ":name"
1641 *----------
1642 */
1643 for (i = 0; i < argc; i++)
1644 {
1645 char *arg;
1646 int arglen;
1647
1648 if (argv[i][0] != ':')
1649 {
1650 arg = argv[i]; /* a string literal */
1651 }
1652 else if (argv[i][1] == ':')
1653 {
1654 arg = argv[i] + 1; /* a string literal starting with colons */
1655 }
1656 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1657 {
1658 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1659 argv[0], argv[i]);
1660 return false;
1661 }
1662
1663 arglen = strlen(arg);
1664 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1665 {
1666 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1667 return false;
1668 }
1669
1670 if (i > 0)
1671 command[len++] = ' ';
1672 memcpy(command + len, arg, arglen);
1673 len += arglen;
1674 }
1675
1676 command[len] = '\0';
1677
1678 /* Fast path for non-assignment case */
1679 if (variable == NULL)
1680 {
1681 if (system(command))
1682 {
1683 if (!timer_exceeded)
1684 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1685 return false;
1686 }
1687 return true;
1688 }
1689
1690 /* Execute the command with pipe and read the standard output. */
1691 if ((fp = popen(command, "r")) == NULL)
1692 {
1693 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1694 return false;
1695 }
1696 if (fgets(res, sizeof(res), fp) == NULL)
1697 {
1698 if (!timer_exceeded)
1699 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1700 (void) pclose(fp);
1701 return false;
1702 }
1703 if (pclose(fp) < 0)
1704 {
1705 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1706 return false;
1707 }
1708
1709 /* Check whether the result is an integer and assign it to the variable */
1710 retval = (int) strtol(res, &endptr, 10);
1711 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1712 endptr++;
1713 if (*res == '\0' || *endptr != '\0')
1714 {
1715 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1716 argv[0], res);
1717 return false;
1718 }
1719 if (!putVariableInt(st, "setshell", variable, retval))
1720 return false;
1721
1722 #ifdef DEBUG
1723 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
1724 #endif
1725 return true;
1726 }
1727
/* size of the buffers that receive prepared statement names */
#define MAX_PREPARE_NAME 32

/*
 * Write the prepared statement name "P<file>_<state>" into "buffer", which
 * must provide at least MAX_PREPARE_NAME bytes.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1734
1735 static bool
clientDone(CState * st)1736 clientDone(CState *st)
1737 {
1738 if (st->con != NULL)
1739 {
1740 PQfinish(st->con);
1741 st->con = NULL;
1742 }
1743 return false; /* always false */
1744 }
1745
1746 /* return a script number with a weighted choice. */
1747 static int
chooseScript(TState * thread)1748 chooseScript(TState *thread)
1749 {
1750 int i = 0;
1751 int64 w;
1752
1753 if (num_scripts == 1)
1754 return 0;
1755
1756 w = getrand(thread, 0, total_weight - 1);
1757 do
1758 {
1759 w -= sql_script[i++].weight;
1760 } while (w >= 0);
1761
1762 return i - 1;
1763 }
1764
/*
 * Advance one client through its script as far as possible.
 *
 * Each pass (from the "top" label) does, in order:
 *	- schedule the throttling delay for a new transaction, if throttling;
 *	- return early while the client is still sleeping;
 *	- if st->listen is set, finish the command in flight (collect the SQL
 *	  result, account statistics) and step st->state to the next command,
 *	  choosing a new script when the end of the current one is reached;
 *	- open a connection if the client has none;
 *	- start the command at st->state: SQL commands are sent with the
 *	  PQsend* functions, meta commands are executed right away and followed
 *	  by a jump back to "top".
 *
 * "agg" is handed to processXactStats() for transaction statistics.
 *
 * return false iff client should be disconnected
 */
static bool
doCustom(TState *thread, CState *st, StatsData *agg)
{
	PGresult   *res;
	Command   **commands;
	bool		trans_needs_throttle = false;
	instr_time	now;

	/*
	 * gettimeofday() isn't free, so we get the current timestamp lazily the
	 * first time it's needed, and reuse the same value throughout this
	 * function after that. This also ensures that e.g. the calculated latency
	 * reported in the log file and in the totals are the same. Zero means
	 * "not set yet". Reset "now" when we step to the next command with "goto
	 * top", though.
	 */
top:
	INSTR_TIME_SET_ZERO(now);

	commands = sql_script[st->use_file].commands;

	/*
	 * Handle throttling once per transaction by sleeping.  It is simpler to
	 * do this here rather than at the end, because so much complicated logic
	 * happens below when statements finish.
	 */
	if (throttle_delay && !st->is_throttled)
	{
		/*
		 * Generate a delay such that the series of delays will approximate a
		 * Poisson distribution centered on the throttle_delay time.
		 *
		 * If transactions are too slow or a given wait is shorter than a
		 * transaction, the next transaction will start right away.
		 */
		int64		wait = getPoissonRand(thread, throttle_delay);

		thread->throttle_trigger += wait;
		st->txn_scheduled = thread->throttle_trigger;

		/* stop client if next transaction is beyond pgbench end of execution */
		if (duration > 0 && st->txn_scheduled > end_time)
			return clientDone(st);

		/*
		 * If this --latency-limit is used, and this slot is already late so
		 * that the transaction will miss the latency limit even if it
		 * completed immediately, we skip this time slot and iterate till the
		 * next slot that isn't late yet.
		 */
		if (latency_limit)
		{
			int64		now_us;

			if (INSTR_TIME_IS_ZERO(now))
				INSTR_TIME_SET_CURRENT(now);
			now_us = INSTR_TIME_GET_MICROSEC(now);
			while (thread->throttle_trigger < now_us - latency_limit)
			{
				/* account the missed slot as a skipped transaction */
				processXactStats(thread, st, &now, true, agg);
				/* next rendez-vous */
				wait = getPoissonRand(thread, throttle_delay);
				thread->throttle_trigger += wait;
				st->txn_scheduled = thread->throttle_trigger;
			}
		}

		st->sleep_until = st->txn_scheduled;
		st->sleeping = true;
		st->throttling = true;
		st->is_throttled = true;
		if (debug)
			fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
					st->id, wait);
	}

	if (st->sleeping)
	{							/* are we sleeping? */
		if (INSTR_TIME_IS_ZERO(now))
			INSTR_TIME_SET_CURRENT(now);
		if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
			return true;		/* Still sleeping, nothing to do here */
		/* Else done sleeping, go ahead with next command */
		st->sleeping = false;
		st->throttling = false;
	}

	if (st->listen)
	{							/* are we receiver? */
		if (commands[st->state]->type == SQL_COMMAND)
		{
			if (debug)
				fprintf(stderr, "client %d receiving\n", st->id);
			if (!PQconsumeInput(st->con))
			{					/* there's something wrong */
				fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
				return clientDone(st);
			}
			if (PQisBusy(st->con))
				return true;	/* don't have the whole result yet */
		}

		/*
		 * command finished: accumulate per-command execution times in
		 * thread-local data structure, if per-command latencies are requested
		 */
		if (is_latencies)
		{
			if (INSTR_TIME_IS_ZERO(now))
				INSTR_TIME_SET_CURRENT(now);

			/* XXX could use a mutex here, but we choose not to */
			addToSimpleStats(&commands[st->state]->stats,
							 INSTR_TIME_GET_DOUBLE(now) -
							 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
		}

		/* transaction finished: calculate latency and log the transaction */
		if (commands[st->state + 1] == NULL)
		{
			if (progress || throttle_delay || latency_limit ||
				per_script_stats || use_log)
				processXactStats(thread, st, &now, false, agg);
			else
				thread->stats.cnt++;
		}

		if (commands[st->state]->type == SQL_COMMAND)
		{
			/*
			 * Read and discard the query result; note this is not included in
			 * the statement latency numbers.
			 */
			res = PQgetResult(st->con);
			switch (PQresultStatus(res))
			{
				case PGRES_COMMAND_OK:
				case PGRES_TUPLES_OK:
					break;		/* OK */
				default:
					fprintf(stderr, "client %d aborted in state %d: %s",
							st->id, st->state, PQerrorMessage(st->con));
					PQclear(res);
					return clientDone(st);
			}
			PQclear(res);
			discard_response(st);
		}

		/* last command of the script: the transaction is complete */
		if (commands[st->state + 1] == NULL)
		{
			if (is_connect)
			{
				PQfinish(st->con);
				st->con = NULL;
			}

			++st->cnt;
			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
				return clientDone(st);	/* exit success */
		}

		/* increment state counter */
		st->state++;
		if (commands[st->state] == NULL)
		{
			/* end of script reached: start over with a newly chosen script */
			st->state = 0;
			st->use_file = chooseScript(thread);
			commands = sql_script[st->use_file].commands;
			if (debug)
				fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
						sql_script[st->use_file].desc);
			st->is_throttled = false;

			/*
			 * No transaction is underway anymore, which means there is
			 * nothing to listen to right now.  When throttling rate limits
			 * are active, a sleep will happen next, as the next transaction
			 * starts.  And then in any case the next SQL command will set
			 * listen back to true.
			 */
			st->listen = false;
			trans_needs_throttle = (throttle_delay > 0);
		}
	}

	if (st->con == NULL)
	{
		instr_time	start,
					end;

		/* time spent connecting is accumulated in thread->conn_time */
		INSTR_TIME_SET_CURRENT(start);
		if ((st->con = doConnect()) == NULL)
		{
			fprintf(stderr, "client %d aborted while establishing connection\n",
					st->id);
			return clientDone(st);
		}
		INSTR_TIME_SET_CURRENT(end);
		INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);

		/* Reset session-local state */
		st->listen = false;
		st->sleeping = false;
		st->throttling = false;
		memset(st->prepared, 0, sizeof(st->prepared));
	}

	/*
	 * This ensures that a throttling delay is inserted before proceeding with
	 * sql commands, after the first transaction. The first transaction
	 * throttling is performed when first entering doCustom.
	 */
	if (trans_needs_throttle)
	{
		trans_needs_throttle = false;
		goto top;
	}

	/* Record transaction start time under logging, progress or throttling */
	if ((use_log || progress || throttle_delay || latency_limit ||
		 per_script_stats) && st->state == 0)
	{
		INSTR_TIME_SET_CURRENT(st->txn_begin);

		/*
		 * When not throttling, this is also the transaction's scheduled start
		 * time.
		 */
		if (!throttle_delay)
			st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
	}

	/* Record statement start time if per-command latencies are requested */
	if (is_latencies)
		INSTR_TIME_SET_CURRENT(st->stmt_begin);

	if (commands[st->state]->type == SQL_COMMAND)
	{
		const Command *command = commands[st->state];
		int			r;

		if (querymode == QUERY_SIMPLE)
		{
			char	   *sql;

			/* work on a copy: variable substitution edits the string */
			sql = pg_strdup(command->argv[0]);
			sql = assignVariables(st, sql);

			if (debug)
				fprintf(stderr, "client %d sending %s\n", st->id, sql);
			r = PQsendQuery(st->con, sql);
			free(sql);
		}
		else if (querymode == QUERY_EXTENDED)
		{
			const char *sql = command->argv[0];
			const char *params[MAX_ARGS];

			getQueryParams(st, command, params);

			if (debug)
				fprintf(stderr, "client %d sending %s\n", st->id, sql);
			r = PQsendQueryParams(st->con, sql, command->argc - 1,
								  NULL, params, NULL, NULL, 0);
		}
		else if (querymode == QUERY_PREPARED)
		{
			char		name[MAX_PREPARE_NAME];
			const char *params[MAX_ARGS];

			/* prepare all SQL commands of this script on first use */
			if (!st->prepared[st->use_file])
			{
				int			j;

				for (j = 0; commands[j] != NULL; j++)
				{
					/* these two shadow the outer "res" and "name" */
					PGresult   *res;
					char		name[MAX_PREPARE_NAME];

					if (commands[j]->type != SQL_COMMAND)
						continue;
					preparedStatementName(name, st->use_file, j);
					res = PQprepare(st->con, name,
						  commands[j]->argv[0], commands[j]->argc - 1, NULL);
					/* a failed prepare is only reported, not acted upon */
					if (PQresultStatus(res) != PGRES_COMMAND_OK)
						fprintf(stderr, "%s", PQerrorMessage(st->con));
					PQclear(res);
				}
				st->prepared[st->use_file] = true;
			}

			getQueryParams(st, command, params);
			preparedStatementName(name, st->use_file, st->state);

			if (debug)
				fprintf(stderr, "client %d sending %s\n", st->id, name);
			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
									params, NULL, NULL, 0);
		}
		else	/* unknown sql mode */
			r = 0;

		if (r == 0)
		{
			if (debug)
				fprintf(stderr, "client %d could not send %s\n",
						st->id, command->argv[0]);
			st->ecnt++;
		}
		else
			st->listen = true;	/* flags that should be listened */
	}
	else if (commands[st->state]->type == META_COMMAND)
	{
		int			argc = commands[st->state]->argc,
					i;
		char	  **argv = commands[st->state]->argv;

		if (debug)
		{
			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
			for (i = 1; i < argc; i++)
				fprintf(stderr, " %s", argv[i]);
			fprintf(stderr, "\n");
		}

		/*
		 * Each meta command below sets st->listen on success so that the
		 * next pass steps to the following command.  On failure the error
		 * is counted and we return without setting it.
		 */
		if (pg_strcasecmp(argv[0], "set") == 0)
		{
			PgBenchExpr *expr = commands[st->state]->expr;
			PgBenchValue result;

			if (!evaluateExpr(thread, st, expr, &result))
			{
				st->ecnt++;
				return true;
			}

			if (!putVariableNumber(st, argv[0], argv[1], &result))
			{
				st->ecnt++;
				return true;
			}

			st->listen = true;
		}
		else if (pg_strcasecmp(argv[0], "sleep") == 0)
		{
			char	   *var;
			int			usec;
			/* shadows the function-level "now" */
			instr_time	now;

			if (*argv[1] == ':')
			{
				if ((var = getVariable(st, argv[1] + 1)) == NULL)
				{
					fprintf(stderr, "%s: undefined variable \"%s\"\n",
							argv[0], argv[1]);
					st->ecnt++;
					return true;
				}
				usec = atoi(var);
			}
			else
				usec = atoi(argv[1]);

			/*
			 * Convert to microseconds: "ms" and "s" units are scaled, any
			 * other unit is used as is, and no unit means seconds.
			 *
			 * NOTE(review): usec is an int, so a large delay expressed in
			 * seconds looks like it could overflow these multiplications
			 * -- confirm the accepted range.
			 */
			if (argc > 2)
			{
				if (pg_strcasecmp(argv[2], "ms") == 0)
					usec *= 1000;
				else if (pg_strcasecmp(argv[2], "s") == 0)
					usec *= 1000000;
			}
			else
				usec *= 1000000;

			INSTR_TIME_SET_CURRENT(now);
			st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
			st->sleeping = true;

			st->listen = true;
		}
		else if (pg_strcasecmp(argv[0], "setshell") == 0)
		{
			/* argv[1] is the target variable, the rest is the command */
			bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);

			if (timer_exceeded) /* timeout */
				return clientDone(st);
			else if (!ret)		/* on error */
			{
				st->ecnt++;
				return true;
			}
			else	/* succeeded */
				st->listen = true;
		}
		else if (pg_strcasecmp(argv[0], "shell") == 0)
		{
			bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);

			if (timer_exceeded) /* timeout */
				return clientDone(st);
			else if (!ret)		/* on error */
			{
				st->ecnt++;
				return true;
			}
			else	/* succeeded */
				st->listen = true;
		}

		/* after a meta command, immediately proceed with next command */
		goto top;
	}

	return true;
}
2183
2184 /*
2185 * print log entry after completing one transaction.
2186 */
2187 static void
doLog(TState * thread,CState * st,instr_time * now,StatsData * agg,bool skipped,double latency,double lag)2188 doLog(TState *thread, CState *st, instr_time *now,
2189 StatsData *agg, bool skipped, double latency, double lag)
2190 {
2191 FILE *logfile = thread->logfile;
2192
2193 Assert(use_log);
2194
2195 /*
2196 * Skip the log entry if sampling is enabled and this row doesn't belong
2197 * to the random sample.
2198 */
2199 if (sample_rate != 0.0 &&
2200 pg_erand48(thread->random_state) > sample_rate)
2201 return;
2202
2203 /* should we aggregate the results or not? */
2204 if (agg_interval > 0)
2205 {
2206 /*
2207 * Loop until we reach the interval of the current transaction, and
2208 * print all the empty intervals in between (this may happen with very
2209 * low tps, e.g. --rate=0.1).
2210 */
2211 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
2212 {
2213 /* print aggregated report to logfile */
2214 fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
2215 agg->start_time,
2216 agg->cnt,
2217 agg->latency.sum,
2218 agg->latency.sum2,
2219 agg->latency.min,
2220 agg->latency.max);
2221 if (throttle_delay)
2222 {
2223 fprintf(logfile, " %.0f %.0f %.0f %.0f",
2224 agg->lag.sum,
2225 agg->lag.sum2,
2226 agg->lag.min,
2227 agg->lag.max);
2228 if (latency_limit)
2229 fprintf(logfile, " " INT64_FORMAT, agg->skipped);
2230 }
2231 fputc('\n', logfile);
2232
2233 /* reset data and move to next interval */
2234 initStats(agg, agg->start_time + agg_interval);
2235 }
2236
2237 /* accumulate the current transaction */
2238 accumStats(agg, skipped, latency, lag);
2239 }
2240 else
2241 {
2242 /* no, print raw transactions */
2243 #ifndef WIN32
2244
2245 /* This is more than we really ought to know about instr_time */
2246 if (skipped)
2247 fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
2248 st->id, st->cnt, st->use_file,
2249 (long) now->tv_sec, (long) now->tv_usec);
2250 else
2251 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
2252 st->id, st->cnt, latency, st->use_file,
2253 (long) now->tv_sec, (long) now->tv_usec);
2254 #else
2255
2256 /* On Windows, instr_time doesn't provide a timestamp anyway */
2257 if (skipped)
2258 fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
2259 st->id, st->cnt, st->use_file);
2260 else
2261 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
2262 st->id, st->cnt, latency, st->use_file);
2263 #endif
2264 if (throttle_delay)
2265 fprintf(logfile, " %.0f", lag);
2266 fputc('\n', logfile);
2267 }
2268 }
2269
2270 /*
2271 * Accumulate and report statistics at end of a transaction.
2272 *
2273 * (This is also called when a transaction is late and thus skipped.)
2274 */
2275 static void
processXactStats(TState * thread,CState * st,instr_time * now,bool skipped,StatsData * agg)2276 processXactStats(TState *thread, CState *st, instr_time *now,
2277 bool skipped, StatsData *agg)
2278 {
2279 double latency = 0.0,
2280 lag = 0.0;
2281
2282 if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
2283 INSTR_TIME_SET_CURRENT(*now);
2284
2285 if (!skipped)
2286 {
2287 /* compute latency & lag */
2288 latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
2289 lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
2290 }
2291
2292 if (progress || throttle_delay || latency_limit)
2293 {
2294 accumStats(&thread->stats, skipped, latency, lag);
2295
2296 /* count transactions over the latency limit, if needed */
2297 if (latency_limit && latency > latency_limit)
2298 thread->latency_late++;
2299 }
2300 else
2301 thread->stats.cnt++;
2302
2303 if (use_log)
2304 doLog(thread, st, now, agg, skipped, latency, lag);
2305
2306 /* XXX could use a mutex here, but we choose not to */
2307 if (per_script_stats)
2308 accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
2309 }
2310
2311
2312 /* discard connections */
2313 static void
disconnect_all(CState * state,int length)2314 disconnect_all(CState *state, int length)
2315 {
2316 int i;
2317
2318 for (i = 0; i < length; i++)
2319 {
2320 if (state[i].con)
2321 {
2322 PQfinish(state[i].con);
2323 state[i].con = NULL;
2324 }
2325 }
2326 }
2327
2328 /* create tables and setup data */
2329 static void
init(bool is_no_vacuum)2330 init(bool is_no_vacuum)
2331 {
2332 /*
2333 * The scale factor at/beyond which 32-bit integers are insufficient for
2334 * storing TPC-B account IDs.
2335 *
2336 * Although the actual threshold is 21474, we use 20000 because it is easier to
2337 * document and remember, and isn't that far away from the real threshold.
2338 */
2339 #define SCALE_32BIT_THRESHOLD 20000
2340
2341 /*
2342 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
2343 * fields in these table declarations were intended to comply with that.
2344 * The pgbench_accounts table complies with that because the "filler"
2345 * column is set to blank-padded empty string. But for all other tables
2346 * the columns default to NULL and so don't actually take any space. We
2347 * could fix that by giving them non-null default values. However, that
2348 * would completely break comparability of pgbench results with prior
2349 * versions. Since pgbench has never pretended to be fully TPC-B compliant
2350 * anyway, we stick with the historical behavior.
2351 */
2352 struct ddlinfo
2353 {
2354 const char *table; /* table name */
2355 const char *smcols; /* column decls if accountIDs are 32 bits */
2356 const char *bigcols; /* column decls if accountIDs are 64 bits */
2357 int declare_fillfactor;
2358 };
2359 static const struct ddlinfo DDLs[] = {
2360 {
2361 "pgbench_history",
2362 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
2363 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
2364 0
2365 },
2366 {
2367 "pgbench_tellers",
2368 "tid int not null,bid int,tbalance int,filler char(84)",
2369 "tid int not null,bid int,tbalance int,filler char(84)",
2370 1
2371 },
2372 {
2373 "pgbench_accounts",
2374 "aid int not null,bid int,abalance int,filler char(84)",
2375 "aid bigint not null,bid int,abalance int,filler char(84)",
2376 1
2377 },
2378 {
2379 "pgbench_branches",
2380 "bid int not null,bbalance int,filler char(88)",
2381 "bid int not null,bbalance int,filler char(88)",
2382 1
2383 }
2384 };
2385 static const char *const DDLINDEXes[] = {
2386 "alter table pgbench_branches add primary key (bid)",
2387 "alter table pgbench_tellers add primary key (tid)",
2388 "alter table pgbench_accounts add primary key (aid)"
2389 };
2390 static const char *const DDLKEYs[] = {
2391 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
2392 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
2393 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
2394 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
2395 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
2396 };
2397
2398 PGconn *con;
2399 PGresult *res;
2400 char sql[256];
2401 int i;
2402 int64 k;
2403
2404 /* used to track elapsed time and estimate of the remaining time */
2405 instr_time start,
2406 diff;
2407 double elapsed_sec,
2408 remaining_sec;
2409 int log_interval = 1;
2410
2411 if ((con = doConnect()) == NULL)
2412 exit(1);
2413
2414 for (i = 0; i < lengthof(DDLs); i++)
2415 {
2416 char opts[256];
2417 char buffer[256];
2418 const struct ddlinfo *ddl = &DDLs[i];
2419 const char *cols;
2420
2421 /* Remove old table, if it exists. */
2422 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
2423 executeStatement(con, buffer);
2424
2425 /* Construct new create table statement. */
2426 opts[0] = '\0';
2427 if (ddl->declare_fillfactor)
2428 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2429 " with (fillfactor=%d)", fillfactor);
2430 if (tablespace != NULL)
2431 {
2432 char *escape_tablespace;
2433
2434 escape_tablespace = PQescapeIdentifier(con, tablespace,
2435 strlen(tablespace));
2436 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2437 " tablespace %s", escape_tablespace);
2438 PQfreemem(escape_tablespace);
2439 }
2440
2441 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
2442
2443 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
2444 unlogged_tables ? " unlogged" : "",
2445 ddl->table, cols, opts);
2446
2447 executeStatement(con, buffer);
2448 }
2449
2450 executeStatement(con, "begin");
2451
2452 for (i = 0; i < nbranches * scale; i++)
2453 {
2454 /* "filler" column defaults to NULL */
2455 snprintf(sql, sizeof(sql),
2456 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2457 i + 1);
2458 executeStatement(con, sql);
2459 }
2460
2461 for (i = 0; i < ntellers * scale; i++)
2462 {
2463 /* "filler" column defaults to NULL */
2464 snprintf(sql, sizeof(sql),
2465 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2466 i + 1, i / ntellers + 1);
2467 executeStatement(con, sql);
2468 }
2469
2470 executeStatement(con, "commit");
2471
2472 /*
2473 * fill the pgbench_accounts table with some data
2474 */
2475 fprintf(stderr, "creating tables...\n");
2476
2477 executeStatement(con, "begin");
2478 executeStatement(con, "truncate pgbench_accounts");
2479
2480 res = PQexec(con, "copy pgbench_accounts from stdin");
2481 if (PQresultStatus(res) != PGRES_COPY_IN)
2482 {
2483 fprintf(stderr, "%s", PQerrorMessage(con));
2484 exit(1);
2485 }
2486 PQclear(res);
2487
2488 INSTR_TIME_SET_CURRENT(start);
2489
2490 for (k = 0; k < (int64) naccounts * scale; k++)
2491 {
2492 int64 j = k + 1;
2493
2494 /* "filler" column defaults to blank padded empty string */
2495 snprintf(sql, sizeof(sql),
2496 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2497 j, k / naccounts + 1, 0);
2498 if (PQputline(con, sql))
2499 {
2500 fprintf(stderr, "PQputline failed\n");
2501 exit(1);
2502 }
2503
2504 /*
2505 * If we want to stick with the original logging, print a message each
2506 * 100k inserted rows.
2507 */
2508 if ((!use_quiet) && (j % 100000 == 0))
2509 {
2510 INSTR_TIME_SET_CURRENT(diff);
2511 INSTR_TIME_SUBTRACT(diff, start);
2512
2513 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2514 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2515
2516 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2517 j, (int64) naccounts * scale,
2518 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2519 elapsed_sec, remaining_sec);
2520 }
2521 /* let's not call the timing for each row, but only each 100 rows */
2522 else if (use_quiet && (j % 100 == 0))
2523 {
2524 INSTR_TIME_SET_CURRENT(diff);
2525 INSTR_TIME_SUBTRACT(diff, start);
2526
2527 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2528 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2529
2530 /* have we reached the next interval (or end)? */
2531 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2532 {
2533 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2534 j, (int64) naccounts * scale,
2535 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2536
2537 /* skip to the next interval */
2538 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2539 }
2540 }
2541
2542 }
2543 if (PQputline(con, "\\.\n"))
2544 {
2545 fprintf(stderr, "very last PQputline failed\n");
2546 exit(1);
2547 }
2548 if (PQendcopy(con))
2549 {
2550 fprintf(stderr, "PQendcopy failed\n");
2551 exit(1);
2552 }
2553 executeStatement(con, "commit");
2554
2555 /* vacuum */
2556 if (!is_no_vacuum)
2557 {
2558 fprintf(stderr, "vacuum...\n");
2559 executeStatement(con, "vacuum analyze pgbench_branches");
2560 executeStatement(con, "vacuum analyze pgbench_tellers");
2561 executeStatement(con, "vacuum analyze pgbench_accounts");
2562 executeStatement(con, "vacuum analyze pgbench_history");
2563 }
2564
2565 /*
2566 * create indexes
2567 */
2568 fprintf(stderr, "set primary keys...\n");
2569 for (i = 0; i < lengthof(DDLINDEXes); i++)
2570 {
2571 char buffer[256];
2572
2573 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2574
2575 if (index_tablespace != NULL)
2576 {
2577 char *escape_tablespace;
2578
2579 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2580 strlen(index_tablespace));
2581 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2582 " using index tablespace %s", escape_tablespace);
2583 PQfreemem(escape_tablespace);
2584 }
2585
2586 executeStatement(con, buffer);
2587 }
2588
2589 /*
2590 * create foreign keys
2591 */
2592 if (foreign_keys)
2593 {
2594 fprintf(stderr, "set foreign keys...\n");
2595 for (i = 0; i < lengthof(DDLKEYs); i++)
2596 {
2597 executeStatement(con, DDLKEYs[i]);
2598 }
2599 }
2600
2601 fprintf(stderr, "done.\n");
2602 PQfinish(con);
2603 }
2604
2605 /*
2606 * Parse the raw sql and replace :param to $n.
2607 */
2608 static bool
parseQuery(Command * cmd,const char * raw_sql)2609 parseQuery(Command *cmd, const char *raw_sql)
2610 {
2611 char *sql,
2612 *p;
2613
2614 sql = pg_strdup(raw_sql);
2615 cmd->argc = 1;
2616
2617 p = sql;
2618 while ((p = strchr(p, ':')) != NULL)
2619 {
2620 char var[13];
2621 char *name;
2622 int eaten;
2623
2624 name = parseVariable(p, &eaten);
2625 if (name == NULL)
2626 {
2627 while (*p == ':')
2628 {
2629 p++;
2630 }
2631 continue;
2632 }
2633
2634 if (cmd->argc >= MAX_ARGS)
2635 {
2636 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2637 pg_free(name);
2638 return false;
2639 }
2640
2641 sprintf(var, "$%d", cmd->argc);
2642 p = replaceVariable(&sql, p, eaten, var);
2643
2644 cmd->argv[cmd->argc] = name;
2645 cmd->argc++;
2646 }
2647
2648 cmd->argv[0] = sql;
2649 return true;
2650 }
2651
/*
 * Minimal printf-style error reporter (may be needed by the lexer).
 *
 * Flushes stdout first, then writes the message to stderr, passing the
 * format string through _() before formatting.
 */
static void
pgbench_error(const char *fmt,...)
{
	va_list		args;

	fflush(stdout);

	va_start(args, fmt);
	vfprintf(stderr, _(fmt), args);
	va_end(args);
}
2665
/*
 * Report a syntax error found while parsing a script, then exit(1).
 * (In practice this means an error in a backslash command, because syntax
 * errors in SQL are not detected.)
 *
 * source: source of script (filename or builtin-script ID)
 * lineno: line number within script (count from 1)
 * line: whole line of backslash command, if available
 * command: backslash command name, if available
 * msg: the actual error message
 * more: optional extra message
 * column: zero-based column number, or -1 if unknown
 *
 * When the line is available it is echoed, and a known column is marked
 * with a caret underneath it; when only the column is known, it is reported
 * as a one-based number in the message itself.
 */
void
syntax_error(const char *source, int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, int column)
{
	bool		have_column = (column >= 0);

	/* headline: "source:lineno: msg" plus whatever details we have */
	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);
	if (have_column && line == NULL)
		fprintf(stderr, " at column %d", column + 1);
	if (command != NULL)
		fprintf(stderr, " in command \"%s\"", command);
	fprintf(stderr, "\n");

	/* echo the offending line and point at the error position */
	if (line != NULL)
	{
		fprintf(stderr, "%s\n", line);
		if (have_column)
		{
			int			pad = column;

			while (pad-- > 0)
				fprintf(stderr, " ");
			fprintf(stderr, "^ error found here\n");
		}
	}

	exit(1);
}
2705
2706 /*
2707 * Parse a SQL command; return a Command struct, or NULL if it's a comment
2708 *
2709 * On entry, psqlscan.l has collected the command into "buf", so we don't
2710 * really need to do much here except check for comment and set up a
2711 * Command struct.
2712 */
2713 static Command *
process_sql_command(PQExpBuffer buf,const char * source)2714 process_sql_command(PQExpBuffer buf, const char *source)
2715 {
2716 Command *my_command;
2717 char *p;
2718 char *nlpos;
2719
2720 /* Skip any leading whitespace, as well as "--" style comments */
2721 p = buf->data;
2722 for (;;)
2723 {
2724 if (isspace((unsigned char) *p))
2725 p++;
2726 else if (strncmp(p, "--", 2) == 0)
2727 {
2728 p = strchr(p, '\n');
2729 if (p == NULL)
2730 return NULL;
2731 p++;
2732 }
2733 else
2734 break;
2735 }
2736
2737 /* If there's nothing but whitespace and comments, we're done */
2738 if (*p == '\0')
2739 return NULL;
2740
2741 /* Allocate and initialize Command structure */
2742 my_command = (Command *) pg_malloc0(sizeof(Command));
2743 my_command->command_num = num_commands++;
2744 my_command->type = SQL_COMMAND;
2745 my_command->argc = 0;
2746 initSimpleStats(&my_command->stats);
2747
2748 /*
2749 * If SQL command is multi-line, we only want to save the first line as
2750 * the "line" label.
2751 */
2752 nlpos = strchr(p, '\n');
2753 if (nlpos)
2754 {
2755 my_command->line = pg_malloc(nlpos - p + 1);
2756 memcpy(my_command->line, p, nlpos - p);
2757 my_command->line[nlpos - p] = '\0';
2758 }
2759 else
2760 my_command->line = pg_strdup(p);
2761
2762 switch (querymode)
2763 {
2764 case QUERY_SIMPLE:
2765 my_command->argv[0] = pg_strdup(p);
2766 my_command->argc++;
2767 break;
2768 case QUERY_EXTENDED:
2769 case QUERY_PREPARED:
2770 if (!parseQuery(my_command, p))
2771 exit(1);
2772 break;
2773 default:
2774 exit(1);
2775 }
2776
2777 return my_command;
2778 }
2779
2780 /*
2781 * Parse a backslash command; return a Command struct, or NULL if comment
2782 *
2783 * At call, we have scanned only the initial backslash.
2784 */
2785 static Command *
process_backslash_command(PsqlScanState sstate,const char * source)2786 process_backslash_command(PsqlScanState sstate, const char *source)
2787 {
2788 Command *my_command;
2789 PQExpBufferData word_buf;
2790 int word_offset;
2791 int offsets[MAX_ARGS]; /* offsets of argument words */
2792 int start_offset,
2793 end_offset;
2794 int lineno;
2795 int j;
2796
2797 initPQExpBuffer(&word_buf);
2798
2799 /* Remember location of the backslash */
2800 start_offset = expr_scanner_offset(sstate) - 1;
2801 lineno = expr_scanner_get_lineno(sstate, start_offset);
2802
2803 /* Collect first word of command */
2804 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
2805 {
2806 termPQExpBuffer(&word_buf);
2807 return NULL;
2808 }
2809
2810 /* Allocate and initialize Command structure */
2811 my_command = (Command *) pg_malloc0(sizeof(Command));
2812 my_command->command_num = num_commands++;
2813 my_command->type = META_COMMAND;
2814 my_command->argc = 0;
2815 initSimpleStats(&my_command->stats);
2816
2817 /* Save first word (command name) */
2818 j = 0;
2819 offsets[j] = word_offset;
2820 my_command->argv[j++] = pg_strdup(word_buf.data);
2821 my_command->argc++;
2822
2823 if (pg_strcasecmp(my_command->argv[0], "set") == 0)
2824 {
2825 /* For \set, collect var name, then lex the expression. */
2826 yyscan_t yyscanner;
2827
2828 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
2829 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2830 "missing argument", NULL, -1);
2831
2832 offsets[j] = word_offset;
2833 my_command->argv[j++] = pg_strdup(word_buf.data);
2834 my_command->argc++;
2835
2836 yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
2837 my_command->argv[0]);
2838
2839 if (expr_yyparse(yyscanner) != 0)
2840 {
2841 /* dead code: exit done from syntax_error called by yyerror */
2842 exit(1);
2843 }
2844
2845 my_command->expr = expr_parse_result;
2846
2847 /* Get location of the ending newline */
2848 end_offset = expr_scanner_offset(sstate) - 1;
2849
2850 /* Save line */
2851 my_command->line = expr_scanner_get_substring(sstate,
2852 start_offset,
2853 end_offset);
2854
2855 expr_scanner_finish(yyscanner);
2856
2857 termPQExpBuffer(&word_buf);
2858
2859 return my_command;
2860 }
2861
2862 /* For all other commands, collect remaining words. */
2863 while (expr_lex_one_word(sstate, &word_buf, &word_offset))
2864 {
2865 if (j >= MAX_ARGS)
2866 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2867 "too many arguments", NULL, -1);
2868
2869 offsets[j] = word_offset;
2870 my_command->argv[j++] = pg_strdup(word_buf.data);
2871 my_command->argc++;
2872 }
2873
2874 /* Get location of the ending newline */
2875 end_offset = expr_scanner_offset(sstate) - 1;
2876
2877 /* Save line */
2878 my_command->line = expr_scanner_get_substring(sstate,
2879 start_offset,
2880 end_offset);
2881
2882 if (pg_strcasecmp(my_command->argv[0], "sleep") == 0)
2883 {
2884 if (my_command->argc < 2)
2885 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2886 "missing argument", NULL, -1);
2887
2888 if (my_command->argc > 3)
2889 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2890 "too many arguments", NULL,
2891 offsets[3] - start_offset);
2892
2893 /*
2894 * Split argument into number and unit to allow "sleep 1ms" etc. We
2895 * don't have to terminate the number argument with null because it
2896 * will be parsed with atoi, which ignores trailing non-digit
2897 * characters.
2898 */
2899 if (my_command->argc == 2 && my_command->argv[1][0] != ':')
2900 {
2901 char *c = my_command->argv[1];
2902
2903 while (isdigit((unsigned char) *c))
2904 c++;
2905 if (*c)
2906 {
2907 my_command->argv[2] = c;
2908 offsets[2] = offsets[1] + (c - my_command->argv[1]);
2909 my_command->argc = 3;
2910 }
2911 }
2912
2913 if (my_command->argc == 3)
2914 {
2915 if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
2916 pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
2917 pg_strcasecmp(my_command->argv[2], "s") != 0)
2918 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2919 "unrecognized time unit, must be us, ms or s",
2920 my_command->argv[2], offsets[2] - start_offset);
2921 }
2922 }
2923 else if (pg_strcasecmp(my_command->argv[0], "setshell") == 0)
2924 {
2925 if (my_command->argc < 3)
2926 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2927 "missing argument", NULL, -1);
2928 }
2929 else if (pg_strcasecmp(my_command->argv[0], "shell") == 0)
2930 {
2931 if (my_command->argc < 2)
2932 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2933 "missing command", NULL, -1);
2934 }
2935 else
2936 {
2937 syntax_error(source, lineno, my_command->line, my_command->argv[0],
2938 "invalid command", NULL, -1);
2939 }
2940
2941 termPQExpBuffer(&word_buf);
2942
2943 return my_command;
2944 }
2945
2946 /*
2947 * Parse a script (either the contents of a file, or a built-in script)
2948 * and add it to the list of scripts.
2949 */
2950 static void
ParseScript(const char * script,const char * desc,int weight)2951 ParseScript(const char *script, const char *desc, int weight)
2952 {
2953 ParsedScript ps;
2954 PsqlScanState sstate;
2955 PQExpBufferData line_buf;
2956 int alloc_num;
2957 int index;
2958
2959 #define COMMANDS_ALLOC_NUM 128
2960 alloc_num = COMMANDS_ALLOC_NUM;
2961
2962 /* Initialize all fields of ps */
2963 ps.desc = desc;
2964 ps.weight = weight;
2965 ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2966 initStats(&ps.stats, 0.0);
2967
2968 /* Prepare to parse script */
2969 sstate = psql_scan_create(&pgbench_callbacks);
2970
2971 /*
2972 * Ideally, we'd scan scripts using the encoding and stdstrings settings
2973 * we get from a DB connection. However, without major rearrangement of
2974 * pgbench's argument parsing, we can't have a DB connection at the time
2975 * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
2976 * with any backend-safe encoding, though conceivably we could be fooled
2977 * if a script file uses a client-only encoding. We also assume that
2978 * stdstrings should be true, which is a bit riskier.
2979 */
2980 psql_scan_setup(sstate, script, strlen(script), 0, true);
2981
2982 initPQExpBuffer(&line_buf);
2983
2984 index = 0;
2985
2986 for (;;)
2987 {
2988 PsqlScanResult sr;
2989 promptStatus_t prompt;
2990 Command *command;
2991
2992 resetPQExpBuffer(&line_buf);
2993
2994 sr = psql_scan(sstate, &line_buf, &prompt);
2995
2996 /* If we collected a SQL command, process that */
2997 command = process_sql_command(&line_buf, desc);
2998 if (command)
2999 {
3000 ps.commands[index] = command;
3001 index++;
3002
3003 if (index >= alloc_num)
3004 {
3005 alloc_num += COMMANDS_ALLOC_NUM;
3006 ps.commands = (Command **)
3007 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3008 }
3009 }
3010
3011 /* If we reached a backslash, process that */
3012 if (sr == PSCAN_BACKSLASH)
3013 {
3014 command = process_backslash_command(sstate, desc);
3015 if (command)
3016 {
3017 ps.commands[index] = command;
3018 index++;
3019
3020 if (index >= alloc_num)
3021 {
3022 alloc_num += COMMANDS_ALLOC_NUM;
3023 ps.commands = (Command **)
3024 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3025 }
3026 }
3027 }
3028
3029 /* Done if we reached EOF */
3030 if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
3031 break;
3032 }
3033
3034 ps.commands[index] = NULL;
3035
3036 addScript(ps);
3037
3038 termPQExpBuffer(&line_buf);
3039 psql_scan_finish(sstate);
3040 psql_scan_destroy(sstate);
3041 }
3042
/*
 * Slurp everything readable from "fd" into a freshly allocated,
 * NUL-terminated buffer and return it.
 *
 * The buffer grows in BUFSIZ steps and is usually bigger than needed; that
 * is acceptable here because it is freed right after the script is parsed.
 * A short read ends the loop whether it was caused by EOF or by an error;
 * this function itself does not distinguish the two.
 */
static char *
read_file_contents(FILE *fd)
{
	size_t		capacity = BUFSIZ;
	size_t		filled = 0;
	char	   *data = (char *) pg_malloc(capacity);
	bool		more;

	do
	{
		size_t		got = fread(data + filled, 1, BUFSIZ, fd);

		filled += got;

		/* a full chunk means there may be more to read */
		more = (got >= BUFSIZ);
		if (more)
		{
			/* make room for another full chunk */
			capacity += BUFSIZ;
			data = (char *) pg_realloc(data, capacity);
		}
	} while (more);

	/* the last read was short, so a byte is left for the terminator */
	data[filled] = '\0';

	return data;
}
3076
/*
 * Read the script stored in the named file and add it to the script list.
 * The special name "-" selects stdin.
 * NB: filename must be storage that won't disappear.
 *
 * Failure to open or read the file is reported on stderr and ends the
 * program with exit(1).
 */
static void
process_file(const char *filename, int weight)
{
	FILE	   *in;
	char	   *contents;

	/* Pick the input stream */
	if (strcmp(filename, "-") != 0)
	{
		in = fopen(filename, "r");
		if (in == NULL)
		{
			fprintf(stderr, "could not open file \"%s\": %s\n",
					filename, strerror(errno));
			exit(1);
		}
	}
	else
		in = stdin;

	/* Load the whole script text, then check that the read went cleanly */
	contents = read_file_contents(in);

	if (ferror(in))
	{
		fprintf(stderr, "could not read file \"%s\": %s\n",
				filename, strerror(errno));
		exit(1);
	}

	/* stdin is left open */
	if (in != stdin)
		fclose(in);

	ParseScript(contents, filename, weight);

	free(contents);
}
3114
3115 /* Parse the given builtin script and add it to the list. */
3116 static void
process_builtin(const BuiltinScript * bi,int weight)3117 process_builtin(const BuiltinScript *bi, int weight)
3118 {
3119 ParseScript(bi->script, bi->desc, weight);
3120 }
3121
3122 /* show available builtin scripts */
3123 static void
listAvailableScripts(void)3124 listAvailableScripts(void)
3125 {
3126 int i;
3127
3128 fprintf(stderr, "Available builtin scripts:\n");
3129 for (i = 0; i < lengthof(builtin_script); i++)
3130 fprintf(stderr, "\t%s\n", builtin_script[i].name);
3131 fprintf(stderr, "\n");
3132 }
3133
3134 /* return builtin script "name" if unambiguous, fails if not found */
3135 static const BuiltinScript *
findBuiltin(const char * name)3136 findBuiltin(const char *name)
3137 {
3138 int i,
3139 found = 0,
3140 len = strlen(name);
3141 const BuiltinScript *result = NULL;
3142
3143 for (i = 0; i < lengthof(builtin_script); i++)
3144 {
3145 if (strncmp(builtin_script[i].name, name, len) == 0)
3146 {
3147 result = &builtin_script[i];
3148 found++;
3149 }
3150 }
3151
3152 /* ok, unambiguous result */
3153 if (found == 1)
3154 return result;
3155
3156 /* error cases */
3157 if (found == 0)
3158 fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
3159 else /* found > 1 */
3160 fprintf(stderr,
3161 "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
3162
3163 listAvailableScripts();
3164 exit(1);
3165 }
3166
3167 /*
3168 * Determine the weight specification from a script option (-b, -f), if any,
3169 * and return it as an integer (1 is returned if there's no weight). The
3170 * script name is returned in *script as a malloc'd string.
3171 */
3172 static int
parseScriptWeight(const char * option,char ** script)3173 parseScriptWeight(const char *option, char **script)
3174 {
3175 char *sep;
3176 int weight;
3177
3178 if ((sep = strrchr(option, WSEP)))
3179 {
3180 int namelen = sep - option;
3181 long wtmp;
3182 char *badp;
3183
3184 /* generate the script name */
3185 *script = pg_malloc(namelen + 1);
3186 strncpy(*script, option, namelen);
3187 (*script)[namelen] = '\0';
3188
3189 /* process digits of the weight spec */
3190 errno = 0;
3191 wtmp = strtol(sep + 1, &badp, 10);
3192 if (errno != 0 || badp == sep + 1 || *badp != '\0')
3193 {
3194 fprintf(stderr, "invalid weight specification: %s\n", sep);
3195 exit(1);
3196 }
3197 if (wtmp > INT_MAX || wtmp < 0)
3198 {
3199 fprintf(stderr,
3200 "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
3201 INT_MAX, (int64) wtmp);
3202 exit(1);
3203 }
3204 weight = wtmp;
3205 }
3206 else
3207 {
3208 *script = pg_strdup(option);
3209 weight = 1;
3210 }
3211
3212 return weight;
3213 }
3214
3215 /* append a script to the list of scripts to process */
3216 static void
addScript(ParsedScript script)3217 addScript(ParsedScript script)
3218 {
3219 if (script.commands == NULL || script.commands[0] == NULL)
3220 {
3221 fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
3222 exit(1);
3223 }
3224
3225 if (num_scripts >= MAX_SCRIPTS)
3226 {
3227 fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
3228 exit(1);
3229 }
3230
3231 sql_script[num_scripts] = script;
3232 num_scripts++;
3233 }
3234
3235 static void
printSimpleStats(char * prefix,SimpleStats * ss)3236 printSimpleStats(char *prefix, SimpleStats *ss)
3237 {
3238 /* print NaN if no transactions where executed */
3239 double latency = ss->sum / ss->count;
3240 double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
3241
3242 printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
3243 printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
3244 }
3245
3246 /* print out results */
3247 static void
printResults(TState * threads,StatsData * total,instr_time total_time,instr_time conn_total_time,int latency_late)3248 printResults(TState *threads, StatsData *total, instr_time total_time,
3249 instr_time conn_total_time, int latency_late)
3250 {
3251 double time_include,
3252 tps_include,
3253 tps_exclude;
3254
3255 time_include = INSTR_TIME_GET_DOUBLE(total_time);
3256 tps_include = total->cnt / time_include;
3257 tps_exclude = total->cnt / (time_include -
3258 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
3259
3260 /* Report test parameters. */
3261 printf("transaction type: %s\n",
3262 num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
3263 printf("scaling factor: %d\n", scale);
3264 printf("query mode: %s\n", QUERYMODE[querymode]);
3265 printf("number of clients: %d\n", nclients);
3266 printf("number of threads: %d\n", nthreads);
3267 if (duration <= 0)
3268 {
3269 printf("number of transactions per client: %d\n", nxacts);
3270 printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
3271 total->cnt, nxacts * nclients);
3272 }
3273 else
3274 {
3275 printf("duration: %d s\n", duration);
3276 printf("number of transactions actually processed: " INT64_FORMAT "\n",
3277 total->cnt);
3278 }
3279
3280 /* Remaining stats are nonsensical if we failed to execute any xacts */
3281 if (total->cnt <= 0)
3282 return;
3283
3284 if (throttle_delay && latency_limit)
3285 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
3286 total->skipped,
3287 100.0 * total->skipped / (total->skipped + total->cnt));
3288
3289 if (latency_limit)
3290 printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
3291 latency_limit / 1000.0, latency_late,
3292 100.0 * latency_late / (total->skipped + total->cnt));
3293
3294 if (throttle_delay || progress || latency_limit)
3295 printSimpleStats("latency", &total->latency);
3296 else
3297 {
3298 /* no measurement, show average latency computed from run time */
3299 printf("latency average = %.3f ms\n",
3300 1000.0 * time_include * nclients / total->cnt);
3301 }
3302
3303 if (throttle_delay)
3304 {
3305 /*
3306 * Report average transaction lag under rate limit throttling. This
3307 * is the delay between scheduled and actual start times for the
3308 * transaction. The measured lag may be caused by thread/client load,
3309 * the database load, or the Poisson throttling process.
3310 */
3311 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
3312 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
3313 }
3314
3315 printf("tps = %f (including connections establishing)\n", tps_include);
3316 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
3317
3318 /* Report per-script/command statistics */
3319 if (per_script_stats || latency_limit || is_latencies)
3320 {
3321 int i;
3322
3323 for (i = 0; i < num_scripts; i++)
3324 {
3325 if (num_scripts > 1)
3326 printf("SQL script %d: %s\n"
3327 " - weight: %d (targets %.1f%% of total)\n"
3328 " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
3329 i + 1, sql_script[i].desc,
3330 sql_script[i].weight,
3331 100.0 * sql_script[i].weight / total_weight,
3332 sql_script[i].stats.cnt,
3333 100.0 * sql_script[i].stats.cnt / total->cnt,
3334 sql_script[i].stats.cnt / time_include);
3335 else
3336 printf("script statistics:\n");
3337
3338 if (latency_limit)
3339 printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
3340 sql_script[i].stats.skipped,
3341 100.0 * sql_script[i].stats.skipped /
3342 (sql_script[i].stats.skipped + sql_script[i].stats.cnt));
3343
3344 if (num_scripts > 1)
3345 printSimpleStats(" - latency", &sql_script[i].stats.latency);
3346
3347 /* Report per-command latencies */
3348 if (is_latencies)
3349 {
3350 Command **commands;
3351
3352 printf(" - statement latencies in milliseconds:\n");
3353
3354 for (commands = sql_script[i].commands;
3355 *commands != NULL;
3356 commands++)
3357 printf(" %11.3f %s\n",
3358 1000.0 * (*commands)->stats.sum /
3359 (*commands)->stats.count,
3360 (*commands)->line);
3361 }
3362 }
3363 }
3364 }
3365
3366
3367 int
main(int argc,char ** argv)3368 main(int argc, char **argv)
3369 {
3370 static struct option long_options[] = {
3371 /* systematic long/short named options */
3372 {"builtin", required_argument, NULL, 'b'},
3373 {"client", required_argument, NULL, 'c'},
3374 {"connect", no_argument, NULL, 'C'},
3375 {"debug", no_argument, NULL, 'd'},
3376 {"define", required_argument, NULL, 'D'},
3377 {"file", required_argument, NULL, 'f'},
3378 {"fillfactor", required_argument, NULL, 'F'},
3379 {"host", required_argument, NULL, 'h'},
3380 {"initialize", no_argument, NULL, 'i'},
3381 {"jobs", required_argument, NULL, 'j'},
3382 {"log", no_argument, NULL, 'l'},
3383 {"latency-limit", required_argument, NULL, 'L'},
3384 {"no-vacuum", no_argument, NULL, 'n'},
3385 {"port", required_argument, NULL, 'p'},
3386 {"progress", required_argument, NULL, 'P'},
3387 {"protocol", required_argument, NULL, 'M'},
3388 {"quiet", no_argument, NULL, 'q'},
3389 {"report-latencies", no_argument, NULL, 'r'},
3390 {"rate", required_argument, NULL, 'R'},
3391 {"scale", required_argument, NULL, 's'},
3392 {"select-only", no_argument, NULL, 'S'},
3393 {"skip-some-updates", no_argument, NULL, 'N'},
3394 {"time", required_argument, NULL, 'T'},
3395 {"transactions", required_argument, NULL, 't'},
3396 {"username", required_argument, NULL, 'U'},
3397 {"vacuum-all", no_argument, NULL, 'v'},
3398 /* long-named only options */
3399 {"foreign-keys", no_argument, &foreign_keys, 1},
3400 {"index-tablespace", required_argument, NULL, 3},
3401 {"tablespace", required_argument, NULL, 2},
3402 {"unlogged-tables", no_argument, &unlogged_tables, 1},
3403 {"sampling-rate", required_argument, NULL, 4},
3404 {"aggregate-interval", required_argument, NULL, 5},
3405 {"progress-timestamp", no_argument, NULL, 6},
3406 {NULL, 0, NULL, 0}
3407 };
3408
3409 int c;
3410 int is_init_mode = 0; /* initialize mode? */
3411 int is_no_vacuum = 0; /* no vacuum at all before testing? */
3412 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
3413 int optindex;
3414 bool scale_given = false;
3415
3416 bool benchmarking_option_set = false;
3417 bool initialization_option_set = false;
3418 bool internal_script_used = false;
3419
3420 CState *state; /* status of clients */
3421 TState *threads; /* array of thread */
3422
3423 instr_time start_time; /* start up time */
3424 instr_time total_time;
3425 instr_time conn_total_time;
3426 int64 latency_late = 0;
3427 StatsData stats;
3428 int weight;
3429
3430 int i;
3431 int nclients_dealt;
3432
3433 #ifdef HAVE_GETRLIMIT
3434 struct rlimit rlim;
3435 #endif
3436
3437 PGconn *con;
3438 PGresult *res;
3439 char *env;
3440
3441 progname = get_progname(argv[0]);
3442
3443 if (argc > 1)
3444 {
3445 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
3446 {
3447 usage();
3448 exit(0);
3449 }
3450 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
3451 {
3452 puts("pgbench (PostgreSQL) " PG_VERSION);
3453 exit(0);
3454 }
3455 }
3456
3457 #ifdef WIN32
3458 /* stderr is buffered on Win32. */
3459 setvbuf(stderr, NULL, _IONBF, 0);
3460 #endif
3461
3462 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
3463 pghost = env;
3464 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
3465 pgport = env;
3466 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
3467 login = env;
3468
3469 state = (CState *) pg_malloc(sizeof(CState));
3470 memset(state, 0, sizeof(CState));
3471
3472 while ((c = getopt_long(argc, argv, "ih:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
3473 {
3474 char *script;
3475
3476 switch (c)
3477 {
3478 case 'i':
3479 is_init_mode++;
3480 break;
3481 case 'h':
3482 pghost = pg_strdup(optarg);
3483 break;
3484 case 'n':
3485 is_no_vacuum++;
3486 break;
3487 case 'v':
3488 do_vacuum_accounts++;
3489 break;
3490 case 'p':
3491 pgport = pg_strdup(optarg);
3492 break;
3493 case 'd':
3494 debug++;
3495 break;
3496 case 'c':
3497 benchmarking_option_set = true;
3498 nclients = atoi(optarg);
3499 if (nclients <= 0 || nclients > MAXCLIENTS)
3500 {
3501 fprintf(stderr, "invalid number of clients: \"%s\"\n",
3502 optarg);
3503 exit(1);
3504 }
3505 #ifdef HAVE_GETRLIMIT
3506 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
3507 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
3508 #else /* but BSD doesn't ... */
3509 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
3510 #endif /* RLIMIT_NOFILE */
3511 {
3512 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
3513 exit(1);
3514 }
3515 if (rlim.rlim_cur < nclients + 3)
3516 {
3517 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
3518 nclients + 3, (long) rlim.rlim_cur);
3519 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
3520 exit(1);
3521 }
3522 #endif /* HAVE_GETRLIMIT */
3523 break;
3524 case 'j': /* jobs */
3525 benchmarking_option_set = true;
3526 nthreads = atoi(optarg);
3527 if (nthreads <= 0)
3528 {
3529 fprintf(stderr, "invalid number of threads: \"%s\"\n",
3530 optarg);
3531 exit(1);
3532 }
3533 #ifndef ENABLE_THREAD_SAFETY
3534 if (nthreads != 1)
3535 {
3536 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
3537 exit(1);
3538 }
3539 #endif /* !ENABLE_THREAD_SAFETY */
3540 break;
3541 case 'C':
3542 benchmarking_option_set = true;
3543 is_connect = true;
3544 break;
3545 case 'r':
3546 benchmarking_option_set = true;
3547 per_script_stats = true;
3548 is_latencies = true;
3549 break;
3550 case 's':
3551 scale_given = true;
3552 scale = atoi(optarg);
3553 if (scale <= 0)
3554 {
3555 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
3556 exit(1);
3557 }
3558 break;
3559 case 't':
3560 benchmarking_option_set = true;
3561 if (duration > 0)
3562 {
3563 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3564 exit(1);
3565 }
3566 nxacts = atoi(optarg);
3567 if (nxacts <= 0)
3568 {
3569 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
3570 optarg);
3571 exit(1);
3572 }
3573 break;
3574 case 'T':
3575 benchmarking_option_set = true;
3576 if (nxacts > 0)
3577 {
3578 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3579 exit(1);
3580 }
3581 duration = atoi(optarg);
3582 if (duration <= 0)
3583 {
3584 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
3585 exit(1);
3586 }
3587 break;
3588 case 'U':
3589 login = pg_strdup(optarg);
3590 break;
3591 case 'l':
3592 benchmarking_option_set = true;
3593 use_log = true;
3594 break;
3595 case 'q':
3596 initialization_option_set = true;
3597 use_quiet = true;
3598 break;
3599
3600 case 'b':
3601 if (strcmp(optarg, "list") == 0)
3602 {
3603 listAvailableScripts();
3604 exit(0);
3605 }
3606
3607 weight = parseScriptWeight(optarg, &script);
3608 process_builtin(findBuiltin(script), weight);
3609 benchmarking_option_set = true;
3610 internal_script_used = true;
3611 break;
3612
3613 case 'S':
3614 process_builtin(findBuiltin("select-only"), 1);
3615 benchmarking_option_set = true;
3616 internal_script_used = true;
3617 break;
3618 case 'N':
3619 process_builtin(findBuiltin("simple-update"), 1);
3620 benchmarking_option_set = true;
3621 internal_script_used = true;
3622 break;
3623 case 'f':
3624 weight = parseScriptWeight(optarg, &script);
3625 process_file(script, weight);
3626 benchmarking_option_set = true;
3627 break;
3628 case 'D':
3629 {
3630 char *p;
3631
3632 benchmarking_option_set = true;
3633
3634 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
3635 {
3636 fprintf(stderr, "invalid variable definition: \"%s\"\n",
3637 optarg);
3638 exit(1);
3639 }
3640
3641 *p++ = '\0';
3642 if (!putVariable(&state[0], "option", optarg, p))
3643 exit(1);
3644 }
3645 break;
3646 case 'F':
3647 initialization_option_set = true;
3648 fillfactor = atoi(optarg);
3649 if (fillfactor < 10 || fillfactor > 100)
3650 {
3651 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3652 exit(1);
3653 }
3654 break;
3655 case 'M':
3656 benchmarking_option_set = true;
3657 if (num_scripts > 0)
3658 {
3659 fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f or -b)\n");
3660 exit(1);
3661 }
3662 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3663 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3664 break;
3665 if (querymode >= NUM_QUERYMODE)
3666 {
3667 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3668 optarg);
3669 exit(1);
3670 }
3671 break;
3672 case 'P':
3673 benchmarking_option_set = true;
3674 progress = atoi(optarg);
3675 if (progress <= 0)
3676 {
3677 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3678 optarg);
3679 exit(1);
3680 }
3681 break;
3682 case 'R':
3683 {
3684 /* get a double from the beginning of option value */
3685 double throttle_value = atof(optarg);
3686
3687 benchmarking_option_set = true;
3688
3689 if (throttle_value <= 0.0)
3690 {
3691 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3692 exit(1);
3693 }
3694 /* Invert rate limit into a time offset */
3695 throttle_delay = (int64) (1000000.0 / throttle_value);
3696 }
3697 break;
3698 case 'L':
3699 {
3700 double limit_ms = atof(optarg);
3701
3702 if (limit_ms <= 0.0)
3703 {
3704 fprintf(stderr, "invalid latency limit: \"%s\"\n",
3705 optarg);
3706 exit(1);
3707 }
3708 benchmarking_option_set = true;
3709 latency_limit = (int64) (limit_ms * 1000);
3710 }
3711 break;
3712 case 0:
3713 /* This covers long options which take no argument. */
3714 if (foreign_keys || unlogged_tables)
3715 initialization_option_set = true;
3716 break;
3717 case 2: /* tablespace */
3718 initialization_option_set = true;
3719 tablespace = pg_strdup(optarg);
3720 break;
3721 case 3: /* index-tablespace */
3722 initialization_option_set = true;
3723 index_tablespace = pg_strdup(optarg);
3724 break;
3725 case 4:
3726 benchmarking_option_set = true;
3727 sample_rate = atof(optarg);
3728 if (sample_rate <= 0.0 || sample_rate > 1.0)
3729 {
3730 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3731 exit(1);
3732 }
3733 break;
3734 case 5:
3735 #ifdef WIN32
3736 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3737 exit(1);
3738 #else
3739 benchmarking_option_set = true;
3740 agg_interval = atoi(optarg);
3741 if (agg_interval <= 0)
3742 {
3743 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3744 optarg);
3745 exit(1);
3746 }
3747 #endif
3748 break;
3749 case 6:
3750 progress_timestamp = true;
3751 benchmarking_option_set = true;
3752 break;
3753 default:
3754 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3755 exit(1);
3756 break;
3757 }
3758 }
3759
3760 /* set default script if none */
3761 if (num_scripts == 0 && !is_init_mode)
3762 {
3763 process_builtin(findBuiltin("tpcb-like"), 1);
3764 benchmarking_option_set = true;
3765 internal_script_used = true;
3766 }
3767
3768 /* compute total_weight */
3769 for (i = 0; i < num_scripts; i++)
3770 /* cannot overflow: weight is 32b, total_weight 64b */
3771 total_weight += sql_script[i].weight;
3772
3773 if (total_weight == 0 && !is_init_mode)
3774 {
3775 fprintf(stderr, "total script weight must not be zero\n");
3776 exit(1);
3777 }
3778
3779 /* show per script stats if several scripts are used */
3780 if (num_scripts > 1)
3781 per_script_stats = true;
3782
3783 /*
3784 * Don't need more threads than there are clients. (This is not merely an
3785 * optimization; throttle_delay is calculated incorrectly below if some
3786 * threads have no clients assigned to them.)
3787 */
3788 if (nthreads > nclients)
3789 nthreads = nclients;
3790
3791 /* compute a per thread delay */
3792 throttle_delay *= nthreads;
3793
3794 if (argc > optind)
3795 dbName = argv[optind];
3796 else
3797 {
3798 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3799 dbName = env;
3800 else if (login != NULL && *login != '\0')
3801 dbName = login;
3802 else
3803 dbName = "";
3804 }
3805
3806 if (is_init_mode)
3807 {
3808 if (benchmarking_option_set)
3809 {
3810 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3811 exit(1);
3812 }
3813
3814 init(is_no_vacuum);
3815 exit(0);
3816 }
3817 else
3818 {
3819 if (initialization_option_set)
3820 {
3821 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3822 exit(1);
3823 }
3824 }
3825
3826 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3827 if (nxacts <= 0 && duration <= 0)
3828 nxacts = DEFAULT_NXACTS;
3829
3830 /* --sampling-rate may be used only with -l */
3831 if (sample_rate > 0.0 && !use_log)
3832 {
3833 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3834 exit(1);
3835 }
3836
3837 /* --sampling-rate may not be used with --aggregate-interval */
3838 if (sample_rate > 0.0 && agg_interval > 0)
3839 {
3840 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3841 exit(1);
3842 }
3843
3844 if (agg_interval > 0 && !use_log)
3845 {
3846 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3847 exit(1);
3848 }
3849
3850 if (duration > 0 && agg_interval > duration)
3851 {
3852 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3853 exit(1);
3854 }
3855
3856 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3857 {
3858 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3859 exit(1);
3860 }
3861
3862 /*
3863 * save main process id in the global variable because process id will be
3864 * changed after fork.
3865 */
3866 main_pid = (int) getpid();
3867
3868 if (nclients > 1)
3869 {
3870 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3871 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3872
3873 /* copy any -D switch values to all clients */
3874 for (i = 1; i < nclients; i++)
3875 {
3876 int j;
3877
3878 state[i].id = i;
3879 for (j = 0; j < state[0].nvariables; j++)
3880 {
3881 Variable *var = &state[0].variables[j];
3882
3883 if (var->is_numeric)
3884 {
3885 if (!putVariableNumber(&state[i], "startup",
3886 var->name, &var->num_value))
3887 exit(1);
3888 }
3889 else
3890 {
3891 if (!putVariable(&state[i], "startup",
3892 var->name, var->value))
3893 exit(1);
3894 }
3895 }
3896 }
3897 }
3898
3899 if (debug)
3900 {
3901 if (duration <= 0)
3902 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3903 pghost, pgport, nclients, nxacts, dbName);
3904 else
3905 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3906 pghost, pgport, nclients, duration, dbName);
3907 }
3908
3909 /* opening connection... */
3910 con = doConnect();
3911 if (con == NULL)
3912 exit(1);
3913
3914 if (internal_script_used)
3915 {
3916 /*
3917 * get the scaling factor that should be same as count(*) from
3918 * pgbench_branches if this is not a custom query
3919 */
3920 res = PQexec(con, "select count(*) from pgbench_branches");
3921 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3922 {
3923 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
3924
3925 fprintf(stderr, "%s", PQerrorMessage(con));
3926 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
3927 {
3928 fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
3929 }
3930
3931 exit(1);
3932 }
3933 scale = atoi(PQgetvalue(res, 0, 0));
3934 if (scale < 0)
3935 {
3936 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3937 PQgetvalue(res, 0, 0));
3938 exit(1);
3939 }
3940 PQclear(res);
3941
3942 /* warn if we override user-given -s switch */
3943 if (scale_given)
3944 fprintf(stderr,
3945 "scale option ignored, using count from pgbench_branches table (%d)\n",
3946 scale);
3947 }
3948
3949 /*
3950 * :scale variables normally get -s or database scale, but don't override
3951 * an explicit -D switch
3952 */
3953 if (lookupVariable(&state[0], "scale") == NULL)
3954 {
3955 for (i = 0; i < nclients; i++)
3956 {
3957 if (!putVariableInt(&state[i], "startup", "scale", scale))
3958 exit(1);
3959 }
3960 }
3961
3962 /*
3963 * Define a :client_id variable that is unique per connection. But don't
3964 * override an explicit -D switch.
3965 */
3966 if (lookupVariable(&state[0], "client_id") == NULL)
3967 {
3968 for (i = 0; i < nclients; i++)
3969 {
3970 if (!putVariableInt(&state[i], "startup", "client_id", i))
3971 exit(1);
3972 }
3973 }
3974
3975 if (!is_no_vacuum)
3976 {
3977 fprintf(stderr, "starting vacuum...");
3978 tryExecuteStatement(con, "vacuum pgbench_branches");
3979 tryExecuteStatement(con, "vacuum pgbench_tellers");
3980 tryExecuteStatement(con, "truncate pgbench_history");
3981 fprintf(stderr, "end.\n");
3982
3983 if (do_vacuum_accounts)
3984 {
3985 fprintf(stderr, "starting vacuum pgbench_accounts...");
3986 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3987 fprintf(stderr, "end.\n");
3988 }
3989 }
3990 PQfinish(con);
3991
3992 /* set random seed */
3993 INSTR_TIME_SET_CURRENT(start_time);
3994 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3995
3996 /* set up thread data structures */
3997 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3998 nclients_dealt = 0;
3999
4000 for (i = 0; i < nthreads; i++)
4001 {
4002 TState *thread = &threads[i];
4003
4004 thread->tid = i;
4005 thread->state = &state[nclients_dealt];
4006 thread->nstate =
4007 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
4008 thread->random_state[0] = random();
4009 thread->random_state[1] = random();
4010 thread->random_state[2] = random();
4011 thread->logfile = NULL; /* filled in later */
4012 thread->latency_late = 0;
4013 initStats(&thread->stats, 0.0);
4014
4015 nclients_dealt += thread->nstate;
4016 }
4017
4018 /* all clients must be assigned to a thread */
4019 Assert(nclients_dealt == nclients);
4020
4021 /* get start up time */
4022 INSTR_TIME_SET_CURRENT(start_time);
4023
4024 /* set alarm if duration is specified. */
4025 if (duration > 0)
4026 setalarm(duration);
4027
4028 /* start threads */
4029 #ifdef ENABLE_THREAD_SAFETY
4030 for (i = 0; i < nthreads; i++)
4031 {
4032 TState *thread = &threads[i];
4033
4034 INSTR_TIME_SET_CURRENT(thread->start_time);
4035
4036 /* compute when to stop */
4037 if (duration > 0)
4038 end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
4039 (int64) 1000000 *duration;
4040
4041 /* the first thread (i = 0) is executed by main thread */
4042 if (i > 0)
4043 {
4044 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
4045
4046 if (err != 0 || thread->thread == INVALID_THREAD)
4047 {
4048 fprintf(stderr, "could not create thread: %s\n", strerror(err));
4049 exit(1);
4050 }
4051 }
4052 else
4053 {
4054 thread->thread = INVALID_THREAD;
4055 }
4056 }
4057 #else
4058 INSTR_TIME_SET_CURRENT(threads[0].start_time);
4059 /* compute when to stop */
4060 if (duration > 0)
4061 end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
4062 (int64) 1000000 *duration;
4063 threads[0].thread = INVALID_THREAD;
4064 #endif /* ENABLE_THREAD_SAFETY */
4065
4066 /* wait for threads and accumulate results */
4067 initStats(&stats, 0.0);
4068 INSTR_TIME_SET_ZERO(conn_total_time);
4069 for (i = 0; i < nthreads; i++)
4070 {
4071 TState *thread = &threads[i];
4072
4073 #ifdef ENABLE_THREAD_SAFETY
4074 if (threads[i].thread == INVALID_THREAD)
4075 /* actually run this thread directly in the main thread */
4076 (void) threadRun(thread);
4077 else
4078 /* wait of other threads. should check that 0 is returned? */
4079 pthread_join(thread->thread, NULL);
4080 #else
4081 (void) threadRun(thread);
4082 #endif /* ENABLE_THREAD_SAFETY */
4083
4084 /* aggregate thread level stats */
4085 mergeSimpleStats(&stats.latency, &thread->stats.latency);
4086 mergeSimpleStats(&stats.lag, &thread->stats.lag);
4087 stats.cnt += thread->stats.cnt;
4088 stats.skipped += thread->stats.skipped;
4089 latency_late += thread->latency_late;
4090 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
4091 }
4092 disconnect_all(state, nclients);
4093
4094 /*
4095 * XXX We compute results as though every client of every thread started
4096 * and finished at the same time. That model can diverge noticeably from
4097 * reality for a short benchmark run involving relatively many threads.
4098 * The first thread may process notably many transactions before the last
4099 * thread begins. Improving the model alone would bring limited benefit,
4100 * because performance during those periods of partial thread count can
4101 * easily exceed steady state performance. This is one of the many ways
4102 * short runs convey deceptive performance figures.
4103 */
4104 INSTR_TIME_SET_CURRENT(total_time);
4105 INSTR_TIME_SUBTRACT(total_time, start_time);
4106 printResults(threads, &stats, total_time, conn_total_time, latency_late);
4107
4108 return 0;
4109 }
4110
4111 static void *
threadRun(void * arg)4112 threadRun(void *arg)
4113 {
4114 TState *thread = (TState *) arg;
4115 CState *state = thread->state;
4116 instr_time start,
4117 end;
4118 int nstate = thread->nstate;
4119 int remains = nstate; /* number of remaining clients */
4120 int i;
4121
4122 /* for reporting progress: */
4123 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
4124 int64 last_report = thread_start;
4125 int64 next_report = last_report + (int64) progress * 1000000;
4126 StatsData last,
4127 aggs;
4128
4129 /*
4130 * Initialize throttling rate target for all of the thread's clients. It
4131 * might be a little more accurate to reset thread->start_time here too.
4132 * The possible drift seems too small relative to typical throttle delay
4133 * times to worry about it.
4134 */
4135 INSTR_TIME_SET_CURRENT(start);
4136 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
4137
4138 INSTR_TIME_SET_ZERO(thread->conn_time);
4139
4140 /* open log file if requested */
4141 if (use_log)
4142 {
4143 char logpath[64];
4144
4145 if (thread->tid == 0)
4146 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
4147 else
4148 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
4149 thread->logfile = fopen(logpath, "w");
4150
4151 if (thread->logfile == NULL)
4152 {
4153 fprintf(stderr, "could not open logfile \"%s\": %s\n",
4154 logpath, strerror(errno));
4155 goto done;
4156 }
4157 }
4158
4159 if (!is_connect)
4160 {
4161 /* make connections to the database */
4162 for (i = 0; i < nstate; i++)
4163 {
4164 if ((state[i].con = doConnect()) == NULL)
4165 goto done;
4166 }
4167 }
4168
4169 /* time after thread and connections set up */
4170 INSTR_TIME_SET_CURRENT(thread->conn_time);
4171 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
4172
4173 initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
4174 last = aggs;
4175
4176 /* send start up queries in async manner */
4177 for (i = 0; i < nstate; i++)
4178 {
4179 CState *st = &state[i];
4180 int prev_ecnt = st->ecnt;
4181 Command **commands;
4182
4183 st->use_file = chooseScript(thread);
4184 commands = sql_script[st->use_file].commands;
4185 if (debug)
4186 fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
4187 sql_script[st->use_file].desc);
4188 if (!doCustom(thread, st, &aggs))
4189 remains--; /* I've aborted */
4190
4191 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
4192 {
4193 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
4194 i, st->state);
4195 remains--; /* I've aborted */
4196 PQfinish(st->con);
4197 st->con = NULL;
4198 }
4199 }
4200
4201 while (remains > 0)
4202 {
4203 fd_set input_mask;
4204 int maxsock; /* max socket number to be waited */
4205 int64 now_usec = 0;
4206 int64 min_usec;
4207
4208 FD_ZERO(&input_mask);
4209
4210 maxsock = -1;
4211 min_usec = PG_INT64_MAX;
4212 for (i = 0; i < nstate; i++)
4213 {
4214 CState *st = &state[i];
4215 Command **commands = sql_script[st->use_file].commands;
4216 int sock;
4217
4218 if (st->con == NULL)
4219 {
4220 continue;
4221 }
4222 else if (st->sleeping)
4223 {
4224 if (st->throttling && timer_exceeded)
4225 {
4226 /* interrupt client which has not started a transaction */
4227 remains--;
4228 st->sleeping = false;
4229 st->throttling = false;
4230 PQfinish(st->con);
4231 st->con = NULL;
4232 continue;
4233 }
4234 else /* just a nap from the script */
4235 {
4236 int this_usec;
4237
4238 if (min_usec == PG_INT64_MAX)
4239 {
4240 instr_time now;
4241
4242 INSTR_TIME_SET_CURRENT(now);
4243 now_usec = INSTR_TIME_GET_MICROSEC(now);
4244 }
4245
4246 this_usec = st->txn_scheduled - now_usec;
4247 if (min_usec > this_usec)
4248 min_usec = this_usec;
4249 }
4250 }
4251 else if (commands[st->state]->type == META_COMMAND)
4252 {
4253 min_usec = 0; /* the connection is ready to run */
4254 break;
4255 }
4256
4257 sock = PQsocket(st->con);
4258 if (sock < 0)
4259 {
4260 fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
4261 goto done;
4262 }
4263
4264 FD_SET(sock, &input_mask);
4265
4266 if (maxsock < sock)
4267 maxsock = sock;
4268 }
4269
4270 /* also wake up to print the next progress report on time */
4271 if (progress && min_usec > 0 && thread->tid == 0)
4272 {
4273 /* get current time if needed */
4274 if (now_usec == 0)
4275 {
4276 instr_time now;
4277
4278 INSTR_TIME_SET_CURRENT(now);
4279 now_usec = INSTR_TIME_GET_MICROSEC(now);
4280 }
4281
4282 if (now_usec >= next_report)
4283 min_usec = 0;
4284 else if ((next_report - now_usec) < min_usec)
4285 min_usec = next_report - now_usec;
4286 }
4287
4288 /*
4289 * Sleep until we receive data from the server, or a nap-time
4290 * specified in the script ends, or it's time to print a progress
4291 * report.
4292 */
4293 if (min_usec > 0 && maxsock != -1)
4294 {
4295 int nsocks; /* return from select(2) */
4296
4297 if (min_usec != PG_INT64_MAX)
4298 {
4299 struct timeval timeout;
4300
4301 timeout.tv_sec = min_usec / 1000000;
4302 timeout.tv_usec = min_usec % 1000000;
4303 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
4304 }
4305 else
4306 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
4307 if (nsocks < 0)
4308 {
4309 if (errno == EINTR)
4310 continue;
4311 /* must be something wrong */
4312 fprintf(stderr, "select() failed: %s\n", strerror(errno));
4313 goto done;
4314 }
4315 }
4316
4317 /* ok, backend returns reply */
4318 for (i = 0; i < nstate; i++)
4319 {
4320 CState *st = &state[i];
4321 Command **commands = sql_script[st->use_file].commands;
4322 int prev_ecnt = st->ecnt;
4323
4324 if (st->con)
4325 {
4326 int sock = PQsocket(st->con);
4327
4328 if (sock < 0)
4329 {
4330 fprintf(stderr, "invalid socket: %s",
4331 PQerrorMessage(st->con));
4332 goto done;
4333 }
4334 if (FD_ISSET(sock, &input_mask) ||
4335 commands[st->state]->type == META_COMMAND)
4336 {
4337 if (!doCustom(thread, st, &aggs))
4338 remains--; /* I've aborted */
4339 }
4340 }
4341 else if (is_connect && st->sleeping)
4342 {
4343 /* it is sleeping for throttling, maybe it is done, let us try */
4344 if (!doCustom(thread, st, &aggs))
4345 remains--;
4346 }
4347
4348 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
4349 {
4350 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
4351 i, st->state);
4352 remains--; /* I've aborted */
4353 PQfinish(st->con);
4354 st->con = NULL;
4355 }
4356 }
4357
4358 /* progress report by thread 0 for all threads */
4359 if (progress && thread->tid == 0)
4360 {
4361 instr_time now_time;
4362 int64 now;
4363
4364 INSTR_TIME_SET_CURRENT(now_time);
4365 now = INSTR_TIME_GET_MICROSEC(now_time);
4366 if (now >= next_report)
4367 {
4368 /* generate and show report */
4369 StatsData cur;
4370 int64 run = now - last_report;
4371 double tps,
4372 total_run,
4373 latency,
4374 sqlat,
4375 lag,
4376 stdev;
4377 char tbuf[64];
4378
4379 /*
4380 * Add up the statistics of all threads.
4381 *
4382 * XXX: No locking. There is no guarantee that we get an
4383 * atomic snapshot of the transaction count and latencies, so
4384 * these figures can well be off by a small amount. The
4385 * progress is report's purpose is to give a quick overview of
4386 * how the test is going, so that shouldn't matter too much.
4387 * (If a read from a 64-bit integer is not atomic, you might
4388 * get a "torn" read and completely bogus latencies though!)
4389 */
4390 initStats(&cur, 0.0);
4391 for (i = 0; i < nthreads; i++)
4392 {
4393 mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
4394 mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
4395 cur.cnt += thread[i].stats.cnt;
4396 cur.skipped += thread[i].stats.skipped;
4397 }
4398
4399 total_run = (now - thread_start) / 1000000.0;
4400 tps = 1000000.0 * (cur.cnt - last.cnt) / run;
4401 latency = 0.001 * (cur.latency.sum - last.latency.sum) /
4402 (cur.cnt - last.cnt);
4403 sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
4404 / (cur.cnt - last.cnt);
4405 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
4406 lag = 0.001 * (cur.lag.sum - last.lag.sum) /
4407 (cur.cnt - last.cnt);
4408
4409 if (progress_timestamp)
4410 sprintf(tbuf, "%.03f s",
4411 INSTR_TIME_GET_MILLISEC(now_time) / 1000.0);
4412 else
4413 sprintf(tbuf, "%.1f s", total_run);
4414
4415 fprintf(stderr,
4416 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
4417 tbuf, tps, latency, stdev);
4418
4419 if (throttle_delay)
4420 {
4421 fprintf(stderr, ", lag %.3f ms", lag);
4422 if (latency_limit)
4423 fprintf(stderr, ", " INT64_FORMAT " skipped",
4424 cur.skipped - last.skipped);
4425 }
4426 fprintf(stderr, "\n");
4427
4428 last = cur;
4429 last_report = now;
4430
4431 /*
4432 * Ensure that the next report is in the future, in case
4433 * pgbench/postgres got stuck somewhere.
4434 */
4435 do
4436 {
4437 next_report += (int64) progress *1000000;
4438 } while (now >= next_report);
4439 }
4440 }
4441 }
4442
4443 done:
4444 INSTR_TIME_SET_CURRENT(start);
4445 disconnect_all(state, nstate);
4446 INSTR_TIME_SET_CURRENT(end);
4447 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
4448 if (thread->logfile)
4449 {
4450 if (agg_interval)
4451 {
4452 /* log aggregated but not yet reported transactions */
4453 doLog(thread, state, &end, &aggs, false, 0, 0);
4454 }
4455 fclose(thread->logfile);
4456 }
4457 return NULL;
4458 }
4459
4460 /*
4461 * Support for duration option: set timer_exceeded after so many seconds.
4462 */
4463
4464 #ifndef WIN32
4465
4466 static void
handle_sig_alarm(SIGNAL_ARGS)4467 handle_sig_alarm(SIGNAL_ARGS)
4468 {
4469 timer_exceeded = true;
4470 }
4471
4472 static void
setalarm(int seconds)4473 setalarm(int seconds)
4474 {
4475 pqsignal(SIGALRM, handle_sig_alarm);
4476 alarm(seconds);
4477 }
4478
4479 #else /* WIN32 */
4480
4481 static VOID CALLBACK
win32_timer_callback(PVOID lpParameter,BOOLEAN TimerOrWaitFired)4482 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
4483 {
4484 timer_exceeded = true;
4485 }
4486
4487 static void
setalarm(int seconds)4488 setalarm(int seconds)
4489 {
4490 HANDLE queue;
4491 HANDLE timer;
4492
4493 /* This function will be called at most once, so we can cheat a bit. */
4494 queue = CreateTimerQueue();
4495 if (seconds > ((DWORD) -1) / 1000 ||
4496 !CreateTimerQueueTimer(&timer, queue,
4497 win32_timer_callback, NULL, seconds * 1000, 0,
4498 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
4499 {
4500 fprintf(stderr, "failed to set timer\n");
4501 exit(1);
4502 }
4503 }
4504
4505 /* partial pthread implementation for Windows */
4506
4507 typedef struct win32_pthread
4508 {
4509 HANDLE handle;
4510 void *(*routine) (void *);
4511 void *arg;
4512 void *result;
4513 } win32_pthread;
4514
4515 static unsigned __stdcall
win32_pthread_run(void * arg)4516 win32_pthread_run(void *arg)
4517 {
4518 win32_pthread *th = (win32_pthread *) arg;
4519
4520 th->result = th->routine(th->arg);
4521
4522 return 0;
4523 }
4524
4525 static int
pthread_create(pthread_t * thread,pthread_attr_t * attr,void * (* start_routine)(void *),void * arg)4526 pthread_create(pthread_t *thread,
4527 pthread_attr_t *attr,
4528 void *(*start_routine) (void *),
4529 void *arg)
4530 {
4531 int save_errno;
4532 win32_pthread *th;
4533
4534 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
4535 th->routine = start_routine;
4536 th->arg = arg;
4537 th->result = NULL;
4538
4539 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
4540 if (th->handle == NULL)
4541 {
4542 save_errno = errno;
4543 free(th);
4544 return save_errno;
4545 }
4546
4547 *thread = th;
4548 return 0;
4549 }
4550
4551 static int
pthread_join(pthread_t th,void ** thread_return)4552 pthread_join(pthread_t th, void **thread_return)
4553 {
4554 if (th == NULL || th->handle == NULL)
4555 return errno = EINVAL;
4556
4557 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
4558 {
4559 _dosmaperr(GetLastError());
4560 return errno;
4561 }
4562
4563 if (thread_return)
4564 *thread_return = th->result;
4565
4566 CloseHandle(th->handle);
4567 free(th);
4568 return 0;
4569 }
4570
4571 #endif /* WIN32 */
4572