1 /*-------------------------------------------------------------------------
2  *
3  * pg_visibility.c
4  *	  display visibility map information and page-level visibility bits
5  *
6  * Copyright (c) 2016, PostgreSQL Global Development Group
7  *
8  *	  contrib/pg_visibility/pg_visibility.c
9  *-------------------------------------------------------------------------
10  */
11 #include "postgres.h"
12 
13 #include "access/htup_details.h"
14 #include "access/visibilitymap.h"
15 #include "catalog/pg_type.h"
16 #include "catalog/storage_xlog.h"
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 #include "storage/bufmgr.h"
20 #include "storage/procarray.h"
21 #include "storage/smgr.h"
22 #include "utils/rel.h"
23 
24 PG_MODULE_MAGIC;
25 
/*
 * Per-block visibility data collected by collect_visibility_data().
 *
 * One byte per heap block: bit 0 = all-visible, bit 1 = all-frozen,
 * bit 2 = page-level PD_ALL_VISIBLE (set only when page data was requested).
 */
typedef struct vbits
{
	BlockNumber next;			/* next array index to return from the SRF */
	BlockNumber count;			/* number of entries in bits[] */
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];	/* per-block flag bytes */
} vbits;

/*
 * Corrupt-TID array built by collect_corrupt_items().
 *
 * While collecting, "next" is the next free slot and "count" is the
 * allocated capacity; before the array is handed back, the fields are
 * repurposed so that "next" is the next item to emit and "count" is the
 * number of items actually stored (see end of collect_corrupt_items).
 */
typedef struct corrupt_items
{
	BlockNumber next;			/* building: write offset; later: read offset */
	BlockNumber count;			/* building: capacity; later: items stored */
	ItemPointer tids;			/* palloc'd array of corrupt tuple TIDs */
} corrupt_items;
39 
40 PG_FUNCTION_INFO_V1(pg_visibility_map);
41 PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
42 PG_FUNCTION_INFO_V1(pg_visibility);
43 PG_FUNCTION_INFO_V1(pg_visibility_rel);
44 PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
45 PG_FUNCTION_INFO_V1(pg_check_frozen);
46 PG_FUNCTION_INFO_V1(pg_check_visible);
47 PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);
48 
49 static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
50 static vbits *collect_visibility_data(Oid relid, bool include_pd);
51 static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
52 					  bool all_frozen);
53 static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
54 static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
55 				  Buffer buffer);
56 
57 /*
58  * Visibility map information for a single block of a relation.
59  *
60  * Note: the VM code will silently return zeroes for pages past the end
61  * of the map, so we allow probes up to MaxBlockNumber regardless of the
62  * actual relation size.
63  */
64 Datum
pg_visibility_map(PG_FUNCTION_ARGS)65 pg_visibility_map(PG_FUNCTION_ARGS)
66 {
67 	Oid			relid = PG_GETARG_OID(0);
68 	int64		blkno = PG_GETARG_INT64(1);
69 	int32		mapbits;
70 	Relation	rel;
71 	Buffer		vmbuffer = InvalidBuffer;
72 	TupleDesc	tupdesc;
73 	Datum		values[2];
74 	bool		nulls[2];
75 
76 	rel = relation_open(relid, AccessShareLock);
77 
78 	if (blkno < 0 || blkno > MaxBlockNumber)
79 		ereport(ERROR,
80 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
81 				 errmsg("invalid block number")));
82 
83 	tupdesc = pg_visibility_tupdesc(false, false);
84 	MemSet(nulls, 0, sizeof(nulls));
85 
86 	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
87 	if (vmbuffer != InvalidBuffer)
88 		ReleaseBuffer(vmbuffer);
89 	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
90 	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
91 
92 	relation_close(rel, AccessShareLock);
93 
94 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
95 }
96 
97 /*
98  * Visibility map information for a single block of a relation, plus the
99  * page-level information for the same block.
100  */
101 Datum
pg_visibility(PG_FUNCTION_ARGS)102 pg_visibility(PG_FUNCTION_ARGS)
103 {
104 	Oid			relid = PG_GETARG_OID(0);
105 	int64		blkno = PG_GETARG_INT64(1);
106 	int32		mapbits;
107 	Relation	rel;
108 	Buffer		vmbuffer = InvalidBuffer;
109 	Buffer		buffer;
110 	Page		page;
111 	TupleDesc	tupdesc;
112 	Datum		values[3];
113 	bool		nulls[3];
114 
115 	rel = relation_open(relid, AccessShareLock);
116 
117 	if (blkno < 0 || blkno > MaxBlockNumber)
118 		ereport(ERROR,
119 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120 				 errmsg("invalid block number")));
121 
122 	tupdesc = pg_visibility_tupdesc(false, true);
123 	MemSet(nulls, 0, sizeof(nulls));
124 
125 	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
126 	if (vmbuffer != InvalidBuffer)
127 		ReleaseBuffer(vmbuffer);
128 	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
129 	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
130 
131 	/* Here we have to explicitly check rel size ... */
132 	if (blkno < RelationGetNumberOfBlocks(rel))
133 	{
134 		buffer = ReadBuffer(rel, blkno);
135 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
136 
137 		page = BufferGetPage(buffer);
138 		values[2] = BoolGetDatum(PageIsAllVisible(page));
139 
140 		UnlockReleaseBuffer(buffer);
141 	}
142 	else
143 	{
144 		/* As with the vismap, silently return 0 for pages past EOF */
145 		values[2] = BoolGetDatum(false);
146 	}
147 
148 	relation_close(rel, AccessShareLock);
149 
150 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
151 }
152 
153 /*
154  * Visibility map information for every block in a relation.
155  */
/*
 * Visibility map information for every block in a relation.
 *
 * Set-returning function: on the first call all VM bits are collected into
 * a vbits array allocated in the multi-call memory context; each call then
 * emits one (blkno, all_visible, all_frozen) row until the array is
 * exhausted.
 */
Datum
pg_visibility_map_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		/* State must live across calls: allocate in multi-call context. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
		funcctx->user_fctx = collect_visibility_data(relid, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[3];
		bool		nulls[3];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		values[0] = Int64GetDatum(info->next);
		/* bit 0 = all-visible, bit 1 = all-frozen; see collect_visibility_data */
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}
195 
196 /*
197  * Visibility map information for every block in a relation, plus the page
198  * level information for each block.
199  */
/*
 * Visibility map information for every block in a relation, plus the page
 * level information for each block.
 *
 * Like pg_visibility_map_rel, but collect_visibility_data is asked for
 * page-level data too, so each row also carries pd_all_visible (bit 2 of
 * the per-block flag byte).
 */
Datum
pg_visibility_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		/* State must live across calls: allocate in multi-call context. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
		funcctx->user_fctx = collect_visibility_data(relid, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[4];
		bool		nulls[4];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		values[0] = Int64GetDatum(info->next);
		/* bit 0 = all-visible, bit 1 = all-frozen, bit 2 = PD_ALL_VISIBLE */
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}
240 
241 /*
242  * Count the number of all-visible and all-frozen pages in the visibility
243  * map for a particular relation.
244  */
245 Datum
pg_visibility_map_summary(PG_FUNCTION_ARGS)246 pg_visibility_map_summary(PG_FUNCTION_ARGS)
247 {
248 	Oid			relid = PG_GETARG_OID(0);
249 	Relation	rel;
250 	BlockNumber nblocks;
251 	BlockNumber blkno;
252 	Buffer		vmbuffer = InvalidBuffer;
253 	int64		all_visible = 0;
254 	int64		all_frozen = 0;
255 	TupleDesc	tupdesc;
256 	Datum		values[2];
257 	bool		nulls[2];
258 
259 	rel = relation_open(relid, AccessShareLock);
260 	nblocks = RelationGetNumberOfBlocks(rel);
261 
262 	for (blkno = 0; blkno < nblocks; ++blkno)
263 	{
264 		int32		mapbits;
265 
266 		/* Make sure we are interruptible. */
267 		CHECK_FOR_INTERRUPTS();
268 
269 		/* Get map info. */
270 		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
271 		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
272 			++all_visible;
273 		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
274 			++all_frozen;
275 	}
276 
277 	/* Clean up. */
278 	if (vmbuffer != InvalidBuffer)
279 		ReleaseBuffer(vmbuffer);
280 	relation_close(rel, AccessShareLock);
281 
282 	tupdesc = CreateTemplateTupleDesc(2, false);
283 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
284 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
285 	tupdesc = BlessTupleDesc(tupdesc);
286 
287 	MemSet(nulls, 0, sizeof(nulls));
288 	values[0] = Int64GetDatum(all_visible);
289 	values[1] = Int64GetDatum(all_frozen);
290 
291 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
292 }
293 
294 /*
295  * Return the TIDs of non-frozen tuples present in pages marked all-frozen
296  * in the visibility map.  We hope no one will ever find any, but there could
297  * be bugs, database corruption, etc.
298  */
/*
 * Return the TIDs of non-frozen tuples present in pages marked all-frozen
 * in the visibility map.  We hope no one will ever find any, but there could
 * be bugs, database corruption, etc.
 *
 * SRF: the expensive scan happens once, in collect_corrupt_items(); each
 * subsequent call returns one stored TID directly as a pass-by-reference
 * Datum (the array lives in the multi-call context, so the pointer stays
 * valid across calls).
 */
Datum
pg_check_frozen(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		/* all_visible = false, all_frozen = true: frozen-ness check only */
		funcctx->user_fctx = collect_corrupt_items(relid, false, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}
324 
325 /*
326  * Return the TIDs of not-all-visible tuples in pages marked all-visible
327  * in the visibility map.  We hope no one will ever find any, but there could
328  * be bugs, database corruption, etc.
329  */
/*
 * Return the TIDs of not-all-visible tuples in pages marked all-visible
 * in the visibility map.  We hope no one will ever find any, but there could
 * be bugs, database corruption, etc.
 *
 * Mirror image of pg_check_frozen: same SRF shape, but asks
 * collect_corrupt_items() for the all-visible check instead.
 */
Datum
pg_check_visible(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		/* all_visible = true, all_frozen = false: visibility check only */
		funcctx->user_fctx = collect_corrupt_items(relid, true, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}
355 
356 /*
357  * Remove the visibility map fork for a relation.  If there turn out to be
358  * any bugs in the visibility map code that require rebuilding the VM, this
359  * provides users with a way to do it that is cleaner than shutting down the
360  * server and removing files by hand.
361  *
362  * This is a cut-down version of RelationTruncate.
363  */
/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;

	/* Truncation requires exclusive access to the relation. */
	rel = relation_open(relid, AccessExclusiveLock);

	/* Only relation kinds that actually have a VM fork are allowed. */
	if (rel->rd_rel->relkind != RELKIND_RELATION &&
		rel->rd_rel->relkind != RELKIND_MATVIEW &&
		rel->rd_rel->relkind != RELKIND_TOASTVALUE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
		   errmsg("\"%s\" is not a table, materialized view, or TOAST table",
				  RelationGetRelationName(rel))));

	/* Invalidate the cached VM size before truncating the fork to zero. */
	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;

	visibilitymap_truncate(rel, 0);

	/*
	 * WAL-log the truncation so the VM fork is also emptied on replay /
	 * standbys.  XLR_SPECIAL_REL_UPDATE flags the record as modifying a
	 * relation in a way external tools must be aware of.
	 */
	if (RelationNeedsWAL(rel))
	{
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sends any transactional invalidation messages. Other
	 * backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we sent the messages at our eventual commit.  However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * visibilitymap_truncate.  Therefore, there should be no race here.
	 *
	 * The reason why it's desirable to release the lock early here is because
	 * of the possibility that someone will need to use this to blow away many
	 * visibility map forks at once.  If we can't release the lock until
	 * commit time, the transaction doing this will accumulate
	 * AccessExclusiveLocks on all of those relations at the same time, which
	 * is undesirable. However, if this turns out to be unsafe we may have no
	 * choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}
424 
425 /*
426  * Helper function to construct whichever TupleDesc we need for a particular
427  * call.
428  */
429 static TupleDesc
pg_visibility_tupdesc(bool include_blkno,bool include_pd)430 pg_visibility_tupdesc(bool include_blkno, bool include_pd)
431 {
432 	TupleDesc	tupdesc;
433 	AttrNumber	maxattr = 2;
434 	AttrNumber	a = 0;
435 
436 	if (include_blkno)
437 		++maxattr;
438 	if (include_pd)
439 		++maxattr;
440 	tupdesc = CreateTemplateTupleDesc(maxattr, false);
441 	if (include_blkno)
442 		TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
443 	TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
444 	TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
445 	if (include_pd)
446 		TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
447 	Assert(a == maxattr);
448 
449 	return BlessTupleDesc(tupdesc);
450 }
451 
452 /*
453  * Collect visibility data about a relation.
454  */
/*
 * Collect visibility data about a relation.
 *
 * Returns a palloc'd vbits array with one flag byte per heap block:
 * bit 0 = all-visible, bit 1 = all-frozen (both from the visibility map),
 * and bit 2 = PD_ALL_VISIBLE from the page header, set only when
 * include_pd is true (page-level data requires reading every heap block).
 */
static vbits *
collect_visibility_data(Oid relid, bool include_pd)
{
	Relation	rel;
	BlockNumber nblocks;
	vbits	   *info;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);

	rel = relation_open(relid, AccessShareLock);

	nblocks = RelationGetNumberOfBlocks(rel);
	/* Flexible array member: one byte per block after the header. */
	info = palloc0(offsetof(vbits, bits) +nblocks);
	info->next = 0;
	info->count = nblocks;

	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		int32		mapbits;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Get map info.  vmbuffer keeps the current VM page pinned. */
		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
			info->bits[blkno] |= (1 << 0);
		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
			info->bits[blkno] |= (1 << 1);

		/*
		 * Page-level data requires reading every block, so only get it if the
		 * caller needs it.  Use a buffer access strategy, too, to prevent
		 * cache-trashing.
		 */
		if (include_pd)
		{
			Buffer		buffer;
			Page		page;

			buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
										bstrategy);
			LockBuffer(buffer, BUFFER_LOCK_SHARE);

			page = BufferGetPage(buffer);
			if (PageIsAllVisible(page))
				info->bits[blkno] |= (1 << 2);

			UnlockReleaseBuffer(buffer);
		}
	}

	/* Clean up: release the last pinned VM page, if any. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	return info;
}
515 
516 /*
517  * Returns a list of items whose visibility map information does not match
518  * the status of the tuples on the page.
519  *
520  * If all_visible is passed as true, this will include all items which are
521  * on pages marked as all-visible in the visibility map but which do not
522  * seem to in fact be all-visible.
523  *
524  * If all_frozen is passed as true, this will include all items which are
525  * on pages marked as all-frozen but which do not seem to in fact be frozen.
526  */
/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not
 * seem to in fact be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not seem to in fact be frozen.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	if (all_visible)
	{
		/* Don't pass rel; that will fail in recovery. */
		OldestXmin = GetOldestXmin(NULL, true);
	}

	rel = relation_open(relid, AccessShareLock);

	/* Only relation kinds that have a heap and a VM fork can be checked. */
	if (rel->rd_rel->relkind != RELKIND_RELATION &&
		rel->rd_rel->relkind != RELKIND_MATVIEW &&
		rel->rd_rel->relkind != RELKIND_TOASTVALUE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
		   errmsg("\"%s\" is not a table, materialized view, or TOAST table",
				  RelationGetRelationName(rel))));

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size. We don't expect many corrupted tuples, so
	 * start with a small array.  This function uses the "next" field to track
	 * the next offset where we can store an item (which is the same thing as
	 * the number of items found so far) and the "count" field to track the
	 * number of entries allocated.  We'll repurpose these fields before
	 * returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so we
				 * can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding the buffer
				 * share-locked, but it should be safe against deadlocks,
				 * because surely GetOldestXmin() should never take a buffer
				 * lock. And this shouldn't happen often, so it's worth being
				 * careful so as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestXmin(NULL, true);

				/* Newer horizon can't be older than the one we already had. */
				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					/* Retry the visibility test with the fresher horizon. */
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect the
			 * tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written) and
	 * count is now the number of items we wrote (rather than the number we
	 * allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}
705 
706 /*
707  * Remember one corrupt item.
708  */
709 static void
record_corrupt_item(corrupt_items * items,ItemPointer tid)710 record_corrupt_item(corrupt_items *items, ItemPointer tid)
711 {
712 	/* enlarge output array if needed. */
713 	if (items->next >= items->count)
714 	{
715 		items->count *= 2;
716 		items->tids = repalloc(items->tids,
717 							   items->count * sizeof(ItemPointerData));
718 	}
719 	/* and add the new item */
720 	items->tids[items->next++] = *tid;
721 }
722 
723 /*
724  * Check whether a tuple is all-visible relative to a given OldestXmin value.
725  * The buffer should contain the tuple and should be locked and pinned.
726  */
727 static bool
tuple_all_visible(HeapTuple tup,TransactionId OldestXmin,Buffer buffer)728 tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
729 {
730 	HTSV_Result state;
731 	TransactionId xmin;
732 
733 	state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
734 	if (state != HEAPTUPLE_LIVE)
735 		return false;			/* all-visible implies live */
736 
737 	/*
738 	 * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
739 	 * all-visible unless every tuple is hinted committed. However, those hint
740 	 * bits could be lost after a crash, so we can't be certain that they'll
741 	 * be set here.  So just check the xmin.
742 	 */
743 
744 	xmin = HeapTupleHeaderGetXmin(tup->t_data);
745 	if (!TransactionIdPrecedes(xmin, OldestXmin))
746 		return false;			/* xmin not old enough for all to see */
747 
748 	return true;
749 }
750