1 /*-------------------------------------------------------------------------
2 *
3 * pg_visibility.c
4 * display visibility map information and page-level visibility bits
5 *
6 * Copyright (c) 2016-2018, PostgreSQL Global Development Group
7 *
8 * contrib/pg_visibility/pg_visibility.c
9 *-------------------------------------------------------------------------
10 */
11 #include "postgres.h"
12
13 #include "access/htup_details.h"
14 #include "access/visibilitymap.h"
15 #include "catalog/pg_type.h"
16 #include "catalog/storage_xlog.h"
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 #include "storage/bufmgr.h"
20 #include "storage/procarray.h"
21 #include "storage/smgr.h"
22 #include "utils/rel.h"
23
PG_MODULE_MAGIC;

/*
 * Working state for the per-block SRFs (pg_visibility_map_rel and
 * pg_visibility_rel): one byte of flag bits per heap block.
 */
typedef struct vbits
{
	BlockNumber next;			/* next block number to emit */
	BlockNumber count;			/* total number of blocks collected */
	/* per-block flags: bit 0 = all-visible, bit 1 = all-frozen,
	 * bit 2 = page-level PD_ALL_VISIBLE (only if requested) */
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];
} vbits;

/*
 * Working state for pg_check_frozen/pg_check_visible: a growable array of
 * TIDs whose visibility-map state contradicts their tuple state.  While
 * collecting, "next" is the insertion point and "count" the allocated size;
 * collect_corrupt_items repurposes them before returning (next = read
 * cursor, count = number of items actually stored).
 */
typedef struct corrupt_items
{
	BlockNumber next;
	BlockNumber count;
	ItemPointer tids;
} corrupt_items;

/* SQL-callable entry points */
PG_FUNCTION_INFO_V1(pg_visibility_map);
PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
PG_FUNCTION_INFO_V1(pg_visibility);
PG_FUNCTION_INFO_V1(pg_visibility_rel);
PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
PG_FUNCTION_INFO_V1(pg_check_frozen);
PG_FUNCTION_INFO_V1(pg_check_visible);
PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);

/* internal helpers */
static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
static vbits *collect_visibility_data(Oid relid, bool include_pd);
static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
					  bool all_frozen);
static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
				  Buffer buffer);
static void check_relation_relkind(Relation rel);
57
58 /*
59 * Visibility map information for a single block of a relation.
60 *
61 * Note: the VM code will silently return zeroes for pages past the end
62 * of the map, so we allow probes up to MaxBlockNumber regardless of the
63 * actual relation size.
64 */
65 Datum
pg_visibility_map(PG_FUNCTION_ARGS)66 pg_visibility_map(PG_FUNCTION_ARGS)
67 {
68 Oid relid = PG_GETARG_OID(0);
69 int64 blkno = PG_GETARG_INT64(1);
70 int32 mapbits;
71 Relation rel;
72 Buffer vmbuffer = InvalidBuffer;
73 TupleDesc tupdesc;
74 Datum values[2];
75 bool nulls[2];
76
77 rel = relation_open(relid, AccessShareLock);
78
79 /* Only some relkinds have a visibility map */
80 check_relation_relkind(rel);
81
82 if (blkno < 0 || blkno > MaxBlockNumber)
83 ereport(ERROR,
84 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 errmsg("invalid block number")));
86
87 tupdesc = pg_visibility_tupdesc(false, false);
88 MemSet(nulls, 0, sizeof(nulls));
89
90 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
91 if (vmbuffer != InvalidBuffer)
92 ReleaseBuffer(vmbuffer);
93 values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
94 values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
95
96 relation_close(rel, AccessShareLock);
97
98 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
99 }
100
101 /*
102 * Visibility map information for a single block of a relation, plus the
103 * page-level information for the same block.
104 */
105 Datum
pg_visibility(PG_FUNCTION_ARGS)106 pg_visibility(PG_FUNCTION_ARGS)
107 {
108 Oid relid = PG_GETARG_OID(0);
109 int64 blkno = PG_GETARG_INT64(1);
110 int32 mapbits;
111 Relation rel;
112 Buffer vmbuffer = InvalidBuffer;
113 Buffer buffer;
114 Page page;
115 TupleDesc tupdesc;
116 Datum values[3];
117 bool nulls[3];
118
119 rel = relation_open(relid, AccessShareLock);
120
121 /* Only some relkinds have a visibility map */
122 check_relation_relkind(rel);
123
124 if (blkno < 0 || blkno > MaxBlockNumber)
125 ereport(ERROR,
126 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127 errmsg("invalid block number")));
128
129 tupdesc = pg_visibility_tupdesc(false, true);
130 MemSet(nulls, 0, sizeof(nulls));
131
132 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
133 if (vmbuffer != InvalidBuffer)
134 ReleaseBuffer(vmbuffer);
135 values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
136 values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
137
138 /* Here we have to explicitly check rel size ... */
139 if (blkno < RelationGetNumberOfBlocks(rel))
140 {
141 buffer = ReadBuffer(rel, blkno);
142 LockBuffer(buffer, BUFFER_LOCK_SHARE);
143
144 page = BufferGetPage(buffer);
145 values[2] = BoolGetDatum(PageIsAllVisible(page));
146
147 UnlockReleaseBuffer(buffer);
148 }
149 else
150 {
151 /* As with the vismap, silently return 0 for pages past EOF */
152 values[2] = BoolGetDatum(false);
153 }
154
155 relation_close(rel, AccessShareLock);
156
157 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
158 }
159
160 /*
161 * Visibility map information for every block in a relation.
162 */
163 Datum
pg_visibility_map_rel(PG_FUNCTION_ARGS)164 pg_visibility_map_rel(PG_FUNCTION_ARGS)
165 {
166 FuncCallContext *funcctx;
167 vbits *info;
168
169 if (SRF_IS_FIRSTCALL())
170 {
171 Oid relid = PG_GETARG_OID(0);
172 MemoryContext oldcontext;
173
174 funcctx = SRF_FIRSTCALL_INIT();
175 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
176 funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
177 /* collect_visibility_data will verify the relkind */
178 funcctx->user_fctx = collect_visibility_data(relid, false);
179 MemoryContextSwitchTo(oldcontext);
180 }
181
182 funcctx = SRF_PERCALL_SETUP();
183 info = (vbits *) funcctx->user_fctx;
184
185 if (info->next < info->count)
186 {
187 Datum values[3];
188 bool nulls[3];
189 HeapTuple tuple;
190
191 MemSet(nulls, 0, sizeof(nulls));
192 values[0] = Int64GetDatum(info->next);
193 values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
194 values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
195 info->next++;
196
197 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
198 SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
199 }
200
201 SRF_RETURN_DONE(funcctx);
202 }
203
204 /*
205 * Visibility map information for every block in a relation, plus the page
206 * level information for each block.
207 */
208 Datum
pg_visibility_rel(PG_FUNCTION_ARGS)209 pg_visibility_rel(PG_FUNCTION_ARGS)
210 {
211 FuncCallContext *funcctx;
212 vbits *info;
213
214 if (SRF_IS_FIRSTCALL())
215 {
216 Oid relid = PG_GETARG_OID(0);
217 MemoryContext oldcontext;
218
219 funcctx = SRF_FIRSTCALL_INIT();
220 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
221 funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
222 /* collect_visibility_data will verify the relkind */
223 funcctx->user_fctx = collect_visibility_data(relid, true);
224 MemoryContextSwitchTo(oldcontext);
225 }
226
227 funcctx = SRF_PERCALL_SETUP();
228 info = (vbits *) funcctx->user_fctx;
229
230 if (info->next < info->count)
231 {
232 Datum values[4];
233 bool nulls[4];
234 HeapTuple tuple;
235
236 MemSet(nulls, 0, sizeof(nulls));
237 values[0] = Int64GetDatum(info->next);
238 values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
239 values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
240 values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
241 info->next++;
242
243 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
244 SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
245 }
246
247 SRF_RETURN_DONE(funcctx);
248 }
249
250 /*
251 * Count the number of all-visible and all-frozen pages in the visibility
252 * map for a particular relation.
253 */
254 Datum
pg_visibility_map_summary(PG_FUNCTION_ARGS)255 pg_visibility_map_summary(PG_FUNCTION_ARGS)
256 {
257 Oid relid = PG_GETARG_OID(0);
258 Relation rel;
259 BlockNumber nblocks;
260 BlockNumber blkno;
261 Buffer vmbuffer = InvalidBuffer;
262 int64 all_visible = 0;
263 int64 all_frozen = 0;
264 TupleDesc tupdesc;
265 Datum values[2];
266 bool nulls[2];
267
268 rel = relation_open(relid, AccessShareLock);
269
270 /* Only some relkinds have a visibility map */
271 check_relation_relkind(rel);
272
273 nblocks = RelationGetNumberOfBlocks(rel);
274
275 for (blkno = 0; blkno < nblocks; ++blkno)
276 {
277 int32 mapbits;
278
279 /* Make sure we are interruptible. */
280 CHECK_FOR_INTERRUPTS();
281
282 /* Get map info. */
283 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
284 if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
285 ++all_visible;
286 if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
287 ++all_frozen;
288 }
289
290 /* Clean up. */
291 if (vmbuffer != InvalidBuffer)
292 ReleaseBuffer(vmbuffer);
293 relation_close(rel, AccessShareLock);
294
295 tupdesc = CreateTemplateTupleDesc(2, false);
296 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
297 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
298 tupdesc = BlessTupleDesc(tupdesc);
299
300 MemSet(nulls, 0, sizeof(nulls));
301 values[0] = Int64GetDatum(all_visible);
302 values[1] = Int64GetDatum(all_frozen);
303
304 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
305 }
306
307 /*
308 * Return the TIDs of non-frozen tuples present in pages marked all-frozen
309 * in the visibility map. We hope no one will ever find any, but there could
310 * be bugs, database corruption, etc.
311 */
312 Datum
pg_check_frozen(PG_FUNCTION_ARGS)313 pg_check_frozen(PG_FUNCTION_ARGS)
314 {
315 FuncCallContext *funcctx;
316 corrupt_items *items;
317
318 if (SRF_IS_FIRSTCALL())
319 {
320 Oid relid = PG_GETARG_OID(0);
321 MemoryContext oldcontext;
322
323 funcctx = SRF_FIRSTCALL_INIT();
324 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
325 /* collect_corrupt_items will verify the relkind */
326 funcctx->user_fctx = collect_corrupt_items(relid, false, true);
327 MemoryContextSwitchTo(oldcontext);
328 }
329
330 funcctx = SRF_PERCALL_SETUP();
331 items = (corrupt_items *) funcctx->user_fctx;
332
333 if (items->next < items->count)
334 SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));
335
336 SRF_RETURN_DONE(funcctx);
337 }
338
339 /*
340 * Return the TIDs of not-all-visible tuples in pages marked all-visible
341 * in the visibility map. We hope no one will ever find any, but there could
342 * be bugs, database corruption, etc.
343 */
344 Datum
pg_check_visible(PG_FUNCTION_ARGS)345 pg_check_visible(PG_FUNCTION_ARGS)
346 {
347 FuncCallContext *funcctx;
348 corrupt_items *items;
349
350 if (SRF_IS_FIRSTCALL())
351 {
352 Oid relid = PG_GETARG_OID(0);
353 MemoryContext oldcontext;
354
355 funcctx = SRF_FIRSTCALL_INIT();
356 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
357 /* collect_corrupt_items will verify the relkind */
358 funcctx->user_fctx = collect_corrupt_items(relid, true, false);
359 MemoryContextSwitchTo(oldcontext);
360 }
361
362 funcctx = SRF_PERCALL_SETUP();
363 items = (corrupt_items *) funcctx->user_fctx;
364
365 if (items->next < items->count)
366 SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));
367
368 SRF_RETURN_DONE(funcctx);
369 }
370
/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;

	/* Exclusive lock: nobody may use the VM while we destroy it. */
	rel = relation_open(relid, AccessExclusiveLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	/*
	 * Invalidate our cached idea of the VM fork's length before truncating,
	 * so that subsequent accesses re-probe the actual file size.
	 */
	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;

	/* Truncate the VM fork to zero pages, discarding all its contents. */
	visibilitymap_truncate(rel, 0);

	if (RelationNeedsWAL(rel))
	{
		/*
		 * WAL-log the truncation so crash recovery and standbys discard the
		 * VM fork too.  XLR_SPECIAL_REL_UPDATE flags this as a record that
		 * changes relation storage outside the normal buffer-page protocol.
		 */
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sends any transactional invalidation messages. Other
	 * backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we sent the messages at our eventual commit. However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * visibilitymap_truncate. Therefore, there should be no race here.
	 *
	 * The reason why it's desirable to release the lock early here is because
	 * of the possibility that someone will need to use this to blow away many
	 * visibility map forks at once. If we can't release the lock until
	 * commit time, the transaction doing this will accumulate
	 * AccessExclusiveLocks on all of those relations at the same time, which
	 * is undesirable. However, if this turns out to be unsafe we may have no
	 * choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}
434
435 /*
436 * Helper function to construct whichever TupleDesc we need for a particular
437 * call.
438 */
439 static TupleDesc
pg_visibility_tupdesc(bool include_blkno,bool include_pd)440 pg_visibility_tupdesc(bool include_blkno, bool include_pd)
441 {
442 TupleDesc tupdesc;
443 AttrNumber maxattr = 2;
444 AttrNumber a = 0;
445
446 if (include_blkno)
447 ++maxattr;
448 if (include_pd)
449 ++maxattr;
450 tupdesc = CreateTemplateTupleDesc(maxattr, false);
451 if (include_blkno)
452 TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
453 TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
454 TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
455 if (include_pd)
456 TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
457 Assert(a == maxattr);
458
459 return BlessTupleDesc(tupdesc);
460 }
461
462 /*
463 * Collect visibility data about a relation.
464 *
465 * Checks relkind of relid and will throw an error if the relation does not
466 * have a VM.
467 */
468 static vbits *
collect_visibility_data(Oid relid,bool include_pd)469 collect_visibility_data(Oid relid, bool include_pd)
470 {
471 Relation rel;
472 BlockNumber nblocks;
473 vbits *info;
474 BlockNumber blkno;
475 Buffer vmbuffer = InvalidBuffer;
476 BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
477
478 rel = relation_open(relid, AccessShareLock);
479
480 /* Only some relkinds have a visibility map */
481 check_relation_relkind(rel);
482
483 nblocks = RelationGetNumberOfBlocks(rel);
484 info = palloc0(offsetof(vbits, bits) + nblocks);
485 info->next = 0;
486 info->count = nblocks;
487
488 for (blkno = 0; blkno < nblocks; ++blkno)
489 {
490 int32 mapbits;
491
492 /* Make sure we are interruptible. */
493 CHECK_FOR_INTERRUPTS();
494
495 /* Get map info. */
496 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
497 if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
498 info->bits[blkno] |= (1 << 0);
499 if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
500 info->bits[blkno] |= (1 << 1);
501
502 /*
503 * Page-level data requires reading every block, so only get it if the
504 * caller needs it. Use a buffer access strategy, too, to prevent
505 * cache-trashing.
506 */
507 if (include_pd)
508 {
509 Buffer buffer;
510 Page page;
511
512 buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
513 bstrategy);
514 LockBuffer(buffer, BUFFER_LOCK_SHARE);
515
516 page = BufferGetPage(buffer);
517 if (PageIsAllVisible(page))
518 info->bits[blkno] |= (1 << 2);
519
520 UnlockReleaseBuffer(buffer);
521 }
522 }
523
524 /* Clean up. */
525 if (vmbuffer != InvalidBuffer)
526 ReleaseBuffer(vmbuffer);
527 relation_close(rel, AccessShareLock);
528
529 return info;
530 }
531
/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not
 * seem to in fact be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not seem to in fact be frozen.
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	if (all_visible)
	{
		/* Don't pass rel; that will fail in recovery. */
		OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
	}

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size. We don't expect many corrupted tuples, so
	 * start with a small array. This function uses the "next" field to track
	 * the next offset where we can store an item (which is the same thing as
	 * the number of items found so far) and the "count" field to track the
	 * number of entries allocated. We'll repurpose these fields before
	 * returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so we
				 * can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding the buffer
				 * exclusively locked, but it should be safe against
				 * deadlocks, because surely GetOldestXmin() should never take
				 * a buffer lock.  And this shouldn't happen often, so it's
				 * worth being careful so as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);

				/*
				 * If the recomputed horizon hasn't advanced past the one we
				 * used, the tuple really is a problem; otherwise retry the
				 * visibility test with the newer horizon before reporting.
				 */
				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect the
			 * tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written) and
	 * count is now the number of items we wrote (rather than the number we
	 * allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}
719
720 /*
721 * Remember one corrupt item.
722 */
723 static void
record_corrupt_item(corrupt_items * items,ItemPointer tid)724 record_corrupt_item(corrupt_items *items, ItemPointer tid)
725 {
726 /* enlarge output array if needed. */
727 if (items->next >= items->count)
728 {
729 items->count *= 2;
730 items->tids = repalloc(items->tids,
731 items->count * sizeof(ItemPointerData));
732 }
733 /* and add the new item */
734 items->tids[items->next++] = *tid;
735 }
736
737 /*
738 * Check whether a tuple is all-visible relative to a given OldestXmin value.
739 * The buffer should contain the tuple and should be locked and pinned.
740 */
741 static bool
tuple_all_visible(HeapTuple tup,TransactionId OldestXmin,Buffer buffer)742 tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
743 {
744 HTSV_Result state;
745 TransactionId xmin;
746
747 state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
748 if (state != HEAPTUPLE_LIVE)
749 return false; /* all-visible implies live */
750
751 /*
752 * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
753 * all-visible unless every tuple is hinted committed. However, those hint
754 * bits could be lost after a crash, so we can't be certain that they'll
755 * be set here. So just check the xmin.
756 */
757
758 xmin = HeapTupleHeaderGetXmin(tup->t_data);
759 if (!TransactionIdPrecedes(xmin, OldestXmin))
760 return false; /* xmin not old enough for all to see */
761
762 return true;
763 }
764
765 /*
766 * check_relation_relkind - convenience routine to check that relation
767 * is of the relkind supported by the callers
768 */
769 static void
check_relation_relkind(Relation rel)770 check_relation_relkind(Relation rel)
771 {
772 if (rel->rd_rel->relkind != RELKIND_RELATION &&
773 rel->rd_rel->relkind != RELKIND_MATVIEW &&
774 rel->rd_rel->relkind != RELKIND_TOASTVALUE)
775 ereport(ERROR,
776 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
777 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
778 RelationGetRelationName(rel))));
779 }
780