/*-------------------------------------------------------------------------
 *
 * pg_visibility.c
 *	  display visibility map information and page-level visibility bits
 *
 * Copyright (c) 2016-2017, PostgreSQL Global Development Group
 *
 *	  contrib/pg_visibility/pg_visibility.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/visibilitymap.h"
#include "catalog/pg_type.h"
#include "catalog/storage_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/rel.h"

PG_MODULE_MAGIC;

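/*
 * Per-block visibility info collected by collect_visibility_data.  Each
 * bits[] entry packs the flags we report: bit 0 = all-visible, bit 1 =
 * all-frozen, and (when page-level data was requested) bit 2 = the page's
 * PD_ALL_VISIBLE flag.
 */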
typedef struct vbits
{
	BlockNumber next;
	BlockNumber count;
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];
} vbits;

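/*
 * TIDs found to contradict the visibility map, as collected by
 * collect_corrupt_items.  While scanning, "next" is the insertion point and
 * "count" the allocated array size; both fields are repurposed for reading
 * before the result is handed back (see collect_corrupt_items).
 */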
typedef struct corrupt_items
{
	BlockNumber next;
	BlockNumber count;
	ItemPointer tids;
} corrupt_items;

PG_FUNCTION_INFO_V1(pg_visibility_map);
PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
PG_FUNCTION_INFO_V1(pg_visibility);
PG_FUNCTION_INFO_V1(pg_visibility_rel);
PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
PG_FUNCTION_INFO_V1(pg_check_frozen);
PG_FUNCTION_INFO_V1(pg_check_visible);
PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);

static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
static vbits *collect_visibility_data(Oid relid, bool include_pd);
static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
					  bool all_frozen);
static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
				  Buffer buffer);
static void check_relation_relkind(Relation rel);

/*
 * Visibility map information for a single block of a relation.
 *
 * Note: the VM code will silently return zeroes for pages past the end
 * of the map, so we allow probes up to MaxBlockNumber regardless of the
 * actual relation size.
 */
Datum
pg_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	int64		blkno = PG_GETARG_INT64(1);
	int32		mapbits;
	Relation	rel;
	Buffer		vmbuffer = InvalidBuffer;
	TupleDesc	tupdesc;
	Datum		values[2];
	bool		nulls[2];

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	if (blkno < 0 || blkno > MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid block number")));

	tupdesc = pg_visibility_tupdesc(false, false);
	MemSet(nulls, 0, sizeof(nulls));

	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);

	relation_close(rel, AccessShareLock);

	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
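
/*
 * Example usage, a sketch assuming the extension's SQL-level wrapper is in
 * place (CREATE EXTENSION pg_visibility) and using a placeholder table name:
 *
 *		SELECT all_visible, all_frozen
 *		FROM pg_visibility_map('my_table'::regclass, 0);
 */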

/*
 * Visibility map information for a single block of a relation, plus the
 * page-level information for the same block.
 */
Datum
pg_visibility(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	int64		blkno = PG_GETARG_INT64(1);
	int32		mapbits;
	Relation	rel;
	Buffer		vmbuffer = InvalidBuffer;
	Buffer		buffer;
	Page		page;
	TupleDesc	tupdesc;
	Datum		values[3];
	bool		nulls[3];

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	if (blkno < 0 || blkno > MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid block number")));

	tupdesc = pg_visibility_tupdesc(false, true);
	MemSet(nulls, 0, sizeof(nulls));

	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);

	/* Here we have to explicitly check rel size ... */
	if (blkno < RelationGetNumberOfBlocks(rel))
	{
		buffer = ReadBuffer(rel, blkno);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		values[2] = BoolGetDatum(PageIsAllVisible(page));

		UnlockReleaseBuffer(buffer);
	}
	else
	{
		/* As with the vismap, silently return 0 for pages past EOF */
		values[2] = BoolGetDatum(false);
	}

	relation_close(rel, AccessShareLock);

	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
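
/*
 * Example usage (a sketch; 'my_table' is a placeholder):
 *
 *		SELECT all_visible, all_frozen, pd_all_visible
 *		FROM pg_visibility('my_table'::regclass, 0);
 *
 * A row with all_visible set but pd_all_visible clear would indicate
 * disagreement between the visibility map and the page itself.
 */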

/*
 * Visibility map information for every block in a relation.
 */
Datum
pg_visibility_map_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
		/* collect_visibility_data will verify the relkind */
		funcctx->user_fctx = collect_visibility_data(relid, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[3];
		bool		nulls[3];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		values[0] = Int64GetDatum(info->next);
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}
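
/*
 * Example usage (a sketch; pg_visibility_rel below behaves the same way but
 * adds a pd_all_visible column):
 *
 *		SELECT blkno
 *		FROM pg_visibility_map_rel('my_table'::regclass)
 *		WHERE all_visible AND NOT all_frozen;
 */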

/*
 * Visibility map information for every block in a relation, plus the
 * page-level information for each block.
 */
Datum
pg_visibility_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
		/* collect_visibility_data will verify the relkind */
		funcctx->user_fctx = collect_visibility_data(relid, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[4];
		bool		nulls[4];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		values[0] = Int64GetDatum(info->next);
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}

/*
 * Count the number of all-visible and all-frozen pages in the visibility
 * map for a particular relation.
 */
Datum
pg_visibility_map_summary(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;
	BlockNumber nblocks;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	int64		all_visible = 0;
	int64		all_frozen = 0;
	TupleDesc	tupdesc;
	Datum		values[2];
	bool		nulls[2];

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	nblocks = RelationGetNumberOfBlocks(rel);

	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		int32		mapbits;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Get map info. */
		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
			++all_visible;
		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
			++all_frozen;
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	tupdesc = CreateTemplateTupleDesc(2, false);
	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
	tupdesc = BlessTupleDesc(tupdesc);

	MemSet(nulls, 0, sizeof(nulls));
	values[0] = Int64GetDatum(all_visible);
	values[1] = Int64GetDatum(all_frozen);

	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
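
/*
 * Example usage (a sketch):
 *
 *		SELECT * FROM pg_visibility_map_summary('my_table'::regclass);
 *
 * The counts come straight from the map, so they can disagree with reality
 * exactly when the map itself is wrong; see pg_check_visible and
 * pg_check_frozen below.
 */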

/*
 * Return the TIDs of non-frozen tuples present in pages marked all-frozen
 * in the visibility map.  We hope no one will ever find any, but there could
 * be bugs, database corruption, etc.
 */
Datum
pg_check_frozen(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		/* collect_corrupt_items will verify the relkind */
		funcctx->user_fctx = collect_corrupt_items(relid, false, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}

/*
 * Return the TIDs of not-all-visible tuples in pages marked all-visible
 * in the visibility map.  We hope no one will ever find any, but there could
 * be bugs, database corruption, etc.
 */
Datum
pg_check_visible(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		/* collect_corrupt_items will verify the relkind */
		funcctx->user_fctx = collect_corrupt_items(relid, true, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}
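
/*
 * Example usage for the two checkers above (a sketch; both are expected to
 * return an empty set on a healthy relation):
 *
 *		SELECT * FROM pg_check_frozen('my_table'::regclass);
 *		SELECT * FROM pg_check_visible('my_table'::regclass);
 */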

/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;

	rel = relation_open(relid, AccessExclusiveLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;

	visibilitymap_truncate(rel, 0);

	if (RelationNeedsWAL(rel))
	{
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sent any transactional invalidation messages. Other
	 * backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we send the messages at our eventual commit.  However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * visibilitymap_truncate.  Therefore, there should be no race here.
	 *
	 * It's desirable to release the lock early because someone may need to
	 * use this function to blow away many visibility map forks at once.  If
	 * we couldn't release the lock until commit time, the transaction doing
	 * so would accumulate AccessExclusiveLocks on all of those relations at
	 * the same time, which is undesirable.  However, if this turns out to be
	 * unsafe, we may have no choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}
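
/*
 * Example usage (a sketch): truncate the map, then VACUUM so that it is
 * rebuilt from the heap.
 *
 *		SELECT pg_truncate_visibility_map('my_table'::regclass);
 *		VACUUM my_table;
 */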

/*
 * Helper function to construct whichever TupleDesc we need for a particular
 * call.
 */
static TupleDesc
pg_visibility_tupdesc(bool include_blkno, bool include_pd)
{
	TupleDesc	tupdesc;
	AttrNumber	maxattr = 2;
	AttrNumber	a = 0;

	if (include_blkno)
		++maxattr;
	if (include_pd)
		++maxattr;
	tupdesc = CreateTemplateTupleDesc(maxattr, false);
	if (include_blkno)
		TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
	TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
	if (include_pd)
		TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
	Assert(a == maxattr);

	return BlessTupleDesc(tupdesc);
}

/*
 * Collect visibility data about a relation.
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static vbits *
collect_visibility_data(Oid relid, bool include_pd)
{
	Relation	rel;
	BlockNumber nblocks;
	vbits	   *info;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	nblocks = RelationGetNumberOfBlocks(rel);
	info = palloc0(offsetof(vbits, bits) + nblocks);
	info->next = 0;
	info->count = nblocks;

	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		int32		mapbits;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Get map info. */
		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
			info->bits[blkno] |= (1 << 0);
		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
			info->bits[blkno] |= (1 << 1);

		/*
		 * Page-level data requires reading every block, so only get it if
		 * the caller needs it.  Use a buffer access strategy, too, to
		 * prevent cache-thrashing.
		 */
		if (include_pd)
		{
			Buffer		buffer;
			Page		page;

			buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
										bstrategy);
			LockBuffer(buffer, BUFFER_LOCK_SHARE);

			page = BufferGetPage(buffer);
			if (PageIsAllVisible(page))
				info->bits[blkno] |= (1 << 2);

			UnlockReleaseBuffer(buffer);
		}
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	return info;
}

/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not
 * in fact appear to be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not in fact appear to be
 * frozen.
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	if (all_visible)
	{
		/* Don't pass rel; that will fail in recovery. */
		OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
	}

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size. We don't expect many corrupted tuples, so
	 * start with a small array.  This function uses the "next" field to track
	 * the next offset where we can store an item (which is the same thing as
	 * the number of items found so far) and the "count" field to track the
	 * number of entries allocated.  We'll repurpose these fields before
	 * returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so we
				 * can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding the buffer
				 * locked, but it should be safe against deadlocks, because
				 * surely GetOldestXmin() should never take a buffer lock.
				 * And this shouldn't happen often, so it's worth being
				 * careful so as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);

				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect the
			 * tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written) and
	 * count is now the number of items we wrote (rather than the number we
	 * allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}

/*
 * Remember one corrupt item.
 */
static void
record_corrupt_item(corrupt_items *items, ItemPointer tid)
{
	/* Enlarge the output array if needed. */
	if (items->next >= items->count)
	{
		items->count *= 2;
		items->tids = repalloc(items->tids,
							   items->count * sizeof(ItemPointerData));
	}
	/* ... and add the new item. */
	items->tids[items->next++] = *tid;
}

/*
 * Check whether a tuple is all-visible relative to a given OldestXmin value.
 * The buffer should contain the tuple and should be locked and pinned.
 */
static bool
tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
{
	HTSV_Result state;
	TransactionId xmin;

	state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
	if (state != HEAPTUPLE_LIVE)
		return false;			/* all-visible implies live */

	/*
	 * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
	 * all-visible unless every tuple is hinted committed. However, those hint
	 * bits could be lost after a crash, so we can't be certain that they'll
	 * be set here.  So just check the xmin.
	 */
	xmin = HeapTupleHeaderGetXmin(tup->t_data);
	if (!TransactionIdPrecedes(xmin, OldestXmin))
		return false;			/* xmin not old enough for all to see */

	return true;
}

/*
 * check_relation_relkind - convenience routine to check that the relation
 * is of a relkind supported by the callers
 */
static void
check_relation_relkind(Relation rel)
{
	if (rel->rd_rel->relkind != RELKIND_RELATION &&
		rel->rd_rel->relkind != RELKIND_MATVIEW &&
		rel->rd_rel->relkind != RELKIND_TOASTVALUE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
						RelationGetRelationName(rel))));
}