/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
#include "postgres.h"

#include "access/brin.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;
	int			bs_numtuples;
	Buffer		bs_currentInsertBuf;
	BlockNumber bs_pagesPerRange;
	BlockNumber bs_currRangeStart;
	BrinRevmap *bs_rmAccess;
	BrinDesc   *bs_bdesc;
	BrinMemTuple *bs_dtuple;
} BrinBuildState;

/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;
	BrinRevmap *bo_rmAccess;
	BrinDesc   *bo_bdesc;
} BrinOpaque;

#define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber

static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
						   BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
			 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);


/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If autosummarization is enabled, check if we need to summarize the previous
 * page range.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do for this tuple.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
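	/*
	 * Illustrative arithmetic (values assumed, not taken from any particular
	 * build): with pagesPerRange = 128, an insertion into heap block 300
	 * gives origHeapBlk = 300 and heapBlk = (300 / 128) * 128 = 256, i.e.
	 * the range covering blocks 256..383.
	 */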

	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
			{
				bool	recorded;

				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
												 RelationGetRelid(idxRel),
												 lastPageRange);
				if (!recorded)
					ereport(LOG,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
									RelationGetRelationName(idxRel),
									lastPageRange)));
			}
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

	return false;
}

/*
 * Initialize state for a BRIN index scan.
 *
 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding lock on index, it's not necessary to recompute it during brinrescan.
 */
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BrinOpaque *opaque;

	scan = RelationGetIndexScan(r, nkeys, norderbys);

	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
											   scan->xs_snapshot);
	opaque->bo_bdesc = brin_build_desc(r);
	scan->opaque = opaque;

	return scan;
}

/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = heap_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	heap_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Set up and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
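	/*
	 * For example (numbers assumed for illustration): with
	 * bo_pagesPerRange = 128 and nblocks = 1000, heapBlk takes the values
	 * 0, 128, 256, ..., 896, one iteration per page range; the final
	 * iteration covers the partial range 896..999.
	 */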
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			gottuple = true;
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL / IS NOT NULL).  Otherwise we shouldn't be
					 * using this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							bdesc->bd_tupdesc->attrs[keyattno - 1]->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}

/*
 * Re-initialize state for a BRIN index scan
 */
void
brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	/*
	 * Other index AMs preprocess the scan keys at this point, or sometime
	 * early during the scan; this lets them optimize by removing redundant
	 * keys, or doing early returns when they are impossible to satisfy; see
	 * _bt_preprocess_keys for an example.  Something like that could be added
	 * here someday, too.
	 */

	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData, scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
}

/*
 * Close down a BRIN index scan
 */
void
brinendscan(IndexScanDesc scan)
{
	BrinOpaque *opaque = (BrinOpaque *) scan->opaque;

	brinRevmapTerminate(opaque->bo_rmAccess);
	brin_free_desc(opaque->bo_bdesc);
	pfree(opaque);
}

/*
 * Per-heap-tuple callback for IndexBuildHeapScan.
 *
 * Note we don't worry about the page range at the end of the table here; it is
 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  Caller must ensure to do so, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{
		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;

		col = &state->bs_dtuple->bt_columns[i];
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  state->bs_bdesc->bd_tupdesc->attrs[i]->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}

/*
 * brinbuild() -- build a new BRIN index.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
								   brinbuildCallback, (void *) state);

	/* process the final batch */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}

void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, false);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}

/*
 * brinbulkdelete
 *		Since there are no per-heap-tuple index tuples in BRIN indexes,
 *		there's not a lot we can do here.
 *
 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
 * tuple is deleted), meaning the need to re-run summarization on the affected
 * range.  Would need to add an extra flag in brintuples for that.
 */
IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	return stats;
}

/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
						AccessShareLock);

	brin_vacuum_scan(info->index, info->strategy);

	brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	heap_close(heapRel, AccessShareLock);

	return stats;
}
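
/*
 * For reference: the routine above is reached through the amvacuumcleanup
 * entry point set up in brinhandler, e.g. by a plain "VACUUM tab;" on the
 * indexed table (table name illustrative).
 */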

/*
 * reloptions processor for BRIN indexes
 */
bytea *
brinoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	BrinOptions *rdopts;
	int			numoptions;
	static const relopt_parse_elt tab[] = {
		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
		{"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}
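
/*
 * The options parsed above correspond to index-creation syntax such as the
 * following (index, table and column names are illustrative):
 *
 *		CREATE INDEX brinidx ON tab USING brin (col)
 *			WITH (pages_per_range = 64, autosummarize = on);
 */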

/*
 * SQL-callable function to scan through an index and summarize all ranges
 * that are not currently summarized.
 */
Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)
{
	Datum		relation = PG_GETARG_DATUM(0);

	return DirectFunctionCall2(brin_summarize_range,
							   relation,
							   Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
}
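
/*
 * Illustrative invocation of the function above (index name assumed):
 *
 *		SELECT brin_summarize_new_values('brinidx'::regclass);
 */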

/*
 * SQL-callable function to summarize the indicated page range, if not already
 * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
 * unsummarized ranges are summarized.
 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}
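
/*
 * Illustrative invocation (index name assumed): summarize only the range
 * containing heap block 0:
 *
 *		SELECT brin_summarize_range('brinidx'::regclass, 0);
 */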

/*
 * SQL-callable interface to mark a range as no longer summarized
 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* the revmap does the hard work */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
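
/*
 * Illustrative invocation (index name assumed): forget the summary for the
 * range containing heap block 0, so that a later summarization run can
 * rebuild it:
 *
 *		SELECT brin_desummarize_range('brinidx'::regclass, 0);
 */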

/*
 * Build a BrinDesc used to create or scan a BRIN index
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn,
										  tupdesc->attrs[keyno]->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}

void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}

/*
 * Fetch index's statistical data into *stats
 */
void
brinGetStats(Relation index, BrinStatsData *stats)
{
	Buffer		metabuffer;
	Page		metapage;
	BrinMetaPageData *metadata;

	metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = (BrinMetaPageData *) PageGetContents(metapage);

	stats->pagesPerRange = metadata->pagesPerRange;
	stats->revmapNumPages = metadata->lastRevmapPage - 1;

	UnlockReleaseBuffer(metabuffer);
}

/*
 * Initialize a BrinBuildState appropriate to create tuples on the given index.
 */
static BrinBuildState *
initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
						   BlockNumber pagesPerRange)
{
	BrinBuildState *state;

	state = palloc(sizeof(BrinBuildState));

	state->bs_irel = idxRel;
	state->bs_numtuples = 0;
	state->bs_currentInsertBuf = InvalidBuffer;
	state->bs_pagesPerRange = pagesPerRange;
	state->bs_currRangeStart = 0;
	state->bs_rmAccess = revmap;
	state->bs_bdesc = brin_build_desc(idxRel);
	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);

	brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

	return state;
}

/*
 * Release resources associated with a BrinBuildState.
 */
static void
terminate_brin_buildstate(BrinBuildState *state)
{
	/* release the last index buffer used */
	if (!BufferIsInvalid(state->bs_currentInsertBuf))
	{
		Page		page;

		page = BufferGetPage(state->bs_currentInsertBuf);
		RecordPageWithFreeSpace(state->bs_irel,
								BufferGetBlockNumber(state->bs_currentInsertBuf),
								PageGetFreeSpace(page));
		ReleaseBuffer(state->bs_currentInsertBuf);
	}

	brin_free_desc(state->bs_bdesc);
	pfree(state->bs_dtuple);
	pfree(state);
}

/*
 * On the given BRIN index, summarize the heap page range that corresponds
 * to the heap block number given.
 *
 * This routine can run in parallel with insertions into the heap.  To avoid
 * missing those values from the summary tuple, we first insert a placeholder
 * index tuple into the index, then execute the heap scan; transactions
 * concurrent with the scan update the placeholder tuple.  After the scan, we
 * union the placeholder tuple with the one computed by this routine.  The
 * update of the index value happens in a loop, so that if somebody updates
 * the placeholder tuple after we read it, we detect the case and try again.
 * This ensures that the concurrently inserted tuples are not lost.
 *
 * A further corner case is this routine being asked to summarize the partial
 * range at the end of the table.  heapNumBlocks is the (possibly outdated)
 * table size; if we notice that the requested range lies beyond that size,
 * we re-compute the table size after inserting the placeholder tuple, to
 * avoid missing pages that were appended recently.
 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;
	BrinTuple  *phtup;
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
	 * by transactions that are still in progress, among other corner cases.
	 */
	state->bs_currRangeStart = heapBlk;
	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
							heapBlk, scanNumBlks,
							brinbuildCallback, (void *) state);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Form a new summary tuple from the scan results and try to replace
		 * the placeholder tuple with it.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}

/*
 * Summarize page ranges that are not already summarized.  If pageRange is
 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
 * page range containing the given heap page number is scanned.
 * If include_partial is true, then the partial range at the end of the table
 * is summarized, otherwise not.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.
 */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;
	IndexInfo  *indexInfo = NULL;
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, go away now if
		 * we think the next range is partial.  Callers pass true when the
		 * operation typically runs once bulk data loading is done
		 * (brin_summarize_new_values), and false when it is typically the
		 * result of an arbitrarily-scheduled maintenance command (vacuuming).
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}

/*
 * Given a deformed tuple in the build state, convert it into the on-disk
 * format and insert it into the index, making the revmap point to it.
 */
static void
form_and_insert_tuple(BrinBuildState *state)
{
	BrinTuple  *tup;
	Size		size;

	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
						  state->bs_dtuple, &size);
	brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
				  &state->bs_currentInsertBuf, state->bs_currRangeStart,
				  tup, size);
	state->bs_numtuples++;

	pfree(tup);
}

/*
 * Given two deformed tuples, adjust the first one so that it's consistent
 * with the summary values in both.
 */
static void
union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
{
	int			keyno;
	BrinMemTuple *db;
	MemoryContext cxt;
	MemoryContext oldcxt;

	/* Use our own memory context to avoid retail pfree */
	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin union",
								ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	db = brin_deform_tuple(bdesc, b, NULL);
	MemoryContextSwitchTo(oldcxt);

	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
	{
		FmgrInfo   *unionFn;
		BrinValues *col_a = &a->bt_columns[keyno];
		BrinValues *col_b = &db->bt_columns[keyno];

		unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
									BRIN_PROCNUM_UNION);
		FunctionCall3Coll(unionFn,
						  bdesc->bd_index->rd_indcollation[keyno],
						  PointerGetDatum(bdesc),
						  PointerGetDatum(col_a),
						  PointerGetDatum(col_b));
	}

	MemoryContextDelete(cxt);
}

/*
 * brin_vacuum_scan
 *		Do a complete scan of the index during VACUUM.
 *
 * This routine scans the complete index looking for uncatalogued index pages,
 * i.e. those that might have been lost due to a crash after index extension
 * and such.
 */
static void
brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
{
	bool		vacuum_fsm = false;
	BlockNumber blkno;

	/*
	 * Scan the index in physical order, and clean up any possible mess in
	 * each page.
	 */
	for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
	{
		Buffer		buf;

		CHECK_FOR_INTERRUPTS();

		buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
								 RBM_NORMAL, strategy);

		vacuum_fsm |= brin_page_cleanup(idxrel, buf);

		ReleaseBuffer(buf);
	}

	/*
	 * If we made any change to the FSM, make sure the new info is visible all
	 * the way to the top.
	 */
	if (vacuum_fsm)
		FreeSpaceMapVacuum(idxrel);
}