/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
#include "postgres.h"

#include "access/brin.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;
	int			bs_numtuples;
	Buffer		bs_currentInsertBuf;
	BlockNumber bs_pagesPerRange;
	BlockNumber bs_currRangeStart;
	BrinRevmap *bs_rmAccess;
	BrinDesc   *bs_bdesc;
	BrinMemTuple *bs_dtuple;
} BrinBuildState;

/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;
	BrinRevmap *bo_rmAccess;
	BrinDesc   *bo_bdesc;
} BrinOpaque;

static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
						   BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel,
			  double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
			 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);


/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique)
{
	BlockNumber pagesPerRange;
	BrinDesc   *bdesc = NULL;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = NULL;

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		BlockNumber heapBlk;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		heapBlk = ItemPointerGetBlockNumber(heaptid);
		/* normalize the block number to be the first block in the range */
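		/* (e.g. with pagesPerRange = 128, heap block 200 maps to range start 128) */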
		heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
										 BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through? */
		if (bdesc == NULL)
		{
			bdesc = brin_build_desc(idxRel);
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			oldcxt = MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	if (bdesc != NULL)
	{
		brin_free_desc(bdesc);
		MemoryContextSwitchTo(oldcxt);
		MemoryContextDelete(tupcxt);
	}

	return false;
}

/*
 * Initialize state for a BRIN index scan.
 *
 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding lock on index, it's not necessary to recompute it during brinrescan.
 */
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BrinOpaque *opaque;

	scan = RelationGetIndexScan(r, nkeys, norderbys);

	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
											   scan->xs_snapshot);
	opaque->bo_bdesc = brin_build_desc(r);
	scan->opaque = opaque;

	return scan;
}

/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = heap_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	heap_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			tup = brin_copy_tuple(tup, size);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (tup == NULL)
		{
			addrange = true;
		}
		else
		{
			BrinMemTuple *dtup;

			dtup = brin_deform_tuple(bdesc, tup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
					 * this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
					  bdesc->bd_tupdesc->attrs[keyattno - 1]->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
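	 * The fixed multiplier below is just a crude tuples-per-page guess.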
	 */
	return totalpages * 10;
}

/*
 * Re-initialize state for a BRIN index scan
 */
void
brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	/*
	 * Other index AMs preprocess the scan keys at this point, or sometime
	 * early during the scan; this lets them optimize by removing redundant
	 * keys, or doing early returns when they are impossible to satisfy; see
	 * _bt_preprocess_keys for an example.  Something like that could be added
	 * here someday, too.
	 */

	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData, scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
}

/*
 * Close down a BRIN index scan
 */
void
brinendscan(IndexScanDesc scan)
{
	BrinOpaque *opaque = (BrinOpaque *) scan->opaque;

	brinRevmapTerminate(opaque->bo_rmAccess);
	brin_free_desc(opaque->bo_bdesc);
	pfree(opaque);
}

/*
 * Per-heap-tuple callback for IndexBuildHeapScan.
 *
 * Note we don't worry about the page range at the end of the table here; it is
 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  Caller must ensure to do so, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
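	 *
	 * For instance, with bs_pagesPerRange = 4 and bs_currRangeStart = 0,
	 * seeing a tuple on block 9 first flushes the ranges that start at blocks
	 * 0 and 4, and only then accumulates into the range that starts at block 8.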
	 */
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{

		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;

		col = &state->bs_dtuple->bt_columns[i];
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  state->bs_bdesc->bd_tupdesc->attrs[i]->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}

/*
 * brinbuild() -- build a new BRIN index.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
								   brinbuildCallback, (void *) state);

	/* process the final batch */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}

void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, false);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}

/*
 * brinbulkdelete
 *		Since there are no per-heap-tuple index tuples in BRIN indexes,
 *		there's not a lot we can do here.
 *
 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
 * tuple is deleted), meaning the need to re-run summarization on the affected
 * range.  Would need to add an extra flag in brintuples for that.
 */
IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	return stats;
}

/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
						AccessShareLock);

	brin_vacuum_scan(info->index, info->strategy);

	brinsummarize(info->index, heapRel,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	heap_close(heapRel, AccessShareLock);

	return stats;
}

/*
 * reloptions processor for BRIN indexes
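 *
 * The only option currently recognized is pages_per_range, e.g.:
 *		CREATE INDEX ... USING brin (col) WITH (pages_per_range = 64);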
 */
bytea *
brinoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	BrinOptions *rdopts;
	int			numoptions;
	static const relopt_parse_elt tab[] = {
		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}

/*
 * SQL-callable function to scan through an index and summarize all ranges
 * that are not currently summarized.
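 *
 * Typical usage, assuming an existing BRIN index named brinidx:
 *		SELECT brin_summarize_new_values('brinidx'::regclass);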
 */
Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}

/*
 * Build a BrinDesc used to create or scan a BRIN index
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn,
										  tupdesc->attrs[keyno]->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}

void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}

/*
 * Initialize a BrinBuildState appropriate to create tuples on the given index.
 */
static BrinBuildState *
initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
						   BlockNumber pagesPerRange)
{
	BrinBuildState *state;

	state = palloc(sizeof(BrinBuildState));

	state->bs_irel = idxRel;
	state->bs_numtuples = 0;
	state->bs_currentInsertBuf = InvalidBuffer;
	state->bs_pagesPerRange = pagesPerRange;
	state->bs_currRangeStart = 0;
	state->bs_rmAccess = revmap;
	state->bs_bdesc = brin_build_desc(idxRel);
	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);

	brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

	return state;
}

/*
 * Release resources associated with a BrinBuildState.
 */
static void
terminate_brin_buildstate(BrinBuildState *state)
{
	/* release the last index buffer used */
	if (!BufferIsInvalid(state->bs_currentInsertBuf))
	{
		Page		page;

		page = BufferGetPage(state->bs_currentInsertBuf);
		RecordPageWithFreeSpace(state->bs_irel,
							BufferGetBlockNumber(state->bs_currentInsertBuf),
								PageGetFreeSpace(page));
		ReleaseBuffer(state->bs_currentInsertBuf);
	}

	brin_free_desc(state->bs_bdesc);
	pfree(state->bs_dtuple);
	pfree(state);
}

/*
 * On the given BRIN index, summarize the heap page range that corresponds
 * to the heap block number given.
 *
 * This routine can run in parallel with insertions into the heap.  To avoid
 * missing those values from the summary tuple, we first insert a placeholder
 * index tuple into the index, then execute the heap scan; transactions
 * concurrent with the scan update the placeholder tuple.  After the scan, we
 * union the placeholder tuple with the one computed by this routine.  The
 * update of the index value happens in a loop, so that if somebody updates
 * the placeholder tuple after we read it, we detect the case and try again.
 * This ensures that the concurrently inserted tuples are not lost.
 *
 * A further corner case is this routine being asked to summarize the partial
 * range at the end of the table.  heapNumBlocks is the (possibly outdated)
 * table size; if we notice that the requested range lies beyond that size,
 * we re-compute the table size after inserting the placeholder tuple, to
 * avoid missing pages that were appended recently.
 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;
	BrinTuple  *phtup;
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
	 * by transactions that are still in progress, among other corner cases.
	 */
	state->bs_currRangeStart = heapBlk;
	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
							heapBlk, scanNumBlks,
							brinbuildCallback, (void *) state);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Update the summary tuple and try to update.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		phtup = brin_copy_tuple(phtup, phsz);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}

/*
 * Scan a complete BRIN index, and summarize each page range that's not already
 * summarized.  The index and heap must have been locked by caller in at
 * least ShareUpdateExclusiveLock mode.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.
 */
static void
brinsummarize(Relation index, Relation heapRel, double *numSummarized,
			  double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;
	IndexInfo  *indexInfo = NULL;
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process: always start from the beginning */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	startBlk = 0;

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Go away now if we think the next range is partial.
		 */
		if (startBlk + pagesPerRange > heapNumBlocks)
			break;

		CHECK_FOR_INTERRUPTS();

		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}

/*
 * Given a deformed tuple in the build state, convert it into the on-disk
 * format and insert it into the index, making the revmap point to it.
 */
static void
form_and_insert_tuple(BrinBuildState *state)
{
	BrinTuple  *tup;
	Size		size;

	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
						  state->bs_dtuple, &size);
	brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
				  &state->bs_currentInsertBuf, state->bs_currRangeStart,
				  tup, size);
	state->bs_numtuples++;

	pfree(tup);
}

/*
 * Given two deformed tuples, adjust the first one so that it's consistent
 * with the summary values in both.
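 * (For a minmax opclass, for example, this amounts to widening a's stored
 * min/max interval so that it also covers the interval summarized in b.)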
 */
static void
union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
{
	int			keyno;
	BrinMemTuple *db;
	MemoryContext cxt;
	MemoryContext oldcxt;

	/* Use our own memory context to avoid retail pfree */
	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin union",
								ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	db = brin_deform_tuple(bdesc, b);
	MemoryContextSwitchTo(oldcxt);

	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
	{
		FmgrInfo   *unionFn;
		BrinValues *col_a = &a->bt_columns[keyno];
		BrinValues *col_b = &db->bt_columns[keyno];

		unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
									BRIN_PROCNUM_UNION);
		FunctionCall3Coll(unionFn,
						  bdesc->bd_index->rd_indcollation[keyno],
						  PointerGetDatum(bdesc),
						  PointerGetDatum(col_a),
						  PointerGetDatum(col_b));
	}

	MemoryContextDelete(cxt);
}

/*
 * brin_vacuum_scan
 *		Do a complete scan of the index during VACUUM.
 *
 * This routine scans the complete index looking for uncatalogued index pages,
 * i.e. those that might have been lost due to a crash after index extension
 * and such.
 */
static void
brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
{
	bool		vacuum_fsm = false;
	BlockNumber blkno;

	/*
	 * Scan the index in physical order, and clean up any possible mess in
	 * each page.
	 */
	for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
	{
		Buffer		buf;

		CHECK_FOR_INTERRUPTS();

		buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
								 RBM_NORMAL, strategy);

		vacuum_fsm |= brin_page_cleanup(idxrel, buf);

		ReleaseBuffer(buf);
	}

	/*
	 * If we made any change to the FSM, make sure the new info is visible all
	 * the way to the top.
	 */
	if (vacuum_fsm)
		FreeSpaceMapVacuum(idxrel);
}