/*-------------------------------------------------------------------------
 *
 * bufpage.c
 *	  POSTGRES standard buffer page code.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/page/bufpage.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/itup.h"
#include "access/xlog.h"
#include "pgstat.h"
#include "storage/checksum.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"


/* GUC variable */
bool		ignore_checksum_failure = false;


/* ----------------------------------------------------------------
 *						Page support functions
 * ----------------------------------------------------------------
 */

/*
 * PageInit
 *		Initializes the contents of a page.
 *		Note that we don't calculate an initial checksum here; that's not done
 *		until it's time to write.
 */
void
PageInit(Page page, Size pageSize, Size specialSize)
{
	PageHeader	p = (PageHeader) page;

	specialSize = MAXALIGN(specialSize);

	Assert(pageSize == BLCKSZ);
	Assert(pageSize > specialSize + SizeOfPageHeaderData);

	/* Make sure all fields of page are zero, as well as unused space */
	MemSet(p, 0, pageSize);

	p->pd_flags = 0;
	p->pd_lower = SizeOfPageHeaderData;
	p->pd_upper = pageSize - specialSize;
	p->pd_special = pageSize - specialSize;
	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
}

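/*
 * Usage sketch (editor's illustration, not part of the upstream file): a
 * typical access method initializes a freshly allocated page like this,
 * reserving MAXALIGN'd special space at the end.  The 16-byte special size
 * is an arbitrary stand-in for an AM-specific opaque struct.
 */
#ifdef NOT_USED
static Page
example_page_init(void)
{
	Page		page = (Page) palloc(BLCKSZ);

	/* reserve 16 bytes of special space; PageInit MAXALIGNs it for us */
	PageInit(page, BLCKSZ, 16);
	Assert(PageGetSpecialSize(page) == MAXALIGN(16));
	return page;
}
#endif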

/*
 * PageIsVerifiedExtended
 *		Check that the page header and checksum (if any) appear valid.
 *
 * This is called when a page has just been read in from disk.  The idea is
 * to cheaply detect trashed pages before we go nuts following bogus line
 * pointers, testing invalid transaction identifiers, etc.
 *
 * It turns out to be necessary to allow zeroed pages here too.  Even though
 * this routine is *not* called when deliberately adding a page to a relation,
 * there are scenarios in which a zeroed page might be found in a table.
 * (Example: a backend extends a relation, then crashes before it can write
 * any WAL entry about the new page.  The kernel will already have the
 * zeroed page in the file, and it will stay that way after restart.)  So we
 * allow zeroed pages here, and are careful that the page access macros
 * treat such a page as empty and without free space.  Eventually, VACUUM
 * will clean up such a page and make it usable.
 *
 * If flag PIV_LOG_WARNING is set, a WARNING is logged in the event of
 * a checksum failure.
 *
 * If flag PIV_REPORT_STAT is set, a checksum failure is reported directly
 * to pgstat.
 */
bool
PageIsVerifiedExtended(Page page, BlockNumber blkno, int flags)
{
	PageHeader	p = (PageHeader) page;
	size_t	   *pagebytes;
	int			i;
	bool		checksum_failure = false;
	bool		header_sane = false;
	bool		all_zeroes = false;
	uint16		checksum = 0;

	/*
	 * Don't verify page data unless the page passes basic non-zero test
	 */
	if (!PageIsNew(page))
	{
		if (DataChecksumsEnabled())
		{
			checksum = pg_checksum_page((char *) page, blkno);

			if (checksum != p->pd_checksum)
				checksum_failure = true;
		}

		/*
		 * The following checks don't prove the header is correct, only that
		 * it looks sane enough to allow into the buffer pool. Later usage of
		 * the block can still reveal problems, which is why we offer the
		 * checksum option.
		 */
		if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
			p->pd_lower <= p->pd_upper &&
			p->pd_upper <= p->pd_special &&
			p->pd_special <= BLCKSZ &&
			p->pd_special == MAXALIGN(p->pd_special))
			header_sane = true;

		if (header_sane && !checksum_failure)
			return true;
	}

	/* Check all-zeroes case */
	all_zeroes = true;
	pagebytes = (size_t *) page;
	for (i = 0; i < (BLCKSZ / sizeof(size_t)); i++)
	{
		if (pagebytes[i] != 0)
		{
			all_zeroes = false;
			break;
		}
	}

	if (all_zeroes)
		return true;

	/*
	 * Throw a WARNING if the checksum fails, but only after we've checked for
	 * the all-zeroes case.
	 */
	if (checksum_failure)
	{
		if ((flags & PIV_LOG_WARNING) != 0)
			ereport(WARNING,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("page verification failed, calculated checksum %u but expected %u",
							checksum, p->pd_checksum)));

		if ((flags & PIV_REPORT_STAT) != 0)
			pgstat_report_checksum_failure();

		if (header_sane && ignore_checksum_failure)
			return true;
	}

	return false;
}

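/*
 * Usage sketch (editor's illustration, not part of the upstream file): a
 * caller that has just read a block from disk might reject it like this,
 * logging and counting the failure via the PIV_* flags.
 */
#ifdef NOT_USED
static void
example_verify_page(Page page, BlockNumber blkno)
{
	if (!PageIsVerifiedExtended(page, blkno,
								PIV_LOG_WARNING | PIV_REPORT_STAT))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("invalid page in block %u", blkno)));
}
#endif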

/*
 *	PageAddItemExtended
 *
 *	Add an item to a page.  Return value is the offset at which it was
 *	inserted, or InvalidOffsetNumber if the item is not inserted for any
 *	reason.  A WARNING is issued indicating the reason for the refusal.
 *
 *	offsetNumber must be either InvalidOffsetNumber to specify finding a
 *	free line pointer, or a value between FirstOffsetNumber and one past
 *	the last existing item, to specify using that particular line pointer.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store
 *	the item at the specified offsetNumber, which must be either a
 *	currently-unused line pointer, or one past the last existing item.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert
 *	the item at the specified offsetNumber, moving existing items later
 *	in the array to make room.
 *
 *	If offsetNumber is not valid, then assign a slot by finding the first
 *	one that is both unused and deallocated.
 *
 *	If flag PAI_IS_HEAP is set, we enforce that there can't be more than
 *	MaxHeapTuplesPerPage line pointers on the page.
 *
 *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
 */
OffsetNumber
PageAddItemExtended(Page page,
					Item item,
					Size size,
					OffsetNumber offsetNumber,
					int flags)
{
	PageHeader	phdr = (PageHeader) page;
	Size		alignedSize;
	int			lower;
	int			upper;
	ItemId		itemId;
	OffsetNumber limit;
	bool		needshuffle = false;

	/*
	 * Be wary about corrupted page pointers
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	/*
	 * Select offsetNumber to place the new item at
	 */
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* was offsetNumber passed in? */
	if (OffsetNumberIsValid(offsetNumber))
	{
		/* yes, check it */
		if ((flags & PAI_OVERWRITE) != 0)
		{
			if (offsetNumber < limit)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
				{
					elog(WARNING, "will not overwrite a used ItemId");
					return InvalidOffsetNumber;
				}
			}
		}
		else
		{
			if (offsetNumber < limit)
				needshuffle = true; /* need to move existing linp's */
		}
	}
	else
	{
		/* offsetNumber was not passed in, so find a free slot */
		/* if no free slot, we'll put it at limit (1st open slot) */
		if (PageHasFreeLinePointers(phdr))
		{
			/*
			 * Scan line pointer array to locate a "recyclable" (unused)
			 * ItemId.
			 *
			 * Always use earlier items first.  PageTruncateLinePointerArray
			 * can only truncate unused items when they appear as a contiguous
			 * group at the end of the line pointer array.
			 */
			for (offsetNumber = FirstOffsetNumber;
				 offsetNumber < limit;	/* limit is maxoff+1 */
				 offsetNumber++)
			{
				itemId = PageGetItemId(phdr, offsetNumber);

				/*
				 * We check for no storage as well, just to be paranoid;
				 * unused items should never have storage.  Assert() that the
				 * invariant is respected too.
				 */
				Assert(ItemIdIsUsed(itemId) || !ItemIdHasStorage(itemId));

				if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
					break;
			}
			if (offsetNumber >= limit)
			{
				/* the hint is wrong, so reset it */
				PageClearHasFreeLinePointers(phdr);
			}
		}
		else
		{
			/* don't bother searching if hint says there's no free slot */
			offsetNumber = limit;
		}
	}

	/* Reject placing items beyond the first unused line pointer */
	if (offsetNumber > limit)
	{
		elog(WARNING, "specified item offset is too large");
		return InvalidOffsetNumber;
	}

	/* Reject placing items beyond heap boundary, if heap */
	if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
	{
		elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
		return InvalidOffsetNumber;
	}

	/*
	 * Compute new lower and upper pointers for page, see if it'll fit.
	 *
	 * Note: do arithmetic as signed ints, to avoid mistakes if, say,
	 * alignedSize > pd_upper.
	 */
	if (offsetNumber == limit || needshuffle)
		lower = phdr->pd_lower + sizeof(ItemIdData);
	else
		lower = phdr->pd_lower;

	alignedSize = MAXALIGN(size);

	upper = (int) phdr->pd_upper - (int) alignedSize;

	if (lower > upper)
		return InvalidOffsetNumber;

	/*
	 * OK to insert the item.  First, shuffle the existing pointers if needed.
	 */
	itemId = PageGetItemId(phdr, offsetNumber);

	if (needshuffle)
		memmove(itemId + 1, itemId,
				(limit - offsetNumber) * sizeof(ItemIdData));

	/* set the line pointer */
	ItemIdSetNormal(itemId, upper, size);

	/*
	 * Items normally contain no uninitialized bytes.  Core bufpage consumers
	 * conform, but this is not a necessary coding rule; a new index AM could
	 * opt to depart from it.  However, data type input functions and other
	 * C-language functions that synthesize datums should initialize all
	 * bytes; datumIsEqual() relies on this.  Testing here, along with the
	 * similar check in printtup(), helps to catch such mistakes.
	 *
	 * Values of the "name" type retrieved via index-only scans may contain
	 * uninitialized bytes; see comment in btrescan().  Valgrind will report
	 * this as an error, but it is safe to ignore.
	 */
	VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

	/* copy the item's data onto the page */
	memcpy((char *) page + upper, item, size);

	/* adjust page header */
	phdr->pd_lower = (LocationIndex) lower;
	phdr->pd_upper = (LocationIndex) upper;

	return offsetNumber;
}

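/*
 * Usage sketch (editor's illustration, not part of the upstream file):
 * appending a payload wherever a line pointer is free.  Passing
 * InvalidOffsetNumber asks the routine to pick the slot; most callers go
 * through the PageAddItem() macro in bufpage.h instead.
 */
#ifdef NOT_USED
static bool
example_add_item(Page page, Item item, Size size)
{
	OffsetNumber off;

	off = PageAddItemExtended(page, item, size, InvalidOffsetNumber, 0);
	return OffsetNumberIsValid(off);	/* false means it didn't fit */
}
#endif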

/*
 * PageGetTempPage
 *		Get a temporary page in local memory for special processing.
 *		The returned page is not initialized at all; caller must do that.
 */
Page
PageGetTempPage(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	return temp;
}

/*
 * PageGetTempPageCopy
 *		Get a temporary page in local memory for special processing.
 *		The page is initialized by copying the contents of the given page.
 */
Page
PageGetTempPageCopy(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	memcpy(temp, page, pageSize);

	return temp;
}

/*
 * PageGetTempPageCopySpecial
 *		Get a temporary page in local memory for special processing.
 *		The page is PageInit'd with the same special-space size as the
 *		given page, and the special space is copied from the given page.
 */
Page
PageGetTempPageCopySpecial(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	PageInit(temp, pageSize, PageGetSpecialSize(page));
	memcpy(PageGetSpecialPointer(temp),
		   PageGetSpecialPointer(page),
		   PageGetSpecialSize(page));

	return temp;
}

/*
 * PageRestoreTempPage
 *		Copy temporary page back to permanent page after special processing
 *		and release the temporary page.
 */
void
PageRestoreTempPage(Page tempPage, Page oldPage)
{
	Size		pageSize;

	pageSize = PageGetPageSize(tempPage);
	memcpy((char *) oldPage, (char *) tempPage, pageSize);

	pfree(tempPage);
}

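/*
 * Usage sketch (editor's illustration, not part of the upstream file): the
 * usual lifecycle of the temp-page helpers above.  An index AM rebuilding
 * a page copies the special space into a scratch page, fills it with the
 * surviving tuples, then swaps it back in.
 */
#ifdef NOT_USED
static void
example_rebuild_page(Page page)
{
	Page		temp = PageGetTempPageCopySpecial(page);

	/* ... re-add the tuples that should survive to "temp" here ... */

	PageRestoreTempPage(temp, page);	/* copies back and pfree's temp */
}
#endif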
/*
 * Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
 */
typedef struct itemIdCompactData
{
	uint16		offsetindex;	/* linp array index */
	int16		itemoff;		/* page offset of item data */
	uint16		alignedlen;		/* MAXALIGN(item data len) */
} itemIdCompactData;
typedef itemIdCompactData *itemIdCompact;

/*
 * After removing or marking some line pointers unused, move the tuples to
 * remove the gaps caused by the removed items and reorder them back into
 * reverse line pointer order in the page.
 *
 * This function can often be fairly hot, so it pays to take some measures to
 * make it as optimal as possible.
 *
 * Callers may pass 'presorted' as true if the 'itemidbase' array is sorted in
 * descending order of itemoff.  When this is true we can just memmove()
 * tuples towards the end of the page.  This is quite a common case as it's
 * the order that tuples are initially inserted into pages.  When we call this
 * function to defragment the tuples in the page then any new line pointers
 * added to the page will keep that presorted order, so hitting this case is
 * still very common for tables that are commonly updated.
 *
 * When the 'itemidbase' array is not presorted then we're unable to just
 * memmove() tuples around freely.  Doing so could cause us to overwrite the
 * memory belonging to a tuple we've not moved yet.  In this case, we copy all
 * the tuples that need to be moved into a temporary buffer.  We can then
 * simply memcpy() out of that temp buffer back into the page at the correct
 * location.  Tuples are copied back into the page in the same order as the
 * 'itemidbase' array, so we end up reordering the tuples back into reverse
 * line pointer order.  This will increase the chances of hitting the
 * presorted case the next time around.
 *
 * Callers must ensure that nitems is > 0
 */
static void
compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		upper;
	Offset		copy_tail;
	Offset		copy_head;
	itemIdCompact itemidptr;
	int			i;

	/* Code within will not work correctly if nitems == 0 */
	Assert(nitems > 0);

	if (presorted)
	{

#ifdef USE_ASSERT_CHECKING
		{
			/*
			 * Verify we've not gotten any new callers that are incorrectly
			 * passing a true presorted value.
			 */
			Offset		lastoff = phdr->pd_special;

			for (i = 0; i < nitems; i++)
			{
				itemidptr = &itemidbase[i];

				Assert(lastoff > itemidptr->itemoff);

				lastoff = itemidptr->itemoff;
			}
		}
#endif							/* USE_ASSERT_CHECKING */

		/*
		 * 'itemidbase' is already in the optimal order, i.e., lower item
		 * pointers have a higher offset.  This allows us to memmove() the
		 * tuples up to the end of the page without having to worry about
		 * overwriting other tuples that have not been moved yet.
		 *
		 * There's a good chance that there are tuples already right at the
		 * end of the page that we can simply skip over because they're
		 * already in the correct location within the page.  We'll do that
		 * first...
		 */
		upper = phdr->pd_special;
		i = 0;
		do
		{
			itemidptr = &itemidbase[i];
			if (upper != itemidptr->itemoff + itemidptr->alignedlen)
				break;
			upper -= itemidptr->alignedlen;

			i++;
		} while (i < nitems);

		/*
		 * Now that we've found the first tuple that needs to be moved, we can
		 * do the tuple compactification.  We try to make the fewest possible
		 * memmove() calls and only call memmove() when there's a gap.  When
		 * we see a gap we just move all tuples after the gap up until the
		 * point of the last move operation.
		 */
		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
		for (; i < nitems; i++)
		{
			ItemId		lp;

			itemidptr = &itemidbase[i];
			lp = PageGetItemId(page, itemidptr->offsetindex + 1);

			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
			{
				memmove((char *) page + upper,
						page + copy_head,
						copy_tail - copy_head);

				/*
				 * We've now moved all tuples already seen, but not the
				 * current tuple, so we set the copy_tail to the end of this
				 * tuple so it can be moved in another iteration of the loop.
				 */
				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
			}
			/* shift the target offset down by the length of this tuple */
			upper -= itemidptr->alignedlen;
			/* point the copy_head to the start of this tuple */
			copy_head = itemidptr->itemoff;

			/* update the line pointer to reference the new offset */
			lp->lp_off = upper;

		}

		/* move the remaining tuples. */
		memmove((char *) page + upper,
				page + copy_head,
				copy_tail - copy_head);
	}
	else
	{
		PGAlignedBlock scratch;
		char	   *scratchptr = scratch.data;

		/*
		 * Non-presorted case:  The tuples in the itemidbase array may be in
		 * any order.  So, in order to move these to the end of the page we
		 * must make a temp copy of each tuple that needs to be moved before
		 * we copy them back into the page at the new offset.
		 *
		 * If a large percentage of tuples have been pruned (>75%) then we'll
		 * copy these into the temp buffer tuple-by-tuple; otherwise we'll
		 * just do a single memcpy() for all tuples that need to be moved.
		 * When so many tuples have been removed there's likely to be a lot of
		 * gaps and it's unlikely that many non-movable tuples remain at the
		 * end of the page.
		 */
		if (nitems < PageGetMaxOffsetNumber(page) / 4)
		{
			i = 0;
			do
			{
				itemidptr = &itemidbase[i];
				memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
					   itemidptr->alignedlen);
				i++;
			} while (i < nitems);

			/* Set things up for the compactification code below */
			i = 0;
			itemidptr = &itemidbase[0];
			upper = phdr->pd_special;
		}
		else
		{
			upper = phdr->pd_special;

			/*
			 * Many tuples are likely to already be in the correct location.
			 * There's no need to copy these into the temp buffer.  Instead
			 * we'll just skip forward in the itemidbase array to the position
			 * that we do need to move tuples from so that the code below just
			 * leaves these ones alone.
			 */
			i = 0;
			do
			{
				itemidptr = &itemidbase[i];
				if (upper != itemidptr->itemoff + itemidptr->alignedlen)
					break;
				upper -= itemidptr->alignedlen;

				i++;
			} while (i < nitems);

			/* Copy all tuples that need to be moved into the temp buffer */
			memcpy(scratchptr + phdr->pd_upper,
				   page + phdr->pd_upper,
				   upper - phdr->pd_upper);
		}

		/*
		 * Do the tuple compactification.  itemidptr is already pointing to
		 * the first tuple that we're going to move.  Here we collapse the
		 * memcpy calls for adjacent tuples into a single call.  This is done
		 * by delaying the memcpy call until we find a gap that needs to be
		 * closed.
		 */
		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
		for (; i < nitems; i++)
		{
			ItemId		lp;

			itemidptr = &itemidbase[i];
			lp = PageGetItemId(page, itemidptr->offsetindex + 1);

			/* copy pending tuples when we detect a gap */
			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
			{
				memcpy((char *) page + upper,
					   scratchptr + copy_head,
					   copy_tail - copy_head);

				/*
				 * We've now copied all tuples already seen, but not the
				 * current tuple, so we set the copy_tail to the end of this
				 * tuple.
				 */
				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
			}
			/* shift the target offset down by the length of this tuple */
			upper -= itemidptr->alignedlen;
			/* point the copy_head to the start of this tuple */
			copy_head = itemidptr->itemoff;

			/* update the line pointer to reference the new offset */
			lp->lp_off = upper;

		}

		/* Copy the remaining chunk */
		memcpy((char *) page + upper,
			   scratchptr + copy_head,
			   copy_tail - copy_head);
	}

	phdr->pd_upper = upper;
}

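/*
 * Worked example for the presorted path above (editor's illustration with
 * made-up numbers): suppose pd_special = 8192 and itemidbase holds three
 * entries, at itemoff 8160 (alignedlen 32), 8128 (32), and 8000 (64).  The
 * first loop skips the first two entries because they already abut the end
 * of the page, leaving upper = 8128.  The 64-byte tuple ends at 8064, not
 * 8128, so there is a gap; the single trailing memmove() shifts it from
 * offset 8000 up to 8064, and pd_upper becomes 8064.
 */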
/*
 * PageRepairFragmentation
 *
 * Frees fragmented space on a heap page following pruning.
 *
 * This routine is usable for heap pages only, but see PageIndexMultiDelete.
 *
 * Never removes unused line pointers.  PageTruncateLinePointerArray can
 * safely remove some unused line pointers.  It ought to be safe for this
 * routine to free unused line pointers in roughly the same way, but it's not
 * clear that that would be beneficial.
 *
 * PageTruncateLinePointerArray is only called during VACUUM's second pass
 * over the heap.  Any unused line pointers that it sees are likely to have
 * been set to LP_UNUSED (from LP_DEAD) immediately before the time it is
 * called.  On the other hand, many tables have the vast majority of all
 * required pruning performed opportunistically (not during VACUUM).  And so
 * there is, in general, a good chance that even large groups of unused line
 * pointers that we see here will be recycled quickly.
 *
 * Caller had better have a super-exclusive lock on page's buffer.  As a side
 * effect the page's PD_HAS_FREE_LINES hint bit will be set or unset as
 * needed.
 */
void
PageRepairFragmentation(Page page)
{
	Offset		pd_lower = ((PageHeader) page)->pd_lower;
	Offset		pd_upper = ((PageHeader) page)->pd_upper;
	Offset		pd_special = ((PageHeader) page)->pd_special;
	Offset		last_offset;
	itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
	itemIdCompact itemidptr;
	ItemId		lp;
	int			nline,
				nstorage,
				nunused;
	int			i;
	Size		totallen;
	bool		presorted = true;	/* For now */

	/*
	 * It's worth the trouble to be more paranoid here than in most places,
	 * because we are about to reshuffle data in (what is usually) a shared
	 * disk buffer.  If we aren't careful then corrupted pointers, lengths,
	 * etc could cause us to clobber adjacent disk buffers, spreading the data
	 * loss further.  So, check everything.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Run through the line pointer array and collect data about live items.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	nunused = totallen = 0;
	last_offset = pd_special;
	for (i = FirstOffsetNumber; i <= nline; i++)
	{
		lp = PageGetItemId(page, i);
		if (ItemIdIsUsed(lp))
		{
			if (ItemIdHasStorage(lp))
			{
				itemidptr->offsetindex = i - 1;
				itemidptr->itemoff = ItemIdGetOffset(lp);

				if (last_offset > itemidptr->itemoff)
					last_offset = itemidptr->itemoff;
				else
					presorted = false;

				if (unlikely(itemidptr->itemoff < (int) pd_upper ||
							 itemidptr->itemoff >= (int) pd_special))
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("corrupted line pointer: %u",
									itemidptr->itemoff)));
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
		}
		else
		{
			/* Unused entries should have lp_len = 0, but make sure */
			ItemIdSetUnused(lp);
			nunused++;
		}
	}

	nstorage = itemidptr - itemidbase;
	if (nstorage == 0)
	{
		/* Page is completely empty, so just reset it quickly */
		((PageHeader) page)->pd_upper = pd_special;
	}
	else
	{
		/* Need to compact the page the hard way */
		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item lengths: total %u, available space %u",
							(unsigned int) totallen, pd_special - pd_lower)));

		compactify_tuples(itemidbase, nstorage, page, presorted);
	}

	/* Set hint bit for PageAddItemExtended */
	if (nunused > 0)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}

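/*
 * Usage sketch (editor's illustration, not part of the upstream file; the
 * Buffer type, BufferGetPage() and MarkBufferDirty() come from
 * storage/bufmgr.h, which this file does not include): a pruning-style
 * caller compacts a page only while holding a cleanup ("super-exclusive")
 * lock on its buffer.
 */
#ifdef NOT_USED
static void
example_repair(Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/* caller must already hold a cleanup lock on buf */
	PageRepairFragmentation(page);
	MarkBufferDirty(buf);
}
#endif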
/*
 * PageTruncateLinePointerArray
 *
 * Removes unused line pointers at the end of the line pointer array.
 *
 * This routine is usable for heap pages only.  It is called by VACUUM during
 * its second pass over the heap.  We expect at least one LP_UNUSED line
 * pointer on the page (if VACUUM didn't have an LP_DEAD item on the page that
 * it just set to LP_UNUSED then it should not call here).
 *
 * We avoid truncating the line pointer array to 0 items, if necessary by
 * leaving behind a single remaining LP_UNUSED item.  This is a little
 * arbitrary, but it seems like a good idea to avoid leaving a PageIsEmpty()
 * page behind.
 *
 * Caller can have either an exclusive lock or a super-exclusive lock on
 * page's buffer.  The page's PD_HAS_FREE_LINES hint bit will be set or unset
 * based on whether or not we leave behind any remaining LP_UNUSED items.
 */
void
PageTruncateLinePointerArray(Page page)
{
	PageHeader	phdr = (PageHeader) page;
	bool		countdone = false,
				sethint = false;
	int			nunusedend = 0;

	/* Scan line pointer array back-to-front */
	for (int i = PageGetMaxOffsetNumber(page); i >= FirstOffsetNumber; i--)
	{
		ItemId		lp = PageGetItemId(page, i);

		if (!countdone && i > FirstOffsetNumber)
		{
			/*
			 * Still determining which line pointers from the end of the array
			 * will be truncated away.  Either count another line pointer as
			 * safe to truncate, or notice that it's not safe to truncate
			 * additional line pointers (stop counting line pointers).
			 */
			if (!ItemIdIsUsed(lp))
				nunusedend++;
			else
				countdone = true;
		}
		else
		{
			/*
			 * Once we've stopped counting we still need to figure out if
			 * there are any remaining LP_UNUSED line pointers somewhere more
			 * towards the front of the array.
			 */
			if (!ItemIdIsUsed(lp))
			{
				/*
				 * This is an unused line pointer that we won't be truncating
				 * away -- so there is at least one.  Set hint on page.
				 */
				sethint = true;
				break;
			}
		}
	}

	if (nunusedend > 0)
	{
		phdr->pd_lower -= sizeof(ItemIdData) * nunusedend;

#ifdef CLOBBER_FREED_MEMORY
		memset((char *) page + phdr->pd_lower, 0x7F,
			   sizeof(ItemIdData) * nunusedend);
#endif
	}
	else
		Assert(sethint);

	/* Set hint bit for PageAddItemExtended */
	if (sethint)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}

/*
 * PageGetFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) sizeof(ItemIdData))
		return 0;
	space -= sizeof(ItemIdData);

	return (Size) space;
}

/*
 * PageGetFreeSpaceForMultipleTuples
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for multiple new line pointers.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) (ntups * sizeof(ItemIdData)))
		return 0;
	space -= ntups * sizeof(ItemIdData);

	return (Size) space;
}

/*
 * PageGetExactFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		without any consideration for adding/removing line pointers.
 */
Size
PageGetExactFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < 0)
		return 0;

	return (Size) space;
}

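/*
 * Worked example (editor's illustration, assuming the default 8192-byte
 * BLCKSZ, 8-byte MAXALIGN, and no special space): on a freshly PageInit'd
 * page, pd_lower = 24 (SizeOfPageHeaderData) and pd_upper = 8192, so
 * PageGetExactFreeSpace() returns 8168 while PageGetFreeSpace() returns
 * 8164 = 8168 - sizeof(ItemIdData).
 */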

/*
 * PageGetHeapFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * The difference between this and PageGetFreeSpace is that this will return
 * zero if there are already MaxHeapTuplesPerPage line pointers in the page
 * and none are free.  We use this to enforce that no more than
 * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
 * no more tuples than that could fit anyway, in the presence of redirected
 * or dead line pointers it'd be possible to have too many line pointers.
 * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
 * on the number of line pointers, we make this extra check.)
 */
Size
PageGetHeapFreeSpace(Page page)
{
	Size		space;

	space = PageGetFreeSpace(page);
	if (space > 0)
	{
		OffsetNumber offnum,
					nline;

		/*
		 * Are there already MaxHeapTuplesPerPage line pointers in the page?
		 */
		nline = PageGetMaxOffsetNumber(page);
		if (nline >= MaxHeapTuplesPerPage)
		{
			if (PageHasFreeLinePointers((PageHeader) page))
			{
				/*
				 * Since this is just a hint, we must confirm that there is
				 * indeed a free line pointer
				 */
				for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
				{
					ItemId		lp = PageGetItemId(page, offnum);

					if (!ItemIdIsUsed(lp))
						break;
				}

				if (offnum > nline)
				{
					/*
					 * The hint is wrong, but we can't clear it here since we
					 * don't have the ability to mark the page dirty.
					 */
					space = 0;
				}
			}
			else
			{
				/*
				 * Although the hint might be wrong, PageAddItem will believe
				 * it anyway, so we must believe it too.
				 */
				space = 0;
			}
		}
	}
	return space;
}


/*
 * PageIndexTupleDelete
 *
 * This routine does the work of removing a tuple from an index page.
 *
 * Unlike heap pages, we compact out the line pointer for the removed tuple.
 */
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nbytes;
	int			offidx;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	/* change offset number to offset index */
	offidx = offnum - 1;

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * First, we want to get rid of the pd_linp entry for the index tuple. We
	 * copy all subsequent linp's back one slot in the array. We don't use
	 * PageGetItemId, because we are manipulating the _array_, not individual
	 * linp's.
	 */
	nbytes = phdr->pd_lower -
		((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);

	if (nbytes > 0)
		memmove((char *) &(phdr->pd_linp[offidx]),
				(char *) &(phdr->pd_linp[offidx + 1]),
				nbytes);

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointers */
	phdr->pd_upper += size;
	phdr->pd_lower -= sizeof(ItemIdData);

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		nline--;				/* there's one less than when we started */
		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			Assert(ItemIdHasStorage(ii));
			if (ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}


/*
 * PageIndexMultiDelete
 *
 * This routine handles the case of deleting multiple tuples from an
 * index page at once.  It is considerably faster than a loop around
 * PageIndexTupleDelete ... however, the caller *must* supply the array
 * of item numbers to be deleted in item number order!
 */
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		pd_lower = phdr->pd_lower;
	Offset		pd_upper = phdr->pd_upper;
	Offset		pd_special = phdr->pd_special;
	Offset		last_offset;
	itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
	ItemIdData	newitemids[MaxIndexTuplesPerPage];
	itemIdCompact itemidptr;
	ItemId		lp;
	int			nline,
				nused;
	Size		totallen;
	Size		size;
	unsigned	offset;
	int			nextitm;
	OffsetNumber offnum;
	bool		presorted = true;	/* For now */

	Assert(nitems <= MaxIndexTuplesPerPage);

	/*
	 * If there aren't very many items to delete, then retail
	 * PageIndexTupleDelete is the best way.  Delete the items in reverse
	 * order so we don't have to think about adjusting item numbers for
	 * previous deletions.
	 *
	 * TODO: tune the magic number here
	 */
	if (nitems <= 2)
	{
		while (--nitems >= 0)
			PageIndexTupleDelete(page, itemnos[nitems]);
		return;
	}

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the line pointer array and build a list of just the ones we are
	 * going to keep.  Notice we do not modify the page yet, since we are
	 * still validity-checking.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	totallen = 0;
	nused = 0;
	nextitm = 0;
	last_offset = pd_special;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		lp = PageGetItemId(page, offnum);
		Assert(ItemIdHasStorage(lp));
		size = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);
		if (offset < pd_upper ||
			(offset + size) > pd_special ||
			offset != MAXALIGN(offset))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted line pointer: offset = %u, size = %u",
							offset, (unsigned int) size)));

		if (nextitm < nitems && offnum == itemnos[nextitm])
		{
			/* skip item to be deleted */
			nextitm++;
		}
		else
		{
			itemidptr->offsetindex = nused; /* where it will go */
			itemidptr->itemoff = offset;

			if (last_offset > itemidptr->itemoff)
				last_offset = itemidptr->itemoff;
			else
				presorted = false;

			itemidptr->alignedlen = MAXALIGN(size);
			totallen += itemidptr->alignedlen;
			newitemids[nused] = *lp;
			itemidptr++;
			nused++;
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (totallen > (Size) (pd_special - pd_lower))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item lengths: total %u, available space %u",
						(unsigned int) totallen, pd_special - pd_lower)));

	/*
	 * Looks good. Overwrite the line pointers with the copy, from which we've
	 * removed all the unused items.
	 */
	memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);

	/* and compactify the tuple data */
	if (nused > 0)
		compactify_tuples(itemidbase, nused, page, presorted);
	else
		phdr->pd_upper = pd_special;
}

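/*
 * Usage sketch (editor's illustration, not part of the upstream file):
 * deleting a batch of index tuples.  The array must list the offsets in
 * ascending item number order, as required above; this example assumes
 * the page holds at least two items.
 */
#ifdef NOT_USED
static void
example_multi_delete(Page page)
{
	OffsetNumber deletable[2];

	deletable[0] = FirstOffsetNumber;
	deletable[1] = OffsetNumberNext(FirstOffsetNumber);
	PageIndexMultiDelete(page, deletable, 2);
}
#endif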

/*
 * PageIndexTupleDeleteNoCompact
 *
 * Remove the specified tuple from an index page, but set its line pointer
 * to "unused" instead of compacting it out, except that it can be removed
 * if it's the last line pointer on the page.
 *
 * This is used for index AMs that require that existing TIDs of live tuples
 * remain unchanged, and are willing to allow unused line pointers instead.
 */
void
PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * Either set the line pointer to "unused", or zap it if it's the last
	 * one.  (Note: it's possible that the next-to-last one(s) are already
	 * unused, but we do not trouble to try to compact them out if so.)
	 */
	if ((int) offnum < nline)
		ItemIdSetUnused(tup);
	else
	{
		phdr->pd_lower -= sizeof(ItemIdData);
		nline--;				/* there's one less than when we started */
	}

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointer */
	phdr->pd_upper += size;

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}


/*
 * PageIndexTupleOverwrite
 *
 * Replace a specified tuple on an index page.
 *
 * The new tuple is placed exactly where the old one had been, shifting
 * other tuples' data up or down as needed to keep the page compacted.
 * This is better than deleting and reinserting the tuple, because it
 * avoids any data shifting when the tuple size doesn't change; and
 * even when it does, we avoid moving the line pointers around.
 * This could be used by an index AM that doesn't want to unset the
 * LP_DEAD bit when it happens to be set.  It could conceivably also be
 * used by an index AM that cares about the physical order of tuples as
 * well as their logical/ItemId order.
 *
 * If there's insufficient space for the new tuple, return false.  Other
 * errors represent data-corruption problems, so we just elog.
 */
bool
PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
						Item newtup, Size newsize)
{
	PageHeader	phdr = (PageHeader) page;
	ItemId		tupid;
	int			oldsize;
	unsigned	offset;
	Size		alignednewsize;
	int			size_diff;
	int			itemcount;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	itemcount = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > itemcount)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tupid = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tupid));
	oldsize = ItemIdGetLength(tupid);
	offset = ItemIdGetOffset(tupid);

	if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) oldsize)));

	/*
	 * Determine actual change in space requirement, check for page overflow.
	 */
	oldsize = MAXALIGN(oldsize);
	alignednewsize = MAXALIGN(newsize);
	if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
		return false;

	/*
	 * Relocate existing data and update line pointers, unless the new tuple
	 * is the same size as the old (after alignment), in which case there's
	 * nothing to do.  Notice that what we have to relocate is data before the
	 * target tuple, not data after, so it's convenient to express size_diff
	 * as the amount by which the tuple's size is decreasing, making it the
	 * delta to add to pd_upper and affected line pointers.
	 */
	size_diff = oldsize - (int) alignednewsize;
	if (size_diff != 0)
	{
		char	   *addr = (char *) page + phdr->pd_upper;
		int			i;

		/* relocate all tuple data before the target tuple */
		memmove(addr + size_diff, addr, offset - phdr->pd_upper);

		/* adjust free space boundary pointer */
		phdr->pd_upper += size_diff;

		/* adjust affected line pointers too */
		for (i = FirstOffsetNumber; i <= itemcount; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			/* Allow items without storage; currently only BRIN needs that */
			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size_diff;
		}
	}

	/* Update the item's tuple length without changing its lp_flags field */
	tupid->lp_off = offset + size_diff;
	tupid->lp_len = newsize;

	/* Copy new tuple data onto page */
	memcpy(PageGetItem(page, tupid), newtup, newsize);

	return true;
}

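/*
 * Usage sketch (editor's illustration, not part of the upstream file):
 * replacing an index tuple in place.  A "false" result only means the page
 * lacks room; deciding whether to split the page or delete and reinsert is
 * the caller's business.
 */
#ifdef NOT_USED
static void
example_overwrite(Page page, OffsetNumber offnum, Item newtup, Size newsize)
{
	if (!PageIndexTupleOverwrite(page, offnum, newtup, newsize))
		elog(ERROR, "not enough space to overwrite tuple");
}
#endif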

/*
 * Set checksum for a page in shared buffers.
 *
 * If checksums are disabled, or if the page is not initialized, just return
 * the input.  Otherwise, we must make a copy of the page before calculating
 * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
 * from making the final checksum invalid.  It doesn't matter if we include or
 * exclude hints during the copy, as long as we write a valid page and
 * associated checksum.
 *
 * Returns a pointer to the block-sized data that needs to be written. Uses
 * statically-allocated memory, so the caller must immediately write the
 * returned page and not refer to it again.
 */
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
	static char *pageCopy = NULL;

	/* If we don't need a checksum, just return the passed-in data */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return (char *) page;

	/*
	 * We allocate the copy space once and use it over on each subsequent
	 * call.  The point of palloc'ing here, rather than having a static char
	 * array, is first to ensure adequate alignment for the checksumming code
	 * and second to avoid wasting space in processes that never call this.
	 */
	if (pageCopy == NULL)
		pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);

	memcpy(pageCopy, (char *) page, BLCKSZ);
	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
	return pageCopy;
}

/*
 * Set checksum for a page in private memory.
 *
 * This must only be used when we know that no other process can be modifying
 * the page buffer.
 */
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
	/* If we don't need a checksum, just return */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return;

	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
}
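
/*
 * Usage sketch (editor's illustration, not part of the upstream file):
 * choosing between the two checksum helpers above.  A page in shared
 * buffers must be checksummed on a copy, since concurrent hint-bit setters
 * may still scribble on it; a page in backend-private memory can be
 * checksummed in place just before the write.
 */
#ifdef NOT_USED
static char *
example_prepare_for_write(Page page, BlockNumber blkno, bool page_is_private)
{
	if (page_is_private)
	{
		PageSetChecksumInplace(page, blkno);
		return (char *) page;
	}
	return PageSetChecksumCopy(page, blkno);
}
#endif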