1 /*-------------------------------------------------------------------------
2 *
3 * vacuumdb
4 *
5 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
6 * Portions Copyright (c) 1994, Regents of the University of California
7 *
8 * src/bin/scripts/vacuumdb.c
9 *
10 *-------------------------------------------------------------------------
11 */
12
13 #ifdef WIN32
14 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
15 #endif
16
#include "postgres_fe.h"

#include <errno.h>
#include <limits.h>

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "catalog/pg_class_d.h"

#include "common.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
28
29
/*
 * SQLSTATE string compared against in ProcessQueryResult(); an error carrying
 * this code (undefined table) is reported but does not abort processing.
 */
#define ERRCODE_UNDEFINED_TABLE  "42P01"
31
32 /* Parallel vacuuming stuff */
33 typedef struct ParallelSlot
34 {
35 PGconn *connection; /* One connection */
36 bool isFree; /* Is it known to be idle? */
37 } ParallelSlot;
38
/*
 * Vacuum options controlled by user flags.
 *
 * main() zeroes the whole struct and then sets individual members while
 * parsing the command line; prepare_vacuum_command() turns them into SQL
 * keywords.
 */
typedef struct vacuumingOptions
{
	bool		analyze_only;	/* -Z / --analyze-in-stages: ANALYZE only */
	bool		verbose;		/* -v: add VERBOSE */
	bool		and_analyze;	/* -z: VACUUM plus ANALYZE */
	bool		full;			/* -f: add FULL */
	bool		freeze;			/* -F: add FREEZE */
} vacuumingOptions;
48
49
50 static void vacuum_one_database(const ConnParams *cparams,
51 vacuumingOptions *vacopts,
52 int stage,
53 SimpleStringList *tables,
54 int concurrentCons,
55 const char *progname, bool echo, bool quiet);
56
57 static void vacuum_all_databases(ConnParams *cparams,
58 vacuumingOptions *vacopts,
59 bool analyze_in_stages,
60 int concurrentCons,
61 const char *progname, bool echo, bool quiet);
62
63 static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
64 vacuumingOptions *vacopts, const char *table,
65 bool table_pre_qualified,
66 const char *progname, bool echo);
67
68 static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
69 const char *table, const char *progname, bool async);
70
71 static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
72 const char *progname);
73
74 static bool ProcessQueryResult(PGconn *conn, PGresult *result,
75 const char *progname);
76
77 static bool GetQueryResult(PGconn *conn, const char *progname);
78
79 static void DisconnectDatabase(ParallelSlot *slot);
80
81 static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
82
83 static void init_slot(ParallelSlot *slot, PGconn *conn);
84
85 static void help(const char *progname);
86
/*
 * For analyze-in-stages mode: stage numbers run from 0 up to
 * ANALYZE_NUM_STAGES - 1, and ANALYZE_NO_STAGE marks a run that is not
 * staged at all.
 */
#define ANALYZE_NO_STAGE	-1
#define ANALYZE_NUM_STAGES	3
90
91
92 int
main(int argc,char * argv[])93 main(int argc, char *argv[])
94 {
95 static struct option long_options[] = {
96 {"host", required_argument, NULL, 'h'},
97 {"port", required_argument, NULL, 'p'},
98 {"username", required_argument, NULL, 'U'},
99 {"no-password", no_argument, NULL, 'w'},
100 {"password", no_argument, NULL, 'W'},
101 {"echo", no_argument, NULL, 'e'},
102 {"quiet", no_argument, NULL, 'q'},
103 {"dbname", required_argument, NULL, 'd'},
104 {"analyze", no_argument, NULL, 'z'},
105 {"analyze-only", no_argument, NULL, 'Z'},
106 {"freeze", no_argument, NULL, 'F'},
107 {"all", no_argument, NULL, 'a'},
108 {"table", required_argument, NULL, 't'},
109 {"full", no_argument, NULL, 'f'},
110 {"verbose", no_argument, NULL, 'v'},
111 {"jobs", required_argument, NULL, 'j'},
112 {"maintenance-db", required_argument, NULL, 2},
113 {"analyze-in-stages", no_argument, NULL, 3},
114 {NULL, 0, NULL, 0}
115 };
116
117 const char *progname;
118 int optindex;
119 int c;
120 const char *dbname = NULL;
121 const char *maintenance_db = NULL;
122 char *host = NULL;
123 char *port = NULL;
124 char *username = NULL;
125 enum trivalue prompt_password = TRI_DEFAULT;
126 ConnParams cparams;
127 bool echo = false;
128 bool quiet = false;
129 vacuumingOptions vacopts;
130 bool analyze_in_stages = false;
131 bool alldb = false;
132 SimpleStringList tables = {NULL, NULL};
133 int concurrentCons = 1;
134 int tbl_count = 0;
135
136 /* initialize options to all false */
137 memset(&vacopts, 0, sizeof(vacopts));
138
139 progname = get_progname(argv[0]);
140
141 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
142
143 handle_help_version_opts(argc, argv, "vacuumdb", help);
144
145 while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
146 {
147 switch (c)
148 {
149 case 'h':
150 host = pg_strdup(optarg);
151 break;
152 case 'p':
153 port = pg_strdup(optarg);
154 break;
155 case 'U':
156 username = pg_strdup(optarg);
157 break;
158 case 'w':
159 prompt_password = TRI_NO;
160 break;
161 case 'W':
162 prompt_password = TRI_YES;
163 break;
164 case 'e':
165 echo = true;
166 break;
167 case 'q':
168 quiet = true;
169 break;
170 case 'd':
171 dbname = pg_strdup(optarg);
172 break;
173 case 'z':
174 vacopts.and_analyze = true;
175 break;
176 case 'Z':
177 vacopts.analyze_only = true;
178 break;
179 case 'F':
180 vacopts.freeze = true;
181 break;
182 case 'a':
183 alldb = true;
184 break;
185 case 't':
186 {
187 simple_string_list_append(&tables, optarg);
188 tbl_count++;
189 break;
190 }
191 case 'f':
192 vacopts.full = true;
193 break;
194 case 'v':
195 vacopts.verbose = true;
196 break;
197 case 'j':
198 concurrentCons = atoi(optarg);
199 if (concurrentCons <= 0)
200 {
201 fprintf(stderr, _("%s: number of parallel jobs must be at least 1\n"),
202 progname);
203 exit(1);
204 }
205 break;
206 case 2:
207 maintenance_db = pg_strdup(optarg);
208 break;
209 case 3:
210 analyze_in_stages = vacopts.analyze_only = true;
211 break;
212 default:
213 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
214 exit(1);
215 }
216 }
217
218 /*
219 * Non-option argument specifies database name as long as it wasn't
220 * already specified with -d / --dbname
221 */
222 if (optind < argc && dbname == NULL)
223 {
224 dbname = argv[optind];
225 optind++;
226 }
227
228 if (optind < argc)
229 {
230 fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"),
231 progname, argv[optind]);
232 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
233 exit(1);
234 }
235
236 if (vacopts.analyze_only)
237 {
238 if (vacopts.full)
239 {
240 fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
241 progname, "full");
242 exit(1);
243 }
244 if (vacopts.freeze)
245 {
246 fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
247 progname, "freeze");
248 exit(1);
249 }
250 /* allow 'and_analyze' with 'analyze_only' */
251 }
252
253 /* fill cparams except for dbname, which is set below */
254 cparams.pghost = host;
255 cparams.pgport = port;
256 cparams.pguser = username;
257 cparams.prompt_password = prompt_password;
258 cparams.override_dbname = NULL;
259
260 setup_cancel_handler();
261
262 /* Avoid opening extra connections. */
263 if (tbl_count && (concurrentCons > tbl_count))
264 concurrentCons = tbl_count;
265
266 if (alldb)
267 {
268 if (dbname)
269 {
270 fprintf(stderr, _("%s: cannot vacuum all databases and a specific one at the same time\n"),
271 progname);
272 exit(1);
273 }
274 if (tables.head != NULL)
275 {
276 fprintf(stderr, _("%s: cannot vacuum specific table(s) in all databases\n"),
277 progname);
278 exit(1);
279 }
280
281 cparams.dbname = maintenance_db;
282
283 vacuum_all_databases(&cparams, &vacopts,
284 analyze_in_stages,
285 concurrentCons,
286 progname, echo, quiet);
287 }
288 else
289 {
290 if (dbname == NULL)
291 {
292 if (getenv("PGDATABASE"))
293 dbname = getenv("PGDATABASE");
294 else if (getenv("PGUSER"))
295 dbname = getenv("PGUSER");
296 else
297 dbname = get_user_name_or_exit(progname);
298 }
299
300 cparams.dbname = dbname;
301
302 if (analyze_in_stages)
303 {
304 int stage;
305
306 for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
307 {
308 vacuum_one_database(&cparams, &vacopts,
309 stage,
310 &tables,
311 concurrentCons,
312 progname, echo, quiet);
313 }
314 }
315 else
316 vacuum_one_database(&cparams, &vacopts,
317 ANALYZE_NO_STAGE,
318 &tables,
319 concurrentCons,
320 progname, echo, quiet);
321 }
322
323 exit(0);
324 }
325
326 /*
327 * vacuum_one_database
328 *
329 * Process tables in the given database. If the 'tables' list is empty,
330 * process all tables in the database.
331 *
332 * Note that this function is only concerned with running exactly one stage
333 * when in analyze-in-stages mode; caller must iterate on us if necessary.
334 *
335 * If concurrentCons is > 1, multiple connections are used to vacuum tables
336 * in parallel. In this case and if the table list is empty, we first obtain
337 * a list of tables from the database.
338 */
339 static void
vacuum_one_database(const ConnParams * cparams,vacuumingOptions * vacopts,int stage,SimpleStringList * tables,int concurrentCons,const char * progname,bool echo,bool quiet)340 vacuum_one_database(const ConnParams *cparams,
341 vacuumingOptions *vacopts,
342 int stage,
343 SimpleStringList *tables,
344 int concurrentCons,
345 const char *progname, bool echo, bool quiet)
346 {
347 PQExpBufferData sql;
348 PGconn *conn;
349 SimpleStringListCell *cell;
350 ParallelSlot *slots;
351 SimpleStringList dbtables = {NULL, NULL};
352 int i;
353 bool failed = false;
354 bool parallel = concurrentCons > 1;
355 const char *stage_commands[] = {
356 "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
357 "SET default_statistics_target=10; RESET vacuum_cost_delay;",
358 "RESET default_statistics_target;"
359 };
360 const char *stage_messages[] = {
361 gettext_noop("Generating minimal optimizer statistics (1 target)"),
362 gettext_noop("Generating medium optimizer statistics (10 targets)"),
363 gettext_noop("Generating default (full) optimizer statistics")
364 };
365
366 Assert(stage == ANALYZE_NO_STAGE ||
367 (stage >= 0 && stage < ANALYZE_NUM_STAGES));
368
369 conn = connectDatabase(cparams, progname, echo, false, true);
370
371 if (!quiet)
372 {
373 if (stage != ANALYZE_NO_STAGE)
374 printf(_("%s: processing database \"%s\": %s\n"),
375 progname, PQdb(conn), _(stage_messages[stage]));
376 else
377 printf(_("%s: vacuuming database \"%s\"\n"),
378 progname, PQdb(conn));
379 fflush(stdout);
380 }
381
382 initPQExpBuffer(&sql);
383
384 /*
385 * If a table list is not provided and we're using multiple connections,
386 * prepare the list of tables by querying the catalogs.
387 */
388 if (parallel && (!tables || !tables->head))
389 {
390 PQExpBufferData buf;
391 PGresult *res;
392 int ntups;
393
394 initPQExpBuffer(&buf);
395
396 res = executeQuery(conn,
397 "SELECT c.relname, ns.nspname"
398 " FROM pg_class c, pg_namespace ns\n"
399 " WHERE relkind IN ("
400 CppAsString2(RELKIND_RELATION) ", "
401 CppAsString2(RELKIND_MATVIEW) ")"
402 " AND c.relnamespace = ns.oid\n"
403 " ORDER BY c.relpages DESC;",
404 progname, echo);
405
406 ntups = PQntuples(res);
407 for (i = 0; i < ntups; i++)
408 {
409 appendPQExpBufferStr(&buf,
410 fmtQualifiedId(PQgetvalue(res, i, 1),
411 PQgetvalue(res, i, 0)));
412
413 simple_string_list_append(&dbtables, buf.data);
414 resetPQExpBuffer(&buf);
415 }
416
417 termPQExpBuffer(&buf);
418 tables = &dbtables;
419
420 /*
421 * If there are more connections than vacuumable relations, we don't
422 * need to use them all.
423 */
424 if (concurrentCons > ntups)
425 concurrentCons = ntups;
426 if (concurrentCons <= 1)
427 parallel = false;
428 PQclear(res);
429 }
430
431 /*
432 * Setup the database connections. We reuse the connection we already have
433 * for the first slot. If not in parallel mode, the first slot in the
434 * array contains the connection.
435 */
436 if (concurrentCons <= 0)
437 concurrentCons = 1;
438 slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
439 init_slot(slots, conn);
440 if (parallel)
441 {
442 for (i = 1; i < concurrentCons; i++)
443 {
444 conn = connectDatabase(cparams, progname, echo, false, true);
445
446 /*
447 * Fail and exit immediately if trying to use a socket in an
448 * unsupported range. POSIX requires open(2) to use the lowest
449 * unused file descriptor and the hint given relies on that.
450 */
451 if (PQsocket(conn) >= FD_SETSIZE)
452 {
453 fprintf(stderr,
454 _("%s: too many jobs for this platform -- try %d\n"),
455 progname, i);
456 exit(1);
457 }
458
459 init_slot(slots + i, conn);
460 }
461 }
462
463 /*
464 * Prepare all the connections to run the appropriate analyze stage, if
465 * caller requested that mode.
466 */
467 if (stage != ANALYZE_NO_STAGE)
468 {
469 int j;
470
471 /* We already emitted the message above */
472
473 for (j = 0; j < concurrentCons; j++)
474 executeCommand((slots + j)->connection,
475 stage_commands[stage], progname, echo);
476 }
477
478 cell = tables ? tables->head : NULL;
479 do
480 {
481 const char *tabname = cell ? cell->val : NULL;
482 ParallelSlot *free_slot;
483
484 if (CancelRequested)
485 {
486 failed = true;
487 goto finish;
488 }
489
490 /*
491 * Get the connection slot to use. If in parallel mode, here we wait
492 * for one connection to become available if none already is. In
493 * non-parallel mode we simply use the only slot we have, which we
494 * know to be free.
495 */
496 if (parallel)
497 {
498 /*
499 * Get a free slot, waiting until one becomes free if none
500 * currently is.
501 */
502 free_slot = GetIdleSlot(slots, concurrentCons, progname);
503 if (!free_slot)
504 {
505 failed = true;
506 goto finish;
507 }
508
509 free_slot->isFree = false;
510 }
511 else
512 free_slot = slots;
513
514 /*
515 * Prepare the vacuum command. Note that in some cases this requires
516 * query execution, so be sure to use the free connection.
517 */
518 prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
519 tables == &dbtables, progname, echo);
520
521 /*
522 * Execute the vacuum. If not in parallel mode, this terminates the
523 * program in case of an error. (The parallel case handles query
524 * errors in ProcessQueryResult through GetIdleSlot.)
525 */
526 run_vacuum_command(free_slot->connection, sql.data,
527 echo, tabname, progname, parallel);
528
529 if (cell)
530 cell = cell->next;
531 } while (cell != NULL);
532
533 if (parallel)
534 {
535 int j;
536
537 /* wait for all connections to finish */
538 for (j = 0; j < concurrentCons; j++)
539 {
540 if (!GetQueryResult((slots + j)->connection, progname))
541 {
542 failed = true;
543 goto finish;
544 }
545 }
546 }
547
548 finish:
549 for (i = 0; i < concurrentCons; i++)
550 DisconnectDatabase(slots + i);
551 pfree(slots);
552
553 termPQExpBuffer(&sql);
554
555 if (failed)
556 exit(1);
557 }
558
559 /*
560 * Vacuum/analyze all connectable databases.
561 *
562 * In analyze-in-stages mode, we process all databases in one stage before
563 * moving on to the next stage. That ensure minimal stats are available
564 * quickly everywhere before generating more detailed ones.
565 */
566 static void
vacuum_all_databases(ConnParams * cparams,vacuumingOptions * vacopts,bool analyze_in_stages,int concurrentCons,const char * progname,bool echo,bool quiet)567 vacuum_all_databases(ConnParams *cparams,
568 vacuumingOptions *vacopts,
569 bool analyze_in_stages,
570 int concurrentCons,
571 const char *progname, bool echo, bool quiet)
572 {
573 PGconn *conn;
574 PGresult *result;
575 int stage;
576 int i;
577
578 conn = connectMaintenanceDatabase(cparams, progname, echo);
579 result = executeQuery(conn,
580 "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
581 progname, echo);
582 PQfinish(conn);
583
584 if (analyze_in_stages)
585 {
586 /*
587 * When analyzing all databases in stages, we analyze them all in the
588 * fastest stage first, so that initial statistics become available
589 * for all of them as soon as possible.
590 *
591 * This means we establish several times as many connections, but
592 * that's a secondary consideration.
593 */
594 for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
595 {
596 for (i = 0; i < PQntuples(result); i++)
597 {
598 cparams->override_dbname = PQgetvalue(result, i, 0);
599
600 vacuum_one_database(cparams, vacopts,
601 stage,
602 NULL,
603 concurrentCons,
604 progname, echo, quiet);
605 }
606 }
607 }
608 else
609 {
610 for (i = 0; i < PQntuples(result); i++)
611 {
612 cparams->override_dbname = PQgetvalue(result, i, 0);
613
614 vacuum_one_database(cparams, vacopts,
615 ANALYZE_NO_STAGE,
616 NULL,
617 concurrentCons,
618 progname, echo, quiet);
619 }
620 }
621
622 PQclear(result);
623 }
624
625 /*
626 * Construct a vacuum/analyze command to run based on the given options, in the
627 * given string buffer, which may contain previous garbage.
628 *
629 * An optional table name can be passed; this must be already be properly
630 * quoted. The command is semicolon-terminated.
631 */
632 static void
prepare_vacuum_command(PQExpBuffer sql,PGconn * conn,vacuumingOptions * vacopts,const char * table,bool table_pre_qualified,const char * progname,bool echo)633 prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
634 vacuumingOptions *vacopts, const char *table,
635 bool table_pre_qualified,
636 const char *progname, bool echo)
637 {
638 resetPQExpBuffer(sql);
639
640 if (vacopts->analyze_only)
641 {
642 appendPQExpBufferStr(sql, "ANALYZE");
643 if (vacopts->verbose)
644 appendPQExpBufferStr(sql, " VERBOSE");
645 }
646 else
647 {
648 appendPQExpBufferStr(sql, "VACUUM");
649 if (PQserverVersion(conn) >= 90000)
650 {
651 const char *paren = " (";
652 const char *comma = ", ";
653 const char *sep = paren;
654
655 if (vacopts->full)
656 {
657 appendPQExpBuffer(sql, "%sFULL", sep);
658 sep = comma;
659 }
660 if (vacopts->freeze)
661 {
662 appendPQExpBuffer(sql, "%sFREEZE", sep);
663 sep = comma;
664 }
665 if (vacopts->verbose)
666 {
667 appendPQExpBuffer(sql, "%sVERBOSE", sep);
668 sep = comma;
669 }
670 if (vacopts->and_analyze)
671 {
672 appendPQExpBuffer(sql, "%sANALYZE", sep);
673 sep = comma;
674 }
675 if (sep != paren)
676 appendPQExpBufferChar(sql, ')');
677 }
678 else
679 {
680 if (vacopts->full)
681 appendPQExpBufferStr(sql, " FULL");
682 if (vacopts->freeze)
683 appendPQExpBufferStr(sql, " FREEZE");
684 if (vacopts->verbose)
685 appendPQExpBufferStr(sql, " VERBOSE");
686 if (vacopts->and_analyze)
687 appendPQExpBufferStr(sql, " ANALYZE");
688 }
689 }
690
691 if (table)
692 {
693 appendPQExpBufferChar(sql, ' ');
694 if (table_pre_qualified)
695 appendPQExpBufferStr(sql, table);
696 else
697 appendQualifiedRelation(sql, table, conn, progname, echo);
698 }
699 appendPQExpBufferChar(sql, ';');
700 }
701
702 /*
703 * Send a vacuum/analyze command to the server. In async mode, return after
704 * sending the command; else, wait for it to finish.
705 *
706 * Any errors during command execution are reported to stderr. If async is
707 * false, this function exits the program after reporting the error.
708 */
709 static void
run_vacuum_command(PGconn * conn,const char * sql,bool echo,const char * table,const char * progname,bool async)710 run_vacuum_command(PGconn *conn, const char *sql, bool echo,
711 const char *table, const char *progname, bool async)
712 {
713 bool status;
714
715 if (async)
716 {
717 if (echo)
718 printf("%s\n", sql);
719
720 status = PQsendQuery(conn, sql) == 1;
721 }
722 else
723 status = executeMaintenanceCommand(conn, sql, echo);
724
725 if (!status)
726 {
727 if (table)
728 fprintf(stderr,
729 _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
730 progname, table, PQdb(conn), PQerrorMessage(conn));
731 else
732 fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
733 progname, PQdb(conn), PQerrorMessage(conn));
734
735 if (!async)
736 {
737 PQfinish(conn);
738 exit(1);
739 }
740 }
741 }
742
743 /*
744 * GetIdleSlot
745 * Return a connection slot that is ready to execute a command.
746 *
747 * We return the first slot we find that is marked isFree, if one is;
748 * otherwise, we loop on select() until one socket becomes available. When
749 * this happens, we read the whole set and mark as free all sockets that become
750 * available.
751 *
752 * If an error occurs, NULL is returned.
753 */
754 static ParallelSlot *
GetIdleSlot(ParallelSlot slots[],int numslots,const char * progname)755 GetIdleSlot(ParallelSlot slots[], int numslots,
756 const char *progname)
757 {
758 int i;
759 int firstFree = -1;
760
761 /* Any connection already known free? */
762 for (i = 0; i < numslots; i++)
763 {
764 if (slots[i].isFree)
765 return slots + i;
766 }
767
768 /*
769 * No free slot found, so wait until one of the connections has finished
770 * its task and return the available slot.
771 */
772 while (firstFree < 0)
773 {
774 fd_set slotset;
775 int maxFd = 0;
776 bool aborting;
777
778 /* We must reconstruct the fd_set for each call to select_loop */
779 FD_ZERO(&slotset);
780
781 for (i = 0; i < numslots; i++)
782 {
783 int sock = PQsocket(slots[i].connection);
784
785 /*
786 * We don't really expect any connections to lose their sockets
787 * after startup, but just in case, cope by ignoring them.
788 */
789 if (sock < 0)
790 continue;
791
792 FD_SET(sock, &slotset);
793 if (sock > maxFd)
794 maxFd = sock;
795 }
796
797 SetCancelConn(slots->connection);
798 i = select_loop(maxFd, &slotset, &aborting);
799 ResetCancelConn();
800
801 if (aborting)
802 {
803 /*
804 * We set the cancel-receiving connection to the one in the zeroth
805 * slot above, so fetch the error from there.
806 */
807 GetQueryResult(slots->connection, progname);
808 return NULL;
809 }
810 Assert(i != 0);
811
812 for (i = 0; i < numslots; i++)
813 {
814 int sock = PQsocket(slots[i].connection);
815
816 if (sock >= 0 && FD_ISSET(sock, &slotset))
817 {
818 /* select() says input is available, so consume it */
819 PQconsumeInput(slots[i].connection);
820 }
821
822 /* Collect result(s) as long as any are available */
823 while (!PQisBusy(slots[i].connection))
824 {
825 PGresult *result = PQgetResult(slots[i].connection);
826
827 if (result != NULL)
828 {
829 /* Check and discard the command result */
830 if (!ProcessQueryResult(slots[i].connection, result,
831 progname))
832 return NULL;
833 }
834 else
835 {
836 /* This connection has become idle */
837 slots[i].isFree = true;
838 if (firstFree < 0)
839 firstFree = i;
840 break;
841 }
842 }
843 }
844 }
845
846 return slots + firstFree;
847 }
848
849 /*
850 * ProcessQueryResult
851 *
852 * Process (and delete) a query result. Returns true if there's no error,
853 * false otherwise -- but errors about trying to vacuum a missing relation
854 * are reported and subsequently ignored.
855 */
856 static bool
ProcessQueryResult(PGconn * conn,PGresult * result,const char * progname)857 ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
858 {
859 /*
860 * If it's an error, report it. Errors about a missing table are harmless
861 * so we continue processing; but die for other errors.
862 */
863 if (PQresultStatus(result) != PGRES_COMMAND_OK)
864 {
865 char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
866
867 fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
868 progname, PQdb(conn), PQerrorMessage(conn));
869
870 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
871 {
872 PQclear(result);
873 return false;
874 }
875 }
876
877 PQclear(result);
878 return true;
879 }
880
881 /*
882 * GetQueryResult
883 *
884 * Pump the conn till it's dry of results; return false if any are errors.
885 * Note that this will block if the conn is busy.
886 */
887 static bool
GetQueryResult(PGconn * conn,const char * progname)888 GetQueryResult(PGconn *conn, const char *progname)
889 {
890 bool ok = true;
891 PGresult *result;
892
893 SetCancelConn(conn);
894 while ((result = PQgetResult(conn)) != NULL)
895 {
896 if (!ProcessQueryResult(conn, result, progname))
897 ok = false;
898 }
899 ResetCancelConn();
900 return ok;
901 }
902
903 /*
904 * DisconnectDatabase
905 * Disconnect the connection associated with the given slot
906 */
907 static void
DisconnectDatabase(ParallelSlot * slot)908 DisconnectDatabase(ParallelSlot *slot)
909 {
910 char errbuf[256];
911
912 if (!slot->connection)
913 return;
914
915 if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
916 {
917 PGcancel *cancel;
918
919 if ((cancel = PQgetCancel(slot->connection)))
920 {
921 (void) PQcancel(cancel, errbuf, sizeof(errbuf));
922 PQfreeCancel(cancel);
923 }
924 }
925
926 PQfinish(slot->connection);
927 slot->connection = NULL;
928 }
929
930 /*
931 * Loop on select() until a descriptor from the given set becomes readable.
932 *
933 * If we get a cancel request while we're waiting, we forego all further
934 * processing and set the *aborting flag to true. The return value must be
935 * ignored in this case. Otherwise, *aborting is set to false.
936 */
937 static int
select_loop(int maxFd,fd_set * workerset,bool * aborting)938 select_loop(int maxFd, fd_set *workerset, bool *aborting)
939 {
940 int i;
941 fd_set saveSet = *workerset;
942
943 if (CancelRequested)
944 {
945 *aborting = true;
946 return -1;
947 }
948 else
949 *aborting = false;
950
951 for (;;)
952 {
953 /*
954 * On Windows, we need to check once in a while for cancel requests;
955 * on other platforms we rely on select() returning when interrupted.
956 */
957 struct timeval *tvp;
958 #ifdef WIN32
959 struct timeval tv = {0, 1000000};
960
961 tvp = &tv;
962 #else
963 tvp = NULL;
964 #endif
965
966 *workerset = saveSet;
967 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
968
969 #ifdef WIN32
970 if (i == SOCKET_ERROR)
971 {
972 i = -1;
973
974 if (WSAGetLastError() == WSAEINTR)
975 errno = EINTR;
976 }
977 #endif
978
979 if (i < 0 && errno == EINTR)
980 continue; /* ignore this */
981 if (i < 0 || CancelRequested)
982 *aborting = true; /* but not this */
983 if (i == 0)
984 continue; /* timeout (Win32 only) */
985 break;
986 }
987
988 return i;
989 }
990
991 static void
init_slot(ParallelSlot * slot,PGconn * conn)992 init_slot(ParallelSlot *slot, PGconn *conn)
993 {
994 slot->connection = conn;
995 /* Initially assume connection is idle */
996 slot->isFree = true;
997 }
998
/*
 * help
 *		Print the usage message for this program to stdout.
 *
 * 'progname' is substituted into the summary and usage lines.
 */
static void
help(const char *progname)
{
	/* Summary and synopsis */
	printf(_("%s cleans and analyzes a PostgreSQL database.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]... [DBNAME]\n"), progname);

	/* General options */
	printf(_("\nOptions:\n"));
	printf(_(" -a, --all vacuum all databases\n"));
	printf(_(" -d, --dbname=DBNAME database to vacuum\n"));
	printf(_(" -e, --echo show the commands being sent to the server\n"));
	printf(_(" -f, --full do full vacuuming\n"));
	printf(_(" -F, --freeze freeze row transaction information\n"));
	printf(_(" -j, --jobs=NUM use this many concurrent connections to vacuum\n"));
	printf(_(" -q, --quiet don't write any messages\n"));
	printf(_(" -t, --table='TABLE[(COLUMNS)]' vacuum specific table(s) only\n"));
	printf(_(" -v, --verbose write a lot of output\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -z, --analyze update optimizer statistics\n"));
	printf(_(" -Z, --analyze-only only update optimizer statistics; no vacuum\n"));
	printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
			 " stages for faster results; no vacuum\n"));
	printf(_(" -?, --help show this help, then exit\n"));

	/* Connection options */
	printf(_("\nConnection options:\n"));
	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
	printf(_(" -p, --port=PORT database server port\n"));
	printf(_(" -U, --username=USERNAME user name to connect as\n"));
	printf(_(" -w, --no-password never prompt for password\n"));
	printf(_(" -W, --password force password prompt\n"));
	printf(_(" --maintenance-db=DBNAME alternate maintenance database\n"));

	/* Pointers to further information */
	printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
1031