/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
16 #include "postgres.h"
17
18 #include "access/brin.h"
19 #include "access/brin_page.h"
20 #include "access/brin_pageops.h"
21 #include "access/brin_xlog.h"
22 #include "access/reloptions.h"
23 #include "access/relscan.h"
24 #include "access/xloginsert.h"
25 #include "catalog/index.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "pgstat.h"
29 #include "postmaster/autovacuum.h"
30 #include "storage/bufmgr.h"
31 #include "storage/freespace.h"
32 #include "utils/builtins.h"
33 #include "utils/index_selfuncs.h"
34 #include "utils/memutils.h"
35 #include "utils/rel.h"
36
37
/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;		/* the index relation being built */
	int			bs_numtuples;	/* number of index tuples inserted so far */
	Buffer		bs_currentInsertBuf;	/* current target page for insertions */
	BlockNumber bs_pagesPerRange;	/* size of each page range, in heap pages */
	BlockNumber bs_currRangeStart;	/* first heap page of the range being
									 * summarized */
	BrinRevmap *bs_rmAccess;	/* access object for the range map */
	BrinDesc   *bs_bdesc;		/* descriptor for the index being built */
	BrinMemTuple *bs_dtuple;	/* in-memory summary for the current range */
} BrinBuildState;
53
/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;	/* range size, as read from the metapage */
	BrinRevmap *bo_rmAccess;	/* access object for the range map */
	BrinDesc   *bo_bdesc;		/* descriptor for the index being scanned */
} BrinOpaque;
63
/* Pseudo block number meaning "operate on all block ranges of the index" */
#define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber

/* Forward declarations for routines local to this file */
static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
						   BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
			 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
75
76
77 /*
78 * BRIN handler function: return IndexAmRoutine with access method parameters
79 * and callbacks.
80 */
81 Datum
brinhandler(PG_FUNCTION_ARGS)82 brinhandler(PG_FUNCTION_ARGS)
83 {
84 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
85
86 amroutine->amstrategies = 0;
87 amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
88 amroutine->amcanorder = false;
89 amroutine->amcanorderbyop = false;
90 amroutine->amcanbackward = false;
91 amroutine->amcanunique = false;
92 amroutine->amcanmulticol = true;
93 amroutine->amoptionalkey = true;
94 amroutine->amsearcharray = false;
95 amroutine->amsearchnulls = true;
96 amroutine->amstorage = true;
97 amroutine->amclusterable = false;
98 amroutine->ampredlocks = false;
99 amroutine->amcanparallel = false;
100 amroutine->amkeytype = InvalidOid;
101
102 amroutine->ambuild = brinbuild;
103 amroutine->ambuildempty = brinbuildempty;
104 amroutine->aminsert = brininsert;
105 amroutine->ambulkdelete = brinbulkdelete;
106 amroutine->amvacuumcleanup = brinvacuumcleanup;
107 amroutine->amcanreturn = NULL;
108 amroutine->amcostestimate = brincostestimate;
109 amroutine->amoptions = brinoptions;
110 amroutine->amproperty = NULL;
111 amroutine->amvalidate = brinvalidate;
112 amroutine->ambeginscan = brinbeginscan;
113 amroutine->amrescan = brinrescan;
114 amroutine->amgettuple = NULL;
115 amroutine->amgetbitmap = bringetbitmap;
116 amroutine->amendscan = brinendscan;
117 amroutine->ammarkpos = NULL;
118 amroutine->amrestrpos = NULL;
119 amroutine->amestimateparallelscan = NULL;
120 amroutine->aminitparallelscan = NULL;
121 amroutine->amparallelrescan = NULL;
122
123 PG_RETURN_POINTER(amroutine);
124 }
125
/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If autosummarization is enabled, check if we need to summarize the previous
 * page range.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do for this tuple.
 *
 * Always returns false; the return value of aminsert is only significant for
 * deferred uniqueness checks, which BRIN does not support (checkUnique is
 * ignored).
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;	/* kept pinned across loop iterations */
	MemoryContext tupcxt = NULL;	/* per-call context, created lazily */
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;

	/* Retry loop: restarted if a concurrent update defeats brin_doupdate */
	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
			{
				bool		recorded;

				/* previous range is unsummarized: ask autovacuum to do it */
				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
												 RelationGetRelid(idxRel),
												 lastPageRange);
				if (!recorded)
					ereport(LOG,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
									RelationGetRelationName(idxRel),
									lastPageRange)));
			}
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			/* build the descriptor in ii_Context so it survives this call */
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

	return false;
}
325
/*
 * Initialize state for a BRIN index scan.
 *
 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding lock on index, it's not necessary to recompute it during brinrescan.
 */
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BrinOpaque *opaque;

	scan = RelationGetIndexScan(r, nkeys, norderbys);

	/* revmap access and descriptor are released again in brinendscan */
	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
											   scan->xs_snapshot);
	opaque->bo_bdesc = brin_build_desc(r);
	scan->opaque = opaque;

	return scan;
}
349
/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 *
 * Returns a rough estimate of the number of heap tuples covered by the
 * bitmap, since BRIN cannot know the exact count (see comment at the end).
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;	/* kept pinned across ranges */
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;	/* reusable copy buffer for index tuples */
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = heap_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	heap_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			gottuple = true;
			/* copy the tuple so we can release the buffer lock right away */
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
					 * this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							bdesc->bd_tupdesc->attrs[keyattno - 1]->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			/* the last range may be cut short by the end of the table */
			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				/* the bitmap must be allocated in the caller's context */
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}
551
552 /*
553 * Re-initialize state for a BRIN index scan
554 */
555 void
brinrescan(IndexScanDesc scan,ScanKey scankey,int nscankeys,ScanKey orderbys,int norderbys)556 brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
557 ScanKey orderbys, int norderbys)
558 {
559 /*
560 * Other index AMs preprocess the scan keys at this point, or sometime
561 * early during the scan; this lets them optimize by removing redundant
562 * keys, or doing early returns when they are impossible to satisfy; see
563 * _bt_preprocess_keys for an example. Something like that could be added
564 * here someday, too.
565 */
566
567 if (scankey && scan->numberOfKeys > 0)
568 memmove(scan->keyData, scankey,
569 scan->numberOfKeys * sizeof(ScanKeyData));
570 }
571
572 /*
573 * Close down a BRIN index scan
574 */
575 void
brinendscan(IndexScanDesc scan)576 brinendscan(IndexScanDesc scan)
577 {
578 BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
579
580 brinRevmapTerminate(opaque->bo_rmAccess);
581 brin_free_desc(opaque->bo_bdesc);
582 pfree(opaque);
583 }
584
/*
 * Per-heap-tuple callback for IndexBuildHeapScan.
 *
 * Note we don't worry about the page range at the end of the table here; it is
 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  Caller must ensure to do so, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{

		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;

		col = &state->bs_dtuple->bt_columns[i];
		/* support procedure numbers are 1-based */
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  state->bs_bdesc->bd_tupdesc->attrs[i]->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}
650
/*
 * brinbuild() -- build a new BRIN index.
 *
 * Initializes the metapage (WAL-logging it if needed), then scans the heap
 * and summarizes it one page range at a time via brinbuildCallback.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	/* WAL-log the metapage creation, if the index needs WAL at all */
	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
								   brinbuildCallback, (void *) state);

	/* process the final batch; the callback never inserts the last range */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}
737
/*
 * brinbuildempty() -- build an empty BRIN index in the initialization fork.
 */
void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	/* log the full page image, since REGBUF_STANDARD doesn't apply here */
	log_newpage_buffer(metabuf, false);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}
758
759 /*
760 * brinbulkdelete
761 * Since there are no per-heap-tuple index tuples in BRIN indexes,
762 * there's not a lot we can do here.
763 *
764 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
765 * tuple is deleted), meaning the need to re-run summarization on the affected
766 * range. Would need to add an extra flag in brintuples for that.
767 */
768 IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo * info,IndexBulkDeleteResult * stats,IndexBulkDeleteCallback callback,void * callback_state)769 brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
770 IndexBulkDeleteCallback callback, void *callback_state)
771 {
772 /* allocate stats if first time through, else re-use existing struct */
773 if (stats == NULL)
774 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
775
776 return stats;
777 }
778
/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
						AccessShareLock);

	/* free fully-empty index pages and update the FSM */
	brin_vacuum_scan(info->index, info->strategy);

	/*
	 * num_index_tuples is passed as both the "summarized" and "existing"
	 * counters, so it ends up counting all summarized ranges either way.
	 */
	brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	heap_close(heapRel, AccessShareLock);

	return stats;
}
809
/*
 * reloptions processor for BRIN indexes
 *
 * Parses pages_per_range and autosummarize into a BrinOptions struct,
 * or returns NULL if no options were set.
 */
bytea *
brinoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	BrinOptions *rdopts;
	int			numoptions;
	/* maps option names onto BrinOptions fields */
	static const relopt_parse_elt tab[] = {
		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
		{"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}
840
841 /*
842 * SQL-callable function to scan through an index and summarize all ranges
843 * that are not currently summarized.
844 */
845 Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)846 brin_summarize_new_values(PG_FUNCTION_ARGS)
847 {
848 Datum relation = PG_GETARG_DATUM(0);
849
850 return DirectFunctionCall2(brin_summarize_range,
851 relation,
852 Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
853 }
854
/*
 * SQL-callable function to summarize the indicated page range, if not already
 * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
 * unsummarized ranges are summarized.
 *
 * Returns the number of ranges summarized, as int32.
 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	/* cannot modify an index while in hot standby */
	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/* argument is int64 so it can exceed the BlockNumber range; validate */
	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}
933
/*
 * SQL-callable interface to mark a range as no longer summarized
 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	/* cannot modify an index while in hot standby */
	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/* argument is int64 so it can exceed the BlockNumber range; validate */
	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/*
	 * The revmap does the hard work; it reports false until the
	 * desummarization actually takes effect, so retry until done.
	 */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
1014
/*
 * Build a BrinDesc used to create or scan a BRIN index
 *
 * The result (and everything it points to, except the caller's relcache
 * entries) lives in a dedicated memory context, so brin_free_desc can
 * release it all with a single context delete.
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn,
										  tupdesc->attrs[keyno]->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	/* move the per-column opclass info into the trailing array */
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}
1072
/*
 * Release a BrinDesc created by brin_build_desc().
 *
 * Everything reachable from the descriptor was allocated in its private
 * bd_context, so deleting that context frees it all in one go.
 */
void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}
1081
1082 /*
1083 * Fetch index's statistical data into *stats
1084 */
1085 void
brinGetStats(Relation index,BrinStatsData * stats)1086 brinGetStats(Relation index, BrinStatsData *stats)
1087 {
1088 Buffer metabuffer;
1089 Page metapage;
1090 BrinMetaPageData *metadata;
1091
1092 metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1093 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1094 metapage = BufferGetPage(metabuffer);
1095 metadata = (BrinMetaPageData *) PageGetContents(metapage);
1096
1097 stats->pagesPerRange = metadata->pagesPerRange;
1098 stats->revmapNumPages = metadata->lastRevmapPage - 1;
1099
1100 UnlockReleaseBuffer(metabuffer);
1101 }
1102
1103 /*
1104 * Initialize a BrinBuildState appropriate to create tuples on the given index.
1105 */
1106 static BrinBuildState *
initialize_brin_buildstate(Relation idxRel,BrinRevmap * revmap,BlockNumber pagesPerRange)1107 initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1108 BlockNumber pagesPerRange)
1109 {
1110 BrinBuildState *state;
1111
1112 state = palloc(sizeof(BrinBuildState));
1113
1114 state->bs_irel = idxRel;
1115 state->bs_numtuples = 0;
1116 state->bs_currentInsertBuf = InvalidBuffer;
1117 state->bs_pagesPerRange = pagesPerRange;
1118 state->bs_currRangeStart = 0;
1119 state->bs_rmAccess = revmap;
1120 state->bs_bdesc = brin_build_desc(idxRel);
1121 state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1122
1123 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1124
1125 return state;
1126 }
1127
1128 /*
1129 * Release resources associated with a BrinBuildState.
1130 */
1131 static void
terminate_brin_buildstate(BrinBuildState * state)1132 terminate_brin_buildstate(BrinBuildState *state)
1133 {
1134 /* release the last index buffer used */
1135 if (!BufferIsInvalid(state->bs_currentInsertBuf))
1136 {
1137 Page page;
1138
1139 page = BufferGetPage(state->bs_currentInsertBuf);
1140 RecordPageWithFreeSpace(state->bs_irel,
1141 BufferGetBlockNumber(state->bs_currentInsertBuf),
1142 PageGetFreeSpace(page));
1143 ReleaseBuffer(state->bs_currentInsertBuf);
1144 }
1145
1146 brin_free_desc(state->bs_bdesc);
1147 pfree(state->bs_dtuple);
1148 pfree(state);
1149 }
1150
/*
 * On the given BRIN index, summarize the heap page range that corresponds
 * to the heap block number given.
 *
 * This routine can run in parallel with insertions into the heap.  To avoid
 * missing those values from the summary tuple, we first insert a placeholder
 * index tuple into the index, then execute the heap scan; transactions
 * concurrent with the scan update the placeholder tuple.  After the scan, we
 * union the placeholder tuple with the one computed by this routine.  The
 * update of the index value happens in a loop, so that if somebody updates
 * the placeholder tuple after we read it, we detect the case and try again.
 * This ensures that the concurrently inserted tuples are not lost.
 *
 * A further corner case is this routine being asked to summarize the partial
 * range at the end of the table.  heapNumBlocks is the (possibly outdated)
 * table size; if we notice that the requested range lies beyond that size,
 * we re-compute the table size after inserting the placeholder tuple, to
 * avoid missing pages that were appended recently.
 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;			/* buffer holding the placeholder tuple */
	BrinTuple  *phtup;			/* current image of the placeholder tuple */
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;	/* number of heap blocks we will scan */

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
	 * by transactions that are still in progress, among other corner cases.
	 */
	state->bs_currRangeStart = heapBlk;
	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
							heapBlk, scanNumBlks,
							brinbuildCallback, (void *) state);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Update the summary tuple and try to update.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		/* copy it out before releasing the buffer lock */
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}
1284
/*
 * Summarize page ranges that are not already summarized.  If pageRange is
 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
 * page range containing the given heap page number is scanned.
 * If include_partial is true, then the partial range at the end of the table
 * is summarized, otherwise not.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.
 */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;	/* created lazily, only if needed */
	IndexInfo  *indexInfo = NULL;	/* ditto */
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		/* round down to the start of the range containing pageRange */
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, go away now if
		 * we think the next range is partial.  Caller would pass true when
		 * it is typically run once bulk data loading is done
		 * (brin_summarize_new_values), and false when it is typically the
		 * result of arbitrarily-scheduled maintenance command (vacuuming).
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		/* returns with buf share-locked if a tuple was found */
		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			/* range already summarized; just count it and drop the lock */
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	/* buf may still be pinned from the last brinGetTupleForHeapBlock call */
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}
1388
1389 /*
1390 * Given a deformed tuple in the build state, convert it into the on-disk
1391 * format and insert it into the index, making the revmap point to it.
1392 */
1393 static void
form_and_insert_tuple(BrinBuildState * state)1394 form_and_insert_tuple(BrinBuildState *state)
1395 {
1396 BrinTuple *tup;
1397 Size size;
1398
1399 tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1400 state->bs_dtuple, &size);
1401 brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1402 &state->bs_currentInsertBuf, state->bs_currRangeStart,
1403 tup, size);
1404 state->bs_numtuples++;
1405
1406 pfree(tup);
1407 }
1408
1409 /*
1410 * Given two deformed tuples, adjust the first one so that it's consistent
1411 * with the summary values in both.
1412 */
1413 static void
union_tuples(BrinDesc * bdesc,BrinMemTuple * a,BrinTuple * b)1414 union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1415 {
1416 int keyno;
1417 BrinMemTuple *db;
1418 MemoryContext cxt;
1419 MemoryContext oldcxt;
1420
1421 /* Use our own memory context to avoid retail pfree */
1422 cxt = AllocSetContextCreate(CurrentMemoryContext,
1423 "brin union",
1424 ALLOCSET_DEFAULT_SIZES);
1425 oldcxt = MemoryContextSwitchTo(cxt);
1426 db = brin_deform_tuple(bdesc, b, NULL);
1427 MemoryContextSwitchTo(oldcxt);
1428
1429 for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1430 {
1431 FmgrInfo *unionFn;
1432 BrinValues *col_a = &a->bt_columns[keyno];
1433 BrinValues *col_b = &db->bt_columns[keyno];
1434
1435 unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1436 BRIN_PROCNUM_UNION);
1437 FunctionCall3Coll(unionFn,
1438 bdesc->bd_index->rd_indcollation[keyno],
1439 PointerGetDatum(bdesc),
1440 PointerGetDatum(col_a),
1441 PointerGetDatum(col_b));
1442 }
1443
1444 MemoryContextDelete(cxt);
1445 }
1446
1447 /*
1448 * brin_vacuum_scan
1449 * Do a complete scan of the index during VACUUM.
1450 *
1451 * This routine scans the complete index looking for uncatalogued index pages,
1452 * i.e. those that might have been lost due to a crash after index extension
1453 * and such.
1454 */
1455 static void
brin_vacuum_scan(Relation idxrel,BufferAccessStrategy strategy)1456 brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1457 {
1458 bool vacuum_fsm = false;
1459 BlockNumber blkno;
1460
1461 /*
1462 * Scan the index in physical order, and clean up any possible mess in
1463 * each page.
1464 */
1465 for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
1466 {
1467 Buffer buf;
1468
1469 CHECK_FOR_INTERRUPTS();
1470
1471 buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1472 RBM_NORMAL, strategy);
1473
1474 vacuum_fsm |= brin_page_cleanup(idxrel, buf);
1475
1476 ReleaseBuffer(buf);
1477 }
1478
1479 /*
1480 * If we made any change to the FSM, make sure the new info is visible all
1481 * the way to the top.
1482 */
1483 if (vacuum_fsm)
1484 FreeSpaceMapVacuum(idxrel);
1485 }
1486