1 /*-------------------------------------------------------------------------
2  *
3  * bufpage.c
4  *	  POSTGRES standard buffer page code.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/page/bufpage.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/itup.h"
19 #include "access/xlog.h"
20 #include "storage/checksum.h"
21 #include "utils/memdebug.h"
22 #include "utils/memutils.h"
23 
24 
/*
 * GUC variable: when true, a page whose checksum fails verification (but
 * whose header otherwise looks sane) is still admitted to the buffer pool;
 * see PageIsVerifiedExtended().  Intended for data recovery only.
 */
bool		ignore_checksum_failure = false;
27 
28 
29 /* ----------------------------------------------------------------
30  *						Page support functions
31  * ----------------------------------------------------------------
32  */
33 
34 /*
35  * PageInit
36  *		Initializes the contents of a page.
37  *		Note that we don't calculate an initial checksum here; that's not done
38  *		until it's time to write.
39  */
40 void
PageInit(Page page,Size pageSize,Size specialSize)41 PageInit(Page page, Size pageSize, Size specialSize)
42 {
43 	PageHeader	p = (PageHeader) page;
44 
45 	specialSize = MAXALIGN(specialSize);
46 
47 	Assert(pageSize == BLCKSZ);
48 	Assert(pageSize > specialSize + SizeOfPageHeaderData);
49 
50 	/* Make sure all fields of page are zero, as well as unused space */
51 	MemSet(p, 0, pageSize);
52 
53 	p->pd_flags = 0;
54 	p->pd_lower = SizeOfPageHeaderData;
55 	p->pd_upper = pageSize - specialSize;
56 	p->pd_special = pageSize - specialSize;
57 	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
58 	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
59 }
60 
61 
62 /*
63  * PageIsVerified
64  *		Utility wrapper for PageIsVerifiedExtended().
65  */
66 bool
PageIsVerified(Page page,BlockNumber blkno)67 PageIsVerified(Page page, BlockNumber blkno)
68 {
69 	return PageIsVerifiedExtended(page, blkno, PIV_LOG_WARNING);
70 }
71 
72 
/*
 * PageIsVerifiedExtended
 *		Check that the page header and checksum (if any) appear valid.
 *
 * This is called when a page has just been read in from disk.  The idea is
 * to cheaply detect trashed pages before we go nuts following bogus item
 * pointers, testing invalid transaction identifiers, etc.
 *
 * It turns out to be necessary to allow zeroed pages here too.  Even though
 * this routine is *not* called when deliberately adding a page to a relation,
 * there are scenarios in which a zeroed page might be found in a table.
 * (Example: a backend extends a relation, then crashes before it can write
 * any WAL entry about the new page.  The kernel will already have the
 * zeroed page in the file, and it will stay that way after restart.)  So we
 * allow zeroed pages here, and are careful that the page access macros
 * treat such a page as empty and without free space.  Eventually, VACUUM
 * will clean up such a page and make it usable.
 *
 * If flag PIV_LOG_WARNING is set, a WARNING is logged in the event of
 * a checksum failure.
 *
 * Returns true if the page passes (including the all-zeroes case, and the
 * ignore_checksum_failure escape hatch); false otherwise.
 */
bool
PageIsVerifiedExtended(Page page, BlockNumber blkno, int flags)
{
	PageHeader	p = (PageHeader) page;
	size_t	   *pagebytes;
	int			i;
	bool		checksum_failure = false;
	bool		header_sane = false;
	bool		all_zeroes = false;
	uint16		checksum = 0;

	/*
	 * Don't verify page data unless the page passes basic non-zero test
	 */
	if (!PageIsNew(page))
	{
		if (DataChecksumsEnabled())
		{
			checksum = pg_checksum_page((char *) page, blkno);

			if (checksum != p->pd_checksum)
				checksum_failure = true;
		}

		/*
		 * The following checks don't prove the header is correct, only that
		 * it looks sane enough to allow into the buffer pool. Later usage of
		 * the block can still reveal problems, which is why we offer the
		 * checksum option.
		 */
		if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
			p->pd_lower <= p->pd_upper &&
			p->pd_upper <= p->pd_special &&
			p->pd_special <= BLCKSZ &&
			p->pd_special == MAXALIGN(p->pd_special))
			header_sane = true;

		/* Normal case: sane header, and checksum (if enabled) matched */
		if (header_sane && !checksum_failure)
			return true;
	}

	/*
	 * Check all-zeroes case. Luckily BLCKSZ is guaranteed to always be a
	 * multiple of size_t - and it's much faster to compare memory using the
	 * native word size.
	 */
	StaticAssertStmt(BLCKSZ == (BLCKSZ / sizeof(size_t)) * sizeof(size_t),
					 "BLCKSZ has to be a multiple of sizeof(size_t)");

	all_zeroes = true;
	pagebytes = (size_t *) page;
	for (i = 0; i < (BLCKSZ / sizeof(size_t)); i++)
	{
		if (pagebytes[i] != 0)
		{
			all_zeroes = false;
			break;
		}
	}

	if (all_zeroes)
		return true;

	/*
	 * Throw a WARNING if the checksum fails, but only after we've checked for
	 * the all-zeroes case.
	 */
	if (checksum_failure)
	{
		if ((flags & PIV_LOG_WARNING) != 0)
			ereport(WARNING,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("page verification failed, calculated checksum %u but expected %u",
							checksum, p->pd_checksum)));

		/* only the GUC escape hatch lets a checksum failure through */
		if (header_sane && ignore_checksum_failure)
			return true;
	}

	return false;
}
175 
176 
/*
 *	PageAddItemExtended
 *
 *	Add an item to a page.  Return value is the offset at which it was
 *	inserted, or InvalidOffsetNumber if the item is not inserted for any
 *	reason.  A WARNING is issued indicating the reason for the refusal.
 *
 *	offsetNumber must be either InvalidOffsetNumber to specify finding a
 *	free item pointer, or a value between FirstOffsetNumber and one past
 *	the last existing item, to specify using that particular item pointer.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store
 *	the item at the specified offsetNumber, which must be either a
 *	currently-unused item pointer, or one past the last existing item.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert
 *	the item at the specified offsetNumber, moving existing items later
 *	in the array to make room.
 *
 *	If offsetNumber is not valid, then assign a slot by finding the first
 *	one that is both unused and deallocated.
 *
 *	If flag PAI_IS_HEAP is set, we enforce that there can't be more than
 *	MaxHeapTuplesPerPage line pointers on the page.
 *
 *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
 */
OffsetNumber
PageAddItemExtended(Page page,
					Item item,
					Size size,
					OffsetNumber offsetNumber,
					int flags)
{
	PageHeader	phdr = (PageHeader) page;
	Size		alignedSize;
	int			lower;
	int			upper;
	ItemId		itemId;
	OffsetNumber limit;
	bool		needshuffle = false;

	/*
	 * Be wary about corrupted page pointers
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	/*
	 * Select offsetNumber to place the new item at
	 */
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* was offsetNumber passed in? */
	if (OffsetNumberIsValid(offsetNumber))
	{
		/* yes, check it */
		if ((flags & PAI_OVERWRITE) != 0)
		{
			/* in overwrite mode, the target slot must be empty (or be limit) */
			if (offsetNumber < limit)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
				{
					elog(WARNING, "will not overwrite a used ItemId");
					return InvalidOffsetNumber;
				}
			}
		}
		else
		{
			if (offsetNumber < limit)
				needshuffle = true; /* need to move existing linp's */
		}
	}
	else
	{
		/* offsetNumber was not passed in, so find a free slot */
		/* if no free slot, we'll put it at limit (1st open slot) */
		if (PageHasFreeLinePointers(phdr))
		{
			/*
			 * Look for "recyclable" (unused) ItemId.  We check for no storage
			 * as well, just to be paranoid --- unused items should never have
			 * storage.
			 */
			for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
					break;
			}
			if (offsetNumber >= limit)
			{
				/* the hint is wrong, so reset it */
				PageClearHasFreeLinePointers(phdr);
			}
		}
		else
		{
			/* don't bother searching if hint says there's no free slot */
			offsetNumber = limit;
		}
	}

	/* Reject placing items beyond the first unused line pointer */
	if (offsetNumber > limit)
	{
		elog(WARNING, "specified item offset is too large");
		return InvalidOffsetNumber;
	}

	/* Reject placing items beyond heap boundary, if heap */
	if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
	{
		elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
		return InvalidOffsetNumber;
	}

	/*
	 * Compute new lower and upper pointers for page, see if it'll fit.
	 *
	 * Note: do arithmetic as signed ints, to avoid mistakes if, say,
	 * alignedSize > pd_upper.
	 */
	if (offsetNumber == limit || needshuffle)
		lower = phdr->pd_lower + sizeof(ItemIdData);
	else
		lower = phdr->pd_lower;

	alignedSize = MAXALIGN(size);

	upper = (int) phdr->pd_upper - (int) alignedSize;

	/* not enough free space on the page */
	if (lower > upper)
		return InvalidOffsetNumber;

	/*
	 * OK to insert the item.  First, shuffle the existing pointers if needed.
	 */
	itemId = PageGetItemId(phdr, offsetNumber);

	if (needshuffle)
		memmove(itemId + 1, itemId,
				(limit - offsetNumber) * sizeof(ItemIdData));

	/* set the item pointer */
	ItemIdSetNormal(itemId, upper, size);

	/*
	 * Items normally contain no uninitialized bytes.  Core bufpage consumers
	 * conform, but this is not a necessary coding rule; a new index AM could
	 * opt to depart from it.  However, data type input functions and other
	 * C-language functions that synthesize datums should initialize all
	 * bytes; datumIsEqual() relies on this.  Testing here, along with the
	 * similar check in printtup(), helps to catch such mistakes.
	 *
	 * Values of the "name" type retrieved via index-only scans may contain
	 * uninitialized bytes; see comment in btrescan().  Valgrind will report
	 * this as an error, but it is safe to ignore.
	 */
	VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

	/* copy the item's data onto the page */
	memcpy((char *) page + upper, item, size);

	/* adjust page header */
	phdr->pd_lower = (LocationIndex) lower;
	phdr->pd_upper = (LocationIndex) upper;

	return offsetNumber;
}
355 
356 
357 /*
358  * PageGetTempPage
359  *		Get a temporary page in local memory for special processing.
360  *		The returned page is not initialized at all; caller must do that.
361  */
362 Page
PageGetTempPage(Page page)363 PageGetTempPage(Page page)
364 {
365 	Size		pageSize;
366 	Page		temp;
367 
368 	pageSize = PageGetPageSize(page);
369 	temp = (Page) palloc(pageSize);
370 
371 	return temp;
372 }
373 
374 /*
375  * PageGetTempPageCopy
376  *		Get a temporary page in local memory for special processing.
377  *		The page is initialized by copying the contents of the given page.
378  */
379 Page
PageGetTempPageCopy(Page page)380 PageGetTempPageCopy(Page page)
381 {
382 	Size		pageSize;
383 	Page		temp;
384 
385 	pageSize = PageGetPageSize(page);
386 	temp = (Page) palloc(pageSize);
387 
388 	memcpy(temp, page, pageSize);
389 
390 	return temp;
391 }
392 
393 /*
394  * PageGetTempPageCopySpecial
395  *		Get a temporary page in local memory for special processing.
396  *		The page is PageInit'd with the same special-space size as the
397  *		given page, and the special space is copied from the given page.
398  */
399 Page
PageGetTempPageCopySpecial(Page page)400 PageGetTempPageCopySpecial(Page page)
401 {
402 	Size		pageSize;
403 	Page		temp;
404 
405 	pageSize = PageGetPageSize(page);
406 	temp = (Page) palloc(pageSize);
407 
408 	PageInit(temp, pageSize, PageGetSpecialSize(page));
409 	memcpy(PageGetSpecialPointer(temp),
410 		   PageGetSpecialPointer(page),
411 		   PageGetSpecialSize(page));
412 
413 	return temp;
414 }
415 
416 /*
417  * PageRestoreTempPage
418  *		Copy temporary page back to permanent page after special processing
419  *		and release the temporary page.
420  */
421 void
PageRestoreTempPage(Page tempPage,Page oldPage)422 PageRestoreTempPage(Page tempPage, Page oldPage)
423 {
424 	Size		pageSize;
425 
426 	pageSize = PageGetPageSize(tempPage);
427 	memcpy((char *) oldPage, (char *) tempPage, pageSize);
428 
429 	pfree(tempPage);
430 }
431 
/*
 * Sorting support for PageRepairFragmentation and PageIndexMultiDelete:
 * one entry per line pointer whose tuple data will be kept and compacted.
 */
typedef struct itemIdSortData
{
	uint16		offsetindex;	/* linp array index */
	int16		itemoff;		/* page offset of item data */
	uint16		alignedlen;		/* MAXALIGN(item data len) */
} itemIdSortData;
typedef itemIdSortData *itemIdSort;
442 
443 static int
itemoffcompare(const void * itemidp1,const void * itemidp2)444 itemoffcompare(const void *itemidp1, const void *itemidp2)
445 {
446 	/* Sort in decreasing itemoff order */
447 	return ((itemIdSort) itemidp2)->itemoff -
448 		((itemIdSort) itemidp1)->itemoff;
449 }
450 
/*
 * After removing or marking some line pointers unused, move the tuples to
 * remove the gaps caused by the removed items.
 *
 * itemidbase describes the nitems tuples that remain; their line pointers
 * are updated in place to the tuples' new offsets.
 */
static void
compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		upper;
	int			i;

	/*
	 * Sort itemIdSortData array into decreasing itemoff order.  Packing
	 * tuples downward from pd_special in this order means each tuple moves
	 * to an offset no lower than its current one, so we never overwrite
	 * data we have yet to move.
	 */
	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
		  itemoffcompare);

	upper = phdr->pd_special;
	for (i = 0; i < nitems; i++)
	{
		itemIdSort	itemidptr = &itemidbase[i];
		ItemId		lp;

		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
		upper -= itemidptr->alignedlen;
		/* memmove, since source and destination ranges may overlap */
		memmove((char *) page + upper,
				(char *) page + itemidptr->itemoff,
				itemidptr->alignedlen);
		lp->lp_off = upper;		/* repoint line pointer at moved tuple */
	}

	/* everything above the compacted tuples is now free space */
	phdr->pd_upper = upper;
}
482 
/*
 * PageRepairFragmentation
 *
 * Frees fragmented space on a page.
 * It doesn't remove unused line pointers! Please don't change this.
 *
 * This routine is usable for heap pages only, but see PageIndexMultiDelete.
 *
 * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
 */
void
PageRepairFragmentation(Page page)
{
	Offset		pd_lower = ((PageHeader) page)->pd_lower;
	Offset		pd_upper = ((PageHeader) page)->pd_upper;
	Offset		pd_special = ((PageHeader) page)->pd_special;
	itemIdSortData itemidbase[MaxHeapTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nstorage,
				nunused;
	int			i;
	Size		totallen;

	/*
	 * It's worth the trouble to be more paranoid here than in most places,
	 * because we are about to reshuffle data in (what is usually) a shared
	 * disk buffer.  If we aren't careful then corrupted pointers, lengths,
	 * etc could cause us to clobber adjacent disk buffers, spreading the data
	 * loss further.  So, check everything.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Run through the line pointer array and collect data about live items.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	nunused = totallen = 0;
	for (i = FirstOffsetNumber; i <= nline; i++)
	{
		lp = PageGetItemId(page, i);
		if (ItemIdIsUsed(lp))
		{
			if (ItemIdHasStorage(lp))
			{
				itemidptr->offsetindex = i - 1;
				itemidptr->itemoff = ItemIdGetOffset(lp);
				/* tuple data must lie within the current tuple-space range */
				if (unlikely(itemidptr->itemoff < (int) pd_upper ||
							 itemidptr->itemoff >= (int) pd_special))
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("corrupted item pointer: %u",
									itemidptr->itemoff)));
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
		}
		else
		{
			/* Unused entries should have lp_len = 0, but make sure */
			ItemIdSetUnused(lp);
			nunused++;
		}
	}

	/* number of entries collected above = number of items with storage */
	nstorage = itemidptr - itemidbase;
	if (nstorage == 0)
	{
		/* Page is completely empty, so just reset it quickly */
		((PageHeader) page)->pd_upper = pd_special;
	}
	else
	{
		/* Need to compact the page the hard way */
		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item lengths: total %u, available space %u",
							(unsigned int) totallen, pd_special - pd_lower)));

		compactify_tuples(itemidbase, nstorage, page);
	}

	/* Set hint bit for PageAddItem */
	if (nunused > 0)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}
583 
584 /*
585  * PageGetFreeSpace
586  *		Returns the size of the free (allocatable) space on a page,
587  *		reduced by the space needed for a new line pointer.
588  *
589  * Note: this should usually only be used on index pages.  Use
590  * PageGetHeapFreeSpace on heap pages.
591  */
592 Size
PageGetFreeSpace(Page page)593 PageGetFreeSpace(Page page)
594 {
595 	int			space;
596 
597 	/*
598 	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
599 	 * pd_upper.
600 	 */
601 	space = (int) ((PageHeader) page)->pd_upper -
602 		(int) ((PageHeader) page)->pd_lower;
603 
604 	if (space < (int) sizeof(ItemIdData))
605 		return 0;
606 	space -= sizeof(ItemIdData);
607 
608 	return (Size) space;
609 }
610 
611 /*
612  * PageGetFreeSpaceForMultipleTuples
613  *		Returns the size of the free (allocatable) space on a page,
614  *		reduced by the space needed for multiple new line pointers.
615  *
616  * Note: this should usually only be used on index pages.  Use
617  * PageGetHeapFreeSpace on heap pages.
618  */
619 Size
PageGetFreeSpaceForMultipleTuples(Page page,int ntups)620 PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
621 {
622 	int			space;
623 
624 	/*
625 	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
626 	 * pd_upper.
627 	 */
628 	space = (int) ((PageHeader) page)->pd_upper -
629 		(int) ((PageHeader) page)->pd_lower;
630 
631 	if (space < (int) (ntups * sizeof(ItemIdData)))
632 		return 0;
633 	space -= ntups * sizeof(ItemIdData);
634 
635 	return (Size) space;
636 }
637 
638 /*
639  * PageGetExactFreeSpace
640  *		Returns the size of the free (allocatable) space on a page,
641  *		without any consideration for adding/removing line pointers.
642  */
643 Size
PageGetExactFreeSpace(Page page)644 PageGetExactFreeSpace(Page page)
645 {
646 	int			space;
647 
648 	/*
649 	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
650 	 * pd_upper.
651 	 */
652 	space = (int) ((PageHeader) page)->pd_upper -
653 		(int) ((PageHeader) page)->pd_lower;
654 
655 	if (space < 0)
656 		return 0;
657 
658 	return (Size) space;
659 }
660 
661 
/*
 * PageGetHeapFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * The difference between this and PageGetFreeSpace is that this will return
 * zero if there are already MaxHeapTuplesPerPage line pointers in the page
 * and none are free.  We use this to enforce that no more than
 * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
 * no more tuples than that could fit anyway, in the presence of redirected
 * or dead line pointers it'd be possible to have too many line pointers.
 * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
 * on the number of line pointers, we make this extra check.)
 */
Size
PageGetHeapFreeSpace(Page page)
{
	Size		space;

	space = PageGetFreeSpace(page);
	if (space > 0)
	{
		OffsetNumber offnum,
					nline;

		/*
		 * Are there already MaxHeapTuplesPerPage line pointers in the page?
		 */
		nline = PageGetMaxOffsetNumber(page);
		if (nline >= MaxHeapTuplesPerPage)
		{
			if (PageHasFreeLinePointers((PageHeader) page))
			{
				/*
				 * Since this is just a hint, we must confirm that there is
				 * indeed a free line pointer
				 */
				for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
				{
					ItemId		lp = PageGetItemId(page, offnum);

					if (!ItemIdIsUsed(lp))
						break;
				}

				/* loop ran off the end without finding an unused slot */
				if (offnum > nline)
				{
					/*
					 * The hint is wrong, but we can't clear it here since we
					 * don't have the ability to mark the page dirty.
					 */
					space = 0;
				}
			}
			else
			{
				/*
				 * Although the hint might be wrong, PageAddItem will believe
				 * it anyway, so we must believe it too.
				 */
				space = 0;
			}
		}
	}
	return space;
}
728 
729 
/*
 * PageIndexTupleDelete
 *
 * This routine does the work of removing a tuple from an index page.
 *
 * Unlike heap pages, we compact out the line pointer for the removed tuple,
 * so all later offset numbers shift down by one.
 */
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nbytes;
	int			offidx;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	/* change offset number to offset index */
	offidx = offnum - 1;

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	/* tuple data must lie within tuple space, correctly aligned */
	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * First, we want to get rid of the pd_linp entry for the index tuple. We
	 * copy all subsequent linp's back one slot in the array. We don't use
	 * PageGetItemId, because we are manipulating the _array_, not individual
	 * linp's.
	 */
	nbytes = phdr->pd_lower -
		((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);

	if (nbytes > 0)
		memmove((char *) &(phdr->pd_linp[offidx]),
				(char *) &(phdr->pd_linp[offidx + 1]),
				nbytes);

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointers */
	phdr->pd_upper += size;
	phdr->pd_lower -= sizeof(ItemIdData);

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		nline--;				/* there's one less than when we started */
		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			Assert(ItemIdHasStorage(ii));
			if (ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}
836 
837 
/*
 * PageIndexMultiDelete
 *
 * This routine handles the case of deleting multiple tuples from an
 * index page at once.  It is considerably faster than a loop around
 * PageIndexTupleDelete ... however, the caller *must* supply the array
 * of item numbers to be deleted in item number order!
 */
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		pd_lower = phdr->pd_lower;
	Offset		pd_upper = phdr->pd_upper;
	Offset		pd_special = phdr->pd_special;
	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
	ItemIdData	newitemids[MaxIndexTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nused;
	Size		totallen;
	Size		size;
	unsigned	offset;
	int			nextitm;
	OffsetNumber offnum;

	Assert(nitems <= MaxIndexTuplesPerPage);

	/*
	 * If there aren't very many items to delete, then retail
	 * PageIndexTupleDelete is the best way.  Delete the items in reverse
	 * order so we don't have to think about adjusting item numbers for
	 * previous deletions.
	 *
	 * TODO: tune the magic number here
	 */
	if (nitems <= 2)
	{
		while (--nitems >= 0)
			PageIndexTupleDelete(page, itemnos[nitems]);
		return;
	}

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the item pointer array and build a list of just the ones we are
	 * going to keep.  Notice we do not modify the page yet, since we are
	 * still validity-checking.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	totallen = 0;
	nused = 0;
	nextitm = 0;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		lp = PageGetItemId(page, offnum);
		Assert(ItemIdHasStorage(lp));
		size = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);
		if (offset < pd_upper ||
			(offset + size) > pd_special ||
			offset != MAXALIGN(offset))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item pointer: offset = %u, length = %u",
							offset, (unsigned int) size)));

		if (nextitm < nitems && offnum == itemnos[nextitm])
		{
			/* skip item to be deleted */
			nextitm++;
		}
		else
		{
			itemidptr->offsetindex = nused; /* where it will go */
			itemidptr->itemoff = offset;
			itemidptr->alignedlen = MAXALIGN(size);
			totallen += itemidptr->alignedlen;
			newitemids[nused] = *lp;	/* keep a copy of the line pointer */
			itemidptr++;
			nused++;
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (totallen > (Size) (pd_special - pd_lower))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item lengths: total %u, available space %u",
						(unsigned int) totallen, pd_special - pd_lower)));

	/*
	 * Looks good. Overwrite the line pointers with the copy, from which we've
	 * removed all the unused items.
	 */
	memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);

	/* and compactify the tuple data */
	compactify_tuples(itemidbase, nused, page);
}
956 
957 
/*
 * PageIndexTupleDeleteNoCompact
 *
 * Remove the specified tuple from an index page, but set its line pointer
 * to "unused" instead of compacting it out, except that it can be removed
 * if it's the last line pointer on the page.
 *
 * This is used for index AMs that require that existing TIDs of live tuples
 * remain unchanged, and are willing to allow unused line pointers instead.
 */
void
PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	/* tuple data must lie within tuple space, correctly aligned */
	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * Either set the item pointer to "unused", or zap it if it's the last
	 * one.  (Note: it's possible that the next-to-last one(s) are already
	 * unused, but we do not trouble to try to compact them out if so.)
	 */
	if ((int) offnum < nline)
		ItemIdSetUnused(tup);
	else
	{
		phdr->pd_lower -= sizeof(ItemIdData);
		nline--;				/* there's one less than when we started */
	}

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointer */
	phdr->pd_upper += size;

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			/* unused entries (including the one we just zapped) have no storage */
			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}
1058 
1059 
1060 /*
1061  * PageIndexTupleOverwrite
1062  *
1063  * Replace a specified tuple on an index page.
1064  *
1065  * The new tuple is placed exactly where the old one had been, shifting
1066  * other tuples' data up or down as needed to keep the page compacted.
1067  * This is better than deleting and reinserting the tuple, because it
1068  * avoids any data shifting when the tuple size doesn't change; and
1069  * even when it does, we avoid moving the item pointers around.
1070  * Conceivably this could also be of use to an index AM that cares about
1071  * the physical order of tuples as well as their ItemId order.
1072  *
1073  * If there's insufficient space for the new tuple, return false.  Other
1074  * errors represent data-corruption problems, so we just elog.
1075  */
1076 bool
PageIndexTupleOverwrite(Page page,OffsetNumber offnum,Item newtup,Size newsize)1077 PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
1078 						Item newtup, Size newsize)
1079 {
1080 	PageHeader	phdr = (PageHeader) page;
1081 	ItemId		tupid;
1082 	int			oldsize;
1083 	unsigned	offset;
1084 	Size		alignednewsize;
1085 	int			size_diff;
1086 	int			itemcount;
1087 
1088 	/*
1089 	 * As with PageRepairFragmentation, paranoia seems justified.
1090 	 */
1091 	if (phdr->pd_lower < SizeOfPageHeaderData ||
1092 		phdr->pd_lower > phdr->pd_upper ||
1093 		phdr->pd_upper > phdr->pd_special ||
1094 		phdr->pd_special > BLCKSZ ||
1095 		phdr->pd_special != MAXALIGN(phdr->pd_special))
1096 		ereport(ERROR,
1097 				(errcode(ERRCODE_DATA_CORRUPTED),
1098 				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
1099 						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
1100 
1101 	itemcount = PageGetMaxOffsetNumber(page);
1102 	if ((int) offnum <= 0 || (int) offnum > itemcount)
1103 		elog(ERROR, "invalid index offnum: %u", offnum);
1104 
1105 	tupid = PageGetItemId(page, offnum);
1106 	Assert(ItemIdHasStorage(tupid));
1107 	oldsize = ItemIdGetLength(tupid);
1108 	offset = ItemIdGetOffset(tupid);
1109 
1110 	if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
1111 		offset != MAXALIGN(offset))
1112 		ereport(ERROR,
1113 				(errcode(ERRCODE_DATA_CORRUPTED),
1114 				 errmsg("corrupted item pointer: offset = %u, size = %u",
1115 						offset, (unsigned int) oldsize)));
1116 
1117 	/*
1118 	 * Determine actual change in space requirement, check for page overflow.
1119 	 */
1120 	oldsize = MAXALIGN(oldsize);
1121 	alignednewsize = MAXALIGN(newsize);
1122 	if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
1123 		return false;
1124 
1125 	/*
1126 	 * Relocate existing data and update line pointers, unless the new tuple
1127 	 * is the same size as the old (after alignment), in which case there's
1128 	 * nothing to do.  Notice that what we have to relocate is data before the
1129 	 * target tuple, not data after, so it's convenient to express size_diff
1130 	 * as the amount by which the tuple's size is decreasing, making it the
1131 	 * delta to add to pd_upper and affected line pointers.
1132 	 */
1133 	size_diff = oldsize - (int) alignednewsize;
1134 	if (size_diff != 0)
1135 	{
1136 		char	   *addr = (char *) page + phdr->pd_upper;
1137 		int			i;
1138 
1139 		/* relocate all tuple data before the target tuple */
1140 		memmove(addr + size_diff, addr, offset - phdr->pd_upper);
1141 
1142 		/* adjust free space boundary pointer */
1143 		phdr->pd_upper += size_diff;
1144 
1145 		/* adjust affected line pointers too */
1146 		for (i = FirstOffsetNumber; i <= itemcount; i++)
1147 		{
1148 			ItemId		ii = PageGetItemId(phdr, i);
1149 
1150 			/* Allow items without storage; currently only BRIN needs that */
1151 			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
1152 				ii->lp_off += size_diff;
1153 		}
1154 	}
1155 
1156 	/* Update the item's tuple length (other fields shouldn't change) */
1157 	ItemIdSetNormal(tupid, offset + size_diff, newsize);
1158 
1159 	/* Copy new tuple data onto page */
1160 	memcpy(PageGetItem(page, tupid), newtup, newsize);
1161 
1162 	return true;
1163 }
1164 
1165 
1166 /*
1167  * Set checksum for a page in shared buffers.
1168  *
1169  * If checksums are disabled, or if the page is not initialized, just return
1170  * the input.  Otherwise, we must make a copy of the page before calculating
1171  * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
1172  * from making the final checksum invalid.  It doesn't matter if we include or
1173  * exclude hints during the copy, as long as we write a valid page and
1174  * associated checksum.
1175  *
1176  * Returns a pointer to the block-sized data that needs to be written. Uses
1177  * statically-allocated memory, so the caller must immediately write the
1178  * returned page and not refer to it again.
1179  */
1180 char *
PageSetChecksumCopy(Page page,BlockNumber blkno)1181 PageSetChecksumCopy(Page page, BlockNumber blkno)
1182 {
1183 	static char *pageCopy = NULL;
1184 
1185 	/* If we don't need a checksum, just return the passed-in data */
1186 	if (PageIsNew(page) || !DataChecksumsEnabled())
1187 		return (char *) page;
1188 
1189 	/*
1190 	 * We allocate the copy space once and use it over on each subsequent
1191 	 * call.  The point of palloc'ing here, rather than having a static char
1192 	 * array, is first to ensure adequate alignment for the checksumming code
1193 	 * and second to avoid wasting space in processes that never call this.
1194 	 */
1195 	if (pageCopy == NULL)
1196 		pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);
1197 
1198 	memcpy(pageCopy, (char *) page, BLCKSZ);
1199 	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
1200 	return pageCopy;
1201 }
1202 
1203 /*
1204  * Set checksum for a page in private memory.
1205  *
1206  * This must only be used when we know that no other process can be modifying
1207  * the page buffer.
1208  */
1209 void
PageSetChecksumInplace(Page page,BlockNumber blkno)1210 PageSetChecksumInplace(Page page, BlockNumber blkno)
1211 {
1212 	/* If we don't need a checksum, just return */
1213 	if (PageIsNew(page) || !DataChecksumsEnabled())
1214 		return;
1215 
1216 	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
1217 }
1218