1 /*
2 * brin.c
3 * Implementation of BRIN indexes for Postgres
4 *
5 * See src/backend/access/brin/README for details.
6 *
7 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/brin/brin.c
12 *
13 * TODO
14 * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 */
16 #include "postgres.h"
17
18 #include "access/brin.h"
19 #include "access/brin_page.h"
20 #include "access/brin_pageops.h"
21 #include "access/brin_xlog.h"
22 #include "access/relation.h"
23 #include "access/reloptions.h"
24 #include "access/relscan.h"
25 #include "access/table.h"
26 #include "access/tableam.h"
27 #include "access/xloginsert.h"
28 #include "catalog/index.h"
29 #include "catalog/pg_am.h"
30 #include "commands/vacuum.h"
31 #include "miscadmin.h"
32 #include "pgstat.h"
33 #include "postmaster/autovacuum.h"
34 #include "storage/bufmgr.h"
35 #include "storage/freespace.h"
36 #include "utils/acl.h"
37 #include "utils/builtins.h"
38 #include "utils/index_selfuncs.h"
39 #include "utils/memutils.h"
40 #include "utils/rel.h"
41
42
/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;		/* the index relation being built */
	int			bs_numtuples;	/* number of index tuples inserted so far */
	Buffer		bs_currentInsertBuf;	/* target buffer for tuple insertion */
	BlockNumber bs_pagesPerRange;	/* number of heap pages per BRIN range */
	BlockNumber bs_currRangeStart;	/* first heap page of range being built */
	BrinRevmap *bs_rmAccess;	/* range map access object */
	BrinDesc   *bs_bdesc;		/* index metadata descriptor */
	BrinMemTuple *bs_dtuple;	/* running (deformed) summary tuple */
} BrinBuildState;
58
/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;	/* number of heap pages per BRIN range */
	BrinRevmap *bo_rmAccess;	/* range map access object for this scan */
	BrinDesc   *bo_bdesc;		/* index metadata descriptor */
} BrinOpaque;
68
69 #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
70
71 static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
72 BrinRevmap *revmap, BlockNumber pagesPerRange);
73 static void terminate_brin_buildstate(BrinBuildState *state);
74 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
75 bool include_partial, double *numSummarized, double *numExisting);
76 static void form_and_insert_tuple(BrinBuildState *state);
77 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
78 BrinTuple *b);
79 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
80
81
82 /*
83 * BRIN handler function: return IndexAmRoutine with access method parameters
84 * and callbacks.
85 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	/* AM capability flags */
	amroutine->amstrategies = 0;	/* no fixed strategy set; opclass-defined */
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;	/* see TODO at top of file */
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;	/* stored type may differ from column type */
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
	amroutine->amusemaintenanceworkmem = false;
	amroutine->amparallelvacuumoptions =
		VACUUM_OPTION_PARALLEL_CLEANUP;
	amroutine->amkeytype = InvalidOid;

	/* AM callbacks; NULL means "not supported" */
	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;	/* no index-only scans */
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;	/* bitmap scans only */
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}
136
137 /*
138 * A tuple in the heap is being inserted. To keep a brin index up to date,
139 * we need to obtain the relevant index tuple and compare its stored values
140 * with those of the new tuple. If the tuple values are not consistent with
141 * the summary tuple, we need to update the index tuple.
142 *
143 * If autosummarization is enabled, check if we need to summarize the previous
144 * page range.
145 *
146 * If the range is not currently summarized (i.e. the revmap returns NULL for
147 * it), there's nothing to do for this tuple.
148 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;

	/* Loop until the update succeeds, or we find there's nothing to do. */
	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
			{
				bool		recorded;

				/* range is unsummarized: ask autovacuum to summarize it */
				recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
												 RelationGetRelid(idxRel),
												 lastPageRange);
				if (!recorded)
					ereport(LOG,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
									RelationGetRelationName(idxRel),
									lastPageRange)));
			}
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			/* build the descriptor in the long-lived per-statement context */
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			/* per-call context, reset on retry and deleted before return */
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

	/* BRIN never enforces uniqueness, so "found existing" is always false */
	return false;
}
336
337 /*
338 * Initialize state for a BRIN index scan.
339 *
340 * We read the metapage here to determine the pages-per-range number that this
341 * index was built with. Note that since this cannot be changed while we're
342 * holding lock on index, it's not necessary to recompute it during brinrescan.
343 */
344 IndexScanDesc
brinbeginscan(Relation r,int nkeys,int norderbys)345 brinbeginscan(Relation r, int nkeys, int norderbys)
346 {
347 IndexScanDesc scan;
348 BrinOpaque *opaque;
349
350 scan = RelationGetIndexScan(r, nkeys, norderbys);
351
352 opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
353 opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
354 scan->xs_snapshot);
355 opaque->bo_bdesc = brin_build_desc(r);
356 scan->opaque = opaque;
357
358 return scan;
359 }
360
361 /*
362 * Execute the index scan.
363 *
364 * This works by reading index TIDs from the revmap, and obtaining the index
365 * tuples pointed to by them; the summary values in the index tuples are
366 * compared to the scan keys. We return into the TID bitmap all the pages in
367 * ranges corresponding to index tuples that match the scan keys.
368 *
369 * If a TID from the revmap is read as InvalidTID, we know that range is
370 * unsummarized. Pages in those ranges need to be returned regardless of scan
371 * keys.
372 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = table_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	table_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			/* copy the tuple so we can drop the buffer lock right away */
			gottuple = true;
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
					 * this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							TupleDescAttr(bdesc->bd_tupdesc,
										  keyattno - 1)->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			/* the Min() caps the last (possibly partial) range at table end */
			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}
563
564 /*
565 * Re-initialize state for a BRIN index scan
566 */
567 void
brinrescan(IndexScanDesc scan,ScanKey scankey,int nscankeys,ScanKey orderbys,int norderbys)568 brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
569 ScanKey orderbys, int norderbys)
570 {
571 /*
572 * Other index AMs preprocess the scan keys at this point, or sometime
573 * early during the scan; this lets them optimize by removing redundant
574 * keys, or doing early returns when they are impossible to satisfy; see
575 * _bt_preprocess_keys for an example. Something like that could be added
576 * here someday, too.
577 */
578
579 if (scankey && scan->numberOfKeys > 0)
580 memmove(scan->keyData, scankey,
581 scan->numberOfKeys * sizeof(ScanKeyData));
582 }
583
584 /*
585 * Close down a BRIN index scan
586 */
587 void
brinendscan(IndexScanDesc scan)588 brinendscan(IndexScanDesc scan)
589 {
590 BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
591
592 brinRevmapTerminate(opaque->bo_rmAccess);
593 brin_free_desc(opaque->bo_bdesc);
594 pfree(opaque);
595 }
596
597 /*
598 * Per-heap-tuple callback for table_index_build_scan.
599 *
600 * Note we don't worry about the page range at the end of the table here; it is
601 * present in the build state struct after we're called the last time, but not
602 * inserted into the index. Caller must ensure to do so, if appropriate.
603 */
604 static void
brinbuildCallback(Relation index,ItemPointer tid,Datum * values,bool * isnull,bool tupleIsAlive,void * brstate)605 brinbuildCallback(Relation index,
606 ItemPointer tid,
607 Datum *values,
608 bool *isnull,
609 bool tupleIsAlive,
610 void *brstate)
611 {
612 BrinBuildState *state = (BrinBuildState *) brstate;
613 BlockNumber thisblock;
614 int i;
615
616 thisblock = ItemPointerGetBlockNumber(tid);
617
618 /*
619 * If we're in a block that belongs to a future range, summarize what
620 * we've got and start afresh. Note the scan might have skipped many
621 * pages, if they were devoid of live tuples; make sure to insert index
622 * tuples for those too.
623 */
624 while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
625 {
626
627 BRIN_elog((DEBUG2,
628 "brinbuildCallback: completed a range: %u--%u",
629 state->bs_currRangeStart,
630 state->bs_currRangeStart + state->bs_pagesPerRange));
631
632 /* create the index tuple and insert it */
633 form_and_insert_tuple(state);
634
635 /* set state to correspond to the next range */
636 state->bs_currRangeStart += state->bs_pagesPerRange;
637
638 /* re-initialize state for it */
639 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
640 }
641
642 /* Accumulate the current tuple into the running state */
643 for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
644 {
645 FmgrInfo *addValue;
646 BrinValues *col;
647 Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
648
649 col = &state->bs_dtuple->bt_columns[i];
650 addValue = index_getprocinfo(index, i + 1,
651 BRIN_PROCNUM_ADDVALUE);
652
653 /*
654 * Update dtuple state, if and as necessary.
655 */
656 FunctionCall4Coll(addValue,
657 attr->attcollation,
658 PointerGetDatum(state->bs_bdesc),
659 PointerGetDatum(col),
660 values[i], isnull[i]);
661 }
662 }
663
664 /*
665 * brinbuild() -- build a new BRIN index.
666 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	/* Initialize the metapage as block 0 of the new index. */
	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	/* WAL-log the metapage initialization, if the index needs WAL */
	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
									   brinbuildCallback, (void *) state, NULL);

	/*
	 * Process the final batch: the callback leaves the last (possibly
	 * partial) range accumulated but not inserted.
	 */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}
750
/*
 * brinbuildempty() -- build an empty BRIN index in the initialization fork
 */
void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * Initialize and xlog metabuffer.  The critical section keeps the buffer
	 * modification and its WAL record together.
	 */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, true);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}
771
772 /*
773 * brinbulkdelete
774 * Since there are no per-heap-tuple index tuples in BRIN indexes,
775 * there's not a lot we can do here.
776 *
777 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
778 * tuple is deleted), meaning the need to re-run summarization on the affected
779 * range. Would need to add an extra flag in brintuples for that.
780 */
781 IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo * info,IndexBulkDeleteResult * stats,IndexBulkDeleteCallback callback,void * callback_state)782 brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
783 IndexBulkDeleteCallback callback, void *callback_state)
784 {
785 /* allocate stats if first time through, else re-use existing struct */
786 if (stats == NULL)
787 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
788
789 return stats;
790 }
791
792 /*
793 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
794 * ranges that are currently unsummarized.
795 */
796 IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo * info,IndexBulkDeleteResult * stats)797 brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
798 {
799 Relation heapRel;
800
801 /* No-op in ANALYZE ONLY mode */
802 if (info->analyze_only)
803 return stats;
804
805 if (!stats)
806 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
807 stats->num_pages = RelationGetNumberOfBlocks(info->index);
808 /* rest of stats is initialized by zeroing */
809
810 heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
811 AccessShareLock);
812
813 brin_vacuum_scan(info->index, info->strategy);
814
815 brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
816 &stats->num_index_tuples, &stats->num_index_tuples);
817
818 table_close(heapRel, AccessShareLock);
819
820 return stats;
821 }
822
823 /*
824 * reloptions processor for BRIN indexes
825 */
826 bytea *
brinoptions(Datum reloptions,bool validate)827 brinoptions(Datum reloptions, bool validate)
828 {
829 static const relopt_parse_elt tab[] = {
830 {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
831 {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
832 };
833
834 return (bytea *) build_reloptions(reloptions, validate,
835 RELOPT_KIND_BRIN,
836 sizeof(BrinOptions),
837 tab, lengthof(tab));
838 }
839
840 /*
841 * SQL-callable function to scan through an index and summarize all ranges
842 * that are not currently summarized.
843 */
844 Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)845 brin_summarize_new_values(PG_FUNCTION_ARGS)
846 {
847 Datum relation = PG_GETARG_DATUM(0);
848
849 return DirectFunctionCall2(brin_summarize_range,
850 relation,
851 Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
852 }
853
854 /*
855 * SQL-callable function to summarize the indicated page range, if not already
856 * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
857 * unsummarized ranges are summarized.
858 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/* BRIN_ALL_BLOCKRANGES itself is accepted here as "summarize all" */
	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	/* report the number of ranges we summarized */
	PG_RETURN_INT32((int32) numSummarized);
}
932
933 /*
934 * SQL-callable interface to mark a range as no longer summarized
935 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/* unlike brin_summarize_range, only a real block number is accepted */
	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* the revmap does the hard work; retry until it reports completion */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
1013
1014 /*
1015 * Build a BrinDesc used to create or scan a BRIN index
1016 */
1017 BrinDesc *
brin_build_desc(Relation rel)1018 brin_build_desc(Relation rel)
1019 {
1020 BrinOpcInfo **opcinfo;
1021 BrinDesc *bdesc;
1022 TupleDesc tupdesc;
1023 int totalstored = 0;
1024 int keyno;
1025 long totalsize;
1026 MemoryContext cxt;
1027 MemoryContext oldcxt;
1028
1029 cxt = AllocSetContextCreate(CurrentMemoryContext,
1030 "brin desc cxt",
1031 ALLOCSET_SMALL_SIZES);
1032 oldcxt = MemoryContextSwitchTo(cxt);
1033 tupdesc = RelationGetDescr(rel);
1034
1035 /*
1036 * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1037 * the number of columns stored, since the number is opclass-defined.
1038 */
1039 opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
1040 for (keyno = 0; keyno < tupdesc->natts; keyno++)
1041 {
1042 FmgrInfo *opcInfoFn;
1043 Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1044
1045 opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1046
1047 opcinfo[keyno] = (BrinOpcInfo *)
1048 DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1049 totalstored += opcinfo[keyno]->oi_nstored;
1050 }
1051
1052 /* Allocate our result struct and fill it in */
1053 totalsize = offsetof(BrinDesc, bd_info) +
1054 sizeof(BrinOpcInfo *) * tupdesc->natts;
1055
1056 bdesc = palloc(totalsize);
1057 bdesc->bd_context = cxt;
1058 bdesc->bd_index = rel;
1059 bdesc->bd_tupdesc = tupdesc;
1060 bdesc->bd_disktdesc = NULL; /* generated lazily */
1061 bdesc->bd_totalstored = totalstored;
1062
1063 for (keyno = 0; keyno < tupdesc->natts; keyno++)
1064 bdesc->bd_info[keyno] = opcinfo[keyno];
1065 pfree(opcinfo);
1066
1067 MemoryContextSwitchTo(oldcxt);
1068
1069 return bdesc;
1070 }
1071
/*
 * Release a BrinDesc created by brin_build_desc.  Everything belonging to
 * the descriptor was allocated in its private bd_context, so deleting that
 * context frees it all at once.
 */
void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}
1080
1081 /*
1082 * Fetch index's statistical data into *stats
1083 */
void
brinGetStats(Relation index, BrinStatsData *stats)
{
	Buffer		metabuffer;
	Page		metapage;
	BrinMetaPageData *metadata;

	/* read and share-lock the metapage; a share lock suffices for reading */
	metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = (BrinMetaPageData *) PageGetContents(metapage);

	stats->pagesPerRange = metadata->pagesPerRange;
	/* NOTE(review): the -1 presumably discounts the metapage itself, since
	 * revmap pages start right after it — confirm against brin_revmap.c */
	stats->revmapNumPages = metadata->lastRevmapPage - 1;

	UnlockReleaseBuffer(metabuffer);
}
1101
1102 /*
1103 * Initialize a BrinBuildState appropriate to create tuples on the given index.
1104 */
1105 static BrinBuildState *
initialize_brin_buildstate(Relation idxRel,BrinRevmap * revmap,BlockNumber pagesPerRange)1106 initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1107 BlockNumber pagesPerRange)
1108 {
1109 BrinBuildState *state;
1110
1111 state = palloc(sizeof(BrinBuildState));
1112
1113 state->bs_irel = idxRel;
1114 state->bs_numtuples = 0;
1115 state->bs_currentInsertBuf = InvalidBuffer;
1116 state->bs_pagesPerRange = pagesPerRange;
1117 state->bs_currRangeStart = 0;
1118 state->bs_rmAccess = revmap;
1119 state->bs_bdesc = brin_build_desc(idxRel);
1120 state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1121
1122 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1123
1124 return state;
1125 }
1126
1127 /*
1128 * Release resources associated with a BrinBuildState.
1129 */
static void
terminate_brin_buildstate(BrinBuildState *state)
{
	/*
	 * Release the last index buffer used.  We might as well ensure that
	 * whatever free space remains in that page is available in FSM, too.
	 */
	if (!BufferIsInvalid(state->bs_currentInsertBuf))
	{
		Page		page;
		Size		freespace;
		BlockNumber blk;

		/* capture what we need from the buffer before dropping the pin */
		page = BufferGetPage(state->bs_currentInsertBuf);
		freespace = PageGetFreeSpace(page);
		blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
		ReleaseBuffer(state->bs_currentInsertBuf);
		RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
		/* propagate the leaf FSM update upward, for just this one block */
		FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
	}

	/* free the descriptor, the deformed tuple, and the state itself */
	brin_free_desc(state->bs_bdesc);
	pfree(state->bs_dtuple);
	pfree(state);
}
1155
1156 /*
1157 * On the given BRIN index, summarize the heap page range that corresponds
1158 * to the heap block number given.
1159 *
1160 * This routine can run in parallel with insertions into the heap. To avoid
1161 * missing those values from the summary tuple, we first insert a placeholder
1162 * index tuple into the index, then execute the heap scan; transactions
1163 * concurrent with the scan update the placeholder tuple. After the scan, we
1164 * union the placeholder tuple with the one computed by this routine. The
1165 * update of the index value happens in a loop, so that if somebody updates
1166 * the placeholder tuple after we read it, we detect the case and try again.
1167 * This ensures that the concurrently inserted tuples are not lost.
1168 *
1169 * A further corner case is this routine being asked to summarize the partial
1170 * range at the end of the table. heapNumBlocks is the (possibly outdated)
1171 * table size; if we notice that the requested range lies beyond that size,
1172 * we re-compute the table size after inserting the placeholder tuple, to
1173 * avoid missing pages that were appended recently.
1174 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;			/* buffer holding the placeholder tuple */
	BrinTuple  *phtup;			/* placeholder tuple (on-disk format) */
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * table_index_build_range_scan here: otherwise, we would miss tuples
	 * inserted by transactions that are still in progress, among other corner
	 * cases.
	 */
	state->bs_currRangeStart = heapBlk;
	table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
								 heapBlk, scanNumBlks,
								 brinbuildCallback, (void *) state, NULL);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Update the summary tuple and try to update.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		/* both tuples were palloc'd by us and are no longer needed */
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		/* copy it before releasing the buffer lock, so we can use it freely */
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	/* drop the pin that brin_doinsert/brinGetTupleForHeapBlock left us */
	ReleaseBuffer(phbuf);
}
1290
1291 /*
1292 * Summarize page ranges that are not already summarized. If pageRange is
1293 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1294 * page range containing the given heap page number is scanned.
1295 * If include_partial is true, then the partial range at the end of the table
1296 * is summarized, otherwise not.
1297 *
1298 * For each new index tuple inserted, *numSummarized (if not NULL) is
1299 * incremented; for each existing tuple, *numExisting (if not NULL) is
1300 * incremented.
1301 */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;	/* created lazily, on first unsummarized range */
	IndexInfo  *indexInfo = NULL;
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		/* round the requested page down to its range's first block */
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, go away now if
		 * we think the next range is partial.  Caller would pass true when it
		 * is typically run once bulk data loading is done
		 * (brin_summarize_new_values), and false when it is typically the
		 * result of arbitrarily-scheduled maintenance command (vacuuming).
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		/* returns the tuple share-locked in buf, or NULL if unsummarized */
		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			/* range is already summarized; just count it and move on */
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	/* release the pin kept across brinGetTupleForHeapBlock calls */
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}
1394
1395 /*
1396 * Given a deformed tuple in the build state, convert it into the on-disk
1397 * format and insert it into the index, making the revmap point to it.
1398 */
1399 static void
form_and_insert_tuple(BrinBuildState * state)1400 form_and_insert_tuple(BrinBuildState *state)
1401 {
1402 BrinTuple *tup;
1403 Size size;
1404
1405 tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1406 state->bs_dtuple, &size);
1407 brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1408 &state->bs_currentInsertBuf, state->bs_currRangeStart,
1409 tup, size);
1410 state->bs_numtuples++;
1411
1412 pfree(tup);
1413 }
1414
1415 /*
1416 * Given two deformed tuples, adjust the first one so that it's consistent
1417 * with the summary values in both.
1418 */
1419 static void
union_tuples(BrinDesc * bdesc,BrinMemTuple * a,BrinTuple * b)1420 union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1421 {
1422 int keyno;
1423 BrinMemTuple *db;
1424 MemoryContext cxt;
1425 MemoryContext oldcxt;
1426
1427 /* Use our own memory context to avoid retail pfree */
1428 cxt = AllocSetContextCreate(CurrentMemoryContext,
1429 "brin union",
1430 ALLOCSET_DEFAULT_SIZES);
1431 oldcxt = MemoryContextSwitchTo(cxt);
1432 db = brin_deform_tuple(bdesc, b, NULL);
1433 MemoryContextSwitchTo(oldcxt);
1434
1435 for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1436 {
1437 FmgrInfo *unionFn;
1438 BrinValues *col_a = &a->bt_columns[keyno];
1439 BrinValues *col_b = &db->bt_columns[keyno];
1440
1441 unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1442 BRIN_PROCNUM_UNION);
1443 FunctionCall3Coll(unionFn,
1444 bdesc->bd_index->rd_indcollation[keyno],
1445 PointerGetDatum(bdesc),
1446 PointerGetDatum(col_a),
1447 PointerGetDatum(col_b));
1448 }
1449
1450 MemoryContextDelete(cxt);
1451 }
1452
1453 /*
1454 * brin_vacuum_scan
1455 * Do a complete scan of the index during VACUUM.
1456 *
1457 * This routine scans the complete index looking for uncatalogued index pages,
1458 * i.e. those that might have been lost due to a crash after index extension
1459 * and such.
1460 */
1461 static void
brin_vacuum_scan(Relation idxrel,BufferAccessStrategy strategy)1462 brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1463 {
1464 BlockNumber nblocks;
1465 BlockNumber blkno;
1466
1467 /*
1468 * Scan the index in physical order, and clean up any possible mess in
1469 * each page.
1470 */
1471 nblocks = RelationGetNumberOfBlocks(idxrel);
1472 for (blkno = 0; blkno < nblocks; blkno++)
1473 {
1474 Buffer buf;
1475
1476 CHECK_FOR_INTERRUPTS();
1477
1478 buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1479 RBM_NORMAL, strategy);
1480
1481 brin_page_cleanup(idxrel, buf);
1482
1483 ReleaseBuffer(buf);
1484 }
1485
1486 /*
1487 * Update all upper pages in the index's FSM, as well. This ensures not
1488 * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
1489 * but also that any pre-existing damage or out-of-dateness is repaired.
1490 */
1491 FreeSpaceMapVacuum(idxrel);
1492 }
1493