/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
16 #include "postgres.h"
17
18 #include "access/brin.h"
19 #include "access/brin_page.h"
20 #include "access/brin_pageops.h"
21 #include "access/brin_xlog.h"
22 #include "access/reloptions.h"
23 #include "access/relscan.h"
24 #include "access/xloginsert.h"
25 #include "catalog/index.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "pgstat.h"
29 #include "postmaster/autovacuum.h"
30 #include "storage/bufmgr.h"
31 #include "storage/freespace.h"
32 #include "utils/builtins.h"
33 #include "utils/index_selfuncs.h"
34 #include "utils/memutils.h"
35 #include "utils/rel.h"
36
37
/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;		/* the index relation being built */
	int			bs_numtuples;	/* # of index tuples inserted so far */
	Buffer		bs_currentInsertBuf;	/* current target page for inserts */
	BlockNumber bs_pagesPerRange;	/* # of heap pages per block range */
	BlockNumber bs_currRangeStart;	/* first heap page of current range */
	BrinRevmap *bs_rmAccess;	/* range-map access object */
	BrinDesc   *bs_bdesc;		/* descriptor for the index */
	BrinMemTuple *bs_dtuple;	/* in-memory summary tuple being accumulated */
} BrinBuildState;
53
/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;	/* # of heap pages per block range */
	BrinRevmap *bo_rmAccess;	/* range-map access object for this scan */
	BrinDesc   *bo_bdesc;		/* descriptor for the index being scanned */
} BrinOpaque;
63
64 #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
65
66 static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
67 BrinRevmap *revmap, BlockNumber pagesPerRange);
68 static void terminate_brin_buildstate(BrinBuildState *state);
69 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
70 bool include_partial, double *numSummarized, double *numExisting);
71 static void form_and_insert_tuple(BrinBuildState *state);
72 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
73 BrinTuple *b);
74 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
75
76
77 /*
78 * BRIN handler function: return IndexAmRoutine with access method parameters
79 * and callbacks.
80 */
81 Datum
brinhandler(PG_FUNCTION_ARGS)82 brinhandler(PG_FUNCTION_ARGS)
83 {
84 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
85
86 amroutine->amstrategies = 0;
87 amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
88 amroutine->amcanorder = false;
89 amroutine->amcanorderbyop = false;
90 amroutine->amcanbackward = false;
91 amroutine->amcanunique = false;
92 amroutine->amcanmulticol = true;
93 amroutine->amoptionalkey = true;
94 amroutine->amsearcharray = false;
95 amroutine->amsearchnulls = true;
96 amroutine->amstorage = true;
97 amroutine->amclusterable = false;
98 amroutine->ampredlocks = false;
99 amroutine->amcanparallel = false;
100 amroutine->amcaninclude = false;
101 amroutine->amkeytype = InvalidOid;
102
103 amroutine->ambuild = brinbuild;
104 amroutine->ambuildempty = brinbuildempty;
105 amroutine->aminsert = brininsert;
106 amroutine->ambulkdelete = brinbulkdelete;
107 amroutine->amvacuumcleanup = brinvacuumcleanup;
108 amroutine->amcanreturn = NULL;
109 amroutine->amcostestimate = brincostestimate;
110 amroutine->amoptions = brinoptions;
111 amroutine->amproperty = NULL;
112 amroutine->amvalidate = brinvalidate;
113 amroutine->ambeginscan = brinbeginscan;
114 amroutine->amrescan = brinrescan;
115 amroutine->amgettuple = NULL;
116 amroutine->amgetbitmap = bringetbitmap;
117 amroutine->amendscan = brinendscan;
118 amroutine->ammarkpos = NULL;
119 amroutine->amrestrpos = NULL;
120 amroutine->amestimateparallelscan = NULL;
121 amroutine->aminitparallelscan = NULL;
122 amroutine->amparallelrescan = NULL;
123
124 PG_RETURN_POINTER(amroutine);
125 }
126
/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If autosummarization is enabled, check if we need to summarize the previous
 * page range.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do for this tuple.
 *
 * Always returns false: BRIN indexes have no notion of uniqueness, so
 * 'checkUnique' is ignored and no recheck is ever requested.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;	/* reused across loop iterations */
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;

	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
			{
				bool		recorded;

				/*
				 * Previous range is unsummarized; ask autovacuum to do it.
				 * This is best-effort: if the work-item queue is full we
				 * merely log the fact and carry on.
				 */
				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
												 RelationGetRelid(idxRel),
												 lastPageRange);
				if (!recorded)
					ereport(LOG,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
									RelationGetRelationName(idxRel),
									lastPageRange)));
			}
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			/* build the descriptor in ii_Context so it outlives this call */
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

	return false;
}
326
327 /*
328 * Initialize state for a BRIN index scan.
329 *
330 * We read the metapage here to determine the pages-per-range number that this
331 * index was built with. Note that since this cannot be changed while we're
332 * holding lock on index, it's not necessary to recompute it during brinrescan.
333 */
334 IndexScanDesc
brinbeginscan(Relation r,int nkeys,int norderbys)335 brinbeginscan(Relation r, int nkeys, int norderbys)
336 {
337 IndexScanDesc scan;
338 BrinOpaque *opaque;
339
340 scan = RelationGetIndexScan(r, nkeys, norderbys);
341
342 opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
343 opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
344 scan->xs_snapshot);
345 opaque->bo_bdesc = brin_build_desc(r);
346 scan->opaque = opaque;
347
348 return scan;
349 }
350
/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 *
 * Returns a (very rough) estimate of the number of heap tuples the bitmap
 * covers; see the comment at the bottom.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;	/* reused across ranges */
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = heap_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	heap_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			gottuple = true;
			/* copy so we can drop the buffer lock before examining it */
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
					 * this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							TupleDescAttr(bdesc->bd_tupdesc,
										  keyattno - 1)->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				/* tbm_add_page must allocate in the caller's context */
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}
553
554 /*
555 * Re-initialize state for a BRIN index scan
556 */
557 void
brinrescan(IndexScanDesc scan,ScanKey scankey,int nscankeys,ScanKey orderbys,int norderbys)558 brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
559 ScanKey orderbys, int norderbys)
560 {
561 /*
562 * Other index AMs preprocess the scan keys at this point, or sometime
563 * early during the scan; this lets them optimize by removing redundant
564 * keys, or doing early returns when they are impossible to satisfy; see
565 * _bt_preprocess_keys for an example. Something like that could be added
566 * here someday, too.
567 */
568
569 if (scankey && scan->numberOfKeys > 0)
570 memmove(scan->keyData, scankey,
571 scan->numberOfKeys * sizeof(ScanKeyData));
572 }
573
574 /*
575 * Close down a BRIN index scan
576 */
577 void
brinendscan(IndexScanDesc scan)578 brinendscan(IndexScanDesc scan)
579 {
580 BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
581
582 brinRevmapTerminate(opaque->bo_rmAccess);
583 brin_free_desc(opaque->bo_bdesc);
584 pfree(opaque);
585 }
586
/*
 * Per-heap-tuple callback for IndexBuildHeapScan.
 *
 * Note we don't worry about the page range at the end of the table here; it is
 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  Caller must ensure to do so, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{

		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;
		Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);

		col = &state->bs_dtuple->bt_columns[i];
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.  The opclass "addValue"
		 * procedure folds this column's heap value into the running summary.
		 */
		FunctionCall4Coll(addValue,
						  attr->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}
653
/*
 * brinbuild() -- build a new BRIN index.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	/* P_NEW extends the relation; this must become the metapage (block 0) */
	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
								   brinbuildCallback, (void *) state, NULL);

	/* process the final batch (the last, possibly partial, page range) */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}
740
741 void
brinbuildempty(Relation index)742 brinbuildempty(Relation index)
743 {
744 Buffer metabuf;
745
746 /* An empty BRIN index has a metapage only. */
747 metabuf =
748 ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
749 LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
750
751 /* Initialize and xlog metabuffer. */
752 START_CRIT_SECTION();
753 brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
754 BRIN_CURRENT_VERSION);
755 MarkBufferDirty(metabuf);
756 log_newpage_buffer(metabuf, true);
757 END_CRIT_SECTION();
758
759 UnlockReleaseBuffer(metabuf);
760 }
761
762 /*
763 * brinbulkdelete
764 * Since there are no per-heap-tuple index tuples in BRIN indexes,
765 * there's not a lot we can do here.
766 *
767 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
768 * tuple is deleted), meaning the need to re-run summarization on the affected
769 * range. Would need to add an extra flag in brintuples for that.
770 */
771 IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo * info,IndexBulkDeleteResult * stats,IndexBulkDeleteCallback callback,void * callback_state)772 brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
773 IndexBulkDeleteCallback callback, void *callback_state)
774 {
775 /* allocate stats if first time through, else re-use existing struct */
776 if (stats == NULL)
777 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
778
779 return stats;
780 }
781
782 /*
783 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
784 * ranges that are currently unsummarized.
785 */
786 IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo * info,IndexBulkDeleteResult * stats)787 brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
788 {
789 Relation heapRel;
790
791 /* No-op in ANALYZE ONLY mode */
792 if (info->analyze_only)
793 return stats;
794
795 if (!stats)
796 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
797 stats->num_pages = RelationGetNumberOfBlocks(info->index);
798 /* rest of stats is initialized by zeroing */
799
800 heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
801 AccessShareLock);
802
803 brin_vacuum_scan(info->index, info->strategy);
804
805 brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
806 &stats->num_index_tuples, &stats->num_index_tuples);
807
808 heap_close(heapRel, AccessShareLock);
809
810 return stats;
811 }
812
813 /*
814 * reloptions processor for BRIN indexes
815 */
816 bytea *
brinoptions(Datum reloptions,bool validate)817 brinoptions(Datum reloptions, bool validate)
818 {
819 relopt_value *options;
820 BrinOptions *rdopts;
821 int numoptions;
822 static const relopt_parse_elt tab[] = {
823 {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
824 {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
825 };
826
827 options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
828 &numoptions);
829
830 /* if none set, we're done */
831 if (numoptions == 0)
832 return NULL;
833
834 rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
835
836 fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
837 validate, tab, lengthof(tab));
838
839 pfree(options);
840
841 return (bytea *) rdopts;
842 }
843
844 /*
845 * SQL-callable function to scan through an index and summarize all ranges
846 * that are not currently summarized.
847 */
848 Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)849 brin_summarize_new_values(PG_FUNCTION_ARGS)
850 {
851 Datum relation = PG_GETARG_DATUM(0);
852
853 return DirectFunctionCall2(brin_summarize_range,
854 relation,
855 Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
856 }
857
/*
 * SQL-callable function to summarize the indicated page range, if not already
 * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
 * unsummarized ranges are summarized.
 *
 * Returns the number of ranges summarized, as int32.
 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/* sanity-check the int64 argument before narrowing to BlockNumber */
	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}
936
/*
 * SQL-callable interface to mark a range as no longer summarized
 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/* sanity-check the int64 argument before narrowing to BlockNumber */
	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/*
	 * The revmap does the hard work; it may need to retry (returning false)
	 * if it raced against a concurrent summarization, so loop until done.
	 */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
1017
/*
 * Build a BrinDesc used to create or scan a BRIN index
 *
 * The result (and everything it points to, except the caller-owned relation
 * and tuple descriptor) lives in its own memory context, bd_context, so it
 * can be freed wholesale by brin_free_desc.
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;
		Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}
1075
1076 void
brin_free_desc(BrinDesc * bdesc)1077 brin_free_desc(BrinDesc *bdesc)
1078 {
1079 /* make sure the tupdesc is still valid */
1080 Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1081 /* no need for retail pfree */
1082 MemoryContextDelete(bdesc->bd_context);
1083 }
1084
1085 /*
1086 * Fetch index's statistical data into *stats
1087 */
1088 void
brinGetStats(Relation index,BrinStatsData * stats)1089 brinGetStats(Relation index, BrinStatsData *stats)
1090 {
1091 Buffer metabuffer;
1092 Page metapage;
1093 BrinMetaPageData *metadata;
1094
1095 metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1096 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1097 metapage = BufferGetPage(metabuffer);
1098 metadata = (BrinMetaPageData *) PageGetContents(metapage);
1099
1100 stats->pagesPerRange = metadata->pagesPerRange;
1101 stats->revmapNumPages = metadata->lastRevmapPage - 1;
1102
1103 UnlockReleaseBuffer(metabuffer);
1104 }
1105
1106 /*
1107 * Initialize a BrinBuildState appropriate to create tuples on the given index.
1108 */
1109 static BrinBuildState *
initialize_brin_buildstate(Relation idxRel,BrinRevmap * revmap,BlockNumber pagesPerRange)1110 initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1111 BlockNumber pagesPerRange)
1112 {
1113 BrinBuildState *state;
1114
1115 state = palloc(sizeof(BrinBuildState));
1116
1117 state->bs_irel = idxRel;
1118 state->bs_numtuples = 0;
1119 state->bs_currentInsertBuf = InvalidBuffer;
1120 state->bs_pagesPerRange = pagesPerRange;
1121 state->bs_currRangeStart = 0;
1122 state->bs_rmAccess = revmap;
1123 state->bs_bdesc = brin_build_desc(idxRel);
1124 state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1125
1126 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1127
1128 return state;
1129 }
1130
1131 /*
1132 * Release resources associated with a BrinBuildState.
1133 */
1134 static void
terminate_brin_buildstate(BrinBuildState * state)1135 terminate_brin_buildstate(BrinBuildState *state)
1136 {
1137 /*
1138 * Release the last index buffer used. We might as well ensure that
1139 * whatever free space remains in that page is available in FSM, too.
1140 */
1141 if (!BufferIsInvalid(state->bs_currentInsertBuf))
1142 {
1143 Page page;
1144 Size freespace;
1145 BlockNumber blk;
1146
1147 page = BufferGetPage(state->bs_currentInsertBuf);
1148 freespace = PageGetFreeSpace(page);
1149 blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1150 ReleaseBuffer(state->bs_currentInsertBuf);
1151 RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1152 FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1153 }
1154
1155 brin_free_desc(state->bs_bdesc);
1156 pfree(state->bs_dtuple);
1157 pfree(state);
1158 }
1159
/*
 * On the given BRIN index, summarize the heap page range that corresponds
 * to the heap block number given.
 *
 * This routine can run in parallel with insertions into the heap.  To avoid
 * missing those values from the summary tuple, we first insert a placeholder
 * index tuple into the index, then execute the heap scan; transactions
 * concurrent with the scan update the placeholder tuple.  After the scan, we
 * union the placeholder tuple with the one computed by this routine.  The
 * update of the index value happens in a loop, so that if somebody updates
 * the placeholder tuple after we read it, we detect the case and try again.
 * This ensures that the concurrently inserted tuples are not lost.
 *
 * A further corner case is this routine being asked to summarize the partial
 * range at the end of the table.  heapNumBlks is the (possibly outdated)
 * table size; if we notice that the requested range lies beyond that size,
 * we re-compute the table size after inserting the placeholder tuple, to
 * avoid missing pages that were appended recently.
 *
 * heapBlk is the first heap block of the range to summarize; it must be a
 * multiple of state->bs_pagesPerRange (asserted below).
 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;			/* buffer holding the placeholder tuple */
	BrinTuple  *phtup;			/* our private copy of the placeholder */
	Size		phsz;
	OffsetNumber offset;		/* placeholder's offset within phbuf's page */
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
	 * by transactions that are still in progress, among other corner cases.
	 */
	state->bs_currRangeStart = heapBlk;
	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
							heapBlk, scanNumBlks,
							brinbuildCallback, (void *) state, NULL);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Update the summary tuple and try to update.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		/* both tuples are ours to free regardless of the update's outcome */
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		/* copy it before unlocking, since the original lives in the buffer */
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	/* done with the placeholder's buffer (pin was kept across the loop) */
	ReleaseBuffer(phbuf);
}
1293
/*
 * Summarize page ranges that are not already summarized.  If pageRange is
 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
 * page range containing the given heap page number is scanned.
 * If include_partial is true, then the partial range at the end of the table
 * is summarized, otherwise not.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.  Both counters are added to, not reset, so callers may
 * accumulate across calls.
 */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;	/* created lazily, on first unsummarized range */
	IndexInfo  *indexInfo = NULL;	/* ditto */
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		/* round down to the start of the range containing pageRange */
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, go away now if
		 * we think the next range is partial.  Caller would pass true when it
		 * is typically run once bulk data loading is done
		 * (brin_summarize_new_values), and false when it is typically the
		 * result of arbitrarily-scheduled maintenance command (vacuuming).
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		/* returns the tuple share-locked in buf, or NULL if unsummarized */
		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			if (numExisting)
				*numExisting += 1.0;
			/* drop the lock but keep the pin; buf may be reused next loop */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}
1397
1398 /*
1399 * Given a deformed tuple in the build state, convert it into the on-disk
1400 * format and insert it into the index, making the revmap point to it.
1401 */
1402 static void
form_and_insert_tuple(BrinBuildState * state)1403 form_and_insert_tuple(BrinBuildState *state)
1404 {
1405 BrinTuple *tup;
1406 Size size;
1407
1408 tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1409 state->bs_dtuple, &size);
1410 brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1411 &state->bs_currentInsertBuf, state->bs_currRangeStart,
1412 tup, size);
1413 state->bs_numtuples++;
1414
1415 pfree(tup);
1416 }
1417
1418 /*
1419 * Given two deformed tuples, adjust the first one so that it's consistent
1420 * with the summary values in both.
1421 */
1422 static void
union_tuples(BrinDesc * bdesc,BrinMemTuple * a,BrinTuple * b)1423 union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1424 {
1425 int keyno;
1426 BrinMemTuple *db;
1427 MemoryContext cxt;
1428 MemoryContext oldcxt;
1429
1430 /* Use our own memory context to avoid retail pfree */
1431 cxt = AllocSetContextCreate(CurrentMemoryContext,
1432 "brin union",
1433 ALLOCSET_DEFAULT_SIZES);
1434 oldcxt = MemoryContextSwitchTo(cxt);
1435 db = brin_deform_tuple(bdesc, b, NULL);
1436 MemoryContextSwitchTo(oldcxt);
1437
1438 for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1439 {
1440 FmgrInfo *unionFn;
1441 BrinValues *col_a = &a->bt_columns[keyno];
1442 BrinValues *col_b = &db->bt_columns[keyno];
1443
1444 unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1445 BRIN_PROCNUM_UNION);
1446 FunctionCall3Coll(unionFn,
1447 bdesc->bd_index->rd_indcollation[keyno],
1448 PointerGetDatum(bdesc),
1449 PointerGetDatum(col_a),
1450 PointerGetDatum(col_b));
1451 }
1452
1453 MemoryContextDelete(cxt);
1454 }
1455
1456 /*
1457 * brin_vacuum_scan
1458 * Do a complete scan of the index during VACUUM.
1459 *
1460 * This routine scans the complete index looking for uncatalogued index pages,
1461 * i.e. those that might have been lost due to a crash after index extension
1462 * and such.
1463 */
1464 static void
brin_vacuum_scan(Relation idxrel,BufferAccessStrategy strategy)1465 brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1466 {
1467 BlockNumber nblocks;
1468 BlockNumber blkno;
1469
1470 /*
1471 * Scan the index in physical order, and clean up any possible mess in
1472 * each page.
1473 */
1474 nblocks = RelationGetNumberOfBlocks(idxrel);
1475 for (blkno = 0; blkno < nblocks; blkno++)
1476 {
1477 Buffer buf;
1478
1479 CHECK_FOR_INTERRUPTS();
1480
1481 buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1482 RBM_NORMAL, strategy);
1483
1484 brin_page_cleanup(idxrel, buf);
1485
1486 ReleaseBuffer(buf);
1487 }
1488
1489 /*
1490 * Update all upper pages in the index's FSM, as well. This ensures not
1491 * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
1492 * but also that any pre-existing damage or out-of-dateness is repaired.
1493 */
1494 FreeSpaceMapVacuum(idxrel);
1495 }
1496