/*-------------------------------------------------------------------------
 *
 * spgvacuum.c
 *	  vacuum for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/spgist/spgvacuum.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "catalog/storage_xlog.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/snapmgr.h"


/* Entry in pending-list of TIDs we need to revisit */
typedef struct spgVacPendingItem
{
	ItemPointerData tid;		/* redirection target to visit */
	bool		done;			/* have we dealt with this? */
	struct spgVacPendingItem *next;		/* list link */
} spgVacPendingItem;

/* Local state for vacuum operations */
typedef struct spgBulkDeleteState
{
	/* Parameters passed in to spgvacuumscan */
	IndexVacuumInfo *info;
	IndexBulkDeleteResult *stats;
	IndexBulkDeleteCallback callback;
	void	   *callback_state;

	/* Additional working state */
	SpGistState spgstate;		/* for SPGiST operations that need one */
	spgVacPendingItem *pendingList;		/* TIDs we need to (re)visit */
	TransactionId myXmin;		/* for detecting newly-added redirects */
	BlockNumber lastFilledBlock;	/* last non-deletable block */
} spgBulkDeleteState;


/*
 * Add TID to pendingList, but only if not already present.
 *
 * Note that new items are always appended at the end of the list; this
 * ensures that scans of the list don't miss items added during the scan.
 */
static void
spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
{
	spgVacPendingItem *pitem;
	spgVacPendingItem **listLink;

	/* search the list for pre-existing entry */
	listLink = &bds->pendingList;
	while (*listLink != NULL)
	{
		pitem = *listLink;
		if (ItemPointerEquals(tid, &pitem->tid))
			return;				/* already in list, do nothing */
		listLink = &pitem->next;
	}
	/* not there, so append new entry */
	pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
	pitem->tid = *tid;
	pitem->done = false;
	pitem->next = NULL;
	*listLink = pitem;
}

/*
 * Clear pendingList
 */
static void
spgClearPendingList(spgBulkDeleteState *bds)
{
	spgVacPendingItem *pitem;
	spgVacPendingItem *nitem;

	for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
	{
		nitem = pitem->next;
		/* All items in list should have been dealt with */
		Assert(pitem->done);
		pfree(pitem);
	}
	bds->pendingList = NULL;
}

/*
 * Vacuum a regular (non-root) leaf page
 *
 * We must delete tuples that are targeted for deletion by the VACUUM,
 * but not move any tuples that are referenced by outside links; we assume
 * those are the ones that are heads of chains.
 *
 * If we find a REDIRECT that was made by a concurrently-running transaction,
 * we must add its target TID to pendingList.  (We don't try to visit the
 * target immediately, first because we don't want VACUUM locking more than
 * one buffer at a time, and second because the duplicate-filtering logic
 * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
 * loop in the face of continuous concurrent insertions.)
 *
 * If forPending is true, we are examining the page as a consequence of
 * chasing a redirect link, not as part of the normal sequential scan.
 * We still vacuum the page normally, but we don't increment the stats
 * about live tuples; else we'd double-count those tuples, since the page
 * has been or will be visited in the sequential scan as well.
 */
static void
vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
			   bool forPending)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumLeaf xlrec;
	OffsetNumber toDead[MaxIndexTuplesPerPage];
	OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber moveSrc[MaxIndexTuplesPerPage];
	OffsetNumber moveDest[MaxIndexTuplesPerPage];
	OffsetNumber chainSrc[MaxIndexTuplesPerPage];
	OffsetNumber chainDest[MaxIndexTuplesPerPage];
	OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
	bool		deletable[MaxIndexTuplesPerPage + 1];
	int			nDeletable;
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	memset(predecessor, 0, sizeof(predecessor));
	memset(deletable, 0, sizeof(deletable));
	nDeletable = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				deletable[i] = true;
				nDeletable++;
			}
			else
			{
				if (!forPending)
					bds->stats->num_index_tuples += 1;
			}

			/* Form predecessor map, too */
			if (lt->nextOffset != InvalidOffsetNumber)
			{
				/* paranoia about corrupted chain links */
				if (lt->nextOffset < FirstOffsetNumber ||
					lt->nextOffset > max ||
					predecessor[lt->nextOffset] != InvalidOffsetNumber)
					elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
						 BufferGetBlockNumber(buffer),
						 RelationGetRelationName(index));
				predecessor[lt->nextOffset] = i;
			}
		}
		else if (lt->tupstate == SPGIST_REDIRECT)
		{
			SpGistDeadTuple dt = (SpGistDeadTuple) lt;

			Assert(dt->nextOffset == InvalidOffsetNumber);
			Assert(ItemPointerIsValid(&dt->pointer));

			/*
			 * Add target TID to pending list if the redirection could have
			 * happened since VACUUM started.
			 *
			 * Note: we could make a tighter test by seeing if the xid is
			 * "running" according to the active snapshot; but tqual.c doesn't
			 * currently export a suitable API, and it's not entirely clear
			 * that a tighter test is worth the cycles anyway.
			 */
			if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
				spgAddPendingTID(bds, &dt->pointer);
		}
		else
		{
			Assert(lt->nextOffset == InvalidOffsetNumber);
		}
	}

	if (nDeletable == 0)
		return;					/* nothing more to do */

	/*----------
	 * Figure out exactly what we have to do.  We do this separately from
	 * actually modifying the page, mainly so that we have a representation
	 * that can be dumped into WAL and then the replay code can do exactly
	 * the same thing.  The output of this step consists of six arrays
	 * describing four kinds of operations, to be performed in this order:
	 *
	 * toDead[]: tuple numbers to be replaced with DEAD tuples
	 * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
	 * moveSrc[]: tuple numbers that need to be relocated to another offset
	 * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
	 * moveDest[]: new locations for moveSrc tuples
	 * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
	 * chainDest[]: new values of nextOffset for chainSrc members
	 *
	 * It's easiest to figure out what we have to do by processing tuple
	 * chains, so we iterate over all the tuples (not just the deletable
	 * ones!) to identify chain heads, then chase down each chain and make
	 * work item entries for deletable tuples within the chain.
	 *----------
	 */
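	/*
	 * Worked example (added for illustration): given the chain
	 * 3 -> 7 -> 5 -> 9 with offsets 3 and 5 deletable, the pass below emits
	 * moveSrc = {7}, moveDest = {3} (the first surviving tuple takes over
	 * the head position), toPlaceholder = {5}, and chainSrc = {3},
	 * chainDest = {9}, leaving the live chain as 3 -> 9.
	 */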
	xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;

	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple head;
		bool		interveningDeletable;
		OffsetNumber prevLive;
		OffsetNumber j;

		head = (SpGistLeafTuple) PageGetItem(page,
											 PageGetItemId(page, i));
		if (head->tupstate != SPGIST_LIVE)
			continue;			/* can't be a chain member */
		if (predecessor[i] != 0)
			continue;			/* not a chain head */

		/* initialize ... */
		interveningDeletable = false;
		prevLive = deletable[i] ? InvalidOffsetNumber : i;

		/* scan down the chain ... */
		j = head->nextOffset;
		while (j != InvalidOffsetNumber)
		{
			SpGistLeafTuple lt;

			lt = (SpGistLeafTuple) PageGetItem(page,
											   PageGetItemId(page, j));
			if (lt->tupstate != SPGIST_LIVE)
			{
				/* all tuples in chain should be live */
				elog(ERROR, "unexpected SPGiST tuple state: %d",
					 lt->tupstate);
			}

			if (deletable[j])
			{
				/* This tuple should be replaced by a placeholder */
				toPlaceholder[xlrec.nPlaceholder] = j;
				xlrec.nPlaceholder++;
				/* previous live tuple's chain link will need an update */
				interveningDeletable = true;
			}
			else if (prevLive == InvalidOffsetNumber)
			{
				/*
				 * This is the first live tuple in the chain.  It has to move
				 * to the head position.
				 */
				moveSrc[xlrec.nMove] = j;
				moveDest[xlrec.nMove] = i;
				xlrec.nMove++;
				/* Chain updates will be applied after the move */
				prevLive = i;
				interveningDeletable = false;
			}
			else
			{
				/*
				 * Second or later live tuple.  Arrange to re-chain it to the
				 * previous live one, if there was a gap.
				 */
				if (interveningDeletable)
				{
					chainSrc[xlrec.nChain] = prevLive;
					chainDest[xlrec.nChain] = j;
					xlrec.nChain++;
				}
				prevLive = j;
				interveningDeletable = false;
			}

			j = lt->nextOffset;
		}

		if (prevLive == InvalidOffsetNumber)
		{
			/* The chain is entirely removable, so we need a DEAD tuple */
			toDead[xlrec.nDead] = i;
			xlrec.nDead++;
		}
		else if (interveningDeletable)
		{
			/* One or more deletions at end of chain, so close it off */
			chainSrc[xlrec.nChain] = prevLive;
			chainDest[xlrec.nChain] = InvalidOffsetNumber;
			xlrec.nChain++;
		}
	}

	/* sanity check ... */
	if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
		elog(ERROR, "inconsistent counts of deletable tuples");

	/* Do the updates */
	START_CRIT_SECTION();

	spgPageIndexMultiDelete(&bds->spgstate, page,
							toDead, xlrec.nDead,
							SPGIST_DEAD, SPGIST_DEAD,
							InvalidBlockNumber, InvalidOffsetNumber);

	spgPageIndexMultiDelete(&bds->spgstate, page,
							toPlaceholder, xlrec.nPlaceholder,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	/*
	 * We implement the move step by swapping the item pointers of the source
	 * and target tuples, then replacing the newly-source tuples with
	 * placeholders.  This is perhaps unduly friendly with the page data
	 * representation, but it's fast and doesn't risk page overflow when a
	 * tuple to be relocated is large.
	 */
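	/*
	 * Note: swapping the ItemIdData entries exchanges only the line pointers
	 * (lp_off/lp_len/lp_flags); the tuple bodies themselves never move
	 * within the page.
	 */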
	for (i = 0; i < xlrec.nMove; i++)
	{
		ItemId		idSrc = PageGetItemId(page, moveSrc[i]);
		ItemId		idDest = PageGetItemId(page, moveDest[i]);
		ItemIdData	tmp;

		tmp = *idSrc;
		*idSrc = *idDest;
		*idDest = tmp;
	}

	spgPageIndexMultiDelete(&bds->spgstate, page,
							moveSrc, xlrec.nMove,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	for (i = 0; i < xlrec.nChain; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, chainSrc[i]));
		Assert(lt->tupstate == SPGIST_LIVE);
		lt->nextOffset = chainDest[i];
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		STORE_STATE(&bds->spgstate, xlrec.stateSrc);

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf);
		/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
		XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead);
		XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder);
		XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove);
		XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove);
		XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain);
		XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Vacuum a root page when it is also a leaf
 *
 * On the root, we just delete any dead leaf tuples; no fancy business
 */
static void
vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumRoot xlrec;
	OffsetNumber toDelete[MaxIndexTuplesPerPage];
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	xlrec.nDelete = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				toDelete[xlrec.nDelete] = i;
				xlrec.nDelete++;
			}
			else
			{
				bds->stats->num_index_tuples += 1;
			}
		}
		else
		{
			/* all tuples on root should be live */
			elog(ERROR, "unexpected SPGiST tuple state: %d",
				 lt->tupstate);
		}
	}

	if (xlrec.nDelete == 0)
		return;					/* nothing more to do */

	/* Do the update */
	START_CRIT_SECTION();

	/* The tuple numbers are in order, so we can use PageIndexMultiDelete */
	PageIndexMultiDelete(page, toDelete, xlrec.nDelete);

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		/* Prepare WAL record */
		STORE_STATE(&bds->spgstate, xlrec.stateSrc);

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot);
		/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
		XLogRegisterData((char *) toDelete,
						 sizeof(OffsetNumber) * xlrec.nDelete);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Clean up redirect and placeholder tuples on the given page
 *
 * Redirect tuples can be marked placeholder once they're old enough.
 * Placeholder tuples can be removed if it won't change the offsets of
 * non-placeholder ones.
 *
 * Unlike the routines above, this works on both leaf and inner pages.
 */
static void
vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page),
				firstPlaceholder = InvalidOffsetNumber;
	bool		hasNonPlaceholder = false;
	bool		hasUpdate = false;
	OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber itemnos[MaxIndexTuplesPerPage];
	spgxlogVacuumRedirect xlrec;

	xlrec.nToPlaceholder = 0;
	xlrec.newestRedirectXid = InvalidTransactionId;

	START_CRIT_SECTION();

	/*
	 * Scan backwards to convert old redirection tuples to placeholder tuples,
	 * and identify location of last non-placeholder tuple while at it.
	 */
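	/*
	 * The loop can stop before reaching the start of the page: once all
	 * redirects have been converted (nRedirection drops to zero) and a
	 * non-placeholder tuple has been seen, items at lower offsets cannot
	 * need any work.
	 */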
	for (i = max;
		 i >= FirstOffsetNumber &&
		 (opaque->nRedirection > 0 || !hasNonPlaceholder);
		 i--)
	{
		SpGistDeadTuple dt;

		dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));

		if (dt->tupstate == SPGIST_REDIRECT &&
			TransactionIdPrecedes(dt->xid, RecentGlobalXmin))
		{
			dt->tupstate = SPGIST_PLACEHOLDER;
			Assert(opaque->nRedirection > 0);
			opaque->nRedirection--;
			opaque->nPlaceholder++;

			/* remember newest XID among the removed redirects */
			if (!TransactionIdIsValid(xlrec.newestRedirectXid) ||
				TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid))
				xlrec.newestRedirectXid = dt->xid;

			ItemPointerSetInvalid(&dt->pointer);

			itemToPlaceholder[xlrec.nToPlaceholder] = i;
			xlrec.nToPlaceholder++;

			hasUpdate = true;
		}

		if (dt->tupstate == SPGIST_PLACEHOLDER)
		{
			if (!hasNonPlaceholder)
				firstPlaceholder = i;
		}
		else
		{
			hasNonPlaceholder = true;
		}
	}

	/*
	 * Any placeholder tuples at the end of page can safely be removed.  We
	 * can't remove ones before the last non-placeholder, though, because we
	 * can't alter the offset numbers of non-placeholder tuples.
	 */
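	/*
	 * For example, with max = 10, placeholders at offsets 8..10, and the
	 * last non-placeholder at offset 7, firstPlaceholder is 8 and the three
	 * trailing items are deleted below.
	 */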
	if (firstPlaceholder != InvalidOffsetNumber)
	{
		/*
		 * We do not store this array to rdata because it's easy to recreate.
		 */
		for (i = firstPlaceholder; i <= max; i++)
			itemnos[i - firstPlaceholder] = i;

		i = max - firstPlaceholder + 1;
		Assert(opaque->nPlaceholder >= i);
		opaque->nPlaceholder -= i;

		/* The array is surely sorted, so can use PageIndexMultiDelete */
		PageIndexMultiDelete(page, itemnos, i);

		hasUpdate = true;
	}

	xlrec.firstPlaceholder = firstPlaceholder;

	if (hasUpdate)
		MarkBufferDirty(buffer);

	if (hasUpdate && RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect);
		XLogRegisterData((char *) itemToPlaceholder,
						 sizeof(OffsetNumber) * xlrec.nToPlaceholder);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Process one page during a bulkdelete scan
 */
static void
spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
{
	Relation	index = bds->info->index;
	Buffer		buffer;
	Page		page;

	/* call vacuum_delay_point while not holding any buffer lock */
	vacuum_delay_point();

	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
								RBM_NORMAL, bds->info->strategy);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	page = (Page) BufferGetPage(buffer);

	if (PageIsNew(page))
	{
		/*
		 * We found an all-zero page, which could happen if the database
		 * crashed just after extending the file.  Recycle it.
		 */
	}
	else if (PageIsEmpty(page))
	{
		/* nothing to do */
	}
	else if (SpGistPageIsLeaf(page))
	{
		if (SpGistBlockIsRoot(blkno))
		{
			vacuumLeafRoot(bds, index, buffer);
			/* no need for vacuumRedirectAndPlaceholder */
		}
		else
		{
			vacuumLeafPage(bds, index, buffer, false);
			vacuumRedirectAndPlaceholder(index, buffer);
		}
	}
	else
	{
		/* inner page */
		vacuumRedirectAndPlaceholder(index, buffer);
	}

	/*
	 * The root pages must never be deleted, nor marked as available in FSM,
	 * because we don't want them ever returned by a search for a place to put
	 * a new tuple.  Otherwise, check for empty page, and make sure the FSM
	 * knows about it.
	 */
	if (!SpGistBlockIsRoot(blkno))
	{
		if (PageIsNew(page) || PageIsEmpty(page))
		{
			RecordFreeIndexPage(index, blkno);
			bds->stats->pages_deleted++;
		}
		else
		{
			SpGistSetLastUsedPage(index, buffer);
			bds->lastFilledBlock = blkno;
		}
	}

	UnlockReleaseBuffer(buffer);
}

/*
 * Process the pending-TID list between pages of the main scan
 */
static void
spgprocesspending(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	spgVacPendingItem *pitem;
	spgVacPendingItem *nitem;
	BlockNumber blkno;
	Buffer		buffer;
	Page		page;

	for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
	{
		if (pitem->done)
			continue;			/* ignore already-done items */

		/* call vacuum_delay_point while not holding any buffer lock */
		vacuum_delay_point();

		/* examine the referenced page */
		blkno = ItemPointerGetBlockNumber(&pitem->tid);
		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
									RBM_NORMAL, bds->info->strategy);
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		page = (Page) BufferGetPage(buffer);

		if (PageIsNew(page) || SpGistPageIsDeleted(page))
		{
			/* Probably shouldn't happen, but ignore it */
		}
		else if (SpGistPageIsLeaf(page))
		{
			if (SpGistBlockIsRoot(blkno))
			{
				/* this should definitely not happen */
				elog(ERROR, "redirection leads to root page of index \"%s\"",
					 RelationGetRelationName(index));
			}

			/* deal with any deletable tuples */
			vacuumLeafPage(bds, index, buffer, true);
			/* might as well do this while we are here */
			vacuumRedirectAndPlaceholder(index, buffer);

			SpGistSetLastUsedPage(index, buffer);

			/*
			 * We can mark as done not only this item, but any later ones
			 * pointing at the same page, since we vacuumed the whole page.
			 */
			pitem->done = true;
			for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
			{
				if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
					nitem->done = true;
			}
		}
		else
		{
			/*
			 * On an inner page, visit the referenced inner tuple and add all
			 * its downlinks to the pending list.  We might have pending items
			 * for more than one inner tuple on the same page (in fact this is
			 * pretty likely given the way space allocation works), so get
			 * them all while we are here.
			 */
			for (nitem = pitem; nitem != NULL; nitem = nitem->next)
			{
				if (nitem->done)
					continue;
				if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
				{
					OffsetNumber offset;
					SpGistInnerTuple innerTuple;

					offset = ItemPointerGetOffsetNumber(&nitem->tid);
					innerTuple = (SpGistInnerTuple) PageGetItem(page,
												PageGetItemId(page, offset));
					if (innerTuple->tupstate == SPGIST_LIVE)
					{
						SpGistNodeTuple node;
						int			i;

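						/* each valid node t_tid is a downlink to chase */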
						SGITITERATE(innerTuple, i, node)
						{
							if (ItemPointerIsValid(&node->t_tid))
								spgAddPendingTID(bds, &node->t_tid);
						}
					}
					else if (innerTuple->tupstate == SPGIST_REDIRECT)
					{
						/* transfer attention to redirect point */
						spgAddPendingTID(bds,
								   &((SpGistDeadTuple) innerTuple)->pointer);
					}
					else
						elog(ERROR, "unexpected SPGiST tuple state: %d",
							 innerTuple->tupstate);

					nitem->done = true;
				}
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	spgClearPendingList(bds);
}

/*
 * Perform a bulkdelete scan
 */
static void
spgvacuumscan(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	bool		needLock;
	BlockNumber num_pages,
				blkno;

	/* Finish setting up spgBulkDeleteState */
	initSpGistState(&bds->spgstate, index);
	bds->pendingList = NULL;
	bds->myXmin = GetActiveSnapshot()->xmin;
	bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;

	/*
	 * Reset counts that will be incremented during the scan; needed in case
	 * of multiple scans during a single VACUUM command
	 */
	bds->stats->estimated_count = false;
	bds->stats->num_index_tuples = 0;
	bds->stats->pages_deleted = 0;

	/* We can skip locking for new or temp relations */
	needLock = !RELATION_IS_LOCAL(index);

	/*
	 * The outer loop iterates over all index pages except the metapage, in
	 * physical order (we hope the kernel will cooperate in providing
	 * read-ahead for speed).  It is critical that we visit all leaf pages,
	 * including ones added after we start the scan, else we might fail to
	 * delete some deletable tuples.  See more extensive comments about this
	 * in btvacuumscan().
	 */
	blkno = SPGIST_METAPAGE_BLKNO + 1;
	for (;;)
	{
		/* Get the current relation length */
		if (needLock)
			LockRelationForExtension(index, ExclusiveLock);
		num_pages = RelationGetNumberOfBlocks(index);
		if (needLock)
			UnlockRelationForExtension(index, ExclusiveLock);

		/* Quit if we've scanned the whole relation */
		if (blkno >= num_pages)
			break;
		/* Iterate over pages, then loop back to recheck length */
		for (; blkno < num_pages; blkno++)
		{
			spgvacuumpage(bds, blkno);
			/* empty the pending-list after each page */
			if (bds->pendingList != NULL)
				spgprocesspending(bds);
		}
	}

	/* Propagate local lastUsedPage cache to metablock */
	SpGistUpdateMetaPage(index);

	/*
	 * Truncate index if possible
	 *
	 * XXX disabled because it's unsafe due to possible concurrent inserts.
	 * We'd have to rescan the pages to make sure they're still empty, and it
	 * doesn't seem worth it.  Note that btree doesn't do this either.
	 *
	 * Another reason not to truncate is that it could invalidate the cached
	 * pages-with-freespace pointers in the metapage and other backends'
	 * relation caches, that is leave them pointing to nonexistent pages.
	 * Adding RelationGetNumberOfBlocks calls to protect the places that use
	 * those pointers would be unduly expensive.
	 */
#ifdef NOT_USED
	if (num_pages > bds->lastFilledBlock + 1)
	{
		BlockNumber lastBlock = num_pages - 1;

		num_pages = bds->lastFilledBlock + 1;
		RelationTruncate(index, num_pages);
		bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
		bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
	}
#endif

	/* Report final stats */
	bds->stats->num_pages = num_pages;
	bds->stats->pages_free = bds->stats->pages_deleted;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			  IndexBulkDeleteCallback callback, void *callback_state)
{
	spgBulkDeleteState bds;

	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	bds.info = info;
	bds.stats = stats;
	bds.callback = callback;
	bds.callback_state = callback_state;

	spgvacuumscan(&bds);

	return stats;
}

/* Dummy callback to delete no tuples during spgvacuumcleanup */
static bool
dummy_callback(ItemPointer itemptr, void *state)
{
	return false;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	index = info->index;
	spgBulkDeleteState bds;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	/*
	 * We don't need to scan the index if there was a preceding bulkdelete
	 * pass.  Otherwise, make a pass that won't delete any live tuples, but
	 * might still accomplish useful stuff with redirect/placeholder cleanup,
	 * and in any case will provide stats.
	 */
	if (stats == NULL)
	{
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
		bds.info = info;
		bds.stats = stats;
		bds.callback = dummy_callback;
		bds.callback_state = NULL;

		spgvacuumscan(&bds);
	}

	/* Finally, vacuum the FSM */
	IndexFreeSpaceMapVacuum(index);

	/*
	 * It's quite possible for us to be fooled by concurrent tuple moves into
	 * double-counting some index tuples, so disbelieve any total that exceeds
	 * the underlying heap's count ... if we know that accurately.  Otherwise
	 * this might just make matters worse.
	 */
	if (!info->estimated_count)
	{
		if (stats->num_index_tuples > info->num_heap_tuples)
			stats->num_index_tuples = info->num_heap_tuples;
	}

	return stats;
}