1 /*-------------------------------------------------------------------------
2  *
3  * blvacuum.c
4  *		Bloom VACUUM functions.
5  *
6  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  contrib/bloom/blvacuum.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/genam.h"
16 #include "bloom.h"
17 #include "catalog/storage.h"
18 #include "commands/vacuum.h"
19 #include "miscadmin.h"
20 #include "postmaster/autovacuum.h"
21 #include "storage/bufmgr.h"
22 #include "storage/indexfsm.h"
23 #include "storage/lmgr.h"
24 
25 
26 /*
27  * Bulk deletion of all index entries pointing to a set of heap tuples.
28  * The set of target tuples is specified via a callback routine that tells
29  * whether any given heap tuple (identified by ItemPointer) is being deleted.
30  *
31  * Result: a palloc'd struct containing statistical info for VACUUM displays.
32  */
33 IndexBulkDeleteResult *
blbulkdelete(IndexVacuumInfo * info,IndexBulkDeleteResult * stats,IndexBulkDeleteCallback callback,void * callback_state)34 blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
35 			 IndexBulkDeleteCallback callback, void *callback_state)
36 {
37 	Relation	index = info->index;
38 	BlockNumber blkno,
39 				npages;
40 	FreeBlockNumberArray notFullPage;
41 	int			countPage = 0;
42 	BloomState	state;
43 	Buffer		buffer;
44 	Page		page;
45 	BloomMetaPageData *metaData;
46 	GenericXLogState *gxlogState;
47 
48 	if (stats == NULL)
49 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
50 
51 	initBloomState(&state, index);
52 
53 	/*
54 	 * Iterate over the pages. We don't care about concurrently added pages,
55 	 * they can't contain tuples to delete.
56 	 */
57 	npages = RelationGetNumberOfBlocks(index);
58 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
59 	{
60 		BloomTuple *itup,
61 				   *itupPtr,
62 				   *itupEnd;
63 
64 		vacuum_delay_point();
65 
66 		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
67 									RBM_NORMAL, info->strategy);
68 
69 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
70 		gxlogState = GenericXLogStart(index);
71 		page = GenericXLogRegisterBuffer(gxlogState, buffer, 0);
72 
73 		/* Ignore empty/deleted pages until blvacuumcleanup() */
74 		if (PageIsNew(page) || BloomPageIsDeleted(page))
75 		{
76 			UnlockReleaseBuffer(buffer);
77 			GenericXLogAbort(gxlogState);
78 			continue;
79 		}
80 
81 		/*
82 		 * Iterate over the tuples.  itup points to current tuple being
83 		 * scanned, itupPtr points to where to save next non-deleted tuple.
84 		 */
85 		itup = itupPtr = BloomPageGetTuple(&state, page, FirstOffsetNumber);
86 		itupEnd = BloomPageGetTuple(&state, page,
87 									OffsetNumberNext(BloomPageGetMaxOffset(page)));
88 		while (itup < itupEnd)
89 		{
90 			/* Do we have to delete this tuple? */
91 			if (callback(&itup->heapPtr, callback_state))
92 			{
93 				/* Yes; adjust count of tuples that will be left on page */
94 				BloomPageGetOpaque(page)->maxoff--;
95 				stats->tuples_removed += 1;
96 			}
97 			else
98 			{
99 				/* No; copy it to itupPtr++, but skip copy if not needed */
100 				if (itupPtr != itup)
101 					memmove((Pointer) itupPtr, (Pointer) itup,
102 							state.sizeOfBloomTuple);
103 				itupPtr = BloomPageGetNextTuple(&state, itupPtr);
104 			}
105 
106 			itup = BloomPageGetNextTuple(&state, itup);
107 		}
108 
109 		/* Assert that we counted correctly */
110 		Assert(itupPtr == BloomPageGetTuple(&state, page,
111 											OffsetNumberNext(BloomPageGetMaxOffset(page))));
112 
113 		/*
114 		 * Add page to new notFullPage list if we will not mark page as
115 		 * deleted and there is free space on it
116 		 */
117 		if (BloomPageGetMaxOffset(page) != 0 &&
118 			BloomPageGetFreeSpace(&state, page) >= state.sizeOfBloomTuple &&
119 			countPage < BloomMetaBlockN)
120 			notFullPage[countPage++] = blkno;
121 
122 		/* Did we delete something? */
123 		if (itupPtr != itup)
124 		{
125 			/* Is it empty page now? */
126 			if (BloomPageGetMaxOffset(page) == 0)
127 				BloomPageSetDeleted(page);
128 			/* Adjust pd_lower */
129 			((PageHeader) page)->pd_lower = (Pointer) itupPtr - page;
130 			/* Finish WAL-logging */
131 			GenericXLogFinish(gxlogState);
132 		}
133 		else
134 		{
135 			/* Didn't change anything: abort WAL-logging */
136 			GenericXLogAbort(gxlogState);
137 		}
138 		UnlockReleaseBuffer(buffer);
139 	}
140 
141 	/*
142 	 * Update the metapage's notFullPage list with whatever we found.  Our
143 	 * info could already be out of date at this point, but blinsert() will
144 	 * cope if so.
145 	 */
146 	buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO);
147 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
148 
149 	gxlogState = GenericXLogStart(index);
150 	page = GenericXLogRegisterBuffer(gxlogState, buffer, 0);
151 
152 	metaData = BloomPageGetMeta(page);
153 	memcpy(metaData->notFullPage, notFullPage, sizeof(BlockNumber) * countPage);
154 	metaData->nStart = 0;
155 	metaData->nEnd = countPage;
156 
157 	GenericXLogFinish(gxlogState);
158 	UnlockReleaseBuffer(buffer);
159 
160 	return stats;
161 }
162 
163 /*
164  * Post-VACUUM cleanup.
165  *
166  * Result: a palloc'd struct containing statistical info for VACUUM displays.
167  */
168 IndexBulkDeleteResult *
blvacuumcleanup(IndexVacuumInfo * info,IndexBulkDeleteResult * stats)169 blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
170 {
171 	Relation	index = info->index;
172 	BlockNumber npages,
173 				blkno;
174 
175 	if (info->analyze_only)
176 		return stats;
177 
178 	if (stats == NULL)
179 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
180 
181 	/*
182 	 * Iterate over the pages: insert deleted pages into FSM and collect
183 	 * statistics.
184 	 */
185 	npages = RelationGetNumberOfBlocks(index);
186 	stats->num_pages = npages;
187 	stats->pages_free = 0;
188 	stats->num_index_tuples = 0;
189 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
190 	{
191 		Buffer		buffer;
192 		Page		page;
193 
194 		vacuum_delay_point();
195 
196 		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
197 									RBM_NORMAL, info->strategy);
198 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
199 		page = (Page) BufferGetPage(buffer);
200 
201 		if (PageIsNew(page) || BloomPageIsDeleted(page))
202 		{
203 			RecordFreeIndexPage(index, blkno);
204 			stats->pages_free++;
205 		}
206 		else
207 		{
208 			stats->num_index_tuples += BloomPageGetMaxOffset(page);
209 		}
210 
211 		UnlockReleaseBuffer(buffer);
212 	}
213 
214 	IndexFreeSpaceMapVacuum(info->index);
215 
216 	return stats;
217 }
218