/*-------------------------------------------------------------------------
 *
 * pg_visibility.c
 *	  display visibility map information and page-level visibility bits
 *
 * Copyright (c) 2016, PostgreSQL Global Development Group
 *
 *	  contrib/pg_visibility/pg_visibility.c
 *-------------------------------------------------------------------------
 */
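
/*
 * Example usage from SQL (a sketch; "mytable" is a placeholder relation,
 * and the extension is assumed to have been installed with
 * "CREATE EXTENSION pg_visibility"):
 *
 *		SELECT * FROM pg_visibility_map('mytable'::regclass, 0);
 *		SELECT * FROM pg_visibility_map_summary('mytable'::regclass);
 *		SELECT * FROM pg_check_frozen('mytable'::regclass);
 */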
#include "postgres.h"

#include "access/htup_details.h"
#include "access/visibilitymap.h"
#include "catalog/pg_type.h"
#include "catalog/storage_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/rel.h"

PG_MODULE_MAGIC;

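/*
 * Per-block visibility information collected by collect_visibility_data.
 *
 * The low-order bits of each entry in the "bits" array are set as follows
 * (see collect_visibility_data below):
 *
 *		bit 0: the block's all-visible bit in the visibility map
 *		bit 1: the block's all-frozen bit in the visibility map
 *		bit 2: the page's PD_ALL_VISIBLE flag (collected only when the
 *			   caller requested page-level data)
 */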
typedef struct vbits
{
	BlockNumber next;
	BlockNumber count;
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];
} vbits;

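/*
 * TIDs whose visibility map state disagrees with their on-page state, as
 * collected by collect_corrupt_items.
 *
 * While the array is being filled, "next" is the next free slot in the tids
 * array and "count" is its allocated size; before the result is returned,
 * collect_corrupt_items repurposes the fields so that "count" is the number
 * of items stored and "next" is the next item to read.
 */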
typedef struct corrupt_items
{
	BlockNumber next;
	BlockNumber count;
	ItemPointer tids;
} corrupt_items;

PG_FUNCTION_INFO_V1(pg_visibility_map);
PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
PG_FUNCTION_INFO_V1(pg_visibility);
PG_FUNCTION_INFO_V1(pg_visibility_rel);
PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
PG_FUNCTION_INFO_V1(pg_check_frozen);
PG_FUNCTION_INFO_V1(pg_check_visible);
PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);

static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
static vbits *collect_visibility_data(Oid relid, bool include_pd);
static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
					  bool all_frozen);
static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
				  Buffer buffer);

/*
 * Visibility map information for a single block of a relation.
 *
 * Note: the VM code will silently return zeroes for pages past the end
 * of the map, so we allow probes up to MaxBlockNumber regardless of the
 * actual relation size.
 */
Datum
pg_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	int64		blkno = PG_GETARG_INT64(1);
	int32		mapbits;
	Relation	rel;
	Buffer		vmbuffer = InvalidBuffer;
	TupleDesc	tupdesc;
	Datum		values[2];
	bool		nulls[2];

	rel = relation_open(relid, AccessShareLock);

	if (blkno < 0 || blkno > MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid block number")));

	tupdesc = pg_visibility_tupdesc(false, false);
	MemSet(nulls, 0, sizeof(nulls));

	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);

	relation_close(rel, AccessShareLock);

	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

/*
 * Visibility map information for a single block of a relation, plus the
 * page-level information for the same block.
 */
Datum
pg_visibility(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	int64		blkno = PG_GETARG_INT64(1);
	int32		mapbits;
	Relation	rel;
	Buffer		vmbuffer = InvalidBuffer;
	Buffer		buffer;
	Page		page;
	TupleDesc	tupdesc;
	Datum		values[3];
	bool		nulls[3];

	rel = relation_open(relid, AccessShareLock);

	if (blkno < 0 || blkno > MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid block number")));

	tupdesc = pg_visibility_tupdesc(false, true);
	MemSet(nulls, 0, sizeof(nulls));

	mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
	values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);

	/* Here we have to explicitly check rel size ... */
	if (blkno < RelationGetNumberOfBlocks(rel))
	{
		buffer = ReadBuffer(rel, blkno);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		values[2] = BoolGetDatum(PageIsAllVisible(page));

		UnlockReleaseBuffer(buffer);
	}
	else
	{
		/* As with the vismap, silently return 0 for pages past EOF */
		values[2] = BoolGetDatum(false);
	}

	relation_close(rel, AccessShareLock);

	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

/*
 * Visibility map information for every block in a relation.
 */
Datum
pg_visibility_map_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
		funcctx->user_fctx = collect_visibility_data(relid, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[3];
		bool		nulls[3];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		values[0] = Int64GetDatum(info->next);
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}

/*
 * Visibility map information for every block in a relation, plus the
 * page-level information for each block.
 */
Datum
pg_visibility_rel(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	vbits	   *info;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
		funcctx->user_fctx = collect_visibility_data(relid, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	info = (vbits *) funcctx->user_fctx;

	if (info->next < info->count)
	{
		Datum		values[4];
		bool		nulls[4];
		HeapTuple	tuple;

		MemSet(nulls, 0, sizeof(nulls));
		values[0] = Int64GetDatum(info->next);
		values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
		values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
		values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
		info->next++;

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	SRF_RETURN_DONE(funcctx);
}

/*
 * Count the number of all-visible and all-frozen pages in the visibility
 * map for a particular relation.
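 *
 * For example (a sketch; actual counts depend on the relation):
 *
 *		SELECT * FROM pg_visibility_map_summary('mytable'::regclass);
 *		 all_visible | all_frozen
 *		-------------+------------
 *				  10 |          4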
 */
Datum
pg_visibility_map_summary(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;
	BlockNumber nblocks;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	int64		all_visible = 0;
	int64		all_frozen = 0;
	TupleDesc	tupdesc;
	Datum		values[2];
	bool		nulls[2];

	rel = relation_open(relid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(rel);

	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		int32		mapbits;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Get map info. */
		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
			++all_visible;
		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
			++all_frozen;
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	tupdesc = CreateTemplateTupleDesc(2, false);
	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
	tupdesc = BlessTupleDesc(tupdesc);

	MemSet(nulls, 0, sizeof(nulls));
	values[0] = Int64GetDatum(all_visible);
	values[1] = Int64GetDatum(all_frozen);

	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

/*
 * Return the TIDs of non-frozen tuples present in pages marked all-frozen
 * in the visibility map.  We hope no one will ever find any, but there
 * could be bugs, database corruption, etc.
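 *
 * From SQL this is called as pg_check_frozen(regclass) and returns a set
 * of tuple IDs; the declared signature (RETURNS SETOF tid) lives in the
 * extension's SQL script.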
 */
Datum
pg_check_frozen(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->user_fctx = collect_corrupt_items(relid, false, true);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}

/*
 * Return the TIDs of not-all-visible tuples in pages marked all-visible
 * in the visibility map.  We hope no one will ever find any, but there
 * could be bugs, database corruption, etc.
 */
Datum
pg_check_visible(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	corrupt_items *items;

	if (SRF_IS_FIRSTCALL())
	{
		Oid			relid = PG_GETARG_OID(0);
		MemoryContext oldcontext;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
		funcctx->user_fctx = collect_corrupt_items(relid, true, false);
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	items = (corrupt_items *) funcctx->user_fctx;

	if (items->next < items->count)
		SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));

	SRF_RETURN_DONE(funcctx);
}

/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
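 *
 * After the fork is removed, the next VACUUM of the relation will scan
 * every page and can thus set the visibility map bits afresh.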
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;

	rel = relation_open(relid, AccessExclusiveLock);

	if (rel->rd_rel->relkind != RELKIND_RELATION &&
		rel->rd_rel->relkind != RELKIND_MATVIEW &&
		rel->rd_rel->relkind != RELKIND_TOASTVALUE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
						RelationGetRelationName(rel))));

	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;

	visibilitymap_truncate(rel, 0);

	if (RelationNeedsWAL(rel))
	{
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sends any transactional invalidation messages.
	 * Other backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we send the messages at our eventual commit.  However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * visibilitymap_truncate.  Therefore, there should be no race here.
	 *
	 * The reason why it's desirable to release the lock early here is
	 * because of the possibility that someone will need to use this to blow
	 * away many visibility map forks at once.  If we can't release the lock
	 * until commit time, the transaction doing this will accumulate
	 * AccessExclusiveLocks on all of those relations at the same time, which
	 * is undesirable.  However, if this turns out to be unsafe, we may have
	 * no choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}

/*
 * Helper function to construct whichever TupleDesc we need for a particular
 * call.
 */
static TupleDesc
pg_visibility_tupdesc(bool include_blkno, bool include_pd)
{
	TupleDesc	tupdesc;
	AttrNumber	maxattr = 2;
	AttrNumber	a = 0;

	if (include_blkno)
		++maxattr;
	if (include_pd)
		++maxattr;
	tupdesc = CreateTemplateTupleDesc(maxattr, false);
	if (include_blkno)
		TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
	TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
	if (include_pd)
		TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
	Assert(a == maxattr);

	return BlessTupleDesc(tupdesc);
}

/*
 * Collect visibility data about a relation.
 */
static vbits *
collect_visibility_data(Oid relid, bool include_pd)
{
	Relation	rel;
	BlockNumber nblocks;
	vbits	   *info;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);

	rel = relation_open(relid, AccessShareLock);

	nblocks = RelationGetNumberOfBlocks(rel);
	info = palloc0(offsetof(vbits, bits) + nblocks);
	info->next = 0;
	info->count = nblocks;

	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		int32		mapbits;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Get map info. */
		mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
			info->bits[blkno] |= (1 << 0);
		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
			info->bits[blkno] |= (1 << 1);

		/*
		 * Page-level data requires reading every block, so only get it if
		 * the caller needs it.  Use a buffer access strategy, too, to
		 * prevent cache-thrashing.
		 */
		if (include_pd)
		{
			Buffer		buffer;
			Page		page;

			buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
										bstrategy);
			LockBuffer(buffer, BUFFER_LOCK_SHARE);

			page = BufferGetPage(buffer);
			if (PageIsAllVisible(page))
				info->bits[blkno] |= (1 << 2);

			UnlockReleaseBuffer(buffer);
		}
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	return info;
}

/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not,
 * in fact, appear to be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not, in fact, appear to be
 * frozen.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	if (all_visible)
	{
		/* Don't pass rel; that will fail in recovery. */
		OldestXmin = GetOldestXmin(NULL, true);
	}

	rel = relation_open(relid, AccessShareLock);

	if (rel->rd_rel->relkind != RELKIND_RELATION &&
		rel->rd_rel->relkind != RELKIND_MATVIEW &&
		rel->rd_rel->relkind != RELKIND_TOASTVALUE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a table, materialized view, or TOAST table",
						RelationGetRelationName(rel))));

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size.  We don't expect many corrupted tuples,
	 * so start with a small array.  This function uses the "next" field to
	 * track the next offset where we can store an item (which is the same
	 * thing as the number of items found so far) and the "count" field to
	 * track the number of entries allocated.  We'll repurpose these fields
	 * before returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so
				 * we can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding the buffer
				 * share-locked, but it should be safe against deadlocks,
				 * because surely GetOldestXmin() should never take a buffer
				 * lock.  And this shouldn't happen often, so it's worth
				 * being careful so as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestXmin(NULL, true);

				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect
			 * the tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written)
	 * and count is now the number of items we wrote (rather than the number
	 * we allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}

/*
 * Remember one corrupt item.
 */
static void
record_corrupt_item(corrupt_items *items, ItemPointer tid)
{
	/* Enlarge the output array, if needed. */
	if (items->next >= items->count)
	{
		items->count *= 2;
		items->tids = repalloc(items->tids,
							   items->count * sizeof(ItemPointerData));
	}
	/* ... and add the new item. */
	items->tids[items->next++] = *tid;
}

/*
 * Check whether a tuple is all-visible relative to a given OldestXmin value.
 * The buffer should contain the tuple and should be locked and pinned.
 */
static bool
tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
{
	HTSV_Result state;
	TransactionId xmin;

	state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
	if (state != HEAPTUPLE_LIVE)
		return false;			/* all-visible implies live */

	/*
	 * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
	 * all-visible unless every tuple is hinted committed.  However, those
	 * hint bits could be lost after a crash, so we can't be certain that
	 * they'll be set here.  So just check the xmin.
	 */
	xmin = HeapTupleHeaderGetXmin(tup->t_data);
	if (!TransactionIdPrecedes(xmin, OldestXmin))
		return false;			/* xmin not old enough for all to see */

	return true;
}