1 /*-------------------------------------------------------------------------
2  *
3  * bufpage.c
4  *	  POSTGRES standard buffer page code.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/page/bufpage.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/itup.h"
19 #include "access/xlog.h"
20 #include "storage/checksum.h"
21 #include "utils/memdebug.h"
22 #include "utils/memutils.h"
23 
24 
25 /* GUC variable */
26 bool		ignore_checksum_failure = false;
27 
28 
29 /* ----------------------------------------------------------------
30  *						Page support functions
31  * ----------------------------------------------------------------
32  */
33 
34 /*
35  * PageInit
36  *		Initializes the contents of a page.
37  *		Note that we don't calculate an initial checksum here; that's not done
38  *		until it's time to write.
39  */
40 void
PageInit(Page page,Size pageSize,Size specialSize)41 PageInit(Page page, Size pageSize, Size specialSize)
42 {
43 	PageHeader	p = (PageHeader) page;
44 
45 	specialSize = MAXALIGN(specialSize);
46 
47 	Assert(pageSize == BLCKSZ);
48 	Assert(pageSize > specialSize + SizeOfPageHeaderData);
49 
50 	/* Make sure all fields of page are zero, as well as unused space */
51 	MemSet(p, 0, pageSize);
52 
53 	p->pd_flags = 0;
54 	p->pd_lower = SizeOfPageHeaderData;
55 	p->pd_upper = pageSize - specialSize;
56 	p->pd_special = pageSize - specialSize;
57 	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
58 	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
59 }
60 
61 
/*
 * PageIsVerified
 *		Check that the page header and checksum (if any) appear valid.
 *
 * This is called when a page has just been read in from disk.  The idea is
 * to cheaply detect trashed pages before we go nuts following bogus item
 * pointers, testing invalid transaction identifiers, etc.
 *
 * It turns out to be necessary to allow zeroed pages here too.  Even though
 * this routine is *not* called when deliberately adding a page to a relation,
 * there are scenarios in which a zeroed page might be found in a table.
 * (Example: a backend extends a relation, then crashes before it can write
 * any WAL entry about the new page.  The kernel will already have the
 * zeroed page in the file, and it will stay that way after restart.)  So we
 * allow zeroed pages here, and are careful that the page access macros
 * treat such a page as empty and without free space.  Eventually, VACUUM
 * will clean up such a page and make it usable.
 */
bool
PageIsVerified(Page page, BlockNumber blkno)
{
	PageHeader	p = (PageHeader) page;
	char	   *pagebytes;
	int			i;
	bool		checksum_failure = false;
	bool		header_sane = false;
	bool		all_zeroes = false;
	uint16		checksum = 0;

	/*
	 * Don't verify page data unless the page passes basic non-zero test
	 */
	if (!PageIsNew(page))
	{
		if (DataChecksumsEnabled())
		{
			checksum = pg_checksum_page((char *) page, blkno);

			/*
			 * Remember the mismatch rather than reporting it immediately;
			 * the WARNING is deferred until after the all-zeroes check below.
			 */
			if (checksum != p->pd_checksum)
				checksum_failure = true;
		}

		/*
		 * The following checks don't prove the header is correct, only that
		 * it looks sane enough to allow into the buffer pool. Later usage of
		 * the block can still reveal problems, which is why we offer the
		 * checksum option.
		 */
		if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
			p->pd_lower <= p->pd_upper &&
			p->pd_upper <= p->pd_special &&
			p->pd_special <= BLCKSZ &&
			p->pd_special == MAXALIGN(p->pd_special))
			header_sane = true;

		/* normal case: header looks OK and checksum (if enabled) matched */
		if (header_sane && !checksum_failure)
			return true;
	}

	/* Check all-zeroes case */
	all_zeroes = true;
	pagebytes = (char *) page;
	for (i = 0; i < BLCKSZ; i++)
	{
		if (pagebytes[i] != 0)
		{
			all_zeroes = false;
			break;
		}
	}

	/* a completely zeroed page is acceptable (see header comment) */
	if (all_zeroes)
		return true;

	/*
	 * Throw a WARNING if the checksum fails, but only after we've checked for
	 * the all-zeroes case.
	 */
	if (checksum_failure)
	{
		ereport(WARNING,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("page verification failed, calculated checksum %u but expected %u",
						checksum, p->pd_checksum)));

		/*
		 * With ignore_checksum_failure set, accept the page anyway provided
		 * the header itself still looked sane.
		 */
		if (header_sane && ignore_checksum_failure)
			return true;
	}

	return false;
}
153 
154 
/*
 *	PageAddItemExtended
 *
 *	Add an item to a page.  Return value is the offset at which it was
 *	inserted, or InvalidOffsetNumber if the item is not inserted for any
 *	reason.  A WARNING is issued indicating the reason for the refusal.
 *
 *	If flag PAI_OVERWRITE is set, we just store the item at the specified
 *	offsetNumber (which must be either a currently-unused item pointer,
 *	or one past the last existing item).  Otherwise,
 *	if offsetNumber is valid and <= current max offset in the page,
 *	insert item into the array at that position by shuffling ItemId's
 *	down to make room.
 *	If offsetNumber is not valid, then assign one by finding the first
 *	one that is both unused and deallocated.
 *
 *	If flag PAI_IS_HEAP is set, we enforce that there can't be more than
 *	MaxHeapTuplesPerPage line pointers on the page.
 *
 *	If flag PAI_ALLOW_FAR_OFFSET is not set, we disallow placing items
 *	beyond one past the last existing item.
 *
 *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
 */
OffsetNumber
PageAddItemExtended(Page page,
					Item item,
					Size size,
					OffsetNumber offsetNumber,
					int flags)
{
	PageHeader	phdr = (PageHeader) page;
	Size		alignedSize;
	int			lower;
	int			upper;
	ItemId		itemId;
	OffsetNumber limit;
	bool		needshuffle = false;

	/*
	 * Be wary about corrupted page pointers
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	/*
	 * Select offsetNumber to place the new item at.  "limit" is one past the
	 * current maximum offset, i.e. the first slot that would extend the array.
	 */
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* was offsetNumber passed in? */
	if (OffsetNumberIsValid(offsetNumber))
	{
		/* yes, check it */
		if ((flags & PAI_OVERWRITE) != 0)
		{
			/* caller wants to reuse this exact slot; it must be free */
			if (offsetNumber < limit)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
				{
					elog(WARNING, "will not overwrite a used ItemId");
					return InvalidOffsetNumber;
				}
			}
		}
		else
		{
			if (offsetNumber < limit)
				needshuffle = true;		/* need to move existing linp's */
		}
	}
	else
	{
		/* offsetNumber was not passed in, so find a free slot */
		/* if no free slot, we'll put it at limit (1st open slot) */
		if (PageHasFreeLinePointers(phdr))
		{
			/*
			 * Look for "recyclable" (unused) ItemId.  We check for no storage
			 * as well, just to be paranoid --- unused items should never have
			 * storage.
			 */
			for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
					break;
			}
			if (offsetNumber >= limit)
			{
				/* the hint is wrong, so reset it */
				PageClearHasFreeLinePointers(phdr);
			}
		}
		else
		{
			/* don't bother searching if hint says there's no free slot */
			offsetNumber = limit;
		}
	}

	/*
	 * Reject placing items beyond the first unused line pointer, unless
	 * caller asked for that behavior specifically.
	 */
	if ((flags & PAI_ALLOW_FAR_OFFSET) == 0 && offsetNumber > limit)
	{
		elog(WARNING, "specified item offset is too large");
		return InvalidOffsetNumber;
	}

	/* Reject placing items beyond heap boundary, if heap */
	if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
	{
		elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
		return InvalidOffsetNumber;
	}

	/*
	 * Compute new lower and upper pointers for page, see if it'll fit.
	 *
	 * Note: do arithmetic as signed ints, to avoid mistakes if, say,
	 * alignedSize > pd_upper.
	 */
	if ((flags & PAI_ALLOW_FAR_OFFSET) != 0)
		lower = Max(phdr->pd_lower,
					SizeOfPageHeaderData + sizeof(ItemIdData) * offsetNumber);
	else if (offsetNumber == limit || needshuffle)
		lower = phdr->pd_lower + sizeof(ItemIdData);
	else
		lower = phdr->pd_lower;

	alignedSize = MAXALIGN(size);

	upper = (int) phdr->pd_upper - (int) alignedSize;

	/* not enough free space between the linp array and the tuple data */
	if (lower > upper)
		return InvalidOffsetNumber;

	/*
	 * OK to insert the item.  First, shuffle the existing pointers if needed.
	 */
	itemId = PageGetItemId(phdr, offsetNumber);

	if (needshuffle)
		memmove(itemId + 1, itemId,
				(limit - offsetNumber) * sizeof(ItemIdData));

	/* set the item pointer */
	ItemIdSetNormal(itemId, upper, size);

	/*
	 * Items normally contain no uninitialized bytes.  Core bufpage consumers
	 * conform, but this is not a necessary coding rule; a new index AM could
	 * opt to depart from it.  However, data type input functions and other
	 * C-language functions that synthesize datums should initialize all
	 * bytes; datumIsEqual() relies on this.  Testing here, along with the
	 * similar check in printtup(), helps to catch such mistakes.
	 *
	 * Values of the "name" type retrieved via index-only scans may contain
	 * uninitialized bytes; see comment in btrescan().  Valgrind will report
	 * this as an error, but it is safe to ignore.
	 */
	VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

	/* copy the item's data onto the page */
	memcpy((char *) page + upper, item, size);

	/* adjust page header */
	phdr->pd_lower = (LocationIndex) lower;
	phdr->pd_upper = (LocationIndex) upper;

	return offsetNumber;
}
336 
337 /*
338  *	PageAddItem
339  *
340  *	Add an item to a page.  Return value is offset at which it was
341  *	inserted, or InvalidOffsetNumber if the item is not inserted for
342  *	any reason.
343  *
344  *	Passing the 'overwrite' and 'is_heap' parameters as true causes the
345  *	PAI_OVERWRITE and PAI_IS_HEAP flags to be set, respectively.
346  *
347  *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
348  */
349 OffsetNumber
PageAddItem(Page page,Item item,Size size,OffsetNumber offsetNumber,bool overwrite,bool is_heap)350 PageAddItem(Page page, Item item, Size size, OffsetNumber offsetNumber,
351 			bool overwrite, bool is_heap)
352 {
353 	return PageAddItemExtended(page, item, size, offsetNumber,
354 							   overwrite ? PAI_OVERWRITE : 0 |
355 							   is_heap ? PAI_IS_HEAP : 0);
356 }
357 
358 /*
359  * PageGetTempPage
360  *		Get a temporary page in local memory for special processing.
361  *		The returned page is not initialized at all; caller must do that.
362  */
363 Page
PageGetTempPage(Page page)364 PageGetTempPage(Page page)
365 {
366 	Size		pageSize;
367 	Page		temp;
368 
369 	pageSize = PageGetPageSize(page);
370 	temp = (Page) palloc(pageSize);
371 
372 	return temp;
373 }
374 
375 /*
376  * PageGetTempPageCopy
377  *		Get a temporary page in local memory for special processing.
378  *		The page is initialized by copying the contents of the given page.
379  */
380 Page
PageGetTempPageCopy(Page page)381 PageGetTempPageCopy(Page page)
382 {
383 	Size		pageSize;
384 	Page		temp;
385 
386 	pageSize = PageGetPageSize(page);
387 	temp = (Page) palloc(pageSize);
388 
389 	memcpy(temp, page, pageSize);
390 
391 	return temp;
392 }
393 
394 /*
395  * PageGetTempPageCopySpecial
396  *		Get a temporary page in local memory for special processing.
397  *		The page is PageInit'd with the same special-space size as the
398  *		given page, and the special space is copied from the given page.
399  */
400 Page
PageGetTempPageCopySpecial(Page page)401 PageGetTempPageCopySpecial(Page page)
402 {
403 	Size		pageSize;
404 	Page		temp;
405 
406 	pageSize = PageGetPageSize(page);
407 	temp = (Page) palloc(pageSize);
408 
409 	PageInit(temp, pageSize, PageGetSpecialSize(page));
410 	memcpy(PageGetSpecialPointer(temp),
411 		   PageGetSpecialPointer(page),
412 		   PageGetSpecialSize(page));
413 
414 	return temp;
415 }
416 
417 /*
418  * PageRestoreTempPage
419  *		Copy temporary page back to permanent page after special processing
420  *		and release the temporary page.
421  */
422 void
PageRestoreTempPage(Page tempPage,Page oldPage)423 PageRestoreTempPage(Page tempPage, Page oldPage)
424 {
425 	Size		pageSize;
426 
427 	pageSize = PageGetPageSize(tempPage);
428 	memcpy((char *) oldPage, (char *) tempPage, pageSize);
429 
430 	pfree(tempPage);
431 }
432 
/*
 * sorting support for PageRepairFragmentation, PageIndexMultiDelete,
 * PageIndexDeleteNoCompact
 */
typedef struct itemIdSortData
{
	uint16		offsetindex;	/* linp array index */
	int16		itemoff;		/* page offset of item data */
	uint16		alignedlen;		/* MAXALIGN(item data len) */
} itemIdSortData;
typedef itemIdSortData *itemIdSort;
444 
445 static int
itemoffcompare(const void * itemidp1,const void * itemidp2)446 itemoffcompare(const void *itemidp1, const void *itemidp2)
447 {
448 	/* Sort in decreasing itemoff order */
449 	return ((itemIdSort) itemidp2)->itemoff -
450 		((itemIdSort) itemidp1)->itemoff;
451 }
452 
/*
 * After removing or marking some line pointers unused, move the tuples to
 * remove the gaps caused by the removed items.
 *
 * itemidbase describes the tuples that must be kept: for each entry,
 * offsetindex is the zero-based linp slot, itemoff the tuple's current page
 * offset, and alignedlen its MAXALIGNed length.  On return the tuples are
 * packed against pd_special and each line pointer's lp_off is updated.
 */
static void
compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		upper;
	int			i;

	/* sort itemIdSortData array into decreasing itemoff order */
	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
		  itemoffcompare);

	/* lay tuples down from the special space toward the linp array */
	upper = phdr->pd_special;
	for (i = 0; i < nitems; i++)
	{
		itemIdSort	itemidptr = &itemidbase[i];
		ItemId		lp;

		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
		upper -= itemidptr->alignedlen;
		/*
		 * Processing in decreasing-itemoff order guarantees the destination
		 * never overlaps a not-yet-moved tuple; regions for a single tuple
		 * may still overlap, hence memmove rather than memcpy.
		 */
		memmove((char *) page + upper,
				(char *) page + itemidptr->itemoff,
				itemidptr->alignedlen);
		lp->lp_off = upper;
	}

	phdr->pd_upper = upper;
}
484 
/*
 * PageRepairFragmentation
 *
 * Frees fragmented space on a page.
 * It doesn't remove unused line pointers! Please don't change this.
 *
 * This routine is usable for heap pages only, but see PageIndexMultiDelete.
 *
 * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
 */
void
PageRepairFragmentation(Page page)
{
	Offset		pd_lower = ((PageHeader) page)->pd_lower;
	Offset		pd_upper = ((PageHeader) page)->pd_upper;
	Offset		pd_special = ((PageHeader) page)->pd_special;
	ItemId		lp;
	int			nline,
				nstorage,
				nunused;
	int			i;
	Size		totallen;

	/*
	 * It's worth the trouble to be more paranoid here than in most places,
	 * because we are about to reshuffle data in (what is usually) a shared
	 * disk buffer.  If we aren't careful then corrupted pointers, lengths,
	 * etc could cause us to clobber adjacent disk buffers, spreading the data
	 * loss further.  So, check everything.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/* Count line pointers with storage, and normalize the unused ones */
	nline = PageGetMaxOffsetNumber(page);
	nunused = nstorage = 0;
	for (i = FirstOffsetNumber; i <= nline; i++)
	{
		lp = PageGetItemId(page, i);
		if (ItemIdIsUsed(lp))
		{
			if (ItemIdHasStorage(lp))
				nstorage++;
		}
		else
		{
			/* Unused entries should have lp_len = 0, but make sure */
			ItemIdSetUnused(lp);
			nunused++;
		}
	}

	if (nstorage == 0)
	{
		/* Page is completely empty, so just reset it quickly */
		((PageHeader) page)->pd_upper = pd_special;
	}
	else
	{
		/* Need to compact the page the hard way */
		itemIdSortData itemidbase[MaxHeapTuplesPerPage];
		itemIdSort	itemidptr = itemidbase;

		/* build the work array for compactify_tuples, validating each item */
		totallen = 0;
		for (i = 0; i < nline; i++)
		{
			lp = PageGetItemId(page, i + 1);
			if (ItemIdHasStorage(lp))
			{
				itemidptr->offsetindex = i;
				itemidptr->itemoff = ItemIdGetOffset(lp);
				if (itemidptr->itemoff < (int) pd_upper ||
					itemidptr->itemoff >= (int) pd_special)
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("corrupted item pointer: %u",
									itemidptr->itemoff)));
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
		}

		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
			   errmsg("corrupted item lengths: total %u, available space %u",
					  (unsigned int) totallen, pd_special - pd_lower)));

		compactify_tuples(itemidbase, nstorage, page);
	}

	/* Set hint bit for PageAddItem */
	if (nunused > 0)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}
589 
590 /*
591  * PageGetFreeSpace
592  *		Returns the size of the free (allocatable) space on a page,
593  *		reduced by the space needed for a new line pointer.
594  *
595  * Note: this should usually only be used on index pages.  Use
596  * PageGetHeapFreeSpace on heap pages.
597  */
598 Size
PageGetFreeSpace(Page page)599 PageGetFreeSpace(Page page)
600 {
601 	int			space;
602 
603 	/*
604 	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
605 	 * pd_upper.
606 	 */
607 	space = (int) ((PageHeader) page)->pd_upper -
608 		(int) ((PageHeader) page)->pd_lower;
609 
610 	if (space < (int) sizeof(ItemIdData))
611 		return 0;
612 	space -= sizeof(ItemIdData);
613 
614 	return (Size) space;
615 }
616 
617 /*
618  * PageGetExactFreeSpace
619  *		Returns the size of the free (allocatable) space on a page,
620  *		without any consideration for adding/removing line pointers.
621  */
622 Size
PageGetExactFreeSpace(Page page)623 PageGetExactFreeSpace(Page page)
624 {
625 	int			space;
626 
627 	/*
628 	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
629 	 * pd_upper.
630 	 */
631 	space = (int) ((PageHeader) page)->pd_upper -
632 		(int) ((PageHeader) page)->pd_lower;
633 
634 	if (space < 0)
635 		return 0;
636 
637 	return (Size) space;
638 }
639 
640 
641 /*
642  * PageGetHeapFreeSpace
643  *		Returns the size of the free (allocatable) space on a page,
644  *		reduced by the space needed for a new line pointer.
645  *
646  * The difference between this and PageGetFreeSpace is that this will return
647  * zero if there are already MaxHeapTuplesPerPage line pointers in the page
648  * and none are free.  We use this to enforce that no more than
649  * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
650  * no more tuples than that could fit anyway, in the presence of redirected
651  * or dead line pointers it'd be possible to have too many line pointers.
652  * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
653  * on the number of line pointers, we make this extra check.)
654  */
655 Size
PageGetHeapFreeSpace(Page page)656 PageGetHeapFreeSpace(Page page)
657 {
658 	Size		space;
659 
660 	space = PageGetFreeSpace(page);
661 	if (space > 0)
662 	{
663 		OffsetNumber offnum,
664 					nline;
665 
666 		/*
667 		 * Are there already MaxHeapTuplesPerPage line pointers in the page?
668 		 */
669 		nline = PageGetMaxOffsetNumber(page);
670 		if (nline >= MaxHeapTuplesPerPage)
671 		{
672 			if (PageHasFreeLinePointers((PageHeader) page))
673 			{
674 				/*
675 				 * Since this is just a hint, we must confirm that there is
676 				 * indeed a free line pointer
677 				 */
678 				for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
679 				{
680 					ItemId		lp = PageGetItemId(page, offnum);
681 
682 					if (!ItemIdIsUsed(lp))
683 						break;
684 				}
685 
686 				if (offnum > nline)
687 				{
688 					/*
689 					 * The hint is wrong, but we can't clear it here since we
690 					 * don't have the ability to mark the page dirty.
691 					 */
692 					space = 0;
693 				}
694 			}
695 			else
696 			{
697 				/*
698 				 * Although the hint might be wrong, PageAddItem will believe
699 				 * it anyway, so we must believe it too.
700 				 */
701 				space = 0;
702 			}
703 		}
704 	}
705 	return space;
706 }
707 
708 
/*
 * PageIndexTupleDelete
 *
 * This routine does the work of removing a tuple from an index page.
 *
 * Unlike heap pages, we compact out the line pointer for the removed tuple.
 */
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nbytes;
	int			offidx;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	/* change offset number to offset index */
	offidx = offnum - 1;

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	/* sanity-check the item pointer before we trust its offset/length */
	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset) || size != MAXALIGN(size))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/*
	 * First, we want to get rid of the pd_linp entry for the index tuple. We
	 * copy all subsequent linp's back one slot in the array. We don't use
	 * PageGetItemId, because we are manipulating the _array_, not individual
	 * linp's.
	 */
	nbytes = phdr->pd_lower -
		((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);

	if (nbytes > 0)
		memmove((char *) &(phdr->pd_linp[offidx]),
				(char *) &(phdr->pd_linp[offidx + 1]),
				nbytes);

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy
	 * (and bcopy on some architectures SEGV's if asked to move zero bytes).
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, (int) (offset - phdr->pd_upper));

	/* adjust free space boundary pointers */
	phdr->pd_upper += size;
	phdr->pd_lower -= sizeof(ItemIdData);

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		nline--;				/* there's one less than when we started */
		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			Assert(ItemIdHasStorage(ii));
			/* <= offset: tuple sat at or before the deleted one's old spot */
			if (ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}
812 
813 
/*
 * PageIndexMultiDelete
 *
 * This routine handles the case of deleting multiple tuples from an
 * index page at once.  It is considerably faster than a loop around
 * PageIndexTupleDelete ... however, the caller *must* supply the array
 * of item numbers to be deleted in item number order!
 */
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		pd_lower = phdr->pd_lower;
	Offset		pd_upper = phdr->pd_upper;
	Offset		pd_special = phdr->pd_special;
	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
	ItemIdData	newitemids[MaxIndexTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nused;
	Size		totallen;
	Size		size;
	unsigned	offset;
	int			nextitm;
	OffsetNumber offnum;

	Assert(nitems <= MaxIndexTuplesPerPage);

	/*
	 * If there aren't very many items to delete, then retail
	 * PageIndexTupleDelete is the best way.  Delete the items in reverse
	 * order so we don't have to think about adjusting item numbers for
	 * previous deletions.
	 *
	 * TODO: tune the magic number here
	 */
	if (nitems <= 2)
	{
		while (--nitems >= 0)
			PageIndexTupleDelete(page, itemnos[nitems]);
		return;
	}

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the item pointer array and build a list of just the ones we are
	 * going to keep.  Notice we do not modify the page yet, since we are
	 * still validity-checking.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	totallen = 0;
	nused = 0;
	nextitm = 0;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		lp = PageGetItemId(page, offnum);
		Assert(ItemIdHasStorage(lp));
		size = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);
		if (offset < pd_upper ||
			(offset + size) > pd_special ||
			offset != MAXALIGN(offset))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item pointer: offset = %u, length = %u",
							offset, (unsigned int) size)));

		/* itemnos[] is sorted, so a single cursor (nextitm) suffices */
		if (nextitm < nitems && offnum == itemnos[nextitm])
		{
			/* skip item to be deleted */
			nextitm++;
		}
		else
		{
			itemidptr->offsetindex = nused;		/* where it will go */
			itemidptr->itemoff = offset;
			itemidptr->alignedlen = MAXALIGN(size);
			totallen += itemidptr->alignedlen;
			newitemids[nused] = *lp;
			itemidptr++;
			nused++;
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (totallen > (Size) (pd_special - pd_lower))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
			   errmsg("corrupted item lengths: total %u, available space %u",
					  (unsigned int) totallen, pd_special - pd_lower)));

	/*
	 * Looks good. Overwrite the line pointers with the copy, from which we've
	 * removed all the unused items.
	 */
	memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);

	/* and compactify the tuple data */
	compactify_tuples(itemidbase, nused, page);
}
932 
933 /*
934  * PageIndexDeleteNoCompact
935  *		Delete the given items for an index page, and defragment the resulting
936  *		free space, but do not compact the item pointers array.
937  *
938  * itemnos is the array of tuples to delete; nitems is its size.  maxIdxTuples
939  * is the maximum number of tuples that can exist in a page.
940  *
941  * Unused items at the end of the array are removed.
942  *
943  * This is used for index AMs that require that existing TIDs of live tuples
944  * remain unchanged.
945  */
void
PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	LocationIndex pd_lower = phdr->pd_lower;
	LocationIndex pd_upper = phdr->pd_upper;
	LocationIndex pd_special = phdr->pd_special;
	int			nline;			/* number of line pointers on the page */
	bool		empty;			/* does no storage-bearing item survive? */
	OffsetNumber offnum;
	int			nextitm;		/* next unconsumed index into itemnos[] */

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the existing item pointer array and mark as unused those that are
	 * in our kill-list; make sure any non-interesting ones are marked unused
	 * as well.
	 *
	 * NB: itemnos[] must be sorted in ascending offset order; nextitm only
	 * advances on an exact match, so out-of-order (or invalid) entries are
	 * left unconsumed and rejected by the nextitm != nitems check below.
	 */
	nline = PageGetMaxOffsetNumber(page);
	empty = true;
	nextitm = 0;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		ItemId		lp;
		ItemLength	itemlen;
		ItemOffset	offset;

		lp = PageGetItemId(page, offnum);

		itemlen = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);

		if (ItemIdIsUsed(lp))
		{
			/* sanity-check the item's data pointer before believing it */
			if (offset < pd_upper ||
				(offset + itemlen) > pd_special ||
				offset != MAXALIGN(offset))
				ereport(ERROR,
						(errcode(ERRCODE_DATA_CORRUPTED),
				   errmsg("corrupted item pointer: offset = %u, length = %u",
						  offset, (unsigned int) itemlen)));

			if (nextitm < nitems && offnum == itemnos[nextitm])
			{
				/* this one is on our list to delete, so mark it unused */
				ItemIdSetUnused(lp);
				nextitm++;
			}
			else if (ItemIdHasStorage(lp))
			{
				/* This one's live -- must do the compaction dance */
				empty = false;
			}
			else
			{
				/* get rid of this one too */
				ItemIdSetUnused(lp);
			}
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (empty)
	{
		/* Page is completely empty, so just reset it quickly */
		phdr->pd_lower = SizeOfPageHeaderData;
		phdr->pd_upper = pd_special;
	}
	else
	{
		/* There are live items: need to compact the page the hard way */
		itemIdSortData itemidbase[MaxOffsetNumber];
		itemIdSort	itemidptr;
		int			i;
		Size		totallen;

		/*
		 * Scan the page taking note of each item that we need to preserve.
		 * This includes both live items (those that contain data) and
		 * interspersed unused ones.  It's critical to preserve these unused
		 * items, because otherwise the offset numbers for later live items
		 * would change, which is not acceptable.  Unused items might get used
		 * again later; that is fine.
		 */
		itemidptr = itemidbase;
		totallen = 0;
		PageClearHasFreeLinePointers(page);
		for (i = 0; i < nline; i++)
		{
			ItemId		lp;

			/*
			 * Tentatively record this slot's position; the entry is only
			 * "kept" (itemidptr advanced) if the item has storage, otherwise
			 * the same array slot is overwritten on the next iteration.
			 */
			itemidptr->offsetindex = i;

			lp = PageGetItemId(page, i + 1);
			if (ItemIdHasStorage(lp))
			{
				itemidptr->itemoff = ItemIdGetOffset(lp);
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
			else
			{
				/* unused slot: advertise it via the page header hint bit */
				PageSetHasFreeLinePointers(page);
				ItemIdSetUnused(lp);
			}
		}
		nline = itemidptr - itemidbase;
		/* By here, there are exactly nline elements in itemidbase array */

		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
			   errmsg("corrupted item lengths: total %u, available space %u",
					  (unsigned int) totallen, pd_special - pd_lower)));

		/*
		 * Defragment the data areas of each tuple, being careful to preserve
		 * each item's position in the linp array.
		 */
		compactify_tuples(itemidbase, nline, page);
	}
}
1084 
1085 /*
1086  * Set checksum for a page in shared buffers.
1087  *
1088  * If checksums are disabled, or if the page is not initialized, just return
1089  * the input.  Otherwise, we must make a copy of the page before calculating
1090  * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
1091  * from making the final checksum invalid.  It doesn't matter if we include or
1092  * exclude hints during the copy, as long as we write a valid page and
1093  * associated checksum.
1094  *
1095  * Returns a pointer to the block-sized data that needs to be written. Uses
1096  * statically-allocated memory, so the caller must immediately write the
1097  * returned page and not refer to it again.
1098  */
1099 char *
PageSetChecksumCopy(Page page,BlockNumber blkno)1100 PageSetChecksumCopy(Page page, BlockNumber blkno)
1101 {
1102 	static char *pageCopy = NULL;
1103 
1104 	/* If we don't need a checksum, just return the passed-in data */
1105 	if (PageIsNew(page) || !DataChecksumsEnabled())
1106 		return (char *) page;
1107 
1108 	/*
1109 	 * We allocate the copy space once and use it over on each subsequent
1110 	 * call.  The point of palloc'ing here, rather than having a static char
1111 	 * array, is first to ensure adequate alignment for the checksumming code
1112 	 * and second to avoid wasting space in processes that never call this.
1113 	 */
1114 	if (pageCopy == NULL)
1115 		pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);
1116 
1117 	memcpy(pageCopy, (char *) page, BLCKSZ);
1118 	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
1119 	return pageCopy;
1120 }
1121 
1122 /*
1123  * Set checksum for a page in private memory.
1124  *
1125  * This must only be used when we know that no other process can be modifying
1126  * the page buffer.
1127  */
1128 void
PageSetChecksumInplace(Page page,BlockNumber blkno)1129 PageSetChecksumInplace(Page page, BlockNumber blkno)
1130 {
1131 	/* If we don't need a checksum, just return */
1132 	if (PageIsNew(page) || !DataChecksumsEnabled())
1133 		return;
1134 
1135 	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
1136 }
1137