1 /*-------------------------------------------------------------------------
2  *
3  * vacuumdb
4  *
5  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
6  * Portions Copyright (c) 1994, Regents of the University of California
7  *
8  * src/bin/scripts/vacuumdb.c
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 #ifdef WIN32
14 #define FD_SETSIZE 1024			/* must set before winsock2.h is included */
15 #endif
16 
#include "postgres_fe.h"

#include <limits.h>

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "catalog/pg_class_d.h"

#include "common.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
28 
29 
/*
 * SQLSTATE code compared against the PG_DIAG_SQLSTATE field of a failed
 * result in ProcessQueryResult(); an error carrying this code is reported
 * but does not abort processing.
 */
#define ERRCODE_UNDEFINED_TABLE  "42P01"
31 
/*
 * Parallel vacuuming stuff
 *
 * One ParallelSlot describes one server connection used to run commands.
 * vacuum_one_database() allocates an array of these; slot zero always holds
 * the connection opened first.
 */
typedef struct ParallelSlot
{
	PGconn	   *connection;		/* One connection */
	bool		isFree;			/* Is it known to be idle? */
} ParallelSlot;
38 
/*
 * vacuum options controlled by user flags
 *
 * main() zeroes the whole struct and then sets individual members from the
 * command line; prepare_vacuum_command() turns them into SQL keywords.
 */
typedef struct vacuumingOptions
{
	bool		analyze_only;	/* -Z or --analyze-in-stages: emit ANALYZE
								 * instead of VACUUM */
	bool		verbose;		/* -v: add VERBOSE */
	bool		and_analyze;	/* -z: add ANALYZE to the VACUUM command */
	bool		full;			/* -f: add FULL */
	bool		freeze;			/* -F: add FREEZE */
} vacuumingOptions;
48 
49 
/*
 * Forward declarations of the file-local functions defined below.
 */

/* Per-database and all-database drivers */
static void vacuum_one_database(const ConnParams *cparams,
								vacuumingOptions *vacopts,
					int stage,
					SimpleStringList *tables,
					int concurrentCons,
					const char *progname, bool echo, bool quiet);

static void vacuum_all_databases(ConnParams *cparams,
								 vacuumingOptions *vacopts,
					 bool analyze_in_stages,
					 int concurrentCons,
					 const char *progname, bool echo, bool quiet);

/* Command construction and submission */
static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
					   vacuumingOptions *vacopts, const char *table,
					   bool table_pre_qualified,
					   const char *progname, bool echo);

static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
				   const char *table, const char *progname, bool async);

/* Connection-slot management and result collection */
static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
			const char *progname);

static bool ProcessQueryResult(PGconn *conn, PGresult *result,
				   const char *progname);

static bool GetQueryResult(PGconn *conn, const char *progname);

static void DisconnectDatabase(ParallelSlot *slot);

static int	select_loop(int maxFd, fd_set *workerset, bool *aborting);

static void init_slot(ParallelSlot *slot, PGconn *conn);

/* Usage text */
static void help(const char *progname);
86 
/*
 * For analyze-in-stages mode
 *
 * Valid stage numbers are 0 .. ANALYZE_NUM_STAGES - 1 and index the
 * per-stage command and message arrays in vacuum_one_database();
 * ANALYZE_NO_STAGE means that no per-stage setup is performed.
 */
#define ANALYZE_NO_STAGE	-1
#define ANALYZE_NUM_STAGES	3
90 
91 
92 int
main(int argc,char * argv[])93 main(int argc, char *argv[])
94 {
95 	static struct option long_options[] = {
96 		{"host", required_argument, NULL, 'h'},
97 		{"port", required_argument, NULL, 'p'},
98 		{"username", required_argument, NULL, 'U'},
99 		{"no-password", no_argument, NULL, 'w'},
100 		{"password", no_argument, NULL, 'W'},
101 		{"echo", no_argument, NULL, 'e'},
102 		{"quiet", no_argument, NULL, 'q'},
103 		{"dbname", required_argument, NULL, 'd'},
104 		{"analyze", no_argument, NULL, 'z'},
105 		{"analyze-only", no_argument, NULL, 'Z'},
106 		{"freeze", no_argument, NULL, 'F'},
107 		{"all", no_argument, NULL, 'a'},
108 		{"table", required_argument, NULL, 't'},
109 		{"full", no_argument, NULL, 'f'},
110 		{"verbose", no_argument, NULL, 'v'},
111 		{"jobs", required_argument, NULL, 'j'},
112 		{"maintenance-db", required_argument, NULL, 2},
113 		{"analyze-in-stages", no_argument, NULL, 3},
114 		{NULL, 0, NULL, 0}
115 	};
116 
117 	const char *progname;
118 	int			optindex;
119 	int			c;
120 	const char *dbname = NULL;
121 	const char *maintenance_db = NULL;
122 	char	   *host = NULL;
123 	char	   *port = NULL;
124 	char	   *username = NULL;
125 	enum trivalue prompt_password = TRI_DEFAULT;
126 	ConnParams	cparams;
127 	bool		echo = false;
128 	bool		quiet = false;
129 	vacuumingOptions vacopts;
130 	bool		analyze_in_stages = false;
131 	bool		alldb = false;
132 	SimpleStringList tables = {NULL, NULL};
133 	int			concurrentCons = 1;
134 	int			tbl_count = 0;
135 
136 	/* initialize options to all false */
137 	memset(&vacopts, 0, sizeof(vacopts));
138 
139 	progname = get_progname(argv[0]);
140 
141 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
142 
143 	handle_help_version_opts(argc, argv, "vacuumdb", help);
144 
145 	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
146 	{
147 		switch (c)
148 		{
149 			case 'h':
150 				host = pg_strdup(optarg);
151 				break;
152 			case 'p':
153 				port = pg_strdup(optarg);
154 				break;
155 			case 'U':
156 				username = pg_strdup(optarg);
157 				break;
158 			case 'w':
159 				prompt_password = TRI_NO;
160 				break;
161 			case 'W':
162 				prompt_password = TRI_YES;
163 				break;
164 			case 'e':
165 				echo = true;
166 				break;
167 			case 'q':
168 				quiet = true;
169 				break;
170 			case 'd':
171 				dbname = pg_strdup(optarg);
172 				break;
173 			case 'z':
174 				vacopts.and_analyze = true;
175 				break;
176 			case 'Z':
177 				vacopts.analyze_only = true;
178 				break;
179 			case 'F':
180 				vacopts.freeze = true;
181 				break;
182 			case 'a':
183 				alldb = true;
184 				break;
185 			case 't':
186 				{
187 					simple_string_list_append(&tables, optarg);
188 					tbl_count++;
189 					break;
190 				}
191 			case 'f':
192 				vacopts.full = true;
193 				break;
194 			case 'v':
195 				vacopts.verbose = true;
196 				break;
197 			case 'j':
198 				concurrentCons = atoi(optarg);
199 				if (concurrentCons <= 0)
200 				{
201 					fprintf(stderr, _("%s: number of parallel jobs must be at least 1\n"),
202 							progname);
203 					exit(1);
204 				}
205 				break;
206 			case 2:
207 				maintenance_db = pg_strdup(optarg);
208 				break;
209 			case 3:
210 				analyze_in_stages = vacopts.analyze_only = true;
211 				break;
212 			default:
213 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
214 				exit(1);
215 		}
216 	}
217 
218 	/*
219 	 * Non-option argument specifies database name as long as it wasn't
220 	 * already specified with -d / --dbname
221 	 */
222 	if (optind < argc && dbname == NULL)
223 	{
224 		dbname = argv[optind];
225 		optind++;
226 	}
227 
228 	if (optind < argc)
229 	{
230 		fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"),
231 				progname, argv[optind]);
232 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
233 		exit(1);
234 	}
235 
236 	if (vacopts.analyze_only)
237 	{
238 		if (vacopts.full)
239 		{
240 			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
241 					progname, "full");
242 			exit(1);
243 		}
244 		if (vacopts.freeze)
245 		{
246 			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
247 					progname, "freeze");
248 			exit(1);
249 		}
250 		/* allow 'and_analyze' with 'analyze_only' */
251 	}
252 
253 	/* fill cparams except for dbname, which is set below */
254 	cparams.pghost = host;
255 	cparams.pgport = port;
256 	cparams.pguser = username;
257 	cparams.prompt_password = prompt_password;
258 	cparams.override_dbname = NULL;
259 
260 	setup_cancel_handler();
261 
262 	/* Avoid opening extra connections. */
263 	if (tbl_count && (concurrentCons > tbl_count))
264 		concurrentCons = tbl_count;
265 
266 	if (alldb)
267 	{
268 		if (dbname)
269 		{
270 			fprintf(stderr, _("%s: cannot vacuum all databases and a specific one at the same time\n"),
271 					progname);
272 			exit(1);
273 		}
274 		if (tables.head != NULL)
275 		{
276 			fprintf(stderr, _("%s: cannot vacuum specific table(s) in all databases\n"),
277 					progname);
278 			exit(1);
279 		}
280 
281 		cparams.dbname = maintenance_db;
282 
283 		vacuum_all_databases(&cparams, &vacopts,
284 							 analyze_in_stages,
285 							 concurrentCons,
286 							 progname, echo, quiet);
287 	}
288 	else
289 	{
290 		if (dbname == NULL)
291 		{
292 			if (getenv("PGDATABASE"))
293 				dbname = getenv("PGDATABASE");
294 			else if (getenv("PGUSER"))
295 				dbname = getenv("PGUSER");
296 			else
297 				dbname = get_user_name_or_exit(progname);
298 		}
299 
300 		cparams.dbname = dbname;
301 
302 		if (analyze_in_stages)
303 		{
304 			int			stage;
305 
306 			for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
307 			{
308 				vacuum_one_database(&cparams, &vacopts,
309 									stage,
310 									&tables,
311 									concurrentCons,
312 									progname, echo, quiet);
313 			}
314 		}
315 		else
316 			vacuum_one_database(&cparams, &vacopts,
317 								ANALYZE_NO_STAGE,
318 								&tables,
319 								concurrentCons,
320 								progname, echo, quiet);
321 	}
322 
323 	exit(0);
324 }
325 
/*
 * vacuum_one_database
 *
 * Process tables in the given database.  If the 'tables' list is empty,
 * process all tables in the database.
 *
 * Note that this function is only concerned with running exactly one stage
 * when in analyze-in-stages mode; caller must iterate on us if necessary.
 *
 * If concurrentCons is > 1, multiple connections are used to vacuum tables
 * in parallel.  In this case and if the table list is empty, we first obtain
 * a list of tables from the database.
 *
 * cparams: connection parameters handed to connectDatabase()
 * vacopts: option flags handed to prepare_vacuum_command()
 * stage: ANALYZE_NO_STAGE, or a stage number in 0 .. ANALYZE_NUM_STAGES - 1
 * tables: tables to process; may be NULL or an empty list
 * concurrentCons: requested number of connections
 * progname: program name used in messages
 * echo: passed through to the helpers that send commands
 * quiet: if true, suppress the progress message printed to stdout
 *
 * On cancel request or on a failure detected while collecting results, all
 * connections are closed and the program exits with status 1.
 */
static void
vacuum_one_database(const ConnParams *cparams,
					vacuumingOptions *vacopts,
					int stage,
					SimpleStringList *tables,
					int concurrentCons,
					const char *progname, bool echo, bool quiet)
{
	PQExpBufferData sql;
	PGconn	   *conn;
	SimpleStringListCell *cell;
	ParallelSlot *slots;
	SimpleStringList dbtables = {NULL, NULL};
	int			i;
	bool		failed = false;
	bool		parallel = concurrentCons > 1;

	/* Commands run on every connection before processing, indexed by stage */
	const char *stage_commands[] = {
		"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
		"SET default_statistics_target=10; RESET vacuum_cost_delay;",
		"RESET default_statistics_target;"
	};

	/* Progress messages, indexed by stage; translated when printed */
	const char *stage_messages[] = {
		gettext_noop("Generating minimal optimizer statistics (1 target)"),
		gettext_noop("Generating medium optimizer statistics (10 targets)"),
		gettext_noop("Generating default (full) optimizer statistics")
	};

	Assert(stage == ANALYZE_NO_STAGE ||
		   (stage >= 0 && stage < ANALYZE_NUM_STAGES));

	conn = connectDatabase(cparams, progname, echo, false, true);

	if (!quiet)
	{
		if (stage != ANALYZE_NO_STAGE)
			printf(_("%s: processing database \"%s\": %s\n"),
				   progname, PQdb(conn), _(stage_messages[stage]));
		else
			printf(_("%s: vacuuming database \"%s\"\n"),
				   progname, PQdb(conn));
		fflush(stdout);
	}

	initPQExpBuffer(&sql);

	/*
	 * If a table list is not provided and we're using multiple connections,
	 * prepare the list of tables by querying the catalogs.
	 */
	if (parallel && (!tables || !tables->head))
	{
		PQExpBufferData buf;
		PGresult   *res;
		int			ntups;

		initPQExpBuffer(&buf);

		/* Plain tables and materialized views, largest (by relpages) first */
		res = executeQuery(conn,
						   "SELECT c.relname, ns.nspname"
						   " FROM pg_class c, pg_namespace ns\n"
						   " WHERE relkind IN ("
						   CppAsString2(RELKIND_RELATION) ", "
						   CppAsString2(RELKIND_MATVIEW) ")"
						   " AND c.relnamespace = ns.oid\n"
						   " ORDER BY c.relpages DESC;",
						   progname, echo);

		ntups = PQntuples(res);
		for (i = 0; i < ntups; i++)
		{
			/* column 1 is the schema name, column 0 the relation name */
			appendPQExpBufferStr(&buf,
								 fmtQualifiedId(PQgetvalue(res, i, 1),
												PQgetvalue(res, i, 0)));

			simple_string_list_append(&dbtables, buf.data);
			resetPQExpBuffer(&buf);
		}

		termPQExpBuffer(&buf);

		/*
		 * From here on, "tables == &dbtables" identifies names that were
		 * generated above and are therefore already schema-qualified.
		 */
		tables = &dbtables;

		/*
		 * If there are more connections than vacuumable relations, we don't
		 * need to use them all.
		 */
		if (concurrentCons > ntups)
			concurrentCons = ntups;
		if (concurrentCons <= 1)
			parallel = false;
		PQclear(res);
	}

	/*
	 * Setup the database connections. We reuse the connection we already have
	 * for the first slot.  If not in parallel mode, the first slot in the
	 * array contains the connection.
	 */
	/* ntups may have been zero above; we still need one slot for "conn" */
	if (concurrentCons <= 0)
		concurrentCons = 1;
	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
	init_slot(slots, conn);
	if (parallel)
	{
		for (i = 1; i < concurrentCons; i++)
		{
			conn = connectDatabase(cparams, progname, echo, false, true);

			/*
			 * Fail and exit immediately if trying to use a socket in an
			 * unsupported range.  POSIX requires open(2) to use the lowest
			 * unused file descriptor and the hint given relies on that.
			 */
			if (PQsocket(conn) >= FD_SETSIZE)
			{
				fprintf(stderr,
						_("%s: too many jobs for this platform -- try %d\n"),
						progname, i);
				exit(1);
			}

			init_slot(slots + i, conn);
		}
	}

	/*
	 * Prepare all the connections to run the appropriate analyze stage, if
	 * caller requested that mode.
	 */
	if (stage != ANALYZE_NO_STAGE)
	{
		int			j;

		/* We already emitted the message above */

		for (j = 0; j < concurrentCons; j++)
			executeCommand((slots + j)->connection,
						   stage_commands[stage], progname, echo);
	}

	/*
	 * Issue one command per table.  With an empty or missing list the loop
	 * body still runs exactly once with tabname == NULL, which produces a
	 * command without a table name.
	 */
	cell = tables ? tables->head : NULL;
	do
	{
		const char *tabname = cell ? cell->val : NULL;
		ParallelSlot *free_slot;

		if (CancelRequested)
		{
			failed = true;
			goto finish;
		}

		/*
		 * Get the connection slot to use.  If in parallel mode, here we wait
		 * for one connection to become available if none already is.  In
		 * non-parallel mode we simply use the only slot we have, which we
		 * know to be free.
		 */
		if (parallel)
		{
			/*
			 * Get a free slot, waiting until one becomes free if none
			 * currently is.
			 */
			free_slot = GetIdleSlot(slots, concurrentCons, progname);
			if (!free_slot)
			{
				failed = true;
				goto finish;
			}

			free_slot->isFree = false;
		}
		else
			free_slot = slots;

		/*
		 * Prepare the vacuum command.  Note that in some cases this requires
		 * query execution, so be sure to use the free connection.
		 */
		prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
							   tables == &dbtables, progname, echo);

		/*
		 * Execute the vacuum.  If not in parallel mode, this terminates the
		 * program in case of an error.  (The parallel case handles query
		 * errors in ProcessQueryResult through GetIdleSlot.)
		 */
		run_vacuum_command(free_slot->connection, sql.data,
						   echo, tabname, progname, parallel);

		if (cell)
			cell = cell->next;
	} while (cell != NULL);

	if (parallel)
	{
		int			j;

		/* wait for all connections to finish */
		for (j = 0; j < concurrentCons; j++)
		{
			if (!GetQueryResult((slots + j)->connection, progname))
			{
				failed = true;
				goto finish;
			}
		}
	}

finish:
	/* Common cleanup for both the success and the failure paths */
	for (i = 0; i < concurrentCons; i++)
		DisconnectDatabase(slots + i);
	pfree(slots);

	termPQExpBuffer(&sql);

	if (failed)
		exit(1);
}
558 
559 /*
560  * Vacuum/analyze all connectable databases.
561  *
562  * In analyze-in-stages mode, we process all databases in one stage before
563  * moving on to the next stage.  That ensure minimal stats are available
564  * quickly everywhere before generating more detailed ones.
565  */
566 static void
vacuum_all_databases(ConnParams * cparams,vacuumingOptions * vacopts,bool analyze_in_stages,int concurrentCons,const char * progname,bool echo,bool quiet)567 vacuum_all_databases(ConnParams *cparams,
568 					 vacuumingOptions *vacopts,
569 					 bool analyze_in_stages,
570 					 int concurrentCons,
571 					 const char *progname, bool echo, bool quiet)
572 {
573 	PGconn	   *conn;
574 	PGresult   *result;
575 	int			stage;
576 	int			i;
577 
578 	conn = connectMaintenanceDatabase(cparams, progname, echo);
579 	result = executeQuery(conn,
580 						  "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
581 						  progname, echo);
582 	PQfinish(conn);
583 
584 	if (analyze_in_stages)
585 	{
586 		/*
587 		 * When analyzing all databases in stages, we analyze them all in the
588 		 * fastest stage first, so that initial statistics become available
589 		 * for all of them as soon as possible.
590 		 *
591 		 * This means we establish several times as many connections, but
592 		 * that's a secondary consideration.
593 		 */
594 		for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
595 		{
596 			for (i = 0; i < PQntuples(result); i++)
597 			{
598 				cparams->override_dbname = PQgetvalue(result, i, 0);
599 
600 				vacuum_one_database(cparams, vacopts,
601 									stage,
602 									NULL,
603 									concurrentCons,
604 									progname, echo, quiet);
605 			}
606 		}
607 	}
608 	else
609 	{
610 		for (i = 0; i < PQntuples(result); i++)
611 		{
612 			cparams->override_dbname = PQgetvalue(result, i, 0);
613 
614 			vacuum_one_database(cparams, vacopts,
615 								ANALYZE_NO_STAGE,
616 								NULL,
617 								concurrentCons,
618 								progname, echo, quiet);
619 		}
620 	}
621 
622 	PQclear(result);
623 }
624 
625 /*
626  * Construct a vacuum/analyze command to run based on the given options, in the
627  * given string buffer, which may contain previous garbage.
628  *
629  * An optional table name can be passed; this must be already be properly
630  * quoted.  The command is semicolon-terminated.
631  */
632 static void
prepare_vacuum_command(PQExpBuffer sql,PGconn * conn,vacuumingOptions * vacopts,const char * table,bool table_pre_qualified,const char * progname,bool echo)633 prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
634 					   vacuumingOptions *vacopts, const char *table,
635 					   bool table_pre_qualified,
636 					   const char *progname, bool echo)
637 {
638 	resetPQExpBuffer(sql);
639 
640 	if (vacopts->analyze_only)
641 	{
642 		appendPQExpBufferStr(sql, "ANALYZE");
643 		if (vacopts->verbose)
644 			appendPQExpBufferStr(sql, " VERBOSE");
645 	}
646 	else
647 	{
648 		appendPQExpBufferStr(sql, "VACUUM");
649 		if (PQserverVersion(conn) >= 90000)
650 		{
651 			const char *paren = " (";
652 			const char *comma = ", ";
653 			const char *sep = paren;
654 
655 			if (vacopts->full)
656 			{
657 				appendPQExpBuffer(sql, "%sFULL", sep);
658 				sep = comma;
659 			}
660 			if (vacopts->freeze)
661 			{
662 				appendPQExpBuffer(sql, "%sFREEZE", sep);
663 				sep = comma;
664 			}
665 			if (vacopts->verbose)
666 			{
667 				appendPQExpBuffer(sql, "%sVERBOSE", sep);
668 				sep = comma;
669 			}
670 			if (vacopts->and_analyze)
671 			{
672 				appendPQExpBuffer(sql, "%sANALYZE", sep);
673 				sep = comma;
674 			}
675 			if (sep != paren)
676 				appendPQExpBufferChar(sql, ')');
677 		}
678 		else
679 		{
680 			if (vacopts->full)
681 				appendPQExpBufferStr(sql, " FULL");
682 			if (vacopts->freeze)
683 				appendPQExpBufferStr(sql, " FREEZE");
684 			if (vacopts->verbose)
685 				appendPQExpBufferStr(sql, " VERBOSE");
686 			if (vacopts->and_analyze)
687 				appendPQExpBufferStr(sql, " ANALYZE");
688 		}
689 	}
690 
691 	if (table)
692 	{
693 		appendPQExpBufferChar(sql, ' ');
694 		if (table_pre_qualified)
695 			appendPQExpBufferStr(sql, table);
696 		else
697 			appendQualifiedRelation(sql, table, conn, progname, echo);
698 	}
699 	appendPQExpBufferChar(sql, ';');
700 }
701 
702 /*
703  * Send a vacuum/analyze command to the server.  In async mode, return after
704  * sending the command; else, wait for it to finish.
705  *
706  * Any errors during command execution are reported to stderr.  If async is
707  * false, this function exits the program after reporting the error.
708  */
709 static void
run_vacuum_command(PGconn * conn,const char * sql,bool echo,const char * table,const char * progname,bool async)710 run_vacuum_command(PGconn *conn, const char *sql, bool echo,
711 				   const char *table, const char *progname, bool async)
712 {
713 	bool		status;
714 
715 	if (async)
716 	{
717 		if (echo)
718 			printf("%s\n", sql);
719 
720 		status = PQsendQuery(conn, sql) == 1;
721 	}
722 	else
723 		status = executeMaintenanceCommand(conn, sql, echo);
724 
725 	if (!status)
726 	{
727 		if (table)
728 			fprintf(stderr,
729 					_("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
730 					progname, table, PQdb(conn), PQerrorMessage(conn));
731 		else
732 			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
733 					progname, PQdb(conn), PQerrorMessage(conn));
734 
735 		if (!async)
736 		{
737 			PQfinish(conn);
738 			exit(1);
739 		}
740 	}
741 }
742 
/*
 * GetIdleSlot
 *		Return a connection slot that is ready to execute a command.
 *
 * We return the first slot we find that is marked isFree, if one is;
 * otherwise, we loop on select() until one socket becomes available.  When
 * this happens, we read the whole set and mark as free all sockets that become
 * available.
 *
 * slots: array of connection slots
 * numslots: number of entries in 'slots'
 * progname: program name used in error messages
 *
 * If an error occurs, NULL is returned.  The returned slot is still marked
 * isFree; it is up to the caller to clear the flag when using it.
 */
static ParallelSlot *
GetIdleSlot(ParallelSlot slots[], int numslots,
			const char *progname)
{
	int			i;
	int			firstFree = -1;	/* index of first slot found idle, or -1 */

	/* Any connection already known free? */
	for (i = 0; i < numslots; i++)
	{
		if (slots[i].isFree)
			return slots + i;
	}

	/*
	 * No free slot found, so wait until one of the connections has finished
	 * its task and return the available slot.
	 */
	while (firstFree < 0)
	{
		fd_set		slotset;
		int			maxFd = 0;
		bool		aborting;

		/* We must reconstruct the fd_set for each call to select_loop */
		FD_ZERO(&slotset);

		for (i = 0; i < numslots; i++)
		{
			int			sock = PQsocket(slots[i].connection);

			/*
			 * We don't really expect any connections to lose their sockets
			 * after startup, but just in case, cope by ignoring them.
			 */
			if (sock < 0)
				continue;

			FD_SET(sock, &slotset);
			if (sock > maxFd)
				maxFd = sock;
		}

		/* While we wait, a cancel request is directed at slot zero */
		SetCancelConn(slots->connection);
		i = select_loop(maxFd, &slotset, &aborting);
		ResetCancelConn();

		if (aborting)
		{
			/*
			 * We set the cancel-receiving connection to the one in the zeroth
			 * slot above, so fetch the error from there.
			 */
			GetQueryResult(slots->connection, progname);
			return NULL;
		}
		Assert(i != 0);

		/* Examine every slot, not only the ones select() flagged */
		for (i = 0; i < numslots; i++)
		{
			int			sock = PQsocket(slots[i].connection);

			if (sock >= 0 && FD_ISSET(sock, &slotset))
			{
				/* select() says input is available, so consume it */
				PQconsumeInput(slots[i].connection);
			}

			/* Collect result(s) as long as any are available */
			while (!PQisBusy(slots[i].connection))
			{
				PGresult   *result = PQgetResult(slots[i].connection);

				if (result != NULL)
				{
					/* Check and discard the command result */
					if (!ProcessQueryResult(slots[i].connection, result,
											progname))
						return NULL;
				}
				else
				{
					/* This connection has become idle */
					slots[i].isFree = true;
					if (firstFree < 0)
						firstFree = i;
					break;
				}
			}
		}
	}

	return slots + firstFree;
}
848 
849 /*
850  * ProcessQueryResult
851  *
852  * Process (and delete) a query result.  Returns true if there's no error,
853  * false otherwise -- but errors about trying to vacuum a missing relation
854  * are reported and subsequently ignored.
855  */
856 static bool
ProcessQueryResult(PGconn * conn,PGresult * result,const char * progname)857 ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
858 {
859 	/*
860 	 * If it's an error, report it.  Errors about a missing table are harmless
861 	 * so we continue processing; but die for other errors.
862 	 */
863 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
864 	{
865 		char	   *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
866 
867 		fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
868 				progname, PQdb(conn), PQerrorMessage(conn));
869 
870 		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
871 		{
872 			PQclear(result);
873 			return false;
874 		}
875 	}
876 
877 	PQclear(result);
878 	return true;
879 }
880 
881 /*
882  * GetQueryResult
883  *
884  * Pump the conn till it's dry of results; return false if any are errors.
885  * Note that this will block if the conn is busy.
886  */
887 static bool
GetQueryResult(PGconn * conn,const char * progname)888 GetQueryResult(PGconn *conn, const char *progname)
889 {
890 	bool		ok = true;
891 	PGresult   *result;
892 
893 	SetCancelConn(conn);
894 	while ((result = PQgetResult(conn)) != NULL)
895 	{
896 		if (!ProcessQueryResult(conn, result, progname))
897 			ok = false;
898 	}
899 	ResetCancelConn();
900 	return ok;
901 }
902 
903 /*
904  * DisconnectDatabase
905  *		Disconnect the connection associated with the given slot
906  */
907 static void
DisconnectDatabase(ParallelSlot * slot)908 DisconnectDatabase(ParallelSlot *slot)
909 {
910 	char		errbuf[256];
911 
912 	if (!slot->connection)
913 		return;
914 
915 	if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
916 	{
917 		PGcancel   *cancel;
918 
919 		if ((cancel = PQgetCancel(slot->connection)))
920 		{
921 			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
922 			PQfreeCancel(cancel);
923 		}
924 	}
925 
926 	PQfinish(slot->connection);
927 	slot->connection = NULL;
928 }
929 
/*
 * Loop on select() until a descriptor from the given set becomes readable.
 *
 * If we get a cancel request while we're waiting, we forego all further
 * processing and set the *aborting flag to true.  The return value must be
 * ignored in this case.  Otherwise, *aborting is set to false.
 *
 * maxFd: highest descriptor number contained in *workerset
 * workerset: descriptors to watch for readability; overwritten with the
 *		set reported by select()
 * aborting: output flag, see above
 *
 * Returns the value returned by the last select() call, or -1 if a cancel
 * request was already pending on entry.
 */
static int
select_loop(int maxFd, fd_set *workerset, bool *aborting)
{
	int			i;
	fd_set		saveSet = *workerset;	/* pristine copy for retries */

	if (CancelRequested)
	{
		*aborting = true;
		return -1;
	}
	else
		*aborting = false;

	for (;;)
	{
		/*
		 * On Windows, we need to check once in a while for cancel requests;
		 * on other platforms we rely on select() returning when interrupted.
		 */
		struct timeval *tvp;
#ifdef WIN32
		struct timeval tv = {0, 1000000};

		tvp = &tv;
#else
		tvp = NULL;
#endif

		/* select() modifies the set, so restore it on every iteration */
		*workerset = saveSet;
		i = select(maxFd + 1, workerset, NULL, NULL, tvp);

#ifdef WIN32
		/* Map the Winsock error report onto the -1/errno convention */
		if (i == SOCKET_ERROR)
		{
			i = -1;

			if (WSAGetLastError() == WSAEINTR)
				errno = EINTR;
		}
#endif

		if (i < 0 && errno == EINTR)
			continue;			/* ignore this */
		if (i < 0 || CancelRequested)
			*aborting = true;	/* but not this */
		if (i == 0)
			continue;			/* timeout (Win32 only) */
		break;
	}

	return i;
}
990 
991 static void
init_slot(ParallelSlot * slot,PGconn * conn)992 init_slot(ParallelSlot *slot, PGconn *conn)
993 {
994 	slot->connection = conn;
995 	/* Initially assume connection is idle */
996 	slot->isFree = true;
997 }
998 
/*
 * help
 *		Print the usage summary to stdout.
 *
 * progname: program name substituted into the first lines of the text.
 * Every message is passed through _() for translation.
 */
static void
help(const char *progname)
{
	printf(_("%s cleans and analyzes a PostgreSQL database.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_("  %s [OPTION]... [DBNAME]\n"), progname);

	/* General options */
	printf(_("\nOptions:\n"));
	printf(_("  -a, --all                       vacuum all databases\n"));
	printf(_("  -d, --dbname=DBNAME             database to vacuum\n"));
	printf(_("  -e, --echo                      show the commands being sent to the server\n"));
	printf(_("  -f, --full                      do full vacuuming\n"));
	printf(_("  -F, --freeze                    freeze row transaction information\n"));
	printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
	printf(_("  -q, --quiet                     don't write any messages\n"));
	printf(_("  -t, --table='TABLE[(COLUMNS)]'  vacuum specific table(s) only\n"));
	printf(_("  -v, --verbose                   write a lot of output\n"));
	printf(_("  -V, --version                   output version information, then exit\n"));
	printf(_("  -z, --analyze                   update optimizer statistics\n"));
	printf(_("  -Z, --analyze-only              only update optimizer statistics; no vacuum\n"));
	printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
			 "                                  stages for faster results; no vacuum\n"));
	printf(_("  -?, --help                      show this help, then exit\n"));

	/* Options describing how to reach the server */
	printf(_("\nConnection options:\n"));
	printf(_("  -h, --host=HOSTNAME       database server host or socket directory\n"));
	printf(_("  -p, --port=PORT           database server port\n"));
	printf(_("  -U, --username=USERNAME   user name to connect as\n"));
	printf(_("  -w, --no-password         never prompt for password\n"));
	printf(_("  -W, --password            force password prompt\n"));
	printf(_("  --maintenance-db=DBNAME   alternate maintenance database\n"));
	printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
1031