1 /*-------------------------------------------------------------------------
2 *
3 * pg_visibility.c
4 * display visibility map information and page-level visibility bits
5 *
6 * Copyright (c) 2016-2021, PostgreSQL Global Development Group
7 *
8 * contrib/pg_visibility/pg_visibility.c
9 *-------------------------------------------------------------------------
10 */
11 #include "postgres.h"
12
13 #include "access/heapam.h"
14 #include "access/htup_details.h"
15 #include "access/visibilitymap.h"
16 #include "catalog/pg_type.h"
17 #include "catalog/storage_xlog.h"
18 #include "funcapi.h"
19 #include "miscadmin.h"
20 #include "storage/bufmgr.h"
21 #include "storage/procarray.h"
22 #include "storage/smgr.h"
23 #include "utils/rel.h"
24 #include "utils/snapmgr.h"
25
26 PG_MODULE_MAGIC;
27
/*
 * Snapshot of visibility information for every block of one relation,
 * built by collect_visibility_data() and consumed incrementally by the
 * set-returning functions.
 */
typedef struct vbits
{
	BlockNumber next;			/* next array index to emit to the caller */
	BlockNumber count;			/* number of entries in bits[] */
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];	/* per-block flag bits: bit 0 =
												 * all-visible, bit 1 =
												 * all-frozen, bit 2 =
												 * PD_ALL_VISIBLE */
} vbits;
34
/*
 * Growable array of TIDs found to contradict the visibility map.  While
 * collect_corrupt_items() is running, "next" is the next free slot and
 * "count" is the allocated capacity; before returning, the fields are
 * repurposed so that "next" is the next item to emit and "count" the
 * number of items actually stored.
 */
typedef struct corrupt_items
{
	BlockNumber next;			/* see note above: write cursor, then read cursor */
	BlockNumber count;			/* see note above: capacity, then item count */
	ItemPointer tids;			/* palloc'd array of corrupt tuple TIDs */
} corrupt_items;
41
42 PG_FUNCTION_INFO_V1(pg_visibility_map);
43 PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
44 PG_FUNCTION_INFO_V1(pg_visibility);
45 PG_FUNCTION_INFO_V1(pg_visibility_rel);
46 PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
47 PG_FUNCTION_INFO_V1(pg_check_frozen);
48 PG_FUNCTION_INFO_V1(pg_check_visible);
49 PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);
50
51 static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
52 static vbits *collect_visibility_data(Oid relid, bool include_pd);
53 static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
54 bool all_frozen);
55 static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
56 static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
57 Buffer buffer);
58 static void check_relation_relkind(Relation rel);
59
60 /*
61 * Visibility map information for a single block of a relation.
62 *
63 * Note: the VM code will silently return zeroes for pages past the end
64 * of the map, so we allow probes up to MaxBlockNumber regardless of the
65 * actual relation size.
66 */
67 Datum
pg_visibility_map(PG_FUNCTION_ARGS)68 pg_visibility_map(PG_FUNCTION_ARGS)
69 {
70 Oid relid = PG_GETARG_OID(0);
71 int64 blkno = PG_GETARG_INT64(1);
72 int32 mapbits;
73 Relation rel;
74 Buffer vmbuffer = InvalidBuffer;
75 TupleDesc tupdesc;
76 Datum values[2];
77 bool nulls[2];
78
79 rel = relation_open(relid, AccessShareLock);
80
81 /* Only some relkinds have a visibility map */
82 check_relation_relkind(rel);
83
84 if (blkno < 0 || blkno > MaxBlockNumber)
85 ereport(ERROR,
86 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
87 errmsg("invalid block number")));
88
89 tupdesc = pg_visibility_tupdesc(false, false);
90 MemSet(nulls, 0, sizeof(nulls));
91
92 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
93 if (vmbuffer != InvalidBuffer)
94 ReleaseBuffer(vmbuffer);
95 values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
96 values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
97
98 relation_close(rel, AccessShareLock);
99
100 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
101 }
102
103 /*
104 * Visibility map information for a single block of a relation, plus the
105 * page-level information for the same block.
106 */
107 Datum
pg_visibility(PG_FUNCTION_ARGS)108 pg_visibility(PG_FUNCTION_ARGS)
109 {
110 Oid relid = PG_GETARG_OID(0);
111 int64 blkno = PG_GETARG_INT64(1);
112 int32 mapbits;
113 Relation rel;
114 Buffer vmbuffer = InvalidBuffer;
115 Buffer buffer;
116 Page page;
117 TupleDesc tupdesc;
118 Datum values[3];
119 bool nulls[3];
120
121 rel = relation_open(relid, AccessShareLock);
122
123 /* Only some relkinds have a visibility map */
124 check_relation_relkind(rel);
125
126 if (blkno < 0 || blkno > MaxBlockNumber)
127 ereport(ERROR,
128 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 errmsg("invalid block number")));
130
131 tupdesc = pg_visibility_tupdesc(false, true);
132 MemSet(nulls, 0, sizeof(nulls));
133
134 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
135 if (vmbuffer != InvalidBuffer)
136 ReleaseBuffer(vmbuffer);
137 values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
138 values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
139
140 /* Here we have to explicitly check rel size ... */
141 if (blkno < RelationGetNumberOfBlocks(rel))
142 {
143 buffer = ReadBuffer(rel, blkno);
144 LockBuffer(buffer, BUFFER_LOCK_SHARE);
145
146 page = BufferGetPage(buffer);
147 values[2] = BoolGetDatum(PageIsAllVisible(page));
148
149 UnlockReleaseBuffer(buffer);
150 }
151 else
152 {
153 /* As with the vismap, silently return 0 for pages past EOF */
154 values[2] = BoolGetDatum(false);
155 }
156
157 relation_close(rel, AccessShareLock);
158
159 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
160 }
161
162 /*
163 * Visibility map information for every block in a relation.
164 */
165 Datum
pg_visibility_map_rel(PG_FUNCTION_ARGS)166 pg_visibility_map_rel(PG_FUNCTION_ARGS)
167 {
168 FuncCallContext *funcctx;
169 vbits *info;
170
171 if (SRF_IS_FIRSTCALL())
172 {
173 Oid relid = PG_GETARG_OID(0);
174 MemoryContext oldcontext;
175
176 funcctx = SRF_FIRSTCALL_INIT();
177 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
178 funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
179 /* collect_visibility_data will verify the relkind */
180 funcctx->user_fctx = collect_visibility_data(relid, false);
181 MemoryContextSwitchTo(oldcontext);
182 }
183
184 funcctx = SRF_PERCALL_SETUP();
185 info = (vbits *) funcctx->user_fctx;
186
187 if (info->next < info->count)
188 {
189 Datum values[3];
190 bool nulls[3];
191 HeapTuple tuple;
192
193 MemSet(nulls, 0, sizeof(nulls));
194 values[0] = Int64GetDatum(info->next);
195 values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
196 values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
197 info->next++;
198
199 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
200 SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
201 }
202
203 SRF_RETURN_DONE(funcctx);
204 }
205
206 /*
207 * Visibility map information for every block in a relation, plus the page
208 * level information for each block.
209 */
210 Datum
pg_visibility_rel(PG_FUNCTION_ARGS)211 pg_visibility_rel(PG_FUNCTION_ARGS)
212 {
213 FuncCallContext *funcctx;
214 vbits *info;
215
216 if (SRF_IS_FIRSTCALL())
217 {
218 Oid relid = PG_GETARG_OID(0);
219 MemoryContext oldcontext;
220
221 funcctx = SRF_FIRSTCALL_INIT();
222 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
223 funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
224 /* collect_visibility_data will verify the relkind */
225 funcctx->user_fctx = collect_visibility_data(relid, true);
226 MemoryContextSwitchTo(oldcontext);
227 }
228
229 funcctx = SRF_PERCALL_SETUP();
230 info = (vbits *) funcctx->user_fctx;
231
232 if (info->next < info->count)
233 {
234 Datum values[4];
235 bool nulls[4];
236 HeapTuple tuple;
237
238 MemSet(nulls, 0, sizeof(nulls));
239 values[0] = Int64GetDatum(info->next);
240 values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
241 values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
242 values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
243 info->next++;
244
245 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
246 SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
247 }
248
249 SRF_RETURN_DONE(funcctx);
250 }
251
252 /*
253 * Count the number of all-visible and all-frozen pages in the visibility
254 * map for a particular relation.
255 */
256 Datum
pg_visibility_map_summary(PG_FUNCTION_ARGS)257 pg_visibility_map_summary(PG_FUNCTION_ARGS)
258 {
259 Oid relid = PG_GETARG_OID(0);
260 Relation rel;
261 BlockNumber nblocks;
262 BlockNumber blkno;
263 Buffer vmbuffer = InvalidBuffer;
264 int64 all_visible = 0;
265 int64 all_frozen = 0;
266 TupleDesc tupdesc;
267 Datum values[2];
268 bool nulls[2];
269
270 rel = relation_open(relid, AccessShareLock);
271
272 /* Only some relkinds have a visibility map */
273 check_relation_relkind(rel);
274
275 nblocks = RelationGetNumberOfBlocks(rel);
276
277 for (blkno = 0; blkno < nblocks; ++blkno)
278 {
279 int32 mapbits;
280
281 /* Make sure we are interruptible. */
282 CHECK_FOR_INTERRUPTS();
283
284 /* Get map info. */
285 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
286 if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
287 ++all_visible;
288 if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
289 ++all_frozen;
290 }
291
292 /* Clean up. */
293 if (vmbuffer != InvalidBuffer)
294 ReleaseBuffer(vmbuffer);
295 relation_close(rel, AccessShareLock);
296
297 tupdesc = CreateTemplateTupleDesc(2);
298 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
299 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
300 tupdesc = BlessTupleDesc(tupdesc);
301
302 MemSet(nulls, 0, sizeof(nulls));
303 values[0] = Int64GetDatum(all_visible);
304 values[1] = Int64GetDatum(all_frozen);
305
306 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
307 }
308
309 /*
310 * Return the TIDs of non-frozen tuples present in pages marked all-frozen
311 * in the visibility map. We hope no one will ever find any, but there could
312 * be bugs, database corruption, etc.
313 */
314 Datum
pg_check_frozen(PG_FUNCTION_ARGS)315 pg_check_frozen(PG_FUNCTION_ARGS)
316 {
317 FuncCallContext *funcctx;
318 corrupt_items *items;
319
320 if (SRF_IS_FIRSTCALL())
321 {
322 Oid relid = PG_GETARG_OID(0);
323 MemoryContext oldcontext;
324
325 funcctx = SRF_FIRSTCALL_INIT();
326 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
327 /* collect_corrupt_items will verify the relkind */
328 funcctx->user_fctx = collect_corrupt_items(relid, false, true);
329 MemoryContextSwitchTo(oldcontext);
330 }
331
332 funcctx = SRF_PERCALL_SETUP();
333 items = (corrupt_items *) funcctx->user_fctx;
334
335 if (items->next < items->count)
336 SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));
337
338 SRF_RETURN_DONE(funcctx);
339 }
340
341 /*
342 * Return the TIDs of not-all-visible tuples in pages marked all-visible
343 * in the visibility map. We hope no one will ever find any, but there could
344 * be bugs, database corruption, etc.
345 */
346 Datum
pg_check_visible(PG_FUNCTION_ARGS)347 pg_check_visible(PG_FUNCTION_ARGS)
348 {
349 FuncCallContext *funcctx;
350 corrupt_items *items;
351
352 if (SRF_IS_FIRSTCALL())
353 {
354 Oid relid = PG_GETARG_OID(0);
355 MemoryContext oldcontext;
356
357 funcctx = SRF_FIRSTCALL_INIT();
358 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
359 /* collect_corrupt_items will verify the relkind */
360 funcctx->user_fctx = collect_corrupt_items(relid, true, false);
361 MemoryContextSwitchTo(oldcontext);
362 }
363
364 funcctx = SRF_PERCALL_SETUP();
365 items = (corrupt_items *) funcctx->user_fctx;
366
367 if (items->next < items->count)
368 SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));
369
370 SRF_RETURN_DONE(funcctx);
371 }
372
/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;
	ForkNumber	fork;
	BlockNumber block;

	/*
	 * AccessExclusiveLock: nobody else may be reading or updating the VM
	 * while we blow it away.
	 */
	rel = relation_open(relid, AccessExclusiveLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	/*
	 * Invalidate any cached size for the VM fork so that subsequent smgr
	 * operations look up the true on-disk length.
	 */
	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_cached_nblocks[VISIBILITYMAP_FORKNUM] = InvalidBlockNumber;

	/* Truncate the VM fork to zero blocks, if it has any. */
	block = visibilitymap_prepare_truncate(rel, 0);
	if (BlockNumberIsValid(block))
	{
		fork = VISIBILITYMAP_FORKNUM;
		smgrtruncate(rel->rd_smgr, &fork, 1, &block);
	}

	/*
	 * WAL-log the truncation so that crash recovery and replicas also
	 * discard the VM fork.
	 */
	if (RelationNeedsWAL(rel))
	{
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sends any transactional invalidation messages. Other
	 * backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we sent the messages at our eventual commit. However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * smgr_truncate. Therefore, there should be no race here.
	 *
	 * The reason why it's desirable to release the lock early here is because
	 * of the possibility that someone will need to use this to blow away many
	 * visibility map forks at once. If we can't release the lock until
	 * commit time, the transaction doing this will accumulate
	 * AccessExclusiveLocks on all of those relations at the same time, which
	 * is undesirable. However, if this turns out to be unsafe we may have no
	 * choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}
443
444 /*
445 * Helper function to construct whichever TupleDesc we need for a particular
446 * call.
447 */
448 static TupleDesc
pg_visibility_tupdesc(bool include_blkno,bool include_pd)449 pg_visibility_tupdesc(bool include_blkno, bool include_pd)
450 {
451 TupleDesc tupdesc;
452 AttrNumber maxattr = 2;
453 AttrNumber a = 0;
454
455 if (include_blkno)
456 ++maxattr;
457 if (include_pd)
458 ++maxattr;
459 tupdesc = CreateTemplateTupleDesc(maxattr);
460 if (include_blkno)
461 TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
462 TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
463 TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
464 if (include_pd)
465 TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
466 Assert(a == maxattr);
467
468 return BlessTupleDesc(tupdesc);
469 }
470
471 /*
472 * Collect visibility data about a relation.
473 *
474 * Checks relkind of relid and will throw an error if the relation does not
475 * have a VM.
476 */
477 static vbits *
collect_visibility_data(Oid relid,bool include_pd)478 collect_visibility_data(Oid relid, bool include_pd)
479 {
480 Relation rel;
481 BlockNumber nblocks;
482 vbits *info;
483 BlockNumber blkno;
484 Buffer vmbuffer = InvalidBuffer;
485 BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
486
487 rel = relation_open(relid, AccessShareLock);
488
489 /* Only some relkinds have a visibility map */
490 check_relation_relkind(rel);
491
492 nblocks = RelationGetNumberOfBlocks(rel);
493 info = palloc0(offsetof(vbits, bits) + nblocks);
494 info->next = 0;
495 info->count = nblocks;
496
497 for (blkno = 0; blkno < nblocks; ++blkno)
498 {
499 int32 mapbits;
500
501 /* Make sure we are interruptible. */
502 CHECK_FOR_INTERRUPTS();
503
504 /* Get map info. */
505 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
506 if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
507 info->bits[blkno] |= (1 << 0);
508 if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
509 info->bits[blkno] |= (1 << 1);
510
511 /*
512 * Page-level data requires reading every block, so only get it if the
513 * caller needs it. Use a buffer access strategy, too, to prevent
514 * cache-trashing.
515 */
516 if (include_pd)
517 {
518 Buffer buffer;
519 Page page;
520
521 buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
522 bstrategy);
523 LockBuffer(buffer, BUFFER_LOCK_SHARE);
524
525 page = BufferGetPage(buffer);
526 if (PageIsAllVisible(page))
527 info->bits[blkno] |= (1 << 2);
528
529 UnlockReleaseBuffer(buffer);
530 }
531 }
532
533 /* Clean up. */
534 if (vmbuffer != InvalidBuffer)
535 ReleaseBuffer(vmbuffer);
536 relation_close(rel, AccessShareLock);
537
538 return info;
539 }
540
/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not
 * seem to in fact be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not seem to in fact be frozen.
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	/* OldestXmin is only needed for the all-visible checks below. */
	if (all_visible)
		OldestXmin = GetOldestNonRemovableTransactionId(rel);

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size. We don't expect many corrupted tuples, so
	 * start with a small array. This function uses the "next" field to track
	 * the next offset where we can store an item (which is the same thing as
	 * the number of items found so far) and the "count" field to track the
	 * number of entries allocated. We'll repurpose these fields before
	 * returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so we
				 * can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding the buffer
				 * share-locked, but it should be safe against deadlocks,
				 * because surely GetOldestNonRemovableTransactionId() should
				 * never take a buffer lock.  And this shouldn't happen often,
				 * so it's worth being careful so as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestNonRemovableTransactionId(rel);

				/*
				 * If the recomputed horizon didn't advance, the tuple really
				 * is a problem; otherwise retest against the newer horizon.
				 */
				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect the
			 * tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written) and
	 * count is now the number of items we wrote (rather than the number we
	 * allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}
726
727 /*
728 * Remember one corrupt item.
729 */
730 static void
record_corrupt_item(corrupt_items * items,ItemPointer tid)731 record_corrupt_item(corrupt_items *items, ItemPointer tid)
732 {
733 /* enlarge output array if needed. */
734 if (items->next >= items->count)
735 {
736 items->count *= 2;
737 items->tids = repalloc(items->tids,
738 items->count * sizeof(ItemPointerData));
739 }
740 /* and add the new item */
741 items->tids[items->next++] = *tid;
742 }
743
744 /*
745 * Check whether a tuple is all-visible relative to a given OldestXmin value.
746 * The buffer should contain the tuple and should be locked and pinned.
747 */
748 static bool
tuple_all_visible(HeapTuple tup,TransactionId OldestXmin,Buffer buffer)749 tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
750 {
751 HTSV_Result state;
752 TransactionId xmin;
753
754 state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
755 if (state != HEAPTUPLE_LIVE)
756 return false; /* all-visible implies live */
757
758 /*
759 * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
760 * all-visible unless every tuple is hinted committed. However, those hint
761 * bits could be lost after a crash, so we can't be certain that they'll
762 * be set here. So just check the xmin.
763 */
764
765 xmin = HeapTupleHeaderGetXmin(tup->t_data);
766 if (!TransactionIdPrecedes(xmin, OldestXmin))
767 return false; /* xmin not old enough for all to see */
768
769 return true;
770 }
771
772 /*
773 * check_relation_relkind - convenience routine to check that relation
774 * is of the relkind supported by the callers
775 */
776 static void
check_relation_relkind(Relation rel)777 check_relation_relkind(Relation rel)
778 {
779 if (rel->rd_rel->relkind != RELKIND_RELATION &&
780 rel->rd_rel->relkind != RELKIND_MATVIEW &&
781 rel->rd_rel->relkind != RELKIND_TOASTVALUE)
782 ereport(ERROR,
783 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
784 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
785 RelationGetRelationName(rel))));
786 }
787