1 /*
2  * brin.c
3  *		Implementation of BRIN indexes for Postgres
4  *
5  * See src/backend/access/brin/README for details.
6  *
7  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/brin/brin.c
12  *
13  * TODO
14  *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15  */
16 #include "postgres.h"
17 
18 #include "access/brin.h"
19 #include "access/brin_page.h"
20 #include "access/brin_pageops.h"
21 #include "access/brin_xlog.h"
22 #include "access/reloptions.h"
23 #include "access/relscan.h"
24 #include "access/xloginsert.h"
25 #include "catalog/index.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "pgstat.h"
29 #include "postmaster/autovacuum.h"
30 #include "storage/bufmgr.h"
31 #include "storage/freespace.h"
32 #include "utils/builtins.h"
33 #include "utils/index_selfuncs.h"
34 #include "utils/memutils.h"
35 #include "utils/rel.h"
36 
37 
38 /*
39  * We use a BrinBuildState during initial construction of a BRIN index.
40  * The running state is kept in a BrinMemTuple.
41  */
42 typedef struct BrinBuildState
43 {
44 	Relation	bs_irel;
45 	int			bs_numtuples;
46 	Buffer		bs_currentInsertBuf;
47 	BlockNumber bs_pagesPerRange;
48 	BlockNumber bs_currRangeStart;
49 	BrinRevmap *bs_rmAccess;
50 	BrinDesc   *bs_bdesc;
51 	BrinMemTuple *bs_dtuple;
52 } BrinBuildState;
53 
54 /*
55  * Struct used as "opaque" during index scans
56  */
57 typedef struct BrinOpaque
58 {
59 	BlockNumber bo_pagesPerRange;
60 	BrinRevmap *bo_rmAccess;
61 	BrinDesc   *bo_bdesc;
62 } BrinOpaque;
63 
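/*
 * Sentinel page-range value: passing this to brinsummarize() (and through the
 * SQL-callable brin_summarize_range()) requests processing of all block
 * ranges in the table rather than a single one.
 */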
64 #define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber
65 
66 static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
67 						   BrinRevmap *revmap, BlockNumber pagesPerRange);
68 static void terminate_brin_buildstate(BrinBuildState *state);
69 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
70 			  bool include_partial, double *numSummarized, double *numExisting);
71 static void form_and_insert_tuple(BrinBuildState *state);
72 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
73 			 BrinTuple *b);
74 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
75 
76 
77 /*
78  * BRIN handler function: return IndexAmRoutine with access method parameters
79  * and callbacks.
80  */
81 Datum
82 brinhandler(PG_FUNCTION_ARGS)
83 {
84 	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
85 
86 	amroutine->amstrategies = 0;
87 	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
88 	amroutine->amcanorder = false;
89 	amroutine->amcanorderbyop = false;
90 	amroutine->amcanbackward = false;
91 	amroutine->amcanunique = false;
92 	amroutine->amcanmulticol = true;
93 	amroutine->amoptionalkey = true;
94 	amroutine->amsearcharray = false;
95 	amroutine->amsearchnulls = true;
96 	amroutine->amstorage = true;
97 	amroutine->amclusterable = false;
98 	amroutine->ampredlocks = false;
99 	amroutine->amcanparallel = false;
100 	amroutine->amcaninclude = false;
101 	amroutine->amkeytype = InvalidOid;
102 
103 	amroutine->ambuild = brinbuild;
104 	amroutine->ambuildempty = brinbuildempty;
105 	amroutine->aminsert = brininsert;
106 	amroutine->ambulkdelete = brinbulkdelete;
107 	amroutine->amvacuumcleanup = brinvacuumcleanup;
108 	amroutine->amcanreturn = NULL;
109 	amroutine->amcostestimate = brincostestimate;
110 	amroutine->amoptions = brinoptions;
111 	amroutine->amproperty = NULL;
112 	amroutine->amvalidate = brinvalidate;
113 	amroutine->ambeginscan = brinbeginscan;
114 	amroutine->amrescan = brinrescan;
115 	amroutine->amgettuple = NULL;
116 	amroutine->amgetbitmap = bringetbitmap;
117 	amroutine->amendscan = brinendscan;
118 	amroutine->ammarkpos = NULL;
119 	amroutine->amrestrpos = NULL;
120 	amroutine->amestimateparallelscan = NULL;
121 	amroutine->aminitparallelscan = NULL;
122 	amroutine->amparallelrescan = NULL;
123 
124 	PG_RETURN_POINTER(amroutine);
125 }
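/*
 * (For reference, this is the function listed as pg_am.amhandler for the
 * "brin" access method; the callbacks above are obtained by calling it
 * through that catalog entry.)
 */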
126 
127 /*
128  * A tuple in the heap is being inserted.  To keep a BRIN index up to date,
129  * we need to obtain the relevant index tuple and compare its stored values
130  * with those of the new tuple.  If the tuple values are not consistent with
131  * the summary tuple, we need to update the index tuple.
132  *
133  * If autosummarization is enabled, check if we need to summarize the previous
134  * page range.
135  *
136  * If the range is not currently summarized (i.e. the revmap returns NULL for
137  * it), there's nothing to do for this tuple.
138  */
139 bool
140 brininsert(Relation idxRel, Datum *values, bool *nulls,
141 		   ItemPointer heaptid, Relation heapRel,
142 		   IndexUniqueCheck checkUnique,
143 		   IndexInfo *indexInfo)
144 {
145 	BlockNumber pagesPerRange;
146 	BlockNumber origHeapBlk;
147 	BlockNumber heapBlk;
148 	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
149 	BrinRevmap *revmap;
150 	Buffer		buf = InvalidBuffer;
151 	MemoryContext tupcxt = NULL;
152 	MemoryContext oldcxt = CurrentMemoryContext;
153 	bool		autosummarize = BrinGetAutoSummarize(idxRel);
154 
155 	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
156 
157 	/*
158 	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
159 	 * is the first block in the corresponding page range.
160 	 */
161 	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
162 	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
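	/*
	 * For example, with the default pages_per_range of 128, an insertion into
	 * heap block 300 gives heapBlk = (300 / 128) * 128 = 256, i.e. the range
	 * covering blocks 256..383.
	 */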
163 
164 	for (;;)
165 	{
166 		bool		need_insert = false;
167 		OffsetNumber off;
168 		BrinTuple  *brtup;
169 		BrinMemTuple *dtup;
170 		int			keyno;
171 
172 		CHECK_FOR_INTERRUPTS();
173 
174 		/*
175 		 * If auto-summarization is enabled and we just inserted the first
176 		 * tuple into the first block of a new non-first page range, request a
177 		 * summarization run of the previous range.
178 		 */
179 		if (autosummarize &&
180 			heapBlk > 0 &&
181 			heapBlk == origHeapBlk &&
182 			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
183 		{
184 			BlockNumber lastPageRange = heapBlk - 1;
185 			BrinTuple  *lastPageTuple;
186 
187 			lastPageTuple =
188 				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
189 										 NULL, BUFFER_LOCK_SHARE, NULL);
190 			if (!lastPageTuple)
191 			{
192 				bool		recorded;
193 
194 				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
195 												 RelationGetRelid(idxRel),
196 												 lastPageRange);
197 				if (!recorded)
198 					ereport(LOG,
199 							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
200 							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
201 									RelationGetRelationName(idxRel),
202 									lastPageRange)));
203 			}
204 			else
205 				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
206 		}
207 
208 		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
209 										 NULL, BUFFER_LOCK_SHARE, NULL);
210 
211 		/* if range is unsummarized, there's nothing to do */
212 		if (!brtup)
213 			break;
214 
215 		/* First time through in this statement? */
216 		if (bdesc == NULL)
217 		{
218 			MemoryContextSwitchTo(indexInfo->ii_Context);
219 			bdesc = brin_build_desc(idxRel);
220 			indexInfo->ii_AmCache = (void *) bdesc;
221 			MemoryContextSwitchTo(oldcxt);
222 		}
223 		/* First time through in this brininsert call? */
224 		if (tupcxt == NULL)
225 		{
226 			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
227 										   "brininsert cxt",
228 										   ALLOCSET_DEFAULT_SIZES);
229 			MemoryContextSwitchTo(tupcxt);
230 		}
231 
232 		dtup = brin_deform_tuple(bdesc, brtup, NULL);
233 
234 		/*
235 		 * Compare the key values of the new tuple to the stored index values;
236 		 * our deformed tuple will get updated if the new tuple doesn't fit
237 		 * the original range (note this means we can't break out of the loop
238 		 * early). Make a note of whether this happens, so that we know to
239 		 * insert the modified tuple later.
240 		 */
241 		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
242 		{
243 			Datum		result;
244 			BrinValues *bval;
245 			FmgrInfo   *addValue;
246 
247 			bval = &dtup->bt_columns[keyno];
248 			addValue = index_getprocinfo(idxRel, keyno + 1,
249 										 BRIN_PROCNUM_ADDVALUE);
250 			result = FunctionCall4Coll(addValue,
251 									   idxRel->rd_indcollation[keyno],
252 									   PointerGetDatum(bdesc),
253 									   PointerGetDatum(bval),
254 									   values[keyno],
255 									   nulls[keyno]);
256 			/* if that returned true, we need to insert the updated tuple */
257 			need_insert |= DatumGetBool(result);
258 		}
259 
260 		if (!need_insert)
261 		{
262 			/*
263 			 * The tuple is consistent with the new values, so there's nothing
264 			 * to do.
265 			 */
266 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
267 		}
268 		else
269 		{
270 			Page		page = BufferGetPage(buf);
271 			ItemId		lp = PageGetItemId(page, off);
272 			Size		origsz;
273 			BrinTuple  *origtup;
274 			Size		newsz;
275 			BrinTuple  *newtup;
276 			bool		samepage;
277 
278 			/*
279 			 * Make a copy of the old tuple, so that we can compare it after
280 			 * re-acquiring the lock.
281 			 */
282 			origsz = ItemIdGetLength(lp);
283 			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
284 
285 			/*
286 			 * Before releasing the lock, check if we can attempt a same-page
287 			 * update.  Another process could insert a tuple concurrently in
288 			 * the same page though, so downstream we must be prepared to cope
289 			 * if this turns out to not be possible after all.
290 			 */
291 			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
292 			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
293 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
294 
295 			/*
296 			 * Try to update the tuple.  If this doesn't work for whatever
297 			 * reason, we need to restart from the top; the revmap might be
298 			 * pointing at a different tuple for this block now, so we need to
299 			 * recompute to ensure both our new heap tuple and the other
300 			 * inserter's are covered by the combined tuple.  It might be that
301 			 * we don't need to update at all.
302 			 */
303 			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
304 							   buf, off, origtup, origsz, newtup, newsz,
305 							   samepage))
306 			{
307 				/* no luck; start over */
308 				MemoryContextResetAndDeleteChildren(tupcxt);
309 				continue;
310 			}
311 		}
312 
313 		/* success! */
314 		break;
315 	}
316 
317 	brinRevmapTerminate(revmap);
318 	if (BufferIsValid(buf))
319 		ReleaseBuffer(buf);
320 	MemoryContextSwitchTo(oldcxt);
321 	if (tupcxt != NULL)
322 		MemoryContextDelete(tupcxt);
323 
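	/* BRIN never checks uniqueness, so there is never a conflict to report */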
324 	return false;
325 }
326 
327 /*
328  * Initialize state for a BRIN index scan.
329  *
330  * We read the metapage here to determine the pages-per-range number that this
331  * index was built with.  Note that since this cannot be changed while we're
332  * holding lock on index, it's not necessary to recompute it during brinrescan.
333  */
334 IndexScanDesc
335 brinbeginscan(Relation r, int nkeys, int norderbys)
336 {
337 	IndexScanDesc scan;
338 	BrinOpaque *opaque;
339 
340 	scan = RelationGetIndexScan(r, nkeys, norderbys);
341 
342 	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
343 	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
344 											   scan->xs_snapshot);
345 	opaque->bo_bdesc = brin_build_desc(r);
346 	scan->opaque = opaque;
347 
348 	return scan;
349 }
350 
351 /*
352  * Execute the index scan.
353  *
354  * This works by reading index TIDs from the revmap, and obtaining the index
355  * tuples pointed to by them; the summary values in the index tuples are
356  * compared to the scan keys.  We return into the TID bitmap all the pages in
357  * ranges corresponding to index tuples that match the scan keys.
358  *
359  * If a TID from the revmap is read as InvalidTID, we know that range is
360  * unsummarized.  Pages in those ranges need to be returned regardless of scan
361  * keys.
362  */
363 int64
364 bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
365 {
366 	Relation	idxRel = scan->indexRelation;
367 	Buffer		buf = InvalidBuffer;
368 	BrinDesc   *bdesc;
369 	Oid			heapOid;
370 	Relation	heapRel;
371 	BrinOpaque *opaque;
372 	BlockNumber nblocks;
373 	BlockNumber heapBlk;
374 	int			totalpages = 0;
375 	FmgrInfo   *consistentFn;
376 	MemoryContext oldcxt;
377 	MemoryContext perRangeCxt;
378 	BrinMemTuple *dtup;
379 	BrinTuple  *btup = NULL;
380 	Size		btupsz = 0;
381 
382 	opaque = (BrinOpaque *) scan->opaque;
383 	bdesc = opaque->bo_bdesc;
384 	pgstat_count_index_scan(idxRel);
385 
386 	/*
387 	 * We need to know the size of the table so that we know how long to
388 	 * iterate on the revmap.
389 	 */
390 	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
391 	heapRel = heap_open(heapOid, AccessShareLock);
392 	nblocks = RelationGetNumberOfBlocks(heapRel);
393 	heap_close(heapRel, AccessShareLock);
394 
395 	/*
396 	 * Make room for the consistent support procedures of indexed columns.  We
397 	 * don't look them up here; we do that lazily the first time we see a scan
398 	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
399 	 */
400 	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
401 
402 	/* allocate an initial in-memory tuple, out of the per-range memcxt */
403 	dtup = brin_new_memtuple(bdesc);
404 
405 	/*
406 	 * Setup and use a per-range memory context, which is reset every time we
407 	 * loop below.  This avoids having to free the tuples within the loop.
408 	 */
409 	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
410 										"bringetbitmap cxt",
411 										ALLOCSET_DEFAULT_SIZES);
412 	oldcxt = MemoryContextSwitchTo(perRangeCxt);
413 
414 	/*
415 	 * Now scan the revmap.  We start by querying for heap page 0,
416 	 * incrementing by the number of pages per range; this gives us a full
417 	 * view of the table.
418 	 */
419 	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
420 	{
421 		bool		addrange;
422 		bool		gottuple = false;
423 		BrinTuple  *tup;
424 		OffsetNumber off;
425 		Size		size;
426 
427 		CHECK_FOR_INTERRUPTS();
428 
429 		MemoryContextResetAndDeleteChildren(perRangeCxt);
430 
431 		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
432 									   &off, &size, BUFFER_LOCK_SHARE,
433 									   scan->xs_snapshot);
434 		if (tup)
435 		{
436 			gottuple = true;
437 			btup = brin_copy_tuple(tup, size, btup, &btupsz);
438 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
439 		}
440 
441 		/*
442 		 * For page ranges with no indexed tuple, we must return the whole
443 		 * range; otherwise, compare it to the scan keys.
444 		 */
445 		if (!gottuple)
446 		{
447 			addrange = true;
448 		}
449 		else
450 		{
451 			dtup = brin_deform_tuple(bdesc, btup, dtup);
452 			if (dtup->bt_placeholder)
453 			{
454 				/*
455 				 * Placeholder tuples are always returned, regardless of the
456 				 * values stored in them.
457 				 */
458 				addrange = true;
459 			}
460 			else
461 			{
462 				int			keyno;
463 
464 				/*
465 				 * Compare scan keys with summary values stored for the range.
466 				 * If scan keys are matched, the page range must be added to
467 				 * the bitmap.  We initially assume the range needs to be
468 				 * added; in particular this serves the case where there are
469 				 * no keys.
470 				 */
471 				addrange = true;
472 				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
473 				{
474 					ScanKey		key = &scan->keyData[keyno];
475 					AttrNumber	keyattno = key->sk_attno;
476 					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
477 					Datum		add;
478 
479 					/*
480 					 * The collation of the scan key must match the collation
481 					 * used in the index column (but only if the search is not
482 					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
483 					 * this index ...
484 					 */
485 					Assert((key->sk_flags & SK_ISNULL) ||
486 						   (key->sk_collation ==
487 							TupleDescAttr(bdesc->bd_tupdesc,
488 										  keyattno - 1)->attcollation));
489 
490 					/* First time this column? look up consistent function */
491 					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
492 					{
493 						FmgrInfo   *tmp;
494 
495 						tmp = index_getprocinfo(idxRel, keyattno,
496 												BRIN_PROCNUM_CONSISTENT);
497 						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
498 									   CurrentMemoryContext);
499 					}
500 
501 					/*
502 					 * Check whether the scan key is consistent with the page
503 					 * range values; if so, have the pages in the range added
504 					 * to the output bitmap.
505 					 *
506 					 * When there are multiple scan keys, failure to meet the
507 					 * criteria for a single one of them is enough to discard
508 					 * the range as a whole, so break out of the loop as soon
509 					 * as a false return value is obtained.
510 					 */
511 					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
512 											key->sk_collation,
513 											PointerGetDatum(bdesc),
514 											PointerGetDatum(bval),
515 											PointerGetDatum(key));
516 					addrange = DatumGetBool(add);
517 					if (!addrange)
518 						break;
519 				}
520 			}
521 		}
522 
523 		/* add the pages in the range to the output bitmap, if needed */
524 		if (addrange)
525 		{
526 			BlockNumber pageno;
527 
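			/* the Min() clamps the last, possibly partial, range to the end of the table */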
528 			for (pageno = heapBlk;
529 				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
530 				 pageno++)
531 			{
532 				MemoryContextSwitchTo(oldcxt);
533 				tbm_add_page(tbm, pageno);
534 				totalpages++;
535 				MemoryContextSwitchTo(perRangeCxt);
536 			}
537 		}
538 	}
539 
540 	MemoryContextSwitchTo(oldcxt);
541 	MemoryContextDelete(perRangeCxt);
542 
543 	if (buf != InvalidBuffer)
544 		ReleaseBuffer(buf);
545 
546 	/*
547 	 * XXX We have an approximation of the number of *pages* that our scan
548 	 * returns, but we don't have a precise idea of the number of heap tuples
549 	 * involved.
550 	 */
551 	return totalpages * 10;
552 }
553 
554 /*
555  * Re-initialize state for a BRIN index scan
556  */
557 void
558 brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
559 		   ScanKey orderbys, int norderbys)
560 {
561 	/*
562 	 * Other index AMs preprocess the scan keys at this point, or sometime
563 	 * early during the scan; this lets them optimize by removing redundant
564 	 * keys, or doing early returns when they are impossible to satisfy; see
565 	 * _bt_preprocess_keys for an example.  Something like that could be added
566 	 * here someday, too.
567 	 */
568 
569 	if (scankey && scan->numberOfKeys > 0)
570 		memmove(scan->keyData, scankey,
571 				scan->numberOfKeys * sizeof(ScanKeyData));
572 }
573 
574 /*
575  * Close down a BRIN index scan
576  */
577 void
578 brinendscan(IndexScanDesc scan)
579 {
580 	BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
581 
582 	brinRevmapTerminate(opaque->bo_rmAccess);
583 	brin_free_desc(opaque->bo_bdesc);
584 	pfree(opaque);
585 }
586 
587 /*
588  * Per-heap-tuple callback for IndexBuildHeapScan.
589  *
590  * Note we don't worry about the page range at the end of the table here; it is
591  * present in the build state struct after we're called the last time, but not
592  * inserted into the index.  The caller must insert that final tuple, if appropriate.
593  */
594 static void
595 brinbuildCallback(Relation index,
596 				  HeapTuple htup,
597 				  Datum *values,
598 				  bool *isnull,
599 				  bool tupleIsAlive,
600 				  void *brstate)
601 {
602 	BrinBuildState *state = (BrinBuildState *) brstate;
603 	BlockNumber thisblock;
604 	int			i;
605 
606 	thisblock = ItemPointerGetBlockNumber(&htup->t_self);
607 
608 	/*
609 	 * If we're in a block that belongs to a future range, summarize what
610 	 * we've got and start afresh.  Note the scan might have skipped many
611 	 * pages, if they were devoid of live tuples; make sure to insert index
612 	 * tuples for those too.
613 	 */
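	/*
	 * For example, with pagesPerRange = 128 and bs_currRangeStart = 0, a heap
	 * tuple on block 130 makes this loop run once: the summary for blocks
	 * 0..127 is inserted and bs_currRangeStart advances to 128.
	 */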
614 	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
615 	{
616 
617 		BRIN_elog((DEBUG2,
618 				   "brinbuildCallback: completed a range: %u--%u",
619 				   state->bs_currRangeStart,
620 				   state->bs_currRangeStart + state->bs_pagesPerRange));
621 
622 		/* create the index tuple and insert it */
623 		form_and_insert_tuple(state);
624 
625 		/* set state to correspond to the next range */
626 		state->bs_currRangeStart += state->bs_pagesPerRange;
627 
628 		/* re-initialize state for it */
629 		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
630 	}
631 
632 	/* Accumulate the current tuple into the running state */
633 	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
634 	{
635 		FmgrInfo   *addValue;
636 		BrinValues *col;
637 		Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
638 
639 		col = &state->bs_dtuple->bt_columns[i];
640 		addValue = index_getprocinfo(index, i + 1,
641 									 BRIN_PROCNUM_ADDVALUE);
642 
643 		/*
644 		 * Update dtuple state, if and as necessary.
645 		 */
646 		FunctionCall4Coll(addValue,
647 						  attr->attcollation,
648 						  PointerGetDatum(state->bs_bdesc),
649 						  PointerGetDatum(col),
650 						  values[i], isnull[i]);
651 	}
652 }
653 
654 /*
655  * brinbuild() -- build a new BRIN index.
656  */
657 IndexBuildResult *
658 brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
659 {
660 	IndexBuildResult *result;
661 	double		reltuples;
662 	double		idxtuples;
663 	BrinRevmap *revmap;
664 	BrinBuildState *state;
665 	Buffer		meta;
666 	BlockNumber pagesPerRange;
667 
668 	/*
669 	 * We expect to be called exactly once for any index relation.
670 	 */
671 	if (RelationGetNumberOfBlocks(index) != 0)
672 		elog(ERROR, "index \"%s\" already contains data",
673 			 RelationGetRelationName(index));
674 
675 	/*
676 	 * Critical section not required, because on error the creation of the
677 	 * whole relation will be rolled back.
678 	 */
679 
680 	meta = ReadBuffer(index, P_NEW);
681 	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
682 	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
683 
684 	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
685 					   BRIN_CURRENT_VERSION);
686 	MarkBufferDirty(meta);
687 
688 	if (RelationNeedsWAL(index))
689 	{
690 		xl_brin_createidx xlrec;
691 		XLogRecPtr	recptr;
692 		Page		page;
693 
694 		xlrec.version = BRIN_CURRENT_VERSION;
695 		xlrec.pagesPerRange = BrinGetPagesPerRange(index);
696 
697 		XLogBeginInsert();
698 		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
699 		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
700 
701 		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
702 
703 		page = BufferGetPage(meta);
704 		PageSetLSN(page, recptr);
705 	}
706 
707 	UnlockReleaseBuffer(meta);
708 
709 	/*
710 	 * Initialize our state, including the deformed tuple state.
711 	 */
712 	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
713 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
714 
715 	/*
716 	 * Now scan the relation.  No syncscan allowed here because we want the
717 	 * heap blocks in physical order.
718 	 */
719 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
720 								   brinbuildCallback, (void *) state, NULL);
721 
722 	/* process the final batch */
723 	form_and_insert_tuple(state);
724 
725 	/* release resources */
726 	idxtuples = state->bs_numtuples;
727 	brinRevmapTerminate(state->bs_rmAccess);
728 	terminate_brin_buildstate(state);
729 
730 	/*
731 	 * Return statistics
732 	 */
733 	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
734 
735 	result->heap_tuples = reltuples;
736 	result->index_tuples = idxtuples;
737 
738 	return result;
739 }
740 
741 void
742 brinbuildempty(Relation index)
743 {
744 	Buffer		metabuf;
745 
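	/*
	 * This callback is only used for unlogged indexes: the metapage is written
	 * to the init fork (INIT_FORKNUM below), from which the main fork is
	 * recreated after a crash.
	 */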
746 	/* An empty BRIN index has a metapage only. */
747 	metabuf =
748 		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
749 	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
750 
751 	/* Initialize and xlog metabuffer. */
752 	START_CRIT_SECTION();
753 	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
754 					   BRIN_CURRENT_VERSION);
755 	MarkBufferDirty(metabuf);
756 	log_newpage_buffer(metabuf, true);
757 	END_CRIT_SECTION();
758 
759 	UnlockReleaseBuffer(metabuf);
760 }
761 
762 /*
763  * brinbulkdelete
764  *		Since there are no per-heap-tuple index tuples in BRIN indexes,
765  *		there's not a lot we can do here.
766  *
767  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
768  * tuple is deleted), meaning the need to re-run summarization on the affected
769  * range.  Would need to add an extra flag in brintuples for that.
770  */
771 IndexBulkDeleteResult *
772 brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
773 			   IndexBulkDeleteCallback callback, void *callback_state)
774 {
775 	/* allocate stats if first time through, else re-use existing struct */
776 	if (stats == NULL)
777 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
778 
779 	return stats;
780 }
781 
782 /*
783  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
784  * ranges that are currently unsummarized.
785  */
786 IndexBulkDeleteResult *
787 brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
788 {
789 	Relation	heapRel;
790 
791 	/* No-op in ANALYZE ONLY mode */
792 	if (info->analyze_only)
793 		return stats;
794 
795 	if (!stats)
796 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
797 	stats->num_pages = RelationGetNumberOfBlocks(info->index);
798 	/* rest of stats is initialized by zeroing */
799 
800 	heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
801 						AccessShareLock);
802 
803 	brin_vacuum_scan(info->index, info->strategy);
804 
805 	brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
806 				  &stats->num_index_tuples, &stats->num_index_tuples);
807 
808 	heap_close(heapRel, AccessShareLock);
809 
810 	return stats;
811 }
812 
813 /*
814  * reloptions processor for BRIN indexes
815  */
816 bytea *
817 brinoptions(Datum reloptions, bool validate)
818 {
819 	relopt_value *options;
820 	BrinOptions *rdopts;
821 	int			numoptions;
822 	static const relopt_parse_elt tab[] = {
823 		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
824 		{"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
825 	};
826 
827 	options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
828 							  &numoptions);
829 
830 	/* if none set, we're done */
831 	if (numoptions == 0)
832 		return NULL;
833 
834 	rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
835 
836 	fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
837 				   validate, tab, lengthof(tab));
838 
839 	pfree(options);
840 
841 	return (bytea *) rdopts;
842 }
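/*
 * The options parsed above are the reloptions accepted at index creation
 * time; for example (relation and column names are illustrative only):
 *
 *		CREATE INDEX brinidx ON tab USING brin (col)
 *			WITH (pages_per_range = 32, autosummarize = on);
 */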
843 
844 /*
845  * SQL-callable function to scan through an index and summarize all ranges
846  * that are not currently summarized.
847  */
848 Datum
849 brin_summarize_new_values(PG_FUNCTION_ARGS)
850 {
851 	Datum		relation = PG_GETARG_DATUM(0);
852 
853 	return DirectFunctionCall2(brin_summarize_range,
854 							   relation,
855 							   Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
856 }
857 
858 /*
859  * SQL-callable function to summarize the indicated page range, if not already
860  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
861  * unsummarized ranges are summarized.
862  */
863 Datum
864 brin_summarize_range(PG_FUNCTION_ARGS)
865 {
866 	Oid			indexoid = PG_GETARG_OID(0);
867 	int64		heapBlk64 = PG_GETARG_INT64(1);
868 	BlockNumber heapBlk;
869 	Oid			heapoid;
870 	Relation	indexRel;
871 	Relation	heapRel;
872 	double		numSummarized = 0;
873 
874 	if (RecoveryInProgress())
875 		ereport(ERROR,
876 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
877 				 errmsg("recovery is in progress"),
878 				 errhint("BRIN control functions cannot be executed during recovery.")));
879 
880 	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
881 	{
882 		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);
883 
884 		ereport(ERROR,
885 				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
886 				 errmsg("block number out of range: %s", blk)));
887 	}
888 	heapBlk = (BlockNumber) heapBlk64;
889 
890 	/*
891 	 * We must lock table before index to avoid deadlocks.  However, if the
892 	 * passed indexoid isn't an index then IndexGetRelation() will fail.
893 	 * Rather than emitting a not-very-helpful error message, postpone
894 	 * complaining, expecting that the is-it-an-index test below will fail.
895 	 */
896 	heapoid = IndexGetRelation(indexoid, true);
897 	if (OidIsValid(heapoid))
898 		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
899 	else
900 		heapRel = NULL;
901 
902 	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
903 
904 	/* Must be a BRIN index */
905 	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
906 		indexRel->rd_rel->relam != BRIN_AM_OID)
907 		ereport(ERROR,
908 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
909 				 errmsg("\"%s\" is not a BRIN index",
910 						RelationGetRelationName(indexRel))));
911 
912 	/* User must own the index (comparable to privileges needed for VACUUM) */
913 	if (!pg_class_ownercheck(indexoid, GetUserId()))
914 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
915 					   RelationGetRelationName(indexRel));
916 
917 	/*
918 	 * Since we did the IndexGetRelation call above without any lock, it's
919 	 * barely possible that a race against an index drop/recreation could have
920 	 * netted us the wrong table.  Recheck.
921 	 */
922 	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
923 		ereport(ERROR,
924 				(errcode(ERRCODE_UNDEFINED_TABLE),
925 				 errmsg("could not open parent table of index %s",
926 						RelationGetRelationName(indexRel))));
927 
928 	/* OK, do it */
929 	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
930 
931 	relation_close(indexRel, ShareUpdateExclusiveLock);
932 	relation_close(heapRel, ShareUpdateExclusiveLock);
933 
934 	PG_RETURN_INT32((int32) numSummarized);
935 }
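/*
 * Illustrative SQL usage (index name is a placeholder):
 *
 *		SELECT brin_summarize_range('brinidx', 0);		-- range containing block 0
 *		SELECT brin_summarize_new_values('brinidx');	-- all unsummarized ranges
 */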
936 
937 /*
938  * SQL-callable interface to mark a range as no longer summarized
939  */
940 Datum
941 brin_desummarize_range(PG_FUNCTION_ARGS)
942 {
943 	Oid			indexoid = PG_GETARG_OID(0);
944 	int64		heapBlk64 = PG_GETARG_INT64(1);
945 	BlockNumber heapBlk;
946 	Oid			heapoid;
947 	Relation	heapRel;
948 	Relation	indexRel;
949 	bool		done;
950 
951 	if (RecoveryInProgress())
952 		ereport(ERROR,
953 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
954 				 errmsg("recovery is in progress"),
955 				 errhint("BRIN control functions cannot be executed during recovery.")));
956 
957 	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
958 	{
959 		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);
960 
961 		ereport(ERROR,
962 				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
963 				 errmsg("block number out of range: %s", blk)));
964 	}
965 	heapBlk = (BlockNumber) heapBlk64;
966 
967 	/*
968 	 * We must lock table before index to avoid deadlocks.  However, if the
969 	 * passed indexoid isn't an index then IndexGetRelation() will fail.
970 	 * Rather than emitting a not-very-helpful error message, postpone
971 	 * complaining, expecting that the is-it-an-index test below will fail.
972 	 */
973 	heapoid = IndexGetRelation(indexoid, true);
974 	if (OidIsValid(heapoid))
975 		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
976 	else
977 		heapRel = NULL;
978 
979 	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
980 
981 	/* Must be a BRIN index */
982 	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
983 		indexRel->rd_rel->relam != BRIN_AM_OID)
984 		ereport(ERROR,
985 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
986 				 errmsg("\"%s\" is not a BRIN index",
987 						RelationGetRelationName(indexRel))));
988 
989 	/* User must own the index (comparable to privileges needed for VACUUM) */
990 	if (!pg_class_ownercheck(indexoid, GetUserId()))
991 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
992 					   RelationGetRelationName(indexRel));
993 
994 	/*
995 	 * Since we did the IndexGetRelation call above without any lock, it's
996 	 * barely possible that a race against an index drop/recreation could have
997 	 * netted us the wrong table.  Recheck.
998 	 */
999 	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1000 		ereport(ERROR,
1001 				(errcode(ERRCODE_UNDEFINED_TABLE),
1002 				 errmsg("could not open parent table of index %s",
1003 						RelationGetRelationName(indexRel))));
1004 
1005 	/* the revmap does the hard work */
1006 	do
1007 	{
1008 		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1009 	}
1010 	while (!done);
1011 
1012 	relation_close(indexRel, ShareUpdateExclusiveLock);
1013 	relation_close(heapRel, ShareUpdateExclusiveLock);
1014 
1015 	PG_RETURN_VOID();
1016 }
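/*
 * Illustrative SQL usage (index name is a placeholder):
 *
 *		SELECT brin_desummarize_range('brinidx', 0);
 *
 * after which the range containing heap block 0 is treated as unsummarized
 * until it is summarized again.
 */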
1017 
1018 /*
1019  * Build a BrinDesc used to create or scan a BRIN index
1020  */
1021 BrinDesc *
1022 brin_build_desc(Relation rel)
1023 {
1024 	BrinOpcInfo **opcinfo;
1025 	BrinDesc   *bdesc;
1026 	TupleDesc	tupdesc;
1027 	int			totalstored = 0;
1028 	int			keyno;
1029 	long		totalsize;
1030 	MemoryContext cxt;
1031 	MemoryContext oldcxt;
1032 
1033 	cxt = AllocSetContextCreate(CurrentMemoryContext,
1034 								"brin desc cxt",
1035 								ALLOCSET_SMALL_SIZES);
1036 	oldcxt = MemoryContextSwitchTo(cxt);
1037 	tupdesc = RelationGetDescr(rel);
1038 
1039 	/*
1040 	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
1041 	 * the number of columns stored, since the number is opclass-defined.
1042 	 */
1043 	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
1044 	for (keyno = 0; keyno < tupdesc->natts; keyno++)
1045 	{
1046 		FmgrInfo   *opcInfoFn;
1047 		Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1048 
1049 		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1050 
1051 		opcinfo[keyno] = (BrinOpcInfo *)
1052 			DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1053 		totalstored += opcinfo[keyno]->oi_nstored;
1054 	}
1055 
1056 	/* Allocate our result struct and fill it in */
1057 	totalsize = offsetof(BrinDesc, bd_info) +
1058 		sizeof(BrinOpcInfo *) * tupdesc->natts;
1059 
1060 	bdesc = palloc(totalsize);
1061 	bdesc->bd_context = cxt;
1062 	bdesc->bd_index = rel;
1063 	bdesc->bd_tupdesc = tupdesc;
1064 	bdesc->bd_disktdesc = NULL; /* generated lazily */
1065 	bdesc->bd_totalstored = totalstored;
1066 
1067 	for (keyno = 0; keyno < tupdesc->natts; keyno++)
1068 		bdesc->bd_info[keyno] = opcinfo[keyno];
1069 	pfree(opcinfo);
1070 
1071 	MemoryContextSwitchTo(oldcxt);
1072 
1073 	return bdesc;
1074 }
1075 
1076 void
1077 brin_free_desc(BrinDesc *bdesc)
1078 {
1079 	/* make sure the tupdesc is still valid */
1080 	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1081 	/* no need for retail pfree */
1082 	MemoryContextDelete(bdesc->bd_context);
1083 }
1084 
1085 /*
1086  * Fetch index's statistical data into *stats
1087  */
1088 void
1089 brinGetStats(Relation index, BrinStatsData *stats)
1090 {
1091 	Buffer		metabuffer;
1092 	Page		metapage;
1093 	BrinMetaPageData *metadata;
1094 
1095 	metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1096 	LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1097 	metapage = BufferGetPage(metabuffer);
1098 	metadata = (BrinMetaPageData *) PageGetContents(metapage);
1099 
1100 	stats->pagesPerRange = metadata->pagesPerRange;
1101 	stats->revmapNumPages = metadata->lastRevmapPage - 1;
1102 
1103 	UnlockReleaseBuffer(metabuffer);
1104 }
1105 
1106 /*
1107  * Initialize a BrinBuildState appropriate to create tuples on the given index.
1108  */
1109 static BrinBuildState *
1110 initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1111 						   BlockNumber pagesPerRange)
1112 {
1113 	BrinBuildState *state;
1114 
1115 	state = palloc(sizeof(BrinBuildState));
1116 
1117 	state->bs_irel = idxRel;
1118 	state->bs_numtuples = 0;
1119 	state->bs_currentInsertBuf = InvalidBuffer;
1120 	state->bs_pagesPerRange = pagesPerRange;
1121 	state->bs_currRangeStart = 0;
1122 	state->bs_rmAccess = revmap;
1123 	state->bs_bdesc = brin_build_desc(idxRel);
1124 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1125 
1126 	brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1127 
1128 	return state;
1129 }
1130 
1131 /*
1132  * Release resources associated with a BrinBuildState.
1133  */
1134 static void
1135 terminate_brin_buildstate(BrinBuildState *state)
1136 {
1137 	/*
1138 	 * Release the last index buffer used.  We might as well ensure that
1139 	 * whatever free space remains in that page is available in FSM, too.
1140 	 */
1141 	if (!BufferIsInvalid(state->bs_currentInsertBuf))
1142 	{
1143 		Page		page;
1144 		Size		freespace;
1145 		BlockNumber blk;
1146 
1147 		page = BufferGetPage(state->bs_currentInsertBuf);
1148 		freespace = PageGetFreeSpace(page);
1149 		blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1150 		ReleaseBuffer(state->bs_currentInsertBuf);
1151 		RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1152 		FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1153 	}
1154 
1155 	brin_free_desc(state->bs_bdesc);
1156 	pfree(state->bs_dtuple);
1157 	pfree(state);
1158 }
1159 
1160 /*
1161  * On the given BRIN index, summarize the heap page range that corresponds
1162  * to the heap block number given.
1163  *
1164  * This routine can run in parallel with insertions into the heap.  To avoid
1165  * missing those values from the summary tuple, we first insert a placeholder
1166  * index tuple into the index, then execute the heap scan; transactions
1167  * concurrent with the scan update the placeholder tuple.  After the scan, we
1168  * union the placeholder tuple with the one computed by this routine.  The
1169  * update of the index value happens in a loop, so that if somebody updates
1170  * the placeholder tuple after we read it, we detect the case and try again.
1171  * This ensures that the concurrently inserted tuples are not lost.
1172  *
1173  * A further corner case is this routine being asked to summarize the partial
1174  * range at the end of the table.  heapNumBlocks is the (possibly outdated)
1175  * table size; if we notice that the requested range lies beyond that size,
1176  * we re-compute the table size after inserting the placeholder tuple, to
1177  * avoid missing pages that were appended recently.
1178  */
1179 static void
1180 summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1181 				BlockNumber heapBlk, BlockNumber heapNumBlks)
1182 {
1183 	Buffer		phbuf;
1184 	BrinTuple  *phtup;
1185 	Size		phsz;
1186 	OffsetNumber offset;
1187 	BlockNumber scanNumBlks;
1188 
1189 	/*
1190 	 * Insert the placeholder tuple
1191 	 */
1192 	phbuf = InvalidBuffer;
1193 	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1194 	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1195 						   state->bs_rmAccess, &phbuf,
1196 						   heapBlk, phtup, phsz);
1197 
1198 	/*
1199 	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
1200 	 * cannot shrink concurrently (but it can grow).
1201 	 */
1202 	Assert(heapBlk % state->bs_pagesPerRange == 0);
1203 	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1204 	{
1205 		/*
1206 		 * If we're asked to scan what we believe to be the final range on the
1207 		 * table (i.e. a range that might be partial) we need to recompute our
1208 		 * idea of what the latest page is after inserting the placeholder
1209 		 * tuple.  Anyone that grows the table later will update the
1210 		 * placeholder tuple, so it doesn't matter that we won't scan these
1211 		 * pages ourselves.  Careful: the table might have been extended
1212 		 * beyond the current range, so clamp our result.
1213 		 *
1214 		 * Fortunately, this should occur infrequently.
1215 		 */
1216 		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1217 						  state->bs_pagesPerRange);
1218 	}
1219 	else
1220 	{
1221 		/* Easy case: range is known to be complete */
1222 		scanNumBlks = state->bs_pagesPerRange;
1223 	}
1224 
1225 	/*
1226 	 * Execute the partial heap scan covering the heap blocks in the specified
1227 	 * page range, summarizing the heap tuples in it.  This scan stops just
1228 	 * short of brinbuildCallback creating the new index entry.
1229 	 *
1230 	 * Note that it is critical we use the "any visible" mode of
1231 	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
1232 	 * by transactions that are still in progress, among other corner cases.
1233 	 */
1234 	state->bs_currRangeStart = heapBlk;
1235 	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
1236 							heapBlk, scanNumBlks,
1237 							brinbuildCallback, (void *) state, NULL);
1238 
1239 	/*
1240 	 * Now we update the values obtained by the scan with the placeholder
1241 	 * tuple.  We do this in a loop which only terminates if we're able to
1242 	 * update the placeholder tuple successfully; if we are not, this means
1243 	 * somebody else modified the placeholder tuple after we read it.
1244 	 */
1245 	for (;;)
1246 	{
1247 		BrinTuple  *newtup;
1248 		Size		newsize;
1249 		bool		didupdate;
1250 		bool		samepage;
1251 
1252 		CHECK_FOR_INTERRUPTS();
1253 
1254 		/*
1255 		 * Form the new summary tuple and try to replace the placeholder with it.
1256 		 */
1257 		newtup = brin_form_tuple(state->bs_bdesc,
1258 								 heapBlk, state->bs_dtuple, &newsize);
1259 		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1260 		didupdate =
1261 			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1262 						  state->bs_rmAccess, heapBlk, phbuf, offset,
1263 						  phtup, phsz, newtup, newsize, samepage);
1264 		brin_free_tuple(phtup);
1265 		brin_free_tuple(newtup);
1266 
1267 		/* If the update succeeded, we're done. */
1268 		if (didupdate)
1269 			break;
1270 
1271 		/*
1272 		 * If the update didn't work, it might be because somebody updated the
1273 		 * placeholder tuple concurrently.  Extract the new version, union it
1274 		 * with the values we have from the scan, and start over.  (There are
1275 		 * other reasons for the update to fail, but it's simple to treat them
1276 		 * the same.)
1277 		 */
1278 		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1279 										 &offset, &phsz, BUFFER_LOCK_SHARE,
1280 										 NULL);
1281 		/* the placeholder tuple must exist */
1282 		if (phtup == NULL)
1283 			elog(ERROR, "missing placeholder tuple");
1284 		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1285 		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1286 
1287 		/* merge it into the tuple from the heap scan */
1288 		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1289 	}
1290 
1291 	ReleaseBuffer(phbuf);
1292 }
1293 
1294 /*
1295  * Summarize page ranges that are not already summarized.  If pageRange is
1296  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1297  * page range containing the given heap page number is scanned.
1298  * If include_partial is true, then the partial range at the end of the table
1299  * is summarized, otherwise not.
1300  *
1301  * For each new index tuple inserted, *numSummarized (if not NULL) is
1302  * incremented; for each existing tuple, *numExisting (if not NULL) is
1303  * incremented.
1304  */
1305 static void
1306 brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1307 			  bool include_partial, double *numSummarized, double *numExisting)
1308 {
1309 	BrinRevmap *revmap;
1310 	BrinBuildState *state = NULL;
1311 	IndexInfo  *indexInfo = NULL;
1312 	BlockNumber heapNumBlocks;
1313 	BlockNumber pagesPerRange;
1314 	Buffer		buf;
1315 	BlockNumber startBlk;
1316 
1317 	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
1318 
1319 	/* determine range of pages to process */
1320 	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1321 	if (pageRange == BRIN_ALL_BLOCKRANGES)
1322 		startBlk = 0;
1323 	else
1324 	{
1325 		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1326 		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
1327 	}
1328 	if (startBlk > heapNumBlocks)
1329 	{
1330 		/* Nothing to do if start point is beyond end of table */
1331 		brinRevmapTerminate(revmap);
1332 		return;
1333 	}
1334 
1335 	/*
1336 	 * Scan the revmap to find unsummarized items.
1337 	 */
1338 	buf = InvalidBuffer;
1339 	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1340 	{
1341 		BrinTuple  *tup;
1342 		OffsetNumber off;
1343 
1344 		/*
1345 		 * Unless requested to summarize even a partial range, go away now if
1346 		 * we think the next range is partial.  Callers pass true when the call
1347 		 * is typically a one-off run after bulk data loading
1348 		 * (brin_summarize_new_values), and false when it is the result of an
1349 		 * arbitrarily-scheduled maintenance command such as vacuuming.
1350 		 */
1351 		if (!include_partial &&
1352 			(startBlk + pagesPerRange > heapNumBlocks))
1353 			break;
1354 
1355 		CHECK_FOR_INTERRUPTS();
1356 
1357 		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1358 									   BUFFER_LOCK_SHARE, NULL);
1359 		if (tup == NULL)
1360 		{
1361 			/* no revmap entry for this heap range. Summarize it. */
1362 			if (state == NULL)
1363 			{
1364 				/* first time through */
1365 				Assert(!indexInfo);
1366 				state = initialize_brin_buildstate(index, revmap,
1367 												   pagesPerRange);
1368 				indexInfo = BuildIndexInfo(index);
1369 			}
1370 			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1371 
1372 			/* and re-initialize state for the next range */
1373 			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1374 
1375 			if (numSummarized)
1376 				*numSummarized += 1.0;
1377 		}
1378 		else
1379 		{
1380 			if (numExisting)
1381 				*numExisting += 1.0;
1382 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1383 		}
1384 	}
1385 
1386 	if (BufferIsValid(buf))
1387 		ReleaseBuffer(buf);
1388 
1389 	/* free resources */
1390 	brinRevmapTerminate(revmap);
1391 	if (state)
1392 	{
1393 		terminate_brin_buildstate(state);
1394 		pfree(indexInfo);
1395 	}
1396 }
1397 
1398 /*
1399  * Given a deformed tuple in the build state, convert it into the on-disk
1400  * format and insert it into the index, making the revmap point to it.
1401  */
1402 static void
1403 form_and_insert_tuple(BrinBuildState *state)
1404 {
1405 	BrinTuple  *tup;
1406 	Size		size;
1407 
1408 	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1409 						  state->bs_dtuple, &size);
1410 	brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1411 				  &state->bs_currentInsertBuf, state->bs_currRangeStart,
1412 				  tup, size);
1413 	state->bs_numtuples++;
1414 
1415 	pfree(tup);
1416 }
1417 
1418 /*
1419  * Given two deformed tuples, adjust the first one so that it's consistent
1420  * with the summary values in both.
1421  */
1422 static void
1423 union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1424 {
1425 	int			keyno;
1426 	BrinMemTuple *db;
1427 	MemoryContext cxt;
1428 	MemoryContext oldcxt;
1429 
1430 	/* Use our own memory context to avoid retail pfree */
1431 	cxt = AllocSetContextCreate(CurrentMemoryContext,
1432 								"brin union",
1433 								ALLOCSET_DEFAULT_SIZES);
1434 	oldcxt = MemoryContextSwitchTo(cxt);
1435 	db = brin_deform_tuple(bdesc, b, NULL);
1436 	MemoryContextSwitchTo(oldcxt);
1437 
1438 	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1439 	{
1440 		FmgrInfo   *unionFn;
1441 		BrinValues *col_a = &a->bt_columns[keyno];
1442 		BrinValues *col_b = &db->bt_columns[keyno];
1443 
1444 		unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1445 									BRIN_PROCNUM_UNION);
1446 		FunctionCall3Coll(unionFn,
1447 						  bdesc->bd_index->rd_indcollation[keyno],
1448 						  PointerGetDatum(bdesc),
1449 						  PointerGetDatum(col_a),
1450 						  PointerGetDatum(col_b));
1451 	}
1452 
1453 	MemoryContextDelete(cxt);
1454 }
1455 
1456 /*
1457  * brin_vacuum_scan
1458  *		Do a complete scan of the index during VACUUM.
1459  *
1460  * This routine scans the complete index looking for uncatalogued index pages,
1461  * i.e. those that might have been lost due to a crash after index extension
1462  * and such.
1463  */
1464 static void
1465 brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1466 {
1467 	BlockNumber nblocks;
1468 	BlockNumber blkno;
1469 
1470 	/*
1471 	 * Scan the index in physical order, and clean up any possible mess in
1472 	 * each page.
1473 	 */
1474 	nblocks = RelationGetNumberOfBlocks(idxrel);
1475 	for (blkno = 0; blkno < nblocks; blkno++)
1476 	{
1477 		Buffer		buf;
1478 
1479 		CHECK_FOR_INTERRUPTS();
1480 
1481 		buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1482 								 RBM_NORMAL, strategy);
1483 
1484 		brin_page_cleanup(idxrel, buf);
1485 
1486 		ReleaseBuffer(buf);
1487 	}
1488 
1489 	/*
1490 	 * Update all upper pages in the index's FSM, as well.  This ensures not
1491 	 * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
1492 	 * but also that any pre-existing damage or out-of-dateness is repaired.
1493 	 */
1494 	FreeSpaceMapVacuum(idxrel);
1495 }
1496