/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
#include "postgres.h"

#include "access/brin.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/relation.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;
	int			bs_numtuples;
	Buffer		bs_currentInsertBuf;
	BlockNumber bs_pagesPerRange;
	BlockNumber bs_currRangeStart;
	BrinRevmap *bs_rmAccess;
	BrinDesc   *bs_bdesc;
	BrinMemTuple *bs_dtuple;
} BrinBuildState;

/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;
	BrinRevmap *bo_rmAccess;
	BrinDesc   *bo_bdesc;
} BrinOpaque;

#define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber

static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
												  BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
						  bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
						 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);


/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}
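
/*
 * Illustrative note (not part of the original source): with the handler
 * above registered in pg_am, a BRIN index is created through the usual SQL
 * syntax.  Table and column names here are hypothetical:
 *
 *		CREATE INDEX events_created_brin ON events USING brin (created_at);
 */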

/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If autosummarization is enabled, check if we need to summarize the previous
 * page range.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do for this tuple.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
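	/*
	 * Illustrative arithmetic (hypothetical numbers, not from the original
	 * source): with pagesPerRange = 128, an insertion into heap block 300
	 * yields heapBlk = (300 / 128) * 128 = 256, i.e. the start of the range
	 * covering blocks 256..383.
	 */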

	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
			{
				bool		recorded;

				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
												 RelationGetRelid(idxRel),
												 lastPageRange);
				if (!recorded)
					ereport(LOG,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
									RelationGetRelationName(idxRel),
									lastPageRange)));
			}
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

	return false;
}

/*
 * Initialize state for a BRIN index scan.
 *
 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding lock on index, it's not necessary to recompute it during brinrescan.
 */
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BrinOpaque *opaque;

	scan = RelationGetIndexScan(r, nkeys, norderbys);

	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
											   scan->xs_snapshot);
	opaque->bo_bdesc = brin_build_desc(r);
	scan->opaque = opaque;

	return scan;
}

/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = table_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	table_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			gottuple = true;
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL / IS NOT NULL).  Otherwise we shouldn't be
					 * using this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							TupleDescAttr(bdesc->bd_tupdesc,
										  keyattno - 1)->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}

/*
 * Re-initialize state for a BRIN index scan
 */
void
brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	/*
	 * Other index AMs preprocess the scan keys at this point, or sometime
	 * early during the scan; this lets them optimize by removing redundant
	 * keys, or doing early returns when they are impossible to satisfy; see
	 * _bt_preprocess_keys for an example.  Something like that could be added
	 * here someday, too.
	 */

	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData, scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
}

/*
 * Close down a BRIN index scan
 */
void
brinendscan(IndexScanDesc scan)
{
	BrinOpaque *opaque = (BrinOpaque *) scan->opaque;

	brinRevmapTerminate(opaque->bo_rmAccess);
	brin_free_desc(opaque->bo_bdesc);
	pfree(opaque);
}

/*
 * Per-heap-tuple callback for table_index_build_scan.
 *
 * Note we don't worry about the page range at the end of the table here; it is
 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  Caller must ensure to do so, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{
		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;
		Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);

		col = &state->bs_dtuple->bt_columns[i];
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  attr->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}

/*
 * brinbuild() -- build a new BRIN index.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
									   brinbuildCallback, (void *) state, NULL);

	/* process the final batch */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}

void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, true);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}

/*
 * brinbulkdelete
 *		Since there are no per-heap-tuple index tuples in BRIN indexes,
 *		there's not a lot we can do here.
 *
 * XXX we could mark index tuples as "dirty" (when a minimum or maximum heap
 * tuple is deleted), to indicate that summarization needs to be re-run on
 * the affected range.  That would require an extra flag in brintuples.
 */
IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	return stats;
}

/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
						 AccessShareLock);

	brin_vacuum_scan(info->index, info->strategy);

	brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	table_close(heapRel, AccessShareLock);

	return stats;
}

/*
 * reloptions processor for BRIN indexes
 */
bytea *
brinoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	BrinOptions *rdopts;
	int			numoptions;
	static const relopt_parse_elt tab[] = {
		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
		{"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}
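
/*
 * Illustrative usage (hypothetical relation and column names): both
 * reloptions parsed above can be supplied at index creation time, e.g.
 *
 *		CREATE INDEX brin_ts_idx ON events USING brin (created_at)
 *			WITH (pages_per_range = 64, autosummarize = on);
 */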

/*
 * SQL-callable function to scan through an index and summarize all ranges
 * that are not currently summarized.
 */
Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)
{
	Datum		relation = PG_GETARG_DATUM(0);

	return DirectFunctionCall2(brin_summarize_range,
							   relation,
							   Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
}
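
/*
 * Illustrative usage (hypothetical index name):
 *
 *		SELECT brin_summarize_new_values('brin_ts_idx');
 *
 * The return value is the number of page ranges that were newly summarized
 * by the call.
 */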

/*
 * SQL-callable function to summarize the indicated page range, if not already
 * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
 * unsummarized ranges are summarized.
 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}
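
/*
 * Illustrative usage (hypothetical index name): summarize only the range
 * containing heap block 0.  Passing the BRIN_ALL_BLOCKRANGES value
 * (4294967295) instead summarizes all unsummarized ranges, as above.
 *
 *		SELECT brin_summarize_range('brin_ts_idx', 0);
 */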

/*
 * SQL-callable interface to mark a range as no longer summarized
 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* the revmap does the hard work */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
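
/*
 * Illustrative usage (hypothetical index name): forget the summary for the
 * range containing heap block 0, so that a later summarization pass can
 * rebuild it from the current heap contents.
 *
 *		SELECT brin_desummarize_range('brin_ts_idx', 0);
 */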

/*
 * Build a BrinDesc used to create or scan a BRIN index
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;
		Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}

void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}

/*
 * Fetch index's statistical data into *stats
 */
void
brinGetStats(Relation index, BrinStatsData *stats)
{
	Buffer		metabuffer;
	Page		metapage;
	BrinMetaPageData *metadata;

	metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = (BrinMetaPageData *) PageGetContents(metapage);

	stats->pagesPerRange = metadata->pagesPerRange;
	stats->revmapNumPages = metadata->lastRevmapPage - 1;

	UnlockReleaseBuffer(metabuffer);
}

/*
 * Initialize a BrinBuildState appropriate to create tuples on the given index.
 */
static BrinBuildState *
initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
						   BlockNumber pagesPerRange)
{
	BrinBuildState *state;

	state = palloc(sizeof(BrinBuildState));

	state->bs_irel = idxRel;
	state->bs_numtuples = 0;
	state->bs_currentInsertBuf = InvalidBuffer;
	state->bs_pagesPerRange = pagesPerRange;
	state->bs_currRangeStart = 0;
	state->bs_rmAccess = revmap;
	state->bs_bdesc = brin_build_desc(idxRel);
	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);

	brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

	return state;
}

/*
 * Release resources associated with a BrinBuildState.
 */
static void
terminate_brin_buildstate(BrinBuildState *state)
{
	/*
	 * Release the last index buffer used.  We might as well ensure that
	 * whatever free space remains in that page is available in FSM, too.
	 */
	if (!BufferIsInvalid(state->bs_currentInsertBuf))
	{
		Page		page;
		Size		freespace;
		BlockNumber blk;

		page = BufferGetPage(state->bs_currentInsertBuf);
		freespace = PageGetFreeSpace(page);
		blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
		ReleaseBuffer(state->bs_currentInsertBuf);
		RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
		FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
	}

	brin_free_desc(state->bs_bdesc);
	pfree(state->bs_dtuple);
	pfree(state);
}

/*
 * On the given BRIN index, summarize the heap page range that corresponds
 * to the heap block number given.
 *
 * This routine can run in parallel with insertions into the heap.  To avoid
 * missing those values from the summary tuple, we first insert a placeholder
 * index tuple into the index, then execute the heap scan; transactions
 * concurrent with the scan update the placeholder tuple.  After the scan, we
 * union the placeholder tuple with the one computed by this routine.  The
 * update of the index value happens in a loop, so that if somebody updates
 * the placeholder tuple after we read it, we detect the case and try again.
 * This ensures that the concurrently inserted tuples are not lost.
 *
 * A further corner case is this routine being asked to summarize the partial
 * range at the end of the table.  heapNumBlocks is the (possibly outdated)
 * table size; if we notice that the requested range lies beyond that size,
 * we re-compute the table size after inserting the placeholder tuple, to
 * avoid missing pages that were appended recently.
 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;
	BrinTuple  *phtup;
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * table_index_build_range_scan here: otherwise, we would miss tuples
	 * inserted by transactions that are still in progress, among other corner
	 * cases.
	 */
	state->bs_currRangeStart = heapBlk;
	table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
								 heapBlk, scanNumBlks,
								 brinbuildCallback, (void *) state, NULL);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Update the summary tuple and try to update.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}

/*
 * Summarize page ranges that are not already summarized.  If pageRange is
 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
 * page range containing the given heap page number is scanned.
 * If include_partial is true, then the partial range at the end of the table
 * is summarized, otherwise not.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.
 */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;
	IndexInfo  *indexInfo = NULL;
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, go away now if
		 * we think the next range is partial.  Callers pass true when the
		 * call typically happens once bulk data loading is done
		 * (brin_summarize_new_values), and false when it is the result of an
		 * arbitrarily-scheduled maintenance command such as vacuuming.
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}

/*
 * Given a deformed tuple in the build state, convert it into the on-disk
 * format and insert it into the index, making the revmap point to it.
 */
static void
form_and_insert_tuple(BrinBuildState *state)
{
	BrinTuple  *tup;
	Size		size;

	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
						  state->bs_dtuple, &size);
	brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
				  &state->bs_currentInsertBuf, state->bs_currRangeStart,
				  tup, size);
	state->bs_numtuples++;

	pfree(tup);
}

/*
 * Given two deformed tuples, adjust the first one so that it's consistent
 * with the summary values in both.
 */
static void
union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
{
	int			keyno;
	BrinMemTuple *db;
	MemoryContext cxt;
	MemoryContext oldcxt;

	/* Use our own memory context to avoid retail pfree */
	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin union",
								ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	db = brin_deform_tuple(bdesc, b, NULL);
	MemoryContextSwitchTo(oldcxt);

	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
	{
		FmgrInfo   *unionFn;
		BrinValues *col_a = &a->bt_columns[keyno];
		BrinValues *col_b = &db->bt_columns[keyno];

		unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
									BRIN_PROCNUM_UNION);
		FunctionCall3Coll(unionFn,
						  bdesc->bd_index->rd_indcollation[keyno],
						  PointerGetDatum(bdesc),
						  PointerGetDatum(col_a),
						  PointerGetDatum(col_b));
	}

	MemoryContextDelete(cxt);
}

/*
 * brin_vacuum_scan
 *		Do a complete scan of the index during VACUUM.
 *
 * This routine scans the complete index looking for uncatalogued index pages,
 * i.e. those that might have been lost due to a crash after index extension
 * and such.
 */
static void
brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
{
	BlockNumber nblocks;
	BlockNumber blkno;

	/*
	 * Scan the index in physical order, and clean up any possible mess in
	 * each page.
	 */
	nblocks = RelationGetNumberOfBlocks(idxrel);
	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Buffer		buf;

		CHECK_FOR_INTERRUPTS();

		buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
								 RBM_NORMAL, strategy);

		brin_page_cleanup(idxrel, buf);

		ReleaseBuffer(buf);
	}

	/*
	 * Update all upper pages in the index's FSM, as well.  This ensures not
	 * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
	 * but also that any pre-existing damage or out-of-dateness is repaired.
	 */
	FreeSpaceMapVacuum(idxrel);
}