1 /*-------------------------------------------------------------------------
2 *
3 * vacuumdb
4 *
5 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
6 * Portions Copyright (c) 1994, Regents of the University of California
7 *
8 * src/bin/scripts/vacuumdb.c
9 *
10 *-------------------------------------------------------------------------
11 */
12
13 #ifdef WIN32
14 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
15 #endif
16
17 #include "postgres_fe.h"
18
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22
23 #include "catalog/pg_class.h"
24
25 #include "common.h"
26 #include "fe_utils/simple_list.h"
27 #include "fe_utils/string_utils.h"
28
29
30 #define ERRCODE_UNDEFINED_TABLE "42P01"
31
32 /* Parallel vacuuming stuff */
33 typedef struct ParallelSlot
34 {
35 PGconn *connection; /* One connection */
36 bool isFree; /* Is it known to be idle? */
37 } ParallelSlot;
38
39 /* vacuum options controlled by user flags */
40 typedef struct vacuumingOptions
41 {
42 bool analyze_only;
43 bool verbose;
44 bool and_analyze;
45 bool full;
46 bool freeze;
47 } vacuumingOptions;
48
49
50 static void vacuum_one_database(const ConnParams *cparams,
51 vacuumingOptions *vacopts,
52 int stage,
53 SimpleStringList *tables,
54 int concurrentCons,
55 const char *progname, bool echo, bool quiet);
56
57 static void vacuum_all_databases(ConnParams *cparams,
58 vacuumingOptions *vacopts,
59 bool analyze_in_stages,
60 int concurrentCons,
61 const char *progname, bool echo, bool quiet);
62
63 static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
64 vacuumingOptions *vacopts, const char *table,
65 bool table_pre_qualified,
66 const char *progname, bool echo);
67
68 static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
69 const char *table, const char *progname, bool async);
70
71 static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
72 const char *progname);
73
74 static bool ProcessQueryResult(PGconn *conn, PGresult *result,
75 const char *progname);
76
77 static bool GetQueryResult(PGconn *conn, const char *progname);
78
79 static void DisconnectDatabase(ParallelSlot *slot);
80
81 static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
82
83 static void init_slot(ParallelSlot *slot, PGconn *conn);
84
85 static void help(const char *progname);
86
87 /* For analyze-in-stages mode */
88 #define ANALYZE_NO_STAGE -1
89 #define ANALYZE_NUM_STAGES 3
90
91
92 int
main(int argc,char * argv[])93 main(int argc, char *argv[])
94 {
95 static struct option long_options[] = {
96 {"host", required_argument, NULL, 'h'},
97 {"port", required_argument, NULL, 'p'},
98 {"username", required_argument, NULL, 'U'},
99 {"no-password", no_argument, NULL, 'w'},
100 {"password", no_argument, NULL, 'W'},
101 {"echo", no_argument, NULL, 'e'},
102 {"quiet", no_argument, NULL, 'q'},
103 {"dbname", required_argument, NULL, 'd'},
104 {"analyze", no_argument, NULL, 'z'},
105 {"analyze-only", no_argument, NULL, 'Z'},
106 {"freeze", no_argument, NULL, 'F'},
107 {"all", no_argument, NULL, 'a'},
108 {"table", required_argument, NULL, 't'},
109 {"full", no_argument, NULL, 'f'},
110 {"verbose", no_argument, NULL, 'v'},
111 {"jobs", required_argument, NULL, 'j'},
112 {"maintenance-db", required_argument, NULL, 2},
113 {"analyze-in-stages", no_argument, NULL, 3},
114 {NULL, 0, NULL, 0}
115 };
116
117 const char *progname;
118 int optindex;
119 int c;
120 const char *dbname = NULL;
121 const char *maintenance_db = NULL;
122 char *host = NULL;
123 char *port = NULL;
124 char *username = NULL;
125 enum trivalue prompt_password = TRI_DEFAULT;
126 ConnParams cparams;
127 bool echo = false;
128 bool quiet = false;
129 vacuumingOptions vacopts;
130 bool analyze_in_stages = false;
131 bool alldb = false;
132 SimpleStringList tables = {NULL, NULL};
133 int concurrentCons = 1;
134 int tbl_count = 0;
135
136 /* initialize options to all false */
137 memset(&vacopts, 0, sizeof(vacopts));
138
139 progname = get_progname(argv[0]);
140
141 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
142
143 handle_help_version_opts(argc, argv, "vacuumdb", help);
144
145 while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
146 {
147 switch (c)
148 {
149 case 'h':
150 host = pg_strdup(optarg);
151 break;
152 case 'p':
153 port = pg_strdup(optarg);
154 break;
155 case 'U':
156 username = pg_strdup(optarg);
157 break;
158 case 'w':
159 prompt_password = TRI_NO;
160 break;
161 case 'W':
162 prompt_password = TRI_YES;
163 break;
164 case 'e':
165 echo = true;
166 break;
167 case 'q':
168 quiet = true;
169 break;
170 case 'd':
171 dbname = pg_strdup(optarg);
172 break;
173 case 'z':
174 vacopts.and_analyze = true;
175 break;
176 case 'Z':
177 vacopts.analyze_only = true;
178 break;
179 case 'F':
180 vacopts.freeze = true;
181 break;
182 case 'a':
183 alldb = true;
184 break;
185 case 't':
186 {
187 simple_string_list_append(&tables, optarg);
188 tbl_count++;
189 break;
190 }
191 case 'f':
192 vacopts.full = true;
193 break;
194 case 'v':
195 vacopts.verbose = true;
196 break;
197 case 'j':
198 concurrentCons = atoi(optarg);
199 if (concurrentCons <= 0)
200 {
201 fprintf(stderr, _("%s: number of parallel jobs must be at least 1\n"),
202 progname);
203 exit(1);
204 }
205 break;
206 case 2:
207 maintenance_db = pg_strdup(optarg);
208 break;
209 case 3:
210 analyze_in_stages = vacopts.analyze_only = true;
211 break;
212 default:
213 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
214 exit(1);
215 }
216 }
217
218 /*
219 * Non-option argument specifies database name as long as it wasn't
220 * already specified with -d / --dbname
221 */
222 if (optind < argc && dbname == NULL)
223 {
224 dbname = argv[optind];
225 optind++;
226 }
227
228 if (optind < argc)
229 {
230 fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"),
231 progname, argv[optind]);
232 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
233 exit(1);
234 }
235
236 if (vacopts.analyze_only)
237 {
238 if (vacopts.full)
239 {
240 fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
241 progname, "full");
242 exit(1);
243 }
244 if (vacopts.freeze)
245 {
246 fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
247 progname, "freeze");
248 exit(1);
249 }
250 /* allow 'and_analyze' with 'analyze_only' */
251 }
252
253 /* fill cparams except for dbname, which is set below */
254 cparams.pghost = host;
255 cparams.pgport = port;
256 cparams.pguser = username;
257 cparams.prompt_password = prompt_password;
258 cparams.override_dbname = NULL;
259
260 setup_cancel_handler();
261
262 /* Avoid opening extra connections. */
263 if (tbl_count && (concurrentCons > tbl_count))
264 concurrentCons = tbl_count;
265
266 if (alldb)
267 {
268 if (dbname)
269 {
270 fprintf(stderr, _("%s: cannot vacuum all databases and a specific one at the same time\n"),
271 progname);
272 exit(1);
273 }
274 if (tables.head != NULL)
275 {
276 fprintf(stderr, _("%s: cannot vacuum specific table(s) in all databases\n"),
277 progname);
278 exit(1);
279 }
280
281 cparams.dbname = maintenance_db;
282
283 vacuum_all_databases(&cparams, &vacopts,
284 analyze_in_stages,
285 concurrentCons,
286 progname, echo, quiet);
287 }
288 else
289 {
290 if (dbname == NULL)
291 {
292 if (getenv("PGDATABASE"))
293 dbname = getenv("PGDATABASE");
294 else if (getenv("PGUSER"))
295 dbname = getenv("PGUSER");
296 else
297 dbname = get_user_name_or_exit(progname);
298 }
299
300 cparams.dbname = dbname;
301
302 if (analyze_in_stages)
303 {
304 int stage;
305
306 for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
307 {
308 vacuum_one_database(&cparams, &vacopts,
309 stage,
310 &tables,
311 concurrentCons,
312 progname, echo, quiet);
313 }
314 }
315 else
316 vacuum_one_database(&cparams, &vacopts,
317 ANALYZE_NO_STAGE,
318 &tables,
319 concurrentCons,
320 progname, echo, quiet);
321 }
322
323 exit(0);
324 }
325
326 /*
327 * vacuum_one_database
328 *
329 * Process tables in the given database. If the 'tables' list is empty,
330 * process all tables in the database.
331 *
332 * Note that this function is only concerned with running exactly one stage
333 * when in analyze-in-stages mode; caller must iterate on us if necessary.
334 *
335 * If concurrentCons is > 1, multiple connections are used to vacuum tables
336 * in parallel. In this case and if the table list is empty, we first obtain
337 * a list of tables from the database.
338 */
339 static void
vacuum_one_database(const ConnParams * cparams,vacuumingOptions * vacopts,int stage,SimpleStringList * tables,int concurrentCons,const char * progname,bool echo,bool quiet)340 vacuum_one_database(const ConnParams *cparams,
341 vacuumingOptions *vacopts,
342 int stage,
343 SimpleStringList *tables,
344 int concurrentCons,
345 const char *progname, bool echo, bool quiet)
346 {
347 PQExpBufferData sql;
348 PGconn *conn;
349 SimpleStringListCell *cell;
350 ParallelSlot *slots;
351 SimpleStringList dbtables = {NULL, NULL};
352 int i;
353 bool failed = false;
354 bool parallel = concurrentCons > 1;
355 const char *stage_commands[] = {
356 "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
357 "SET default_statistics_target=10; RESET vacuum_cost_delay;",
358 "RESET default_statistics_target;"
359 };
360 const char *stage_messages[] = {
361 gettext_noop("Generating minimal optimizer statistics (1 target)"),
362 gettext_noop("Generating medium optimizer statistics (10 targets)"),
363 gettext_noop("Generating default (full) optimizer statistics")
364 };
365
366 Assert(stage == ANALYZE_NO_STAGE ||
367 (stage >= 0 && stage < ANALYZE_NUM_STAGES));
368
369 conn = connectDatabase(cparams, progname, echo, false, true);
370
371 if (!quiet)
372 {
373 if (stage != ANALYZE_NO_STAGE)
374 printf(_("%s: processing database \"%s\": %s\n"),
375 progname, PQdb(conn), _(stage_messages[stage]));
376 else
377 printf(_("%s: vacuuming database \"%s\"\n"),
378 progname, PQdb(conn));
379 fflush(stdout);
380 }
381
382 initPQExpBuffer(&sql);
383
384 /*
385 * If a table list is not provided and we're using multiple connections,
386 * prepare the list of tables by querying the catalogs.
387 */
388 if (parallel && (!tables || !tables->head))
389 {
390 PQExpBufferData buf;
391 PGresult *res;
392 int ntups;
393
394 initPQExpBuffer(&buf);
395
396 res = executeQuery(conn,
397 "SELECT c.relname, ns.nspname"
398 " FROM pg_class c, pg_namespace ns\n"
399 " WHERE relkind IN ("
400 CppAsString2(RELKIND_RELATION) ", "
401 CppAsString2(RELKIND_MATVIEW) ")"
402 " AND c.relnamespace = ns.oid\n"
403 " ORDER BY c.relpages DESC;",
404 progname, echo);
405
406 ntups = PQntuples(res);
407 for (i = 0; i < ntups; i++)
408 {
409 appendPQExpBufferStr(&buf,
410 fmtQualifiedId(PQserverVersion(conn),
411 PQgetvalue(res, i, 1),
412 PQgetvalue(res, i, 0)));
413
414 simple_string_list_append(&dbtables, buf.data);
415 resetPQExpBuffer(&buf);
416 }
417
418 termPQExpBuffer(&buf);
419 tables = &dbtables;
420
421 /*
422 * If there are more connections than vacuumable relations, we don't
423 * need to use them all.
424 */
425 if (concurrentCons > ntups)
426 concurrentCons = ntups;
427 if (concurrentCons <= 1)
428 parallel = false;
429 PQclear(res);
430 }
431
432 /*
433 * Setup the database connections. We reuse the connection we already have
434 * for the first slot. If not in parallel mode, the first slot in the
435 * array contains the connection.
436 */
437 if (concurrentCons <= 0)
438 concurrentCons = 1;
439 slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
440 init_slot(slots, conn);
441 if (parallel)
442 {
443 for (i = 1; i < concurrentCons; i++)
444 {
445 conn = connectDatabase(cparams, progname, echo, false, true);
446
447 /*
448 * Fail and exit immediately if trying to use a socket in an
449 * unsupported range. POSIX requires open(2) to use the lowest
450 * unused file descriptor and the hint given relies on that.
451 */
452 if (PQsocket(conn) >= FD_SETSIZE)
453 {
454 fprintf(stderr,
455 _("%s: too many jobs for this platform -- try %d\n"),
456 progname, i);
457 exit(1);
458 }
459
460 init_slot(slots + i, conn);
461 }
462 }
463
464 /*
465 * Prepare all the connections to run the appropriate analyze stage, if
466 * caller requested that mode.
467 */
468 if (stage != ANALYZE_NO_STAGE)
469 {
470 int j;
471
472 /* We already emitted the message above */
473
474 for (j = 0; j < concurrentCons; j++)
475 executeCommand((slots + j)->connection,
476 stage_commands[stage], progname, echo);
477 }
478
479 cell = tables ? tables->head : NULL;
480 do
481 {
482 const char *tabname = cell ? cell->val : NULL;
483 ParallelSlot *free_slot;
484
485 if (CancelRequested)
486 {
487 failed = true;
488 goto finish;
489 }
490
491 /*
492 * Get the connection slot to use. If in parallel mode, here we wait
493 * for one connection to become available if none already is. In
494 * non-parallel mode we simply use the only slot we have, which we
495 * know to be free.
496 */
497 if (parallel)
498 {
499 /*
500 * Get a free slot, waiting until one becomes free if none
501 * currently is.
502 */
503 free_slot = GetIdleSlot(slots, concurrentCons, progname);
504 if (!free_slot)
505 {
506 failed = true;
507 goto finish;
508 }
509
510 free_slot->isFree = false;
511 }
512 else
513 free_slot = slots;
514
515 /*
516 * Prepare the vacuum command. Note that in some cases this requires
517 * query execution, so be sure to use the free connection.
518 */
519 prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
520 tables == &dbtables, progname, echo);
521
522 /*
523 * Execute the vacuum. If not in parallel mode, this terminates the
524 * program in case of an error. (The parallel case handles query
525 * errors in ProcessQueryResult through GetIdleSlot.)
526 */
527 run_vacuum_command(free_slot->connection, sql.data,
528 echo, tabname, progname, parallel);
529
530 if (cell)
531 cell = cell->next;
532 } while (cell != NULL);
533
534 if (parallel)
535 {
536 int j;
537
538 /* wait for all connections to finish */
539 for (j = 0; j < concurrentCons; j++)
540 {
541 if (!GetQueryResult((slots + j)->connection, progname))
542 {
543 failed = true;
544 goto finish;
545 }
546 }
547 }
548
549 finish:
550 for (i = 0; i < concurrentCons; i++)
551 DisconnectDatabase(slots + i);
552 pfree(slots);
553
554 termPQExpBuffer(&sql);
555
556 if (failed)
557 exit(1);
558 }
559
560 /*
561 * Vacuum/analyze all connectable databases.
562 *
563 * In analyze-in-stages mode, we process all databases in one stage before
564 * moving on to the next stage. That ensure minimal stats are available
565 * quickly everywhere before generating more detailed ones.
566 */
567 static void
vacuum_all_databases(ConnParams * cparams,vacuumingOptions * vacopts,bool analyze_in_stages,int concurrentCons,const char * progname,bool echo,bool quiet)568 vacuum_all_databases(ConnParams *cparams,
569 vacuumingOptions *vacopts,
570 bool analyze_in_stages,
571 int concurrentCons,
572 const char *progname, bool echo, bool quiet)
573 {
574 PGconn *conn;
575 PGresult *result;
576 int stage;
577 int i;
578
579 conn = connectMaintenanceDatabase(cparams, progname, echo);
580 result = executeQuery(conn,
581 "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
582 progname, echo);
583 PQfinish(conn);
584
585 if (analyze_in_stages)
586 {
587 /*
588 * When analyzing all databases in stages, we analyze them all in the
589 * fastest stage first, so that initial statistics become available
590 * for all of them as soon as possible.
591 *
592 * This means we establish several times as many connections, but
593 * that's a secondary consideration.
594 */
595 for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
596 {
597 for (i = 0; i < PQntuples(result); i++)
598 {
599 cparams->override_dbname = PQgetvalue(result, i, 0);
600
601 vacuum_one_database(cparams, vacopts,
602 stage,
603 NULL,
604 concurrentCons,
605 progname, echo, quiet);
606 }
607 }
608 }
609 else
610 {
611 for (i = 0; i < PQntuples(result); i++)
612 {
613 cparams->override_dbname = PQgetvalue(result, i, 0);
614
615 vacuum_one_database(cparams, vacopts,
616 ANALYZE_NO_STAGE,
617 NULL,
618 concurrentCons,
619 progname, echo, quiet);
620 }
621 }
622
623 PQclear(result);
624 }
625
626 /*
627 * Construct a vacuum/analyze command to run based on the given options, in the
628 * given string buffer, which may contain previous garbage.
629 *
630 * An optional table name can be passed; this must be already be properly
631 * quoted. The command is semicolon-terminated.
632 */
633 static void
prepare_vacuum_command(PQExpBuffer sql,PGconn * conn,vacuumingOptions * vacopts,const char * table,bool table_pre_qualified,const char * progname,bool echo)634 prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
635 vacuumingOptions *vacopts, const char *table,
636 bool table_pre_qualified,
637 const char *progname, bool echo)
638 {
639 resetPQExpBuffer(sql);
640
641 if (vacopts->analyze_only)
642 {
643 appendPQExpBufferStr(sql, "ANALYZE");
644 if (vacopts->verbose)
645 appendPQExpBufferStr(sql, " VERBOSE");
646 }
647 else
648 {
649 appendPQExpBufferStr(sql, "VACUUM");
650 if (PQserverVersion(conn) >= 90000)
651 {
652 const char *paren = " (";
653 const char *comma = ", ";
654 const char *sep = paren;
655
656 if (vacopts->full)
657 {
658 appendPQExpBuffer(sql, "%sFULL", sep);
659 sep = comma;
660 }
661 if (vacopts->freeze)
662 {
663 appendPQExpBuffer(sql, "%sFREEZE", sep);
664 sep = comma;
665 }
666 if (vacopts->verbose)
667 {
668 appendPQExpBuffer(sql, "%sVERBOSE", sep);
669 sep = comma;
670 }
671 if (vacopts->and_analyze)
672 {
673 appendPQExpBuffer(sql, "%sANALYZE", sep);
674 sep = comma;
675 }
676 if (sep != paren)
677 appendPQExpBufferChar(sql, ')');
678 }
679 else
680 {
681 if (vacopts->full)
682 appendPQExpBufferStr(sql, " FULL");
683 if (vacopts->freeze)
684 appendPQExpBufferStr(sql, " FREEZE");
685 if (vacopts->verbose)
686 appendPQExpBufferStr(sql, " VERBOSE");
687 if (vacopts->and_analyze)
688 appendPQExpBufferStr(sql, " ANALYZE");
689 }
690 }
691
692 if (table)
693 {
694 appendPQExpBufferChar(sql, ' ');
695 if (table_pre_qualified)
696 appendPQExpBufferStr(sql, table);
697 else
698 appendQualifiedRelation(sql, table, conn, progname, echo);
699 }
700 appendPQExpBufferChar(sql, ';');
701 }
702
703 /*
704 * Send a vacuum/analyze command to the server. In async mode, return after
705 * sending the command; else, wait for it to finish.
706 *
707 * Any errors during command execution are reported to stderr. If async is
708 * false, this function exits the program after reporting the error.
709 */
710 static void
run_vacuum_command(PGconn * conn,const char * sql,bool echo,const char * table,const char * progname,bool async)711 run_vacuum_command(PGconn *conn, const char *sql, bool echo,
712 const char *table, const char *progname, bool async)
713 {
714 bool status;
715
716 if (async)
717 {
718 if (echo)
719 printf("%s\n", sql);
720
721 status = PQsendQuery(conn, sql) == 1;
722 }
723 else
724 status = executeMaintenanceCommand(conn, sql, echo);
725
726 if (!status)
727 {
728 if (table)
729 fprintf(stderr,
730 _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
731 progname, table, PQdb(conn), PQerrorMessage(conn));
732 else
733 fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
734 progname, PQdb(conn), PQerrorMessage(conn));
735
736 if (!async)
737 {
738 PQfinish(conn);
739 exit(1);
740 }
741 }
742 }
743
744 /*
745 * GetIdleSlot
746 * Return a connection slot that is ready to execute a command.
747 *
748 * We return the first slot we find that is marked isFree, if one is;
749 * otherwise, we loop on select() until one socket becomes available. When
750 * this happens, we read the whole set and mark as free all sockets that become
751 * available.
752 *
753 * If an error occurs, NULL is returned.
754 */
755 static ParallelSlot *
GetIdleSlot(ParallelSlot slots[],int numslots,const char * progname)756 GetIdleSlot(ParallelSlot slots[], int numslots,
757 const char *progname)
758 {
759 int i;
760 int firstFree = -1;
761
762 /* Any connection already known free? */
763 for (i = 0; i < numslots; i++)
764 {
765 if (slots[i].isFree)
766 return slots + i;
767 }
768
769 /*
770 * No free slot found, so wait until one of the connections has finished
771 * its task and return the available slot.
772 */
773 while (firstFree < 0)
774 {
775 fd_set slotset;
776 int maxFd = 0;
777 bool aborting;
778
779 /* We must reconstruct the fd_set for each call to select_loop */
780 FD_ZERO(&slotset);
781
782 for (i = 0; i < numslots; i++)
783 {
784 int sock = PQsocket(slots[i].connection);
785
786 /*
787 * We don't really expect any connections to lose their sockets
788 * after startup, but just in case, cope by ignoring them.
789 */
790 if (sock < 0)
791 continue;
792
793 FD_SET(sock, &slotset);
794 if (sock > maxFd)
795 maxFd = sock;
796 }
797
798 SetCancelConn(slots->connection);
799 i = select_loop(maxFd, &slotset, &aborting);
800 ResetCancelConn();
801
802 if (aborting)
803 {
804 /*
805 * We set the cancel-receiving connection to the one in the zeroth
806 * slot above, so fetch the error from there.
807 */
808 GetQueryResult(slots->connection, progname);
809 return NULL;
810 }
811 Assert(i != 0);
812
813 for (i = 0; i < numslots; i++)
814 {
815 int sock = PQsocket(slots[i].connection);
816
817 if (sock >= 0 && FD_ISSET(sock, &slotset))
818 {
819 /* select() says input is available, so consume it */
820 PQconsumeInput(slots[i].connection);
821 }
822
823 /* Collect result(s) as long as any are available */
824 while (!PQisBusy(slots[i].connection))
825 {
826 PGresult *result = PQgetResult(slots[i].connection);
827
828 if (result != NULL)
829 {
830 /* Check and discard the command result */
831 if (!ProcessQueryResult(slots[i].connection, result,
832 progname))
833 return NULL;
834 }
835 else
836 {
837 /* This connection has become idle */
838 slots[i].isFree = true;
839 if (firstFree < 0)
840 firstFree = i;
841 break;
842 }
843 }
844 }
845 }
846
847 return slots + firstFree;
848 }
849
850 /*
851 * ProcessQueryResult
852 *
853 * Process (and delete) a query result. Returns true if there's no error,
854 * false otherwise -- but errors about trying to vacuum a missing relation
855 * are reported and subsequently ignored.
856 */
857 static bool
ProcessQueryResult(PGconn * conn,PGresult * result,const char * progname)858 ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
859 {
860 /*
861 * If it's an error, report it. Errors about a missing table are harmless
862 * so we continue processing; but die for other errors.
863 */
864 if (PQresultStatus(result) != PGRES_COMMAND_OK)
865 {
866 char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
867
868 fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
869 progname, PQdb(conn), PQerrorMessage(conn));
870
871 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
872 {
873 PQclear(result);
874 return false;
875 }
876 }
877
878 PQclear(result);
879 return true;
880 }
881
882 /*
883 * GetQueryResult
884 *
885 * Pump the conn till it's dry of results; return false if any are errors.
886 * Note that this will block if the conn is busy.
887 */
888 static bool
GetQueryResult(PGconn * conn,const char * progname)889 GetQueryResult(PGconn *conn, const char *progname)
890 {
891 bool ok = true;
892 PGresult *result;
893
894 SetCancelConn(conn);
895 while ((result = PQgetResult(conn)) != NULL)
896 {
897 if (!ProcessQueryResult(conn, result, progname))
898 ok = false;
899 }
900 ResetCancelConn();
901 return ok;
902 }
903
904 /*
905 * DisconnectDatabase
906 * Disconnect the connection associated with the given slot
907 */
908 static void
DisconnectDatabase(ParallelSlot * slot)909 DisconnectDatabase(ParallelSlot *slot)
910 {
911 char errbuf[256];
912
913 if (!slot->connection)
914 return;
915
916 if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
917 {
918 PGcancel *cancel;
919
920 if ((cancel = PQgetCancel(slot->connection)))
921 {
922 (void) PQcancel(cancel, errbuf, sizeof(errbuf));
923 PQfreeCancel(cancel);
924 }
925 }
926
927 PQfinish(slot->connection);
928 slot->connection = NULL;
929 }
930
931 /*
932 * Loop on select() until a descriptor from the given set becomes readable.
933 *
934 * If we get a cancel request while we're waiting, we forego all further
935 * processing and set the *aborting flag to true. The return value must be
936 * ignored in this case. Otherwise, *aborting is set to false.
937 */
938 static int
select_loop(int maxFd,fd_set * workerset,bool * aborting)939 select_loop(int maxFd, fd_set *workerset, bool *aborting)
940 {
941 int i;
942 fd_set saveSet = *workerset;
943
944 if (CancelRequested)
945 {
946 *aborting = true;
947 return -1;
948 }
949 else
950 *aborting = false;
951
952 for (;;)
953 {
954 /*
955 * On Windows, we need to check once in a while for cancel requests;
956 * on other platforms we rely on select() returning when interrupted.
957 */
958 struct timeval *tvp;
959 #ifdef WIN32
960 struct timeval tv = {0, 1000000};
961
962 tvp = &tv;
963 #else
964 tvp = NULL;
965 #endif
966
967 *workerset = saveSet;
968 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
969
970 #ifdef WIN32
971 if (i == SOCKET_ERROR)
972 {
973 i = -1;
974
975 if (WSAGetLastError() == WSAEINTR)
976 errno = EINTR;
977 }
978 #endif
979
980 if (i < 0 && errno == EINTR)
981 continue; /* ignore this */
982 if (i < 0 || CancelRequested)
983 *aborting = true; /* but not this */
984 if (i == 0)
985 continue; /* timeout (Win32 only) */
986 break;
987 }
988
989 return i;
990 }
991
992 static void
init_slot(ParallelSlot * slot,PGconn * conn)993 init_slot(ParallelSlot *slot, PGconn *conn)
994 {
995 slot->connection = conn;
996 /* Initially assume connection is idle */
997 slot->isFree = true;
998 }
999
1000 static void
help(const char * progname)1001 help(const char *progname)
1002 {
1003 printf(_("%s cleans and analyzes a PostgreSQL database.\n\n"), progname);
1004 printf(_("Usage:\n"));
1005 printf(_(" %s [OPTION]... [DBNAME]\n"), progname);
1006 printf(_("\nOptions:\n"));
1007 printf(_(" -a, --all vacuum all databases\n"));
1008 printf(_(" -d, --dbname=DBNAME database to vacuum\n"));
1009 printf(_(" -e, --echo show the commands being sent to the server\n"));
1010 printf(_(" -f, --full do full vacuuming\n"));
1011 printf(_(" -F, --freeze freeze row transaction information\n"));
1012 printf(_(" -j, --jobs=NUM use this many concurrent connections to vacuum\n"));
1013 printf(_(" -q, --quiet don't write any messages\n"));
1014 printf(_(" -t, --table='TABLE[(COLUMNS)]' vacuum specific table(s) only\n"));
1015 printf(_(" -v, --verbose write a lot of output\n"));
1016 printf(_(" -V, --version output version information, then exit\n"));
1017 printf(_(" -z, --analyze update optimizer statistics\n"));
1018 printf(_(" -Z, --analyze-only only update optimizer statistics; no vacuum\n"));
1019 printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
1020 " stages for faster results; no vacuum\n"));
1021 printf(_(" -?, --help show this help, then exit\n"));
1022 printf(_("\nConnection options:\n"));
1023 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
1024 printf(_(" -p, --port=PORT database server port\n"));
1025 printf(_(" -U, --username=USERNAME user name to connect as\n"));
1026 printf(_(" -w, --no-password never prompt for password\n"));
1027 printf(_(" -W, --password force password prompt\n"));
1028 printf(_(" --maintenance-db=DBNAME alternate maintenance database\n"));
1029 printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
1030 printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
1031 }
1032