/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
#include "postgres.h"

#include "access/brin.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/relation.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;
	int			bs_numtuples;
	Buffer		bs_currentInsertBuf;
	BlockNumber bs_pagesPerRange;
	BlockNumber bs_currRangeStart;
	BrinRevmap *bs_rmAccess;
	BrinDesc   *bs_bdesc;
	BrinMemTuple *bs_dtuple;
} BrinBuildState;

/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;
	BrinRevmap *bo_rmAccess;
	BrinDesc   *bo_bdesc;
} BrinOpaque;

#define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber

static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
												  BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
						  bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
						 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);


/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
	amroutine->amusemaintenanceworkmem = false;
	amroutine->amparallelvacuumoptions =
		VACUUM_OPTION_PARALLEL_CLEANUP;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}
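
/*
 * For reference: this handler is reached through the "amhandler" column of
 * pg_am, so e.g. "SELECT amhandler FROM pg_am WHERE amname = 'brin'" shows
 * the routine above.
 */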

/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If autosummarization is enabled, check if we need to summarize the previous
 * page range.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do for this tuple.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
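
	/*
	 * For example, with the default pages_per_range of 128, an insertion
	 * into heap block 300 gives origHeapBlk = 300 and heapBlk = 256, the
	 * start of the range covering blocks 256..383.
	 */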

	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
			{
				bool		recorded;

				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
												 RelationGetRelid(idxRel),
												 lastPageRange);
				if (!recorded)
					ereport(LOG,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
									RelationGetRelationName(idxRel),
									lastPageRange)));
			}
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

	return false;
}

/*
 * Initialize state for a BRIN index scan.
 *
 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding a lock on the index, it's not necessary to recompute it during
 * brinrescan.
 */
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BrinOpaque *opaque;

	scan = RelationGetIndexScan(r, nkeys, norderbys);

	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
											   scan->xs_snapshot);
	opaque->bo_bdesc = brin_build_desc(r);
	scan->opaque = opaque;

	return scan;
}

/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = table_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	table_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			gottuple = true;
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
					 * this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							TupleDescAttr(bdesc->bd_tupdesc,
										  keyattno - 1)->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.  The multiplier of 10 below is merely a guess at the average
	 * number of live tuples per returned page.
	 */
	return totalpages * 10;
}

/*
 * Re-initialize state for a BRIN index scan
 */
void
brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	/*
	 * Other index AMs preprocess the scan keys at this point, or sometime
	 * early during the scan; this lets them optimize by removing redundant
	 * keys, or doing early returns when they are impossible to satisfy; see
	 * _bt_preprocess_keys for an example.  Something like that could be added
	 * here someday, too.
	 */

	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData, scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
}

/*
 * Close down a BRIN index scan
 */
void
brinendscan(IndexScanDesc scan)
{
	BrinOpaque *opaque = (BrinOpaque *) scan->opaque;

	brinRevmapTerminate(opaque->bo_rmAccess);
	brin_free_desc(opaque->bo_bdesc);
	pfree(opaque);
}

/*
 * Per-heap-tuple callback for table_index_build_scan.
 *
 * Note we don't worry about the page range at the end of the table here; it
 * is present in the build state struct after we're called the last time, but
 * is not inserted into the index.  The caller must insert it, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  ItemPointer tid,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(tid);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
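	/*
	 * For example, with pagesPerRange = 2: if bs_currRangeStart is 0 and a
	 * tuple arrives from block 5, this loop emits summary tuples for ranges
	 * 0-1 and 2-3, then leaves the state positioned at the range starting at
	 * block 4.
	 */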
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{

		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;
		Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);

		col = &state->bs_dtuple->bt_columns[i];
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  attr->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}

/*
 * brinbuild() -- build a new BRIN index.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
									   brinbuildCallback, (void *) state, NULL);

	/* process the final batch */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}

void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, true);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}

/*
 * brinbulkdelete
 *		Since there are no per-heap-tuple index tuples in BRIN indexes,
 *		there's not a lot we can do here.
 *
 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
 * tuple is deleted), meaning the need to re-run summarization on the affected
 * range.  Would need to add an extra flag in brintuples for that.
 */
IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	return stats;
}

/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
						 AccessShareLock);

	brin_vacuum_scan(info->index, info->strategy);

	brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	table_close(heapRel, AccessShareLock);

	return stats;
}

/*
 * reloptions processor for BRIN indexes
 */
bytea *
brinoptions(Datum reloptions, bool validate)
{
	static const relopt_parse_elt tab[] = {
		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
		{"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
	};

	return (bytea *) build_reloptions(reloptions, validate,
									  RELOPT_KIND_BRIN,
									  sizeof(BrinOptions),
									  tab, lengthof(tab));
}
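
/*
 * For reference, these options are given at index creation time, e.g.:
 *	CREATE INDEX brinidx ON tab USING brin (col)
 *		WITH (pages_per_range = 64, autosummarize = on);
 * (the index, table and column names here are only illustrative).
 */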

/*
 * SQL-callable function to scan through an index and summarize all ranges
 * that are not currently summarized.
 */
Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)
{
	Datum		relation = PG_GETARG_DATUM(0);

	return DirectFunctionCall2(brin_summarize_range,
							   relation,
							   Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
}
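
/*
 * Example usage, with an illustrative index name:
 *   SELECT brin_summarize_new_values('brinidx'::regclass);
 */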

/*
 * SQL-callable function to summarize the indicated page range, if not already
 * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
 * unsummarized ranges are summarized.
 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}
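
/*
 * Example usage, with an illustrative index name:
 *   SELECT brin_summarize_range('brinidx'::regclass, 0);
 * summarizes only the page range that contains heap block 0.
 */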

/*
 * SQL-callable interface to mark a range as no longer summarized
 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* the revmap does the hard work */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
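
/*
 * Example usage, with an illustrative index name:
 *   SELECT brin_desummarize_range('brinidx'::regclass, 0);
 */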

/*
 * Build a BrinDesc used to create or scan a BRIN index
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;
		Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}

void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}

/*
 * Fetch index's statistical data into *stats
 */
void
brinGetStats(Relation index, BrinStatsData *stats)
{
	Buffer		metabuffer;
	Page		metapage;
	BrinMetaPageData *metadata;

	metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = (BrinMetaPageData *) PageGetContents(metapage);

	stats->pagesPerRange = metadata->pagesPerRange;
	stats->revmapNumPages = metadata->lastRevmapPage - 1;

	UnlockReleaseBuffer(metabuffer);
}

/*
 * Initialize a BrinBuildState appropriate to create tuples on the given index.
 */
static BrinBuildState *
initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
						   BlockNumber pagesPerRange)
{
	BrinBuildState *state;

	state = palloc(sizeof(BrinBuildState));

	state->bs_irel = idxRel;
	state->bs_numtuples = 0;
	state->bs_currentInsertBuf = InvalidBuffer;
	state->bs_pagesPerRange = pagesPerRange;
	state->bs_currRangeStart = 0;
	state->bs_rmAccess = revmap;
	state->bs_bdesc = brin_build_desc(idxRel);
	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);

	brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

	return state;
}

/*
 * Release resources associated with a BrinBuildState.
 */
static void
terminate_brin_buildstate(BrinBuildState *state)
{
	/*
	 * Release the last index buffer used.  We might as well ensure that
	 * whatever free space remains in that page is available in FSM, too.
	 */
	if (!BufferIsInvalid(state->bs_currentInsertBuf))
	{
		Page		page;
		Size		freespace;
		BlockNumber blk;

		page = BufferGetPage(state->bs_currentInsertBuf);
		freespace = PageGetFreeSpace(page);
		blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
		ReleaseBuffer(state->bs_currentInsertBuf);
		RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
		FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
	}

	brin_free_desc(state->bs_bdesc);
	pfree(state->bs_dtuple);
	pfree(state);
}

/*
 * On the given BRIN index, summarize the heap page range that corresponds
 * to the heap block number given.
 *
 * This routine can run in parallel with insertions into the heap.  To avoid
 * missing those values from the summary tuple, we first insert a placeholder
 * index tuple into the index, then execute the heap scan; transactions
 * concurrent with the scan update the placeholder tuple.  After the scan, we
 * union the placeholder tuple with the one computed by this routine.  The
 * update of the index value happens in a loop, so that if somebody updates
 * the placeholder tuple after we read it, we detect the case and try again.
 * This ensures that the concurrently inserted tuples are not lost.
 *
 * A further corner case is this routine being asked to summarize the partial
 * range at the end of the table.  heapNumBlocks is the (possibly outdated)
 * table size; if we notice that the requested range lies beyond that size,
 * we re-compute the table size after inserting the placeholder tuple, to
 * avoid missing pages that were appended recently.
 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;
	BrinTuple  *phtup;
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * table_index_build_range_scan here: otherwise, we would miss tuples
	 * inserted by transactions that are still in progress, among other corner
	 * cases.
	 */
	state->bs_currRangeStart = heapBlk;
	table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
								 heapBlk, scanNumBlks,
								 brinbuildCallback, (void *) state, NULL);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Update the summary tuple and try to update.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}

/*
 * Summarize page ranges that are not already summarized.  If pageRange is
 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
 * page range containing the given heap page number is scanned.
 * If include_partial is true, then the partial range at the end of the table
 * is summarized, otherwise not.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.
 */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;
	IndexInfo  *indexInfo = NULL;
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, go away now if
		 * we think the next range is partial.  Callers pass true when the
		 * summarization is typically run once, after bulk data loading
		 * (brin_summarize_new_values), and false when it is the result of an
		 * arbitrarily-scheduled maintenance command such as vacuuming.
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}

/*
 * Given a deformed tuple in the build state, convert it into the on-disk
 * format and insert it into the index, making the revmap point to it.
 */
static void
form_and_insert_tuple(BrinBuildState *state)
{
	BrinTuple  *tup;
	Size		size;

	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
						  state->bs_dtuple, &size);
	brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
				  &state->bs_currentInsertBuf, state->bs_currRangeStart,
				  tup, size);
	state->bs_numtuples++;

	pfree(tup);
}

/*
 * Given two deformed tuples, adjust the first one so that it's consistent
 * with the summary values in both.
 */
static void
union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
{
	int			keyno;
	BrinMemTuple *db;
	MemoryContext cxt;
	MemoryContext oldcxt;

	/* Use our own memory context to avoid retail pfree */
	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin union",
								ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	db = brin_deform_tuple(bdesc, b, NULL);
	MemoryContextSwitchTo(oldcxt);

	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
	{
		FmgrInfo   *unionFn;
		BrinValues *col_a = &a->bt_columns[keyno];
		BrinValues *col_b = &db->bt_columns[keyno];

		unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
									BRIN_PROCNUM_UNION);
		FunctionCall3Coll(unionFn,
						  bdesc->bd_index->rd_indcollation[keyno],
						  PointerGetDatum(bdesc),
						  PointerGetDatum(col_a),
						  PointerGetDatum(col_b));
	}

	MemoryContextDelete(cxt);
}

/*
 * brin_vacuum_scan
 *		Do a complete scan of the index during VACUUM.
 *
 * This routine scans the complete index looking for uncatalogued index pages,
 * i.e. those that might have been lost due to a crash after index extension
 * and such.
 */
static void
brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
{
	BlockNumber nblocks;
	BlockNumber blkno;

	/*
	 * Scan the index in physical order, and clean up any possible mess in
	 * each page.
	 */
	nblocks = RelationGetNumberOfBlocks(idxrel);
	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Buffer		buf;

		CHECK_FOR_INTERRUPTS();

		buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
								 RBM_NORMAL, strategy);

		brin_page_cleanup(idxrel, buf);

		ReleaseBuffer(buf);
	}

	/*
	 * Update all upper pages in the index's FSM, as well.  This ensures not
	 * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
	 * but also that any pre-existing damage or out-of-dateness is repaired.
	 */
	FreeSpaceMapVacuum(idxrel);
}