/*-------------------------------------------------------------------------
 *
 * spgvacuum.c
 *	  vacuum for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/spgist/spgvacuum.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "catalog/storage_xlog.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/snapmgr.h"


/* Entry in pending-list of TIDs we need to revisit */
typedef struct spgVacPendingItem
{
	ItemPointerData tid;		/* redirection target to visit */
	bool		done;			/* have we dealt with this? */
	struct spgVacPendingItem *next; /* list link */
} spgVacPendingItem;

/* Local state for vacuum operations */
typedef struct spgBulkDeleteState
{
	/* Parameters passed in to spgvacuumscan */
	IndexVacuumInfo *info;
	IndexBulkDeleteResult *stats;
	IndexBulkDeleteCallback callback;
	void	   *callback_state;

	/* Additional working state */
	SpGistState spgstate;		/* for SPGiST operations that need one */
	spgVacPendingItem *pendingList; /* TIDs we need to (re)visit */
	TransactionId myXmin;		/* for detecting newly-added redirects */
	BlockNumber lastFilledBlock;	/* last non-deletable block */
} spgBulkDeleteState;


/*
 * Add TID to pendingList, but only if not already present.
 *
 * Note that new items are always appended at the end of the list; this
 * ensures that scans of the list don't miss items added during the scan.
 */
static void
spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
{
	spgVacPendingItem *pitem;
	spgVacPendingItem **listLink;

	/* search the list for pre-existing entry */
	listLink = &bds->pendingList;
	while (*listLink != NULL)
	{
		pitem = *listLink;
		if (ItemPointerEquals(tid, &pitem->tid))
			return;				/* already in list, do nothing */
		listLink = &pitem->next;
	}
	/* not there, so append new entry */
	pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
	pitem->tid = *tid;
	pitem->done = false;
	pitem->next = NULL;
	*listLink = pitem;
}

/*
 * Clear pendingList
 */
static void
spgClearPendingList(spgBulkDeleteState *bds)
{
	spgVacPendingItem *pitem;
	spgVacPendingItem *nitem;

	for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
	{
		nitem = pitem->next;
		/* All items in list should have been dealt with */
		Assert(pitem->done);
		pfree(pitem);
	}
	bds->pendingList = NULL;
}

/*
 * Vacuum a regular (non-root) leaf page
 *
 * We must delete tuples that are targeted for deletion by the VACUUM,
 * but not move any tuples that are referenced by outside links; we assume
 * those are the ones that are heads of chains.
 *
 * If we find a REDIRECT that was made by a concurrently-running transaction,
 * we must add its target TID to pendingList.  (We don't try to visit the
 * target immediately, first because we don't want VACUUM locking more than
 * one buffer at a time, and second because the duplicate-filtering logic
 * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
 * loop in the face of continuous concurrent insertions.)
 *
 * If forPending is true, we are examining the page as a consequence of
 * chasing a redirect link, not as part of the normal sequential scan.
 * We still vacuum the page normally, but we don't increment the stats
 * about live tuples; else we'd double-count those tuples, since the page
 * has been or will be visited in the sequential scan as well.
 */
static void
vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
			   bool forPending)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumLeaf xlrec;
	OffsetNumber toDead[MaxIndexTuplesPerPage];
	OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber moveSrc[MaxIndexTuplesPerPage];
	OffsetNumber moveDest[MaxIndexTuplesPerPage];
	OffsetNumber chainSrc[MaxIndexTuplesPerPage];
	OffsetNumber chainDest[MaxIndexTuplesPerPage];
	/* these two arrays are indexed by tuple offset number, hence the +1 */
	OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
	bool		deletable[MaxIndexTuplesPerPage + 1];
	int			nDeletable;
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	memset(predecessor, 0, sizeof(predecessor));
	memset(deletable, 0, sizeof(deletable));
	nDeletable = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				deletable[i] = true;
				nDeletable++;
			}
			else
			{
				if (!forPending)
					bds->stats->num_index_tuples += 1;
			}

			/* Form predecessor map, too */
			if (lt->nextOffset != InvalidOffsetNumber)
			{
				/* paranoia about corrupted chain links */
				if (lt->nextOffset < FirstOffsetNumber ||
					lt->nextOffset > max ||
					predecessor[lt->nextOffset] != InvalidOffsetNumber)
					elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
						 BufferGetBlockNumber(buffer),
						 RelationGetRelationName(index));
				predecessor[lt->nextOffset] = i;
			}
		}
		else if (lt->tupstate == SPGIST_REDIRECT)
		{
			SpGistDeadTuple dt = (SpGistDeadTuple) lt;

			Assert(dt->nextOffset == InvalidOffsetNumber);
			Assert(ItemPointerIsValid(&dt->pointer));

			/*
			 * Add target TID to pending list if the redirection could have
			 * happened since VACUUM started.
			 *
			 * Note: we could make a tighter test by seeing if the xid is
			 * "running" according to the active snapshot; but snapmgr.c
			 * doesn't currently export a suitable API, and it's not entirely
			 * clear that a tighter test is worth the cycles anyway.
			 */
			if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
				spgAddPendingTID(bds, &dt->pointer);
		}
		else
		{
			Assert(lt->nextOffset == InvalidOffsetNumber);
		}
	}

	if (nDeletable == 0)
		return;					/* nothing more to do */

	/*----------
	 * Figure out exactly what we have to do.  We do this separately from
	 * actually modifying the page, mainly so that we have a representation
	 * that can be dumped into WAL and then the replay code can do exactly
	 * the same thing.  The output of this step consists of six arrays
	 * describing four kinds of operations, to be performed in this order:
	 *
	 * toDead[]: tuple numbers to be replaced with DEAD tuples
	 * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
	 * moveSrc[]: tuple numbers that need to be relocated to another offset
	 * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
	 * moveDest[]: new locations for moveSrc tuples
	 * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
	 * chainDest[]: new values of nextOffset for chainSrc members
	 *
	 * It's easiest to figure out what we have to do by processing tuple
	 * chains, so we iterate over all the tuples (not just the deletable
	 * ones!) to identify chain heads, then chase down each chain and make
	 * work item entries for deletable tuples within the chain.
	 *----------
	 */
	xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;

	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple head;
		bool		interveningDeletable;
		OffsetNumber prevLive;
		OffsetNumber j;

		head = (SpGistLeafTuple) PageGetItem(page,
											 PageGetItemId(page, i));
		if (head->tupstate != SPGIST_LIVE)
			continue;			/* can't be a chain member */
		if (predecessor[i] != 0)
			continue;			/* not a chain head */

		/* initialize ... */
		interveningDeletable = false;
		prevLive = deletable[i] ? InvalidOffsetNumber : i;

		/* scan down the chain ... */
		j = head->nextOffset;
		while (j != InvalidOffsetNumber)
		{
			SpGistLeafTuple lt;

			lt = (SpGistLeafTuple) PageGetItem(page,
											   PageGetItemId(page, j));
			if (lt->tupstate != SPGIST_LIVE)
			{
				/* all tuples in chain should be live */
				elog(ERROR, "unexpected SPGiST tuple state: %d",
					 lt->tupstate);
			}

			if (deletable[j])
			{
				/* This tuple should be replaced by a placeholder */
				toPlaceholder[xlrec.nPlaceholder] = j;
				xlrec.nPlaceholder++;
				/* previous live tuple's chain link will need an update */
				interveningDeletable = true;
			}
			else if (prevLive == InvalidOffsetNumber)
			{
				/*
				 * This is the first live tuple in the chain.  It has to move
				 * to the head position.
				 */
				moveSrc[xlrec.nMove] = j;
				moveDest[xlrec.nMove] = i;
				xlrec.nMove++;
				/* Chain updates will be applied after the move */
				prevLive = i;
				interveningDeletable = false;
			}
			else
			{
				/*
				 * Second or later live tuple.  Arrange to re-chain it to the
				 * previous live one, if there was a gap.
				 */
				if (interveningDeletable)
				{
					chainSrc[xlrec.nChain] = prevLive;
					chainDest[xlrec.nChain] = j;
					xlrec.nChain++;
				}
				prevLive = j;
				interveningDeletable = false;
			}

			j = lt->nextOffset;
		}

		if (prevLive == InvalidOffsetNumber)
		{
			/* The chain is entirely removable, so we need a DEAD tuple */
			toDead[xlrec.nDead] = i;
			xlrec.nDead++;
		}
		else if (interveningDeletable)
		{
			/* One or more deletions at end of chain, so close it off */
			chainSrc[xlrec.nChain] = prevLive;
			chainDest[xlrec.nChain] = InvalidOffsetNumber;
			xlrec.nChain++;
		}
	}

	/* sanity check ... */
	if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
		elog(ERROR, "inconsistent counts of deletable tuples");

	/* Do the updates */
	START_CRIT_SECTION();

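	/* First, convert the heads of fully-deleted chains to DEAD tuples */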
	spgPageIndexMultiDelete(&bds->spgstate, page,
							toDead, xlrec.nDead,
							SPGIST_DEAD, SPGIST_DEAD,
							InvalidBlockNumber, InvalidOffsetNumber);

	spgPageIndexMultiDelete(&bds->spgstate, page,
							toPlaceholder, xlrec.nPlaceholder,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	/*
	 * We implement the move step by swapping the line pointers of the source
	 * and target tuples, then replacing the newly-source tuples with
	 * placeholders.  This is perhaps unduly friendly with the page data
	 * representation, but it's fast and doesn't risk page overflow when a
	 * tuple to be relocated is large.
	 */
	for (i = 0; i < xlrec.nMove; i++)
	{
		ItemId		idSrc = PageGetItemId(page, moveSrc[i]);
		ItemId		idDest = PageGetItemId(page, moveDest[i]);
		ItemIdData	tmp;

		tmp = *idSrc;
		*idSrc = *idDest;
		*idDest = tmp;
	}

	spgPageIndexMultiDelete(&bds->spgstate, page,
							moveSrc, xlrec.nMove,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

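	/* Finally, update the chain links of the surviving live tuples */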
	for (i = 0; i < xlrec.nChain; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, chainSrc[i]));
		Assert(lt->tupstate == SPGIST_LIVE);
		lt->nextOffset = chainDest[i];
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

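		/* Prepare WAL record */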
		STORE_STATE(&bds->spgstate, xlrec.stateSrc);

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf);
		/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
		XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead);
		XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder);
		XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove);
		XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove);
		XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain);
		XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Vacuum a root page when it is also a leaf
 *
 * On the root, we just delete any dead leaf tuples; no fancy business
 */
static void
vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumRoot xlrec;
	OffsetNumber toDelete[MaxIndexTuplesPerPage];
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	xlrec.nDelete = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				toDelete[xlrec.nDelete] = i;
				xlrec.nDelete++;
			}
			else
			{
				bds->stats->num_index_tuples += 1;
			}
		}
		else
		{
			/* all tuples on root should be live */
			elog(ERROR, "unexpected SPGiST tuple state: %d",
				 lt->tupstate);
		}
	}

	if (xlrec.nDelete == 0)
		return;					/* nothing more to do */

	/* Do the update */
	START_CRIT_SECTION();

	/* The tuple numbers are in order, so we can use PageIndexMultiDelete */
	PageIndexMultiDelete(page, toDelete, xlrec.nDelete);

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		/* Prepare WAL record */
		STORE_STATE(&bds->spgstate, xlrec.stateSrc);

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot);
		/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
		XLogRegisterData((char *) toDelete,
						 sizeof(OffsetNumber) * xlrec.nDelete);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Clean up redirect and placeholder tuples on the given page
 *
 * Redirect tuples can be marked placeholder once they're old enough.
 * Placeholder tuples can be removed if it won't change the offsets of
 * non-placeholder ones.
 *
 * Unlike the routines above, this works on both leaf and inner pages.
 */
static void
vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page),
				firstPlaceholder = InvalidOffsetNumber;
	bool		hasNonPlaceholder = false;
	bool		hasUpdate = false;
	OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber itemnos[MaxIndexTuplesPerPage];
	spgxlogVacuumRedirect xlrec;

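	/*
	 * newestRedirectXid will be set to the newest XID among the redirects we
	 * remove; replay uses it to resolve recovery conflicts on hot standbys.
	 */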
	xlrec.nToPlaceholder = 0;
	xlrec.newestRedirectXid = InvalidTransactionId;

	START_CRIT_SECTION();

	/*
	 * Scan backwards to convert old redirection tuples to placeholder tuples,
	 * and identify location of last non-placeholder tuple while at it.
	 */
	for (i = max;
		 i >= FirstOffsetNumber &&
		 (opaque->nRedirection > 0 || !hasNonPlaceholder);
		 i--)
	{
		SpGistDeadTuple dt;

		dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));

		if (dt->tupstate == SPGIST_REDIRECT &&
			TransactionIdPrecedes(dt->xid, RecentGlobalXmin))
		{
			dt->tupstate = SPGIST_PLACEHOLDER;
			Assert(opaque->nRedirection > 0);
			opaque->nRedirection--;
			opaque->nPlaceholder++;

			/* remember newest XID among the removed redirects */
			if (!TransactionIdIsValid(xlrec.newestRedirectXid) ||
				TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid))
				xlrec.newestRedirectXid = dt->xid;

			ItemPointerSetInvalid(&dt->pointer);

			itemToPlaceholder[xlrec.nToPlaceholder] = i;
			xlrec.nToPlaceholder++;

			hasUpdate = true;
		}

		if (dt->tupstate == SPGIST_PLACEHOLDER)
		{
			if (!hasNonPlaceholder)
				firstPlaceholder = i;
		}
		else
		{
			hasNonPlaceholder = true;
		}
	}

	/*
	 * Any placeholder tuples at the end of page can safely be removed.  We
	 * can't remove ones before the last non-placeholder, though, because we
	 * can't alter the offset numbers of non-placeholder tuples.
	 */
	if (firstPlaceholder != InvalidOffsetNumber)
	{
		/*
		 * We do not store this array to rdata because it's easy to recreate.
		 */
		for (i = firstPlaceholder; i <= max; i++)
			itemnos[i - firstPlaceholder] = i;

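		/* i now becomes the count of placeholder tuples being removed */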
		i = max - firstPlaceholder + 1;
		Assert(opaque->nPlaceholder >= i);
		opaque->nPlaceholder -= i;

		/* The array is surely sorted, so can use PageIndexMultiDelete */
		PageIndexMultiDelete(page, itemnos, i);

		hasUpdate = true;
	}

	xlrec.firstPlaceholder = firstPlaceholder;

	if (hasUpdate)
		MarkBufferDirty(buffer);

	if (hasUpdate && RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect);
		XLogRegisterData((char *) itemToPlaceholder,
						 sizeof(OffsetNumber) * xlrec.nToPlaceholder);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Process one page during a bulkdelete scan
 */
static void
spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
{
	Relation	index = bds->info->index;
	Buffer		buffer;
	Page		page;

	/* call vacuum_delay_point while not holding any buffer lock */
	vacuum_delay_point();

	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
								RBM_NORMAL, bds->info->strategy);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	page = (Page) BufferGetPage(buffer);

	if (PageIsNew(page))
	{
		/*
		 * We found an all-zero page, which could happen if the database
		 * crashed just after extending the file.  Recycle it.
		 */
	}
	else if (PageIsEmpty(page))
	{
		/* nothing to do */
	}
	else if (SpGistPageIsLeaf(page))
	{
		if (SpGistBlockIsRoot(blkno))
		{
			vacuumLeafRoot(bds, index, buffer);
			/* no need for vacuumRedirectAndPlaceholder */
		}
		else
		{
			vacuumLeafPage(bds, index, buffer, false);
			vacuumRedirectAndPlaceholder(index, buffer);
		}
	}
	else
	{
		/* inner page */
		vacuumRedirectAndPlaceholder(index, buffer);
	}

	/*
	 * The root pages must never be deleted, nor marked as available in FSM,
	 * because we don't want them ever returned by a search for a place to put
	 * a new tuple.  Otherwise, check for empty page, and make sure the FSM
	 * knows about it.
	 */
	if (!SpGistBlockIsRoot(blkno))
	{
		if (PageIsNew(page) || PageIsEmpty(page))
		{
			RecordFreeIndexPage(index, blkno);
			bds->stats->pages_deleted++;
		}
		else
		{
			SpGistSetLastUsedPage(index, buffer);
			bds->lastFilledBlock = blkno;
		}
	}

	UnlockReleaseBuffer(buffer);
}

/*
 * Process the pending-TID list between pages of the main scan
 */
static void
spgprocesspending(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	spgVacPendingItem *pitem;
	spgVacPendingItem *nitem;
	BlockNumber blkno;
	Buffer		buffer;
	Page		page;

	for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
	{
		if (pitem->done)
			continue;			/* ignore already-done items */

		/* call vacuum_delay_point while not holding any buffer lock */
		vacuum_delay_point();

		/* examine the referenced page */
		blkno = ItemPointerGetBlockNumber(&pitem->tid);
		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
									RBM_NORMAL, bds->info->strategy);
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		page = (Page) BufferGetPage(buffer);

		if (PageIsNew(page) || SpGistPageIsDeleted(page))
		{
			/* Probably shouldn't happen, but ignore it */
		}
		else if (SpGistPageIsLeaf(page))
		{
			if (SpGistBlockIsRoot(blkno))
			{
				/* this should definitely not happen */
				elog(ERROR, "redirection leads to root page of index \"%s\"",
					 RelationGetRelationName(index));
			}

			/* deal with any deletable tuples */
			vacuumLeafPage(bds, index, buffer, true);
			/* might as well do this while we are here */
			vacuumRedirectAndPlaceholder(index, buffer);

			SpGistSetLastUsedPage(index, buffer);

			/*
			 * We can mark as done not only this item, but any later ones
			 * pointing at the same page, since we vacuumed the whole page.
			 */
			pitem->done = true;
			for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
			{
				if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
					nitem->done = true;
			}
		}
		else
		{
			/*
			 * On an inner page, visit the referenced inner tuple and add all
			 * its downlinks to the pending list.  We might have pending items
			 * for more than one inner tuple on the same page (in fact this is
			 * pretty likely given the way space allocation works), so get
			 * them all while we are here.
			 */
			for (nitem = pitem; nitem != NULL; nitem = nitem->next)
			{
				if (nitem->done)
					continue;
				if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
				{
					OffsetNumber offset;
					SpGistInnerTuple innerTuple;

					offset = ItemPointerGetOffsetNumber(&nitem->tid);
					innerTuple = (SpGistInnerTuple) PageGetItem(page,
																PageGetItemId(page, offset));
					if (innerTuple->tupstate == SPGIST_LIVE)
					{
						SpGistNodeTuple node;
						int			i;

						SGITITERATE(innerTuple, i, node)
						{
							if (ItemPointerIsValid(&node->t_tid))
								spgAddPendingTID(bds, &node->t_tid);
						}
					}
					else if (innerTuple->tupstate == SPGIST_REDIRECT)
					{
						/* transfer attention to redirect point */
						spgAddPendingTID(bds,
										 &((SpGistDeadTuple) innerTuple)->pointer);
					}
					else
						elog(ERROR, "unexpected SPGiST tuple state: %d",
							 innerTuple->tupstate);

					nitem->done = true;
				}
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	spgClearPendingList(bds);
}

/*
 * Perform a bulkdelete scan
 */
static void
spgvacuumscan(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	bool		needLock;
	BlockNumber num_pages,
				blkno;

	/* Finish setting up spgBulkDeleteState */
	initSpGistState(&bds->spgstate, index);
	bds->pendingList = NULL;
	bds->myXmin = GetActiveSnapshot()->xmin;
	bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;

	/*
	 * Reset counts that will be incremented during the scan; needed in case
	 * of multiple scans during a single VACUUM command
	 */
	bds->stats->estimated_count = false;
	bds->stats->num_index_tuples = 0;
	bds->stats->pages_deleted = 0;

	/* We can skip locking for new or temp relations */
	needLock = !RELATION_IS_LOCAL(index);

	/*
	 * The outer loop iterates over all index pages except the metapage, in
	 * physical order (we hope the kernel will cooperate in providing
	 * read-ahead for speed).  It is critical that we visit all leaf pages,
	 * including ones added after we start the scan, else we might fail to
	 * delete some deletable tuples.  See more extensive comments about this
	 * in btvacuumscan().
	 */
	blkno = SPGIST_METAPAGE_BLKNO + 1;
	for (;;)
	{
		/* Get the current relation length */
		if (needLock)
			LockRelationForExtension(index, ExclusiveLock);
		num_pages = RelationGetNumberOfBlocks(index);
		if (needLock)
			UnlockRelationForExtension(index, ExclusiveLock);

		/* Quit if we've scanned the whole relation */
		if (blkno >= num_pages)
			break;
		/* Iterate over pages, then loop back to recheck length */
		for (; blkno < num_pages; blkno++)
		{
			spgvacuumpage(bds, blkno);
			/* empty the pending-list after each page */
			if (bds->pendingList != NULL)
				spgprocesspending(bds);
		}
	}

	/* Propagate local lastUsedPage cache to metablock */
	SpGistUpdateMetaPage(index);

	/*
	 * If we found any empty pages (and recorded them in the FSM), then
	 * forcibly update the upper-level FSM pages to ensure that searchers can
	 * find them.  It's possible that the pages were also found during
	 * previous scans and so this is a waste of time, but it's cheap enough
	 * relative to scanning the index that it shouldn't matter much, and
	 * making sure that free pages are available sooner not later seems
	 * worthwhile.
	 *
	 * Note that if no empty pages exist, we don't bother vacuuming the FSM at
	 * all.
	 */
	if (bds->stats->pages_deleted > 0)
		IndexFreeSpaceMapVacuum(index);

	/*
	 * Truncate index if possible
	 *
	 * XXX disabled because it's unsafe due to possible concurrent inserts.
	 * We'd have to rescan the pages to make sure they're still empty, and it
	 * doesn't seem worth it.  Note that btree doesn't do this either.
	 *
	 * Another reason not to truncate is that it could invalidate the cached
	 * pages-with-freespace pointers in the metapage and other backends'
	 * relation caches, that is leave them pointing to nonexistent pages.
	 * Adding RelationGetNumberOfBlocks calls to protect the places that use
	 * those pointers would be unduly expensive.
	 */
#ifdef NOT_USED
	if (num_pages > bds->lastFilledBlock + 1)
	{
		BlockNumber lastBlock = num_pages - 1;

		num_pages = bds->lastFilledBlock + 1;
		RelationTruncate(index, num_pages);
		bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
		bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
	}
#endif

	/* Report final stats */
	bds->stats->num_pages = num_pages;
	bds->stats->pages_free = bds->stats->pages_deleted;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			  IndexBulkDeleteCallback callback, void *callback_state)
{
	spgBulkDeleteState bds;

	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	bds.info = info;
	bds.stats = stats;
	bds.callback = callback;
	bds.callback_state = callback_state;

	spgvacuumscan(&bds);

	return stats;
}

/* Dummy callback to delete no tuples during spgvacuumcleanup */
static bool
dummy_callback(ItemPointer itemptr, void *state)
{
	return false;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	spgBulkDeleteState bds;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	/*
	 * We don't need to scan the index if there was a preceding bulkdelete
	 * pass.  Otherwise, make a pass that won't delete any live tuples, but
	 * might still accomplish useful stuff with redirect/placeholder cleanup
	 * and/or FSM housekeeping, and in any case will provide stats.
	 */
	if (stats == NULL)
	{
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
		bds.info = info;
		bds.stats = stats;
		bds.callback = dummy_callback;
		bds.callback_state = NULL;

		spgvacuumscan(&bds);
	}

	/*
	 * It's quite possible for us to be fooled by concurrent tuple moves into
	 * double-counting some index tuples, so disbelieve any total that exceeds
	 * the underlying heap's count ... if we know that accurately.  Otherwise
	 * this might just make matters worse.
	 */
	if (!info->estimated_count)
	{
		if (stats->num_index_tuples > info->num_heap_tuples)
			stats->num_index_tuples = info->num_heap_tuples;
	}

	return stats;
}