1 /*-------------------------------------------------------------------------
2  *
3  * vacuumlazy.c
4  *	  Concurrent ("lazy") vacuuming.
5  *
6  *
7  * The major space usage for LAZY VACUUM is storage for the array of dead tuple
8  * TIDs.  We want to ensure we can vacuum even the very largest relations with
9  * finite memory space usage.  To do that, we set upper bounds on the number of
10  * tuples we will keep track of at once.
11  *
12  * We are willing to use at most maintenance_work_mem (or perhaps
13  * autovacuum_work_mem) memory space to keep track of dead tuples.  We
14  * initially allocate an array of TIDs of that size, with an upper limit that
15  * depends on table size (this limit ensures we don't allocate a huge area
16  * uselessly for vacuuming small tables).  If the array threatens to overflow,
17  * we suspend the heap scan phase and perform a pass of index cleanup and page
18  * compaction, then resume the heap scan with an empty TID array.
19  *
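 * As a rough worked example: each dead tuple TID (ItemPointerData) occupies
 * 6 bytes, so with the default maintenance_work_mem of 64MB the array can
 * hold on the order of 11 million TIDs before an index-vacuum pass is forced;
 * the allocation is further capped for small tables, see
 * compute_max_dead_tuples().
 *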
20  * If we're processing a table with no indexes, we can just vacuum each page
21  * as we go; there's no need to save up multiple tuples to minimize the number
22  * of index scans performed.  So we don't use maintenance_work_mem memory for
23  * the TID array, just enough to hold as many heap tuples as fit on one page.
24  *
25  * Lazy vacuum supports parallel execution with parallel worker processes.  In
26  * a parallel vacuum, we perform both index vacuum and index cleanup with
27  * parallel worker processes.  Individual indexes are processed by one vacuum
28  * process.  At the beginning of a lazy vacuum (at lazy_scan_heap) we prepare
29  * the parallel context and initialize the DSM segment that contains shared
30  * information as well as the memory space for storing dead tuples.  When
31  * starting either index vacuum or index cleanup, we launch parallel worker
32  * processes.  Once all indexes are processed the parallel worker processes
33  * exit.  After that, the leader process re-initializes the parallel context
34  * so that it can use the same DSM for multiple passes of index vacuum and
35  * for performing index cleanup.  Updating the index statistics requires
36  * writing to a system catalog, and since such writes are not allowed while
37  * in parallel mode, we update the index statistics only after exiting from
38  * parallel mode.
39  *
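 * In outline, the parallel path used below is: begin_parallel_vacuum() sets
 * up the parallel context and the DSM segment, lazy_parallel_vacuum_indexes()
 * launches workers for each round of index vacuum/cleanup,
 * end_parallel_vacuum() tears the infrastructure down, and
 * update_index_statistics() then records the per-index results outside
 * parallel mode.
 *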
40  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
41  * Portions Copyright (c) 1994, Regents of the University of California
42  *
43  *
44  * IDENTIFICATION
45  *	  src/backend/access/heap/vacuumlazy.c
46  *
47  *-------------------------------------------------------------------------
48  */
49 #include "postgres.h"
50 
51 #include <math.h>
52 
53 #include "access/amapi.h"
54 #include "access/genam.h"
55 #include "access/heapam.h"
56 #include "access/heapam_xlog.h"
57 #include "access/htup_details.h"
58 #include "access/multixact.h"
59 #include "access/parallel.h"
60 #include "access/transam.h"
61 #include "access/visibilitymap.h"
62 #include "access/xact.h"
63 #include "access/xlog.h"
64 #include "catalog/storage.h"
65 #include "commands/dbcommands.h"
66 #include "commands/progress.h"
67 #include "commands/vacuum.h"
68 #include "executor/instrument.h"
69 #include "miscadmin.h"
70 #include "optimizer/paths.h"
71 #include "pgstat.h"
72 #include "portability/instr_time.h"
73 #include "postmaster/autovacuum.h"
74 #include "storage/bufmgr.h"
75 #include "storage/freespace.h"
76 #include "storage/lmgr.h"
77 #include "tcop/tcopprot.h"
78 #include "utils/lsyscache.h"
79 #include "utils/memutils.h"
80 #include "utils/pg_rusage.h"
81 #include "utils/timestamp.h"
82 
83 
84 /*
85  * Space/time tradeoff parameters: do these need to be user-tunable?
86  *
87  * To consider truncating the relation, we want there to be at least
88  * REL_TRUNCATE_MINIMUM or (relsize / REL_TRUNCATE_FRACTION) (whichever
89  * is less) potentially-freeable pages.
90  */
91 #define REL_TRUNCATE_MINIMUM	1000
92 #define REL_TRUNCATE_FRACTION	16
93 
94 /*
95  * Timing parameters for truncate locking heuristics.
96  *
97  * These were not exposed as user tunable GUC values because it didn't seem
98  * that the potential for improvement was great enough to merit the cost of
99  * supporting them.
100  */
101 #define VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL		20	/* ms */
102 #define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL		50	/* ms */
103 #define VACUUM_TRUNCATE_LOCK_TIMEOUT			5000	/* ms */
104 
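/*
 * Roughly how these interact (see lazy_truncate_heap() and
 * count_nondeletable_pages()): while scanning backwards for truncatable
 * pages we check about every VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL whether
 * another backend is waiting for a conflicting lock, and suspend if so;
 * when (re)taking the lock needed for truncation we retry every
 * VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL and give up once
 * VACUUM_TRUNCATE_LOCK_TIMEOUT has elapsed.
 */
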
105 /*
106  * When a table has no indexes, vacuum the FSM after every 8GB, approximately
107  * (it won't be exact because we only vacuum FSM after processing a heap page
108  * that has some removable tuples).  When there are indexes, this is ignored,
109  * and we vacuum FSM after each index/heap cleaning pass.
110  */
111 #define VACUUM_FSM_EVERY_PAGES \
112 	((BlockNumber) (((uint64) 8 * 1024 * 1024 * 1024) / BLCKSZ))
113 
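/*
 * With the default BLCKSZ of 8192 this works out to 1,048,576 heap blocks
 * between FSM vacuum passes.
 */
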
114 /*
115  * Guesstimation of number of dead tuples per page.  This is used to
116  * provide an upper limit to memory allocated when vacuuming small
117  * tables.
118  */
119 #define LAZY_ALLOC_TUPLES		MaxHeapTuplesPerPage
120 
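/*
 * With the default 8kB block size this is 291 tuples, so (roughly) a table
 * of N heap pages never gets an allocation for more than 291 * N TIDs; see
 * compute_max_dead_tuples().
 */
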
121 /*
122  * Before we consider skipping a page that's marked as clean in the
123  * visibility map, we must've seen at least this many clean pages.
124  */
125 #define SKIP_PAGES_THRESHOLD	((BlockNumber) 32)
126 
127 /*
128  * Size of the prefetch window for lazy vacuum backwards truncation scan.
129  * Needs to be a power of 2.
130  */
131 #define PREFETCH_SIZE			((BlockNumber) 32)
132 
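/*
 * At the default block size a 32-block window is 256kB of prefetch.  Being a
 * power of 2 lets the truncation scan align the window start with a simple
 * bit mask rather than a division.
 */
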
133 /*
134  * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
135  * we don't need to worry about DSM keys conflicting with plan_node_id we can
136  * use small integers.
137  */
138 #define PARALLEL_VACUUM_KEY_SHARED			1
139 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
140 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
141 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
142 #define PARALLEL_VACUUM_KEY_WAL_USAGE		5
143 
144 /*
145  * Macro to check if we are in a parallel vacuum.  If true, we are in the
146  * parallel mode and the DSM segment is initialized.
147  */
148 #define ParallelVacuumIsActive(lps) PointerIsValid(lps)
149 
150 /* Phases of vacuum during which we report error context. */
151 typedef enum
152 {
153 	VACUUM_ERRCB_PHASE_UNKNOWN,
154 	VACUUM_ERRCB_PHASE_SCAN_HEAP,
155 	VACUUM_ERRCB_PHASE_VACUUM_INDEX,
156 	VACUUM_ERRCB_PHASE_VACUUM_HEAP,
157 	VACUUM_ERRCB_PHASE_INDEX_CLEANUP,
158 	VACUUM_ERRCB_PHASE_TRUNCATE
159 } VacErrPhase;
160 
161 /*
162  * LVDeadTuples stores the dead tuple TIDs collected during the heap scan.
163  * This is allocated in the DSM segment in parallel mode and in local memory
164  * in non-parallel mode.
165  */
166 typedef struct LVDeadTuples
167 {
168 	int			max_tuples;		/* # slots allocated in array */
169 	int			num_tuples;		/* current # of entries */
170 	/* List of TIDs of tuples we intend to delete */
171 	/* NB: this list is ordered by TID address */
172 	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];	/* array of
173 														 * ItemPointerData */
174 } LVDeadTuples;
175 
176 /* The dead tuple space consists of LVDeadTuples and dead tuple TIDs */
177 #define SizeOfDeadTuples(cnt) \
178 	add_size(offsetof(LVDeadTuples, itemptrs), \
179 			 mul_size(sizeof(ItemPointerData), cnt))
180 #define MAXDEADTUPLES(max_size) \
181 		(((max_size) - offsetof(LVDeadTuples, itemptrs)) / sizeof(ItemPointerData))
182 
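/*
 * For example, the header (two ints) occupies 8 bytes before the TID array
 * and each ItemPointerData takes 6 bytes, so MAXDEADTUPLES of a 1MB chunk is
 * (1048576 - 8) / 6 = 174761 dead-tuple TIDs.
 */
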
183 /*
184  * Shared information among parallel workers.  This is therefore allocated in
185  * the DSM segment.
186  */
187 typedef struct LVShared
188 {
189 	/*
190 	 * Target table relid and log level.  These fields are not modified during
191 	 * the lazy vacuum.
192 	 */
193 	Oid			relid;
194 	int			elevel;
195 
196 	/*
197 	 * Tells vacuum workers whether to perform index vacuum or index cleanup.
198 	 * first_time is true only if for_cleanup is true and bulk-deletion has
199 	 * not been performed yet.
200 	 */
201 	bool		for_cleanup;
202 	bool		first_time;
203 
204 	/*
205 	 * Fields for both index vacuum and cleanup.
206 	 *
207 	 * reltuples is the total number of input heap tuples.  We set it to the
208 	 * old live tuples in the index vacuum case or to the new live tuples in
209 	 * the index cleanup case.
210 	 *
211 	 * estimated_count is true if reltuples is an estimated value.
212 	 */
213 	double		reltuples;
214 	bool		estimated_count;
215 
216 	/*
217 	 * In a single-process lazy vacuum, we can consume additional memory during
218 	 * index vacuuming or cleanup on top of the memory used for heap scanning.
219 	 * In a parallel vacuum, since individual vacuum workers can each consume
220 	 * memory equal to maintenance_work_mem, the new maintenance_work_mem for
221 	 * each worker is set such that the parallel operation doesn't consume more
222 	 * memory than a single-process lazy vacuum.
223 	 */
224 	int			maintenance_work_mem_worker;
225 
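	/*
	 * For example, if maintenance_work_mem is 64MB and two workers may run
	 * memory-hungry index AMs at the same time, each worker is limited to
	 * roughly 32MB so the total stays within the single-process budget.
	 */
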
226 	/*
227 	 * Shared vacuum cost balance.  During parallel vacuum,
228 	 * VacuumSharedCostBalance points to this value and it accumulates the
229 	 * balance of each parallel vacuum worker.
230 	 */
231 	pg_atomic_uint32 cost_balance;
232 
233 	/*
234 	 * Number of active parallel workers.  This is used for computing the
235 	 * minimum threshold of the vacuum cost balance before a worker sleeps for
236 	 * cost-based delay.
237 	 */
238 	pg_atomic_uint32 active_nworkers;
239 
240 	/*
241 	 * Variables to control parallel vacuum.  We have a bitmap to indicate
242 	 * which indexes have stats in shared memory.  A set bit in the map
243 	 * indicates that the corresponding index supports a parallel vacuum.
244 	 */
245 	pg_atomic_uint32 idx;		/* counter for vacuuming and clean up */
246 	uint32		offset;			/* sizeof header incl. bitmap */
247 	bits8		bitmap[FLEXIBLE_ARRAY_MEMBER];	/* bit map of NULLs */
248 
249 	/* Shared index statistics data follows at end of struct */
250 } LVShared;
251 
252 #define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
253 #define GetSharedIndStats(s) \
254 	((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
255 #define IndStatsIsNull(s, i) \
256 	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
257 
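/*
 * The DSM layout implied by the above is, roughly:
 *
 *   [LVShared fixed fields][bitmap, one bit per index][padding up to offset]
 *   [LVSharedIndStats for each index whose bit is set, in index order]
 *
 * so GetSharedIndStats() yields the first stats slot and IndStatsIsNull()
 * says whether a given index has a slot at all; see get_indstats().
 */
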
258 /*
259  * Struct for an index bulk-deletion statistic used for parallel vacuum.  This
260  * is allocated in the DSM segment.
261  */
262 typedef struct LVSharedIndStats
263 {
264 	bool		updated;		/* are the stats updated? */
265 	IndexBulkDeleteResult stats;
266 } LVSharedIndStats;
267 
268 /* Struct for maintaining a parallel vacuum state. */
269 typedef struct LVParallelState
270 {
271 	ParallelContext *pcxt;
272 
273 	/* Shared information among parallel vacuum workers */
274 	LVShared   *lvshared;
275 
276 	/* Points to buffer usage area in DSM */
277 	BufferUsage *buffer_usage;
278 
279 	/* Points to WAL usage area in DSM */
280 	WalUsage   *wal_usage;
281 
282 	/*
283 	 * The number of indexes that support parallel index bulk-deletion and
284 	 * parallel index cleanup respectively.
285 	 */
286 	int			nindexes_parallel_bulkdel;
287 	int			nindexes_parallel_cleanup;
288 	int			nindexes_parallel_condcleanup;
289 } LVParallelState;
290 
291 typedef struct LVRelStats
292 {
293 	char	   *relnamespace;
294 	char	   *relname;
295 	/* useindex = true means two-pass strategy; false means one-pass */
296 	bool		useindex;
297 	/* Overall statistics about rel */
298 	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
299 	BlockNumber rel_pages;		/* total number of pages */
300 	BlockNumber scanned_pages;	/* number of pages we examined */
301 	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
302 	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
303 	BlockNumber tupcount_pages; /* pages whose tuples we counted */
304 	double		old_live_tuples;	/* previous value of pg_class.reltuples */
305 	double		new_rel_tuples; /* new estimated total # of tuples */
306 	double		new_live_tuples;	/* new estimated total # of live tuples */
307 	double		new_dead_tuples;	/* new estimated total # of dead tuples */
308 	BlockNumber pages_removed;
309 	double		tuples_deleted;
310 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
311 	LVDeadTuples *dead_tuples;
312 	int			num_index_scans;
313 	TransactionId latestRemovedXid;
314 	bool		lock_waiter_detected;
315 
316 	/* Used for error callback */
317 	char	   *indname;
318 	BlockNumber blkno;			/* used only for heap operations */
319 	VacErrPhase phase;
320 } LVRelStats;
321 
322 /* Struct for saving and restoring vacuum error information. */
323 typedef struct LVSavedErrInfo
324 {
325 	BlockNumber blkno;
326 	VacErrPhase phase;
327 } LVSavedErrInfo;
328 
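/*
 * Intended usage of the error-callback bookkeeping (a sketch; see the actual
 * call sites below):
 *
 *   LVSavedErrInfo saved_err_info;
 *
 *   update_vacuum_error_info(vacrelstats, &saved_err_info,
 *                            VACUUM_ERRCB_PHASE_VACUUM_INDEX, blkno);
 *   ... do the phase's work; errors now report this phase ...
 *   restore_vacuum_error_info(vacrelstats, &saved_err_info);
 *
 * Passing NULL instead of &saved_err_info (as the truncate phase does) means
 * there is no need to restore the previous phase afterwards.
 */
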
329 /* A few variables that don't seem worth passing around as parameters */
330 static int	elevel = -1;
331 
332 static TransactionId OldestXmin;
333 static TransactionId FreezeLimit;
334 static MultiXactId MultiXactCutoff;
335 
336 static BufferAccessStrategy vac_strategy;
337 
338 
339 /* non-export function prototypes */
340 static void lazy_scan_heap(Relation onerel, VacuumParams *params,
341 						   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
342 						   bool aggressive);
343 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
344 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
345 static void lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
346 									IndexBulkDeleteResult **stats,
347 									LVRelStats *vacrelstats, LVParallelState *lps,
348 									int nindexes);
349 static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
350 							  LVDeadTuples *dead_tuples, double reltuples, LVRelStats *vacrelstats);
351 static void lazy_cleanup_index(Relation indrel,
352 							   IndexBulkDeleteResult **stats,
353 							   double reltuples, bool estimated_count, LVRelStats *vacrelstats);
354 static int	lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
355 							 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
356 static bool should_attempt_truncation(VacuumParams *params,
357 									  LVRelStats *vacrelstats);
358 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
359 static BlockNumber count_nondeletable_pages(Relation onerel,
360 											LVRelStats *vacrelstats);
361 static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
362 static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples,
363 								   ItemPointer itemptr);
364 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
365 static int	vac_cmp_itemptr(const void *left, const void *right);
366 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
367 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
368 static void lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
369 										 LVRelStats *vacrelstats, LVParallelState *lps,
370 										 int nindexes);
371 static void parallel_vacuum_index(Relation *Irel, IndexBulkDeleteResult **stats,
372 								  LVShared *lvshared, LVDeadTuples *dead_tuples,
373 								  int nindexes, LVRelStats *vacrelstats);
374 static void vacuum_indexes_leader(Relation *Irel, IndexBulkDeleteResult **stats,
375 								  LVRelStats *vacrelstats, LVParallelState *lps,
376 								  int nindexes);
377 static void vacuum_one_index(Relation indrel, IndexBulkDeleteResult **stats,
378 							 LVShared *lvshared, LVSharedIndStats *shared_indstats,
379 							 LVDeadTuples *dead_tuples, LVRelStats *vacrelstats);
380 static void lazy_cleanup_all_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
381 									 LVRelStats *vacrelstats, LVParallelState *lps,
382 									 int nindexes);
383 static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
384 static int	compute_parallel_vacuum_workers(Relation *Irel, int nindexes, int nrequested,
385 											bool *can_parallel_vacuum);
386 static void prepare_index_statistics(LVShared *lvshared, bool *can_parallel_vacuum,
387 									 int nindexes);
388 static void update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
389 									int nindexes);
390 static LVParallelState *begin_parallel_vacuum(Oid relid, Relation *Irel,
391 											  LVRelStats *vacrelstats, BlockNumber nblocks,
392 											  int nindexes, int nrequested);
393 static void end_parallel_vacuum(IndexBulkDeleteResult **stats,
394 								LVParallelState *lps, int nindexes);
395 static LVSharedIndStats *get_indstats(LVShared *lvshared, int n);
396 static bool skip_parallel_vacuum_index(Relation indrel, LVShared *lvshared);
397 static void vacuum_error_callback(void *arg);
398 static void update_vacuum_error_info(LVRelStats *errinfo, LVSavedErrInfo *saved_err_info,
399 									 int phase, BlockNumber blkno);
400 static void restore_vacuum_error_info(LVRelStats *errinfo, const LVSavedErrInfo *saved_err_info);
401 
402 
403 /*
404  *	heap_vacuum_rel() -- perform VACUUM for one heap relation
405  *
406  *		This routine vacuums a single heap, cleans out its indexes, and
407  *		updates its relpages and reltuples statistics.
408  *
409  *		At entry, we have already established a transaction and opened
410  *		and locked the relation.
411  */
412 void
413 heap_vacuum_rel(Relation onerel, VacuumParams *params,
414 				BufferAccessStrategy bstrategy)
415 {
416 	LVRelStats *vacrelstats;
417 	Relation   *Irel;
418 	int			nindexes;
419 	PGRUsage	ru0;
420 	TimestampTz starttime = 0;
421 	WalUsage	walusage_start = pgWalUsage;
422 	WalUsage	walusage = {0, 0, 0};
423 	long		secs;
424 	int			usecs;
425 	double		read_rate,
426 				write_rate;
427 	bool		aggressive;		/* should we scan all unfrozen pages? */
428 	bool		scanned_all_unfrozen;	/* actually scanned all such pages? */
429 	TransactionId xidFullScanLimit;
430 	MultiXactId mxactFullScanLimit;
431 	BlockNumber new_rel_pages;
432 	BlockNumber new_rel_allvisible;
433 	double		new_live_tuples;
434 	TransactionId new_frozen_xid;
435 	MultiXactId new_min_multi;
436 	ErrorContextCallback errcallback;
437 
438 	Assert(params != NULL);
439 	Assert(params->index_cleanup != VACOPT_TERNARY_DEFAULT);
440 	Assert(params->truncate != VACOPT_TERNARY_DEFAULT);
441 
442 	/* not every AM requires these to be valid, but heap does */
443 	Assert(TransactionIdIsNormal(onerel->rd_rel->relfrozenxid));
444 	Assert(MultiXactIdIsValid(onerel->rd_rel->relminmxid));
445 
446 	/* measure elapsed time iff autovacuum logging requires it */
447 	if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
448 	{
449 		pg_rusage_init(&ru0);
450 		starttime = GetCurrentTimestamp();
451 	}
452 
453 	if (params->options & VACOPT_VERBOSE)
454 		elevel = INFO;
455 	else
456 		elevel = DEBUG2;
457 
458 	pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM,
459 								  RelationGetRelid(onerel));
460 
461 	vac_strategy = bstrategy;
462 
463 	vacuum_set_xid_limits(onerel,
464 						  params->freeze_min_age,
465 						  params->freeze_table_age,
466 						  params->multixact_freeze_min_age,
467 						  params->multixact_freeze_table_age,
468 						  &OldestXmin, &FreezeLimit, &xidFullScanLimit,
469 						  &MultiXactCutoff, &mxactFullScanLimit);
470 
471 	/*
472 	 * We request an aggressive scan if the table's frozen Xid is now older
473 	 * than or equal to the requested Xid full-table scan limit; or if the
474 	 * table's minimum MultiXactId is older than or equal to the requested
475 	 * mxid full-table scan limit; or if DISABLE_PAGE_SKIPPING was specified.
476 	 */
477 	aggressive = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid,
478 											   xidFullScanLimit);
479 	aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid,
480 											  mxactFullScanLimit);
481 	if (params->options & VACOPT_DISABLE_PAGE_SKIPPING)
482 		aggressive = true;
483 
484 	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
485 
486 	vacrelstats->relnamespace = get_namespace_name(RelationGetNamespace(onerel));
487 	vacrelstats->relname = pstrdup(RelationGetRelationName(onerel));
488 	vacrelstats->indname = NULL;
489 	vacrelstats->phase = VACUUM_ERRCB_PHASE_UNKNOWN;
490 	vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
491 	vacrelstats->old_live_tuples = onerel->rd_rel->reltuples;
492 	vacrelstats->num_index_scans = 0;
493 	vacrelstats->pages_removed = 0;
494 	vacrelstats->lock_waiter_detected = false;
495 
496 	/* Open all indexes of the relation */
497 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
498 	vacrelstats->useindex = (nindexes > 0 &&
499 							 params->index_cleanup == VACOPT_TERNARY_ENABLED);
500 
501 	/*
502 	 * Setup error traceback support for ereport().  The idea is to set up an
503 	 * error context callback to display additional information on any error
504 	 * during a vacuum.  During different phases of vacuum (heap scan, heap
505 	 * vacuum, index vacuum, index clean up, heap truncate), we update the
506 	 * error context callback to display appropriate information.
507 	 *
508 	 * Note that the index vacuum and heap vacuum phases may be called
509 	 * multiple times in the middle of the heap scan phase.  So the old phase
510 	 * information is restored at the end of those phases.
511 	 */
512 	errcallback.callback = vacuum_error_callback;
513 	errcallback.arg = vacrelstats;
514 	errcallback.previous = error_context_stack;
515 	error_context_stack = &errcallback;
516 
517 	/* Do the vacuuming */
518 	lazy_scan_heap(onerel, params, vacrelstats, Irel, nindexes, aggressive);
519 
520 	/* Done with indexes */
521 	vac_close_indexes(nindexes, Irel, NoLock);
522 
523 	/*
524 	 * Compute whether we actually scanned all the unfrozen pages. If we did,
525 	 * we can adjust relfrozenxid and relminmxid.
526 	 *
527 	 * NB: We need to check this before truncating the relation, because that
528 	 * will change ->rel_pages.
529 	 */
530 	if ((vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages)
531 		< vacrelstats->rel_pages)
532 	{
533 		Assert(!aggressive);
534 		scanned_all_unfrozen = false;
535 	}
536 	else
537 		scanned_all_unfrozen = true;
538 
539 	/*
540 	 * Optionally truncate the relation.
541 	 */
542 	if (should_attempt_truncation(params, vacrelstats))
543 	{
544 		/*
545 		 * Update error traceback information.  This is the last phase during
546 		 * which we add context information to errors, so we don't need to
547 		 * revert to the previous phase.
548 		 */
549 		update_vacuum_error_info(vacrelstats, NULL, VACUUM_ERRCB_PHASE_TRUNCATE,
550 								 vacrelstats->nonempty_pages);
551 		lazy_truncate_heap(onerel, vacrelstats);
552 	}
553 
554 	/* Pop the error context stack */
555 	error_context_stack = errcallback.previous;
556 
557 	/* Report that we are now doing final cleanup */
558 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
559 								 PROGRESS_VACUUM_PHASE_FINAL_CLEANUP);
560 
561 	/*
562 	 * Update statistics in pg_class.
563 	 *
564 	 * A corner case here is that if we scanned no pages at all because every
565 	 * page is all-visible, we should not update relpages/reltuples, because
566 	 * we have no new information to contribute.  In particular this keeps us
567 	 * from replacing relpages=reltuples=0 (which means "unknown tuple
568 	 * density") with nonzero relpages and reltuples=0 (which means "zero
569 	 * tuple density") unless there's some actual evidence for the latter.
570 	 *
571 	 * It's important that we use tupcount_pages and not scanned_pages for the
572 	 * check described above; scanned_pages counts pages where we could not
573 	 * get cleanup lock, and which were processed only for frozenxid purposes.
574 	 *
575 	 * We do update relallvisible even in the corner case, since if the table
576 	 * is all-visible we'd definitely like to know that.  But clamp the value
577 	 * to be not more than what we're setting relpages to.
578 	 *
579 	 * Also, don't change relfrozenxid/relminmxid if we skipped any pages,
580 	 * since then we don't know for certain that all tuples have a newer xmin.
581 	 */
582 	new_rel_pages = vacrelstats->rel_pages;
583 	new_live_tuples = vacrelstats->new_live_tuples;
584 	if (vacrelstats->tupcount_pages == 0 && new_rel_pages > 0)
585 	{
586 		new_rel_pages = vacrelstats->old_rel_pages;
587 		new_live_tuples = vacrelstats->old_live_tuples;
588 	}
589 
590 	visibilitymap_count(onerel, &new_rel_allvisible, NULL);
591 	if (new_rel_allvisible > new_rel_pages)
592 		new_rel_allvisible = new_rel_pages;
593 
594 	new_frozen_xid = scanned_all_unfrozen ? FreezeLimit : InvalidTransactionId;
595 	new_min_multi = scanned_all_unfrozen ? MultiXactCutoff : InvalidMultiXactId;
596 
597 	vac_update_relstats(onerel,
598 						new_rel_pages,
599 						new_live_tuples,
600 						new_rel_allvisible,
601 						nindexes > 0,
602 						new_frozen_xid,
603 						new_min_multi,
604 						false);
605 
606 	/* report results to the stats collector, too */
607 	pgstat_report_vacuum(RelationGetRelid(onerel),
608 						 onerel->rd_rel->relisshared,
609 						 new_live_tuples,
610 						 vacrelstats->new_dead_tuples);
611 	pgstat_progress_end_command();
612 
613 	/* and log the action if appropriate */
614 	if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
615 	{
616 		TimestampTz endtime = GetCurrentTimestamp();
617 
618 		if (params->log_min_duration == 0 ||
619 			TimestampDifferenceExceeds(starttime, endtime,
620 									   params->log_min_duration))
621 		{
622 			StringInfoData buf;
623 			char	   *msgfmt;
624 
625 			TimestampDifference(starttime, endtime, &secs, &usecs);
626 
627 			memset(&walusage, 0, sizeof(WalUsage));
628 			WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
629 
630 			read_rate = 0;
631 			write_rate = 0;
632 			if ((secs > 0) || (usecs > 0))
633 			{
634 				read_rate = (double) BLCKSZ * VacuumPageMiss / (1024 * 1024) /
635 					(secs + usecs / 1000000.0);
636 				write_rate = (double) BLCKSZ * VacuumPageDirty / (1024 * 1024) /
637 					(secs + usecs / 1000000.0);
638 			}
639 
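			/*
			 * For instance, 1280 page misses of 8kB over 2 seconds comes out
			 * to 10MB / 2s = 5.000 MB/s reported as the read rate.
			 */
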
640 			/*
641 			 * This is pretty messy, but we split it up so that we can skip
642 			 * emitting individual parts of the message when not applicable.
643 			 */
644 			initStringInfo(&buf);
645 			if (params->is_wraparound)
646 			{
647 				if (aggressive)
648 					msgfmt = _("automatic aggressive vacuum to prevent wraparound of table \"%s.%s.%s\": index scans: %d\n");
649 				else
650 					msgfmt = _("automatic vacuum to prevent wraparound of table \"%s.%s.%s\": index scans: %d\n");
651 			}
652 			else
653 			{
654 				if (aggressive)
655 					msgfmt = _("automatic aggressive vacuum of table \"%s.%s.%s\": index scans: %d\n");
656 				else
657 					msgfmt = _("automatic vacuum of table \"%s.%s.%s\": index scans: %d\n");
658 			}
659 			appendStringInfo(&buf, msgfmt,
660 							 get_database_name(MyDatabaseId),
661 							 vacrelstats->relnamespace,
662 							 vacrelstats->relname,
663 							 vacrelstats->num_index_scans);
664 			appendStringInfo(&buf, _("pages: %u removed, %u remain, %u skipped due to pins, %u skipped frozen\n"),
665 							 vacrelstats->pages_removed,
666 							 vacrelstats->rel_pages,
667 							 vacrelstats->pinskipped_pages,
668 							 vacrelstats->frozenskipped_pages);
669 			appendStringInfo(&buf,
670 							 _("tuples: %.0f removed, %.0f remain, %.0f are dead but not yet removable, oldest xmin: %u\n"),
671 							 vacrelstats->tuples_deleted,
672 							 vacrelstats->new_rel_tuples,
673 							 vacrelstats->new_dead_tuples,
674 							 OldestXmin);
675 			appendStringInfo(&buf,
676 							 _("buffer usage: %lld hits, %lld misses, %lld dirtied\n"),
677 							 (long long) VacuumPageHit,
678 							 (long long) VacuumPageMiss,
679 							 (long long) VacuumPageDirty);
680 			appendStringInfo(&buf, _("avg read rate: %.3f MB/s, avg write rate: %.3f MB/s\n"),
681 							 read_rate, write_rate);
682 			appendStringInfo(&buf, _("system usage: %s\n"), pg_rusage_show(&ru0));
683 			appendStringInfo(&buf,
684 							 _("WAL usage: %ld records, %ld full page images, %llu bytes"),
685 							 walusage.wal_records,
686 							 walusage.wal_fpi,
687 							 (unsigned long long) walusage.wal_bytes);
688 
689 			ereport(LOG,
690 					(errmsg_internal("%s", buf.data)));
691 			pfree(buf.data);
692 		}
693 	}
694 }
695 
696 /*
697  * For Hot Standby we need to know the highest transaction id that will
698  * be removed by any change. VACUUM proceeds in a number of passes so
699  * we need to consider how each pass operates. The first phase runs
700  * heap_page_prune(), which can issue XLOG_HEAP2_CLEAN records as it
701  * progresses - these will have a latestRemovedXid on each record.
702  * In some cases this removes all of the tuples to be removed, though
703  * often we have dead tuples with index pointers so we must remember them
704  * for removal in phase 3. Index records for those rows are removed
705  * in phase 2 and index blocks do not have MVCC information attached.
706  * So before we can allow removal of any index tuples we need to issue
707  * a WAL record containing the latestRemovedXid of rows that will be
708  * removed in phase 3. This allows recovery queries to block at the
709  * correct place, i.e. before phase 2, rather than during phase 3
710  * which would be after the rows have become inaccessible.
711  */
712 static void
713 vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
714 {
715 	/*
716 	 * Skip this for relations for which no WAL is to be written, or if we're
717 	 * not trying to support archive recovery.
718 	 */
719 	if (!RelationNeedsWAL(rel) || !XLogIsNeeded())
720 		return;
721 
722 	/*
723 	 * No need to write the record at all unless it contains a valid value
724 	 */
725 	if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
726 		(void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
727 }
728 
729 /*
730  *	lazy_scan_heap() -- scan an open heap relation
731  *
732  *		This routine prunes each page in the heap, which will among other
733  *		things truncate dead tuples to dead line pointers, defragment the
734  *		page, and set commit status bits (see heap_page_prune).  It also builds
735  *		lists of dead tuples and pages with free space, calculates statistics
736  *		on the number of live tuples in the heap, and marks pages as
737  *		all-visible if appropriate.  When done, or when we run low on space for
738  *		dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
739  *		to reclaim dead line pointers.
740  *
741  *		If the table has at least two indexes, we execute both index vacuum
742  *		and index cleanup with parallel workers unless parallel vacuum is
743  *		disabled.  In a parallel vacuum, we enter parallel mode and then
744  *		create both the parallel context and the DSM segment before starting
745  *		heap scan so that we can record dead tuples to the DSM segment.  All
746  *		parallel workers are launched at beginning of index vacuuming and
747  *		index cleanup and they exit once done with all indexes.  At the end of
748  *		this function we exit from parallel mode.  Index bulk-deletion results
749  *		are stored in the DSM segment and we update index statistics for all
750  *		the indexes after exiting from parallel mode since writes are not
751  *		allowed during parallel mode.
752  *
753  *		If there are no indexes then we can reclaim line pointers on the fly;
754  *		dead line pointers need only be retained until all index pointers that
755  *		reference them have been killed.
756  */
757 static void
758 lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
759 			   Relation *Irel, int nindexes, bool aggressive)
760 {
761 	LVParallelState *lps = NULL;
762 	LVDeadTuples *dead_tuples;
763 	BlockNumber nblocks,
764 				blkno;
765 	HeapTupleData tuple;
766 	TransactionId relfrozenxid = onerel->rd_rel->relfrozenxid;
767 	TransactionId relminmxid = onerel->rd_rel->relminmxid;
768 	BlockNumber empty_pages,
769 				vacuumed_pages,
770 				next_fsm_block_to_vacuum;
771 	double		num_tuples,		/* total number of nonremovable tuples */
772 				live_tuples,	/* live tuples (reltuples estimate) */
773 				tups_vacuumed,	/* tuples cleaned up by vacuum */
774 				nkeep,			/* dead-but-not-removable tuples */
775 				nunused;		/* unused line pointers */
776 	IndexBulkDeleteResult **indstats;
777 	int			i;
778 	PGRUsage	ru0;
779 	Buffer		vmbuffer = InvalidBuffer;
780 	BlockNumber next_unskippable_block;
781 	bool		skipping_blocks;
782 	xl_heap_freeze_tuple *frozen;
783 	StringInfoData buf;
784 	const int	initprog_index[] = {
785 		PROGRESS_VACUUM_PHASE,
786 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
787 		PROGRESS_VACUUM_MAX_DEAD_TUPLES
788 	};
789 	int64		initprog_val[3];
790 
791 	pg_rusage_init(&ru0);
792 
793 	if (aggressive)
794 		ereport(elevel,
795 				(errmsg("aggressively vacuuming \"%s.%s\"",
796 						vacrelstats->relnamespace,
797 						vacrelstats->relname)));
798 	else
799 		ereport(elevel,
800 				(errmsg("vacuuming \"%s.%s\"",
801 						vacrelstats->relnamespace,
802 						vacrelstats->relname)));
803 
804 	empty_pages = vacuumed_pages = 0;
805 	next_fsm_block_to_vacuum = (BlockNumber) 0;
806 	num_tuples = live_tuples = tups_vacuumed = nkeep = nunused = 0;
807 
808 	indstats = (IndexBulkDeleteResult **)
809 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
810 
811 	nblocks = RelationGetNumberOfBlocks(onerel);
812 	vacrelstats->rel_pages = nblocks;
813 	vacrelstats->scanned_pages = 0;
814 	vacrelstats->tupcount_pages = 0;
815 	vacrelstats->nonempty_pages = 0;
816 	vacrelstats->latestRemovedXid = InvalidTransactionId;
817 
818 	/*
819 	 * Initialize state for a parallel vacuum.  As of now, only one worker can
820 	 * be used for an index, so we invoke parallelism only if there are at
821 	 * least two indexes on a table.
822 	 */
823 	if (params->nworkers >= 0 && vacrelstats->useindex && nindexes > 1)
824 	{
825 		/*
826 		 * Since parallel workers cannot access data in temporary tables, we
827 		 * can't perform parallel vacuum on them.
828 		 */
829 		if (RelationUsesLocalBuffers(onerel))
830 		{
831 			/*
832 			 * Give warning only if the user explicitly tries to perform a
833 			 * parallel vacuum on the temporary table.
834 			 */
835 			if (params->nworkers > 0)
836 				ereport(WARNING,
837 						(errmsg("disabling parallel option of vacuum on \"%s\" --- cannot vacuum temporary tables in parallel",
838 								vacrelstats->relname)));
839 		}
840 		else
841 			lps = begin_parallel_vacuum(RelationGetRelid(onerel), Irel,
842 										vacrelstats, nblocks, nindexes,
843 										params->nworkers);
844 	}
845 
846 	/*
847 	 * Allocate the space for dead tuples in case parallel vacuum is not
848 	 * initialized.
849 	 */
850 	if (!ParallelVacuumIsActive(lps))
851 		lazy_space_alloc(vacrelstats, nblocks);
852 
853 	dead_tuples = vacrelstats->dead_tuples;
854 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
855 
856 	/* Report that we're scanning the heap, advertising total # of blocks */
857 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
858 	initprog_val[1] = nblocks;
859 	initprog_val[2] = dead_tuples->max_tuples;
860 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
861 
862 	/*
863 	 * Except when aggressive is set, we want to skip pages that are
864 	 * all-visible according to the visibility map, but only when we can skip
865 	 * at least SKIP_PAGES_THRESHOLD consecutive pages.  Since we're reading
866 	 * sequentially, the OS should be doing readahead for us, so there's no
867 	 * gain in skipping a page now and then; that's likely to disable
868 	 * readahead and so be counterproductive. Also, skipping even a single
869 	 * page means that we can't update relfrozenxid, so we only want to do it
870 	 * if we can skip a goodly number of pages.
871 	 *
872 	 * When aggressive is set, we can't skip pages just because they are
873 	 * all-visible, but we can still skip pages that are all-frozen, since
874 	 * such pages do not need freezing and do not affect the value that we can
875 	 * safely set for relfrozenxid or relminmxid.
876 	 *
877 	 * Before entering the main loop, establish the invariant that
878 	 * next_unskippable_block is the next block number >= blkno that we can't
879 	 * skip based on the visibility map, either all-visible for a regular scan
880 	 * or all-frozen for an aggressive scan.  We set it to nblocks if there's
881 	 * no such block.  We also set up the skipping_blocks flag correctly at
882 	 * this stage.
883 	 *
884 	 * Note: The value returned by visibilitymap_get_status could be slightly
885 	 * out-of-date, since we make this test before reading the corresponding
886 	 * heap page or locking the buffer.  This is OK.  If we mistakenly think
887 	 * that the page is all-visible or all-frozen when in fact the flag's just
888 	 * been cleared, we might fail to vacuum the page.  It's easy to see that
889 	 * skipping a page when aggressive is not set is not a very big deal; we
890 	 * might leave some dead tuples lying around, but the next vacuum will
891 	 * find them.  But even when aggressive *is* set, it's still OK if we miss
892 	 * a page whose all-frozen marking has just been cleared.  Any new XIDs
893 	 * just added to that page are necessarily newer than the GlobalXmin we
894 	 * computed, so they'll have no effect on the value to which we can safely
895 	 * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
896 	 *
897 	 * We will scan the table's last page, at least to the extent of
898 	 * determining whether it has tuples or not, even if it should be skipped
899 	 * according to the above rules; except when we've already determined that
900 	 * it's not worth trying to truncate the table.  This avoids having
901 	 * lazy_truncate_heap() take access-exclusive lock on the table to attempt
902 	 * a truncation that just fails immediately because there are tuples in
903 	 * the last page.  This is worth avoiding mainly because such a lock must
904 	 * be replayed on any hot standby, where it can be disruptive.
905 	 */
906 	next_unskippable_block = 0;
907 	if ((params->options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
908 	{
909 		while (next_unskippable_block < nblocks)
910 		{
911 			uint8		vmstatus;
912 
913 			vmstatus = visibilitymap_get_status(onerel, next_unskippable_block,
914 												&vmbuffer);
915 			if (aggressive)
916 			{
917 				if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
918 					break;
919 			}
920 			else
921 			{
922 				if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
923 					break;
924 			}
925 			vacuum_delay_point();
926 			next_unskippable_block++;
927 		}
928 	}
929 
930 	if (next_unskippable_block >= SKIP_PAGES_THRESHOLD)
931 		skipping_blocks = true;
932 	else
933 		skipping_blocks = false;
934 
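	/*
	 * For example, if the first 40 pages of the table are all-visible in the
	 * VM, next_unskippable_block starts out as 40 (>= SKIP_PAGES_THRESHOLD),
	 * so skipping_blocks is enabled and those pages are skipped below (except
	 * possibly the table's last page, per FORCE_CHECK_PAGE).  If only 10
	 * pages were all-visible, the threshold isn't met and they are scanned
	 * anyway.
	 */
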
935 	for (blkno = 0; blkno < nblocks; blkno++)
936 	{
937 		Buffer		buf;
938 		Page		page;
939 		OffsetNumber offnum,
940 					maxoff;
941 		bool		tupgone,
942 					hastup;
943 		int			prev_dead_count;
944 		int			nfrozen;
945 		Size		freespace;
946 		bool		all_visible_according_to_vm = false;
947 		bool		all_visible;
948 		bool		all_frozen = true;	/* provided all_visible is also true */
949 		bool		has_dead_tuples;
950 		TransactionId visibility_cutoff_xid = InvalidTransactionId;
951 
952 		/* see note above about forcing scanning of last page */
953 #define FORCE_CHECK_PAGE() \
954 		(blkno == nblocks - 1 && should_attempt_truncation(params, vacrelstats))
955 
956 		pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
957 
958 		update_vacuum_error_info(vacrelstats, NULL, VACUUM_ERRCB_PHASE_SCAN_HEAP,
959 								 blkno);
960 
961 		if (blkno == next_unskippable_block)
962 		{
963 			/* Time to advance next_unskippable_block */
964 			next_unskippable_block++;
965 			if ((params->options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
966 			{
967 				while (next_unskippable_block < nblocks)
968 				{
969 					uint8		vmskipflags;
970 
971 					vmskipflags = visibilitymap_get_status(onerel,
972 														   next_unskippable_block,
973 														   &vmbuffer);
974 					if (aggressive)
975 					{
976 						if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0)
977 							break;
978 					}
979 					else
980 					{
981 						if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0)
982 							break;
983 					}
984 					vacuum_delay_point();
985 					next_unskippable_block++;
986 				}
987 			}
988 
989 			/*
990 			 * We know we can't skip the current block.  But set up
991 			 * skipping_blocks to do the right thing at the following blocks.
992 			 */
993 			if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
994 				skipping_blocks = true;
995 			else
996 				skipping_blocks = false;
997 
998 			/*
999 			 * Normally, the fact that we can't skip this block must mean that
1000 			 * it's not all-visible.  But in an aggressive vacuum we know only
1001 			 * that it's not all-frozen, so it might still be all-visible.
1002 			 */
1003 			if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
1004 				all_visible_according_to_vm = true;
1005 		}
1006 		else
1007 		{
1008 			/*
1009 			 * The current block is potentially skippable; if we've seen a
1010 			 * long enough run of skippable blocks to justify skipping it, and
1011 			 * we're not forced to check it, then go ahead and skip.
1012 			 * Otherwise, the page must be at least all-visible if not
1013 			 * all-frozen, so we can set all_visible_according_to_vm = true.
1014 			 */
1015 			if (skipping_blocks && !FORCE_CHECK_PAGE())
1016 			{
1017 				/*
1018 				 * Tricky, tricky.  If this is in aggressive vacuum, the page
1019 				 * must have been all-frozen at the time we checked whether it
1020 				 * was skippable, but it might not be any more.  We must be
1021 				 * careful to count it as a skipped all-frozen page in that
1022 				 * case, or else we'll think we can't update relfrozenxid and
1023 				 * relminmxid.  If it's not an aggressive vacuum, we don't
1024 				 * know whether it was all-frozen, so we have to recheck; but
1025 				 * in this case an approximate answer is OK.
1026 				 */
1027 				if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
1028 					vacrelstats->frozenskipped_pages++;
1029 				continue;
1030 			}
1031 			all_visible_according_to_vm = true;
1032 		}
1033 
1034 		vacuum_delay_point();
1035 
1036 		/*
1037 		 * If we are close to overrunning the available space for dead-tuple
1038 		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
1039 		 */
1040 		if ((dead_tuples->max_tuples - dead_tuples->num_tuples) < MaxHeapTuplesPerPage &&
1041 			dead_tuples->num_tuples > 0)
1042 		{
1043 			/*
1044 			 * Before beginning index vacuuming, we release any pin we may
1045 			 * hold on the visibility map page.  This isn't necessary for
1046 			 * correctness, but we do it anyway to avoid holding the pin
1047 			 * across a lengthy, unrelated operation.
1048 			 */
1049 			if (BufferIsValid(vmbuffer))
1050 			{
1051 				ReleaseBuffer(vmbuffer);
1052 				vmbuffer = InvalidBuffer;
1053 			}
1054 
1055 			/* Work on all the indexes, then the heap */
1056 			lazy_vacuum_all_indexes(onerel, Irel, indstats,
1057 									vacrelstats, lps, nindexes);
1058 
1059 			/* Remove tuples from heap */
1060 			lazy_vacuum_heap(onerel, vacrelstats);
1061 
1062 			/*
1063 			 * Forget the now-vacuumed tuples, and press on, but be careful
1064 			 * not to reset latestRemovedXid since we want that value to be
1065 			 * valid.
1066 			 */
1067 			dead_tuples->num_tuples = 0;
1068 
1069 			/*
1070 			 * Vacuum the Free Space Map to make newly-freed space visible on
1071 			 * upper-level FSM pages.  Note we have not yet processed blkno.
1072 			 */
1073 			FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum, blkno);
1074 			next_fsm_block_to_vacuum = blkno;
1075 
1076 			/* Report that we are once again scanning the heap */
1077 			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
1078 										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
1079 		}
1080 
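		/*
		 * In other words, with the default block size (MaxHeapTuplesPerPage =
		 * 291) a round of index vacuuming is triggered as soon as fewer than
		 * 291 free slots remain, which guarantees the current page's dead
		 * tuples will always fit in the array.
		 */
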
1081 		/*
1082 		 * Pin the visibility map page in case we need to mark the page
1083 		 * all-visible.  In most cases this will be very cheap, because we'll
1084 		 * already have the correct page pinned anyway.  However, it's
1085 		 * possible that (a) next_unskippable_block is covered by a different
1086 		 * VM page than the current block or (b) we released our pin and did a
1087 		 * cycle of index vacuuming.
1088 		 *
1089 		 */
1090 		visibilitymap_pin(onerel, blkno, &vmbuffer);
1091 
1092 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
1093 								 RBM_NORMAL, vac_strategy);
1094 
1095 		/* We need buffer cleanup lock so that we can prune HOT chains. */
1096 		if (!ConditionalLockBufferForCleanup(buf))
1097 		{
1098 			/*
1099 			 * If we're not performing an aggressive scan to guard against XID
1100 			 * wraparound, and we don't want to forcibly check the page, then
1101 			 * it's OK to skip vacuuming pages we get a lock conflict on. They
1102 			 * will be dealt with in some future vacuum.
1103 			 */
1104 			if (!aggressive && !FORCE_CHECK_PAGE())
1105 			{
1106 				ReleaseBuffer(buf);
1107 				vacrelstats->pinskipped_pages++;
1108 				continue;
1109 			}
1110 
1111 			/*
1112 			 * Read the page with share lock to see if any xids on it need to
1113 			 * be frozen.  If not we just skip the page, after updating our
1114 			 * scan statistics.  If there are some, we wait for cleanup lock.
1115 			 *
1116 			 * We could defer the lock request further by remembering the page
1117 			 * and coming back to it later, or we could even register
1118 			 * ourselves for multiple buffers and then service whichever one
1119 			 * is received first.  For now, this seems good enough.
1120 			 *
1121 			 * If we get here with aggressive false, then we're just forcibly
1122 			 * checking the page, and so we don't want to insist on getting
1123 			 * the lock; we only need to know if the page contains tuples, so
1124 			 * that we can update nonempty_pages correctly.  It's convenient
1125 			 * to use lazy_check_needs_freeze() for both situations, though.
1126 			 */
1127 			LockBuffer(buf, BUFFER_LOCK_SHARE);
1128 			if (!lazy_check_needs_freeze(buf, &hastup))
1129 			{
1130 				UnlockReleaseBuffer(buf);
1131 				vacrelstats->scanned_pages++;
1132 				vacrelstats->pinskipped_pages++;
1133 				if (hastup)
1134 					vacrelstats->nonempty_pages = blkno + 1;
1135 				continue;
1136 			}
1137 			if (!aggressive)
1138 			{
1139 				/*
1140 				 * Here, we must not advance scanned_pages; that would amount
1141 				 * to claiming that the page contains no freezable tuples.
1142 				 */
1143 				UnlockReleaseBuffer(buf);
1144 				vacrelstats->pinskipped_pages++;
1145 				if (hastup)
1146 					vacrelstats->nonempty_pages = blkno + 1;
1147 				continue;
1148 			}
1149 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1150 			LockBufferForCleanup(buf);
1151 			/* drop through to normal processing */
1152 		}
1153 
1154 		vacrelstats->scanned_pages++;
1155 		vacrelstats->tupcount_pages++;
1156 
1157 		page = BufferGetPage(buf);
1158 
1159 		if (PageIsNew(page))
1160 		{
1161 			/*
1162 			 * All-zeroes pages can be left over if either a backend extends
1163 			 * the relation by a single page, but crashes before the newly
1164 			 * initialized page has been written out, or when bulk-extending
1165 			 * the relation (which creates a number of empty pages at the tail
1166 			 * end of the relation, but enters them into the FSM).
1167 			 *
1168 			 * Note we do not enter the page into the visibilitymap. That has
1169 			 * the downside that we repeatedly visit this page in subsequent
1170 			 * vacuums, but otherwise we'll never discover the space on a
1171 			 * promoted standby. The harm of repeated checking ought to
1172 			 * normally not be too bad - the space usually should be used at
1173 			 * some point, otherwise there wouldn't be any regular vacuums.
1174 			 *
1175 			 * Make sure these pages are in the FSM, to ensure they can be
1176 			 * reused. Do that by testing if there's any space recorded for
1177 			 * the page. If not, enter it. We do so after releasing the lock
1178 			 * on the heap page; the FSM is approximate, after all.
1179 			 */
1180 			UnlockReleaseBuffer(buf);
1181 
1182 			empty_pages++;
1183 
1184 			if (GetRecordedFreeSpace(onerel, blkno) == 0)
1185 			{
1186 				Size		freespace;
1187 
1188 				freespace = BufferGetPageSize(buf) - SizeOfPageHeaderData;
1189 				RecordPageWithFreeSpace(onerel, blkno, freespace);
1190 			}
1191 			continue;
1192 		}
1193 
1194 		if (PageIsEmpty(page))
1195 		{
1196 			empty_pages++;
1197 			freespace = PageGetHeapFreeSpace(page);
1198 
1199 			/*
1200 			 * Empty pages are always all-visible and all-frozen (note that
1201 			 * the same is currently not true for new pages, see above).
1202 			 */
1203 			if (!PageIsAllVisible(page))
1204 			{
1205 				START_CRIT_SECTION();
1206 
1207 				/* mark buffer dirty before writing a WAL record */
1208 				MarkBufferDirty(buf);
1209 
1210 				/*
1211 				 * It's possible that another backend has extended the heap,
1212 				 * initialized the page, and then failed to WAL-log the page
1213 				 * due to an ERROR.  Since heap extension is not WAL-logged,
1214 				 * recovery might try to replay our record setting the page
1215 				 * all-visible and find that the page isn't initialized, which
1216 				 * will cause a PANIC.  To prevent that, check whether the
1217 				 * page has been previously WAL-logged, and if not, do that
1218 				 * now.
1219 				 */
1220 				if (RelationNeedsWAL(onerel) &&
1221 					PageGetLSN(page) == InvalidXLogRecPtr)
1222 					log_newpage_buffer(buf, true);
1223 
1224 				PageSetAllVisible(page);
1225 				visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
1226 								  vmbuffer, InvalidTransactionId,
1227 								  VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN);
1228 				END_CRIT_SECTION();
1229 			}
1230 
1231 			UnlockReleaseBuffer(buf);
1232 			RecordPageWithFreeSpace(onerel, blkno, freespace);
1233 			continue;
1234 		}
1235 
1236 		/*
1237 		 * Prune all HOT-update chains in this page.
1238 		 *
1239 		 * We count tuples removed by the pruning step as removed by VACUUM.
1240 		 */
1241 		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false,
1242 										 &vacrelstats->latestRemovedXid);
1243 
1244 		/*
1245 		 * Now scan the page to collect vacuumable items and check for tuples
1246 		 * requiring freezing.
1247 		 */
1248 		all_visible = true;
1249 		has_dead_tuples = false;
1250 		nfrozen = 0;
1251 		hastup = false;
1252 		prev_dead_count = dead_tuples->num_tuples;
1253 		maxoff = PageGetMaxOffsetNumber(page);
1254 
1255 		/*
1256 		 * Note: If you change anything in the loop below, also look at
1257 		 * heap_page_is_all_visible to see if that needs to be changed.
1258 		 */
1259 		for (offnum = FirstOffsetNumber;
1260 			 offnum <= maxoff;
1261 			 offnum = OffsetNumberNext(offnum))
1262 		{
1263 			ItemId		itemid;
1264 
1265 			itemid = PageGetItemId(page, offnum);
1266 
1267 			/* Unused items require no processing, but we count 'em */
1268 			if (!ItemIdIsUsed(itemid))
1269 			{
1270 				nunused += 1;
1271 				continue;
1272 			}
1273 
1274 			/* Redirect items mustn't be touched */
1275 			if (ItemIdIsRedirected(itemid))
1276 			{
1277 				hastup = true;	/* this page won't be truncatable */
1278 				continue;
1279 			}
1280 
1281 			ItemPointerSet(&(tuple.t_self), blkno, offnum);
1282 
1283 			/*
1284 			 * DEAD line pointers are to be vacuumed normally; but we don't
1285 			 * count them in tups_vacuumed, else we'd be double-counting (at
1286 			 * least in the common case where heap_page_prune() just freed up
1287 			 * a non-HOT tuple).
1288 			 */
1289 			if (ItemIdIsDead(itemid))
1290 			{
1291 				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
1292 				all_visible = false;
1293 				continue;
1294 			}
1295 
1296 			Assert(ItemIdIsNormal(itemid));
1297 
1298 			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1299 			tuple.t_len = ItemIdGetLength(itemid);
1300 			tuple.t_tableOid = RelationGetRelid(onerel);
1301 
1302 			tupgone = false;
1303 
1304 			/*
1305 			 * The criteria for counting a tuple as live in this block need to
1306 			 * match what analyze.c's acquire_sample_rows() does, otherwise
1307 			 * VACUUM and ANALYZE may produce wildly different reltuples
1308 			 * values, e.g. when there are many recently-dead tuples.
1309 			 *
1310 			 * The logic here is a bit simpler than acquire_sample_rows(), as
1311 			 * VACUUM can't run inside a transaction block, which makes some
1312 			 * cases impossible (e.g. in-progress insert from the same
1313 			 * transaction).
1314 			 */
1315 			switch (HeapTupleSatisfiesVacuum(&tuple, OldestXmin, buf))
1316 			{
1317 				case HEAPTUPLE_DEAD:
1318 
1319 					/*
1320 					 * Ordinarily, DEAD tuples would have been removed by
1321 					 * heap_page_prune(), but it's possible that the tuple
1322 					 * state changed since heap_page_prune() looked.  In
1323 					 * particular an INSERT_IN_PROGRESS tuple could have
1324 					 * changed to DEAD if the inserter aborted.  So this
1325 					 * cannot be considered an error condition.
1326 					 *
1327 					 * If the tuple is HOT-updated then it must only be
1328 					 * removed by a prune operation; so we keep it just as if
1329 					 * it were RECENTLY_DEAD.  Also, if it's a heap-only
1330 					 * tuple, we choose to keep it, because it'll be a lot
1331 					 * cheaper to get rid of it in the next pruning pass than
1332 					 * to treat it like an indexed tuple. Finally, if index
1333 					 * cleanup is disabled, the second heap pass will not
1334 					 * execute, and the tuple will not get removed, so we must
1335 					 * treat it like any other dead tuple that we choose to
1336 					 * keep.
1337 					 *
1338 					 * If this were to happen for a tuple that actually needed
1339 					 * to be deleted, we'd be in trouble, because it'd
1340 					 * possibly leave a tuple below the relation's xmin
1341 					 * horizon alive.  heap_prepare_freeze_tuple() is prepared
1342 					 * to detect that case and abort the transaction,
1343 					 * preventing corruption.
1344 					 */
1345 					if (HeapTupleIsHotUpdated(&tuple) ||
1346 						HeapTupleIsHeapOnly(&tuple) ||
1347 						params->index_cleanup == VACOPT_TERNARY_DISABLED)
1348 						nkeep += 1;
1349 					else
1350 						tupgone = true; /* we can delete the tuple */
1351 					all_visible = false;
1352 					break;
1353 				case HEAPTUPLE_LIVE:
1354 
1355 					/*
1356 					 * Count it as live.  Not only is this natural, but it's
1357 					 * also what acquire_sample_rows() does.
1358 					 */
1359 					live_tuples += 1;
1360 
1361 					/*
1362 					 * Is the tuple definitely visible to all transactions?
1363 					 *
1364 					 * NB: Like with per-tuple hint bits, we can't set the
1365 					 * PD_ALL_VISIBLE flag if the inserter committed
1366 					 * asynchronously. See SetHintBits for more info. Check
1367 					 * that the tuple is hinted xmin-committed because of
1368 					 * that.
1369 					 */
1370 					if (all_visible)
1371 					{
1372 						TransactionId xmin;
1373 
1374 						if (!HeapTupleHeaderXminCommitted(tuple.t_data))
1375 						{
1376 							all_visible = false;
1377 							break;
1378 						}
1379 
1380 						/*
1381 						 * The inserter definitely committed. But is it old
1382 						 * enough that everyone sees it as committed?
1383 						 */
1384 						xmin = HeapTupleHeaderGetXmin(tuple.t_data);
1385 						if (!TransactionIdPrecedes(xmin, OldestXmin))
1386 						{
1387 							all_visible = false;
1388 							break;
1389 						}
1390 
1391 						/* Track newest xmin on page. */
1392 						if (TransactionIdFollows(xmin, visibility_cutoff_xid))
1393 							visibility_cutoff_xid = xmin;
1394 					}
1395 					break;
1396 				case HEAPTUPLE_RECENTLY_DEAD:
1397 
1398 					/*
1399 					 * If the tuple is recently deleted then we must not remove
1400 					 * it from the relation.
1401 					 */
1402 					nkeep += 1;
1403 					all_visible = false;
1404 					break;
1405 				case HEAPTUPLE_INSERT_IN_PROGRESS:
1406 
1407 					/*
1408 					 * This is an expected case during concurrent vacuum.
1409 					 *
1410 					 * We do not count these rows as live, because we expect
1411 					 * the inserting transaction to update the counters at
1412 					 * commit, and we assume that will happen only after we
1413 					 * report our results.  This assumption is a bit shaky,
1414 					 * but it is what acquire_sample_rows() does, so be
1415 					 * consistent.
1416 					 */
1417 					all_visible = false;
1418 					break;
1419 				case HEAPTUPLE_DELETE_IN_PROGRESS:
1420 					/* This is an expected case during concurrent vacuum */
1421 					all_visible = false;
1422 
1423 					/*
1424 					 * Count such rows as live.  As above, we assume the
1425 					 * deleting transaction will commit and update the
1426 					 * counters after we report.
1427 					 */
1428 					live_tuples += 1;
1429 					break;
1430 				default:
1431 					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1432 					break;
1433 			}
1434 
1435 			if (tupgone)
1436 			{
1437 				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
1438 				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
1439 													   &vacrelstats->latestRemovedXid);
1440 				tups_vacuumed += 1;
1441 				has_dead_tuples = true;
1442 			}
1443 			else
1444 			{
1445 				bool		tuple_totally_frozen;
1446 
1447 				num_tuples += 1;
1448 				hastup = true;
1449 
1450 				/*
1451 				 * Each non-removable tuple must be checked to see if it needs
1452 				 * freezing.  Note we already have exclusive buffer lock.
1453 				 */
1454 				if (heap_prepare_freeze_tuple(tuple.t_data,
1455 											  relfrozenxid, relminmxid,
1456 											  FreezeLimit, MultiXactCutoff,
1457 											  &frozen[nfrozen],
1458 											  &tuple_totally_frozen))
1459 					frozen[nfrozen++].offset = offnum;
1460 
1461 				if (!tuple_totally_frozen)
1462 					all_frozen = false;
1463 			}
1464 		}						/* scan along page */
1465 
1466 		/*
1467 		 * If we froze any tuples, mark the buffer dirty, and write a WAL
1468 		 * record recording the changes.  We must log the changes to be
1469 		 * crash-safe against future truncation of CLOG.
1470 		 */
1471 		if (nfrozen > 0)
1472 		{
1473 			START_CRIT_SECTION();
1474 
1475 			MarkBufferDirty(buf);
1476 
1477 			/* execute collected freezes */
1478 			for (i = 0; i < nfrozen; i++)
1479 			{
1480 				ItemId		itemid;
1481 				HeapTupleHeader htup;
1482 
1483 				itemid = PageGetItemId(page, frozen[i].offset);
1484 				htup = (HeapTupleHeader) PageGetItem(page, itemid);
1485 
1486 				heap_execute_freeze_tuple(htup, &frozen[i]);
1487 			}
1488 
1489 			/* Now WAL-log freezing if necessary */
1490 			if (RelationNeedsWAL(onerel))
1491 			{
1492 				XLogRecPtr	recptr;
1493 
1494 				recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1495 										 frozen, nfrozen);
1496 				PageSetLSN(page, recptr);
1497 			}
1498 
1499 			END_CRIT_SECTION();
1500 		}
1501 
1502 		/*
1503 		 * If there are no indexes, we can vacuum the page right now instead
1504 		 * of doing a second scan.  Also, if index cleanup is disabled, we
1505 		 * skip that second pass and simply forget the dead tuples.
1506 		 */
1507 		if (!vacrelstats->useindex && dead_tuples->num_tuples > 0)
1508 		{
1509 			if (nindexes == 0)
1510 			{
1511 				/* Remove tuples from heap if the table has no index */
1512 				lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
1513 				vacuumed_pages++;
1514 				has_dead_tuples = false;
1515 			}
1516 			else
1517 			{
1518 				/*
1519 				 * Here, we have indexes but index cleanup is disabled.
1520 				 * Instead of vacuuming the dead tuples on the heap, we just
1521 				 * forget them.
1522 				 *
1523 				 * Note that vacrelstats->dead_tuples could have tuples which
1524 				 * became dead after HOT-pruning but are not marked dead yet.
1525 				 * We do not process them because it's a very rare condition,
1526 				 * and the next vacuum will process them anyway.
1527 				 */
1528 				Assert(params->index_cleanup == VACOPT_TERNARY_DISABLED);
1529 			}
1530 
1531 			/*
1532 			 * Forget the now-vacuumed tuples, and press on, but be careful
1533 			 * not to reset latestRemovedXid since we want that value to be
1534 			 * valid.
1535 			 */
1536 			dead_tuples->num_tuples = 0;
1537 
1538 			/*
1539 			 * Periodically do incremental FSM vacuuming to make newly-freed
1540 			 * space visible on upper FSM pages.  Note: although we've cleaned
1541 			 * the current block, we haven't yet updated its FSM entry (that
1542 			 * happens further down), so passing end == blkno is correct.
1543 			 */
1544 			if (blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
1545 			{
1546 				FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum,
1547 										blkno);
1548 				next_fsm_block_to_vacuum = blkno;
1549 			}
1550 		}
1551 
1552 		freespace = PageGetHeapFreeSpace(page);
1553 
1554 		/* mark page all-visible, if appropriate */
1555 		if (all_visible && !all_visible_according_to_vm)
1556 		{
1557 			uint8		flags = VISIBILITYMAP_ALL_VISIBLE;
1558 
1559 			if (all_frozen)
1560 				flags |= VISIBILITYMAP_ALL_FROZEN;
1561 
1562 			/*
1563 			 * It should never be the case that the visibility map page is set
1564 			 * while the page-level bit is clear, but the reverse is allowed
1565 			 * (if checksums are not enabled).  Regardless, set both bits so
1566 			 * that we get back in sync.
1567 			 *
1568 			 * NB: If the heap page is all-visible but the VM bit is not set,
1569 			 * we don't need to dirty the heap page.  However, if checksums
1570 			 * are enabled, we do need to make sure that the heap page is
1571 			 * dirtied before passing it to visibilitymap_set(), because it
1572 			 * may be logged.  Given that this situation should only happen in
1573 			 * rare cases after a crash, it is not worth optimizing.
1574 			 */
1575 			PageSetAllVisible(page);
1576 			MarkBufferDirty(buf);
1577 			visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
1578 							  vmbuffer, visibility_cutoff_xid, flags);
1579 		}
1580 
1581 		/*
1582 		 * As of PostgreSQL 9.2, the visibility map bit should never be set if
1583 		 * the page-level bit is clear.  However, it's possible that the bit
1584 		 * got cleared after we checked it and before we took the buffer
1585 		 * content lock, so we must recheck before jumping to the conclusion
1586 		 * that something bad has happened.
1587 		 */
1588 		else if (all_visible_according_to_vm && !PageIsAllVisible(page)
1589 				 && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
1590 		{
1591 			elog(WARNING, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
1592 				 vacrelstats->relname, blkno);
1593 			visibilitymap_clear(onerel, blkno, vmbuffer,
1594 								VISIBILITYMAP_VALID_BITS);
1595 		}
1596 
1597 		/*
1598 		 * It's possible for the value returned by GetOldestXmin() to move
1599 		 * backwards, so it's not wrong for us to see tuples that appear to
1600 		 * not be visible to everyone yet, while PD_ALL_VISIBLE is already
1601 		 * set. The real safe xmin value never moves backwards, but
1602 		 * GetOldestXmin() is conservative and sometimes returns a value
1603 		 * that's unnecessarily small, so if we see that contradiction it just
1604 		 * means that the tuples that we think are not visible to everyone yet
1605 		 * actually are, and the PD_ALL_VISIBLE flag is correct.
1606 		 *
1607 		 * There should never be dead tuples on a page with PD_ALL_VISIBLE
1608 		 * set, however.
1609 		 */
1610 		else if (PageIsAllVisible(page) && has_dead_tuples)
1611 		{
1612 			elog(WARNING, "page containing dead tuples is marked as all-visible in relation \"%s\" page %u",
1613 				 vacrelstats->relname, blkno);
1614 			PageClearAllVisible(page);
1615 			MarkBufferDirty(buf);
1616 			visibilitymap_clear(onerel, blkno, vmbuffer,
1617 								VISIBILITYMAP_VALID_BITS);
1618 		}
1619 
1620 		/*
1621 		 * If the all-visible page is all-frozen but not marked as such yet,
1622 		 * mark it as all-frozen.  Note that all_frozen is only valid if
1623 		 * all_visible is true, so we must check both.
1624 		 */
1625 		else if (all_visible_according_to_vm && all_visible && all_frozen &&
1626 				 !VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
1627 		{
1628 			/*
1629 			 * We can pass InvalidTransactionId as the cutoff XID here,
1630 			 * because setting the all-frozen bit doesn't cause recovery
1631 			 * conflicts.
1632 			 */
1633 			visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
1634 							  vmbuffer, InvalidTransactionId,
1635 							  VISIBILITYMAP_ALL_FROZEN);
1636 		}
1637 
1638 		UnlockReleaseBuffer(buf);
1639 
1640 		/* Remember the location of the last page with nonremovable tuples */
1641 		if (hastup)
1642 			vacrelstats->nonempty_pages = blkno + 1;
1643 
1644 		/*
1645 		 * If we remembered any tuples for deletion, then the page will be
1646 		 * visited again by lazy_vacuum_heap, which will compute and record
1647 		 * its post-compaction free space.  If not, then we're done with this
1648 		 * page, so remember its free space as-is.  (This path will always be
1649 		 * taken if there are no indexes.)
1650 		 */
1651 		if (dead_tuples->num_tuples == prev_dead_count)
1652 			RecordPageWithFreeSpace(onerel, blkno, freespace);
1653 	}
1654 
1655 	/* report that everything is scanned and vacuumed */
1656 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
1657 
1658 	/* Clear the block number information */
1659 	vacrelstats->blkno = InvalidBlockNumber;
1660 
1661 	pfree(frozen);
1662 
1663 	/* save stats for use later */
1664 	vacrelstats->tuples_deleted = tups_vacuumed;
1665 	vacrelstats->new_dead_tuples = nkeep;
1666 
1667 	/* now we can compute the new value for pg_class.reltuples */
1668 	vacrelstats->new_live_tuples = vac_estimate_reltuples(onerel,
1669 														  nblocks,
1670 														  vacrelstats->tupcount_pages,
1671 														  live_tuples);
1672 
1673 	/* also compute total number of surviving heap entries */
1674 	vacrelstats->new_rel_tuples =
1675 		vacrelstats->new_live_tuples + vacrelstats->new_dead_tuples;
1676 
1677 	/*
1678 	 * Release any remaining pin on visibility map page.
1679 	 */
1680 	if (BufferIsValid(vmbuffer))
1681 	{
1682 		ReleaseBuffer(vmbuffer);
1683 		vmbuffer = InvalidBuffer;
1684 	}
1685 
1686 	/* If any tuples need to be deleted, perform final vacuum cycle */
1687 	/* XXX put a threshold on min number of tuples here? */
1688 	if (dead_tuples->num_tuples > 0)
1689 	{
1690 		/* Work on all the indexes, and then the heap */
1691 		lazy_vacuum_all_indexes(onerel, Irel, indstats, vacrelstats,
1692 								lps, nindexes);
1693 
1694 		/* Remove tuples from heap */
1695 		lazy_vacuum_heap(onerel, vacrelstats);
1696 	}
1697 
1698 	/*
1699 	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
1700 	 * not there were indexes.
1701 	 */
1702 	if (blkno > next_fsm_block_to_vacuum)
1703 		FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum, blkno);
1704 
1705 	/* report all blocks vacuumed */
1706 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
1707 
1708 	/* Do post-vacuum cleanup */
1709 	if (vacrelstats->useindex)
1710 		lazy_cleanup_all_indexes(Irel, indstats, vacrelstats, lps, nindexes);
1711 
1712 	/*
1713 	 * End parallel mode before updating index statistics as we cannot write
1714 	 * during parallel mode.
1715 	 */
1716 	if (ParallelVacuumIsActive(lps))
1717 		end_parallel_vacuum(indstats, lps, nindexes);
1718 
1719 	/* Update index statistics */
1720 	if (vacrelstats->useindex)
1721 		update_index_statistics(Irel, indstats, nindexes);
1722 
1723 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
1724 	if (vacuumed_pages)
1725 		ereport(elevel,
1726 				(errmsg("\"%s\": removed %.0f row versions in %u pages",
1727 						vacrelstats->relname,
1728 						tups_vacuumed, vacuumed_pages)));
1729 
1730 	/*
1731 	 * This is pretty messy, but we split it up so that we can skip emitting
1732 	 * individual parts of the message when not applicable.
1733 	 */
1734 	initStringInfo(&buf);
1735 	appendStringInfo(&buf,
1736 					 _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"),
1737 					 nkeep, OldestXmin);
1738 	appendStringInfo(&buf, _("There were %.0f unused item identifiers.\n"),
1739 					 nunused);
1740 	appendStringInfo(&buf, ngettext("Skipped %u page due to buffer pins, ",
1741 									"Skipped %u pages due to buffer pins, ",
1742 									vacrelstats->pinskipped_pages),
1743 					 vacrelstats->pinskipped_pages);
1744 	appendStringInfo(&buf, ngettext("%u frozen page.\n",
1745 									"%u frozen pages.\n",
1746 									vacrelstats->frozenskipped_pages),
1747 					 vacrelstats->frozenskipped_pages);
1748 	appendStringInfo(&buf, ngettext("%u page is entirely empty.\n",
1749 									"%u pages are entirely empty.\n",
1750 									empty_pages),
1751 					 empty_pages);
1752 	appendStringInfo(&buf, _("%s."), pg_rusage_show(&ru0));
1753 
1754 	ereport(elevel,
1755 			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages",
1756 					vacrelstats->relname,
1757 					tups_vacuumed, num_tuples,
1758 					vacrelstats->scanned_pages, nblocks),
1759 			 errdetail_internal("%s", buf.data)));
1760 	pfree(buf.data);
1761 }
1762 
1763 /*
1764  *	lazy_vacuum_all_indexes() -- vacuum all indexes of relation.
1765  *
1766  * We process the indexes serially unless we are doing parallel vacuum.
1767  */
1768 static void
1769 lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
1770 						IndexBulkDeleteResult **stats,
1771 						LVRelStats *vacrelstats, LVParallelState *lps,
1772 						int nindexes)
1773 {
1774 	Assert(!IsParallelWorker());
1775 	Assert(nindexes > 0);
1776 
1777 	/* Log cleanup info before we touch indexes */
1778 	vacuum_log_cleanup_info(onerel, vacrelstats);
1779 
1780 	/* Report that we are now vacuuming indexes */
1781 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
1782 								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
1783 
1784 	/* Perform index vacuuming with parallel workers for parallel vacuum. */
1785 	if (ParallelVacuumIsActive(lps))
1786 	{
1787 		/* Tell parallel workers to do index vacuuming */
1788 		lps->lvshared->for_cleanup = false;
1789 		lps->lvshared->first_time = false;
1790 
1791 		/*
1792 		 * We can only provide an approximate value of num_heap_tuples in
1793 		 * vacuum cases.
1794 		 */
1795 		lps->lvshared->reltuples = vacrelstats->old_live_tuples;
1796 		lps->lvshared->estimated_count = true;
1797 
1798 		lazy_parallel_vacuum_indexes(Irel, stats, vacrelstats, lps, nindexes);
1799 	}
1800 	else
1801 	{
1802 		int			idx;
1803 
1804 		for (idx = 0; idx < nindexes; idx++)
1805 			lazy_vacuum_index(Irel[idx], &stats[idx], vacrelstats->dead_tuples,
1806 							  vacrelstats->old_live_tuples, vacrelstats);
1807 	}
1808 
1809 	/* Increase and report the number of index scans */
1810 	vacrelstats->num_index_scans++;
1811 	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
1812 								 vacrelstats->num_index_scans);
1813 }
1814 
1815 
1816 /*
1817  *	lazy_vacuum_heap() -- second pass over the heap
1818  *
1819  *		This routine marks dead tuples as unused and compacts out free
1820  *		space on their pages.  Pages not having dead tuples recorded from
1821  *		lazy_scan_heap are not visited at all.
1822  *
1823  * Note: the reason for doing this as a second pass is we cannot remove
1824  * the tuples until we've removed their index entries, and we want to
1825  * process index entry removal in batches as large as possible.
1826  */
1827 static void
1828 lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
1829 {
1830 	int			tupindex;
1831 	int			npages;
1832 	PGRUsage	ru0;
1833 	Buffer		vmbuffer = InvalidBuffer;
1834 	LVSavedErrInfo saved_err_info;
1835 
1836 	/* Report that we are now vacuuming the heap */
1837 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
1838 								 PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
1839 
1840 	/* Update error traceback information */
1841 	update_vacuum_error_info(vacrelstats, &saved_err_info, VACUUM_ERRCB_PHASE_VACUUM_HEAP,
1842 							 InvalidBlockNumber);
1843 
1844 	pg_rusage_init(&ru0);
1845 	npages = 0;
1846 
1847 	tupindex = 0;
1848 	while (tupindex < vacrelstats->dead_tuples->num_tuples)
1849 	{
1850 		BlockNumber tblk;
1851 		Buffer		buf;
1852 		Page		page;
1853 		Size		freespace;
1854 
1855 		vacuum_delay_point();
1856 
1857 		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples->itemptrs[tupindex]);
1858 		vacrelstats->blkno = tblk;
1859 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
1860 								 vac_strategy);
1861 		if (!ConditionalLockBufferForCleanup(buf))
1862 		{
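			/*
			 * Couldn't get the cleanup lock without waiting; skip this TID.
			 * Its LP_DEAD line pointer remains on the page and will be
			 * reclaimed by a future vacuum.
			 */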
1863 			ReleaseBuffer(buf);
1864 			++tupindex;
1865 			continue;
1866 		}
1867 		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
1868 									&vmbuffer);
1869 
1870 		/* Now that we've compacted the page, record its available space */
1871 		page = BufferGetPage(buf);
1872 		freespace = PageGetHeapFreeSpace(page);
1873 
1874 		UnlockReleaseBuffer(buf);
1875 		RecordPageWithFreeSpace(onerel, tblk, freespace);
1876 		npages++;
1877 	}
1878 
1879 	/* Clear the block number information */
1880 	vacrelstats->blkno = InvalidBlockNumber;
1881 
1882 	if (BufferIsValid(vmbuffer))
1883 	{
1884 		ReleaseBuffer(vmbuffer);
1885 		vmbuffer = InvalidBuffer;
1886 	}
1887 
1888 	ereport(elevel,
1889 			(errmsg("\"%s\": removed %d row versions in %d pages",
1890 					vacrelstats->relname,
1891 					tupindex, npages),
1892 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
1893 
1894 	/* Revert to the previous phase information for error traceback */
1895 	restore_vacuum_error_info(vacrelstats, &saved_err_info);
1896 }
1897 
1898 /*
1899  *	lazy_vacuum_page() -- free dead tuples on a page
1900  *					 and repair its fragmentation.
1901  *
1902  * Caller must hold pin and buffer cleanup lock on the buffer.
1903  *
1904  * tupindex is the index in vacrelstats->dead_tuples of the first dead
1905  * tuple for this page.  We assume the rest follow sequentially.
1906  * The return value is the first tupindex after the tuples of this page.
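 * For example, if dead_tuples holds the TIDs (5,1), (5,4), (8,2) and we are
 * called for block 5 with tupindex pointing at (5,1), we mark offsets 1 and 4
 * unused on that page and return the index of (8,2).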
1907  */
1908 static int
1909 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
1910 				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
1911 {
1912 	LVDeadTuples *dead_tuples = vacrelstats->dead_tuples;
1913 	Page		page = BufferGetPage(buffer);
1914 	OffsetNumber unused[MaxOffsetNumber];
1915 	int			uncnt = 0;
1916 	TransactionId visibility_cutoff_xid;
1917 	bool		all_frozen;
1918 	LVSavedErrInfo saved_err_info;
1919 
1920 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
1921 
1922 	/* Update error traceback information */
1923 	update_vacuum_error_info(vacrelstats, &saved_err_info, VACUUM_ERRCB_PHASE_VACUUM_HEAP,
1924 							 blkno);
1925 
1926 	START_CRIT_SECTION();
1927 
1928 	for (; tupindex < dead_tuples->num_tuples; tupindex++)
1929 	{
1930 		BlockNumber tblk;
1931 		OffsetNumber toff;
1932 		ItemId		itemid;
1933 
1934 		tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
1935 		if (tblk != blkno)
1936 			break;				/* past end of tuples for this block */
1937 		toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
1938 		itemid = PageGetItemId(page, toff);
1939 		ItemIdSetUnused(itemid);
1940 		unused[uncnt++] = toff;
1941 	}
1942 
1943 	PageRepairFragmentation(page);
1944 
1945 	/*
1946 	 * Mark buffer dirty before we write WAL.
1947 	 */
1948 	MarkBufferDirty(buffer);
1949 
1950 	/* XLOG stuff */
1951 	if (RelationNeedsWAL(onerel))
1952 	{
1953 		XLogRecPtr	recptr;
1954 
1955 		recptr = log_heap_clean(onerel, buffer,
1956 								NULL, 0, NULL, 0,
1957 								unused, uncnt,
1958 								vacrelstats->latestRemovedXid);
1959 		PageSetLSN(page, recptr);
1960 	}
1961 
1962 	/*
1963 	 * End critical section, so we safely can do visibility tests (which
1964 	 * possibly need to perform IO and allocate memory!). If we crash now the
1965 	 * page (including the corresponding vm bit) might not be marked all
1966 	 * visible, but that's fine. A later vacuum will fix that.
1967 	 */
1968 	END_CRIT_SECTION();
1969 
1970 	/*
1971 	 * Now that we have removed the dead tuples from the page, once again
1972 	 * check if the page has become all-visible.  The page is already marked
1973 	 * dirty, exclusively locked, and, if needed, a full page image has been
1974 	 * emitted in the log_heap_clean() above.
1975 	 */
1976 	if (heap_page_is_all_visible(onerel, buffer, &visibility_cutoff_xid,
1977 								 &all_frozen))
1978 		PageSetAllVisible(page);
1979 
1980 	/*
1981 	 * All the changes to the heap page have been done. If the all-visible
1982 	 * flag is now set, also set the VM all-visible bit (and, if possible, the
1983 	 * all-frozen bit) unless this has already been done previously.
1984 	 */
1985 	if (PageIsAllVisible(page))
1986 	{
1987 		uint8		vm_status = visibilitymap_get_status(onerel, blkno, vmbuffer);
1988 		uint8		flags = 0;
1989 
1990 		/* Work out which VM bits, if any, still need to be set */
1991 		if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) == 0)
1992 			flags |= VISIBILITYMAP_ALL_VISIBLE;
1993 		if ((vm_status & VISIBILITYMAP_ALL_FROZEN) == 0 && all_frozen)
1994 			flags |= VISIBILITYMAP_ALL_FROZEN;
1995 
1996 		Assert(BufferIsValid(*vmbuffer));
1997 		if (flags != 0)
1998 			visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr,
1999 							  *vmbuffer, visibility_cutoff_xid, flags);
2000 	}
2001 
2002 	/* Revert to the previous phase information for error traceback */
2003 	restore_vacuum_error_info(vacrelstats, &saved_err_info);
2004 	return tupindex;
2005 }
2006 
2007 /*
2008  *	lazy_check_needs_freeze() -- scan page to see if any tuples
2009  *					 need to be cleaned to avoid wraparound
2010  *
2011  * Returns true if the page needs to be vacuumed using cleanup lock.
2012  * Also returns a flag indicating whether page contains any tuples at all.
2013  */
2014 static bool
2015 lazy_check_needs_freeze(Buffer buf, bool *hastup)
2016 {
2017 	Page		page = BufferGetPage(buf);
2018 	OffsetNumber offnum,
2019 				maxoff;
2020 	HeapTupleHeader tupleheader;
2021 
2022 	*hastup = false;
2023 
2024 	/*
2025 	 * New and empty pages, obviously, don't contain tuples. We could make
2026 	 * sure that the page is registered in the FSM, but it doesn't seem worth
2027 	 * waiting for a cleanup lock just for that, especially because it's
2028 	 * likely that the pin holder will do so.
2029 	 */
2030 	if (PageIsNew(page) || PageIsEmpty(page))
2031 		return false;
2032 
2033 	maxoff = PageGetMaxOffsetNumber(page);
2034 	for (offnum = FirstOffsetNumber;
2035 		 offnum <= maxoff;
2036 		 offnum = OffsetNumberNext(offnum))
2037 	{
2038 		ItemId		itemid;
2039 
2040 		itemid = PageGetItemId(page, offnum);
2041 
2042 		/* this should match hastup test in count_nondeletable_pages() */
2043 		if (ItemIdIsUsed(itemid))
2044 			*hastup = true;
2045 
2046 		/* dead and redirect items never need freezing */
2047 		if (!ItemIdIsNormal(itemid))
2048 			continue;
2049 
2050 		tupleheader = (HeapTupleHeader) PageGetItem(page, itemid);
2051 
2052 		if (heap_tuple_needs_freeze(tupleheader, FreezeLimit,
2053 									MultiXactCutoff, buf))
2054 			return true;
2055 	}							/* scan along page */
2056 
2057 	return false;
2058 }
2059 
2060 /*
2061  * Perform index vacuum or index cleanup with parallel workers.  This function
2062  * must be used by the parallel vacuum leader process.  The caller must set
2063  * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
2064  * cleanup.
2065  */
2066 static void
2067 lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
2068 							 LVRelStats *vacrelstats, LVParallelState *lps,
2069 							 int nindexes)
2070 {
2071 	int			nworkers;
2072 
2073 	Assert(!IsParallelWorker());
2074 	Assert(ParallelVacuumIsActive(lps));
2075 	Assert(nindexes > 0);
2076 
2077 	/* Determine the number of parallel workers to launch */
2078 	if (lps->lvshared->for_cleanup)
2079 	{
2080 		if (lps->lvshared->first_time)
2081 			nworkers = lps->nindexes_parallel_cleanup +
2082 				lps->nindexes_parallel_condcleanup;
2083 		else
2084 			nworkers = lps->nindexes_parallel_cleanup;
2085 	}
2086 	else
2087 		nworkers = lps->nindexes_parallel_bulkdel;
2088 
2089 	/* The leader process will participate */
2090 	nworkers--;
2091 
2092 	/*
2093 	 * The parallel context may have been initialized with fewer workers than
2094 	 * the number of indexes that need a separate worker in the current
2095 	 * phase, so cap the request accordingly.  See compute_parallel_vacuum_workers.
2096 	 */
2097 	nworkers = Min(nworkers, lps->pcxt->nworkers);
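	/*
	 * For example, with four parallel-safe indexes in the bulkdelete phase
	 * and a parallel context initialized for two workers, we compute
	 * 4 - 1 = 3 above and then cap it at 2 here; the leader also
	 * participates via parallel_vacuum_index() below, so all four indexes
	 * still get processed.
	 */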
2098 
2099 	/* Setup the shared cost-based vacuum delay and launch workers */
2100 	if (nworkers > 0)
2101 	{
2102 		if (vacrelstats->num_index_scans > 0)
2103 		{
2104 			/* Reset the parallel index processing counter */
2105 			pg_atomic_write_u32(&(lps->lvshared->idx), 0);
2106 
2107 			/* Reinitialize the parallel context to relaunch parallel workers */
2108 			ReinitializeParallelDSM(lps->pcxt);
2109 		}
2110 
2111 		/*
2112 		 * Set up shared cost balance and the number of active workers for
2113 		 * vacuum delay.  We need to do this before launching workers as
2114 		 * otherwise, they might not see the updated values for these
2115 		 * parameters.
2116 		 */
2117 		pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
2118 		pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
2119 
2120 		/*
2121 		 * The number of workers can vary between bulkdelete and cleanup
2122 		 * phase.
2123 		 */
2124 		ReinitializeParallelWorkers(lps->pcxt, nworkers);
2125 
2126 		LaunchParallelWorkers(lps->pcxt);
2127 
2128 		if (lps->pcxt->nworkers_launched > 0)
2129 		{
2130 			/*
2131 			 * Reset the local cost values for leader backend as we have
2132 			 * already accumulated the remaining balance of heap.
2133 			 */
2134 			VacuumCostBalance = 0;
2135 			VacuumCostBalanceLocal = 0;
2136 
2137 			/* Enable shared cost balance for leader backend */
2138 			VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
2139 			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
2140 		}
2141 
2142 		if (lps->lvshared->for_cleanup)
2143 			ereport(elevel,
2144 					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
2145 									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
2146 									 lps->pcxt->nworkers_launched),
2147 							lps->pcxt->nworkers_launched, nworkers)));
2148 		else
2149 			ereport(elevel,
2150 					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
2151 									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
2152 									 lps->pcxt->nworkers_launched),
2153 							lps->pcxt->nworkers_launched, nworkers)));
2154 	}
2155 
2156 	/* Process the indexes that can be processed by only leader process */
2157 	vacuum_indexes_leader(Irel, stats, vacrelstats, lps, nindexes);
2158 
2159 	/*
2160 	 * Join as a parallel worker.  The leader process alone processes all the
2161 	 * indexes in the case where no workers are launched.
2162 	 */
2163 	parallel_vacuum_index(Irel, stats, lps->lvshared,
2164 						  vacrelstats->dead_tuples, nindexes, vacrelstats);
2165 
2166 	/*
2167 	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
2168 	 * to finish, or we might get incomplete data.)
2169 	 */
2170 	if (nworkers > 0)
2171 	{
2172 		int			i;
2173 
2174 		/* Wait for all vacuum workers to finish */
2175 		WaitForParallelWorkersToFinish(lps->pcxt);
2176 
2177 		for (i = 0; i < lps->pcxt->nworkers_launched; i++)
2178 			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
2179 	}
2180 
2181 	/*
2182 	 * Carry the shared balance value to heap scan and disable shared costing
2183 	 */
2184 	if (VacuumSharedCostBalance)
2185 	{
2186 		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
2187 		VacuumSharedCostBalance = NULL;
2188 		VacuumActiveNWorkers = NULL;
2189 	}
2190 }
2191 
2192 /*
2193  * Index vacuum/cleanup routine used by the leader process and parallel
2194  * vacuum worker processes to process the indexes in parallel.
2195  */
2196 static void
2197 parallel_vacuum_index(Relation *Irel, IndexBulkDeleteResult **stats,
2198 					  LVShared *lvshared, LVDeadTuples *dead_tuples,
2199 					  int nindexes, LVRelStats *vacrelstats)
2200 {
2201 	/*
2202 	 * Increment the active worker count if we are able to launch any worker.
2203 	 */
2204 	if (VacuumActiveNWorkers)
2205 		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
2206 
2207 	/* Loop until all indexes are vacuumed */
2208 	for (;;)
2209 	{
2210 		int			idx;
2211 		LVSharedIndStats *shared_indstats;
2212 
2213 		/* Get an index number to process */
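		/*
		 * The shared atomic counter hands out 0, 1, 2, ... across the leader
		 * and all workers, so each index is claimed by exactly one process.
		 */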
2214 		idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
2215 
2216 		/* Done for all indexes? */
2217 		if (idx >= nindexes)
2218 			break;
2219 
2220 		/* Get the index statistics of this index from DSM */
2221 		shared_indstats = get_indstats(lvshared, idx);
2222 
2223 		/*
2224 		 * Skip processing indexes that don't participate in parallel
2225 		 * operation
2226 		 */
2227 		if (shared_indstats == NULL ||
2228 			skip_parallel_vacuum_index(Irel[idx], lvshared))
2229 			continue;
2230 
2231 		/* Do vacuum or cleanup of the index */
2232 		vacuum_one_index(Irel[idx], &(stats[idx]), lvshared, shared_indstats,
2233 						 dead_tuples, vacrelstats);
2234 	}
2235 
2236 	/*
2237 	 * We have completed the index vacuum so decrement the active worker
2238 	 * count.
2239 	 */
2240 	if (VacuumActiveNWorkers)
2241 		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
2242 }
2243 
2244 /*
2245  * Vacuum or cleanup indexes that can be processed only by the leader process
2246  * because these indexes don't support parallel operation at that phase.
2247  */
2248 static void
2249 vacuum_indexes_leader(Relation *Irel, IndexBulkDeleteResult **stats,
2250 					  LVRelStats *vacrelstats, LVParallelState *lps,
2251 					  int nindexes)
2252 {
2253 	int			i;
2254 
2255 	Assert(!IsParallelWorker());
2256 
2257 	/*
2258 	 * Increment the active worker count if we are able to launch any worker.
2259 	 */
2260 	if (VacuumActiveNWorkers)
2261 		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
2262 
2263 	for (i = 0; i < nindexes; i++)
2264 	{
2265 		LVSharedIndStats *shared_indstats;
2266 
2267 		shared_indstats = get_indstats(lps->lvshared, i);
2268 
2269 		/* Process the indexes skipped by parallel workers */
2270 		if (shared_indstats == NULL ||
2271 			skip_parallel_vacuum_index(Irel[i], lps->lvshared))
2272 			vacuum_one_index(Irel[i], &(stats[i]), lps->lvshared,
2273 							 shared_indstats, vacrelstats->dead_tuples,
2274 							 vacrelstats);
2275 	}
2276 
2277 	/*
2278 	 * We have completed the index vacuum so decrement the active worker
2279 	 * count.
2280 	 */
2281 	if (VacuumActiveNWorkers)
2282 		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
2283 }
2284 
2285 /*
2286  * Vacuum or cleanup one index, either in the leader process or in one of the
2287  * worker processes.  After processing the index, this function copies the index
2288  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
2289  * segment.
2290  */
2291 static void
2292 vacuum_one_index(Relation indrel, IndexBulkDeleteResult **stats,
2293 				 LVShared *lvshared, LVSharedIndStats *shared_indstats,
2294 				 LVDeadTuples *dead_tuples, LVRelStats *vacrelstats)
2295 {
2296 	IndexBulkDeleteResult *bulkdelete_res = NULL;
2297 
2298 	if (shared_indstats)
2299 	{
2300 		/* Get the space for IndexBulkDeleteResult */
2301 		bulkdelete_res = &(shared_indstats->stats);
2302 
2303 		/*
2304 		 * Update the pointer to the corresponding bulk-deletion result if
2305 		 * someone has already updated it.
2306 		 */
2307 		if (shared_indstats->updated && *stats == NULL)
2308 			*stats = bulkdelete_res;
2309 	}
2310 
2311 	/* Do vacuum or cleanup of the index */
2312 	if (lvshared->for_cleanup)
2313 		lazy_cleanup_index(indrel, stats, lvshared->reltuples,
2314 						   lvshared->estimated_count, vacrelstats);
2315 	else
2316 		lazy_vacuum_index(indrel, stats, dead_tuples,
2317 						  lvshared->reltuples, vacrelstats);
2318 
2319 	/*
2320 	 * Copy the index bulk-deletion result returned from ambulkdelete and
2321 	 * amvacuumcleanup to the DSM segment on the first cycle, because the
2322 	 * index AM allocates it locally and the index may be vacuumed by a
2323 	 * different vacuum process in the next cycle.  Copying the result normally
2324 	 * happens only the first time an index is vacuumed.  For any additional
2325 	 * vacuum pass, we directly point to the result on the DSM segment and
2326 	 * pass it to vacuum index APIs so that workers can update it directly.
2327 	 *
2328 	 * Since all vacuum workers write the bulk-deletion result at different
2329 	 * slots we can write them without locking.
2330 	 */
2331 	if (shared_indstats && !shared_indstats->updated && *stats != NULL)
2332 	{
2333 		memcpy(bulkdelete_res, *stats, sizeof(IndexBulkDeleteResult));
2334 		shared_indstats->updated = true;
2335 
2336 		/*
2337 		 * Now that stats[idx] points to the DSM segment, we don't need the
2338 		 * locally allocated results.
2339 		 */
2340 		pfree(*stats);
2341 		*stats = bulkdelete_res;
2342 	}
2343 }
2344 
2345 /*
2346  *	lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
2347  *
2348  * Cleanup indexes.  We process the indexes serially unless we are doing
2349  * parallel vacuum.
2350  */
2351 static void
2352 lazy_cleanup_all_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
2353 						 LVRelStats *vacrelstats, LVParallelState *lps,
2354 						 int nindexes)
2355 {
2356 	int			idx;
2357 
2358 	Assert(!IsParallelWorker());
2359 	Assert(nindexes > 0);
2360 
2361 	/* Report that we are now cleaning up indexes */
2362 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
2363 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
2364 
2365 	/*
2366 	 * If parallel vacuum is active we perform index cleanup with parallel
2367 	 * workers.
2368 	 */
2369 	if (ParallelVacuumIsActive(lps))
2370 	{
2371 		/* Tell parallel workers to do index cleanup */
2372 		lps->lvshared->for_cleanup = true;
2373 		lps->lvshared->first_time =
2374 			(vacrelstats->num_index_scans == 0);
2375 
2376 		/*
2377 		 * Now we can provide a better estimate of the total number of surviving
2378 		 * tuples (we assume indexes are more interested in that than in the
2379 		 * number of nominally live tuples).
2380 		 */
2381 		lps->lvshared->reltuples = vacrelstats->new_rel_tuples;
2382 		lps->lvshared->estimated_count =
2383 			(vacrelstats->tupcount_pages < vacrelstats->rel_pages);
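		/*
		 * (When every page was scanned, tupcount_pages equals rel_pages and
		 * the count is exact rather than an estimate.)
		 */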
2384 
2385 		lazy_parallel_vacuum_indexes(Irel, stats, vacrelstats, lps, nindexes);
2386 	}
2387 	else
2388 	{
2389 		for (idx = 0; idx < nindexes; idx++)
2390 			lazy_cleanup_index(Irel[idx], &stats[idx],
2391 							   vacrelstats->new_rel_tuples,
2392 							   vacrelstats->tupcount_pages < vacrelstats->rel_pages,
2393 							   vacrelstats);
2394 	}
2395 }
2396 
2397 /*
2398  *	lazy_vacuum_index() -- vacuum one index relation.
2399  *
2400  *		Delete all the index entries pointing to tuples listed in
2401  *		dead_tuples, and update running statistics.
2402  *
2403  *		reltuples is the number of heap tuples to be passed to the
2404  *		bulkdelete callback.
2405  */
2406 static void
2407 lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
2408 				  LVDeadTuples *dead_tuples, double reltuples, LVRelStats *vacrelstats)
2409 {
2410 	IndexVacuumInfo ivinfo;
2411 	PGRUsage	ru0;
2412 	LVSavedErrInfo saved_err_info;
2413 
2414 	pg_rusage_init(&ru0);
2415 
2416 	ivinfo.index = indrel;
2417 	ivinfo.analyze_only = false;
2418 	ivinfo.report_progress = false;
2419 	ivinfo.estimated_count = true;
2420 	ivinfo.message_level = elevel;
2421 	ivinfo.num_heap_tuples = reltuples;
2422 	ivinfo.strategy = vac_strategy;
2423 
2424 	/*
2425 	 * Update error traceback information.
2426 	 *
2427 	 * The index name is saved during this phase and restored immediately
2428 	 * after this phase.  See vacuum_error_callback.
2429 	 */
2430 	Assert(vacrelstats->indname == NULL);
2431 	vacrelstats->indname = pstrdup(RelationGetRelationName(indrel));
2432 	update_vacuum_error_info(vacrelstats, &saved_err_info,
2433 							 VACUUM_ERRCB_PHASE_VACUUM_INDEX,
2434 							 InvalidBlockNumber);
2435 
2436 	/* Do bulk deletion */
2437 	*stats = index_bulk_delete(&ivinfo, *stats,
2438 							   lazy_tid_reaped, (void *) dead_tuples);
2439 
2440 	ereport(elevel,
2441 			(errmsg("scanned index \"%s\" to remove %d row versions",
2442 					vacrelstats->indname,
2443 					dead_tuples->num_tuples),
2444 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
2445 
2446 	/* Revert to the previous phase information for error traceback */
2447 	restore_vacuum_error_info(vacrelstats, &saved_err_info);
2448 	pfree(vacrelstats->indname);
2449 	vacrelstats->indname = NULL;
2450 }
2451 
2452 /*
2453  *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
2454  *
2455  *		reltuples is the number of heap tuples and estimated_count is true
2456  *		if reltuples is an estimated value.
2457  */
2458 static void
2459 lazy_cleanup_index(Relation indrel,
2460 				   IndexBulkDeleteResult **stats,
2461 				   double reltuples, bool estimated_count, LVRelStats *vacrelstats)
2462 {
2463 	IndexVacuumInfo ivinfo;
2464 	PGRUsage	ru0;
2465 	LVSavedErrInfo saved_err_info;
2466 
2467 	pg_rusage_init(&ru0);
2468 
2469 	ivinfo.index = indrel;
2470 	ivinfo.analyze_only = false;
2471 	ivinfo.report_progress = false;
2472 	ivinfo.estimated_count = estimated_count;
2473 	ivinfo.message_level = elevel;
2474 
2475 	ivinfo.num_heap_tuples = reltuples;
2476 	ivinfo.strategy = vac_strategy;
2477 
2478 	/*
2479 	 * Update error traceback information.
2480 	 *
2481 	 * The index name is saved during this phase and restored immediately
2482 	 * after this phase.  See vacuum_error_callback.
2483 	 */
2484 	Assert(vacrelstats->indname == NULL);
2485 	vacrelstats->indname = pstrdup(RelationGetRelationName(indrel));
2486 	update_vacuum_error_info(vacrelstats, &saved_err_info,
2487 							 VACUUM_ERRCB_PHASE_INDEX_CLEANUP,
2488 							 InvalidBlockNumber);
2489 
2490 	*stats = index_vacuum_cleanup(&ivinfo, *stats);
2491 
2492 	if (*stats)
2493 	{
2494 		ereport(elevel,
2495 				(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
2496 						RelationGetRelationName(indrel),
2497 						(*stats)->num_index_tuples,
2498 						(*stats)->num_pages),
2499 				 errdetail("%.0f index row versions were removed.\n"
2500 						   "%u index pages have been deleted, %u are currently reusable.\n"
2501 						   "%s.",
2502 						   (*stats)->tuples_removed,
2503 						   (*stats)->pages_deleted, (*stats)->pages_free,
2504 						   pg_rusage_show(&ru0))));
2505 	}
2506 
2507 	/* Revert to the previous phase information for error traceback */
2508 	restore_vacuum_error_info(vacrelstats, &saved_err_info);
2509 	pfree(vacrelstats->indname);
2510 	vacrelstats->indname = NULL;
2511 }
2512 
2513 /*
2514  * should_attempt_truncation - should we attempt to truncate the heap?
2515  *
2516  * Don't even think about it unless we have a shot at releasing a goodly
2517  * number of pages.  Otherwise, the time taken isn't worth it.
2518  *
2519  * Also don't attempt it if we are doing early pruning/vacuuming, because a
2520  * scan which cannot find a truncated heap page cannot determine that the
2521  * snapshot is too old to read that page.  We might be able to get away with
2522  * truncating all except one of the pages, setting its LSN to (at least) the
2523  * maximum of the truncated range if we also treated an index leaf tuple
2524  * pointing to a missing heap page as something to trigger the "snapshot too
2525  * old" error, but that seems fragile and seems like it deserves its own patch
2526  * if we consider it.
2527  *
2528  * This is split out so that we can test whether truncation is going to be
2529  * called for before we actually do it.  If you change the logic here, be
2530  * careful to depend only on fields that lazy_scan_heap updates on-the-fly.
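 *
 * Concretely, the test below attempts truncation only when the prospective
 * free tail is at least REL_TRUNCATE_MINIMUM pages, or at least
 * 1/REL_TRUNCATE_FRACTION of the relation, and old_snapshot_threshold is
 * disabled.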
2531  */
2532 static bool
2533 should_attempt_truncation(VacuumParams *params, LVRelStats *vacrelstats)
2534 {
2535 	BlockNumber possibly_freeable;
2536 
2537 	if (params->truncate == VACOPT_TERNARY_DISABLED)
2538 		return false;
2539 
2540 	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
2541 	if (possibly_freeable > 0 &&
2542 		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
2543 		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
2544 		old_snapshot_threshold < 0)
2545 		return true;
2546 	else
2547 		return false;
2548 }
2549 
2550 /*
2551  * lazy_truncate_heap - try to truncate off any empty pages at the end
2552  */
2553 static void
2554 lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
2555 {
2556 	BlockNumber old_rel_pages = vacrelstats->rel_pages;
2557 	BlockNumber new_rel_pages;
2558 	int			lock_retry;
2559 
2560 	/* Report that we are now truncating */
2561 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
2562 								 PROGRESS_VACUUM_PHASE_TRUNCATE);
2563 
2564 	/*
2565 	 * Loop until no more truncating can be done.
2566 	 */
2567 	do
2568 	{
2569 		PGRUsage	ru0;
2570 
2571 		pg_rusage_init(&ru0);
2572 
2573 		/*
2574 		 * We need full exclusive lock on the relation in order to do
2575 		 * truncation. If we can't get it, give up rather than waiting --- we
2576 		 * don't want to block other backends, and we don't want to deadlock
2577 		 * (which is quite possible considering we already hold a lower-grade
2578 		 * lock).
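		 *
		 * The retry loop below polls ConditionalLockRelation() every
		 * VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL milliseconds and gives up once
		 * the cumulative wait exceeds VACUUM_TRUNCATE_LOCK_TIMEOUT.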
2579 		 */
2580 		vacrelstats->lock_waiter_detected = false;
2581 		lock_retry = 0;
2582 		while (true)
2583 		{
2584 			if (ConditionalLockRelation(onerel, AccessExclusiveLock))
2585 				break;
2586 
2587 			/*
2588 			 * Check for interrupts while trying to (re-)acquire the exclusive
2589 			 * lock.
2590 			 */
2591 			CHECK_FOR_INTERRUPTS();
2592 
2593 			if (++lock_retry > (VACUUM_TRUNCATE_LOCK_TIMEOUT /
2594 								VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
2595 			{
2596 				/*
2597 				 * We failed to establish the lock in the specified number of
2598 				 * retries. This means we give up truncating.
2599 				 */
2600 				vacrelstats->lock_waiter_detected = true;
2601 				ereport(elevel,
2602 						(errmsg("\"%s\": stopping truncate due to conflicting lock request",
2603 								vacrelstats->relname)));
2604 				return;
2605 			}
2606 
2607 			pg_usleep(VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL * 1000L);
2608 		}
2609 
2610 		/*
2611 		 * Now that we have exclusive lock, look to see if the rel has grown
2612 		 * whilst we were vacuuming with non-exclusive lock.  If so, give up;
2613 		 * the newly added pages presumably contain non-deletable tuples.
2614 		 */
2615 		new_rel_pages = RelationGetNumberOfBlocks(onerel);
2616 		if (new_rel_pages != old_rel_pages)
2617 		{
2618 			/*
2619 			 * Note: we intentionally don't update vacrelstats->rel_pages with
2620 			 * the new rel size here.  If we did, it would amount to assuming
2621 			 * that the new pages are empty, which is unlikely. Leaving the
2622 			 * numbers alone amounts to assuming that the new pages have the
2623 			 * same tuple density as existing ones, which is less unlikely.
2624 			 */
2625 			UnlockRelation(onerel, AccessExclusiveLock);
2626 			return;
2627 		}
2628 
2629 		/*
2630 		 * Scan backwards from the end to verify that the end pages actually
2631 		 * contain no tuples.  This is *necessary*, not optional, because
2632 		 * other backends could have added tuples to these pages whilst we
2633 		 * were vacuuming.
2634 		 */
2635 		new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
2636 		vacrelstats->blkno = new_rel_pages;
2637 
2638 		if (new_rel_pages >= old_rel_pages)
2639 		{
2640 			/* can't do anything after all */
2641 			UnlockRelation(onerel, AccessExclusiveLock);
2642 			return;
2643 		}
2644 
2645 		/*
2646 		 * Okay to truncate.
2647 		 */
2648 		RelationTruncate(onerel, new_rel_pages);
2649 
2650 		/*
2651 		 * We can release the exclusive lock as soon as we have truncated.
2652 		 * Other backends can't safely access the relation until they have
2653 		 * processed the smgr invalidation that smgrtruncate sent out ... but
2654 		 * that should happen as part of standard invalidation processing once
2655 		 * they acquire lock on the relation.
2656 		 */
2657 		UnlockRelation(onerel, AccessExclusiveLock);
2658 
2659 		/*
2660 		 * Update statistics.  Here, it *is* correct to adjust rel_pages
2661 		 * without also touching reltuples, since the tuple count wasn't
2662 		 * changed by the truncation.
2663 		 */
2664 		vacrelstats->pages_removed += old_rel_pages - new_rel_pages;
2665 		vacrelstats->rel_pages = new_rel_pages;
2666 
2667 		ereport(elevel,
2668 				(errmsg("\"%s\": truncated %u to %u pages",
2669 						vacrelstats->relname,
2670 						old_rel_pages, new_rel_pages),
2671 				 errdetail_internal("%s",
2672 									pg_rusage_show(&ru0))));
2673 		old_rel_pages = new_rel_pages;
2674 	} while (new_rel_pages > vacrelstats->nonempty_pages &&
2675 			 vacrelstats->lock_waiter_detected);
2676 }
2677 
2678 /*
2679  * Rescan end pages to verify that they are (still) empty of tuples.
2680  *
2681  * Returns number of nondeletable pages (last nonempty page + 1).
2682  */
2683 static BlockNumber
2684 count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
2685 {
2686 	BlockNumber blkno;
2687 	BlockNumber prefetchedUntil;
2688 	instr_time	starttime;
2689 
2690 	/* Initialize the starttime if we check for conflicting lock requests */
2691 	INSTR_TIME_SET_CURRENT(starttime);
2692 
2693 	/*
2694 	 * Start checking blocks at what we believe relation end to be and move
2695 	 * backwards.  (Strange coding of loop control is needed because blkno is
2696 	 * unsigned.)  To make the scan faster, we prefetch a few blocks at a time
2697 	 * in forward direction, so that OS-level readahead can kick in.
2698 	 */
2699 	blkno = vacrelstats->rel_pages;
2700 	StaticAssertStmt((PREFETCH_SIZE & (PREFETCH_SIZE - 1)) == 0,
2701 					 "prefetch size must be power of 2");
2702 	prefetchedUntil = InvalidBlockNumber;
2703 	while (blkno > vacrelstats->nonempty_pages)
2704 	{
2705 		Buffer		buf;
2706 		Page		page;
2707 		OffsetNumber offnum,
2708 					maxoff;
2709 		bool		hastup;
2710 
2711 		/*
2712 		 * Check if another process requests a lock on our relation. We are
2713 		 * holding an AccessExclusiveLock here, so they will be waiting. We
2714 		 * only do this once per VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL, and we
2715 		 * only check if that interval has elapsed once every 32 blocks to
2716 		 * keep the number of system calls and actual shared lock table
2717 		 * lookups to a minimum.
2718 		 */
2719 		if ((blkno % 32) == 0)
2720 		{
2721 			instr_time	currenttime;
2722 			instr_time	elapsed;
2723 
2724 			INSTR_TIME_SET_CURRENT(currenttime);
2725 			elapsed = currenttime;
2726 			INSTR_TIME_SUBTRACT(elapsed, starttime);
2727 			if ((INSTR_TIME_GET_MICROSEC(elapsed) / 1000)
2728 				>= VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL)
2729 			{
2730 				if (LockHasWaitersRelation(onerel, AccessExclusiveLock))
2731 				{
2732 					ereport(elevel,
2733 							(errmsg("\"%s\": suspending truncate due to conflicting lock request",
2734 									vacrelstats->relname)));
2735 
2736 					vacrelstats->lock_waiter_detected = true;
2737 					return blkno;
2738 				}
2739 				starttime = currenttime;
2740 			}
2741 		}
2742 
2743 		/*
2744 		 * We don't insert a vacuum delay point here, because we have an
2745 		 * exclusive lock on the table which we want to hold for as short a
2746 		 * time as possible.  We still need to check for interrupts, however.
2747 		 */
2748 		CHECK_FOR_INTERRUPTS();
2749 
2750 		blkno--;
2751 
2752 		/* If we haven't prefetched this lot yet, do so now. */
2753 		if (prefetchedUntil > blkno)
2754 		{
2755 			BlockNumber prefetchStart;
2756 			BlockNumber pblkno;
2757 
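			/*
			 * Round blkno down to a PREFETCH_SIZE boundary: since
			 * PREFETCH_SIZE is a power of two (see the StaticAssertStmt
			 * above), the mask clears the low-order bits, so e.g. with a
			 * prefetch size of 32, block 77 prefetches blocks 64..77.
			 */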
2758 			prefetchStart = blkno & ~(PREFETCH_SIZE - 1);
2759 			for (pblkno = prefetchStart; pblkno <= blkno; pblkno++)
2760 			{
2761 				PrefetchBuffer(onerel, MAIN_FORKNUM, pblkno);
2762 				CHECK_FOR_INTERRUPTS();
2763 			}
2764 			prefetchedUntil = prefetchStart;
2765 		}
2766 
2767 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
2768 								 RBM_NORMAL, vac_strategy);
2769 
2770 		/* In this phase we only need shared access to the buffer */
2771 		LockBuffer(buf, BUFFER_LOCK_SHARE);
2772 
2773 		page = BufferGetPage(buf);
2774 
2775 		if (PageIsNew(page) || PageIsEmpty(page))
2776 		{
2777 			UnlockReleaseBuffer(buf);
2778 			continue;
2779 		}
2780 
2781 		hastup = false;
2782 		maxoff = PageGetMaxOffsetNumber(page);
2783 		for (offnum = FirstOffsetNumber;
2784 			 offnum <= maxoff;
2785 			 offnum = OffsetNumberNext(offnum))
2786 		{
2787 			ItemId		itemid;
2788 
2789 			itemid = PageGetItemId(page, offnum);
2790 
2791 			/*
2792 			 * Note: any non-unused item should be taken as a reason to keep
2793 			 * this page.  We formerly thought that DEAD tuples could be
2794 			 * thrown away, but that's not so, because we'd not have cleaned
2795 			 * out their index entries.
2796 			 */
2797 			if (ItemIdIsUsed(itemid))
2798 			{
2799 				hastup = true;
2800 				break;			/* can stop scanning */
2801 			}
2802 		}						/* scan along page */
2803 
2804 		UnlockReleaseBuffer(buf);
2805 
2806 		/* Done scanning if we found a tuple here */
2807 		if (hastup)
2808 			return blkno + 1;
2809 	}
2810 
2811 	/*
2812 	 * If we fall out of the loop, all the previously-thought-to-be-empty
2813 	 * pages still are; we need not bother to look at the last known-nonempty
2814 	 * page.
2815 	 */
2816 	return vacrelstats->nonempty_pages;
2817 }
2818 
2819 /*
2820  * Return the maximum number of dead tuples we can record.
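 *
 * As a rough worked example: each ItemPointerData occupies 6 bytes, so a
 * 64MB memory budget admits on the order of ten million dead TIDs before a
 * round of index vacuuming is forced (subject to the per-table cap computed
 * below).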
2821  */
2822 static long
2823 compute_max_dead_tuples(BlockNumber relblocks, bool useindex)
2824 {
2825 	long		maxtuples;
2826 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
2827 	autovacuum_work_mem != -1 ?
2828 	autovacuum_work_mem : maintenance_work_mem;
2829 
2830 	if (useindex)
2831 	{
2832 		maxtuples = MAXDEADTUPLES(vac_work_mem * 1024L);
2833 		maxtuples = Min(maxtuples, INT_MAX);
2834 		maxtuples = Min(maxtuples, MAXDEADTUPLES(MaxAllocSize));
2835 
2836 		/* curious coding here to ensure the multiplication can't overflow */
2837 		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
2838 			maxtuples = relblocks * LAZY_ALLOC_TUPLES;
2839 
2840 		/* stay sane if small maintenance_work_mem */
2841 		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
2842 	}
2843 	else
2844 		maxtuples = MaxHeapTuplesPerPage;
2845 
2846 	return maxtuples;
2847 }
2848 
2849 /*
2850  * lazy_space_alloc - space allocation decisions for lazy vacuum
2851  *
2852  * See the comments at the head of this file for rationale.
2853  */
2854 static void
2855 lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
2856 {
2857 	LVDeadTuples *dead_tuples = NULL;
2858 	long		maxtuples;
2859 
2860 	maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->useindex);
2861 
2862 	dead_tuples = (LVDeadTuples *) palloc(SizeOfDeadTuples(maxtuples));
2863 	dead_tuples->num_tuples = 0;
2864 	dead_tuples->max_tuples = (int) maxtuples;
2865 
2866 	vacrelstats->dead_tuples = dead_tuples;
2867 }
2868 
2869 /*
2870  * lazy_record_dead_tuple - remember one deletable tuple
2871  */
2872 static void
2873 lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr)
2874 {
2875 	/*
2876 	 * The array shouldn't overflow under normal behavior, but perhaps it
2877 	 * could if we are given a really small maintenance_work_mem. In that
2878 	 * case, just forget the last few tuples (we'll get 'em next time).
2879 	 */
2880 	if (dead_tuples->num_tuples < dead_tuples->max_tuples)
2881 	{
2882 		dead_tuples->itemptrs[dead_tuples->num_tuples] = *itemptr;
2883 		dead_tuples->num_tuples++;
2884 		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
2885 									 dead_tuples->num_tuples);
2886 	}
2887 }
2888 
2889 /*
2890  *	lazy_tid_reaped() -- is a particular tid deletable?
2891  *
2892  *		This has the right signature to be an IndexBulkDeleteCallback.
2893  *
2894  *		Assumes dead_tuples array is in sorted order.
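 *
 *		"Sorted" here means block-number-major, offset-number-minor order,
 *		exactly the ordering imposed by vac_cmp_itemptr() below, which is
 *		also the order in which lazy_scan_heap() records dead tuples.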
2895  */
2896 static bool
2897 lazy_tid_reaped(ItemPointer itemptr, void *state)
2898 {
2899 	LVDeadTuples *dead_tuples = (LVDeadTuples *) state;
2900 	ItemPointer res;
2901 
2902 	res = (ItemPointer) bsearch((void *) itemptr,
2903 								(void *) dead_tuples->itemptrs,
2904 								dead_tuples->num_tuples,
2905 								sizeof(ItemPointerData),
2906 								vac_cmp_itemptr);
2907 
2908 	return (res != NULL);
2909 }
2910 
2911 /*
2912  * Comparator routines for use with qsort() and bsearch().
2913  */
2914 static int
2915 vac_cmp_itemptr(const void *left, const void *right)
2916 {
2917 	BlockNumber lblk,
2918 				rblk;
2919 	OffsetNumber loff,
2920 				roff;
2921 
2922 	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
2923 	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
2924 
2925 	if (lblk < rblk)
2926 		return -1;
2927 	if (lblk > rblk)
2928 		return 1;
2929 
2930 	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
2931 	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
2932 
2933 	if (loff < roff)
2934 		return -1;
2935 	if (loff > roff)
2936 		return 1;
2937 
2938 	return 0;
2939 }
2940 
2941 /*
2942  * Check if every tuple in the given page is visible to all current and future
2943  * transactions. Also return the visibility_cutoff_xid which is the highest
2944  * xmin amongst the visible tuples.  Set *all_frozen to true if every tuple
2945  * on this page is frozen.
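 *
 * The returned visibility_cutoff_xid is only meaningful when the result is
 * true; the caller uses it as the cutoff XID when setting the page's
 * visibility-map bit.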
2946  */
2947 static bool
2948 heap_page_is_all_visible(Relation rel, Buffer buf,
2949 						 TransactionId *visibility_cutoff_xid,
2950 						 bool *all_frozen)
2951 {
2952 	Page		page = BufferGetPage(buf);
2953 	BlockNumber blockno = BufferGetBlockNumber(buf);
2954 	OffsetNumber offnum,
2955 				maxoff;
2956 	bool		all_visible = true;
2957 
2958 	*visibility_cutoff_xid = InvalidTransactionId;
2959 	*all_frozen = true;
2960 
2961 	/*
2962 	 * This is a stripped down version of the line pointer scan in
2963 	 * lazy_scan_heap(). So if you change anything here, also check that code.
2964 	 */
2965 	maxoff = PageGetMaxOffsetNumber(page);
2966 	for (offnum = FirstOffsetNumber;
2967 		 offnum <= maxoff && all_visible;
2968 		 offnum = OffsetNumberNext(offnum))
2969 	{
2970 		ItemId		itemid;
2971 		HeapTupleData tuple;
2972 
2973 		itemid = PageGetItemId(page, offnum);
2974 
2975 		/* Unused or redirect line pointers are of no interest */
2976 		if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
2977 			continue;
2978 
2979 		ItemPointerSet(&(tuple.t_self), blockno, offnum);
2980 
2981 		/*
2982 		 * Dead line pointers can have index pointers pointing to them. So
2983 		 * they can't be treated as visible
2984 		 */
2985 		if (ItemIdIsDead(itemid))
2986 		{
2987 			all_visible = false;
2988 			*all_frozen = false;
2989 			break;
2990 		}
2991 
2992 		Assert(ItemIdIsNormal(itemid));
2993 
2994 		tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2995 		tuple.t_len = ItemIdGetLength(itemid);
2996 		tuple.t_tableOid = RelationGetRelid(rel);
2997 
2998 		switch (HeapTupleSatisfiesVacuum(&tuple, OldestXmin, buf))
2999 		{
3000 			case HEAPTUPLE_LIVE:
3001 				{
3002 					TransactionId xmin;
3003 
3004 					/* Check comments in lazy_scan_heap. */
3005 					if (!HeapTupleHeaderXminCommitted(tuple.t_data))
3006 					{
3007 						all_visible = false;
3008 						*all_frozen = false;
3009 						break;
3010 					}
3011 
3012 					/*
3013 					 * The inserter definitely committed. But is it old enough
3014 					 * that everyone sees it as committed?
3015 					 */
3016 					xmin = HeapTupleHeaderGetXmin(tuple.t_data);
3017 					if (!TransactionIdPrecedes(xmin, OldestXmin))
3018 					{
3019 						all_visible = false;
3020 						*all_frozen = false;
3021 						break;
3022 					}
3023 
3024 					/* Track newest xmin on page. */
3025 					if (TransactionIdFollows(xmin, *visibility_cutoff_xid))
3026 						*visibility_cutoff_xid = xmin;
3027 
3028 					/* Check whether this tuple is already frozen or not */
3029 					if (all_visible && *all_frozen &&
3030 						heap_tuple_needs_eventual_freeze(tuple.t_data))
3031 						*all_frozen = false;
3032 				}
3033 				break;
3034 
3035 			case HEAPTUPLE_DEAD:
3036 			case HEAPTUPLE_RECENTLY_DEAD:
3037 			case HEAPTUPLE_INSERT_IN_PROGRESS:
3038 			case HEAPTUPLE_DELETE_IN_PROGRESS:
3039 				{
3040 					all_visible = false;
3041 					*all_frozen = false;
3042 					break;
3043 				}
3044 			default:
3045 				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
3046 				break;
3047 		}
3048 	}							/* scan along page */
3049 
3050 	return all_visible;
3051 }

/*
 * Compute the number of parallel worker processes to request.  Both index
 * vacuum and index cleanup can be executed with parallel workers.  An index
 * is eligible for parallel vacuum only if its size is greater than
 * min_parallel_index_scan_size, as invoking workers for very small indexes
 * can hurt performance.
 *
 * nrequested is the number of parallel workers that the user requested.  If
 * nrequested is 0, we compute the parallel degree from the number of indexes
 * that support parallel vacuum.  This function also sets entries of
 * can_parallel_vacuum to remember which indexes can participate in parallel
 * vacuum.
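 *
 * Worked example (illustrative numbers): with three indexes supporting
 * parallel bulkdelete and two supporting parallel cleanup, we take
 * Max(3, 2) = 3, subtract one for the index the leader processes itself,
 * and with nrequested = 0 request 2 workers, subject to the
 * max_parallel_maintenance_workers cap below.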
 */
static int
compute_parallel_vacuum_workers(Relation *Irel, int nindexes, int nrequested,
								bool *can_parallel_vacuum)
{
	int			nindexes_parallel = 0;
	int			nindexes_parallel_bulkdel = 0;
	int			nindexes_parallel_cleanup = 0;
	int			parallel_workers;
	int			i;

	/*
	 * We don't allow performing parallel operation in a standalone backend or
	 * when parallelism is disabled.
	 */
	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
		return 0;

	/*
	 * Compute the number of indexes that can participate in parallel vacuum.
	 */
	for (i = 0; i < nindexes; i++)
	{
		uint8		vacoptions = Irel[i]->rd_indam->amparallelvacuumoptions;

		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
			RelationGetNumberOfBlocks(Irel[i]) < min_parallel_index_scan_size)
			continue;

		can_parallel_vacuum[i] = true;

		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			nindexes_parallel_bulkdel++;
		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
			nindexes_parallel_cleanup++;
	}

	nindexes_parallel = Max(nindexes_parallel_bulkdel,
							nindexes_parallel_cleanup);

	/* The leader process takes one index */
	nindexes_parallel--;

	/* No index supports parallel vacuum */
	if (nindexes_parallel <= 0)
		return 0;

	/* Compute the parallel degree */
	parallel_workers = (nrequested > 0) ?
		Min(nrequested, nindexes_parallel) : nindexes_parallel;

	/* Cap by max_parallel_maintenance_workers */
	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

	return parallel_workers;
}

/*
 * Initialize variables for shared index statistics: set the NULL bitmap that
 * indicates, for each index, whether a statistics slot follows in shared
 * memory.
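 *
 * Note (added): one bit per index; bit i is set when index i participates in
 * the parallel vacuum and therefore owns an LVSharedIndStats slot following
 * the bitmap in the DSM segment, which is how get_indstats() later locates
 * the Nth slot.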
 */
static void
prepare_index_statistics(LVShared *lvshared, bool *can_parallel_vacuum,
						 int nindexes)
{
	int			i;

	/* Currently, we don't support parallel vacuum for autovacuum */
	Assert(!IsAutoVacuumWorkerProcess());

	/* Set NULL for all indexes */
	memset(lvshared->bitmap, 0x00, BITMAPLEN(nindexes));

	for (i = 0; i < nindexes; i++)
	{
		if (!can_parallel_vacuum[i])
			continue;

		/* Set NOT NULL as this index does support parallelism */
		lvshared->bitmap[i >> 3] |= 1 << (i & 0x07);
	}
}

/*
 * Update index statistics in pg_class if the statistics are accurate.
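 *
 * Note (added): stats[i]->estimated_count is set by the index AM when
 * num_index_tuples is only an estimate; such entries are skipped so that
 * pg_class is not overwritten with inexact numbers.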
 */
static void
update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
						int nindexes)
{
	int			i;

	Assert(!IsInParallelMode());

	for (i = 0; i < nindexes; i++)
	{
		if (stats[i] == NULL || stats[i]->estimated_count)
			continue;

		/* Update index statistics */
		vac_update_relstats(Irel[i],
							stats[i]->num_pages,
							stats[i]->num_index_tuples,
							0,
							false,
							InvalidTransactionId,
							InvalidMultiXactId,
							false);
		pfree(stats[i]);
	}
}

/*
 * This function prepares and returns parallel vacuum state if we can launch
 * even one worker.  It is responsible for entering parallel mode, creating a
 * parallel context, and initializing the DSM segment.
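 *
 * Summary (added): the DSM segment is a shm_toc with one entry each for
 * PARALLEL_VACUUM_KEY_SHARED, PARALLEL_VACUUM_KEY_DEAD_TUPLES,
 * PARALLEL_VACUUM_KEY_BUFFER_USAGE, PARALLEL_VACUUM_KEY_WAL_USAGE and,
 * when a query string is available, PARALLEL_VACUUM_KEY_QUERY_TEXT.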
 */
static LVParallelState *
begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
					  BlockNumber nblocks, int nindexes, int nrequested)
{
	LVParallelState *lps = NULL;
	ParallelContext *pcxt;
	LVShared   *shared;
	LVDeadTuples *dead_tuples;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *can_parallel_vacuum;
	long		maxtuples;
	Size		est_shared;
	Size		est_deadtuples;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;
	int			i;

	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation
	 */
	Assert(nrequested >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch
	 */
	can_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
	parallel_workers = compute_parallel_vacuum_workers(Irel, nindexes,
													   nrequested,
													   can_parallel_vacuum);

	/* Can't perform vacuum in parallel */
	if (parallel_workers <= 0)
	{
		pfree(can_parallel_vacuum);
		return lps;
	}

	lps = (LVParallelState *) palloc0(sizeof(LVParallelState));

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	lps->pcxt = pcxt;

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
	for (i = 0; i < nindexes; i++)
	{
		uint8		vacoptions = Irel[i]->rd_indam->amparallelvacuumoptions;

		/*
		 * The cleanup option should be either disabled, always performed in
		 * parallel, or conditionally performed in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		/* Skip indexes that don't participate in parallel vacuum */
		if (!can_parallel_vacuum[i])
			continue;

		if (Irel[i]->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;

		est_shared = add_size(est_shared, sizeof(LVSharedIndStats));

		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			lps->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			lps->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			lps->nindexes_parallel_condcleanup++;
	}
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
	maxtuples = compute_max_dead_tuples(nblocks, true);
	est_deadtuples = MAXALIGN(SizeOfDeadTuples(maxtuples));
	shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);

	/* Prepare shared information */
	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
	MemSet(shared, 0, est_shared);
	shared->relid = relid;
	shared->elevel = elevel;
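	/*
	 * Note (added): when any participating index AM sets
	 * amusemaintenanceworkmem, maintenance_work_mem is divided by
	 * Min(parallel_workers, nindexes_mwm) rather than handed to every worker
	 * in full; e.g. (illustrative numbers) with maintenance_work_mem = 256MB,
	 * parallel_workers = 4 and nindexes_mwm = 2, each worker's setting
	 * becomes 256MB / Min(4, 2) = 128MB.
	 */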
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
		maintenance_work_mem;

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);
	shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
	prepare_index_statistics(shared, can_parallel_vacuum, nindexes);

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	lps->lvshared = shared;

	/* Prepare the dead tuple space */
	dead_tuples = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
	dead_tuples->max_tuples = maxtuples;
	dead_tuples->num_tuples = 0;
	MemSet(dead_tuples->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
	vacrelstats->dead_tuples = dead_tuples;

	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	lps->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	lps->wal_usage = wal_usage;

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	pfree(can_parallel_vacuum);
	return lps;
}

/*
 * Destroy the parallel context, and end parallel mode.
 *
 * Since writes are not allowed during parallel mode, copy the updated index
 * statistics from the DSM segment into local memory, and use that copy to
 * update pg_class after exiting parallel mode.  One might think that we can
 * exit from parallel mode, update the index statistics and then destroy the
 * parallel context, but that won't be safe (see ExitParallelMode).
 */
static void
end_parallel_vacuum(IndexBulkDeleteResult **stats, LVParallelState *lps,
					int nindexes)
{
	int			i;

	Assert(!IsParallelWorker());

	/* Copy the updated statistics */
	for (i = 0; i < nindexes; i++)
	{
		LVSharedIndStats *indstats = get_indstats(lps->lvshared, i);

		/*
		 * Skip unused slot.  The statistics of this index are already stored
		 * in local memory.
		 */
		if (indstats == NULL)
			continue;

		if (indstats->updated)
		{
			stats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
			memcpy(stats[i], &(indstats->stats), sizeof(IndexBulkDeleteResult));
		}
		else
			stats[i] = NULL;
	}

	DestroyParallelContext(lps->pcxt);
	ExitParallelMode();

	/* Deactivate parallel vacuum */
	pfree(lps);
	lps = NULL;
}

/* Return the Nth index statistics or NULL */
static LVSharedIndStats *
get_indstats(LVShared *lvshared, int n)
{
	int			i;
	char	   *p;

	if (IndStatsIsNull(lvshared, n))
		return NULL;

	p = (char *) GetSharedIndStats(lvshared);
	for (i = 0; i < n; i++)
	{
		if (IndStatsIsNull(lvshared, i))
			continue;

		p += sizeof(LVSharedIndStats);
	}

	return (LVSharedIndStats *) p;
}

/*
 * Returns true if the given index can't participate in parallel index vacuum
 * or parallel index cleanup; returns false otherwise.
 */
static bool
skip_parallel_vacuum_index(Relation indrel, LVShared *lvshared)
{
	uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

	/* first_time must be true only if for_cleanup is true */
	Assert(lvshared->for_cleanup || !lvshared->first_time);

	if (lvshared->for_cleanup)
	{
		/* Skip, if the index does not support parallel cleanup */
		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
			return true;

		/*
		 * Skip, if the index supports parallel cleanup conditionally, but we
		 * have already processed the index (for bulkdelete).  See the
		 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
		 * when indexes support parallel cleanup conditionally.
		 */
		if (!lvshared->first_time &&
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
			return true;
	}
	else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
	{
		/* Skip if the index does not support parallel bulk deletion */
		return true;
	}

	return false;
}

/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
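 *
 * Note (added): this is the entry point named in the CreateParallelContext()
 * call in begin_parallel_vacuum(); the parallel worker machinery resolves it
 * by name when each worker starts.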
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	Relation	onerel;
	Relation   *indrels;
	LVShared   *lvshared;
	LVDeadTuples *dead_tuples;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	IndexBulkDeleteResult **stats;
	LVRelStats	vacrelstats;
	ErrorContextCallback errcallback;

	lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
										   false);
	elevel = lvshared->elevel;

	if (lvshared->for_cleanup)
		elog(DEBUG1, "starting parallel vacuum worker for cleanup");
	else
		elog(DEBUG1, "starting parallel vacuum worker for bulk delete");

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/*
	 * Open table.  The lock mode is the same as the leader process's.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	onerel = table_open(lvshared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should
	 * match the leader's order.
	 */
	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);

	/* Each parallel VACUUM worker gets its own access strategy */
	vac_strategy = GetAccessStrategy(BAS_VACUUM);

	/* Set dead tuple space */
	dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
												  PARALLEL_VACUUM_KEY_DEAD_TUPLES,
												  false);

	/* Set cost-based vacuum delay */
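	/*
	 * Note (added): VacuumSharedCostBalance and VacuumActiveNWorkers point
	 * into the DSM segment, so the cost-based delay accounting below is
	 * shared between the leader and all parallel workers.
	 */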
	VacuumCostActive = (VacuumCostDelay > 0);
	VacuumCostBalance = 0;
	VacuumPageHit = 0;
	VacuumPageMiss = 0;
	VacuumPageDirty = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(lvshared->cost_balance);
	VacuumActiveNWorkers = &(lvshared->active_nworkers);

	stats = (IndexBulkDeleteResult **)
		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));

	if (lvshared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = lvshared->maintenance_work_mem_worker;

	/*
	 * Initialize vacrelstats for use as error callback arg by parallel
	 * worker.
	 */
	vacrelstats.relnamespace = get_namespace_name(RelationGetNamespace(onerel));
	vacrelstats.relname = pstrdup(RelationGetRelationName(onerel));
	vacrelstats.indname = NULL;
	vacrelstats.phase = VACUUM_ERRCB_PHASE_UNKNOWN; /* Not yet processing */

	/* Setup error traceback support for ereport() */
	errcallback.callback = vacuum_error_callback;
	errcallback.arg = &vacrelstats;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
						  &vacrelstats);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(onerel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(vac_strategy);
	pfree(stats);
}

/*
 * Error context callback for errors occurring during vacuum.
 */
static void
vacuum_error_callback(void *arg)
{
	LVRelStats *errinfo = arg;

	switch (errinfo->phase)
	{
		case VACUUM_ERRCB_PHASE_SCAN_HEAP:
			if (BlockNumberIsValid(errinfo->blkno))
				errcontext("while scanning block %u of relation \"%s.%s\"",
						   errinfo->blkno, errinfo->relnamespace, errinfo->relname);
			else
				errcontext("while scanning relation \"%s.%s\"",
						   errinfo->relnamespace, errinfo->relname);
			break;

		case VACUUM_ERRCB_PHASE_VACUUM_HEAP:
			if (BlockNumberIsValid(errinfo->blkno))
				errcontext("while vacuuming block %u of relation \"%s.%s\"",
						   errinfo->blkno, errinfo->relnamespace, errinfo->relname);
			else
				errcontext("while vacuuming relation \"%s.%s\"",
						   errinfo->relnamespace, errinfo->relname);
			break;

		case VACUUM_ERRCB_PHASE_VACUUM_INDEX:
			errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname, errinfo->relnamespace, errinfo->relname);
			break;

		case VACUUM_ERRCB_PHASE_INDEX_CLEANUP:
			errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname, errinfo->relnamespace, errinfo->relname);
			break;

		case VACUUM_ERRCB_PHASE_TRUNCATE:
			if (BlockNumberIsValid(errinfo->blkno))
				errcontext("while truncating relation \"%s.%s\" to %u blocks",
						   errinfo->relnamespace, errinfo->relname, errinfo->blkno);
			break;

		case VACUUM_ERRCB_PHASE_UNKNOWN:
		default:
			return;				/* do nothing; the errinfo may not be
								 * initialized */
	}
}

/*
 * Updates the information required for the vacuum error callback.  This also
 * saves the current information, which can later be restored via
 * restore_vacuum_error_info.
 */
static void
update_vacuum_error_info(LVRelStats *errinfo, LVSavedErrInfo *saved_err_info, int phase,
						 BlockNumber blkno)
{
	if (saved_err_info)
	{
		saved_err_info->blkno = errinfo->blkno;
		saved_err_info->phase = errinfo->phase;
	}

	errinfo->blkno = blkno;
	errinfo->phase = phase;
}
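
/*
 * Typical save/update/restore pattern (an illustrative sketch, not copied
 * from a particular caller):
 *
 *	LVSavedErrInfo saved_err_info;
 *
 *	update_vacuum_error_info(vacrelstats, &saved_err_info,
 *							 VACUUM_ERRCB_PHASE_VACUUM_HEAP, blkno);
 *	... work that may call ereport() ...
 *	restore_vacuum_error_info(vacrelstats, &saved_err_info);
 */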

/*
 * Restores the vacuum information saved via a prior call to update_vacuum_error_info.
 */
static void
restore_vacuum_error_info(LVRelStats *errinfo, const LVSavedErrInfo *saved_err_info)
{
	errinfo->blkno = saved_err_info->blkno;
	errinfo->phase = saved_err_info->phase;
}