1 /*
2 * brin.c
3 * Implementation of BRIN indexes for Postgres
4 *
5 * See src/backend/access/brin/README for details.
6 *
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/brin/brin.c
12 *
13 * TODO
14 * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 */
16 #include "postgres.h"
17
18 #include "access/brin.h"
19 #include "access/brin_page.h"
20 #include "access/brin_pageops.h"
21 #include "access/brin_xlog.h"
22 #include "access/reloptions.h"
23 #include "access/relscan.h"
24 #include "access/xloginsert.h"
25 #include "catalog/index.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "pgstat.h"
29 #include "storage/bufmgr.h"
30 #include "storage/freespace.h"
31 #include "utils/index_selfuncs.h"
32 #include "utils/memutils.h"
33 #include "utils/rel.h"
34
35
36 /*
37 * We use a BrinBuildState during initial construction of a BRIN index.
38 * The running state is kept in a BrinMemTuple.
39 */
typedef struct BrinBuildState
{
	Relation	bs_irel;		/* the index relation being built */
	int			bs_numtuples;	/* number of index tuples inserted so far */
	Buffer		bs_currentInsertBuf;	/* last index buffer used for insertion */
	BlockNumber bs_pagesPerRange;	/* # of heap blocks per summarized range */
	BlockNumber bs_currRangeStart;	/* first heap block of the current range */
	BrinRevmap *bs_rmAccess;	/* access object for the range map */
	BrinDesc   *bs_bdesc;		/* BRIN descriptor for the index */
	BrinMemTuple *bs_dtuple;	/* running (deformed) summary tuple */
} BrinBuildState;
51
52 /*
53 * Struct used as "opaque" during index scans
54 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;	/* # of heap blocks per summarized range */
	BrinRevmap *bo_rmAccess;	/* access object for the range map */
	BrinDesc   *bo_bdesc;		/* BRIN descriptor for the index */
} BrinOpaque;
61
/* Forward declarations for routines local to this file */
static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
						   BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel,
			  double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
			 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
71
72
/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	/* AM capability flags */
	amroutine->amstrategies = 0;	/* no fixed strategy numbers; opclass-defined */
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;	/* see TODO at top of file */
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;	/* stored type may differ from column type */
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amkeytype = InvalidOid;

	/* AM callbacks; NULL entries are capabilities BRIN does not provide */
	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;	/* bitmap scans only */
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;

	PG_RETURN_POINTER(amroutine);
}
117
/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do.
 *
 * Always returns false: BRIN cannot enforce uniqueness, so there is never a
 * reason to re-check.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique)
{
	BlockNumber pagesPerRange;
	BrinDesc   *bdesc = NULL;	/* built lazily on first iteration */
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;	/* work context, created with bdesc */
	MemoryContext oldcxt = NULL;

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * Retry loop: if a concurrent updater beats us to the index tuple, we
	 * reset our work context and start over (see brin_doupdate call below).
	 */
	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		BlockNumber heapBlk;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		heapBlk = ItemPointerGetBlockNumber(heaptid);
		/* normalize the block number to be the first block in the range */
		heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
										 BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through?  Build the descriptor and a work context. */
		if (bdesc == NULL)
		{
			bdesc = brin_build_desc(idxRel);
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			oldcxt = MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			/* attnos are 1-based in index_getprocinfo */
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	/* release resources acquired above, in reverse order */
	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	if (bdesc != NULL)
	{
		brin_free_desc(bdesc);
		MemoryContextSwitchTo(oldcxt);
		MemoryContextDelete(tupcxt);
	}

	return false;
}
269
270 /*
271 * Initialize state for a BRIN index scan.
272 *
273 * We read the metapage here to determine the pages-per-range number that this
274 * index was built with. Note that since this cannot be changed while we're
275 * holding lock on index, it's not necessary to recompute it during brinrescan.
276 */
277 IndexScanDesc
brinbeginscan(Relation r,int nkeys,int norderbys)278 brinbeginscan(Relation r, int nkeys, int norderbys)
279 {
280 IndexScanDesc scan;
281 BrinOpaque *opaque;
282
283 scan = RelationGetIndexScan(r, nkeys, norderbys);
284
285 opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
286 opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
287 scan->xs_snapshot);
288 opaque->bo_bdesc = brin_build_desc(r);
289 scan->opaque = opaque;
290
291 return scan;
292 }
293
/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 *
 * Returns a (crude) estimate of the number of heap tuples the bitmap covers;
 * see the comment at the bottom.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;	/* kept pinned across revmap lookups */
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = heap_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	heap_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		/* discard tuples deformed/copied for the previous range */
		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			/* copy the tuple so we can release the buffer lock right away */
			tup = brin_copy_tuple(tup, size);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (tup == NULL)
		{
			addrange = true;
		}
		else
		{
			BrinMemTuple *dtup;

			dtup = brin_deform_tuple(bdesc, tup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
					 * this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
					  bdesc->bd_tupdesc->attrs[keyattno - 1]->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						/* copy so the cached FmgrInfo survives context resets */
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			/* clamp to the table end: the last range may be partial */
			for (pageno = heapBlk;
				 pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
				 pageno++)
			{
				/* bitmap entries must live in the caller's context */
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}
489
490 /*
491 * Re-initialize state for a BRIN index scan
492 */
493 void
brinrescan(IndexScanDesc scan,ScanKey scankey,int nscankeys,ScanKey orderbys,int norderbys)494 brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
495 ScanKey orderbys, int norderbys)
496 {
497 /*
498 * Other index AMs preprocess the scan keys at this point, or sometime
499 * early during the scan; this lets them optimize by removing redundant
500 * keys, or doing early returns when they are impossible to satisfy; see
501 * _bt_preprocess_keys for an example. Something like that could be added
502 * here someday, too.
503 */
504
505 if (scankey && scan->numberOfKeys > 0)
506 memmove(scan->keyData, scankey,
507 scan->numberOfKeys * sizeof(ScanKeyData));
508 }
509
510 /*
511 * Close down a BRIN index scan
512 */
513 void
brinendscan(IndexScanDesc scan)514 brinendscan(IndexScanDesc scan)
515 {
516 BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
517
518 brinRevmapTerminate(opaque->bo_rmAccess);
519 brin_free_desc(opaque->bo_bdesc);
520 pfree(opaque);
521 }
522
/*
 * Per-heap-tuple callback for IndexBuildHeapScan.
 *
 * Note we don't worry about the page range at the end of the table here; it is
 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  Caller must ensure to do so, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{

		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;

		col = &state->bs_dtuple->bt_columns[i];
		/* attnos are 1-based in index_getprocinfo */
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  state->bs_bdesc->bd_tupdesc->attrs[i]->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}
588
/*
 * brinbuild() -- build a new BRIN index.
 *
 * Initializes the metapage (WAL-logging it if needed), then performs a full
 * heap scan accumulating per-range summaries via brinbuildCallback, and
 * finally returns tuple-count statistics for the build.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	/* WAL-log the metapage initialization, if this relation is logged */
	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
								   brinbuildCallback, (void *) state);

	/* process the final batch (the last, possibly partial, range) */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}
675
/*
 * brinbuildempty() -- build an empty BRIN index in the initialization fork.
 */
void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * Initialize and xlog metabuffer.  The critical section ensures the
	 * dirtying and WAL-logging of the page happen atomically with respect to
	 * error recovery.
	 */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, false);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}
696
697 /*
698 * brinbulkdelete
699 * Since there are no per-heap-tuple index tuples in BRIN indexes,
700 * there's not a lot we can do here.
701 *
702 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
703 * tuple is deleted), meaning the need to re-run summarization on the affected
704 * range. Would need to add an extra flag in brintuples for that.
705 */
706 IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo * info,IndexBulkDeleteResult * stats,IndexBulkDeleteCallback callback,void * callback_state)707 brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
708 IndexBulkDeleteCallback callback, void *callback_state)
709 {
710 /* allocate stats if first time through, else re-use existing struct */
711 if (stats == NULL)
712 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
713
714 return stats;
715 }
716
/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	/* brinbulkdelete may not have run, so stats can still be NULL here */
	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
						AccessShareLock);

	/* free up unused index pages before summarizing */
	brin_vacuum_scan(info->index, info->strategy);

	/*
	 * Both summarized and pre-existing range counts are accumulated into
	 * num_index_tuples, so it ends up as the total number of ranges.
	 */
	brinsummarize(info->index, heapRel,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	heap_close(heapRel, AccessShareLock);

	return stats;
}
747
748 /*
749 * reloptions processor for BRIN indexes
750 */
751 bytea *
brinoptions(Datum reloptions,bool validate)752 brinoptions(Datum reloptions, bool validate)
753 {
754 relopt_value *options;
755 BrinOptions *rdopts;
756 int numoptions;
757 static const relopt_parse_elt tab[] = {
758 {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)}
759 };
760
761 options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
762 &numoptions);
763
764 /* if none set, we're done */
765 if (numoptions == 0)
766 return NULL;
767
768 rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
769
770 fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
771 validate, tab, lengthof(tab));
772
773 pfree(options);
774
775 return (bytea *) rdopts;
776 }
777
/*
 * SQL-callable function to scan through an index and summarize all ranges
 * that are not currently summarized.
 *
 * Returns the number of ranges that were summarized, as int32.
 */
Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	/* writes to the index are impossible during recovery */
	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("BRIN control functions cannot be executed during recovery.")));

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}
843
/*
 * Build a BrinDesc used to create or scan a BRIN index
 *
 * The descriptor and everything it points to is allocated in a dedicated
 * memory context (bd_context), so that brin_free_desc can release it all by
 * deleting that context.
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn,
										  tupdesc->attrs[keyno]->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	/* transfer the per-column opclass info into the flexible array member */
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}
901
/*
 * Release a BrinDesc created by brin_build_desc, including everything
 * allocated in its private memory context.
 */
void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}
910
911 /*
912 * Initialize a BrinBuildState appropriate to create tuples on the given index.
913 */
914 static BrinBuildState *
initialize_brin_buildstate(Relation idxRel,BrinRevmap * revmap,BlockNumber pagesPerRange)915 initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
916 BlockNumber pagesPerRange)
917 {
918 BrinBuildState *state;
919
920 state = palloc(sizeof(BrinBuildState));
921
922 state->bs_irel = idxRel;
923 state->bs_numtuples = 0;
924 state->bs_currentInsertBuf = InvalidBuffer;
925 state->bs_pagesPerRange = pagesPerRange;
926 state->bs_currRangeStart = 0;
927 state->bs_rmAccess = revmap;
928 state->bs_bdesc = brin_build_desc(idxRel);
929 state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
930
931 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
932
933 return state;
934 }
935
/*
 * Release resources associated with a BrinBuildState.
 */
static void
terminate_brin_buildstate(BrinBuildState *state)
{
	/* release the last index buffer used */
	if (!BufferIsInvalid(state->bs_currentInsertBuf))
	{
		Page		page;

		page = BufferGetPage(state->bs_currentInsertBuf);
		/* record remaining free space so future insertions can reuse it */
		RecordPageWithFreeSpace(state->bs_irel,
							BufferGetBlockNumber(state->bs_currentInsertBuf),
								PageGetFreeSpace(page));
		ReleaseBuffer(state->bs_currentInsertBuf);
	}

	/* note: bs_rmAccess is terminated separately by the caller */
	brin_free_desc(state->bs_bdesc);
	pfree(state->bs_dtuple);
	pfree(state);
}
958
959 /*
960 * On the given BRIN index, summarize the heap page range that corresponds
961 * to the heap block number given.
962 *
963 * This routine can run in parallel with insertions into the heap. To avoid
964 * missing those values from the summary tuple, we first insert a placeholder
965 * index tuple into the index, then execute the heap scan; transactions
966 * concurrent with the scan update the placeholder tuple. After the scan, we
967 * union the placeholder tuple with the one computed by this routine. The
968 * update of the index value happens in a loop, so that if somebody updates
969 * the placeholder tuple after we read it, we detect the case and try again.
970 * This ensures that the concurrently inserted tuples are not lost.
971 *
972 * A further corner case is this routine being asked to summarize the partial
973 * range at the end of the table. heapNumBlocks is the (possibly outdated)
974 * table size; if we notice that the requested range lies beyond that size,
975 * we re-compute the table size after inserting the placeholder tuple, to
976 * avoid missing pages that were appended recently.
977 */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;			/* buffer containing the placeholder tuple */
	BrinTuple  *phtup;			/* our private copy of the placeholder tuple */
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;	/* how many heap pages to actually scan */

	/*
	 * Insert the placeholder tuple.  While we scan the heap below, any
	 * concurrent inserter into this range will update this tuple rather
	 * than being lost.
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
	 * by transactions that are still in progress, among other corner cases.
	 */
	state->bs_currRangeStart = heapBlk;
	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
							heapBlk, scanNumBlks,
							brinbuildCallback, (void *) state);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Form a new index tuple from the summary gathered by the scan, and
		 * try to replace the placeholder tuple with it.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE,
										 NULL);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		/* copy the tuple before releasing the buffer lock, so it can't move */
		phtup = brin_copy_tuple(phtup, phsz);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}
1092
/*
 * Scan a complete BRIN index, and summarize each page range that's not already
 * summarized.  The index and heap must have been locked by caller in at
 * least ShareUpdateExclusiveLock mode.
 *
 * For each new index tuple inserted, *numSummarized (if not NULL) is
 * incremented; for each existing tuple, *numExisting (if not NULL) is
 * incremented.
 */
static void
brinsummarize(Relation index, Relation heapRel, double *numSummarized,
			  double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;	/* created lazily on first missing range */
	IndexInfo  *indexInfo = NULL;	/* likewise */
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

	/* determine range of pages to process: always start from the beginning */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	startBlk = 0;

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Go away now if we think the next range is partial.  Only complete
		 * ranges are summarized here; the final partial range, if any, is
		 * left alone.
		 */
		if (startBlk + pagesPerRange > heapNumBlocks)
			break;

		CHECK_FOR_INTERRUPTS();

		/* on success, this returns with the containing buffer share-locked */
		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE, NULL);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			/* range is already summarized: just count it and move on */
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	/* buf keeps its pin across loop iterations; drop it now if we hold one */
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}
1177
1178 /*
1179 * Given a deformed tuple in the build state, convert it into the on-disk
1180 * format and insert it into the index, making the revmap point to it.
1181 */
1182 static void
form_and_insert_tuple(BrinBuildState * state)1183 form_and_insert_tuple(BrinBuildState *state)
1184 {
1185 BrinTuple *tup;
1186 Size size;
1187
1188 tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1189 state->bs_dtuple, &size);
1190 brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1191 &state->bs_currentInsertBuf, state->bs_currRangeStart,
1192 tup, size);
1193 state->bs_numtuples++;
1194
1195 pfree(tup);
1196 }
1197
1198 /*
1199 * Given two deformed tuples, adjust the first one so that it's consistent
1200 * with the summary values in both.
1201 */
1202 static void
union_tuples(BrinDesc * bdesc,BrinMemTuple * a,BrinTuple * b)1203 union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1204 {
1205 int keyno;
1206 BrinMemTuple *db;
1207 MemoryContext cxt;
1208 MemoryContext oldcxt;
1209
1210 /* Use our own memory context to avoid retail pfree */
1211 cxt = AllocSetContextCreate(CurrentMemoryContext,
1212 "brin union",
1213 ALLOCSET_DEFAULT_SIZES);
1214 oldcxt = MemoryContextSwitchTo(cxt);
1215 db = brin_deform_tuple(bdesc, b);
1216 MemoryContextSwitchTo(oldcxt);
1217
1218 for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1219 {
1220 FmgrInfo *unionFn;
1221 BrinValues *col_a = &a->bt_columns[keyno];
1222 BrinValues *col_b = &db->bt_columns[keyno];
1223
1224 unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1225 BRIN_PROCNUM_UNION);
1226 FunctionCall3Coll(unionFn,
1227 bdesc->bd_index->rd_indcollation[keyno],
1228 PointerGetDatum(bdesc),
1229 PointerGetDatum(col_a),
1230 PointerGetDatum(col_b));
1231 }
1232
1233 MemoryContextDelete(cxt);
1234 }
1235
1236 /*
1237 * brin_vacuum_scan
1238 * Do a complete scan of the index during VACUUM.
1239 *
1240 * This routine scans the complete index looking for uncatalogued index pages,
1241 * i.e. those that might have been lost due to a crash after index extension
1242 * and such.
1243 */
1244 static void
brin_vacuum_scan(Relation idxrel,BufferAccessStrategy strategy)1245 brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1246 {
1247 bool vacuum_fsm = false;
1248 BlockNumber blkno;
1249
1250 /*
1251 * Scan the index in physical order, and clean up any possible mess in
1252 * each page.
1253 */
1254 for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
1255 {
1256 Buffer buf;
1257
1258 CHECK_FOR_INTERRUPTS();
1259
1260 buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1261 RBM_NORMAL, strategy);
1262
1263 vacuum_fsm |= brin_page_cleanup(idxrel, buf);
1264
1265 ReleaseBuffer(buf);
1266 }
1267
1268 /*
1269 * If we made any change to the FSM, make sure the new info is visible all
1270 * the way to the top.
1271 */
1272 if (vacuum_fsm)
1273 FreeSpaceMapVacuum(idxrel);
1274 }
1275