/*-------------------------------------------------------------------------
 *
 * bufpage.c
 *	  POSTGRES standard buffer page code.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/page/bufpage.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/itup.h"
#include "access/xlog.h"
#include "storage/checksum.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"


/* GUC variable */
bool		ignore_checksum_failure = false;


/* ----------------------------------------------------------------
 *						Page support functions
 * ----------------------------------------------------------------
 */

/*
 * PageInit
 *		Initializes the contents of a page.
 *		Note that we don't calculate an initial checksum here; that's not done
 *		until it's time to write.
 */
void
PageInit(Page page, Size pageSize, Size specialSize)
{
	PageHeader	p = (PageHeader) page;

	specialSize = MAXALIGN(specialSize);

	Assert(pageSize == BLCKSZ);
	Assert(pageSize > specialSize + SizeOfPageHeaderData);

	/* Make sure all fields of page are zero, as well as unused space */
	MemSet(p, 0, pageSize);

	p->pd_flags = 0;
	p->pd_lower = SizeOfPageHeaderData;
	p->pd_upper = pageSize - specialSize;
	p->pd_special = pageSize - specialSize;
	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
}
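
/*
 * Illustrative usage sketch (not part of the upstream file): a caller that
 * has just obtained a new buffer page typically initializes it like this.
 * "buf" is a hypothetical Buffer, e.g. from ReadBuffer(rel, P_NEW); an AM
 * with special space (such as btree) would pass its size instead of 0.
 *
 *		Page		page = BufferGetPage(buf);
 *
 *		PageInit(page, BufferGetPageSize(buf), 0);
 */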


/*
 * PageIsVerified
 *		Check that the page header and checksum (if any) appear valid.
 *
 * This is called when a page has just been read in from disk.  The idea is
 * to cheaply detect trashed pages before we go nuts following bogus item
 * pointers, testing invalid transaction identifiers, etc.
 *
 * It turns out to be necessary to allow zeroed pages here too.  Even though
 * this routine is *not* called when deliberately adding a page to a relation,
 * there are scenarios in which a zeroed page might be found in a table.
 * (Example: a backend extends a relation, then crashes before it can write
 * any WAL entry about the new page.  The kernel will already have the
 * zeroed page in the file, and it will stay that way after restart.)  So we
 * allow zeroed pages here, and are careful that the page access macros
 * treat such a page as empty and without free space.  Eventually, VACUUM
 * will clean up such a page and make it usable.
 */
bool
PageIsVerified(Page page, BlockNumber blkno)
{
	PageHeader	p = (PageHeader) page;
	size_t	   *pagebytes;
	int			i;
	bool		checksum_failure = false;
	bool		header_sane = false;
	bool		all_zeroes = false;
	uint16		checksum = 0;

	/*
	 * Don't verify page data unless the page passes basic non-zero test
	 */
	if (!PageIsNew(page))
	{
		if (DataChecksumsEnabled())
		{
			checksum = pg_checksum_page((char *) page, blkno);

			if (checksum != p->pd_checksum)
				checksum_failure = true;
		}

		/*
		 * The following checks don't prove the header is correct, only that
		 * it looks sane enough to allow into the buffer pool. Later usage of
		 * the block can still reveal problems, which is why we offer the
		 * checksum option.
		 */
		if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
			p->pd_lower <= p->pd_upper &&
			p->pd_upper <= p->pd_special &&
			p->pd_special <= BLCKSZ &&
			p->pd_special == MAXALIGN(p->pd_special))
			header_sane = true;

		if (header_sane && !checksum_failure)
			return true;
	}

	/*
	 * Check all-zeroes case. Luckily BLCKSZ is guaranteed to always be a
	 * multiple of size_t - and it's much faster to compare memory using the
	 * native word size.
	 */
	StaticAssertStmt(BLCKSZ == (BLCKSZ / sizeof(size_t)) * sizeof(size_t),
					 "BLCKSZ has to be a multiple of sizeof(size_t)");

	all_zeroes = true;
	pagebytes = (size_t *) page;
	for (i = 0; i < (BLCKSZ / sizeof(size_t)); i++)
	{
		if (pagebytes[i] != 0)
		{
			all_zeroes = false;
			break;
		}
	}

	if (all_zeroes)
		return true;

	/*
	 * Throw a WARNING if the checksum fails, but only after we've checked for
	 * the all-zeroes case.
	 */
	if (checksum_failure)
	{
		ereport(WARNING,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("page verification failed, calculated checksum %u but expected %u",
						checksum, p->pd_checksum)));

		if (header_sane && ignore_checksum_failure)
			return true;
	}

	return false;
}
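
/*
 * Illustrative usage sketch: a reader calls this right after reading a
 * block from storage, roughly as the buffer manager does ("bufBlock" and
 * "blocknum" are hypothetical caller variables):
 *
 *		if (!PageIsVerified((Page) bufBlock, blocknum))
 *			ereport(ERROR,
 *					(errcode(ERRCODE_DATA_CORRUPTED),
 *					 errmsg("invalid page in block %u", blocknum)));
 */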


/*
 *	PageAddItemExtended
 *
 *	Add an item to a page.  Return value is the offset at which it was
 *	inserted, or InvalidOffsetNumber if the item is not inserted for any
 *	reason.  A WARNING is issued indicating the reason for the refusal.
 *
 *	offsetNumber must be either InvalidOffsetNumber to specify finding a
 *	free item pointer, or a value between FirstOffsetNumber and one past
 *	the last existing item, to specify using that particular item pointer.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store
 *	the item at the specified offsetNumber, which must be either a
 *	currently-unused item pointer, or one past the last existing item.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert
 *	the item at the specified offsetNumber, moving existing items later
 *	in the array to make room.
 *
 *	If offsetNumber is not valid, then assign a slot by finding the first
 *	one that is both unused and deallocated.
 *
 *	If flag PAI_IS_HEAP is set, we enforce that there can't be more than
 *	MaxHeapTuplesPerPage line pointers on the page.
 *
 *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
 */
OffsetNumber
PageAddItemExtended(Page page,
					Item item,
					Size size,
					OffsetNumber offsetNumber,
					int flags)
{
	PageHeader	phdr = (PageHeader) page;
	Size		alignedSize;
	int			lower;
	int			upper;
	ItemId		itemId;
	OffsetNumber limit;
	bool		needshuffle = false;

	/*
	 * Be wary about corrupted page pointers
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	/*
	 * Select offsetNumber to place the new item at
	 */
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* was offsetNumber passed in? */
	if (OffsetNumberIsValid(offsetNumber))
	{
		/* yes, check it */
		if ((flags & PAI_OVERWRITE) != 0)
		{
			if (offsetNumber < limit)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
				{
					elog(WARNING, "will not overwrite a used ItemId");
					return InvalidOffsetNumber;
				}
			}
		}
		else
		{
			if (offsetNumber < limit)
				needshuffle = true; /* need to move existing linp's */
		}
	}
	else
	{
		/* offsetNumber was not passed in, so find a free slot */
		/* if no free slot, we'll put it at limit (1st open slot) */
		if (PageHasFreeLinePointers(phdr))
		{
			/*
			 * Look for "recyclable" (unused) ItemId.  We check for no storage
			 * as well, just to be paranoid --- unused items should never have
			 * storage.
			 */
			for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
					break;
			}
			if (offsetNumber >= limit)
			{
				/* the hint is wrong, so reset it */
				PageClearHasFreeLinePointers(phdr);
			}
		}
		else
		{
			/* don't bother searching if hint says there's no free slot */
			offsetNumber = limit;
		}
	}

	/* Reject placing items beyond the first unused line pointer */
	if (offsetNumber > limit)
	{
		elog(WARNING, "specified item offset is too large");
		return InvalidOffsetNumber;
	}

	/* Reject placing items beyond heap boundary, if heap */
	if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
	{
		elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
		return InvalidOffsetNumber;
	}

	/*
	 * Compute new lower and upper pointers for page, see if it'll fit.
	 *
	 * Note: do arithmetic as signed ints, to avoid mistakes if, say,
	 * alignedSize > pd_upper.
	 */
	if (offsetNumber == limit || needshuffle)
		lower = phdr->pd_lower + sizeof(ItemIdData);
	else
		lower = phdr->pd_lower;

	alignedSize = MAXALIGN(size);

	upper = (int) phdr->pd_upper - (int) alignedSize;

	if (lower > upper)
		return InvalidOffsetNumber;

	/*
	 * OK to insert the item.  First, shuffle the existing pointers if needed.
	 */
	itemId = PageGetItemId(phdr, offsetNumber);

	if (needshuffle)
		memmove(itemId + 1, itemId,
				(limit - offsetNumber) * sizeof(ItemIdData));

	/* set the item pointer */
	ItemIdSetNormal(itemId, upper, size);

	/*
	 * Items normally contain no uninitialized bytes.  Core bufpage consumers
	 * conform, but this is not a necessary coding rule; a new index AM could
	 * opt to depart from it.  However, data type input functions and other
	 * C-language functions that synthesize datums should initialize all
	 * bytes; datumIsEqual() relies on this.  Testing here, along with the
	 * similar check in printtup(), helps to catch such mistakes.
	 *
	 * Values of the "name" type retrieved via index-only scans may contain
	 * uninitialized bytes; see comment in btrescan().  Valgrind will report
	 * this as an error, but it is safe to ignore.
	 */
	VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

	/* copy the item's data onto the page */
	memcpy((char *) page + upper, item, size);

	/* adjust page header */
	phdr->pd_lower = (LocationIndex) lower;
	phdr->pd_upper = (LocationIndex) upper;

	return offsetNumber;
}
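
/*
 * Illustrative usage sketch: most callers reach PageAddItemExtended via
 * the PageAddItem() macro in bufpage.h, letting the page pick the slot.
 * "page", "tuple" and "tuplen" are hypothetical caller variables:
 *
 *		OffsetNumber offnum;
 *
 *		offnum = PageAddItem(page, (Item) tuple, tuplen,
 *							 InvalidOffsetNumber, false, true);
 *		if (offnum == InvalidOffsetNumber)
 *			elog(ERROR, "failed to add tuple to page");
 */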


/*
 * PageGetTempPage
 *		Get a temporary page in local memory for special processing.
 *		The returned page is not initialized at all; caller must do that.
 */
Page
PageGetTempPage(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	return temp;
}

/*
 * PageGetTempPageCopy
 *		Get a temporary page in local memory for special processing.
 *		The page is initialized by copying the contents of the given page.
 */
Page
PageGetTempPageCopy(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	memcpy(temp, page, pageSize);

	return temp;
}

/*
 * PageGetTempPageCopySpecial
 *		Get a temporary page in local memory for special processing.
 *		The page is PageInit'd with the same special-space size as the
 *		given page, and the special space is copied from the given page.
 */
Page
PageGetTempPageCopySpecial(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	PageInit(temp, pageSize, PageGetSpecialSize(page));
	memcpy(PageGetSpecialPointer(temp),
		   PageGetSpecialPointer(page),
		   PageGetSpecialSize(page));

	return temp;
}

/*
 * PageRestoreTempPage
 *		Copy temporary page back to permanent page after special processing
 *		and release the temporary page.
 */
void
PageRestoreTempPage(Page tempPage, Page oldPage)
{
	Size		pageSize;

	pageSize = PageGetPageSize(tempPage);
	memcpy((char *) oldPage, (char *) tempPage, pageSize);

	pfree(tempPage);
}
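
/*
 * Illustrative sketch of the temp-page pattern these four routines
 * support: rebuild a page in local memory, then copy the result back over
 * the original in one step.  "page" is a hypothetical page in a buffer
 * the caller holds exclusively locked:
 *
 *		Page		temp = PageGetTempPageCopySpecial(page);
 *
 *		... add the surviving tuples to temp ...
 *
 *		PageRestoreTempPage(temp, page);
 */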

/*
 * sorting support for PageRepairFragmentation and PageIndexMultiDelete
 */
typedef struct itemIdSortData
{
	uint16		offsetindex;	/* linp array index */
	int16		itemoff;		/* page offset of item data */
	uint16		alignedlen;		/* MAXALIGN(item data len) */
} itemIdSortData;
typedef itemIdSortData *itemIdSort;

static int
itemoffcompare(const void *itemidp1, const void *itemidp2)
{
	/* Sort in decreasing itemoff order */
	return ((itemIdSort) itemidp2)->itemoff -
		((itemIdSort) itemidp1)->itemoff;
}

/*
 * After removing or marking some line pointers unused, move the tuples to
 * remove the gaps caused by the removed items.
 */
static void
compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		upper;
	int			i;

	/* sort itemIdSortData array into decreasing itemoff order */
	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
		  itemoffcompare);

	upper = phdr->pd_special;
	for (i = 0; i < nitems; i++)
	{
		itemIdSort	itemidptr = &itemidbase[i];
		ItemId		lp;

		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
		upper -= itemidptr->alignedlen;
		memmove((char *) page + upper,
				(char *) page + itemidptr->itemoff,
				itemidptr->alignedlen);
		lp->lp_off = upper;
	}

	phdr->pd_upper = upper;
}
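
/*
 * Worked example with hypothetical numbers: suppose pd_special is 8192 and
 * two items have itemoff 8064 and 7680, each with alignedlen 128.  After
 * sorting (decreasing itemoff), the first iteration sets upper to
 * 8192 - 128 = 8064 and moves the top item onto itself (a no-op); the
 * second sets upper to 8064 - 128 = 7936 and copies the lower item up from
 * 7680, closing the 256-byte gap.  Because we always process the
 * highest-offset item first and pack against pd_special, a move can never
 * overwrite tuple data that is still waiting to be copied.
 */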

/*
 * PageRepairFragmentation
 *
 * Frees fragmented space on a page.
 * It doesn't remove unused line pointers! Please don't change this.
 *
 * This routine is usable for heap pages only, but see PageIndexMultiDelete.
 *
 * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
 */
void
PageRepairFragmentation(Page page)
{
	Offset		pd_lower = ((PageHeader) page)->pd_lower;
	Offset		pd_upper = ((PageHeader) page)->pd_upper;
	Offset		pd_special = ((PageHeader) page)->pd_special;
	ItemId		lp;
	int			nline,
				nstorage,
				nunused;
	int			i;
	Size		totallen;

	/*
	 * It's worth the trouble to be more paranoid here than in most places,
	 * because we are about to reshuffle data in (what is usually) a shared
	 * disk buffer.  If we aren't careful then corrupted pointers, lengths,
	 * etc could cause us to clobber adjacent disk buffers, spreading the data
	 * loss further.  So, check everything.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	nunused = nstorage = 0;
	for (i = FirstOffsetNumber; i <= nline; i++)
	{
		lp = PageGetItemId(page, i);
		if (ItemIdIsUsed(lp))
		{
			if (ItemIdHasStorage(lp))
				nstorage++;
		}
		else
		{
			/* Unused entries should have lp_len = 0, but make sure */
			ItemIdSetUnused(lp);
			nunused++;
		}
	}

	if (nstorage == 0)
	{
		/* Page is completely empty, so just reset it quickly */
		((PageHeader) page)->pd_upper = pd_special;
	}
	else
	{
		/* Need to compact the page the hard way */
		itemIdSortData itemidbase[MaxHeapTuplesPerPage];
		itemIdSort	itemidptr = itemidbase;

		totallen = 0;
		for (i = 0; i < nline; i++)
		{
			lp = PageGetItemId(page, i + 1);
			if (ItemIdHasStorage(lp))
			{
				itemidptr->offsetindex = i;
				itemidptr->itemoff = ItemIdGetOffset(lp);
				if (itemidptr->itemoff < (int) pd_upper ||
					itemidptr->itemoff >= (int) pd_special)
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("corrupted item pointer: %u",
									itemidptr->itemoff)));
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
		}

		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item lengths: total %u, available space %u",
							(unsigned int) totallen, pd_special - pd_lower)));

		compactify_tuples(itemidbase, nstorage, page);
	}

	/* Set hint bit for PageAddItem */
	if (nunused > 0)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}
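
/*
 * Illustrative usage sketch: heap pruning is the typical caller.  After
 * marking line pointers dead or unused it repairs the page, then dirties
 * the buffer ("buffer" is a hypothetical exclusively-locked buffer):
 *
 *		PageRepairFragmentation(page);
 *		MarkBufferDirty(buffer);
 */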

/*
 * PageGetFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) sizeof(ItemIdData))
		return 0;
	space -= sizeof(ItemIdData);

	return (Size) space;
}
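
/*
 * Worked example: on a freshly PageInit'd page with the default BLCKSZ of
 * 8192 and no special space, pd_upper - pd_lower = 8192 - 24 = 8168, so
 * this returns 8168 - 4 = 8164, holding back sizeof(ItemIdData) = 4 bytes
 * for the line pointer the prospective new tuple would consume.
 */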

/*
 * PageGetFreeSpaceForMultipleTuples
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for multiple new line pointers.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) (ntups * sizeof(ItemIdData)))
		return 0;
	space -= ntups * sizeof(ItemIdData);

	return (Size) space;
}

/*
 * PageGetExactFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		without any consideration for adding/removing line pointers.
 */
Size
PageGetExactFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < 0)
		return 0;

	return (Size) space;
}


/*
 * PageGetHeapFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * The difference between this and PageGetFreeSpace is that this will return
 * zero if there are already MaxHeapTuplesPerPage line pointers in the page
 * and none are free.  We use this to enforce that no more than
 * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
 * no more tuples than that could fit anyway, in the presence of redirected
 * or dead line pointers it'd be possible to have too many line pointers.
 * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
 * on the number of line pointers, we make this extra check.)
 */
Size
PageGetHeapFreeSpace(Page page)
{
	Size		space;

	space = PageGetFreeSpace(page);
	if (space > 0)
	{
		OffsetNumber offnum,
					nline;

		/*
		 * Are there already MaxHeapTuplesPerPage line pointers in the page?
		 */
		nline = PageGetMaxOffsetNumber(page);
		if (nline >= MaxHeapTuplesPerPage)
		{
			if (PageHasFreeLinePointers((PageHeader) page))
			{
				/*
				 * Since this is just a hint, we must confirm that there is
				 * indeed a free line pointer
				 */
				for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
				{
					ItemId		lp = PageGetItemId(page, offnum);

					if (!ItemIdIsUsed(lp))
						break;
				}

				if (offnum > nline)
				{
					/*
					 * The hint is wrong, but we can't clear it here since we
					 * don't have the ability to mark the page dirty.
					 */
					space = 0;
				}
			}
			else
			{
				/*
				 * Although the hint might be wrong, PageAddItem will believe
				 * it anyway, so we must believe it too.
				 */
				space = 0;
			}
		}
	}
	return space;
}
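
/*
 * Illustrative usage sketch: heap insertion should consult this function,
 * not PageGetFreeSpace, so the MaxHeapTuplesPerPage limit is honored
 * ("len" is a hypothetical, already-MAXALIGN'd tuple size):
 *
 *		if (len > PageGetHeapFreeSpace(page))
 *			... page is full; look for free space elsewhere ...
 */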


/*
 * PageIndexTupleDelete
 *
 * This routine does the work of removing a tuple from an index page.
 *
 * Unlike heap pages, we compact out the line pointer for the removed tuple.
 */
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nbytes;
	int			offidx;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	/* change offset number to offset index */
	offidx = offnum - 1;

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * First, we want to get rid of the pd_linp entry for the index tuple. We
	 * copy all subsequent linp's back one slot in the array. We don't use
	 * PageGetItemId, because we are manipulating the _array_, not individual
	 * linp's.
	 */
	nbytes = phdr->pd_lower -
		((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);

	if (nbytes > 0)
		memmove((char *) &(phdr->pd_linp[offidx]),
				(char *) &(phdr->pd_linp[offidx + 1]),
				nbytes);

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointers */
	phdr->pd_upper += size;
	phdr->pd_lower -= sizeof(ItemIdData);

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		nline--;				/* there's one less than when we started */
		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			Assert(ItemIdHasStorage(ii));
			if (ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}
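
/*
 * Illustrative usage sketch: because the line pointer array is compacted,
 * offset numbers above the deleted one shift down by one; deleting several
 * tuples retail therefore works from the highest offset downward, exactly
 * as the few-items path of PageIndexMultiDelete below does:
 *
 *		for (i = nitems - 1; i >= 0; i--)
 *			PageIndexTupleDelete(page, itemnos[i]);
 */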


/*
 * PageIndexMultiDelete
 *
 * This routine handles the case of deleting multiple tuples from an
 * index page at once.  It is considerably faster than a loop around
 * PageIndexTupleDelete ... however, the caller *must* supply the array
 * of item numbers to be deleted in item number order!
 */
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		pd_lower = phdr->pd_lower;
	Offset		pd_upper = phdr->pd_upper;
	Offset		pd_special = phdr->pd_special;
	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
	ItemIdData	newitemids[MaxIndexTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nused;
	Size		totallen;
	Size		size;
	unsigned	offset;
	int			nextitm;
	OffsetNumber offnum;

	Assert(nitems <= MaxIndexTuplesPerPage);

	/*
	 * If there aren't very many items to delete, then retail
	 * PageIndexTupleDelete is the best way.  Delete the items in reverse
	 * order so we don't have to think about adjusting item numbers for
	 * previous deletions.
	 *
	 * TODO: tune the magic number here
	 */
	if (nitems <= 2)
	{
		while (--nitems >= 0)
			PageIndexTupleDelete(page, itemnos[nitems]);
		return;
	}

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the item pointer array and build a list of just the ones we are
	 * going to keep.  Notice we do not modify the page yet, since we are
	 * still validity-checking.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	totallen = 0;
	nused = 0;
	nextitm = 0;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		lp = PageGetItemId(page, offnum);
		Assert(ItemIdHasStorage(lp));
		size = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);
		if (offset < pd_upper ||
			(offset + size) > pd_special ||
			offset != MAXALIGN(offset))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item pointer: offset = %u, length = %u",
							offset, (unsigned int) size)));

		if (nextitm < nitems && offnum == itemnos[nextitm])
		{
			/* skip item to be deleted */
			nextitm++;
		}
		else
		{
			itemidptr->offsetindex = nused; /* where it will go */
			itemidptr->itemoff = offset;
			itemidptr->alignedlen = MAXALIGN(size);
			totallen += itemidptr->alignedlen;
			newitemids[nused] = *lp;
			itemidptr++;
			nused++;
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (totallen > (Size) (pd_special - pd_lower))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item lengths: total %u, available space %u",
						(unsigned int) totallen, pd_special - pd_lower)));

	/*
	 * Looks good. Overwrite the line pointers with the copy, from which we've
	 * removed all the unused items.
	 */
	memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);

	/* and compactify the tuple data */
	compactify_tuples(itemidbase, nused, page);
}
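
/*
 * Illustrative usage sketch: a caller collects the doomed offsets in
 * ascending order, as required ("deletable" and "ndeletable" are
 * hypothetical):
 *
 *		OffsetNumber deletable[MaxIndexTuplesPerPage];
 *		int			ndeletable = 0;
 *
 *		... scan the page, recording each dead offset in increasing order ...
 *
 *		if (ndeletable > 0)
 *			PageIndexMultiDelete(page, deletable, ndeletable);
 */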


/*
 * PageIndexTupleDeleteNoCompact
 *
 * Remove the specified tuple from an index page, but set its line pointer
 * to "unused" instead of compacting it out, except that it can be removed
 * if it's the last line pointer on the page.
 *
 * This is used for index AMs that require that existing TIDs of live tuples
 * remain unchanged, and are willing to allow unused line pointers instead.
 */
void
PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * Either set the item pointer to "unused", or zap it if it's the last
	 * one.  (Note: it's possible that the next-to-last one(s) are already
	 * unused, but we do not trouble to try to compact them out if so.)
	 */
	if ((int) offnum < nline)
		ItemIdSetUnused(tup);
	else
	{
		phdr->pd_lower -= sizeof(ItemIdData);
		nline--;				/* there's one less than when we started */
	}

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointer */
	phdr->pd_upper += size;

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}
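
/*
 * Illustrative usage sketch: BRIN is the in-tree user.  When it moves a
 * summary tuple to a different page, it can delete the old copy this way
 * without disturbing the TIDs of the remaining tuples ("oldpage",
 * "oldoff" and "oldbuf" are hypothetical):
 *
 *		PageIndexTupleDeleteNoCompact(oldpage, oldoff);
 *		MarkBufferDirty(oldbuf);
 */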


/*
 * PageIndexTupleOverwrite
 *
 * Replace a specified tuple on an index page.
 *
 * The new tuple is placed exactly where the old one had been, shifting
 * other tuples' data up or down as needed to keep the page compacted.
 * This is better than deleting and reinserting the tuple, because it
 * avoids any data shifting when the tuple size doesn't change; and
 * even when it does, we avoid moving the item pointers around.
 * Conceivably this could also be of use to an index AM that cares about
 * the physical order of tuples as well as their ItemId order.
 *
 * If there's insufficient space for the new tuple, return false.  Other
 * errors represent data-corruption problems, so we just elog.
 */
bool
PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
						Item newtup, Size newsize)
{
	PageHeader	phdr = (PageHeader) page;
	ItemId		tupid;
	int			oldsize;
	unsigned	offset;
	Size		alignednewsize;
	int			size_diff;
	int			itemcount;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	itemcount = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > itemcount)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tupid = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tupid));
	oldsize = ItemIdGetLength(tupid);
	offset = ItemIdGetOffset(tupid);

	if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item pointer: offset = %u, size = %u",
						offset, (unsigned int) oldsize)));

	/*
	 * Determine actual change in space requirement, check for page overflow.
	 */
	oldsize = MAXALIGN(oldsize);
	alignednewsize = MAXALIGN(newsize);
	if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
		return false;

	/*
	 * Relocate existing data and update line pointers, unless the new tuple
	 * is the same size as the old (after alignment), in which case there's
	 * nothing to do.  Notice that what we have to relocate is data before the
	 * target tuple, not data after, so it's convenient to express size_diff
	 * as the amount by which the tuple's size is decreasing, making it the
	 * delta to add to pd_upper and affected line pointers.
	 */
	size_diff = oldsize - (int) alignednewsize;
	if (size_diff != 0)
	{
		char	   *addr = (char *) page + phdr->pd_upper;
		int			i;

		/* relocate all tuple data before the target tuple */
		memmove(addr + size_diff, addr, offset - phdr->pd_upper);

		/* adjust free space boundary pointer */
		phdr->pd_upper += size_diff;

		/* adjust affected line pointers too */
		for (i = FirstOffsetNumber; i <= itemcount; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			/* Allow items without storage; currently only BRIN needs that */
			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size_diff;
		}
	}

	/* Update the item's tuple length (other fields shouldn't change) */
	ItemIdSetNormal(tupid, offset + size_diff, newsize);

	/* Copy new tuple data onto page */
	memcpy(PageGetItem(page, tupid), newtup, newsize);

	return true;
}
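
/*
 * Illustrative usage sketch: replace a tuple in place, falling back only
 * when the larger replacement no longer fits ("newtup" and "newsize" are
 * hypothetical):
 *
 *		if (!PageIndexTupleOverwrite(page, offnum, (Item) newtup, newsize))
 *			... out of space; caller must make room or move the tuple ...
 */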


/*
 * Set checksum for a page in shared buffers.
 *
 * If checksums are disabled, or if the page is not initialized, just return
 * the input.  Otherwise, we must make a copy of the page before calculating
 * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
 * from making the final checksum invalid.  It doesn't matter if we include or
 * exclude hints during the copy, as long as we write a valid page and
 * associated checksum.
 *
 * Returns a pointer to the block-sized data that needs to be written. Uses
 * statically-allocated memory, so the caller must immediately write the
 * returned page and not refer to it again.
 */
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
	static char *pageCopy = NULL;

	/* If we don't need a checksum, just return the passed-in data */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return (char *) page;

	/*
	 * We allocate the copy space once and use it over on each subsequent
	 * call.  The point of palloc'ing here, rather than having a static char
	 * array, is first to ensure adequate alignment for the checksumming code
	 * and second to avoid wasting space in processes that never call this.
	 */
	if (pageCopy == NULL)
		pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);

	memcpy(pageCopy, (char *) page, BLCKSZ);
	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
	return pageCopy;
}
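
/*
 * Illustrative usage sketch: a FlushBuffer-style caller checksums and
 * writes in one step, never touching the returned copy afterwards
 * ("reln", "forknum", "blocknum" and "bufBlock" are hypothetical):
 *
 *		char	   *bufToWrite;
 *
 *		bufToWrite = PageSetChecksumCopy((Page) bufBlock, blocknum);
 *		smgrwrite(reln, forknum, blocknum, bufToWrite, false);
 */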

/*
 * Set checksum for a page in private memory.
 *
 * This must only be used when we know that no other process can be modifying
 * the page buffer.
 */
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
	/* If we don't need a checksum, just return */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return;

	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
}