1 /*-------------------------------------------------------------------------
2  *
3  * ginvacuum.c
4  *	  delete & vacuum routines for the postgres GIN
5  *
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/ginvacuum.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/xloginsert.h"
20 #include "commands/vacuum.h"
21 #include "miscadmin.h"
22 #include "postmaster/autovacuum.h"
23 #include "storage/indexfsm.h"
24 #include "storage/lmgr.h"
25 #include "utils/memutils.h"
26 
/*
 * State carried through one GIN bulk-delete pass.  It is filled in by
 * ginbulkdelete() and handed to the per-page vacuum routines.
 */
struct GinVacuumState
{
	Relation	index;			/* the GIN index being vacuumed */
	IndexBulkDeleteResult *result;	/* counters updated during the pass
									 * (tuples_removed, num_index_tuples,
									 * pages_deleted) */
	IndexBulkDeleteCallback callback;	/* called per TID; a true result means
										 * the TID is to be removed */
	void	   *callback_state; /* opaque second argument for callback */
	GinState	ginstate;		/* initialized with initGinState() */
	BufferAccessStrategy strategy;	/* strategy passed to ReadBufferExtended()
									 * when reading index pages */
	MemoryContext tmpCxt;		/* scratch context used while vacuuming a
								 * posting tree leaf; reset after each leaf */
};
37 
38 /*
39  * Vacuums an uncompressed posting list. The size of the must can be specified
40  * in number of items (nitems).
41  *
42  * If none of the items need to be removed, returns NULL. Otherwise returns
43  * a new palloc'd array with the remaining items. The number of remaining
44  * items is returned in *nremaining.
45  */
46 ItemPointer
ginVacuumItemPointers(GinVacuumState * gvs,ItemPointerData * items,int nitem,int * nremaining)47 ginVacuumItemPointers(GinVacuumState *gvs, ItemPointerData *items,
48 					  int nitem, int *nremaining)
49 {
50 	int			i,
51 				remaining = 0;
52 	ItemPointer tmpitems = NULL;
53 
54 	/*
55 	 * Iterate over TIDs array
56 	 */
57 	for (i = 0; i < nitem; i++)
58 	{
59 		if (gvs->callback(items + i, gvs->callback_state))
60 		{
61 			gvs->result->tuples_removed += 1;
62 			if (!tmpitems)
63 			{
64 				/*
65 				 * First TID to be deleted: allocate memory to hold the
66 				 * remaining items.
67 				 */
68 				tmpitems = palloc(sizeof(ItemPointerData) * nitem);
69 				memcpy(tmpitems, items, sizeof(ItemPointerData) * i);
70 			}
71 		}
72 		else
73 		{
74 			gvs->result->num_index_tuples += 1;
75 			if (tmpitems)
76 				tmpitems[remaining] = items[i];
77 			remaining++;
78 		}
79 	}
80 
81 	*nremaining = remaining;
82 	return tmpitems;
83 }
84 
85 /*
86  * Create a WAL record for vacuuming entry tree leaf page.
87  */
88 static void
xlogVacuumPage(Relation index,Buffer buffer)89 xlogVacuumPage(Relation index, Buffer buffer)
90 {
91 	Page		page = BufferGetPage(buffer);
92 	XLogRecPtr	recptr;
93 
94 	/* This is only used for entry tree leaf pages. */
95 	Assert(!GinPageIsData(page));
96 	Assert(GinPageIsLeaf(page));
97 
98 	if (!RelationNeedsWAL(index))
99 		return;
100 
101 	/*
102 	 * Always create a full image, we don't track the changes on the page at
103 	 * any more fine-grained level. This could obviously be improved...
104 	 */
105 	XLogBeginInsert();
106 	XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
107 
108 	recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE);
109 	PageSetLSN(page, recptr);
110 }
111 
112 
/*
 * Per-level bookkeeping used by ginScanToDelete() while it walks a posting
 * tree.  The entries form a doubly linked chain with one entry per tree
 * level; an entry is reused for every page visited on its level.
 */
typedef struct DataPageDeleteStack
{
	struct DataPageDeleteStack *child;	/* entry for the next level down, or
										 * NULL if not allocated yet */
	struct DataPageDeleteStack *parent; /* entry for the level above */

	BlockNumber blkno;			/* current block number */
	Buffer		leftBuffer;		/* pinned and locked rightmost non-deleted
								 * page to the left on this level, or
								 * InvalidBuffer */
	bool		isRoot;			/* true for the entry describing the posting
								 * tree root */
} DataPageDeleteStack;
123 
124 
/*
 * Delete a posting tree page.
 *
 * The page deleteBlkno is unlinked from its left sibling leftBlkno, its
 * downlink (at offset myoff) is removed from the parent page parentBlkno,
 * and the page itself is marked deleted.  All three changes are made inside
 * one critical section and, if the index needs WAL, covered by a single
 * XLOG_GIN_DELETE_PAGE record.  result->pages_deleted is incremented.
 *
 * isParentRoot is accepted but not used by this function.
 */
static void
ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
			  BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
{
	Buffer		dBuffer;
	Buffer		lBuffer;
	Buffer		pBuffer;
	Page		page,
				parentPage;
	BlockNumber rightlink;

	/*
	 * This function MUST be called only if one of the parent pages holds an
	 * exclusive cleanup lock. This guarantees that no insertions currently
	 * happen in this subtree. The caller also holds exclusive locks on the
	 * page to delete, its parent and its left sibling, so the buffers are
	 * only pinned here; no buffer locks are taken in this function.
	 */
	lBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, leftBlkno,
								 RBM_NORMAL, gvs->strategy);
	dBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, deleteBlkno,
								 RBM_NORMAL, gvs->strategy);
	pBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, parentBlkno,
								 RBM_NORMAL, gvs->strategy);

	START_CRIT_SECTION();

	/* Unlink the page by changing left sibling's rightlink */
	page = BufferGetPage(dBuffer);
	rightlink = GinPageGetOpaque(page)->rightlink;

	/* For the deleted page, remember the last xid which could know its address */
	GinPageSetDeleteXid(page, ReadNewTransactionId());

	page = BufferGetPage(lBuffer);
	GinPageGetOpaque(page)->rightlink = rightlink;

	/* Delete downlink from parent */
	parentPage = BufferGetPage(pBuffer);
#ifdef USE_ASSERT_CHECKING
	/* Cross-check that the downlink at myoff really points to the victim. */
	do
	{
		PostingItem *tod = GinDataPageGetPostingItem(parentPage, myoff);

		Assert(PostingItemGetBlockNumber(tod) == deleteBlkno);
	} while (0);
#endif
	GinPageDeletePostingItem(parentPage, myoff);

	page = BufferGetPage(dBuffer);

	/*
	 * The deleted page's own rightlink is deliberately left unchanged, so
	 * that a search scan which is currently running keeps working.
	 */
	GinPageSetDeleted(page);

	MarkBufferDirty(pBuffer);
	MarkBufferDirty(lBuffer);
	MarkBufferDirty(dBuffer);

	if (RelationNeedsWAL(gvs->index))
	{
		XLogRecPtr	recptr;
		ginxlogDeletePage data;

		/*
		 * We can't pass REGBUF_STANDARD for the deleted page, because we
		 * didn't set pd_lower on pre-9.4 versions. The page might've been
		 * binary-upgraded from an older version, and hence not have pd_lower
		 * set correctly. Ditto for the left page, but removing the item from
		 * the parent updated its pd_lower, so we know that's OK at this
		 * point.
		 */
		XLogBeginInsert();
		XLogRegisterBuffer(0, dBuffer, 0);
		XLogRegisterBuffer(1, pBuffer, REGBUF_STANDARD);
		XLogRegisterBuffer(2, lBuffer, 0);

		/* "page" is the deleted page again at this point. */
		data.parentOffset = myoff;
		data.rightLink = GinPageGetOpaque(page)->rightlink;
		data.deleteXid = GinPageGetDeleteXid(page);

		XLogRegisterData((char *) &data, sizeof(ginxlogDeletePage));

		/* All three modified pages get the LSN of the one record. */
		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE);
		PageSetLSN(page, recptr);
		PageSetLSN(parentPage, recptr);
		PageSetLSN(BufferGetPage(lBuffer), recptr);
	}

	/* Drop only the pins taken above; the locks belong to the caller. */
	ReleaseBuffer(pBuffer);
	ReleaseBuffer(lBuffer);
	ReleaseBuffer(dBuffer);

	END_CRIT_SECTION();

	gvs->result->pages_deleted++;
}
226 
227 
/*
 * Scans posting tree and deletes empty pages.  Caller must lock root page for
 * cleanup.  During scan path from root to current page is kept exclusively
 * locked.  Also keep left page exclusively locked, because ginDeletePage()
 * needs it.  If we try to relock left page later, it could deadlock with
 * ginStepRight().
 *
 * blkno is the page to visit and isRoot tells whether it is the posting tree
 * root.  For the root, "parent" is the stack entry describing the root
 * itself; for any other page it is the entry of the level above, and myoff
 * is the offset of blkno's downlink on that parent page.
 *
 * The function recurses into all children of an internal page before
 * deciding about the page itself.  Returns true if the page at blkno was
 * deleted.
 */
static bool
ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
				DataPageDeleteStack *parent, OffsetNumber myoff)
{
	DataPageDeleteStack *me;
	Buffer		buffer;
	Page		page;
	bool		meDelete = FALSE;
	bool		isempty;

	if (isRoot)
	{
		/* The caller supplies the stack entry for the root level. */
		me = parent;
	}
	else
	{
		/*
		 * One stack entry exists per level: create it on the first visit to
		 * this level, reuse it afterwards.
		 */
		if (!parent->child)
		{
			me = (DataPageDeleteStack *) palloc0(sizeof(DataPageDeleteStack));
			me->parent = parent;
			parent->child = me;
			me->leftBuffer = InvalidBuffer;
		}
		else
			me = parent->child;
	}

	buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
								RBM_NORMAL, gvs->strategy);

	/* The root is already locked by the caller; lock every other page here. */
	if (!isRoot)
		LockBuffer(buffer, GIN_EXCLUSIVE);

	page = BufferGetPage(buffer);

	Assert(GinPageIsData(page));

	if (!GinPageIsLeaf(page))
	{
		OffsetNumber i;

		me->blkno = blkno;
		for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
		{
			PostingItem *pitem = GinDataPageGetPostingItem(page, i);

			/*
			 * If the child was deleted, its downlink has been removed from
			 * this page, so the same offset must be examined again.
			 */
			if (ginScanToDelete(gvs, PostingItemGetBlockNumber(pitem), FALSE, me, i))
				i--;
		}

		/*
		 * After the rightmost page of this level has been processed, the
		 * left buffer remembered for the child level is no longer needed.
		 */
		if (GinPageRightMost(page) && BufferIsValid(me->child->leftBuffer))
		{
			UnlockReleaseBuffer(me->child->leftBuffer);
			me->child->leftBuffer = InvalidBuffer;
		}
	}

	/* A leaf is empty without items; an internal page without downlinks. */
	if (GinPageIsLeaf(page))
		isempty = GinDataLeafPageIsEmpty(page);
	else
		isempty = GinPageGetOpaque(page)->maxoff < FirstOffsetNumber;

	if (isempty)
	{
		/* we never delete the left- or rightmost branch */
		if (BufferIsValid(me->leftBuffer) && !GinPageRightMost(page))
		{
			Assert(!isRoot);
			ginDeletePage(gvs, blkno, BufferGetBlockNumber(me->leftBuffer),
						  me->parent->blkno, myoff, me->parent->isRoot);
			meDelete = TRUE;
		}
	}

	if (!meDelete)
	{
		/*
		 * The page survives and becomes the new left neighbour for its
		 * level.  It stays pinned and locked; the previous one is released.
		 */
		if (BufferIsValid(me->leftBuffer))
			UnlockReleaseBuffer(me->leftBuffer);
		me->leftBuffer = buffer;
	}
	else
	{
		/* The page was deleted: release the lock taken above and the pin. */
		if (!isRoot)
			LockBuffer(buffer, GIN_UNLOCK);

		ReleaseBuffer(buffer);
	}

	/*
	 * For the root, drop the pin taken by ReadBufferExtended() above.
	 * NOTE(review): the caller (ginVacuumPostingTree) holds its own pin and
	 * the cleanup lock on the root and releases them itself.
	 */
	if (isRoot)
		ReleaseBuffer(buffer);

	return meDelete;
}
328 
329 
330 /*
331  * Scan through posting tree leafs, delete empty tuples.  Returns true if there
332  * is at least one empty page.
333  */
334 static bool
ginVacuumPostingTreeLeaves(GinVacuumState * gvs,BlockNumber blkno)335 ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno)
336 {
337 	Buffer		buffer;
338 	Page		page;
339 	bool		hasVoidPage = FALSE;
340 	MemoryContext oldCxt;
341 
342 	/* Find leftmost leaf page of posting tree and lock it in exclusive mode */
343 	while (true)
344 	{
345 		PostingItem *pitem;
346 
347 		buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
348 									RBM_NORMAL, gvs->strategy);
349 		LockBuffer(buffer, GIN_SHARE);
350 		page = BufferGetPage(buffer);
351 
352 		Assert(GinPageIsData(page));
353 
354 		if (GinPageIsLeaf(page))
355 		{
356 			LockBuffer(buffer, GIN_UNLOCK);
357 			LockBuffer(buffer, GIN_EXCLUSIVE);
358 			break;
359 		}
360 
361 		Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
362 
363 		pitem = GinDataPageGetPostingItem(page, FirstOffsetNumber);
364 		blkno = PostingItemGetBlockNumber(pitem);
365 		Assert(blkno != InvalidBlockNumber);
366 
367 		UnlockReleaseBuffer(buffer);
368 	}
369 
370 	/* Iterate all posting tree leaves using rightlinks and vacuum them */
371 	while (true)
372 	{
373 		oldCxt = MemoryContextSwitchTo(gvs->tmpCxt);
374 		ginVacuumPostingTreeLeaf(gvs->index, buffer, gvs);
375 		MemoryContextSwitchTo(oldCxt);
376 		MemoryContextReset(gvs->tmpCxt);
377 
378 		if (GinDataLeafPageIsEmpty(page))
379 			hasVoidPage = TRUE;
380 
381 		blkno = GinPageGetOpaque(page)->rightlink;
382 
383 		UnlockReleaseBuffer(buffer);
384 
385 		if (blkno == InvalidBlockNumber)
386 			break;
387 
388 		buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
389 									RBM_NORMAL, gvs->strategy);
390 		LockBuffer(buffer, GIN_EXCLUSIVE);
391 		page = BufferGetPage(buffer);
392 	}
393 
394 	return hasVoidPage;
395 }
396 
397 static void
ginVacuumPostingTree(GinVacuumState * gvs,BlockNumber rootBlkno)398 ginVacuumPostingTree(GinVacuumState *gvs, BlockNumber rootBlkno)
399 {
400 	if (ginVacuumPostingTreeLeaves(gvs, rootBlkno))
401 	{
402 		/*
403 		 * There is at least one empty page.  So we have to rescan the tree
404 		 * deleting empty pages.
405 		 */
406 		Buffer				buffer;
407 		DataPageDeleteStack root,
408 						   *ptr,
409 						   *tmp;
410 
411 		buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, rootBlkno,
412 									RBM_NORMAL, gvs->strategy);
413 
414 		/*
415 		 * Lock posting tree root for cleanup to ensure there are no concurrent
416 		 * inserts.
417 		 */
418 		LockBufferForCleanup(buffer);
419 
420 		memset(&root, 0, sizeof(DataPageDeleteStack));
421 		root.leftBuffer = InvalidBuffer;
422 		root.isRoot = true;
423 
424 		ginScanToDelete(gvs, rootBlkno, true, &root, InvalidOffsetNumber);
425 
426 		ptr = root.child;
427 
428 		while (ptr)
429 		{
430 			tmp = ptr->child;
431 			pfree(ptr);
432 			ptr = tmp;
433 		}
434 
435 		UnlockReleaseBuffer(buffer);
436 	}
437 }
438 
439 /*
440  * returns modified page or NULL if page isn't modified.
441  * Function works with original page until first change is occurred,
442  * then page is copied into temporary one.
443  */
444 static Page
ginVacuumEntryPage(GinVacuumState * gvs,Buffer buffer,BlockNumber * roots,uint32 * nroot)445 ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint32 *nroot)
446 {
447 	Page		origpage = BufferGetPage(buffer),
448 				tmppage;
449 	OffsetNumber i,
450 				maxoff = PageGetMaxOffsetNumber(origpage);
451 
452 	tmppage = origpage;
453 
454 	*nroot = 0;
455 
456 	for (i = FirstOffsetNumber; i <= maxoff; i++)
457 	{
458 		IndexTuple	itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
459 
460 		if (GinIsPostingTree(itup))
461 		{
462 			/*
463 			 * store posting tree's roots for further processing, we can't
464 			 * vacuum it just now due to risk of deadlocks with scans/inserts
465 			 */
466 			roots[*nroot] = GinGetDownlink(itup);
467 			(*nroot)++;
468 		}
469 		else if (GinGetNPosting(itup) > 0)
470 		{
471 			int			nitems;
472 			ItemPointer items_orig;
473 			bool		free_items_orig;
474 			ItemPointer items;
475 
476 			/* Get list of item pointers from the tuple. */
477 			if (GinItupIsCompressed(itup))
478 			{
479 				items_orig = ginPostingListDecode((GinPostingList *) GinGetPosting(itup), &nitems);
480 				free_items_orig = true;
481 			}
482 			else
483 			{
484 				items_orig = (ItemPointer) GinGetPosting(itup);
485 				nitems = GinGetNPosting(itup);
486 				free_items_orig = false;
487 			}
488 
489 			/* Remove any items from the list that need to be vacuumed. */
490 			items = ginVacuumItemPointers(gvs, items_orig, nitems, &nitems);
491 
492 			if (free_items_orig)
493 				pfree(items_orig);
494 
495 			/* If any item pointers were removed, recreate the tuple. */
496 			if (items)
497 			{
498 				OffsetNumber attnum;
499 				Datum		key;
500 				GinNullCategory category;
501 				GinPostingList *plist;
502 				int			plistsize;
503 
504 				if (nitems > 0)
505 				{
506 					plist = ginCompressPostingList(items, nitems, GinMaxItemSize, NULL);
507 					plistsize = SizeOfGinPostingList(plist);
508 				}
509 				else
510 				{
511 					plist = NULL;
512 					plistsize = 0;
513 				}
514 
515 				/*
516 				 * if we already created a temporary page, make changes in
517 				 * place
518 				 */
519 				if (tmppage == origpage)
520 				{
521 					/*
522 					 * On first difference, create a temporary copy of the
523 					 * page and copy the tuple's posting list to it.
524 					 */
525 					tmppage = PageGetTempPageCopy(origpage);
526 
527 					/* set itup pointer to new page */
528 					itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
529 				}
530 
531 				attnum = gintuple_get_attrnum(&gvs->ginstate, itup);
532 				key = gintuple_get_key(&gvs->ginstate, itup, &category);
533 				itup = GinFormTuple(&gvs->ginstate, attnum, key, category,
534 									(char *) plist, plistsize,
535 									nitems, true);
536 				if (plist)
537 					pfree(plist);
538 				PageIndexTupleDelete(tmppage, i);
539 
540 				if (PageAddItem(tmppage, (Item) itup, IndexTupleSize(itup), i, false, false) != i)
541 					elog(ERROR, "failed to add item to index page in \"%s\"",
542 						 RelationGetRelationName(gvs->index));
543 
544 				pfree(itup);
545 				pfree(items);
546 			}
547 		}
548 	}
549 
550 	return (tmppage == origpage) ? NULL : tmppage;
551 }
552 
553 IndexBulkDeleteResult *
ginbulkdelete(IndexVacuumInfo * info,IndexBulkDeleteResult * stats,IndexBulkDeleteCallback callback,void * callback_state)554 ginbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
555 			  IndexBulkDeleteCallback callback, void *callback_state)
556 {
557 	Relation	index = info->index;
558 	BlockNumber blkno = GIN_ROOT_BLKNO;
559 	GinVacuumState gvs;
560 	Buffer		buffer;
561 	BlockNumber rootOfPostingTree[BLCKSZ / (sizeof(IndexTupleData) + sizeof(ItemId))];
562 	uint32		nRoot;
563 
564 	gvs.tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
565 									   "Gin vacuum temporary context",
566 									   ALLOCSET_DEFAULT_SIZES);
567 	gvs.index = index;
568 	gvs.callback = callback;
569 	gvs.callback_state = callback_state;
570 	gvs.strategy = info->strategy;
571 	initGinState(&gvs.ginstate, index);
572 
573 	/* first time through? */
574 	if (stats == NULL)
575 	{
576 		/* Yes, so initialize stats to zeroes */
577 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
578 
579 		/*
580 		 * and cleanup any pending inserts
581 		 */
582 		ginInsertCleanup(&gvs.ginstate, !IsAutoVacuumWorkerProcess(),
583 						 false, true, stats);
584 	}
585 
586 	/* we'll re-count the tuples each time */
587 	stats->num_index_tuples = 0;
588 	gvs.result = stats;
589 
590 	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
591 								RBM_NORMAL, info->strategy);
592 
593 	/* find leaf page */
594 	for (;;)
595 	{
596 		Page		page = BufferGetPage(buffer);
597 		IndexTuple	itup;
598 
599 		LockBuffer(buffer, GIN_SHARE);
600 
601 		Assert(!GinPageIsData(page));
602 
603 		if (GinPageIsLeaf(page))
604 		{
605 			LockBuffer(buffer, GIN_UNLOCK);
606 			LockBuffer(buffer, GIN_EXCLUSIVE);
607 
608 			if (blkno == GIN_ROOT_BLKNO && !GinPageIsLeaf(page))
609 			{
610 				LockBuffer(buffer, GIN_UNLOCK);
611 				continue;		/* check it one more */
612 			}
613 			break;
614 		}
615 
616 		Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
617 
618 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
619 		blkno = GinGetDownlink(itup);
620 		Assert(blkno != InvalidBlockNumber);
621 
622 		UnlockReleaseBuffer(buffer);
623 		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
624 									RBM_NORMAL, info->strategy);
625 	}
626 
627 	/* right now we found leftmost page in entry's BTree */
628 
629 	for (;;)
630 	{
631 		Page		page = BufferGetPage(buffer);
632 		Page		resPage;
633 		uint32		i;
634 
635 		Assert(!GinPageIsData(page));
636 
637 		resPage = ginVacuumEntryPage(&gvs, buffer, rootOfPostingTree, &nRoot);
638 
639 		blkno = GinPageGetOpaque(page)->rightlink;
640 
641 		if (resPage)
642 		{
643 			START_CRIT_SECTION();
644 			PageRestoreTempPage(resPage, page);
645 			MarkBufferDirty(buffer);
646 			xlogVacuumPage(gvs.index, buffer);
647 			UnlockReleaseBuffer(buffer);
648 			END_CRIT_SECTION();
649 		}
650 		else
651 		{
652 			UnlockReleaseBuffer(buffer);
653 		}
654 
655 		vacuum_delay_point();
656 
657 		for (i = 0; i < nRoot; i++)
658 		{
659 			ginVacuumPostingTree(&gvs, rootOfPostingTree[i]);
660 			vacuum_delay_point();
661 		}
662 
663 		if (blkno == InvalidBlockNumber)	/* rightmost page */
664 			break;
665 
666 		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
667 									RBM_NORMAL, info->strategy);
668 		LockBuffer(buffer, GIN_EXCLUSIVE);
669 	}
670 
671 	MemoryContextDelete(gvs.tmpCxt);
672 
673 	return gvs.result;
674 }
675 
676 IndexBulkDeleteResult *
ginvacuumcleanup(IndexVacuumInfo * info,IndexBulkDeleteResult * stats)677 ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
678 {
679 	Relation	index = info->index;
680 	bool		needLock;
681 	BlockNumber npages,
682 				blkno;
683 	BlockNumber totFreePages;
684 	GinState	ginstate;
685 	GinStatsData idxStat;
686 
687 	/*
688 	 * In an autovacuum analyze, we want to clean up pending insertions.
689 	 * Otherwise, an ANALYZE-only call is a no-op.
690 	 */
691 	if (info->analyze_only)
692 	{
693 		if (IsAutoVacuumWorkerProcess())
694 		{
695 			initGinState(&ginstate, index);
696 			ginInsertCleanup(&ginstate, false, true, true, stats);
697 		}
698 		return stats;
699 	}
700 
701 	/*
702 	 * Set up all-zero stats and cleanup pending inserts if ginbulkdelete
703 	 * wasn't called
704 	 */
705 	if (stats == NULL)
706 	{
707 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
708 		initGinState(&ginstate, index);
709 		ginInsertCleanup(&ginstate, !IsAutoVacuumWorkerProcess(),
710 						 false, true, stats);
711 	}
712 
713 	memset(&idxStat, 0, sizeof(idxStat));
714 
715 	/*
716 	 * XXX we always report the heap tuple count as the number of index
717 	 * entries.  This is bogus if the index is partial, but it's real hard to
718 	 * tell how many distinct heap entries are referenced by a GIN index.
719 	 */
720 	stats->num_index_tuples = info->num_heap_tuples;
721 	stats->estimated_count = info->estimated_count;
722 
723 	/*
724 	 * Need lock unless it's local to this backend.
725 	 */
726 	needLock = !RELATION_IS_LOCAL(index);
727 
728 	if (needLock)
729 		LockRelationForExtension(index, ExclusiveLock);
730 	npages = RelationGetNumberOfBlocks(index);
731 	if (needLock)
732 		UnlockRelationForExtension(index, ExclusiveLock);
733 
734 	totFreePages = 0;
735 
736 	for (blkno = GIN_ROOT_BLKNO; blkno < npages; blkno++)
737 	{
738 		Buffer		buffer;
739 		Page		page;
740 
741 		vacuum_delay_point();
742 
743 		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
744 									RBM_NORMAL, info->strategy);
745 		LockBuffer(buffer, GIN_SHARE);
746 		page = (Page) BufferGetPage(buffer);
747 
748 		if (GinPageIsRecyclable(page))
749 		{
750 			Assert(blkno != GIN_ROOT_BLKNO);
751 			RecordFreeIndexPage(index, blkno);
752 			totFreePages++;
753 		}
754 		else if (GinPageIsData(page))
755 		{
756 			idxStat.nDataPages++;
757 		}
758 		else if (!GinPageIsList(page))
759 		{
760 			idxStat.nEntryPages++;
761 
762 			if (GinPageIsLeaf(page))
763 				idxStat.nEntries += PageGetMaxOffsetNumber(page);
764 		}
765 
766 		UnlockReleaseBuffer(buffer);
767 	}
768 
769 	/* Update the metapage with accurate page and entry counts */
770 	idxStat.nTotalPages = npages;
771 	ginUpdateStats(info->index, &idxStat);
772 
773 	/* Finally, vacuum the FSM */
774 	IndexFreeSpaceMapVacuum(info->index);
775 
776 	stats->pages_free = totFreePages;
777 
778 	if (needLock)
779 		LockRelationForExtension(index, ExclusiveLock);
780 	stats->num_pages = RelationGetNumberOfBlocks(index);
781 	if (needLock)
782 		UnlockRelationForExtension(index, ExclusiveLock);
783 
784 	return stats;
785 }
786