1 /*-------------------------------------------------------------------------
2  *
3  * pg_visibility.c
4  *	  display visibility map information and page-level visibility bits
5  *
6  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
7  *
8  *	  contrib/pg_visibility/pg_visibility.c
9  *-------------------------------------------------------------------------
10  */
11 #include "postgres.h"
12 
13 #include "access/heapam.h"
14 #include "access/htup_details.h"
15 #include "access/visibilitymap.h"
16 #include "catalog/pg_type.h"
17 #include "catalog/storage_xlog.h"
18 #include "funcapi.h"
19 #include "miscadmin.h"
20 #include "storage/bufmgr.h"
21 #include "storage/procarray.h"
22 #include "storage/smgr.h"
23 #include "utils/rel.h"
24 #include "utils/snapmgr.h"
25 
26 PG_MODULE_MAGIC;
27 
/*
 * Per-relation visibility data collected by collect_visibility_data().
 *
 * Used as SRF cross-call state: "next" is the next block to emit, "count"
 * is the total number of blocks, and bits[] holds a packed bitmask per
 * block (bit 0 = all-visible, bit 1 = all-frozen, bit 2 = PD_ALL_VISIBLE).
 */
typedef struct vbits
{
	BlockNumber next;			/* next array index to return to caller */
	BlockNumber count;			/* number of entries in bits[] */
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];	/* one bitmask per block */
} vbits;
34 
/*
 * Array of corrupt TIDs built by collect_corrupt_items().
 *
 * While collecting, "next" is the next free slot and "count" is the
 * allocated capacity; before returning, collect_corrupt_items() repurposes
 * them so that "count" is the number of items stored and "next" is the
 * next item to hand back to the SRF caller.
 */
typedef struct corrupt_items
{
	BlockNumber next;			/* write cursor, later read cursor */
	BlockNumber count;			/* capacity, later number of items */
	ItemPointer tids;			/* palloc'd array of corrupt TIDs */
} corrupt_items;
41 
/* SQL-callable entry points (signatures declared in the extension script) */
PG_FUNCTION_INFO_V1(pg_visibility_map);
PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
PG_FUNCTION_INFO_V1(pg_visibility);
PG_FUNCTION_INFO_V1(pg_visibility_rel);
PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
PG_FUNCTION_INFO_V1(pg_check_frozen);
PG_FUNCTION_INFO_V1(pg_check_visible);
PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);
50 
/* Internal helpers, defined below */
static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
static vbits *collect_visibility_data(Oid relid, bool include_pd);
static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
											bool all_frozen);
static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
							  Buffer buffer);
static void check_relation_relkind(Relation rel);
59 
60 /*
61  * Visibility map information for a single block of a relation.
62  *
63  * Note: the VM code will silently return zeroes for pages past the end
64  * of the map, so we allow probes up to MaxBlockNumber regardless of the
65  * actual relation size.
66  */
67 Datum
pg_visibility_map(PG_FUNCTION_ARGS)68 pg_visibility_map(PG_FUNCTION_ARGS)
69 {
70 	Oid			relid = PG_GETARG_OID(0);
71 	int64		blkno = PG_GETARG_INT64(1);
72 	int32		mapbits;
73 	Relation	rel;
74 	Buffer		vmbuffer = InvalidBuffer;
75 	TupleDesc	tupdesc;
76 	Datum		values[2];
77 	bool		nulls[2];
78 
79 	rel = relation_open(relid, AccessShareLock);
80 
81 	/* Only some relkinds have a visibility map */
82 	check_relation_relkind(rel);
83 
84 	if (blkno < 0 || blkno > MaxBlockNumber)
85 		ereport(ERROR,
86 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
87 				 errmsg("invalid block number")));
88 
89 	tupdesc = pg_visibility_tupdesc(false, false);
90 	MemSet(nulls, 0, sizeof(nulls));
91 
92 	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
93 	if (vmbuffer != InvalidBuffer)
94 		ReleaseBuffer(vmbuffer);
95 	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
96 	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
97 
98 	relation_close(rel, AccessShareLock);
99 
100 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
101 }
102 
103 /*
104  * Visibility map information for a single block of a relation, plus the
105  * page-level information for the same block.
106  */
107 Datum
pg_visibility(PG_FUNCTION_ARGS)108 pg_visibility(PG_FUNCTION_ARGS)
109 {
110 	Oid			relid = PG_GETARG_OID(0);
111 	int64		blkno = PG_GETARG_INT64(1);
112 	int32		mapbits;
113 	Relation	rel;
114 	Buffer		vmbuffer = InvalidBuffer;
115 	Buffer		buffer;
116 	Page		page;
117 	TupleDesc	tupdesc;
118 	Datum		values[3];
119 	bool		nulls[3];
120 
121 	rel = relation_open(relid, AccessShareLock);
122 
123 	/* Only some relkinds have a visibility map */
124 	check_relation_relkind(rel);
125 
126 	if (blkno < 0 || blkno > MaxBlockNumber)
127 		ereport(ERROR,
128 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 				 errmsg("invalid block number")));
130 
131 	tupdesc = pg_visibility_tupdesc(false, true);
132 	MemSet(nulls, 0, sizeof(nulls));
133 
134 	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
135 	if (vmbuffer != InvalidBuffer)
136 		ReleaseBuffer(vmbuffer);
137 	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
138 	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
139 
140 	/* Here we have to explicitly check rel size ... */
141 	if (blkno < RelationGetNumberOfBlocks(rel))
142 	{
143 		buffer = ReadBuffer(rel, blkno);
144 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
145 
146 		page = BufferGetPage(buffer);
147 		values[2] = BoolGetDatum(PageIsAllVisible(page));
148 
149 		UnlockReleaseBuffer(buffer);
150 	}
151 	else
152 	{
153 		/* As with the vismap, silently return 0 for pages past EOF */
154 		values[2] = BoolGetDatum(false);
155 	}
156 
157 	relation_close(rel, AccessShareLock);
158 
159 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
160 }
161 
/*
 * Visibility map information for every block in a relation.
 *
 * Set-returning function: one row of (blkno, all_visible, all_frozen) per
 * block.  All the data is gathered up front on the first call; subsequent
 * calls just walk the cached vbits array.
 */
Datum
pg_visibility_map_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		/* Tupdesc and per-block data must survive across calls. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
		/* collect_visibility_data will verify the relkind */
		funcctx->user_fctx = collect_visibility_data(relid, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[3];
		bool		nulls[3];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		/* Decode the packed per-block bitmask; see vbits. */
		values[0] = Int64GetDatum(info->next);
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}
205 
/*
 * Visibility map information for every block in a relation, plus the page
 * level information for each block.
 *
 * Like pg_visibility_map_rel(), but each row additionally includes the
 * page's PD_ALL_VISIBLE flag, which requires reading every heap page.
 */
Datum
pg_visibility_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		/* Tupdesc and per-block data must survive across calls. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
		/* collect_visibility_data will verify the relkind */
		funcctx->user_fctx = collect_visibility_data(relid, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[4];
		bool		nulls[4];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		/* Decode the packed per-block bitmask; see vbits. */
		values[0] = Int64GetDatum(info->next);
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}
251 
252 /*
253  * Count the number of all-visible and all-frozen pages in the visibility
254  * map for a particular relation.
255  */
256 Datum
pg_visibility_map_summary(PG_FUNCTION_ARGS)257 pg_visibility_map_summary(PG_FUNCTION_ARGS)
258 {
259 	Oid			relid = PG_GETARG_OID(0);
260 	Relation	rel;
261 	BlockNumber nblocks;
262 	BlockNumber blkno;
263 	Buffer		vmbuffer = InvalidBuffer;
264 	int64		all_visible = 0;
265 	int64		all_frozen = 0;
266 	TupleDesc	tupdesc;
267 	Datum		values[2];
268 	bool		nulls[2];
269 
270 	rel = relation_open(relid, AccessShareLock);
271 
272 	/* Only some relkinds have a visibility map */
273 	check_relation_relkind(rel);
274 
275 	nblocks = RelationGetNumberOfBlocks(rel);
276 
277 	for (blkno = 0; blkno < nblocks; ++blkno)
278 	{
279 		int32		mapbits;
280 
281 		/* Make sure we are interruptible. */
282 		CHECK_FOR_INTERRUPTS();
283 
284 		/* Get map info. */
285 		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
286 		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
287 			++all_visible;
288 		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
289 			++all_frozen;
290 	}
291 
292 	/* Clean up. */
293 	if (vmbuffer != InvalidBuffer)
294 		ReleaseBuffer(vmbuffer);
295 	relation_close(rel, AccessShareLock);
296 
297 	tupdesc = CreateTemplateTupleDesc(2);
298 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
299 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
300 	tupdesc = BlessTupleDesc(tupdesc);
301 
302 	MemSet(nulls, 0, sizeof(nulls));
303 	values[0] = Int64GetDatum(all_visible);
304 	values[1] = Int64GetDatum(all_frozen);
305 
306 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
307 }
308 
/*
 * Return the TIDs of non-frozen tuples present in pages marked all-frozen
 * in the visibility map.  We hope no one will ever find any, but there could
 * be bugs, database corruption, etc.
 *
 * SRF: the scan happens once on the first call; later calls emit one TID
 * from the cached corrupt_items array per call.
 */
Datum
pg_check_frozen(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		/* Result array must survive across calls. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		/* collect_corrupt_items will verify the relkind */
		funcctx->user_fctx = collect_corrupt_items(relid, false, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	/* TIDs are pass-by-reference, so we can return pointers into the array */
	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}
340 
/*
 * Return the TIDs of not-all-visible tuples in pages marked all-visible
 * in the visibility map.  We hope no one will ever find any, but there could
 * be bugs, database corruption, etc.
 *
 * SRF: mirrors pg_check_frozen(), but checks the all-visible bit instead.
 */
Datum
pg_check_visible(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		/* Result array must survive across calls. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		/* collect_corrupt_items will verify the relkind */
		funcctx->user_fctx = collect_corrupt_items(relid, true, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	/* TIDs are pass-by-reference, so we can return pointers into the array */
	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}
372 
/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;
	ForkNumber	fork;
	BlockNumber block;

	/* Exclusive lock: no one may use the VM while we destroy it. */
	rel = relation_open(relid, AccessExclusiveLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	/* Forget any cached VM size so smgr re-checks the physical file. */
	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_cached_nblocks[VISIBILITYMAP_FORKNUM] = InvalidBlockNumber;

	/* Zero out the map's buffers, then physically truncate the fork. */
	block = visibilitymap_prepare_truncate(rel, 0);
	if (BlockNumberIsValid(block))
	{
		fork = VISIBILITYMAP_FORKNUM;
		smgrtruncate(rel->rd_smgr, &fork, 1, &block);
	}

	/* WAL-log the truncation so it survives a crash / reaches standbys. */
	if (RelationNeedsWAL(rel))
	{
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sends any transactional invalidation messages. Other
	 * backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we sent the messages at our eventual commit.  However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * smgr_truncate.  Therefore, there should be no race here.
	 *
	 * The reason why it's desirable to release the lock early here is because
	 * of the possibility that someone will need to use this to blow away many
	 * visibility map forks at once.  If we can't release the lock until
	 * commit time, the transaction doing this will accumulate
	 * AccessExclusiveLocks on all of those relations at the same time, which
	 * is undesirable. However, if this turns out to be unsafe we may have no
	 * choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}
443 
444 /*
445  * Helper function to construct whichever TupleDesc we need for a particular
446  * call.
447  */
448 static TupleDesc
pg_visibility_tupdesc(bool include_blkno,bool include_pd)449 pg_visibility_tupdesc(bool include_blkno, bool include_pd)
450 {
451 	TupleDesc	tupdesc;
452 	AttrNumber	maxattr = 2;
453 	AttrNumber	a = 0;
454 
455 	if (include_blkno)
456 		++maxattr;
457 	if (include_pd)
458 		++maxattr;
459 	tupdesc = CreateTemplateTupleDesc(maxattr);
460 	if (include_blkno)
461 		TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
462 	TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
463 	TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
464 	if (include_pd)
465 		TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
466 	Assert(a == maxattr);
467 
468 	return BlessTupleDesc(tupdesc);
469 }
470 
/*
 * Collect visibility data about a relation.
 *
 * Returns a palloc'd vbits struct with one packed bitmask per block:
 * bit 0 = VM all-visible, bit 1 = VM all-frozen, bit 2 = page-level
 * PD_ALL_VISIBLE (only populated when include_pd is true, since that
 * requires reading every heap page).
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static vbits *
collect_visibility_data(Oid relid, bool include_pd)
{
	Relation	rel;
	BlockNumber nblocks;
	vbits	   *info;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	nblocks = RelationGetNumberOfBlocks(rel);
	/* One byte of flag bits per block, zero-initialized. */
	info = palloc0(offsetof(vbits, bits) + nblocks);
	info->next = 0;
	info->count = nblocks;

	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		int32		mapbits;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Get map info. */
		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
			info->bits[blkno] |= (1 << 0);
		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
			info->bits[blkno] |= (1 << 1);

		/*
		 * Page-level data requires reading every block, so only get it if the
		 * caller needs it.  Use a buffer access strategy, too, to prevent
		 * cache-trashing.
		 */
		if (include_pd)
		{
			Buffer		buffer;
			Page		page;

			buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
										bstrategy);
			LockBuffer(buffer, BUFFER_LOCK_SHARE);

			page = BufferGetPage(buffer);
			if (PageIsAllVisible(page))
				info->bits[blkno] |= (1 << 2);

			UnlockReleaseBuffer(buffer);
		}
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	return info;
}
540 
/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not
 * seem to in fact be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not seem to in fact be frozen.
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	/* The visibility cutoff is only needed for all-visible checking. */
	if (all_visible)
		OldestXmin = GetOldestNonRemovableTransactionId(rel);

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size. We don't expect many corrupted tuples, so
	 * start with a small array.  This function uses the "next" field to track
	 * the next offset where we can store an item (which is the same thing as
	 * the number of items found so far) and the "count" field to track the
	 * number of entries allocated.  We'll repurpose these fields before
	 * returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so we
				 * can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding the buffer
				 * share-locked, but it should be safe against deadlocks,
				 * because surely GetOldestNonRemovableTransactionId() should
				 * never take a buffer lock. And this shouldn't happen often,
				 * so it's worth being careful so as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestNonRemovableTransactionId(rel);

				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					/* Retry the visibility check under the fresher cutoff. */
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect the
			 * tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written) and
	 * count is now the number of items we wrote (rather than the number we
	 * allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}
726 
727 /*
728  * Remember one corrupt item.
729  */
730 static void
record_corrupt_item(corrupt_items * items,ItemPointer tid)731 record_corrupt_item(corrupt_items *items, ItemPointer tid)
732 {
733 	/* enlarge output array if needed. */
734 	if (items->next >= items->count)
735 	{
736 		items->count *= 2;
737 		items->tids = repalloc(items->tids,
738 							   items->count * sizeof(ItemPointerData));
739 	}
740 	/* and add the new item */
741 	items->tids[items->next++] = *tid;
742 }
743 
744 /*
745  * Check whether a tuple is all-visible relative to a given OldestXmin value.
746  * The buffer should contain the tuple and should be locked and pinned.
747  */
748 static bool
tuple_all_visible(HeapTuple tup,TransactionId OldestXmin,Buffer buffer)749 tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
750 {
751 	HTSV_Result state;
752 	TransactionId xmin;
753 
754 	state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
755 	if (state != HEAPTUPLE_LIVE)
756 		return false;			/* all-visible implies live */
757 
758 	/*
759 	 * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
760 	 * all-visible unless every tuple is hinted committed. However, those hint
761 	 * bits could be lost after a crash, so we can't be certain that they'll
762 	 * be set here.  So just check the xmin.
763 	 */
764 
765 	xmin = HeapTupleHeaderGetXmin(tup->t_data);
766 	if (!TransactionIdPrecedes(xmin, OldestXmin))
767 		return false;			/* xmin not old enough for all to see */
768 
769 	return true;
770 }
771 
772 /*
773  * check_relation_relkind - convenience routine to check that relation
774  * is of the relkind supported by the callers
775  */
776 static void
check_relation_relkind(Relation rel)777 check_relation_relkind(Relation rel)
778 {
779 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
780 		rel->rd_rel->relkind != RELKIND_MATVIEW &&
781 		rel->rd_rel->relkind != RELKIND_TOASTVALUE)
782 		ereport(ERROR,
783 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
784 				 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
785 						RelationGetRelationName(rel))));
786 }
787