/*-------------------------------------------------------------------------
 *
 * bufpage.c
 *	  POSTGRES standard buffer page code.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/page/bufpage.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/itup.h"
#include "access/xlog.h"
#include "pgstat.h"
#include "storage/checksum.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"


/* GUC variable */
bool		ignore_checksum_failure = false;


/* ----------------------------------------------------------------
 *						Page support functions
 * ----------------------------------------------------------------
 */

/*
 * PageInit
 *		Initializes the contents of a page.
 *		Note that we don't calculate an initial checksum here; that's not done
 *		until it's time to write.
 */
void
PageInit(Page page, Size pageSize, Size specialSize)
{
	PageHeader	p = (PageHeader) page;

	specialSize = MAXALIGN(specialSize);

	Assert(pageSize == BLCKSZ);
	Assert(pageSize > specialSize + SizeOfPageHeaderData);

	/* Make sure all fields of page are zero, as well as unused space */
	MemSet(p, 0, pageSize);

	p->pd_flags = 0;
	p->pd_lower = SizeOfPageHeaderData;
	p->pd_upper = pageSize - specialSize;
	p->pd_special = pageSize - specialSize;
	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
}
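
/*
 * Usage sketch (hypothetical caller, not part of this file): an access
 * method that keeps a "MySpecialData" struct (an assumed, illustrative
 * type) in the special space would initialize a freshly allocated buffer
 * page like this:
 *
 *		Page		page = BufferGetPage(buf);
 *
 *		PageInit(page, BufferGetPageSize(buf), sizeof(MySpecialData));
 */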


/*
 * PageIsVerified
 *		Utility wrapper for PageIsVerifiedExtended().
 */
bool
PageIsVerified(Page page, BlockNumber blkno)
{
	return PageIsVerifiedExtended(page, blkno,
								  PIV_LOG_WARNING | PIV_REPORT_STAT);
}


/*
 * PageIsVerifiedExtended
 *		Check that the page header and checksum (if any) appear valid.
 *
 * This is called when a page has just been read in from disk.  The idea is
 * to cheaply detect trashed pages before we go nuts following bogus line
 * pointers, testing invalid transaction identifiers, etc.
 *
 * It turns out to be necessary to allow zeroed pages here too.  Even though
 * this routine is *not* called when deliberately adding a page to a relation,
 * there are scenarios in which a zeroed page might be found in a table.
 * (Example: a backend extends a relation, then crashes before it can write
 * any WAL entry about the new page.  The kernel will already have the
 * zeroed page in the file, and it will stay that way after restart.)  So we
 * allow zeroed pages here, and are careful that the page access macros
 * treat such a page as empty and without free space.  Eventually, VACUUM
 * will clean up such a page and make it usable.
 *
 * If flag PIV_LOG_WARNING is set, a WARNING is logged in the event of
 * a checksum failure.
 *
 * If flag PIV_REPORT_STAT is set, a checksum failure is reported directly
 * to pgstat.
 */
bool
PageIsVerifiedExtended(Page page, BlockNumber blkno, int flags)
{
	PageHeader	p = (PageHeader) page;
	size_t	   *pagebytes;
	int			i;
	bool		checksum_failure = false;
	bool		header_sane = false;
	bool		all_zeroes = false;
	uint16		checksum = 0;

	/*
	 * Don't verify page data unless the page passes basic non-zero test
	 */
	if (!PageIsNew(page))
	{
		if (DataChecksumsEnabled())
		{
			checksum = pg_checksum_page((char *) page, blkno);

			if (checksum != p->pd_checksum)
				checksum_failure = true;
		}

		/*
		 * The following checks don't prove the header is correct, only that
		 * it looks sane enough to allow into the buffer pool. Later usage of
		 * the block can still reveal problems, which is why we offer the
		 * checksum option.
		 */
		if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
			p->pd_lower <= p->pd_upper &&
			p->pd_upper <= p->pd_special &&
			p->pd_special <= BLCKSZ &&
			p->pd_special == MAXALIGN(p->pd_special))
			header_sane = true;

		if (header_sane && !checksum_failure)
			return true;
	}

	/* Check all-zeroes case */
	all_zeroes = true;
	pagebytes = (size_t *) page;
	for (i = 0; i < (BLCKSZ / sizeof(size_t)); i++)
	{
		if (pagebytes[i] != 0)
		{
			all_zeroes = false;
			break;
		}
	}

	if (all_zeroes)
		return true;

	/*
	 * Throw a WARNING if the checksum fails, but only after we've checked for
	 * the all-zeroes case.
	 */
	if (checksum_failure)
	{
		if ((flags & PIV_LOG_WARNING) != 0)
			ereport(WARNING,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("page verification failed, calculated checksum %u but expected %u",
							checksum, p->pd_checksum)));

		if ((flags & PIV_REPORT_STAT) != 0)
			pgstat_report_checksum_failure();

		if (header_sane && ignore_checksum_failure)
			return true;
	}

	return false;
}
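
/*
 * Usage sketch (hypothetical, modeled on what the buffer manager does after
 * reading a block): verify the page, turning a verification failure into an
 * error while still getting the WARNING and pgstat report.
 *
 *		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
 *									PIV_LOG_WARNING | PIV_REPORT_STAT))
 *			ereport(ERROR,
 *					(errcode(ERRCODE_DATA_CORRUPTED),
 *					 errmsg("invalid page in block %u", blockNum)));
 */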


/*
 *	PageAddItemExtended
 *
 *	Add an item to a page.  Return value is the offset at which it was
 *	inserted, or InvalidOffsetNumber if the item is not inserted for any
 *	reason.  A WARNING is issued indicating the reason for the refusal.
 *
 *	offsetNumber must be either InvalidOffsetNumber to specify finding a
 *	free line pointer, or a value between FirstOffsetNumber and one past
 *	the last existing item, to specify using that particular line pointer.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store
 *	the item at the specified offsetNumber, which must be either a
 *	currently-unused line pointer, or one past the last existing item.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert
 *	the item at the specified offsetNumber, moving existing items later
 *	in the array to make room.
 *
 *	If offsetNumber is not valid, then assign a slot by finding the first
 *	one that is both unused and deallocated.
 *
 *	If flag PAI_IS_HEAP is set, we enforce that there can't be more than
 *	MaxHeapTuplesPerPage line pointers on the page.
 *
 *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
 */
OffsetNumber
PageAddItemExtended(Page page,
					Item item,
					Size size,
					OffsetNumber offsetNumber,
					int flags)
{
	PageHeader	phdr = (PageHeader) page;
	Size		alignedSize;
	int			lower;
	int			upper;
	ItemId		itemId;
	OffsetNumber limit;
	bool		needshuffle = false;

	/*
	 * Be wary about corrupted page pointers
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	/*
	 * Select offsetNumber to place the new item at
	 */
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* was offsetNumber passed in? */
	if (OffsetNumberIsValid(offsetNumber))
	{
		/* yes, check it */
		if ((flags & PAI_OVERWRITE) != 0)
		{
			if (offsetNumber < limit)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
				{
					elog(WARNING, "will not overwrite a used ItemId");
					return InvalidOffsetNumber;
				}
			}
		}
		else
		{
			if (offsetNumber < limit)
				needshuffle = true; /* need to move existing linp's */
		}
	}
	else
	{
		/* offsetNumber was not passed in, so find a free slot */
		/* if no free slot, we'll put it at limit (1st open slot) */
		if (PageHasFreeLinePointers(phdr))
		{
			/*
			 * Look for "recyclable" (unused) ItemId.  We check for no storage
			 * as well, just to be paranoid --- unused items should never have
			 * storage.
			 */
			for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
					break;
			}
			if (offsetNumber >= limit)
			{
				/* the hint is wrong, so reset it */
				PageClearHasFreeLinePointers(phdr);
			}
		}
		else
		{
			/* don't bother searching if hint says there's no free slot */
			offsetNumber = limit;
		}
	}

	/* Reject placing items beyond the first unused line pointer */
	if (offsetNumber > limit)
	{
		elog(WARNING, "specified item offset is too large");
		return InvalidOffsetNumber;
	}

	/* Reject placing items beyond heap boundary, if heap */
	if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
	{
		elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
		return InvalidOffsetNumber;
	}

	/*
	 * Compute new lower and upper pointers for page, see if it'll fit.
	 *
	 * Note: do arithmetic as signed ints, to avoid mistakes if, say,
	 * alignedSize > pd_upper.
	 */
	if (offsetNumber == limit || needshuffle)
		lower = phdr->pd_lower + sizeof(ItemIdData);
	else
		lower = phdr->pd_lower;

	alignedSize = MAXALIGN(size);

	upper = (int) phdr->pd_upper - (int) alignedSize;

	if (lower > upper)
		return InvalidOffsetNumber;

	/*
	 * OK to insert the item.  First, shuffle the existing pointers if needed.
	 */
	itemId = PageGetItemId(phdr, offsetNumber);

	if (needshuffle)
		memmove(itemId + 1, itemId,
				(limit - offsetNumber) * sizeof(ItemIdData));

	/* set the line pointer */
	ItemIdSetNormal(itemId, upper, size);

	/*
	 * Items normally contain no uninitialized bytes.  Core bufpage consumers
	 * conform, but this is not a necessary coding rule; a new index AM could
	 * opt to depart from it.  However, data type input functions and other
	 * C-language functions that synthesize datums should initialize all
	 * bytes; datumIsEqual() relies on this.  Testing here, along with the
	 * similar check in printtup(), helps to catch such mistakes.
	 *
	 * Values of the "name" type retrieved via index-only scans may contain
	 * uninitialized bytes; see comment in btrescan().  Valgrind will report
	 * this as an error, but it is safe to ignore.
	 */
	VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

	/* copy the item's data onto the page */
	memcpy((char *) page + upper, item, size);

	/* adjust page header */
	phdr->pd_lower = (LocationIndex) lower;
	phdr->pd_upper = (LocationIndex) upper;

	return offsetNumber;
}
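
/*
 * Usage sketch (hypothetical caller): add a tuple wherever it fits, letting
 * this routine hunt for a recyclable line pointer.  A heap caller passes
 * PAI_IS_HEAP; "tuple" and "tupleSize" are assumed to be supplied by the
 * caller.
 *
 *		OffsetNumber offnum;
 *
 *		offnum = PageAddItemExtended(page, (Item) tuple, tupleSize,
 *									 InvalidOffsetNumber, PAI_IS_HEAP);
 *		if (offnum == InvalidOffsetNumber)
 *			elog(ERROR, "failed to add tuple to page");
 */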


/*
 * PageGetTempPage
 *		Get a temporary page in local memory for special processing.
 *		The returned page is not initialized at all; caller must do that.
 */
Page
PageGetTempPage(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	return temp;
}

/*
 * PageGetTempPageCopy
 *		Get a temporary page in local memory for special processing.
 *		The page is initialized by copying the contents of the given page.
 */
Page
PageGetTempPageCopy(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	memcpy(temp, page, pageSize);

	return temp;
}

/*
 * PageGetTempPageCopySpecial
 *		Get a temporary page in local memory for special processing.
 *		The page is PageInit'd with the same special-space size as the
 *		given page, and the special space is copied from the given page.
 */
Page
PageGetTempPageCopySpecial(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	PageInit(temp, pageSize, PageGetSpecialSize(page));
	memcpy(PageGetSpecialPointer(temp),
		   PageGetSpecialPointer(page),
		   PageGetSpecialSize(page));

	return temp;
}

/*
 * PageRestoreTempPage
 *		Copy temporary page back to permanent page after special processing
 *		and release the temporary page.
 */
void
PageRestoreTempPage(Page tempPage, Page oldPage)
{
	Size		pageSize;

	pageSize = PageGetPageSize(tempPage);
	memcpy((char *) oldPage, (char *) tempPage, pageSize);

	pfree(tempPage);
}
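
/*
 * Usage sketch (hypothetical): the usual temp-page cycle for rebuilding a
 * page's contents, as index AMs do during page splits or vacuum-style
 * reorganizations.  PageRestoreTempPage copies the result back over the
 * original page and frees the temporary page in one step.
 *
 *		Page		temp = PageGetTempPageCopySpecial(page);
 *
 *		... reinsert the surviving tuples into temp ...
 *
 *		PageRestoreTempPage(temp, page);
 */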

/*
 * sorting support for PageRepairFragmentation and PageIndexMultiDelete
 */
typedef struct itemIdSortData
{
	uint16		offsetindex;	/* linp array index */
	int16		itemoff;		/* page offset of item data */
	uint16		alignedlen;		/* MAXALIGN(item data len) */
} itemIdSortData;
typedef itemIdSortData *itemIdSort;

static int
itemoffcompare(const void *itemidp1, const void *itemidp2)
{
	/* Sort in decreasing itemoff order */
	return ((itemIdSort) itemidp2)->itemoff -
		((itemIdSort) itemidp1)->itemoff;
}

/*
 * After removing or marking some line pointers unused, move the tuples to
 * remove the gaps caused by the removed items.
 */
static void
compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		upper;
	int			i;

	/* sort itemIdSortData array into decreasing itemoff order */
	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
		  itemoffcompare);

	upper = phdr->pd_special;
	for (i = 0; i < nitems; i++)
	{
		itemIdSort	itemidptr = &itemidbase[i];
		ItemId		lp;

		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
		upper -= itemidptr->alignedlen;
		memmove((char *) page + upper,
				(char *) page + itemidptr->itemoff,
				itemidptr->alignedlen);
		lp->lp_off = upper;
	}

	phdr->pd_upper = upper;
}

/*
 * PageRepairFragmentation
 *
 * Frees fragmented space on a page.
 * It doesn't remove unused line pointers! Please don't change this.
 *
 * This routine is usable for heap pages only, but see PageIndexMultiDelete.
 *
 * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
 */
void
PageRepairFragmentation(Page page)
{
	Offset		pd_lower = ((PageHeader) page)->pd_lower;
	Offset		pd_upper = ((PageHeader) page)->pd_upper;
	Offset		pd_special = ((PageHeader) page)->pd_special;
	itemIdSortData itemidbase[MaxHeapTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nstorage,
				nunused;
	int			i;
	Size		totallen;

	/*
	 * It's worth the trouble to be more paranoid here than in most places,
	 * because we are about to reshuffle data in (what is usually) a shared
	 * disk buffer.  If we aren't careful then corrupted pointers, lengths,
	 * etc could cause us to clobber adjacent disk buffers, spreading the data
	 * loss further.  So, check everything.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Run through the line pointer array and collect data about live items.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	nunused = totallen = 0;
	for (i = FirstOffsetNumber; i <= nline; i++)
	{
		lp = PageGetItemId(page, i);
		if (ItemIdIsUsed(lp))
		{
			if (ItemIdHasStorage(lp))
			{
				itemidptr->offsetindex = i - 1;
				itemidptr->itemoff = ItemIdGetOffset(lp);
				if (unlikely(itemidptr->itemoff < (int) pd_upper ||
							 itemidptr->itemoff >= (int) pd_special))
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("corrupted line pointer: %u",
									itemidptr->itemoff)));
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
		}
		else
		{
			/* Unused entries should have lp_len = 0, but make sure */
			ItemIdSetUnused(lp);
			nunused++;
		}
	}

	nstorage = itemidptr - itemidbase;
	if (nstorage == 0)
	{
		/* Page is completely empty, so just reset it quickly */
		((PageHeader) page)->pd_upper = pd_special;
	}
	else
	{
		/* Need to compact the page the hard way */
		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item lengths: total %u, available space %u",
							(unsigned int) totallen, pd_special - pd_lower)));

		compactify_tuples(itemidbase, nstorage, page);
	}

	/* Set hint bit for PageAddItem */
	if (nunused > 0)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}
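
/*
 * Usage sketch (hypothetical, pruning-style caller): after marking dead
 * line pointers unused, defragment the page.  The caller must hold an
 * exclusive lock on the buffer, since tuple data is physically moved.
 *
 *		... set no-longer-needed line pointers unused ...
 *
 *		PageRepairFragmentation(page);
 *		MarkBufferDirty(buffer);
 */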

/*
 * PageGetFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) sizeof(ItemIdData))
		return 0;
	space -= sizeof(ItemIdData);

	return (Size) space;
}

/*
 * PageGetFreeSpaceForMultipleTuples
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for multiple new line pointers.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) (ntups * sizeof(ItemIdData)))
		return 0;
	space -= ntups * sizeof(ItemIdData);

	return (Size) space;
}

/*
 * PageGetExactFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		without any consideration for adding/removing line pointers.
 */
Size
PageGetExactFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < 0)
		return 0;

	return (Size) space;
}


/*
 * PageGetHeapFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * The difference between this and PageGetFreeSpace is that this will return
 * zero if there are already MaxHeapTuplesPerPage line pointers in the page
 * and none are free.  We use this to enforce that no more than
 * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
 * no more tuples than that could fit anyway, in the presence of redirected
 * or dead line pointers it'd be possible to have too many line pointers.
 * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
 * on the number of line pointers, we make this extra check.)
 */
Size
PageGetHeapFreeSpace(Page page)
{
	Size		space;

	space = PageGetFreeSpace(page);
	if (space > 0)
	{
		OffsetNumber offnum,
					nline;

		/*
		 * Are there already MaxHeapTuplesPerPage line pointers in the page?
		 */
		nline = PageGetMaxOffsetNumber(page);
		if (nline >= MaxHeapTuplesPerPage)
		{
			if (PageHasFreeLinePointers((PageHeader) page))
			{
				/*
				 * Since this is just a hint, we must confirm that there is
				 * indeed a free line pointer
				 */
				for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
				{
					ItemId		lp = PageGetItemId(page, offnum);

					if (!ItemIdIsUsed(lp))
						break;
				}

				if (offnum > nline)
				{
					/*
					 * The hint is wrong, but we can't clear it here since we
					 * don't have the ability to mark the page dirty.
					 */
					space = 0;
				}
			}
			else
			{
				/*
				 * Although the hint might be wrong, PageAddItem will believe
				 * it anyway, so we must believe it too.
				 */
				space = 0;
			}
		}
	}
	return space;
}
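
/*
 * Usage sketch (hypothetical): deciding whether a new heap tuple fits on a
 * page.  PageGetHeapFreeSpace already reserves room for one new line
 * pointer, so only the aligned tuple size need be compared.
 *
 *		if (PageGetHeapFreeSpace(page) >= MAXALIGN(tupleSize))
 *			... insert on this page ...
 *		else
 *			... look for another page, or extend the relation ...
 */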


/*
 * PageIndexTupleDelete
 *
 * This routine does the work of removing a tuple from an index page.
 *
 * Unlike heap pages, we compact out the line pointer for the removed tuple.
 */
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nbytes;
	int			offidx;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	/* change offset number to offset index */
	offidx = offnum - 1;

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * First, we want to get rid of the pd_linp entry for the index tuple. We
	 * copy all subsequent linp's back one slot in the array. We don't use
	 * PageGetItemId, because we are manipulating the _array_, not individual
	 * linp's.
	 */
	nbytes = phdr->pd_lower -
		((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);

	if (nbytes > 0)
		memmove((char *) &(phdr->pd_linp[offidx]),
				(char *) &(phdr->pd_linp[offidx + 1]),
				nbytes);

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointers */
	phdr->pd_upper += size;
	phdr->pd_lower -= sizeof(ItemIdData);

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		nline--;				/* there's one less than when we started */
		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			Assert(ItemIdHasStorage(ii));
			if (ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}


/*
 * PageIndexMultiDelete
 *
 * This routine handles the case of deleting multiple tuples from an
 * index page at once.  It is considerably faster than a loop around
 * PageIndexTupleDelete ... however, the caller *must* supply the array
 * of item numbers to be deleted in item number order!
 */
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		pd_lower = phdr->pd_lower;
	Offset		pd_upper = phdr->pd_upper;
	Offset		pd_special = phdr->pd_special;
	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
	ItemIdData	newitemids[MaxIndexTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nused;
	Size		totallen;
	Size		size;
	unsigned	offset;
	int			nextitm;
	OffsetNumber offnum;

	Assert(nitems <= MaxIndexTuplesPerPage);

	/*
	 * If there aren't very many items to delete, then retail
	 * PageIndexTupleDelete is the best way.  Delete the items in reverse
	 * order so we don't have to think about adjusting item numbers for
	 * previous deletions.
	 *
	 * TODO: tune the magic number here
	 */
	if (nitems <= 2)
	{
		while (--nitems >= 0)
			PageIndexTupleDelete(page, itemnos[nitems]);
		return;
	}

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the line pointer array and build a list of just the ones we are
	 * going to keep.  Notice we do not modify the page yet, since we are
	 * still validity-checking.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	totallen = 0;
	nused = 0;
	nextitm = 0;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		lp = PageGetItemId(page, offnum);
		Assert(ItemIdHasStorage(lp));
		size = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);
		if (offset < pd_upper ||
			(offset + size) > pd_special ||
			offset != MAXALIGN(offset))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted line pointer: offset = %u, size = %u",
							offset, (unsigned int) size)));

		if (nextitm < nitems && offnum == itemnos[nextitm])
		{
			/* skip item to be deleted */
			nextitm++;
		}
		else
		{
			itemidptr->offsetindex = nused; /* where it will go */
			itemidptr->itemoff = offset;
			itemidptr->alignedlen = MAXALIGN(size);
			totallen += itemidptr->alignedlen;
			newitemids[nused] = *lp;
			itemidptr++;
			nused++;
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (totallen > (Size) (pd_special - pd_lower))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item lengths: total %u, available space %u",
						(unsigned int) totallen, pd_special - pd_lower)));

	/*
	 * Looks good. Overwrite the line pointers with the copy, from which we've
	 * removed all the unused items.
	 */
	memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);

	/* and compactify the tuple data */
	compactify_tuples(itemidbase, nused, page);
}
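
/*
 * Usage sketch (hypothetical): batch deletion of index tuples.  The offsets
 * must be collected in ascending item number order, e.g. by a forward scan
 * of the page, or this routine will error out.
 *
 *		OffsetNumber deletable[MaxIndexTuplesPerPage];
 *		int			ndeletable = 0;
 *
 *		... append doomed offsets to deletable[] in ascending order ...
 *
 *		PageIndexMultiDelete(page, deletable, ndeletable);
 */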


/*
 * PageIndexTupleDeleteNoCompact
 *
 * Remove the specified tuple from an index page, but set its line pointer
 * to "unused" instead of compacting it out, except that it can be removed
 * if it's the last line pointer on the page.
 *
 * This is used for index AMs that require that existing TIDs of live tuples
 * remain unchanged, and are willing to allow unused line pointers instead.
 */
void
PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * Either set the line pointer to "unused", or zap it if it's the last
	 * one.  (Note: it's possible that the next-to-last one(s) are already
	 * unused, but we do not trouble to try to compact them out if so.)
	 */
	if ((int) offnum < nline)
		ItemIdSetUnused(tup);
	else
	{
		phdr->pd_lower -= sizeof(ItemIdData);
		nline--;				/* there's one less than when we started */
	}

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointer */
	phdr->pd_upper += size;

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}


/*
 * PageIndexTupleOverwrite
 *
 * Replace a specified tuple on an index page.
 *
 * The new tuple is placed exactly where the old one had been, shifting
 * other tuples' data up or down as needed to keep the page compacted.
 * This is better than deleting and reinserting the tuple, because it
 * avoids any data shifting when the tuple size doesn't change; and
 * even when it does, we avoid moving the line pointers around.
 * This could be used by an index AM that doesn't want to unset the
 * LP_DEAD bit when it happens to be set.  It could conceivably also be
 * used by an index AM that cares about the physical order of tuples as
 * well as their logical/ItemId order.
 *
 * If there's insufficient space for the new tuple, return false.  Other
 * errors represent data-corruption problems, so we just elog.
 */
bool
PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
						Item newtup, Size newsize)
{
	PageHeader	phdr = (PageHeader) page;
	ItemId		tupid;
	int			oldsize;
	unsigned	offset;
	Size		alignednewsize;
	int			size_diff;
	int			itemcount;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	itemcount = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > itemcount)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tupid = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tupid));
	oldsize = ItemIdGetLength(tupid);
	offset = ItemIdGetOffset(tupid);

	if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) oldsize)));

	/*
	 * Determine actual change in space requirement, check for page overflow.
	 */
	oldsize = MAXALIGN(oldsize);
	alignednewsize = MAXALIGN(newsize);
	if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
		return false;

	/*
	 * Relocate existing data and update line pointers, unless the new tuple
	 * is the same size as the old (after alignment), in which case there's
	 * nothing to do.  Notice that what we have to relocate is data before the
	 * target tuple, not data after, so it's convenient to express size_diff
	 * as the amount by which the tuple's size is decreasing, making it the
	 * delta to add to pd_upper and affected line pointers.
	 */
	size_diff = oldsize - (int) alignednewsize;
	if (size_diff != 0)
	{
		char	   *addr = (char *) page + phdr->pd_upper;
		int			i;

		/* relocate all tuple data before the target tuple */
		memmove(addr + size_diff, addr, offset - phdr->pd_upper);

		/* adjust free space boundary pointer */
		phdr->pd_upper += size_diff;

		/* adjust affected line pointers too */
		for (i = FirstOffsetNumber; i <= itemcount; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			/* Allow items without storage; currently only BRIN needs that */
			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size_diff;
		}
	}

	/* Update the item's tuple length without changing its lp_flags field */
	tupid->lp_off = offset + size_diff;
	tupid->lp_len = newsize;

	/* Copy new tuple data onto page */
	memcpy(PageGetItem(page, tupid), newtup, newsize);

	return true;
}
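
/*
 * Usage sketch (hypothetical): replacing an index tuple in place, e.g. when
 * shrinking a tuple during vacuum.  A false result means the new tuple did
 * not fit, which this caller treats as a can't-happen error.
 *
 *		if (!PageIndexTupleOverwrite(page, offnum, (Item) newtup,
 *									 IndexTupleSize(newtup)))
 *			elog(ERROR, "failed to overwrite index tuple");
 */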


/*
 * Set checksum for a page in shared buffers.
 *
 * If checksums are disabled, or if the page is not initialized, just return
 * the input.  Otherwise, we must make a copy of the page before calculating
 * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
 * from making the final checksum invalid.  It doesn't matter if we include or
 * exclude hints during the copy, as long as we write a valid page and
 * associated checksum.
 *
 * Returns a pointer to the block-sized data that needs to be written. Uses
 * statically-allocated memory, so the caller must immediately write the
 * returned page and not refer to it again.
 */
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
	static char *pageCopy = NULL;

	/* If we don't need a checksum, just return the passed-in data */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return (char *) page;

	/*
	 * We allocate the copy space once and use it over on each subsequent
	 * call.  The point of palloc'ing here, rather than having a static char
	 * array, is first to ensure adequate alignment for the checksumming code
	 * and second to avoid wasting space in processes that never call this.
	 */
	if (pageCopy == NULL)
		pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);

	memcpy(pageCopy, (char *) page, BLCKSZ);
	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
	return pageCopy;
}

/*
 * Set checksum for a page in private memory.
 *
 * This must only be used when we know that no other process can be modifying
 * the page buffer.
 */
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
	/* If we don't need a checksum, just return */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return;

	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
}
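
/*
 * Usage sketch (hypothetical): how the two checksum entry points divide the
 * work.  For a shared buffer about to be written out, checksum a copy and
 * write that copy immediately; for a page in backend-private memory,
 * checksum it in place.
 *
 *		char	   *towrite = PageSetChecksumCopy(page, blocknum);
 *
 *		smgrwrite(reln, forknum, blocknum, towrite, false);
 *
 * versus, for a local page:
 *
 *		PageSetChecksumInplace(localpage, blocknum);
 */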