/*-------------------------------------------------------------------------
 *
 * spgxlog.c
 *	  WAL replay logic for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 src/backend/access/spgist/spgxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/standby.h"
#include "utils/memutils.h"


27 static MemoryContext opCtx;		/* working memory for operations */
28 
29 
30 /*
31  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
32  *
33  * At present, all we need is enough info to support spgFormDeadTuple(),
34  * plus the isBuild flag.
35  */
36 static void
fillFakeState(SpGistState * state,spgxlogState stateSrc)37 fillFakeState(SpGistState *state, spgxlogState stateSrc)
38 {
39 	memset(state, 0, sizeof(*state));
40 
41 	state->myXid = stateSrc.myXid;
42 	state->isBuild = stateSrc.isBuild;
43 	state->deadTupleStorage = palloc0(SGDTSIZE);
44 }
45 
46 /*
47  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
48  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
49  * existing tuple, it had better be a placeholder tuple.
50  */
51 static void
addOrReplaceTuple(Page page,Item tuple,int size,OffsetNumber offset)52 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
53 {
54 	if (offset <= PageGetMaxOffsetNumber(page))
55 	{
56 		SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
57 														   PageGetItemId(page, offset));
58 
59 		if (dt->tupstate != SPGIST_PLACEHOLDER)
60 			elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
61 
62 		Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
63 		SpGistPageGetOpaque(page)->nPlaceholder--;
64 
65 		PageIndexTupleDelete(page, offset);
66 	}
67 
68 	Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
69 
70 	if (PageAddItem(page, tuple, size, offset, false, false) != offset)
71 		elog(ERROR, "failed to add item of size %u to SPGiST index page",
72 			 size);
73 }
74 
75 static void
spgRedoAddLeaf(XLogReaderState * record)76 spgRedoAddLeaf(XLogReaderState *record)
77 {
78 	XLogRecPtr	lsn = record->EndRecPtr;
79 	char	   *ptr = XLogRecGetData(record);
80 	spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
81 	char	   *leafTuple;
82 	SpGistLeafTupleData leafTupleHdr;
83 	Buffer		buffer;
84 	Page		page;
85 	XLogRedoAction action;
86 
87 	ptr += sizeof(spgxlogAddLeaf);
88 	leafTuple = ptr;
89 	/* the leaf tuple is unaligned, so make a copy to access its header */
90 	memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
91 
92 	/*
93 	 * In normal operation we would have both current and parent pages locked
94 	 * simultaneously; but in WAL replay it should be safe to update the leaf
95 	 * page before updating the parent.
96 	 */
97 	if (xldata->newPage)
98 	{
99 		buffer = XLogInitBufferForRedo(record, 0);
100 		SpGistInitBuffer(buffer,
101 						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
102 		action = BLK_NEEDS_REDO;
103 	}
104 	else
105 		action = XLogReadBufferForRedo(record, 0, &buffer);
106 
107 	if (action == BLK_NEEDS_REDO)
108 	{
109 		page = BufferGetPage(buffer);
110 
111 		/* insert new tuple */
112 		if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
113 		{
114 			/* normal cases, tuple was added by SpGistPageAddNewItem */
115 			addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
116 							  xldata->offnumLeaf);
117 
118 			/* update head tuple's chain link if needed */
119 			if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
120 			{
121 				SpGistLeafTuple head;
122 
123 				head = (SpGistLeafTuple) PageGetItem(page,
124 													 PageGetItemId(page, xldata->offnumHeadLeaf));
125 				Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
126 				SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
127 			}
128 		}
129 		else
130 		{
131 			/* replacing a DEAD tuple */
132 			PageIndexTupleDelete(page, xldata->offnumLeaf);
133 			if (PageAddItem(page,
134 							(Item) leafTuple, leafTupleHdr.size,
135 							xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
136 				elog(ERROR, "failed to add item of size %u to SPGiST index page",
137 					 leafTupleHdr.size);
138 		}
139 
140 		PageSetLSN(page, lsn);
141 		MarkBufferDirty(buffer);
142 	}
143 	if (BufferIsValid(buffer))
144 		UnlockReleaseBuffer(buffer);
145 
146 	/* update parent downlink if necessary */
147 	if (xldata->offnumParent != InvalidOffsetNumber)
148 	{
149 		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
150 		{
151 			SpGistInnerTuple tuple;
152 			BlockNumber blknoLeaf;
153 
154 			XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
155 
156 			page = BufferGetPage(buffer);
157 
158 			tuple = (SpGistInnerTuple) PageGetItem(page,
159 												   PageGetItemId(page, xldata->offnumParent));
160 
161 			spgUpdateNodeLink(tuple, xldata->nodeI,
162 							  blknoLeaf, xldata->offnumLeaf);
163 
164 			PageSetLSN(page, lsn);
165 			MarkBufferDirty(buffer);
166 		}
167 		if (BufferIsValid(buffer))
168 			UnlockReleaseBuffer(buffer);
169 	}
170 }
171 
172 static void
spgRedoMoveLeafs(XLogReaderState * record)173 spgRedoMoveLeafs(XLogReaderState *record)
174 {
175 	XLogRecPtr	lsn = record->EndRecPtr;
176 	char	   *ptr = XLogRecGetData(record);
177 	spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
178 	SpGistState state;
179 	OffsetNumber *toDelete;
180 	OffsetNumber *toInsert;
181 	int			nInsert;
182 	Buffer		buffer;
183 	Page		page;
184 	XLogRedoAction action;
185 	BlockNumber blknoDst;
186 
187 	XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
188 
189 	fillFakeState(&state, xldata->stateSrc);
190 
191 	nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
192 
193 	ptr += SizeOfSpgxlogMoveLeafs;
194 	toDelete = (OffsetNumber *) ptr;
195 	ptr += sizeof(OffsetNumber) * xldata->nMoves;
196 	toInsert = (OffsetNumber *) ptr;
197 	ptr += sizeof(OffsetNumber) * nInsert;
198 
199 	/* now ptr points to the list of leaf tuples */
200 
201 	/*
202 	 * In normal operation we would have all three pages (source, dest, and
203 	 * parent) locked simultaneously; but in WAL replay it should be safe to
204 	 * update them one at a time, as long as we do it in the right order.
205 	 */
206 
207 	/* Insert tuples on the dest page (do first, so redirect is valid) */
208 	if (xldata->newPage)
209 	{
210 		buffer = XLogInitBufferForRedo(record, 1);
211 		SpGistInitBuffer(buffer,
212 						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
213 		action = BLK_NEEDS_REDO;
214 	}
215 	else
216 		action = XLogReadBufferForRedo(record, 1, &buffer);
217 
218 	if (action == BLK_NEEDS_REDO)
219 	{
220 		int			i;
221 
222 		page = BufferGetPage(buffer);
223 
224 		for (i = 0; i < nInsert; i++)
225 		{
226 			char	   *leafTuple;
227 			SpGistLeafTupleData leafTupleHdr;
228 
229 			/*
230 			 * the tuples are not aligned, so must copy to access the size
231 			 * field.
232 			 */
233 			leafTuple = ptr;
234 			memcpy(&leafTupleHdr, leafTuple,
235 				   sizeof(SpGistLeafTupleData));
236 
237 			addOrReplaceTuple(page, (Item) leafTuple,
238 							  leafTupleHdr.size, toInsert[i]);
239 			ptr += leafTupleHdr.size;
240 		}
241 
242 		PageSetLSN(page, lsn);
243 		MarkBufferDirty(buffer);
244 	}
245 	if (BufferIsValid(buffer))
246 		UnlockReleaseBuffer(buffer);
247 
248 	/* Delete tuples from the source page, inserting a redirection pointer */
249 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
250 	{
251 		page = BufferGetPage(buffer);
252 
253 		spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
254 								state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
255 								SPGIST_PLACEHOLDER,
256 								blknoDst,
257 								toInsert[nInsert - 1]);
258 
259 		PageSetLSN(page, lsn);
260 		MarkBufferDirty(buffer);
261 	}
262 	if (BufferIsValid(buffer))
263 		UnlockReleaseBuffer(buffer);
264 
265 	/* And update the parent downlink */
266 	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
267 	{
268 		SpGistInnerTuple tuple;
269 
270 		page = BufferGetPage(buffer);
271 
272 		tuple = (SpGistInnerTuple) PageGetItem(page,
273 											   PageGetItemId(page, xldata->offnumParent));
274 
275 		spgUpdateNodeLink(tuple, xldata->nodeI,
276 						  blknoDst, toInsert[nInsert - 1]);
277 
278 		PageSetLSN(page, lsn);
279 		MarkBufferDirty(buffer);
280 	}
281 	if (BufferIsValid(buffer))
282 		UnlockReleaseBuffer(buffer);
283 }
284 
285 static void
spgRedoAddNode(XLogReaderState * record)286 spgRedoAddNode(XLogReaderState *record)
287 {
288 	XLogRecPtr	lsn = record->EndRecPtr;
289 	char	   *ptr = XLogRecGetData(record);
290 	spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
291 	char	   *innerTuple;
292 	SpGistInnerTupleData innerTupleHdr;
293 	SpGistState state;
294 	Buffer		buffer;
295 	Page		page;
296 	XLogRedoAction action;
297 
298 	ptr += sizeof(spgxlogAddNode);
299 	innerTuple = ptr;
300 	/* the tuple is unaligned, so make a copy to access its header */
301 	memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
302 
303 	fillFakeState(&state, xldata->stateSrc);
304 
305 	if (!XLogRecHasBlockRef(record, 1))
306 	{
307 		/* update in place */
308 		Assert(xldata->parentBlk == -1);
309 		if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
310 		{
311 			page = BufferGetPage(buffer);
312 
313 			PageIndexTupleDelete(page, xldata->offnum);
314 			if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
315 							xldata->offnum,
316 							false, false) != xldata->offnum)
317 				elog(ERROR, "failed to add item of size %u to SPGiST index page",
318 					 innerTupleHdr.size);
319 
320 			PageSetLSN(page, lsn);
321 			MarkBufferDirty(buffer);
322 		}
323 		if (BufferIsValid(buffer))
324 			UnlockReleaseBuffer(buffer);
325 	}
326 	else
327 	{
328 		BlockNumber blkno;
329 		BlockNumber blknoNew;
330 
331 		XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
332 		XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
333 
334 		/*
335 		 * In normal operation we would have all three pages (source, dest,
336 		 * and parent) locked simultaneously; but in WAL replay it should be
337 		 * safe to update them one at a time, as long as we do it in the right
338 		 * order. We must insert the new tuple before replacing the old tuple
339 		 * with the redirect tuple.
340 		 */
341 
342 		/* Install new tuple first so redirect is valid */
343 		if (xldata->newPage)
344 		{
345 			/* AddNode is not used for nulls pages */
346 			buffer = XLogInitBufferForRedo(record, 1);
347 			SpGistInitBuffer(buffer, 0);
348 			action = BLK_NEEDS_REDO;
349 		}
350 		else
351 			action = XLogReadBufferForRedo(record, 1, &buffer);
352 		if (action == BLK_NEEDS_REDO)
353 		{
354 			page = BufferGetPage(buffer);
355 
356 			addOrReplaceTuple(page, (Item) innerTuple,
357 							  innerTupleHdr.size, xldata->offnumNew);
358 
359 			/*
360 			 * If parent is in this same page, update it now.
361 			 */
362 			if (xldata->parentBlk == 1)
363 			{
364 				SpGistInnerTuple parentTuple;
365 
366 				parentTuple = (SpGistInnerTuple) PageGetItem(page,
367 															 PageGetItemId(page, xldata->offnumParent));
368 
369 				spgUpdateNodeLink(parentTuple, xldata->nodeI,
370 								  blknoNew, xldata->offnumNew);
371 			}
372 			PageSetLSN(page, lsn);
373 			MarkBufferDirty(buffer);
374 		}
375 		if (BufferIsValid(buffer))
376 			UnlockReleaseBuffer(buffer);
377 
378 		/* Delete old tuple, replacing it with redirect or placeholder tuple */
379 		if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
380 		{
381 			SpGistDeadTuple dt;
382 
383 			page = BufferGetPage(buffer);
384 
385 			if (state.isBuild)
386 				dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
387 									  InvalidBlockNumber,
388 									  InvalidOffsetNumber);
389 			else
390 				dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
391 									  blknoNew,
392 									  xldata->offnumNew);
393 
394 			PageIndexTupleDelete(page, xldata->offnum);
395 			if (PageAddItem(page, (Item) dt, dt->size,
396 							xldata->offnum,
397 							false, false) != xldata->offnum)
398 				elog(ERROR, "failed to add item of size %u to SPGiST index page",
399 					 dt->size);
400 
401 			if (state.isBuild)
402 				SpGistPageGetOpaque(page)->nPlaceholder++;
403 			else
404 				SpGistPageGetOpaque(page)->nRedirection++;
405 
406 			/*
407 			 * If parent is in this same page, update it now.
408 			 */
409 			if (xldata->parentBlk == 0)
410 			{
411 				SpGistInnerTuple parentTuple;
412 
413 				parentTuple = (SpGistInnerTuple) PageGetItem(page,
414 															 PageGetItemId(page, xldata->offnumParent));
415 
416 				spgUpdateNodeLink(parentTuple, xldata->nodeI,
417 								  blknoNew, xldata->offnumNew);
418 			}
419 			PageSetLSN(page, lsn);
420 			MarkBufferDirty(buffer);
421 		}
422 		if (BufferIsValid(buffer))
423 			UnlockReleaseBuffer(buffer);
424 
425 		/*
426 		 * Update parent downlink (if we didn't do it as part of the source or
427 		 * destination page update already).
428 		 */
429 		if (xldata->parentBlk == 2)
430 		{
431 			if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
432 			{
433 				SpGistInnerTuple parentTuple;
434 
435 				page = BufferGetPage(buffer);
436 
437 				parentTuple = (SpGistInnerTuple) PageGetItem(page,
438 															 PageGetItemId(page, xldata->offnumParent));
439 
440 				spgUpdateNodeLink(parentTuple, xldata->nodeI,
441 								  blknoNew, xldata->offnumNew);
442 
443 				PageSetLSN(page, lsn);
444 				MarkBufferDirty(buffer);
445 			}
446 			if (BufferIsValid(buffer))
447 				UnlockReleaseBuffer(buffer);
448 		}
449 	}
450 }
451 
452 static void
spgRedoSplitTuple(XLogReaderState * record)453 spgRedoSplitTuple(XLogReaderState *record)
454 {
455 	XLogRecPtr	lsn = record->EndRecPtr;
456 	char	   *ptr = XLogRecGetData(record);
457 	spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
458 	char	   *prefixTuple;
459 	SpGistInnerTupleData prefixTupleHdr;
460 	char	   *postfixTuple;
461 	SpGistInnerTupleData postfixTupleHdr;
462 	Buffer		buffer;
463 	Page		page;
464 	XLogRedoAction action;
465 
466 	ptr += sizeof(spgxlogSplitTuple);
467 	prefixTuple = ptr;
468 	/* the prefix tuple is unaligned, so make a copy to access its header */
469 	memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
470 	ptr += prefixTupleHdr.size;
471 	postfixTuple = ptr;
472 	/* postfix tuple is also unaligned */
473 	memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
474 
475 	/*
476 	 * In normal operation we would have both pages locked simultaneously; but
477 	 * in WAL replay it should be safe to update them one at a time, as long
478 	 * as we do it in the right order.
479 	 */
480 
481 	/* insert postfix tuple first to avoid dangling link */
482 	if (!xldata->postfixBlkSame)
483 	{
484 		if (xldata->newPage)
485 		{
486 			buffer = XLogInitBufferForRedo(record, 1);
487 			/* SplitTuple is not used for nulls pages */
488 			SpGistInitBuffer(buffer, 0);
489 			action = BLK_NEEDS_REDO;
490 		}
491 		else
492 			action = XLogReadBufferForRedo(record, 1, &buffer);
493 		if (action == BLK_NEEDS_REDO)
494 		{
495 			page = BufferGetPage(buffer);
496 
497 			addOrReplaceTuple(page, (Item) postfixTuple,
498 							  postfixTupleHdr.size, xldata->offnumPostfix);
499 
500 			PageSetLSN(page, lsn);
501 			MarkBufferDirty(buffer);
502 		}
503 		if (BufferIsValid(buffer))
504 			UnlockReleaseBuffer(buffer);
505 	}
506 
507 	/* now handle the original page */
508 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
509 	{
510 		page = BufferGetPage(buffer);
511 
512 		PageIndexTupleDelete(page, xldata->offnumPrefix);
513 		if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
514 						xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
515 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
516 				 prefixTupleHdr.size);
517 
518 		if (xldata->postfixBlkSame)
519 			addOrReplaceTuple(page, (Item) postfixTuple,
520 							  postfixTupleHdr.size,
521 							  xldata->offnumPostfix);
522 
523 		PageSetLSN(page, lsn);
524 		MarkBufferDirty(buffer);
525 	}
526 	if (BufferIsValid(buffer))
527 		UnlockReleaseBuffer(buffer);
528 }
529 
530 static void
spgRedoPickSplit(XLogReaderState * record)531 spgRedoPickSplit(XLogReaderState *record)
532 {
533 	XLogRecPtr	lsn = record->EndRecPtr;
534 	char	   *ptr = XLogRecGetData(record);
535 	spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
536 	char	   *innerTuple;
537 	SpGistInnerTupleData innerTupleHdr;
538 	SpGistState state;
539 	OffsetNumber *toDelete;
540 	OffsetNumber *toInsert;
541 	uint8	   *leafPageSelect;
542 	Buffer		srcBuffer;
543 	Buffer		destBuffer;
544 	Buffer		innerBuffer;
545 	Page		srcPage;
546 	Page		destPage;
547 	Page		page;
548 	int			i;
549 	BlockNumber blknoInner;
550 	XLogRedoAction action;
551 
552 	XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
553 
554 	fillFakeState(&state, xldata->stateSrc);
555 
556 	ptr += SizeOfSpgxlogPickSplit;
557 	toDelete = (OffsetNumber *) ptr;
558 	ptr += sizeof(OffsetNumber) * xldata->nDelete;
559 	toInsert = (OffsetNumber *) ptr;
560 	ptr += sizeof(OffsetNumber) * xldata->nInsert;
561 	leafPageSelect = (uint8 *) ptr;
562 	ptr += sizeof(uint8) * xldata->nInsert;
563 
564 	innerTuple = ptr;
565 	/* the inner tuple is unaligned, so make a copy to access its header */
566 	memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
567 	ptr += innerTupleHdr.size;
568 
569 	/* now ptr points to the list of leaf tuples */
570 
571 	if (xldata->isRootSplit)
572 	{
573 		/* when splitting root, we touch it only in the guise of new inner */
574 		srcBuffer = InvalidBuffer;
575 		srcPage = NULL;
576 	}
577 	else if (xldata->initSrc)
578 	{
579 		/* just re-init the source page */
580 		srcBuffer = XLogInitBufferForRedo(record, 0);
581 		srcPage = (Page) BufferGetPage(srcBuffer);
582 
583 		SpGistInitBuffer(srcBuffer,
584 						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
585 		/* don't update LSN etc till we're done with it */
586 	}
587 	else
588 	{
589 		/*
590 		 * Delete the specified tuples from source page.  (In case we're in
591 		 * Hot Standby, we need to hold lock on the page till we're done
592 		 * inserting leaf tuples and the new inner tuple, else the added
593 		 * redirect tuple will be a dangling link.)
594 		 */
595 		srcPage = NULL;
596 		if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
597 		{
598 			srcPage = BufferGetPage(srcBuffer);
599 
600 			/*
601 			 * We have it a bit easier here than in doPickSplit(), because we
602 			 * know the inner tuple's location already, so we can inject the
603 			 * correct redirection tuple now.
604 			 */
605 			if (!state.isBuild)
606 				spgPageIndexMultiDelete(&state, srcPage,
607 										toDelete, xldata->nDelete,
608 										SPGIST_REDIRECT,
609 										SPGIST_PLACEHOLDER,
610 										blknoInner,
611 										xldata->offnumInner);
612 			else
613 				spgPageIndexMultiDelete(&state, srcPage,
614 										toDelete, xldata->nDelete,
615 										SPGIST_PLACEHOLDER,
616 										SPGIST_PLACEHOLDER,
617 										InvalidBlockNumber,
618 										InvalidOffsetNumber);
619 
620 			/* don't update LSN etc till we're done with it */
621 		}
622 	}
623 
624 	/* try to access dest page if any */
625 	if (!XLogRecHasBlockRef(record, 1))
626 	{
627 		destBuffer = InvalidBuffer;
628 		destPage = NULL;
629 	}
630 	else if (xldata->initDest)
631 	{
632 		/* just re-init the dest page */
633 		destBuffer = XLogInitBufferForRedo(record, 1);
634 		destPage = (Page) BufferGetPage(destBuffer);
635 
636 		SpGistInitBuffer(destBuffer,
637 						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
638 		/* don't update LSN etc till we're done with it */
639 	}
640 	else
641 	{
642 		/*
643 		 * We could probably release the page lock immediately in the
644 		 * full-page-image case, but for safety let's hold it till later.
645 		 */
646 		if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
647 			destPage = (Page) BufferGetPage(destBuffer);
648 		else
649 			destPage = NULL;	/* don't do any page updates */
650 	}
651 
652 	/* restore leaf tuples to src and/or dest page */
653 	for (i = 0; i < xldata->nInsert; i++)
654 	{
655 		char	   *leafTuple;
656 		SpGistLeafTupleData leafTupleHdr;
657 
658 		/* the tuples are not aligned, so must copy to access the size field. */
659 		leafTuple = ptr;
660 		memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
661 		ptr += leafTupleHdr.size;
662 
663 		page = leafPageSelect[i] ? destPage : srcPage;
664 		if (page == NULL)
665 			continue;			/* no need to touch this page */
666 
667 		addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
668 						  toInsert[i]);
669 	}
670 
671 	/* Now update src and dest page LSNs if needed */
672 	if (srcPage != NULL)
673 	{
674 		PageSetLSN(srcPage, lsn);
675 		MarkBufferDirty(srcBuffer);
676 	}
677 	if (destPage != NULL)
678 	{
679 		PageSetLSN(destPage, lsn);
680 		MarkBufferDirty(destBuffer);
681 	}
682 
683 	/* restore new inner tuple */
684 	if (xldata->initInner)
685 	{
686 		innerBuffer = XLogInitBufferForRedo(record, 2);
687 		SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
688 		action = BLK_NEEDS_REDO;
689 	}
690 	else
691 		action = XLogReadBufferForRedo(record, 2, &innerBuffer);
692 
693 	if (action == BLK_NEEDS_REDO)
694 	{
695 		page = BufferGetPage(innerBuffer);
696 
697 		addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
698 						  xldata->offnumInner);
699 
700 		/* if inner is also parent, update link while we're here */
701 		if (xldata->innerIsParent)
702 		{
703 			SpGistInnerTuple parent;
704 
705 			parent = (SpGistInnerTuple) PageGetItem(page,
706 													PageGetItemId(page, xldata->offnumParent));
707 			spgUpdateNodeLink(parent, xldata->nodeI,
708 							  blknoInner, xldata->offnumInner);
709 		}
710 
711 		PageSetLSN(page, lsn);
712 		MarkBufferDirty(innerBuffer);
713 	}
714 	if (BufferIsValid(innerBuffer))
715 		UnlockReleaseBuffer(innerBuffer);
716 
717 	/*
718 	 * Now we can release the leaf-page locks.  It's okay to do this before
719 	 * updating the parent downlink.
720 	 */
721 	if (BufferIsValid(srcBuffer))
722 		UnlockReleaseBuffer(srcBuffer);
723 	if (BufferIsValid(destBuffer))
724 		UnlockReleaseBuffer(destBuffer);
725 
726 	/* update parent downlink, unless we did it above */
727 	if (XLogRecHasBlockRef(record, 3))
728 	{
729 		Buffer		parentBuffer;
730 
731 		if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
732 		{
733 			SpGistInnerTuple parent;
734 
735 			page = BufferGetPage(parentBuffer);
736 
737 			parent = (SpGistInnerTuple) PageGetItem(page,
738 													PageGetItemId(page, xldata->offnumParent));
739 			spgUpdateNodeLink(parent, xldata->nodeI,
740 							  blknoInner, xldata->offnumInner);
741 
742 			PageSetLSN(page, lsn);
743 			MarkBufferDirty(parentBuffer);
744 		}
745 		if (BufferIsValid(parentBuffer))
746 			UnlockReleaseBuffer(parentBuffer);
747 	}
748 	else
749 		Assert(xldata->innerIsParent || xldata->isRootSplit);
750 }
751 
752 static void
spgRedoVacuumLeaf(XLogReaderState * record)753 spgRedoVacuumLeaf(XLogReaderState *record)
754 {
755 	XLogRecPtr	lsn = record->EndRecPtr;
756 	char	   *ptr = XLogRecGetData(record);
757 	spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
758 	OffsetNumber *toDead;
759 	OffsetNumber *toPlaceholder;
760 	OffsetNumber *moveSrc;
761 	OffsetNumber *moveDest;
762 	OffsetNumber *chainSrc;
763 	OffsetNumber *chainDest;
764 	SpGistState state;
765 	Buffer		buffer;
766 	Page		page;
767 	int			i;
768 
769 	fillFakeState(&state, xldata->stateSrc);
770 
771 	ptr += SizeOfSpgxlogVacuumLeaf;
772 	toDead = (OffsetNumber *) ptr;
773 	ptr += sizeof(OffsetNumber) * xldata->nDead;
774 	toPlaceholder = (OffsetNumber *) ptr;
775 	ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
776 	moveSrc = (OffsetNumber *) ptr;
777 	ptr += sizeof(OffsetNumber) * xldata->nMove;
778 	moveDest = (OffsetNumber *) ptr;
779 	ptr += sizeof(OffsetNumber) * xldata->nMove;
780 	chainSrc = (OffsetNumber *) ptr;
781 	ptr += sizeof(OffsetNumber) * xldata->nChain;
782 	chainDest = (OffsetNumber *) ptr;
783 
784 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
785 	{
786 		page = BufferGetPage(buffer);
787 
788 		spgPageIndexMultiDelete(&state, page,
789 								toDead, xldata->nDead,
790 								SPGIST_DEAD, SPGIST_DEAD,
791 								InvalidBlockNumber,
792 								InvalidOffsetNumber);
793 
794 		spgPageIndexMultiDelete(&state, page,
795 								toPlaceholder, xldata->nPlaceholder,
796 								SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
797 								InvalidBlockNumber,
798 								InvalidOffsetNumber);
799 
800 		/* see comments in vacuumLeafPage() */
801 		for (i = 0; i < xldata->nMove; i++)
802 		{
803 			ItemId		idSrc = PageGetItemId(page, moveSrc[i]);
804 			ItemId		idDest = PageGetItemId(page, moveDest[i]);
805 			ItemIdData	tmp;
806 
807 			tmp = *idSrc;
808 			*idSrc = *idDest;
809 			*idDest = tmp;
810 		}
811 
812 		spgPageIndexMultiDelete(&state, page,
813 								moveSrc, xldata->nMove,
814 								SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
815 								InvalidBlockNumber,
816 								InvalidOffsetNumber);
817 
818 		for (i = 0; i < xldata->nChain; i++)
819 		{
820 			SpGistLeafTuple lt;
821 
822 			lt = (SpGistLeafTuple) PageGetItem(page,
823 											   PageGetItemId(page, chainSrc[i]));
824 			Assert(lt->tupstate == SPGIST_LIVE);
825 			SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
826 		}
827 
828 		PageSetLSN(page, lsn);
829 		MarkBufferDirty(buffer);
830 	}
831 	if (BufferIsValid(buffer))
832 		UnlockReleaseBuffer(buffer);
833 }
834 
835 static void
spgRedoVacuumRoot(XLogReaderState * record)836 spgRedoVacuumRoot(XLogReaderState *record)
837 {
838 	XLogRecPtr	lsn = record->EndRecPtr;
839 	char	   *ptr = XLogRecGetData(record);
840 	spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
841 	OffsetNumber *toDelete;
842 	Buffer		buffer;
843 	Page		page;
844 
845 	toDelete = xldata->offsets;
846 
847 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
848 	{
849 		page = BufferGetPage(buffer);
850 
851 		/* The tuple numbers are in order */
852 		PageIndexMultiDelete(page, toDelete, xldata->nDelete);
853 
854 		PageSetLSN(page, lsn);
855 		MarkBufferDirty(buffer);
856 	}
857 	if (BufferIsValid(buffer))
858 		UnlockReleaseBuffer(buffer);
859 }
860 
861 static void
spgRedoVacuumRedirect(XLogReaderState * record)862 spgRedoVacuumRedirect(XLogReaderState *record)
863 {
864 	XLogRecPtr	lsn = record->EndRecPtr;
865 	char	   *ptr = XLogRecGetData(record);
866 	spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
867 	OffsetNumber *itemToPlaceholder;
868 	Buffer		buffer;
869 
870 	itemToPlaceholder = xldata->offsets;
871 
872 	/*
873 	 * If any redirection tuples are being removed, make sure there are no
874 	 * live Hot Standby transactions that might need to see them.
875 	 */
876 	if (InHotStandby)
877 	{
878 		if (TransactionIdIsValid(xldata->newestRedirectXid))
879 		{
880 			RelFileNode node;
881 
882 			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
883 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
884 												node);
885 		}
886 	}
887 
888 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
889 	{
890 		Page		page = BufferGetPage(buffer);
891 		SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
892 		int			i;
893 
894 		/* Convert redirect pointers to plain placeholders */
895 		for (i = 0; i < xldata->nToPlaceholder; i++)
896 		{
897 			SpGistDeadTuple dt;
898 
899 			dt = (SpGistDeadTuple) PageGetItem(page,
900 											   PageGetItemId(page, itemToPlaceholder[i]));
901 			Assert(dt->tupstate == SPGIST_REDIRECT);
902 			dt->tupstate = SPGIST_PLACEHOLDER;
903 			ItemPointerSetInvalid(&dt->pointer);
904 		}
905 
906 		Assert(opaque->nRedirection >= xldata->nToPlaceholder);
907 		opaque->nRedirection -= xldata->nToPlaceholder;
908 		opaque->nPlaceholder += xldata->nToPlaceholder;
909 
910 		/* Remove placeholder tuples at end of page */
911 		if (xldata->firstPlaceholder != InvalidOffsetNumber)
912 		{
913 			int			max = PageGetMaxOffsetNumber(page);
914 			OffsetNumber *toDelete;
915 
916 			toDelete = palloc(sizeof(OffsetNumber) * max);
917 
918 			for (i = xldata->firstPlaceholder; i <= max; i++)
919 				toDelete[i - xldata->firstPlaceholder] = i;
920 
921 			i = max - xldata->firstPlaceholder + 1;
922 			Assert(opaque->nPlaceholder >= i);
923 			opaque->nPlaceholder -= i;
924 
925 			/* The array is sorted, so can use PageIndexMultiDelete */
926 			PageIndexMultiDelete(page, toDelete, i);
927 
928 			pfree(toDelete);
929 		}
930 
931 		PageSetLSN(page, lsn);
932 		MarkBufferDirty(buffer);
933 	}
934 	if (BufferIsValid(buffer))
935 		UnlockReleaseBuffer(buffer);
936 }
937 
938 void
spg_redo(XLogReaderState * record)939 spg_redo(XLogReaderState *record)
940 {
941 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
942 	MemoryContext oldCxt;
943 
944 	oldCxt = MemoryContextSwitchTo(opCtx);
945 	switch (info)
946 	{
947 		case XLOG_SPGIST_ADD_LEAF:
948 			spgRedoAddLeaf(record);
949 			break;
950 		case XLOG_SPGIST_MOVE_LEAFS:
951 			spgRedoMoveLeafs(record);
952 			break;
953 		case XLOG_SPGIST_ADD_NODE:
954 			spgRedoAddNode(record);
955 			break;
956 		case XLOG_SPGIST_SPLIT_TUPLE:
957 			spgRedoSplitTuple(record);
958 			break;
959 		case XLOG_SPGIST_PICKSPLIT:
960 			spgRedoPickSplit(record);
961 			break;
962 		case XLOG_SPGIST_VACUUM_LEAF:
963 			spgRedoVacuumLeaf(record);
964 			break;
965 		case XLOG_SPGIST_VACUUM_ROOT:
966 			spgRedoVacuumRoot(record);
967 			break;
968 		case XLOG_SPGIST_VACUUM_REDIRECT:
969 			spgRedoVacuumRedirect(record);
970 			break;
971 		default:
972 			elog(PANIC, "spg_redo: unknown op code %u", info);
973 	}
974 
975 	MemoryContextSwitchTo(oldCxt);
976 	MemoryContextReset(opCtx);
977 }
978 
979 void
spg_xlog_startup(void)980 spg_xlog_startup(void)
981 {
982 	opCtx = AllocSetContextCreate(CurrentMemoryContext,
983 								  "SP-GiST temporary context",
984 								  ALLOCSET_DEFAULT_SIZES);
985 }
986 
987 void
spg_xlog_cleanup(void)988 spg_xlog_cleanup(void)
989 {
990 	MemoryContextDelete(opCtx);
991 	opCtx = NULL;
992 }
993 
994 /*
995  * Mask a SpGist page before performing consistency checks on it.
996  */
997 void
spg_mask(char * pagedata,BlockNumber blkno)998 spg_mask(char *pagedata, BlockNumber blkno)
999 {
1000 	Page		page = (Page) pagedata;
1001 	PageHeader	pagehdr = (PageHeader) page;
1002 
1003 	mask_page_lsn_and_checksum(page);
1004 
1005 	mask_page_hint_bits(page);
1006 
1007 	/*
1008 	 * Mask the unused space, but only if the page's pd_lower appears to have
1009 	 * been set correctly.
1010 	 */
1011 	if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1012 		mask_unused_space(page);
1013 }
1014