1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *	  The postgres vacuum cleaner.
5  *
6  * This file now includes only control and dispatch code for VACUUM and
7  * ANALYZE commands.  Regular VACUUM is implemented in vacuumlazy.c,
8  * ANALYZE in analyze.c, and VACUUM FULL is a variant of CLUSTER, handled
9  * in cluster.c.
10  *
11  *
12  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  *
16  * IDENTIFICATION
17  *	  src/backend/commands/vacuum.c
18  *
19  *-------------------------------------------------------------------------
20  */
21 #include "postgres.h"
22 
23 #include <math.h>
24 
25 #include "access/clog.h"
26 #include "access/commit_ts.h"
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/htup_details.h"
30 #include "access/multixact.h"
31 #include "access/tableam.h"
32 #include "access/transam.h"
33 #include "access/xact.h"
34 #include "catalog/namespace.h"
35 #include "catalog/pg_database.h"
36 #include "catalog/pg_inherits.h"
37 #include "catalog/pg_namespace.h"
38 #include "commands/cluster.h"
39 #include "commands/defrem.h"
40 #include "commands/vacuum.h"
41 #include "miscadmin.h"
42 #include "nodes/makefuncs.h"
43 #include "pgstat.h"
44 #include "postmaster/autovacuum.h"
45 #include "postmaster/bgworker_internals.h"
46 #include "storage/bufmgr.h"
47 #include "storage/lmgr.h"
48 #include "storage/proc.h"
49 #include "storage/procarray.h"
50 #include "utils/acl.h"
51 #include "utils/fmgroids.h"
52 #include "utils/guc.h"
53 #include "utils/memutils.h"
54 #include "utils/snapmgr.h"
55 #include "utils/syscache.h"
56 
57 
/*
 * GUC parameters
 *
 * Within this part of the file, vacuum_freeze_min_age is read by
 * vacuum_set_xid_limits() as the fallback when its caller passes a
 * negative freeze_min_age.
 */
int			vacuum_freeze_min_age;
int			vacuum_freeze_table_age;
int			vacuum_multixact_freeze_min_age;
int			vacuum_multixact_freeze_table_age;


/*
 * A few variables that don't seem worth passing around as parameters.
 *
 * vac_context is the cross-transaction memory context created on entry to
 * vacuum() and deleted (with this pointer reset to NULL) just before it
 * returns.  vac_strategy is the buffer access strategy for the current
 * vacuum() call: either the one supplied by the caller or one allocated in
 * vac_context.
 */
static MemoryContext vac_context = NULL;
static BufferAccessStrategy vac_strategy;


/*
 * Variables for cost-based parallel vacuum.  See comments atop
 * compute_parallel_delay to understand how it works.
 *
 * vacuum() resets all three (NULL, NULL, 0) at the start of each run.
 */
pg_atomic_uint32 *VacuumSharedCostBalance = NULL;
pg_atomic_uint32 *VacuumActiveNWorkers = NULL;
int			VacuumCostBalanceLocal = 0;

/* non-export function prototypes */
static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
static List *get_all_vacuum_rels(int options);
static void vac_truncate_clog(TransactionId frozenXID,
							  MultiXactId minMulti,
							  TransactionId lastSaneFrozenXid,
							  MultiXactId lastSaneMinMulti);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
static double compute_parallel_delay(void);
static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
90 
91 /*
92  * Primary entry point for manual VACUUM and ANALYZE commands
93  *
94  * This is mainly a preparation wrapper for the real operations that will
95  * happen in vacuum().
96  */
97 void
98 ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
99 {
100 	VacuumParams params;
101 	bool		verbose = false;
102 	bool		skip_locked = false;
103 	bool		analyze = false;
104 	bool		freeze = false;
105 	bool		full = false;
106 	bool		disable_page_skipping = false;
107 	ListCell   *lc;
108 
109 	/* Set default value */
110 	params.index_cleanup = VACOPT_TERNARY_DEFAULT;
111 	params.truncate = VACOPT_TERNARY_DEFAULT;
112 
113 	/* By default parallel vacuum is enabled */
114 	params.nworkers = 0;
115 
116 	/* Parse options list */
117 	foreach(lc, vacstmt->options)
118 	{
119 		DefElem    *opt = (DefElem *) lfirst(lc);
120 
121 		/* Parse common options for VACUUM and ANALYZE */
122 		if (strcmp(opt->defname, "verbose") == 0)
123 			verbose = defGetBoolean(opt);
124 		else if (strcmp(opt->defname, "skip_locked") == 0)
125 			skip_locked = defGetBoolean(opt);
126 		else if (!vacstmt->is_vacuumcmd)
127 			ereport(ERROR,
128 					(errcode(ERRCODE_SYNTAX_ERROR),
129 					 errmsg("unrecognized ANALYZE option \"%s\"", opt->defname),
130 					 parser_errposition(pstate, opt->location)));
131 
132 		/* Parse options available on VACUUM */
133 		else if (strcmp(opt->defname, "analyze") == 0)
134 			analyze = defGetBoolean(opt);
135 		else if (strcmp(opt->defname, "freeze") == 0)
136 			freeze = defGetBoolean(opt);
137 		else if (strcmp(opt->defname, "full") == 0)
138 			full = defGetBoolean(opt);
139 		else if (strcmp(opt->defname, "disable_page_skipping") == 0)
140 			disable_page_skipping = defGetBoolean(opt);
141 		else if (strcmp(opt->defname, "index_cleanup") == 0)
142 			params.index_cleanup = get_vacopt_ternary_value(opt);
143 		else if (strcmp(opt->defname, "truncate") == 0)
144 			params.truncate = get_vacopt_ternary_value(opt);
145 		else if (strcmp(opt->defname, "parallel") == 0)
146 		{
147 			if (opt->arg == NULL)
148 			{
149 				ereport(ERROR,
150 						(errcode(ERRCODE_SYNTAX_ERROR),
151 						 errmsg("parallel option requires a value between 0 and %d",
152 								MAX_PARALLEL_WORKER_LIMIT),
153 						 parser_errposition(pstate, opt->location)));
154 			}
155 			else
156 			{
157 				int			nworkers;
158 
159 				nworkers = defGetInt32(opt);
160 				if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT)
161 					ereport(ERROR,
162 							(errcode(ERRCODE_SYNTAX_ERROR),
163 							 errmsg("parallel workers for vacuum must be between 0 and %d",
164 									MAX_PARALLEL_WORKER_LIMIT),
165 							 parser_errposition(pstate, opt->location)));
166 
167 				/*
168 				 * Disable parallel vacuum, if user has specified parallel
169 				 * degree as zero.
170 				 */
171 				if (nworkers == 0)
172 					params.nworkers = -1;
173 				else
174 					params.nworkers = nworkers;
175 			}
176 		}
177 		else
178 			ereport(ERROR,
179 					(errcode(ERRCODE_SYNTAX_ERROR),
180 					 errmsg("unrecognized VACUUM option \"%s\"", opt->defname),
181 					 parser_errposition(pstate, opt->location)));
182 	}
183 
184 	/* Set vacuum options */
185 	params.options =
186 		(vacstmt->is_vacuumcmd ? VACOPT_VACUUM : VACOPT_ANALYZE) |
187 		(verbose ? VACOPT_VERBOSE : 0) |
188 		(skip_locked ? VACOPT_SKIP_LOCKED : 0) |
189 		(analyze ? VACOPT_ANALYZE : 0) |
190 		(freeze ? VACOPT_FREEZE : 0) |
191 		(full ? VACOPT_FULL : 0) |
192 		(disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0);
193 
194 	/* sanity checks on options */
195 	Assert(params.options & (VACOPT_VACUUM | VACOPT_ANALYZE));
196 	Assert((params.options & VACOPT_VACUUM) ||
197 		   !(params.options & (VACOPT_FULL | VACOPT_FREEZE)));
198 	Assert(!(params.options & VACOPT_SKIPTOAST));
199 
200 	if ((params.options & VACOPT_FULL) && params.nworkers > 0)
201 		ereport(ERROR,
202 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
203 				 errmsg("VACUUM FULL cannot be performed in parallel")));
204 
205 	/*
206 	 * Make sure VACOPT_ANALYZE is specified if any column lists are present.
207 	 */
208 	if (!(params.options & VACOPT_ANALYZE))
209 	{
210 		ListCell   *lc;
211 
212 		foreach(lc, vacstmt->rels)
213 		{
214 			VacuumRelation *vrel = lfirst_node(VacuumRelation, lc);
215 
216 			if (vrel->va_cols != NIL)
217 				ereport(ERROR,
218 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
219 						 errmsg("ANALYZE option must be specified when a column list is provided")));
220 		}
221 	}
222 
223 	/*
224 	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
225 	 * them as -1 which means to use the default values.
226 	 */
227 	if (params.options & VACOPT_FREEZE)
228 	{
229 		params.freeze_min_age = 0;
230 		params.freeze_table_age = 0;
231 		params.multixact_freeze_min_age = 0;
232 		params.multixact_freeze_table_age = 0;
233 	}
234 	else
235 	{
236 		params.freeze_min_age = -1;
237 		params.freeze_table_age = -1;
238 		params.multixact_freeze_min_age = -1;
239 		params.multixact_freeze_table_age = -1;
240 	}
241 
242 	/* user-invoked vacuum is never "for wraparound" */
243 	params.is_wraparound = false;
244 
245 	/* user-invoked vacuum never uses this parameter */
246 	params.log_min_duration = -1;
247 
248 	/* Now go through the common routine */
249 	vacuum(vacstmt->rels, &params, NULL, isTopLevel);
250 }
251 
252 /*
253  * Internal entry point for VACUUM and ANALYZE commands.
254  *
255  * relations, if not NIL, is a list of VacuumRelation to process; otherwise,
256  * we process all relevant tables in the database.  For each VacuumRelation,
257  * if a valid OID is supplied, the table with that OID is what to process;
258  * otherwise, the VacuumRelation's RangeVar indicates what to process.
259  *
260  * params contains a set of parameters that can be used to customize the
261  * behavior.
262  *
263  * bstrategy is normally given as NULL, but in autovacuum it can be passed
264  * in to use the same buffer strategy object across multiple vacuum() calls.
265  *
266  * isTopLevel should be passed down from ProcessUtility.
267  *
268  * It is the caller's responsibility that all parameters are allocated in a
269  * memory context that will not disappear at transaction commit.
270  */
271 void
272 vacuum(List *relations, VacuumParams *params,
273 	   BufferAccessStrategy bstrategy, bool isTopLevel)
274 {
275 	static bool in_vacuum = false;
276 
277 	const char *stmttype;
278 	volatile bool in_outer_xact,
279 				use_own_xacts;
280 
281 	Assert(params != NULL);
282 
283 	stmttype = (params->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
284 
285 	/*
286 	 * We cannot run VACUUM inside a user transaction block; if we were inside
287 	 * a transaction, then our commit- and start-transaction-command calls
288 	 * would not have the intended effect!	There are numerous other subtle
289 	 * dependencies on this, too.
290 	 *
291 	 * ANALYZE (without VACUUM) can run either way.
292 	 */
293 	if (params->options & VACOPT_VACUUM)
294 	{
295 		PreventInTransactionBlock(isTopLevel, stmttype);
296 		in_outer_xact = false;
297 	}
298 	else
299 		in_outer_xact = IsInTransactionBlock(isTopLevel);
300 
301 	/*
302 	 * Due to static variables vac_context, anl_context and vac_strategy,
303 	 * vacuum() is not reentrant.  This matters when VACUUM FULL or ANALYZE
304 	 * calls a hostile index expression that itself calls ANALYZE.
305 	 */
306 	if (in_vacuum)
307 		ereport(ERROR,
308 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
309 				 errmsg("%s cannot be executed from VACUUM or ANALYZE",
310 						stmttype)));
311 
312 	/*
313 	 * Sanity check DISABLE_PAGE_SKIPPING option.
314 	 */
315 	if ((params->options & VACOPT_FULL) != 0 &&
316 		(params->options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
317 		ereport(ERROR,
318 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
319 				 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
320 
321 	/*
322 	 * Send info about dead objects to the statistics collector, unless we are
323 	 * in autovacuum --- autovacuum.c does this for itself.
324 	 */
325 	if ((params->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
326 		pgstat_vacuum_stat();
327 
328 	/*
329 	 * Create special memory context for cross-transaction storage.
330 	 *
331 	 * Since it is a child of PortalContext, it will go away eventually even
332 	 * if we suffer an error; there's no need for special abort cleanup logic.
333 	 */
334 	vac_context = AllocSetContextCreate(PortalContext,
335 										"Vacuum",
336 										ALLOCSET_DEFAULT_SIZES);
337 
338 	/*
339 	 * If caller didn't give us a buffer strategy object, make one in the
340 	 * cross-transaction memory context.
341 	 */
342 	if (bstrategy == NULL)
343 	{
344 		MemoryContext old_context = MemoryContextSwitchTo(vac_context);
345 
346 		bstrategy = GetAccessStrategy(BAS_VACUUM);
347 		MemoryContextSwitchTo(old_context);
348 	}
349 	vac_strategy = bstrategy;
350 
351 	/*
352 	 * Build list of relation(s) to process, putting any new data in
353 	 * vac_context for safekeeping.
354 	 */
355 	if (relations != NIL)
356 	{
357 		List	   *newrels = NIL;
358 		ListCell   *lc;
359 
360 		foreach(lc, relations)
361 		{
362 			VacuumRelation *vrel = lfirst_node(VacuumRelation, lc);
363 			List	   *sublist;
364 			MemoryContext old_context;
365 
366 			sublist = expand_vacuum_rel(vrel, params->options);
367 			old_context = MemoryContextSwitchTo(vac_context);
368 			newrels = list_concat(newrels, sublist);
369 			MemoryContextSwitchTo(old_context);
370 		}
371 		relations = newrels;
372 	}
373 	else
374 		relations = get_all_vacuum_rels(params->options);
375 
376 	/*
377 	 * Decide whether we need to start/commit our own transactions.
378 	 *
379 	 * For VACUUM (with or without ANALYZE): always do so, so that we can
380 	 * release locks as soon as possible.  (We could possibly use the outer
381 	 * transaction for a one-table VACUUM, but handling TOAST tables would be
382 	 * problematic.)
383 	 *
384 	 * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
385 	 * start/commit our own transactions.  Also, there's no need to do so if
386 	 * only processing one relation.  For multiple relations when not within a
387 	 * transaction block, and also in an autovacuum worker, use own
388 	 * transactions so we can release locks sooner.
389 	 */
390 	if (params->options & VACOPT_VACUUM)
391 		use_own_xacts = true;
392 	else
393 	{
394 		Assert(params->options & VACOPT_ANALYZE);
395 		if (IsAutoVacuumWorkerProcess())
396 			use_own_xacts = true;
397 		else if (in_outer_xact)
398 			use_own_xacts = false;
399 		else if (list_length(relations) > 1)
400 			use_own_xacts = true;
401 		else
402 			use_own_xacts = false;
403 	}
404 
405 	/*
406 	 * vacuum_rel expects to be entered with no transaction active; it will
407 	 * start and commit its own transaction.  But we are called by an SQL
408 	 * command, and so we are executing inside a transaction already. We
409 	 * commit the transaction started in PostgresMain() here, and start
410 	 * another one before exiting to match the commit waiting for us back in
411 	 * PostgresMain().
412 	 */
413 	if (use_own_xacts)
414 	{
415 		Assert(!in_outer_xact);
416 
417 		/* ActiveSnapshot is not set by autovacuum */
418 		if (ActiveSnapshotSet())
419 			PopActiveSnapshot();
420 
421 		/* matches the StartTransaction in PostgresMain() */
422 		CommitTransactionCommand();
423 	}
424 
425 	/* Turn vacuum cost accounting on or off, and set/clear in_vacuum */
426 	PG_TRY();
427 	{
428 		ListCell   *cur;
429 
430 		in_vacuum = true;
431 		VacuumCostActive = (VacuumCostDelay > 0);
432 		VacuumCostBalance = 0;
433 		VacuumPageHit = 0;
434 		VacuumPageMiss = 0;
435 		VacuumPageDirty = 0;
436 		VacuumCostBalanceLocal = 0;
437 		VacuumSharedCostBalance = NULL;
438 		VacuumActiveNWorkers = NULL;
439 
440 		/*
441 		 * Loop to process each selected relation.
442 		 */
443 		foreach(cur, relations)
444 		{
445 			VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
446 
447 			if (params->options & VACOPT_VACUUM)
448 			{
449 				if (!vacuum_rel(vrel->oid, vrel->relation, params))
450 					continue;
451 			}
452 
453 			if (params->options & VACOPT_ANALYZE)
454 			{
455 				/*
456 				 * If using separate xacts, start one for analyze. Otherwise,
457 				 * we can use the outer transaction.
458 				 */
459 				if (use_own_xacts)
460 				{
461 					StartTransactionCommand();
462 					/* functions in indexes may want a snapshot set */
463 					PushActiveSnapshot(GetTransactionSnapshot());
464 				}
465 
466 				analyze_rel(vrel->oid, vrel->relation, params,
467 							vrel->va_cols, in_outer_xact, vac_strategy);
468 
469 				if (use_own_xacts)
470 				{
471 					PopActiveSnapshot();
472 					CommitTransactionCommand();
473 				}
474 				else
475 				{
476 					/*
477 					 * If we're not using separate xacts, better separate the
478 					 * ANALYZE actions with CCIs.  This avoids trouble if user
479 					 * says "ANALYZE t, t".
480 					 */
481 					CommandCounterIncrement();
482 				}
483 			}
484 		}
485 	}
486 	PG_FINALLY();
487 	{
488 		in_vacuum = false;
489 		VacuumCostActive = false;
490 	}
491 	PG_END_TRY();
492 
493 	/*
494 	 * Finish up processing.
495 	 */
496 	if (use_own_xacts)
497 	{
498 		/* here, we are not in a transaction */
499 
500 		/*
501 		 * This matches the CommitTransaction waiting for us in
502 		 * PostgresMain().
503 		 */
504 		StartTransactionCommand();
505 	}
506 
507 	if ((params->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
508 	{
509 		/*
510 		 * Update pg_database.datfrozenxid, and truncate pg_xact if possible.
511 		 * (autovacuum.c does this for itself.)
512 		 */
513 		vac_update_datfrozenxid();
514 	}
515 
516 	/*
517 	 * Clean up working storage --- note we must do this after
518 	 * StartTransactionCommand, else we might be trying to delete the active
519 	 * context!
520 	 */
521 	MemoryContextDelete(vac_context);
522 	vac_context = NULL;
523 }
524 
525 /*
526  * Check if a given relation can be safely vacuumed or analyzed.  If the
527  * user is not the relation owner, issue a WARNING log message and return
528  * false to let the caller decide what to do with this relation.  This
529  * routine is used to decide if a relation can be processed for VACUUM or
530  * ANALYZE.
531  */
532 bool
533 vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, int options)
534 {
535 	char	   *relname;
536 
537 	Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0);
538 
539 	/*
540 	 * Check permissions.
541 	 *
542 	 * We allow the user to vacuum or analyze a table if he is superuser, the
543 	 * table owner, or the database owner (but in the latter case, only if
544 	 * it's not a shared relation).  pg_class_ownercheck includes the
545 	 * superuser case.
546 	 *
547 	 * Note we choose to treat permissions failure as a WARNING and keep
548 	 * trying to vacuum or analyze the rest of the DB --- is this appropriate?
549 	 */
550 	if (pg_class_ownercheck(relid, GetUserId()) ||
551 		(pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared))
552 		return true;
553 
554 	relname = NameStr(reltuple->relname);
555 
556 	if ((options & VACOPT_VACUUM) != 0)
557 	{
558 		if (reltuple->relisshared)
559 			ereport(WARNING,
560 					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
561 							relname)));
562 		else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
563 			ereport(WARNING,
564 					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
565 							relname)));
566 		else
567 			ereport(WARNING,
568 					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
569 							relname)));
570 
571 		/*
572 		 * For VACUUM ANALYZE, both logs could show up, but just generate
573 		 * information for VACUUM as that would be the first one to be
574 		 * processed.
575 		 */
576 		return false;
577 	}
578 
579 	if ((options & VACOPT_ANALYZE) != 0)
580 	{
581 		if (reltuple->relisshared)
582 			ereport(WARNING,
583 					(errmsg("skipping \"%s\" --- only superuser can analyze it",
584 							relname)));
585 		else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
586 			ereport(WARNING,
587 					(errmsg("skipping \"%s\" --- only superuser or database owner can analyze it",
588 							relname)));
589 		else
590 			ereport(WARNING,
591 					(errmsg("skipping \"%s\" --- only table or database owner can analyze it",
592 							relname)));
593 	}
594 
595 	return false;
596 }
597 
598 
599 /*
600  * vacuum_open_relation
601  *
602  * This routine is used for attempting to open and lock a relation which
603  * is going to be vacuumed or analyzed.  If the relation cannot be opened
604  * or locked, a log is emitted if possible.
605  */
606 Relation
607 vacuum_open_relation(Oid relid, RangeVar *relation, int options,
608 					 bool verbose, LOCKMODE lmode)
609 {
610 	Relation	onerel;
611 	bool		rel_lock = true;
612 	int			elevel;
613 
614 	Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0);
615 
616 	/*
617 	 * Open the relation and get the appropriate lock on it.
618 	 *
619 	 * There's a race condition here: the relation may have gone away since
620 	 * the last time we saw it.  If so, we don't need to vacuum or analyze it.
621 	 *
622 	 * If we've been asked not to wait for the relation lock, acquire it first
623 	 * in non-blocking mode, before calling try_relation_open().
624 	 */
625 	if (!(options & VACOPT_SKIP_LOCKED))
626 		onerel = try_relation_open(relid, lmode);
627 	else if (ConditionalLockRelationOid(relid, lmode))
628 		onerel = try_relation_open(relid, NoLock);
629 	else
630 	{
631 		onerel = NULL;
632 		rel_lock = false;
633 	}
634 
635 	/* if relation is opened, leave */
636 	if (onerel)
637 		return onerel;
638 
639 	/*
640 	 * Relation could not be opened, hence generate if possible a log
641 	 * informing on the situation.
642 	 *
643 	 * If the RangeVar is not defined, we do not have enough information to
644 	 * provide a meaningful log statement.  Chances are that the caller has
645 	 * intentionally not provided this information so that this logging is
646 	 * skipped, anyway.
647 	 */
648 	if (relation == NULL)
649 		return NULL;
650 
651 	/*
652 	 * Determine the log level.
653 	 *
654 	 * For manual VACUUM or ANALYZE, we emit a WARNING to match the log
655 	 * statements in the permission checks; otherwise, only log if the caller
656 	 * so requested.
657 	 */
658 	if (!IsAutoVacuumWorkerProcess())
659 		elevel = WARNING;
660 	else if (verbose)
661 		elevel = LOG;
662 	else
663 		return NULL;
664 
665 	if ((options & VACOPT_VACUUM) != 0)
666 	{
667 		if (!rel_lock)
668 			ereport(elevel,
669 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
670 					 errmsg("skipping vacuum of \"%s\" --- lock not available",
671 							relation->relname)));
672 		else
673 			ereport(elevel,
674 					(errcode(ERRCODE_UNDEFINED_TABLE),
675 					 errmsg("skipping vacuum of \"%s\" --- relation no longer exists",
676 							relation->relname)));
677 
678 		/*
679 		 * For VACUUM ANALYZE, both logs could show up, but just generate
680 		 * information for VACUUM as that would be the first one to be
681 		 * processed.
682 		 */
683 		return NULL;
684 	}
685 
686 	if ((options & VACOPT_ANALYZE) != 0)
687 	{
688 		if (!rel_lock)
689 			ereport(elevel,
690 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
691 					 errmsg("skipping analyze of \"%s\" --- lock not available",
692 							relation->relname)));
693 		else
694 			ereport(elevel,
695 					(errcode(ERRCODE_UNDEFINED_TABLE),
696 					 errmsg("skipping analyze of \"%s\" --- relation no longer exists",
697 							relation->relname)));
698 	}
699 
700 	return NULL;
701 }
702 
703 
704 /*
705  * Given a VacuumRelation, fill in the table OID if it wasn't specified,
706  * and optionally add VacuumRelations for partitions of the table.
707  *
708  * If a VacuumRelation does not have an OID supplied and is a partitioned
709  * table, an extra entry will be added to the output for each partition.
710  * Presently, only autovacuum supplies OIDs when calling vacuum(), and
711  * it does not want us to expand partitioned tables.
712  *
713  * We take care not to modify the input data structure, but instead build
714  * new VacuumRelation(s) to return.  (But note that they will reference
715  * unmodified parts of the input, eg column lists.)  New data structures
716  * are made in vac_context.
717  */
718 static List *
719 expand_vacuum_rel(VacuumRelation *vrel, int options)
720 {
721 	List	   *vacrels = NIL;
722 	MemoryContext oldcontext;
723 
724 	/* If caller supplied OID, there's nothing we need do here. */
725 	if (OidIsValid(vrel->oid))
726 	{
727 		oldcontext = MemoryContextSwitchTo(vac_context);
728 		vacrels = lappend(vacrels, vrel);
729 		MemoryContextSwitchTo(oldcontext);
730 	}
731 	else
732 	{
733 		/* Process a specific relation, and possibly partitions thereof */
734 		Oid			relid;
735 		HeapTuple	tuple;
736 		Form_pg_class classForm;
737 		bool		include_parts;
738 		int			rvr_opts;
739 
740 		/*
741 		 * Since autovacuum workers supply OIDs when calling vacuum(), no
742 		 * autovacuum worker should reach this code.
743 		 */
744 		Assert(!IsAutoVacuumWorkerProcess());
745 
746 		/*
747 		 * We transiently take AccessShareLock to protect the syscache lookup
748 		 * below, as well as find_all_inheritors's expectation that the caller
749 		 * holds some lock on the starting relation.
750 		 */
751 		rvr_opts = (options & VACOPT_SKIP_LOCKED) ? RVR_SKIP_LOCKED : 0;
752 		relid = RangeVarGetRelidExtended(vrel->relation,
753 										 AccessShareLock,
754 										 rvr_opts,
755 										 NULL, NULL);
756 
757 		/*
758 		 * If the lock is unavailable, emit the same log statement that
759 		 * vacuum_rel() and analyze_rel() would.
760 		 */
761 		if (!OidIsValid(relid))
762 		{
763 			if (options & VACOPT_VACUUM)
764 				ereport(WARNING,
765 						(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
766 						 errmsg("skipping vacuum of \"%s\" --- lock not available",
767 								vrel->relation->relname)));
768 			else
769 				ereport(WARNING,
770 						(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
771 						 errmsg("skipping analyze of \"%s\" --- lock not available",
772 								vrel->relation->relname)));
773 			return vacrels;
774 		}
775 
776 		/*
777 		 * To check whether the relation is a partitioned table and its
778 		 * ownership, fetch its syscache entry.
779 		 */
780 		tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
781 		if (!HeapTupleIsValid(tuple))
782 			elog(ERROR, "cache lookup failed for relation %u", relid);
783 		classForm = (Form_pg_class) GETSTRUCT(tuple);
784 
785 		/*
786 		 * Make a returnable VacuumRelation for this rel if user is a proper
787 		 * owner.
788 		 */
789 		if (vacuum_is_relation_owner(relid, classForm, options))
790 		{
791 			oldcontext = MemoryContextSwitchTo(vac_context);
792 			vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation,
793 														  relid,
794 														  vrel->va_cols));
795 			MemoryContextSwitchTo(oldcontext);
796 		}
797 
798 
799 		include_parts = (classForm->relkind == RELKIND_PARTITIONED_TABLE);
800 		ReleaseSysCache(tuple);
801 
802 		/*
803 		 * If it is, make relation list entries for its partitions.  Note that
804 		 * the list returned by find_all_inheritors() includes the passed-in
805 		 * OID, so we have to skip that.  There's no point in taking locks on
806 		 * the individual partitions yet, and doing so would just add
807 		 * unnecessary deadlock risk.  For this last reason we do not check
808 		 * yet the ownership of the partitions, which get added to the list to
809 		 * process.  Ownership will be checked later on anyway.
810 		 */
811 		if (include_parts)
812 		{
813 			List	   *part_oids = find_all_inheritors(relid, NoLock, NULL);
814 			ListCell   *part_lc;
815 
816 			foreach(part_lc, part_oids)
817 			{
818 				Oid			part_oid = lfirst_oid(part_lc);
819 
820 				if (part_oid == relid)
821 					continue;	/* ignore original table */
822 
823 				/*
824 				 * We omit a RangeVar since it wouldn't be appropriate to
825 				 * complain about failure to open one of these relations
826 				 * later.
827 				 */
828 				oldcontext = MemoryContextSwitchTo(vac_context);
829 				vacrels = lappend(vacrels, makeVacuumRelation(NULL,
830 															  part_oid,
831 															  vrel->va_cols));
832 				MemoryContextSwitchTo(oldcontext);
833 			}
834 		}
835 
836 		/*
837 		 * Release lock again.  This means that by the time we actually try to
838 		 * process the table, it might be gone or renamed.  In the former case
839 		 * we'll silently ignore it; in the latter case we'll process it
840 		 * anyway, but we must beware that the RangeVar doesn't necessarily
841 		 * identify it anymore.  This isn't ideal, perhaps, but there's little
842 		 * practical alternative, since we're typically going to commit this
843 		 * transaction and begin a new one between now and then.  Moreover,
844 		 * holding locks on multiple relations would create significant risk
845 		 * of deadlock.
846 		 */
847 		UnlockRelationOid(relid, AccessShareLock);
848 	}
849 
850 	return vacrels;
851 }
852 
853 /*
854  * Construct a list of VacuumRelations for all vacuumable rels in
855  * the current database.  The list is built in vac_context.
856  */
857 static List *
858 get_all_vacuum_rels(int options)
859 {
860 	List	   *vacrels = NIL;
861 	Relation	pgclass;
862 	TableScanDesc scan;
863 	HeapTuple	tuple;
864 
865 	pgclass = table_open(RelationRelationId, AccessShareLock);
866 
867 	scan = table_beginscan_catalog(pgclass, 0, NULL);
868 
869 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
870 	{
871 		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
872 		MemoryContext oldcontext;
873 		Oid			relid = classForm->oid;
874 
875 		/* check permissions of relation */
876 		if (!vacuum_is_relation_owner(relid, classForm, options))
877 			continue;
878 
879 		/*
880 		 * We include partitioned tables here; depending on which operation is
881 		 * to be performed, caller will decide whether to process or ignore
882 		 * them.
883 		 */
884 		if (classForm->relkind != RELKIND_RELATION &&
885 			classForm->relkind != RELKIND_MATVIEW &&
886 			classForm->relkind != RELKIND_PARTITIONED_TABLE)
887 			continue;
888 
889 		/*
890 		 * Build VacuumRelation(s) specifying the table OIDs to be processed.
891 		 * We omit a RangeVar since it wouldn't be appropriate to complain
892 		 * about failure to open one of these relations later.
893 		 */
894 		oldcontext = MemoryContextSwitchTo(vac_context);
895 		vacrels = lappend(vacrels, makeVacuumRelation(NULL,
896 													  relid,
897 													  NIL));
898 		MemoryContextSwitchTo(oldcontext);
899 	}
900 
901 	table_endscan(scan);
902 	table_close(pgclass, AccessShareLock);
903 
904 	return vacrels;
905 }
906 
907 /*
908  * vacuum_set_xid_limits() -- compute oldestXmin and freeze cutoff points
909  *
910  * The output parameters are:
911  * - oldestXmin is the cutoff value used to distinguish whether tuples are
912  *	 DEAD or RECENTLY_DEAD (see HeapTupleSatisfiesVacuum).
913  * - freezeLimit is the Xid below which all Xids are replaced by
914  *	 FrozenTransactionId during vacuum.
915  * - xidFullScanLimit (computed from freeze_table_age parameter)
916  *	 represents a minimum Xid value; a table whose relfrozenxid is older than
917  *	 this will have a full-table vacuum applied to it, to freeze tuples across
918  *	 the whole table.  Vacuuming a table younger than this value can use a
919  *	 partial scan.
920  * - multiXactCutoff is the value below which all MultiXactIds are removed from
921  *	 Xmax.
922  * - mxactFullScanLimit is a value against which a table's relminmxid value is
923  *	 compared to produce a full-table vacuum, as with xidFullScanLimit.
924  *
925  * xidFullScanLimit and mxactFullScanLimit can be passed as NULL if caller is
926  * not interested.
927  */
928 void
929 vacuum_set_xid_limits(Relation rel,
930 					  int freeze_min_age,
931 					  int freeze_table_age,
932 					  int multixact_freeze_min_age,
933 					  int multixact_freeze_table_age,
934 					  TransactionId *oldestXmin,
935 					  TransactionId *freezeLimit,
936 					  TransactionId *xidFullScanLimit,
937 					  MultiXactId *multiXactCutoff,
938 					  MultiXactId *mxactFullScanLimit)
939 {
940 	int			freezemin;
941 	int			mxid_freezemin;
942 	int			effective_multixact_freeze_max_age;
943 	TransactionId limit;
944 	TransactionId safeLimit;
945 	MultiXactId oldestMxact;
946 	MultiXactId mxactLimit;
947 	MultiXactId safeMxactLimit;
948 
949 	/*
950 	 * We can always ignore processes running lazy vacuum.  This is because we
951 	 * use these values only for deciding which tuples we must keep in the
952 	 * tables.  Since lazy vacuum doesn't write its XID anywhere, it's safe to
953 	 * ignore it.  In theory it could be problematic to ignore lazy vacuums in
954 	 * a full vacuum, but keep in mind that only one vacuum process can be
955 	 * working on a particular table at any time, and that each vacuum is
956 	 * always an independent transaction.
957 	 */
958 	*oldestXmin =
959 		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM), rel);
960 
961 	Assert(TransactionIdIsNormal(*oldestXmin));
962 
963 	/*
964 	 * Determine the minimum freeze age to use: as specified by the caller, or
965 	 * vacuum_freeze_min_age, but in any case not more than half
966 	 * autovacuum_freeze_max_age, so that autovacuums to prevent XID
967 	 * wraparound won't occur too frequently.
968 	 */
969 	freezemin = freeze_min_age;
970 	if (freezemin < 0)
971 		freezemin = vacuum_freeze_min_age;
972 	freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
973 	Assert(freezemin >= 0);
974 
975 	/*
976 	 * Compute the cutoff XID, being careful not to generate a "permanent" XID
977 	 */
978 	limit = *oldestXmin - freezemin;
979 	if (!TransactionIdIsNormal(limit))
980 		limit = FirstNormalTransactionId;
981 
982 	/*
983 	 * If oldestXmin is very far back (in practice, more than
984 	 * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
985 	 * freeze age of zero.
986 	 */
987 	safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
988 	if (!TransactionIdIsNormal(safeLimit))
989 		safeLimit = FirstNormalTransactionId;
990 
991 	if (TransactionIdPrecedes(limit, safeLimit))
992 	{
993 		ereport(WARNING,
994 				(errmsg("oldest xmin is far in the past"),
995 				 errhint("Close open transactions soon to avoid wraparound problems.\n"
996 						 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
997 		limit = *oldestXmin;
998 	}
999 
1000 	*freezeLimit = limit;
1001 
1002 	/*
1003 	 * Compute the multixact age for which freezing is urgent.  This is
1004 	 * normally autovacuum_multixact_freeze_max_age, but may be less if we are
1005 	 * short of multixact member space.
1006 	 */
1007 	effective_multixact_freeze_max_age = MultiXactMemberFreezeThreshold();
1008 
1009 	/*
1010 	 * Determine the minimum multixact freeze age to use: as specified by
1011 	 * caller, or vacuum_multixact_freeze_min_age, but in any case not more
1012 	 * than half effective_multixact_freeze_max_age, so that autovacuums to
1013 	 * prevent MultiXact wraparound won't occur too frequently.
1014 	 */
1015 	mxid_freezemin = multixact_freeze_min_age;
1016 	if (mxid_freezemin < 0)
1017 		mxid_freezemin = vacuum_multixact_freeze_min_age;
1018 	mxid_freezemin = Min(mxid_freezemin,
1019 						 effective_multixact_freeze_max_age / 2);
1020 	Assert(mxid_freezemin >= 0);
1021 
1022 	/* compute the cutoff multi, being careful to generate a valid value */
1023 	oldestMxact = GetOldestMultiXactId();
1024 	mxactLimit = oldestMxact - mxid_freezemin;
1025 	if (mxactLimit < FirstMultiXactId)
1026 		mxactLimit = FirstMultiXactId;
1027 
1028 	safeMxactLimit =
1029 		ReadNextMultiXactId() - effective_multixact_freeze_max_age;
1030 	if (safeMxactLimit < FirstMultiXactId)
1031 		safeMxactLimit = FirstMultiXactId;
1032 
1033 	if (MultiXactIdPrecedes(mxactLimit, safeMxactLimit))
1034 	{
1035 		ereport(WARNING,
1036 				(errmsg("oldest multixact is far in the past"),
1037 				 errhint("Close open transactions with multixacts soon to avoid wraparound problems.")));
1038 		/* Use the safe limit, unless an older mxact is still running */
1039 		if (MultiXactIdPrecedes(oldestMxact, safeMxactLimit))
1040 			mxactLimit = oldestMxact;
1041 		else
1042 			mxactLimit = safeMxactLimit;
1043 	}
1044 
1045 	*multiXactCutoff = mxactLimit;
1046 
1047 	if (xidFullScanLimit != NULL)
1048 	{
1049 		int			freezetable;
1050 
1051 		Assert(mxactFullScanLimit != NULL);
1052 
1053 		/*
1054 		 * Determine the table freeze age to use: as specified by the caller,
1055 		 * or vacuum_freeze_table_age, but in any case not more than
1056 		 * autovacuum_freeze_max_age * 0.95, so that if you have e.g nightly
1057 		 * VACUUM schedule, the nightly VACUUM gets a chance to freeze tuples
1058 		 * before anti-wraparound autovacuum is launched.
1059 		 */
1060 		freezetable = freeze_table_age;
1061 		if (freezetable < 0)
1062 			freezetable = vacuum_freeze_table_age;
1063 		freezetable = Min(freezetable, autovacuum_freeze_max_age * 0.95);
1064 		Assert(freezetable >= 0);
1065 
1066 		/*
1067 		 * Compute XID limit causing a full-table vacuum, being careful not to
1068 		 * generate a "permanent" XID.
1069 		 */
1070 		limit = ReadNewTransactionId() - freezetable;
1071 		if (!TransactionIdIsNormal(limit))
1072 			limit = FirstNormalTransactionId;
1073 
1074 		*xidFullScanLimit = limit;
1075 
1076 		/*
1077 		 * Similar to the above, determine the table freeze age to use for
1078 		 * multixacts: as specified by the caller, or
1079 		 * vacuum_multixact_freeze_table_age, but in any case not more than
1080 		 * autovacuum_multixact_freeze_table_age * 0.95, so that if you have
1081 		 * e.g. nightly VACUUM schedule, the nightly VACUUM gets a chance to
1082 		 * freeze multixacts before anti-wraparound autovacuum is launched.
1083 		 */
1084 		freezetable = multixact_freeze_table_age;
1085 		if (freezetable < 0)
1086 			freezetable = vacuum_multixact_freeze_table_age;
1087 		freezetable = Min(freezetable,
1088 						  effective_multixact_freeze_max_age * 0.95);
1089 		Assert(freezetable >= 0);
1090 
1091 		/*
1092 		 * Compute MultiXact limit causing a full-table vacuum, being careful
1093 		 * to generate a valid MultiXact value.
1094 		 */
1095 		mxactLimit = ReadNextMultiXactId() - freezetable;
1096 		if (mxactLimit < FirstMultiXactId)
1097 			mxactLimit = FirstMultiXactId;
1098 
1099 		*mxactFullScanLimit = mxactLimit;
1100 	}
1101 	else
1102 	{
1103 		Assert(mxactFullScanLimit == NULL);
1104 	}
1105 }
1106 
1107 /*
1108  * vac_estimate_reltuples() -- estimate the new value for pg_class.reltuples
1109  *
1110  *		If we scanned the whole relation then we should just use the count of
1111  *		live tuples seen; but if we did not, we should not blindly extrapolate
1112  *		from that number, since VACUUM may have scanned a quite nonrandom
1113  *		subset of the table.  When we have only partial information, we take
1114  *		the old value of pg_class.reltuples as a measurement of the
1115  *		tuple density in the unscanned pages.
1116  *
1117  *		Note: scanned_tuples should count only *live* tuples, since
1118  *		pg_class.reltuples is defined that way.
1119  */
1120 double
1121 vac_estimate_reltuples(Relation relation,
1122 					   BlockNumber total_pages,
1123 					   BlockNumber scanned_pages,
1124 					   double scanned_tuples)
1125 {
1126 	BlockNumber old_rel_pages = relation->rd_rel->relpages;
1127 	double		old_rel_tuples = relation->rd_rel->reltuples;
1128 	double		old_density;
1129 	double		unscanned_pages;
1130 	double		total_tuples;
1131 
1132 	/* If we did scan the whole table, just use the count as-is */
1133 	if (scanned_pages >= total_pages)
1134 		return scanned_tuples;
1135 
1136 	/*
1137 	 * If scanned_pages is zero but total_pages isn't, keep the existing value
1138 	 * of reltuples.  (Note: callers should avoid updating the pg_class
1139 	 * statistics in this situation, since no new information has been
1140 	 * provided.)
1141 	 */
1142 	if (scanned_pages == 0)
1143 		return old_rel_tuples;
1144 
1145 	/*
1146 	 * If old value of relpages is zero, old density is indeterminate; we
1147 	 * can't do much except scale up scanned_tuples to match total_pages.
1148 	 */
1149 	if (old_rel_pages == 0)
1150 		return floor((scanned_tuples / scanned_pages) * total_pages + 0.5);
1151 
1152 	/*
1153 	 * Okay, we've covered the corner cases.  The normal calculation is to
1154 	 * convert the old measurement to a density (tuples per page), then
1155 	 * estimate the number of tuples in the unscanned pages using that figure,
1156 	 * and finally add on the number of tuples in the scanned pages.
1157 	 */
1158 	old_density = old_rel_tuples / old_rel_pages;
1159 	unscanned_pages = (double) total_pages - (double) scanned_pages;
1160 	total_tuples = old_density * unscanned_pages + scanned_tuples;
1161 	return floor(total_tuples + 0.5);
1162 }
1163 
1164 
1165 /*
1166  *	vac_update_relstats() -- update statistics for one relation
1167  *
1168  *		Update the whole-relation statistics that are kept in its pg_class
1169  *		row.  There are additional stats that will be updated if we are
1170  *		doing ANALYZE, but we always update these stats.  This routine works
1171  *		for both index and heap relation entries in pg_class.
1172  *
1173  *		We violate transaction semantics here by overwriting the rel's
1174  *		existing pg_class tuple with the new values.  This is reasonably
1175  *		safe as long as we're sure that the new values are correct whether or
1176  *		not this transaction commits.  The reason for doing this is that if
1177  *		we updated these tuples in the usual way, vacuuming pg_class itself
1178  *		wouldn't work very well --- by the time we got done with a vacuum
1179  *		cycle, most of the tuples in pg_class would've been obsoleted.  Of
1180  *		course, this only works for fixed-size not-null columns, but these are.
1181  *
1182  *		Another reason for doing it this way is that when we are in a lazy
1183  *		VACUUM and have PROC_IN_VACUUM set, we mustn't do any regular updates.
1184  *		Somebody vacuuming pg_class might think they could delete a tuple
1185  *		marked with xmin = our xid.
1186  *
1187  *		In addition to fundamentally nontransactional statistics such as
1188  *		relpages and relallvisible, we try to maintain certain lazily-updated
1189  *		DDL flags such as relhasindex, by clearing them if no longer correct.
1190  *		It's safe to do this in VACUUM, which can't run in parallel with
1191  *		CREATE INDEX/RULE/TRIGGER and can't be part of a transaction block.
1192  *		However, it's *not* safe to do it in an ANALYZE that's within an
1193  *		outer transaction, because for example the current transaction might
1194  *		have dropped the last index; then we'd think relhasindex should be
1195  *		cleared, but if the transaction later rolls back this would be wrong.
1196  *		So we refrain from updating the DDL flags if we're inside an outer
1197  *		transaction.  This is OK since postponing the flag maintenance is
1198  *		always allowable.
1199  *
1200  *		Note: num_tuples should count only *live* tuples, since
1201  *		pg_class.reltuples is defined that way.
1202  *
1203  *		This routine is shared by VACUUM and ANALYZE.
1204  */
1205 void
1206 vac_update_relstats(Relation relation,
1207 					BlockNumber num_pages, double num_tuples,
1208 					BlockNumber num_all_visible_pages,
1209 					bool hasindex, TransactionId frozenxid,
1210 					MultiXactId minmulti,
1211 					bool in_outer_xact)
1212 {
1213 	Oid			relid = RelationGetRelid(relation);
1214 	Relation	rd;
1215 	HeapTuple	ctup;
1216 	Form_pg_class pgcform;
1217 	bool		dirty;
1218 
1219 	rd = table_open(RelationRelationId, RowExclusiveLock);
1220 
1221 	/* Fetch a copy of the tuple to scribble on */
1222 	ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
1223 	if (!HeapTupleIsValid(ctup))
1224 		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
1225 			 relid);
1226 	pgcform = (Form_pg_class) GETSTRUCT(ctup);
1227 
1228 	/* Apply statistical updates, if any, to copied tuple */
1229 
1230 	dirty = false;
1231 	if (pgcform->relpages != (int32) num_pages)
1232 	{
1233 		pgcform->relpages = (int32) num_pages;
1234 		dirty = true;
1235 	}
1236 	if (pgcform->reltuples != (float4) num_tuples)
1237 	{
1238 		pgcform->reltuples = (float4) num_tuples;
1239 		dirty = true;
1240 	}
1241 	if (pgcform->relallvisible != (int32) num_all_visible_pages)
1242 	{
1243 		pgcform->relallvisible = (int32) num_all_visible_pages;
1244 		dirty = true;
1245 	}
1246 
1247 	/* Apply DDL updates, but not inside an outer transaction (see above) */
1248 
1249 	if (!in_outer_xact)
1250 	{
1251 		/*
1252 		 * If we didn't find any indexes, reset relhasindex.
1253 		 */
1254 		if (pgcform->relhasindex && !hasindex)
1255 		{
1256 			pgcform->relhasindex = false;
1257 			dirty = true;
1258 		}
1259 
1260 		/* We also clear relhasrules and relhastriggers if needed */
1261 		if (pgcform->relhasrules && relation->rd_rules == NULL)
1262 		{
1263 			pgcform->relhasrules = false;
1264 			dirty = true;
1265 		}
1266 		if (pgcform->relhastriggers && relation->trigdesc == NULL)
1267 		{
1268 			pgcform->relhastriggers = false;
1269 			dirty = true;
1270 		}
1271 	}
1272 
1273 	/*
1274 	 * Update relfrozenxid, unless caller passed InvalidTransactionId
1275 	 * indicating it has no new data.
1276 	 *
1277 	 * Ordinarily, we don't let relfrozenxid go backwards: if things are
1278 	 * working correctly, the only way the new frozenxid could be older would
1279 	 * be if a previous VACUUM was done with a tighter freeze_min_age, in
1280 	 * which case we don't want to forget the work it already did.  However,
1281 	 * if the stored relfrozenxid is "in the future", then it must be corrupt
1282 	 * and it seems best to overwrite it with the cutoff we used this time.
1283 	 * This should match vac_update_datfrozenxid() concerning what we consider
1284 	 * to be "in the future".
1285 	 */
1286 	if (TransactionIdIsNormal(frozenxid) &&
1287 		pgcform->relfrozenxid != frozenxid &&
1288 		(TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid) ||
1289 		 TransactionIdPrecedes(ReadNewTransactionId(),
1290 							   pgcform->relfrozenxid)))
1291 	{
1292 		pgcform->relfrozenxid = frozenxid;
1293 		dirty = true;
1294 	}
1295 
1296 	/* Similarly for relminmxid */
1297 	if (MultiXactIdIsValid(minmulti) &&
1298 		pgcform->relminmxid != minmulti &&
1299 		(MultiXactIdPrecedes(pgcform->relminmxid, minmulti) ||
1300 		 MultiXactIdPrecedes(ReadNextMultiXactId(), pgcform->relminmxid)))
1301 	{
1302 		pgcform->relminmxid = minmulti;
1303 		dirty = true;
1304 	}
1305 
1306 	/* If anything changed, write out the tuple. */
1307 	if (dirty)
1308 		heap_inplace_update(rd, ctup);
1309 
1310 	table_close(rd, RowExclusiveLock);
1311 }
1312 
1313 
1314 /*
1315  *	vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
1316  *
1317  *		Update pg_database's datfrozenxid entry for our database to be the
1318  *		minimum of the pg_class.relfrozenxid values.
1319  *
1320  *		Similarly, update our datminmxid to be the minimum of the
1321  *		pg_class.relminmxid values.
1322  *
1323  *		If we are able to advance either pg_database value, also try to
1324  *		truncate pg_xact and pg_multixact.
1325  *
1326  *		We violate transaction semantics here by overwriting the database's
1327  *		existing pg_database tuple with the new values.  This is reasonably
1328  *		safe since the new values are correct whether or not this transaction
1329  *		commits.  As with vac_update_relstats, this avoids leaving dead tuples
1330  *		behind after a VACUUM.
1331  */
1332 void
1333 vac_update_datfrozenxid(void)
1334 {
1335 	HeapTuple	tuple;
1336 	Form_pg_database dbform;
1337 	Relation	relation;
1338 	SysScanDesc scan;
1339 	HeapTuple	classTup;
1340 	TransactionId newFrozenXid;
1341 	MultiXactId newMinMulti;
1342 	TransactionId lastSaneFrozenXid;
1343 	MultiXactId lastSaneMinMulti;
1344 	bool		bogus = false;
1345 	bool		dirty = false;
1346 
1347 	/*
1348 	 * Restrict this task to one backend per database.  This avoids race
1349 	 * conditions that would move datfrozenxid or datminmxid backward.  It
1350 	 * avoids calling vac_truncate_clog() with a datfrozenxid preceding a
1351 	 * datfrozenxid passed to an earlier vac_truncate_clog() call.
1352 	 */
1353 	LockDatabaseFrozenIds(ExclusiveLock);
1354 
1355 	/*
1356 	 * Initialize the "min" calculation with GetOldestXmin, which is a
1357 	 * reasonable approximation to the minimum relfrozenxid for not-yet-
1358 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
1359 	 * So we cannot produce a wrong minimum by starting with this.
1360 	 */
1361 	newFrozenXid = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
1362 
1363 	/*
1364 	 * Similarly, initialize the MultiXact "min" with the value that would be
1365 	 * used on pg_class for new tables.  See AddNewRelationTuple().
1366 	 */
1367 	newMinMulti = GetOldestMultiXactId();
1368 
1369 	/*
1370 	 * Identify the latest relfrozenxid and relminmxid values that we could
1371 	 * validly see during the scan.  These are conservative values, but it's
1372 	 * not really worth trying to be more exact.
1373 	 */
1374 	lastSaneFrozenXid = ReadNewTransactionId();
1375 	lastSaneMinMulti = ReadNextMultiXactId();
1376 
1377 	/*
1378 	 * We must seqscan pg_class to find the minimum Xid, because there is no
1379 	 * index that can help us here.
1380 	 */
1381 	relation = table_open(RelationRelationId, AccessShareLock);
1382 
1383 	scan = systable_beginscan(relation, InvalidOid, false,
1384 							  NULL, 0, NULL);
1385 
1386 	while ((classTup = systable_getnext(scan)) != NULL)
1387 	{
1388 		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
1389 
1390 		/*
1391 		 * Only consider relations able to hold unfrozen XIDs (anything else
1392 		 * should have InvalidTransactionId in relfrozenxid anyway).
1393 		 */
1394 		if (classForm->relkind != RELKIND_RELATION &&
1395 			classForm->relkind != RELKIND_MATVIEW &&
1396 			classForm->relkind != RELKIND_TOASTVALUE)
1397 		{
1398 			Assert(!TransactionIdIsValid(classForm->relfrozenxid));
1399 			Assert(!MultiXactIdIsValid(classForm->relminmxid));
1400 			continue;
1401 		}
1402 
1403 		/*
1404 		 * Some table AMs might not need per-relation xid / multixid horizons.
1405 		 * It therefore seems reasonable to allow relfrozenxid and relminmxid
1406 		 * to not be set (i.e. set to their respective Invalid*Id)
1407 		 * independently. Thus validate and compute horizon for each only if
1408 		 * set.
1409 		 *
1410 		 * If things are working properly, no relation should have a
1411 		 * relfrozenxid or relminmxid that is "in the future".  However, such
1412 		 * cases have been known to arise due to bugs in pg_upgrade.  If we
1413 		 * see any entries that are "in the future", chicken out and don't do
1414 		 * anything.  This ensures we won't truncate clog & multixact SLRUs
1415 		 * before those relations have been scanned and cleaned up.
1416 		 */
1417 
1418 		if (TransactionIdIsValid(classForm->relfrozenxid))
1419 		{
1420 			Assert(TransactionIdIsNormal(classForm->relfrozenxid));
1421 
1422 			/* check for values in the future */
1423 			if (TransactionIdPrecedes(lastSaneFrozenXid, classForm->relfrozenxid))
1424 			{
1425 				bogus = true;
1426 				break;
1427 			}
1428 
1429 			/* determine new horizon */
1430 			if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
1431 				newFrozenXid = classForm->relfrozenxid;
1432 		}
1433 
1434 		if (MultiXactIdIsValid(classForm->relminmxid))
1435 		{
1436 			/* check for values in the future */
1437 			if (MultiXactIdPrecedes(lastSaneMinMulti, classForm->relminmxid))
1438 			{
1439 				bogus = true;
1440 				break;
1441 			}
1442 
1443 			/* determine new horizon */
1444 			if (MultiXactIdPrecedes(classForm->relminmxid, newMinMulti))
1445 				newMinMulti = classForm->relminmxid;
1446 		}
1447 	}
1448 
1449 	/* we're done with pg_class */
1450 	systable_endscan(scan);
1451 	table_close(relation, AccessShareLock);
1452 
1453 	/* chicken out if bogus data found */
1454 	if (bogus)
1455 		return;
1456 
1457 	Assert(TransactionIdIsNormal(newFrozenXid));
1458 	Assert(MultiXactIdIsValid(newMinMulti));
1459 
1460 	/* Now fetch the pg_database tuple we need to update. */
1461 	relation = table_open(DatabaseRelationId, RowExclusiveLock);
1462 
1463 	/* Fetch a copy of the tuple to scribble on */
1464 	tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
1465 	if (!HeapTupleIsValid(tuple))
1466 		elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
1467 	dbform = (Form_pg_database) GETSTRUCT(tuple);
1468 
1469 	/*
1470 	 * As in vac_update_relstats(), we ordinarily don't want to let
1471 	 * datfrozenxid go backward; but if it's "in the future" then it must be
1472 	 * corrupt and it seems best to overwrite it.
1473 	 */
1474 	if (dbform->datfrozenxid != newFrozenXid &&
1475 		(TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid) ||
1476 		 TransactionIdPrecedes(lastSaneFrozenXid, dbform->datfrozenxid)))
1477 	{
1478 		dbform->datfrozenxid = newFrozenXid;
1479 		dirty = true;
1480 	}
1481 	else
1482 		newFrozenXid = dbform->datfrozenxid;
1483 
1484 	/* Ditto for datminmxid */
1485 	if (dbform->datminmxid != newMinMulti &&
1486 		(MultiXactIdPrecedes(dbform->datminmxid, newMinMulti) ||
1487 		 MultiXactIdPrecedes(lastSaneMinMulti, dbform->datminmxid)))
1488 	{
1489 		dbform->datminmxid = newMinMulti;
1490 		dirty = true;
1491 	}
1492 	else
1493 		newMinMulti = dbform->datminmxid;
1494 
1495 	if (dirty)
1496 		heap_inplace_update(relation, tuple);
1497 
1498 	heap_freetuple(tuple);
1499 	table_close(relation, RowExclusiveLock);
1500 
1501 	/*
1502 	 * If we were able to advance datfrozenxid or datminmxid, see if we can
1503 	 * truncate pg_xact and/or pg_multixact.  Also do it if the shared
1504 	 * XID-wrap-limit info is stale, since this action will update that too.
1505 	 */
1506 	if (dirty || ForceTransactionIdLimitUpdate())
1507 		vac_truncate_clog(newFrozenXid, newMinMulti,
1508 						  lastSaneFrozenXid, lastSaneMinMulti);
1509 }
1510 
1511 
1512 /*
1513  *	vac_truncate_clog() -- attempt to truncate the commit log
1514  *
1515  *		Scan pg_database to determine the system-wide oldest datfrozenxid,
1516  *		and use it to truncate the transaction commit log (pg_xact).
1517  *		Also update the XID wrap limit info maintained by varsup.c.
1518  *		Likewise for datminmxid.
1519  *
1520  *		The passed frozenXID and minMulti are the updated values for my own
1521  *		pg_database entry. They're used to initialize the "min" calculations.
1522  *		The caller also passes the "last sane" XID and MXID, since it has
1523  *		those at hand already.
1524  *
1525  *		This routine is only invoked when we've managed to change our
1526  *		DB's datfrozenxid/datminmxid values, or we found that the shared
1527  *		XID-wrap-limit info is stale.
1528  */
1529 static void
1530 vac_truncate_clog(TransactionId frozenXID,
1531 				  MultiXactId minMulti,
1532 				  TransactionId lastSaneFrozenXid,
1533 				  MultiXactId lastSaneMinMulti)
1534 {
1535 	TransactionId nextXID = ReadNewTransactionId();
1536 	Relation	relation;
1537 	TableScanDesc scan;
1538 	HeapTuple	tuple;
1539 	Oid			oldestxid_datoid;
1540 	Oid			minmulti_datoid;
1541 	bool		bogus = false;
1542 	bool		frozenAlreadyWrapped = false;
1543 
1544 	/* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
1545 	LWLockAcquire(WrapLimitsVacuumLock, LW_EXCLUSIVE);
1546 
1547 	/* init oldest datoids to sync with my frozenXID/minMulti values */
1548 	oldestxid_datoid = MyDatabaseId;
1549 	minmulti_datoid = MyDatabaseId;
1550 
1551 	/*
1552 	 * Scan pg_database to compute the minimum datfrozenxid/datminmxid
1553 	 *
1554 	 * Since vac_update_datfrozenxid updates datfrozenxid/datminmxid in-place,
1555 	 * the values could change while we look at them.  Fetch each one just
1556 	 * once to ensure sane behavior of the comparison logic.  (Here, as in
1557 	 * many other places, we assume that fetching or updating an XID in shared
1558 	 * storage is atomic.)
1559 	 *
1560 	 * Note: we need not worry about a race condition with new entries being
1561 	 * inserted by CREATE DATABASE.  Any such entry will have a copy of some
1562 	 * existing DB's datfrozenxid, and that source DB cannot be ours because
1563 	 * of the interlock against copying a DB containing an active backend.
1564 	 * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
1565 	 * concurrently modify the datfrozenxid's of different databases, the
1566 	 * worst possible outcome is that pg_xact is not truncated as aggressively
1567 	 * as it could be.
1568 	 */
1569 	relation = table_open(DatabaseRelationId, AccessShareLock);
1570 
1571 	scan = table_beginscan_catalog(relation, 0, NULL);
1572 
1573 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1574 	{
1575 		volatile FormData_pg_database *dbform = (Form_pg_database) GETSTRUCT(tuple);
1576 		TransactionId datfrozenxid = dbform->datfrozenxid;
1577 		TransactionId datminmxid = dbform->datminmxid;
1578 
1579 		Assert(TransactionIdIsNormal(datfrozenxid));
1580 		Assert(MultiXactIdIsValid(datminmxid));
1581 
1582 		/*
1583 		 * If things are working properly, no database should have a
1584 		 * datfrozenxid or datminmxid that is "in the future".  However, such
1585 		 * cases have been known to arise due to bugs in pg_upgrade.  If we
1586 		 * see any entries that are "in the future", chicken out and don't do
1587 		 * anything.  This ensures we won't truncate clog before those
1588 		 * databases have been scanned and cleaned up.  (We will issue the
1589 		 * "already wrapped" warning if appropriate, though.)
1590 		 */
1591 		if (TransactionIdPrecedes(lastSaneFrozenXid, datfrozenxid) ||
1592 			MultiXactIdPrecedes(lastSaneMinMulti, datminmxid))
1593 			bogus = true;
1594 
1595 		if (TransactionIdPrecedes(nextXID, datfrozenxid))
1596 			frozenAlreadyWrapped = true;
1597 		else if (TransactionIdPrecedes(datfrozenxid, frozenXID))
1598 		{
1599 			frozenXID = datfrozenxid;
1600 			oldestxid_datoid = dbform->oid;
1601 		}
1602 
1603 		if (MultiXactIdPrecedes(datminmxid, minMulti))
1604 		{
1605 			minMulti = datminmxid;
1606 			minmulti_datoid = dbform->oid;
1607 		}
1608 	}
1609 
1610 	table_endscan(scan);
1611 
1612 	table_close(relation, AccessShareLock);
1613 
1614 	/*
1615 	 * Do not truncate CLOG if we seem to have suffered wraparound already;
1616 	 * the computed minimum XID might be bogus.  This case should now be
1617 	 * impossible due to the defenses in GetNewTransactionId, but we keep the
1618 	 * test anyway.
1619 	 */
1620 	if (frozenAlreadyWrapped)
1621 	{
1622 		ereport(WARNING,
1623 				(errmsg("some databases have not been vacuumed in over 2 billion transactions"),
1624 				 errdetail("You might have already suffered transaction-wraparound data loss.")));
1625 		return;
1626 	}
1627 
1628 	/* chicken out if data is bogus in any other way */
1629 	if (bogus)
1630 		return;
1631 
1632 	/*
1633 	 * Advance the oldest value for commit timestamps before truncating, so
1634 	 * that if a user requests a timestamp for a transaction we're truncating
1635 	 * away right after this point, they get NULL instead of an ugly "file not
1636 	 * found" error from slru.c.  This doesn't matter for xact/multixact
1637 	 * because they are not subject to arbitrary lookups from users.
1638 	 */
1639 	AdvanceOldestCommitTsXid(frozenXID);
1640 
1641 	/*
1642 	 * Truncate CLOG, multixact and CommitTs to the oldest computed value.
1643 	 */
1644 	TruncateCLOG(frozenXID, oldestxid_datoid);
1645 	TruncateCommitTs(frozenXID);
1646 	TruncateMultiXact(minMulti, minmulti_datoid);
1647 
1648 	/*
1649 	 * Update the wrap limit for GetNewTransactionId and creation of new
1650 	 * MultiXactIds.  Note: these functions will also signal the postmaster
1651 	 * for an(other) autovac cycle if needed.   XXX should we avoid possibly
1652 	 * signaling twice?
1653 	 */
1654 	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
1655 	SetMultiXactIdLimit(minMulti, minmulti_datoid, false);
1656 
1657 	LWLockRelease(WrapLimitsVacuumLock);
1658 }
1659 
1660 
1661 /*
1662  *	vacuum_rel() -- vacuum one heap relation
1663  *
1664  *		relid identifies the relation to vacuum.  If relation is supplied,
1665  *		use the name therein for reporting any failure to open/lock the rel;
1666  *		do not use it once we've successfully opened the rel, since it might
1667  *		be stale.
1668  *
1669  *		Returns true if it's okay to proceed with a requested ANALYZE
1670  *		operation on this table.
1671  *
1672  *		Doing one heap at a time incurs extra overhead, since we need to
1673  *		check that the heap exists again just before we vacuum it.  The
1674  *		reason that we do this is so that vacuuming can be spread across
1675  *		many small transactions.  Otherwise, two-phase locking would require
1676  *		us to lock the entire database during one pass of the vacuum cleaner.
1677  *
1678  *		At entry and exit, we are not inside a transaction.
1679  */
1680 static bool
1681 vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
1682 {
1683 	LOCKMODE	lmode;
1684 	Relation	onerel;
1685 	LockRelId	onerelid;
1686 	Oid			toast_relid;
1687 	Oid			save_userid;
1688 	int			save_sec_context;
1689 	int			save_nestlevel;
1690 
1691 	Assert(params != NULL);
1692 
1693 	/* Begin a transaction for vacuuming this relation */
1694 	StartTransactionCommand();
1695 
1696 	/*
1697 	 * Functions in indexes may want a snapshot set.  Also, setting a snapshot
1698 	 * ensures that RecentGlobalXmin is kept truly recent.
1699 	 */
1700 	PushActiveSnapshot(GetTransactionSnapshot());
1701 
1702 	if (!(params->options & VACOPT_FULL))
1703 	{
1704 		/*
1705 		 * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
1706 		 * other concurrent VACUUMs know that they can ignore this one while
1707 		 * determining their OldestXmin.  (The reason we don't set it during a
1708 		 * full VACUUM is exactly that we may have to run user-defined
1709 		 * functions for functional indexes, and we want to make sure that if
1710 		 * they use the snapshot set above, any tuples it requires can't get
1711 		 * removed from other tables.  An index function that depends on the
1712 		 * contents of other tables is arguably broken, but we won't break it
1713 		 * here by violating transaction semantics.)
1714 		 *
1715 		 * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down by
1716 		 * autovacuum; it's used to avoid canceling a vacuum that was invoked
1717 		 * in an emergency.
1718 		 *
1719 		 * Note: these flags remain set until CommitTransaction or
1720 		 * AbortTransaction.  We don't want to clear them until we reset
1721 		 * MyPgXact->xid/xmin, else OldestXmin might appear to go backwards,
1722 		 * which is probably Not Good.
1723 		 */
1724 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1725 		MyPgXact->vacuumFlags |= PROC_IN_VACUUM;
1726 		if (params->is_wraparound)
1727 			MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1728 		LWLockRelease(ProcArrayLock);
1729 	}
1730 
1731 	/*
1732 	 * Check for user-requested abort.  Note we want this to be inside a
1733 	 * transaction, so xact.c doesn't issue useless WARNING.
1734 	 */
1735 	CHECK_FOR_INTERRUPTS();
1736 
1737 	/*
1738 	 * Determine the type of lock we want --- hard exclusive lock for a FULL
1739 	 * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1740 	 * way, we can be sure that no other backend is vacuuming the same table.
1741 	 */
1742 	lmode = (params->options & VACOPT_FULL) ?
1743 		AccessExclusiveLock : ShareUpdateExclusiveLock;
1744 
1745 	/* open the relation and get the appropriate lock on it */
1746 	onerel = vacuum_open_relation(relid, relation, params->options,
1747 								  params->log_min_duration >= 0, lmode);
1748 
1749 	/* leave if relation could not be opened or locked */
1750 	if (!onerel)
1751 	{
1752 		PopActiveSnapshot();
1753 		CommitTransactionCommand();
1754 		return false;
1755 	}
1756 
1757 	/*
1758 	 * Check if relation needs to be skipped based on ownership.  This check
1759 	 * happens also when building the relation list to vacuum for a manual
1760 	 * operation, and needs to be done additionally here as VACUUM could
1761 	 * happen across multiple transactions where relation ownership could have
1762 	 * changed in-between.  Make sure to only generate logs for VACUUM in this
1763 	 * case.
1764 	 */
1765 	if (!vacuum_is_relation_owner(RelationGetRelid(onerel),
1766 								  onerel->rd_rel,
1767 								  params->options & VACOPT_VACUUM))
1768 	{
1769 		relation_close(onerel, lmode);
1770 		PopActiveSnapshot();
1771 		CommitTransactionCommand();
1772 		return false;
1773 	}
1774 
1775 	/*
1776 	 * Check that it's of a vacuumable relkind.
1777 	 */
1778 	if (onerel->rd_rel->relkind != RELKIND_RELATION &&
1779 		onerel->rd_rel->relkind != RELKIND_MATVIEW &&
1780 		onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
1781 		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1782 	{
1783 		ereport(WARNING,
1784 				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
1785 						RelationGetRelationName(onerel))));
1786 		relation_close(onerel, lmode);
1787 		PopActiveSnapshot();
1788 		CommitTransactionCommand();
1789 		return false;
1790 	}
1791 
1792 	/*
1793 	 * Silently ignore tables that are temp tables of other backends ---
1794 	 * trying to vacuum these will lead to great unhappiness, since their
1795 	 * contents are probably not up-to-date on disk.  (We don't throw a
1796 	 * warning here; it would just lead to chatter during a database-wide
1797 	 * VACUUM.)
1798 	 */
1799 	if (RELATION_IS_OTHER_TEMP(onerel))
1800 	{
1801 		relation_close(onerel, lmode);
1802 		PopActiveSnapshot();
1803 		CommitTransactionCommand();
1804 		return false;
1805 	}
1806 
1807 	/*
1808 	 * Silently ignore partitioned tables as there is no work to be done.  The
1809 	 * useful work is on their child partitions, which have been queued up for
1810 	 * us separately.
1811 	 */
1812 	if (onerel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1813 	{
1814 		relation_close(onerel, lmode);
1815 		PopActiveSnapshot();
1816 		CommitTransactionCommand();
1817 		/* It's OK to proceed with ANALYZE on this table */
1818 		return true;
1819 	}
1820 
1821 	/*
1822 	 * Get a session-level lock too. This will protect our access to the
1823 	 * relation across multiple transactions, so that we can vacuum the
1824 	 * relation's TOAST table (if any) secure in the knowledge that no one is
1825 	 * deleting the parent relation.
1826 	 *
1827 	 * NOTE: this cannot block, even if someone else is waiting for access,
1828 	 * because the lock manager knows that both lock requests are from the
1829 	 * same process.
1830 	 */
1831 	onerelid = onerel->rd_lockInfo.lockRelId;
1832 	LockRelationIdForSession(&onerelid, lmode);
1833 
1834 	/* Set index cleanup option based on reloptions if not yet */
1835 	if (params->index_cleanup == VACOPT_TERNARY_DEFAULT)
1836 	{
1837 		if (onerel->rd_options == NULL ||
1838 			((StdRdOptions *) onerel->rd_options)->vacuum_index_cleanup)
1839 			params->index_cleanup = VACOPT_TERNARY_ENABLED;
1840 		else
1841 			params->index_cleanup = VACOPT_TERNARY_DISABLED;
1842 	}
1843 
1844 	/* Set truncate option based on reloptions if not yet */
1845 	if (params->truncate == VACOPT_TERNARY_DEFAULT)
1846 	{
1847 		if (onerel->rd_options == NULL ||
1848 			((StdRdOptions *) onerel->rd_options)->vacuum_truncate)
1849 			params->truncate = VACOPT_TERNARY_ENABLED;
1850 		else
1851 			params->truncate = VACOPT_TERNARY_DISABLED;
1852 	}
1853 
1854 	/*
1855 	 * Remember the relation's TOAST relation for later, if the caller asked
1856 	 * us to process it.  In VACUUM FULL, though, the toast table is
1857 	 * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
1858 	 */
1859 	if (!(params->options & VACOPT_SKIPTOAST) && !(params->options & VACOPT_FULL))
1860 		toast_relid = onerel->rd_rel->reltoastrelid;
1861 	else
1862 		toast_relid = InvalidOid;
1863 
1864 	/*
1865 	 * Switch to the table owner's userid, so that any index functions are run
1866 	 * as that user.  Also lock down security-restricted operations and
1867 	 * arrange to make GUC variable changes local to this command. (This is
1868 	 * unnecessary, but harmless, for lazy VACUUM.)
1869 	 */
1870 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
1871 	SetUserIdAndSecContext(onerel->rd_rel->relowner,
1872 						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
1873 	save_nestlevel = NewGUCNestLevel();
1874 
1875 	/*
1876 	 * Do the actual work --- either FULL or "lazy" vacuum
1877 	 */
1878 	if (params->options & VACOPT_FULL)
1879 	{
1880 		int			cluster_options = 0;
1881 
1882 		/* close relation before vacuuming, but hold lock until commit */
1883 		relation_close(onerel, NoLock);
1884 		onerel = NULL;
1885 
1886 		if ((params->options & VACOPT_VERBOSE) != 0)
1887 			cluster_options |= CLUOPT_VERBOSE;
1888 
1889 		/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
1890 		cluster_rel(relid, InvalidOid, cluster_options);
1891 	}
1892 	else
1893 		table_relation_vacuum(onerel, params, vac_strategy);
1894 
1895 	/* Roll back any GUC changes executed by index functions */
1896 	AtEOXact_GUC(false, save_nestlevel);
1897 
1898 	/* Restore userid and security context */
1899 	SetUserIdAndSecContext(save_userid, save_sec_context);
1900 
1901 	/* all done with this class, but hold lock until commit */
1902 	if (onerel)
1903 		relation_close(onerel, NoLock);
1904 
1905 	/*
1906 	 * Complete the transaction and free all temporary memory used.
1907 	 */
1908 	PopActiveSnapshot();
1909 	CommitTransactionCommand();
1910 
1911 	/*
1912 	 * If the relation has a secondary toast rel, vacuum that too while we
1913 	 * still hold the session lock on the master table.  Note however that
1914 	 * "analyze" will not get done on the toast table.  This is good, because
1915 	 * the toaster always uses hardcoded index access and statistics are
1916 	 * totally unimportant for toast relations.
1917 	 */
1918 	if (toast_relid != InvalidOid)
1919 		vacuum_rel(toast_relid, NULL, params);
1920 
1921 	/*
1922 	 * Now release the session-level lock on the master table.
1923 	 */
1924 	UnlockRelationIdForSession(&onerelid, lmode);
1925 
1926 	/* Report that we really did it. */
1927 	return true;
1928 }
1929 
1930 
1931 /*
1932  * Open all the vacuumable indexes of the given relation, obtaining the
1933  * specified kind of lock on each.  Return an array of Relation pointers for
1934  * the indexes into *Irel, and the number of indexes into *nindexes.
1935  *
1936  * We consider an index vacuumable if it is marked insertable (indisready).
1937  * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in
1938  * execution, and what we have is too corrupt to be processable.  We will
1939  * vacuum even if the index isn't indisvalid; this is important because in a
1940  * unique index, uniqueness checks will be performed anyway and had better not
1941  * hit dangling index pointers.
1942  */
1943 void
1944 vac_open_indexes(Relation relation, LOCKMODE lockmode,
1945 				 int *nindexes, Relation **Irel)
1946 {
1947 	List	   *indexoidlist;
1948 	ListCell   *indexoidscan;
1949 	int			i;
1950 
1951 	Assert(lockmode != NoLock);
1952 
1953 	indexoidlist = RelationGetIndexList(relation);
1954 
1955 	/* allocate enough memory for all indexes */
1956 	i = list_length(indexoidlist);
1957 
1958 	if (i > 0)
1959 		*Irel = (Relation *) palloc(i * sizeof(Relation));
1960 	else
1961 		*Irel = NULL;
1962 
1963 	/* collect just the ready indexes */
1964 	i = 0;
1965 	foreach(indexoidscan, indexoidlist)
1966 	{
1967 		Oid			indexoid = lfirst_oid(indexoidscan);
1968 		Relation	indrel;
1969 
1970 		indrel = index_open(indexoid, lockmode);
1971 		if (indrel->rd_index->indisready)
1972 			(*Irel)[i++] = indrel;
1973 		else
1974 			index_close(indrel, lockmode);
1975 	}
1976 
1977 	*nindexes = i;
1978 
1979 	list_free(indexoidlist);
1980 }
1981 
1982 /*
1983  * Release the resources acquired by vac_open_indexes.  Optionally release
1984  * the locks (say NoLock to keep 'em).
1985  */
1986 void
1987 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
1988 {
1989 	if (Irel == NULL)
1990 		return;
1991 
1992 	while (nindexes--)
1993 	{
1994 		Relation	ind = Irel[nindexes];
1995 
1996 		index_close(ind, lockmode);
1997 	}
1998 	pfree(Irel);
1999 }
2000 
2001 /*
2002  * vacuum_delay_point --- check for interrupts and cost-based delay.
2003  *
2004  * This should be called in each major loop of VACUUM processing,
2005  * typically once per page processed.
2006  */
2007 void
2008 vacuum_delay_point(void)
2009 {
2010 	double		msec = 0;
2011 
2012 	/* Always check for interrupts */
2013 	CHECK_FOR_INTERRUPTS();
2014 
2015 	if (!VacuumCostActive || InterruptPending)
2016 		return;
2017 
2018 	/*
2019 	 * For parallel vacuum, the delay is computed based on the shared cost
2020 	 * balance.  See compute_parallel_delay.
2021 	 */
2022 	if (VacuumSharedCostBalance != NULL)
2023 		msec = compute_parallel_delay();
2024 	else if (VacuumCostBalance >= VacuumCostLimit)
2025 		msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
2026 
2027 	/* Nap if appropriate */
2028 	if (msec > 0)
2029 	{
2030 		if (msec > VacuumCostDelay * 4)
2031 			msec = VacuumCostDelay * 4;
2032 
2033 		pgstat_report_wait_start(WAIT_EVENT_VACUUM_DELAY);
2034 		pg_usleep((long) (msec * 1000));
2035 		pgstat_report_wait_end();
2036 
2037 		VacuumCostBalance = 0;
2038 
2039 		/* update balance values for workers */
2040 		AutoVacuumUpdateDelay();
2041 
2042 		/* Might have gotten an interrupt while sleeping */
2043 		CHECK_FOR_INTERRUPTS();
2044 	}
2045 }
2046 
2047 /*
2048  * Computes the vacuum delay for parallel workers.
2049  *
2050  * The basic idea of a cost-based delay for parallel vacuum is to allow each
2051  * worker to sleep in proportion to the share of work it's done.  We achieve this
2052  * by allowing all parallel vacuum workers including the leader process to
2053  * have a shared view of cost related parameters (mainly VacuumCostBalance).
2054  * We allow each worker to update it as and when it has incurred any cost and
2055  * then based on that decide whether it needs to sleep.  We compute the time
2056  * to sleep for a worker based on the cost it has incurred
2057  * (VacuumCostBalanceLocal) and then reduce the VacuumSharedCostBalance by
2058  * that amount.  This avoids putting to sleep those workers which have done less
2059  * I/O than other workers and therefore ensure that workers
2060  * which are doing more I/O got throttled more.
2061  *
2062  * We allow a worker to sleep only if it has performed I/O above a certain
2063  * threshold, which is calculated based on the number of active workers
2064  * (VacuumActiveNWorkers), and the overall cost balance is more than
2065  * VacuumCostLimit set by the system.  Testing reveals that we achieve
2066  * the required throttling if we force a worker that has done more than 50%
2067  * of its share of work to sleep.
2068  */
2069 static double
2070 compute_parallel_delay(void)
2071 {
2072 	double		msec = 0;
2073 	uint32		shared_balance;
2074 	int			nworkers;
2075 
2076 	/* Parallel vacuum must be active */
2077 	Assert(VacuumSharedCostBalance);
2078 
2079 	nworkers = pg_atomic_read_u32(VacuumActiveNWorkers);
2080 
2081 	/* At least count itself */
2082 	Assert(nworkers >= 1);
2083 
2084 	/* Update the shared cost balance value atomically */
2085 	shared_balance = pg_atomic_add_fetch_u32(VacuumSharedCostBalance, VacuumCostBalance);
2086 
2087 	/* Compute the total local balance for the current worker */
2088 	VacuumCostBalanceLocal += VacuumCostBalance;
2089 
2090 	if ((shared_balance >= VacuumCostLimit) &&
2091 		(VacuumCostBalanceLocal > 0.5 * ((double) VacuumCostLimit / nworkers)))
2092 	{
2093 		/* Compute sleep time based on the local cost balance */
2094 		msec = VacuumCostDelay * VacuumCostBalanceLocal / VacuumCostLimit;
2095 		pg_atomic_sub_fetch_u32(VacuumSharedCostBalance, VacuumCostBalanceLocal);
2096 		VacuumCostBalanceLocal = 0;
2097 	}
2098 
2099 	/*
2100 	 * Reset the local balance as we accumulated it into the shared value.
2101 	 */
2102 	VacuumCostBalance = 0;
2103 
2104 	return msec;
2105 }
2106 
2107 /*
2108  * A wrapper function of defGetBoolean().
2109  *
2110  * This function returns VACOPT_TERNARY_ENABLED and VACOPT_TERNARY_DISABLED
2111  * instead of true and false.
2112  */
2113 static VacOptTernaryValue
2114 get_vacopt_ternary_value(DefElem *def)
2115 {
2116 	return defGetBoolean(def) ? VACOPT_TERNARY_ENABLED : VACOPT_TERNARY_DISABLED;
2117 }
2118