1 /*-------------------------------------------------------------------------
2  *
3  * vacuumdb
4  *
5  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
6  * Portions Copyright (c) 1994, Regents of the University of California
7  *
8  * src/bin/scripts/vacuumdb.c
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 #ifdef WIN32
14 #define FD_SETSIZE 1024			/* must set before winsock2.h is included */
15 #endif
16 
17 #include "postgres_fe.h"
18 
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 #include "catalog/pg_class.h"
24 
25 #include "common.h"
26 #include "fe_utils/simple_list.h"
27 #include "fe_utils/string_utils.h"
28 
29 
/*
 * SQLSTATE string that ProcessQueryResult() compares against the
 * PG_DIAG_SQLSTATE field of a failed result to recognize errors about a
 * missing relation.
 */
#define ERRCODE_UNDEFINED_TABLE  "42P01"
31 
/*
 * Parallel vacuuming stuff
 *
 * One ParallelSlot describes one server connection in the array built by
 * vacuum_one_database().  Slots are set up by init_slot() and torn down by
 * DisconnectDatabase().
 */
typedef struct ParallelSlot
{
	PGconn	   *connection;		/* One connection; reset to NULL by
								 * DisconnectDatabase() */
	bool		isFree;			/* Is it known to be idle?  Set true by
								 * init_slot() and by GetIdleSlot() once
								 * PQgetResult() returns NULL; cleared when
								 * the slot is handed a command */
} ParallelSlot;
38 
/*
 * vacuum options controlled by user flags
 *
 * main() zeroes the whole struct and then sets individual members from the
 * command line; prepare_vacuum_command() turns them into SQL keywords.
 */
typedef struct vacuumingOptions
{
	bool		analyze_only;	/* -Z / --analyze-only, also set by
								 * --analyze-in-stages: issue ANALYZE instead
								 * of VACUUM */
	bool		verbose;		/* -v: add VERBOSE */
	bool		and_analyze;	/* -z: add ANALYZE to the VACUUM command */
	bool		full;			/* -f: add FULL */
	bool		freeze;			/* -F: add FREEZE */
} vacuumingOptions;
48 
49 
50 static void vacuum_one_database(const ConnParams *cparams,
51 								vacuumingOptions *vacopts,
52 					int stage,
53 					SimpleStringList *tables,
54 					int concurrentCons,
55 					const char *progname, bool echo, bool quiet);
56 
57 static void vacuum_all_databases(ConnParams *cparams,
58 								 vacuumingOptions *vacopts,
59 					 bool analyze_in_stages,
60 					 int concurrentCons,
61 					 const char *progname, bool echo, bool quiet);
62 
63 static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
64 					   vacuumingOptions *vacopts, const char *table,
65 					   bool table_pre_qualified,
66 					   const char *progname, bool echo);
67 
68 static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
69 				   const char *table, const char *progname, bool async);
70 
71 static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
72 			const char *progname);
73 
74 static bool ProcessQueryResult(PGconn *conn, PGresult *result,
75 				   const char *progname);
76 
77 static bool GetQueryResult(PGconn *conn, const char *progname);
78 
79 static void DisconnectDatabase(ParallelSlot *slot);
80 
81 static int	select_loop(int maxFd, fd_set *workerset, bool *aborting);
82 
83 static void init_slot(ParallelSlot *slot, PGconn *conn);
84 
85 static void help(const char *progname);
86 
/*
 * For analyze-in-stages mode
 *
 * Valid stage numbers run from 0 to ANALYZE_NUM_STAGES - 1;
 * ANALYZE_NO_STAGE is the value passed when not working in stages.  The
 * negative constant is parenthesized so that it expands safely inside any
 * expression.
 */
#define ANALYZE_NO_STAGE	(-1)
#define ANALYZE_NUM_STAGES	3
90 
91 
92 int
main(int argc,char * argv[])93 main(int argc, char *argv[])
94 {
95 	static struct option long_options[] = {
96 		{"host", required_argument, NULL, 'h'},
97 		{"port", required_argument, NULL, 'p'},
98 		{"username", required_argument, NULL, 'U'},
99 		{"no-password", no_argument, NULL, 'w'},
100 		{"password", no_argument, NULL, 'W'},
101 		{"echo", no_argument, NULL, 'e'},
102 		{"quiet", no_argument, NULL, 'q'},
103 		{"dbname", required_argument, NULL, 'd'},
104 		{"analyze", no_argument, NULL, 'z'},
105 		{"analyze-only", no_argument, NULL, 'Z'},
106 		{"freeze", no_argument, NULL, 'F'},
107 		{"all", no_argument, NULL, 'a'},
108 		{"table", required_argument, NULL, 't'},
109 		{"full", no_argument, NULL, 'f'},
110 		{"verbose", no_argument, NULL, 'v'},
111 		{"jobs", required_argument, NULL, 'j'},
112 		{"maintenance-db", required_argument, NULL, 2},
113 		{"analyze-in-stages", no_argument, NULL, 3},
114 		{NULL, 0, NULL, 0}
115 	};
116 
117 	const char *progname;
118 	int			optindex;
119 	int			c;
120 	const char *dbname = NULL;
121 	const char *maintenance_db = NULL;
122 	char	   *host = NULL;
123 	char	   *port = NULL;
124 	char	   *username = NULL;
125 	enum trivalue prompt_password = TRI_DEFAULT;
126 	ConnParams	cparams;
127 	bool		echo = false;
128 	bool		quiet = false;
129 	vacuumingOptions vacopts;
130 	bool		analyze_in_stages = false;
131 	bool		alldb = false;
132 	SimpleStringList tables = {NULL, NULL};
133 	int			concurrentCons = 1;
134 	int			tbl_count = 0;
135 
136 	/* initialize options to all false */
137 	memset(&vacopts, 0, sizeof(vacopts));
138 
139 	progname = get_progname(argv[0]);
140 
141 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
142 
143 	handle_help_version_opts(argc, argv, "vacuumdb", help);
144 
145 	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
146 	{
147 		switch (c)
148 		{
149 			case 'h':
150 				host = pg_strdup(optarg);
151 				break;
152 			case 'p':
153 				port = pg_strdup(optarg);
154 				break;
155 			case 'U':
156 				username = pg_strdup(optarg);
157 				break;
158 			case 'w':
159 				prompt_password = TRI_NO;
160 				break;
161 			case 'W':
162 				prompt_password = TRI_YES;
163 				break;
164 			case 'e':
165 				echo = true;
166 				break;
167 			case 'q':
168 				quiet = true;
169 				break;
170 			case 'd':
171 				dbname = pg_strdup(optarg);
172 				break;
173 			case 'z':
174 				vacopts.and_analyze = true;
175 				break;
176 			case 'Z':
177 				vacopts.analyze_only = true;
178 				break;
179 			case 'F':
180 				vacopts.freeze = true;
181 				break;
182 			case 'a':
183 				alldb = true;
184 				break;
185 			case 't':
186 				{
187 					simple_string_list_append(&tables, optarg);
188 					tbl_count++;
189 					break;
190 				}
191 			case 'f':
192 				vacopts.full = true;
193 				break;
194 			case 'v':
195 				vacopts.verbose = true;
196 				break;
197 			case 'j':
198 				concurrentCons = atoi(optarg);
199 				if (concurrentCons <= 0)
200 				{
201 					fprintf(stderr, _("%s: number of parallel jobs must be at least 1\n"),
202 							progname);
203 					exit(1);
204 				}
205 				break;
206 			case 2:
207 				maintenance_db = pg_strdup(optarg);
208 				break;
209 			case 3:
210 				analyze_in_stages = vacopts.analyze_only = true;
211 				break;
212 			default:
213 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
214 				exit(1);
215 		}
216 	}
217 
218 	/*
219 	 * Non-option argument specifies database name as long as it wasn't
220 	 * already specified with -d / --dbname
221 	 */
222 	if (optind < argc && dbname == NULL)
223 	{
224 		dbname = argv[optind];
225 		optind++;
226 	}
227 
228 	if (optind < argc)
229 	{
230 		fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"),
231 				progname, argv[optind]);
232 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
233 		exit(1);
234 	}
235 
236 	if (vacopts.analyze_only)
237 	{
238 		if (vacopts.full)
239 		{
240 			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
241 					progname, "full");
242 			exit(1);
243 		}
244 		if (vacopts.freeze)
245 		{
246 			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
247 					progname, "freeze");
248 			exit(1);
249 		}
250 		/* allow 'and_analyze' with 'analyze_only' */
251 	}
252 
253 	/* fill cparams except for dbname, which is set below */
254 	cparams.pghost = host;
255 	cparams.pgport = port;
256 	cparams.pguser = username;
257 	cparams.prompt_password = prompt_password;
258 	cparams.override_dbname = NULL;
259 
260 	setup_cancel_handler();
261 
262 	/* Avoid opening extra connections. */
263 	if (tbl_count && (concurrentCons > tbl_count))
264 		concurrentCons = tbl_count;
265 
266 	if (alldb)
267 	{
268 		if (dbname)
269 		{
270 			fprintf(stderr, _("%s: cannot vacuum all databases and a specific one at the same time\n"),
271 					progname);
272 			exit(1);
273 		}
274 		if (tables.head != NULL)
275 		{
276 			fprintf(stderr, _("%s: cannot vacuum specific table(s) in all databases\n"),
277 					progname);
278 			exit(1);
279 		}
280 
281 		cparams.dbname = maintenance_db;
282 
283 		vacuum_all_databases(&cparams, &vacopts,
284 							 analyze_in_stages,
285 							 concurrentCons,
286 							 progname, echo, quiet);
287 	}
288 	else
289 	{
290 		if (dbname == NULL)
291 		{
292 			if (getenv("PGDATABASE"))
293 				dbname = getenv("PGDATABASE");
294 			else if (getenv("PGUSER"))
295 				dbname = getenv("PGUSER");
296 			else
297 				dbname = get_user_name_or_exit(progname);
298 		}
299 
300 		cparams.dbname = dbname;
301 
302 		if (analyze_in_stages)
303 		{
304 			int			stage;
305 
306 			for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
307 			{
308 				vacuum_one_database(&cparams, &vacopts,
309 									stage,
310 									&tables,
311 									concurrentCons,
312 									progname, echo, quiet);
313 			}
314 		}
315 		else
316 			vacuum_one_database(&cparams, &vacopts,
317 								ANALYZE_NO_STAGE,
318 								&tables,
319 								concurrentCons,
320 								progname, echo, quiet);
321 	}
322 
323 	exit(0);
324 }
325 
326 /*
327  * vacuum_one_database
328  *
329  * Process tables in the given database.  If the 'tables' list is empty,
330  * process all tables in the database.
331  *
332  * Note that this function is only concerned with running exactly one stage
333  * when in analyze-in-stages mode; caller must iterate on us if necessary.
334  *
335  * If concurrentCons is > 1, multiple connections are used to vacuum tables
336  * in parallel.  In this case and if the table list is empty, we first obtain
337  * a list of tables from the database.
338  */
339 static void
vacuum_one_database(const ConnParams * cparams,vacuumingOptions * vacopts,int stage,SimpleStringList * tables,int concurrentCons,const char * progname,bool echo,bool quiet)340 vacuum_one_database(const ConnParams *cparams,
341 					vacuumingOptions *vacopts,
342 					int stage,
343 					SimpleStringList *tables,
344 					int concurrentCons,
345 					const char *progname, bool echo, bool quiet)
346 {
347 	PQExpBufferData sql;
348 	PGconn	   *conn;
349 	SimpleStringListCell *cell;
350 	ParallelSlot *slots;
351 	SimpleStringList dbtables = {NULL, NULL};
352 	int			i;
353 	bool		failed = false;
354 	bool		parallel = concurrentCons > 1;
355 	const char *stage_commands[] = {
356 		"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
357 		"SET default_statistics_target=10; RESET vacuum_cost_delay;",
358 		"RESET default_statistics_target;"
359 	};
360 	const char *stage_messages[] = {
361 		gettext_noop("Generating minimal optimizer statistics (1 target)"),
362 		gettext_noop("Generating medium optimizer statistics (10 targets)"),
363 		gettext_noop("Generating default (full) optimizer statistics")
364 	};
365 
366 	Assert(stage == ANALYZE_NO_STAGE ||
367 		   (stage >= 0 && stage < ANALYZE_NUM_STAGES));
368 
369 	conn = connectDatabase(cparams, progname, echo, false, true);
370 
371 	if (!quiet)
372 	{
373 		if (stage != ANALYZE_NO_STAGE)
374 			printf(_("%s: processing database \"%s\": %s\n"),
375 				   progname, PQdb(conn), _(stage_messages[stage]));
376 		else
377 			printf(_("%s: vacuuming database \"%s\"\n"),
378 				   progname, PQdb(conn));
379 		fflush(stdout);
380 	}
381 
382 	initPQExpBuffer(&sql);
383 
384 	/*
385 	 * If a table list is not provided and we're using multiple connections,
386 	 * prepare the list of tables by querying the catalogs.
387 	 */
388 	if (parallel && (!tables || !tables->head))
389 	{
390 		PQExpBufferData buf;
391 		PGresult   *res;
392 		int			ntups;
393 
394 		initPQExpBuffer(&buf);
395 
396 		res = executeQuery(conn,
397 						   "SELECT c.relname, ns.nspname"
398 						   " FROM pg_class c, pg_namespace ns\n"
399 						   " WHERE relkind IN ("
400 						   CppAsString2(RELKIND_RELATION) ", "
401 						   CppAsString2(RELKIND_MATVIEW) ")"
402 						   " AND c.relnamespace = ns.oid\n"
403 						   " ORDER BY c.relpages DESC;",
404 						   progname, echo);
405 
406 		ntups = PQntuples(res);
407 		for (i = 0; i < ntups; i++)
408 		{
409 			appendPQExpBufferStr(&buf,
410 								 fmtQualifiedId(PQserverVersion(conn),
411 												PQgetvalue(res, i, 1),
412 												PQgetvalue(res, i, 0)));
413 
414 			simple_string_list_append(&dbtables, buf.data);
415 			resetPQExpBuffer(&buf);
416 		}
417 
418 		termPQExpBuffer(&buf);
419 		tables = &dbtables;
420 
421 		/*
422 		 * If there are more connections than vacuumable relations, we don't
423 		 * need to use them all.
424 		 */
425 		if (concurrentCons > ntups)
426 			concurrentCons = ntups;
427 		if (concurrentCons <= 1)
428 			parallel = false;
429 		PQclear(res);
430 	}
431 
432 	/*
433 	 * Setup the database connections. We reuse the connection we already have
434 	 * for the first slot.  If not in parallel mode, the first slot in the
435 	 * array contains the connection.
436 	 */
437 	if (concurrentCons <= 0)
438 		concurrentCons = 1;
439 	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
440 	init_slot(slots, conn);
441 	if (parallel)
442 	{
443 		for (i = 1; i < concurrentCons; i++)
444 		{
445 			conn = connectDatabase(cparams, progname, echo, false, true);
446 
447 			/*
448 			 * Fail and exit immediately if trying to use a socket in an
449 			 * unsupported range.  POSIX requires open(2) to use the lowest
450 			 * unused file descriptor and the hint given relies on that.
451 			 */
452 			if (PQsocket(conn) >= FD_SETSIZE)
453 			{
454 				fprintf(stderr,
455 						_("%s: too many jobs for this platform -- try %d\n"),
456 						progname, i);
457 				exit(1);
458 			}
459 
460 			init_slot(slots + i, conn);
461 		}
462 	}
463 
464 	/*
465 	 * Prepare all the connections to run the appropriate analyze stage, if
466 	 * caller requested that mode.
467 	 */
468 	if (stage != ANALYZE_NO_STAGE)
469 	{
470 		int			j;
471 
472 		/* We already emitted the message above */
473 
474 		for (j = 0; j < concurrentCons; j++)
475 			executeCommand((slots + j)->connection,
476 						   stage_commands[stage], progname, echo);
477 	}
478 
479 	cell = tables ? tables->head : NULL;
480 	do
481 	{
482 		const char *tabname = cell ? cell->val : NULL;
483 		ParallelSlot *free_slot;
484 
485 		if (CancelRequested)
486 		{
487 			failed = true;
488 			goto finish;
489 		}
490 
491 		/*
492 		 * Get the connection slot to use.  If in parallel mode, here we wait
493 		 * for one connection to become available if none already is.  In
494 		 * non-parallel mode we simply use the only slot we have, which we
495 		 * know to be free.
496 		 */
497 		if (parallel)
498 		{
499 			/*
500 			 * Get a free slot, waiting until one becomes free if none
501 			 * currently is.
502 			 */
503 			free_slot = GetIdleSlot(slots, concurrentCons, progname);
504 			if (!free_slot)
505 			{
506 				failed = true;
507 				goto finish;
508 			}
509 
510 			free_slot->isFree = false;
511 		}
512 		else
513 			free_slot = slots;
514 
515 		/*
516 		 * Prepare the vacuum command.  Note that in some cases this requires
517 		 * query execution, so be sure to use the free connection.
518 		 */
519 		prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
520 							   tables == &dbtables, progname, echo);
521 
522 		/*
523 		 * Execute the vacuum.  If not in parallel mode, this terminates the
524 		 * program in case of an error.  (The parallel case handles query
525 		 * errors in ProcessQueryResult through GetIdleSlot.)
526 		 */
527 		run_vacuum_command(free_slot->connection, sql.data,
528 						   echo, tabname, progname, parallel);
529 
530 		if (cell)
531 			cell = cell->next;
532 	} while (cell != NULL);
533 
534 	if (parallel)
535 	{
536 		int			j;
537 
538 		/* wait for all connections to finish */
539 		for (j = 0; j < concurrentCons; j++)
540 		{
541 			if (!GetQueryResult((slots + j)->connection, progname))
542 			{
543 				failed = true;
544 				goto finish;
545 			}
546 		}
547 	}
548 
549 finish:
550 	for (i = 0; i < concurrentCons; i++)
551 		DisconnectDatabase(slots + i);
552 	pfree(slots);
553 
554 	termPQExpBuffer(&sql);
555 
556 	if (failed)
557 		exit(1);
558 }
559 
560 /*
561  * Vacuum/analyze all connectable databases.
562  *
563  * In analyze-in-stages mode, we process all databases in one stage before
564  * moving on to the next stage.  That ensure minimal stats are available
565  * quickly everywhere before generating more detailed ones.
566  */
567 static void
vacuum_all_databases(ConnParams * cparams,vacuumingOptions * vacopts,bool analyze_in_stages,int concurrentCons,const char * progname,bool echo,bool quiet)568 vacuum_all_databases(ConnParams *cparams,
569 					 vacuumingOptions *vacopts,
570 					 bool analyze_in_stages,
571 					 int concurrentCons,
572 					 const char *progname, bool echo, bool quiet)
573 {
574 	PGconn	   *conn;
575 	PGresult   *result;
576 	int			stage;
577 	int			i;
578 
579 	conn = connectMaintenanceDatabase(cparams, progname, echo);
580 	result = executeQuery(conn,
581 						  "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
582 						  progname, echo);
583 	PQfinish(conn);
584 
585 	if (analyze_in_stages)
586 	{
587 		/*
588 		 * When analyzing all databases in stages, we analyze them all in the
589 		 * fastest stage first, so that initial statistics become available
590 		 * for all of them as soon as possible.
591 		 *
592 		 * This means we establish several times as many connections, but
593 		 * that's a secondary consideration.
594 		 */
595 		for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
596 		{
597 			for (i = 0; i < PQntuples(result); i++)
598 			{
599 				cparams->override_dbname = PQgetvalue(result, i, 0);
600 
601 				vacuum_one_database(cparams, vacopts,
602 									stage,
603 									NULL,
604 									concurrentCons,
605 									progname, echo, quiet);
606 			}
607 		}
608 	}
609 	else
610 	{
611 		for (i = 0; i < PQntuples(result); i++)
612 		{
613 			cparams->override_dbname = PQgetvalue(result, i, 0);
614 
615 			vacuum_one_database(cparams, vacopts,
616 								ANALYZE_NO_STAGE,
617 								NULL,
618 								concurrentCons,
619 								progname, echo, quiet);
620 		}
621 	}
622 
623 	PQclear(result);
624 }
625 
626 /*
627  * Construct a vacuum/analyze command to run based on the given options, in the
628  * given string buffer, which may contain previous garbage.
629  *
630  * An optional table name can be passed; this must be already be properly
631  * quoted.  The command is semicolon-terminated.
632  */
633 static void
prepare_vacuum_command(PQExpBuffer sql,PGconn * conn,vacuumingOptions * vacopts,const char * table,bool table_pre_qualified,const char * progname,bool echo)634 prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
635 					   vacuumingOptions *vacopts, const char *table,
636 					   bool table_pre_qualified,
637 					   const char *progname, bool echo)
638 {
639 	resetPQExpBuffer(sql);
640 
641 	if (vacopts->analyze_only)
642 	{
643 		appendPQExpBufferStr(sql, "ANALYZE");
644 		if (vacopts->verbose)
645 			appendPQExpBufferStr(sql, " VERBOSE");
646 	}
647 	else
648 	{
649 		appendPQExpBufferStr(sql, "VACUUM");
650 		if (PQserverVersion(conn) >= 90000)
651 		{
652 			const char *paren = " (";
653 			const char *comma = ", ";
654 			const char *sep = paren;
655 
656 			if (vacopts->full)
657 			{
658 				appendPQExpBuffer(sql, "%sFULL", sep);
659 				sep = comma;
660 			}
661 			if (vacopts->freeze)
662 			{
663 				appendPQExpBuffer(sql, "%sFREEZE", sep);
664 				sep = comma;
665 			}
666 			if (vacopts->verbose)
667 			{
668 				appendPQExpBuffer(sql, "%sVERBOSE", sep);
669 				sep = comma;
670 			}
671 			if (vacopts->and_analyze)
672 			{
673 				appendPQExpBuffer(sql, "%sANALYZE", sep);
674 				sep = comma;
675 			}
676 			if (sep != paren)
677 				appendPQExpBufferChar(sql, ')');
678 		}
679 		else
680 		{
681 			if (vacopts->full)
682 				appendPQExpBufferStr(sql, " FULL");
683 			if (vacopts->freeze)
684 				appendPQExpBufferStr(sql, " FREEZE");
685 			if (vacopts->verbose)
686 				appendPQExpBufferStr(sql, " VERBOSE");
687 			if (vacopts->and_analyze)
688 				appendPQExpBufferStr(sql, " ANALYZE");
689 		}
690 	}
691 
692 	if (table)
693 	{
694 		appendPQExpBufferChar(sql, ' ');
695 		if (table_pre_qualified)
696 			appendPQExpBufferStr(sql, table);
697 		else
698 			appendQualifiedRelation(sql, table, conn, progname, echo);
699 	}
700 	appendPQExpBufferChar(sql, ';');
701 }
702 
703 /*
704  * Send a vacuum/analyze command to the server.  In async mode, return after
705  * sending the command; else, wait for it to finish.
706  *
707  * Any errors during command execution are reported to stderr.  If async is
708  * false, this function exits the program after reporting the error.
709  */
710 static void
run_vacuum_command(PGconn * conn,const char * sql,bool echo,const char * table,const char * progname,bool async)711 run_vacuum_command(PGconn *conn, const char *sql, bool echo,
712 				   const char *table, const char *progname, bool async)
713 {
714 	bool		status;
715 
716 	if (async)
717 	{
718 		if (echo)
719 			printf("%s\n", sql);
720 
721 		status = PQsendQuery(conn, sql) == 1;
722 	}
723 	else
724 		status = executeMaintenanceCommand(conn, sql, echo);
725 
726 	if (!status)
727 	{
728 		if (table)
729 			fprintf(stderr,
730 					_("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
731 					progname, table, PQdb(conn), PQerrorMessage(conn));
732 		else
733 			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
734 					progname, PQdb(conn), PQerrorMessage(conn));
735 
736 		if (!async)
737 		{
738 			PQfinish(conn);
739 			exit(1);
740 		}
741 	}
742 }
743 
744 /*
745  * GetIdleSlot
746  *		Return a connection slot that is ready to execute a command.
747  *
748  * We return the first slot we find that is marked isFree, if one is;
749  * otherwise, we loop on select() until one socket becomes available.  When
750  * this happens, we read the whole set and mark as free all sockets that become
751  * available.
752  *
753  * If an error occurs, NULL is returned.
754  */
755 static ParallelSlot *
GetIdleSlot(ParallelSlot slots[],int numslots,const char * progname)756 GetIdleSlot(ParallelSlot slots[], int numslots,
757 			const char *progname)
758 {
759 	int			i;
760 	int			firstFree = -1;
761 
762 	/* Any connection already known free? */
763 	for (i = 0; i < numslots; i++)
764 	{
765 		if (slots[i].isFree)
766 			return slots + i;
767 	}
768 
769 	/*
770 	 * No free slot found, so wait until one of the connections has finished
771 	 * its task and return the available slot.
772 	 */
773 	while (firstFree < 0)
774 	{
775 		fd_set		slotset;
776 		int			maxFd = 0;
777 		bool		aborting;
778 
779 		/* We must reconstruct the fd_set for each call to select_loop */
780 		FD_ZERO(&slotset);
781 
782 		for (i = 0; i < numslots; i++)
783 		{
784 			int			sock = PQsocket(slots[i].connection);
785 
786 			/*
787 			 * We don't really expect any connections to lose their sockets
788 			 * after startup, but just in case, cope by ignoring them.
789 			 */
790 			if (sock < 0)
791 				continue;
792 
793 			FD_SET(sock, &slotset);
794 			if (sock > maxFd)
795 				maxFd = sock;
796 		}
797 
798 		SetCancelConn(slots->connection);
799 		i = select_loop(maxFd, &slotset, &aborting);
800 		ResetCancelConn();
801 
802 		if (aborting)
803 		{
804 			/*
805 			 * We set the cancel-receiving connection to the one in the zeroth
806 			 * slot above, so fetch the error from there.
807 			 */
808 			GetQueryResult(slots->connection, progname);
809 			return NULL;
810 		}
811 		Assert(i != 0);
812 
813 		for (i = 0; i < numslots; i++)
814 		{
815 			int			sock = PQsocket(slots[i].connection);
816 
817 			if (sock >= 0 && FD_ISSET(sock, &slotset))
818 			{
819 				/* select() says input is available, so consume it */
820 				PQconsumeInput(slots[i].connection);
821 			}
822 
823 			/* Collect result(s) as long as any are available */
824 			while (!PQisBusy(slots[i].connection))
825 			{
826 				PGresult   *result = PQgetResult(slots[i].connection);
827 
828 				if (result != NULL)
829 				{
830 					/* Check and discard the command result */
831 					if (!ProcessQueryResult(slots[i].connection, result,
832 											progname))
833 						return NULL;
834 				}
835 				else
836 				{
837 					/* This connection has become idle */
838 					slots[i].isFree = true;
839 					if (firstFree < 0)
840 						firstFree = i;
841 					break;
842 				}
843 			}
844 		}
845 	}
846 
847 	return slots + firstFree;
848 }
849 
850 /*
851  * ProcessQueryResult
852  *
853  * Process (and delete) a query result.  Returns true if there's no error,
854  * false otherwise -- but errors about trying to vacuum a missing relation
855  * are reported and subsequently ignored.
856  */
857 static bool
ProcessQueryResult(PGconn * conn,PGresult * result,const char * progname)858 ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
859 {
860 	/*
861 	 * If it's an error, report it.  Errors about a missing table are harmless
862 	 * so we continue processing; but die for other errors.
863 	 */
864 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
865 	{
866 		char	   *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
867 
868 		fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
869 				progname, PQdb(conn), PQerrorMessage(conn));
870 
871 		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
872 		{
873 			PQclear(result);
874 			return false;
875 		}
876 	}
877 
878 	PQclear(result);
879 	return true;
880 }
881 
882 /*
883  * GetQueryResult
884  *
885  * Pump the conn till it's dry of results; return false if any are errors.
886  * Note that this will block if the conn is busy.
887  */
888 static bool
GetQueryResult(PGconn * conn,const char * progname)889 GetQueryResult(PGconn *conn, const char *progname)
890 {
891 	bool		ok = true;
892 	PGresult   *result;
893 
894 	SetCancelConn(conn);
895 	while ((result = PQgetResult(conn)) != NULL)
896 	{
897 		if (!ProcessQueryResult(conn, result, progname))
898 			ok = false;
899 	}
900 	ResetCancelConn();
901 	return ok;
902 }
903 
904 /*
905  * DisconnectDatabase
906  *		Disconnect the connection associated with the given slot
907  */
908 static void
DisconnectDatabase(ParallelSlot * slot)909 DisconnectDatabase(ParallelSlot *slot)
910 {
911 	char		errbuf[256];
912 
913 	if (!slot->connection)
914 		return;
915 
916 	if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
917 	{
918 		PGcancel   *cancel;
919 
920 		if ((cancel = PQgetCancel(slot->connection)))
921 		{
922 			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
923 			PQfreeCancel(cancel);
924 		}
925 	}
926 
927 	PQfinish(slot->connection);
928 	slot->connection = NULL;
929 }
930 
931 /*
932  * Loop on select() until a descriptor from the given set becomes readable.
933  *
934  * If we get a cancel request while we're waiting, we forego all further
935  * processing and set the *aborting flag to true.  The return value must be
936  * ignored in this case.  Otherwise, *aborting is set to false.
937  */
938 static int
select_loop(int maxFd,fd_set * workerset,bool * aborting)939 select_loop(int maxFd, fd_set *workerset, bool *aborting)
940 {
941 	int			i;
942 	fd_set		saveSet = *workerset;
943 
944 	if (CancelRequested)
945 	{
946 		*aborting = true;
947 		return -1;
948 	}
949 	else
950 		*aborting = false;
951 
952 	for (;;)
953 	{
954 		/*
955 		 * On Windows, we need to check once in a while for cancel requests;
956 		 * on other platforms we rely on select() returning when interrupted.
957 		 */
958 		struct timeval *tvp;
959 #ifdef WIN32
960 		struct timeval tv = {0, 1000000};
961 
962 		tvp = &tv;
963 #else
964 		tvp = NULL;
965 #endif
966 
967 		*workerset = saveSet;
968 		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
969 
970 #ifdef WIN32
971 		if (i == SOCKET_ERROR)
972 		{
973 			i = -1;
974 
975 			if (WSAGetLastError() == WSAEINTR)
976 				errno = EINTR;
977 		}
978 #endif
979 
980 		if (i < 0 && errno == EINTR)
981 			continue;			/* ignore this */
982 		if (i < 0 || CancelRequested)
983 			*aborting = true;	/* but not this */
984 		if (i == 0)
985 			continue;			/* timeout (Win32 only) */
986 		break;
987 	}
988 
989 	return i;
990 }
991 
992 static void
init_slot(ParallelSlot * slot,PGconn * conn)993 init_slot(ParallelSlot *slot, PGconn *conn)
994 {
995 	slot->connection = conn;
996 	/* Initially assume connection is idle */
997 	slot->isFree = true;
998 }
999 
/*
 * help
 *		Print the usage message to stdout.
 *
 * 'progname' is substituted into the first line and the usage synopsis.
 * Every string is passed through _() for translation, so the literals below
 * double as message ids.
 */
static void
help(const char *progname)
{
	printf(_("%s cleans and analyzes a PostgreSQL database.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_("  %s [OPTION]... [DBNAME]\n"), progname);
	/* general options */
	printf(_("\nOptions:\n"));
	printf(_("  -a, --all                       vacuum all databases\n"));
	printf(_("  -d, --dbname=DBNAME             database to vacuum\n"));
	printf(_("  -e, --echo                      show the commands being sent to the server\n"));
	printf(_("  -f, --full                      do full vacuuming\n"));
	printf(_("  -F, --freeze                    freeze row transaction information\n"));
	printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
	printf(_("  -q, --quiet                     don't write any messages\n"));
	printf(_("  -t, --table='TABLE[(COLUMNS)]'  vacuum specific table(s) only\n"));
	printf(_("  -v, --verbose                   write a lot of output\n"));
	printf(_("  -V, --version                   output version information, then exit\n"));
	printf(_("  -z, --analyze                   update optimizer statistics\n"));
	printf(_("  -Z, --analyze-only              only update optimizer statistics; no vacuum\n"));
	printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
			 "                                  stages for faster results; no vacuum\n"));
	printf(_("  -?, --help                      show this help, then exit\n"));
	/* options that control how the server is reached */
	printf(_("\nConnection options:\n"));
	printf(_("  -h, --host=HOSTNAME       database server host or socket directory\n"));
	printf(_("  -p, --port=PORT           database server port\n"));
	printf(_("  -U, --username=USERNAME   user name to connect as\n"));
	printf(_("  -w, --no-password         never prompt for password\n"));
	printf(_("  -W, --password            force password prompt\n"));
	printf(_("  --maintenance-db=DBNAME   alternate maintenance database\n"));
	printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
1032