/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *	  Fast insert routines for the Postgres inverted index access method.
 *	  Pending entries are stored in a linear list of pages.  Later on
 *	  (typically during VACUUM), ginInsertCleanup() will be invoked to
 *	  transfer pending entries into the regular index structure.  This
 *	  wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "catalog/pg_am.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"

/* GUC parameter */
int			gin_pending_list_limit = 0;

#define GIN_PAGE_FREESIZE \
	( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
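
/*
 * (GIN_PAGE_FREESIZE is the usable space on an empty pending-list page:
 * the block size less the page header and the GIN opaque area.  Together
 * with gin_pending_list_limit -- which is expressed in kilobytes, as the
 * "cleanupSize * 1024L" test in ginHeapTupleFastInsert() below shows --
 * it determines when the pending list is long enough to force a cleanup.)
 */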

typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns the amount of free space left on the page.
 */
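/*
 * (The buffer is assumed to arrive pinned and exclusive-locked, as
 * GinNewBuffer() returns it from the caller; it is unlocked and released
 * here before returning.)
 */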
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;
	char	   *ptr;

	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;

	/*
	 * The tail page may contain only whole row(s), or the final part of a
	 * row placed on previous pages (a "row" here meaning all the index
	 * tuples generated for one heap tuple)
	 */
	if (rightlink == InvalidBlockNumber)
	{
		GinPageSetFullRow(page);
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		GinPageGetOpaque(page)->maxoff = 0;
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}

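/*
 * Write an array of index tuples out as a chain of pending-list pages, and
 * fill *res with the new sublist's head/tail block numbers, page count, and
 * tail free space.  (A summary inferred from the code below and its caller:
 * all the tuples are assumed to come from a single heap tuple, which is why
 * nPendingHeapTuples is set to 1 at the end.)
 */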
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving order.
 */
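/*
 * (A sketch of the typical calling pattern, as in gininsert(): the collector
 * is zeroed, filled with ginHeapTupleFastCollect() for each indexed column
 * of the heap tuple, and then written out with a single call here:
 *
 *		GinTupleCollector collector;
 *
 *		memset(&collector, 0, sizeof(GinTupleCollector));
 *		for (i = 0; i < ginstate->origTupdesc->natts; i++)
 *			ginHeapTupleFastCollect(ginstate, &collector,
 *									(OffsetNumber) (i + 1),
 *									values[i], isnull[i], ht_ctid);
 *		ginHeapTupleFastInsert(ginstate, &collector);
 * )
 */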
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	needWal = RelationNeedsWAL(index);

	data.node = index->rd_node;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	/*
	 * An insertion to the pending list could logically belong anywhere in the
	 * tree, so it conflicts with all serializable scans.  All scans acquire a
	 * predicate lock on the metabuffer to represent that.
	 */
	CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * The pending list is empty, or the total size is greater than
			 * the free space on the tail page => make sublist
			 *
			 * We unlock the metabuffer to keep concurrency high
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * Make the sublist separately and append it to the tail
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		if (needWal)
			XLogBeginInsert();

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
		}
		else
		{
			/*
			 * Merge lists
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
		}
	}
	else
	{
		/*
		 * Insert into the tail page.  The metapage is already locked
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		if (needWal)
			XLogBeginInsert();

		START_CRIT_SECTION();

		/*
		 * Increment the count of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.  (We must do this here because pre-v11 versions of PG did not
	 * set the metapage's pd_lower correctly, so a pg_upgraded index might
	 * contain the wrong value.)
	 */
	((PageHeader) metapage)->pd_lower =
		((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force pending list cleanup when it becomes too long.  Also,
	 * ginInsertCleanup could take a significant amount of time, so we prefer
	 * to call it when it can do all the work in a single collection cycle.
	 * In non-vacuum mode, it shouldn't require maintenance_work_mem, so fire
	 * it while the pending list is still small enough to fit into
	 * gin_pending_list_limit.
	 *
	 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
	 */
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Since this could contend with a concurrent cleanup process, we don't
	 * clean up the pending list forcibly here.
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
						GinTupleCollector *collector,
						OffsetNumber attnum, Datum value, bool isNull,
						ItemPointer ht_ctid)
{
	Datum	   *entries;
	GinNullCategory *categories;
	int32		i,
				nentries;

	/*
	 * Extract the key values that need to be inserted in the index
	 */
	entries = ginExtractEntries(ginstate, attnum, value, isNull,
								&nentries, &categories);

	/*
	 * Protect against integer overflow in allocation calculations
	 */
	if (nentries < 0 ||
		collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
		elog(ERROR, "too many entries for GIN index");

	/*
	 * Allocate/reallocate memory for storing collected tuples
	 */
	if (collector->tuples == NULL)
	{
		/*
		 * Determine the number of elements to allocate in the tuples array
		 * initially.  Make it a power of 2 to avoid wasting memory when
		 * resizing (since palloc likes powers of 2).
		 */
		collector->lentuples = pg_nextpower2_32(Max(16, nentries));
		collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
	}
	else if (collector->lentuples < collector->ntuples + nentries)
	{
		/*
		 * Advance lentuples to the next suitable power of 2.  This won't
		 * overflow, though we could get to a value that exceeds
		 * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
		 */
		collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
		collector->tuples = (IndexTuple *) repalloc(collector->tuples,
													sizeof(IndexTuple) * collector->lentuples);
	}

	/*
	 * Build an index tuple for each key value, and add to array.  In pending
	 * tuples we just stick the heap TID into t_tid.
	 */
	for (i = 0; i < nentries; i++)
	{
		IndexTuple	itup;

		itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
							NULL, 0, 0, true);
		itup->t_tid = *ht_ctid;
		collector->tuples[collector->ntuples++] = itup;
		collector->sumsize += IndexTupleSize(itup);
	}
}

/*
 * Deletes pending-list pages up to (but not including) the newHead page.
 * If newHead == InvalidBlockNumber then the function drops the whole list.
 *
 * The metapage is pinned and exclusive-locked throughout this function.
 */
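/*
 * (Pages are unlinked and marked deleted in batches of at most
 * GIN_NDELETE_AT_ONCE, so each WAL record and critical section covers only
 * a bounded number of buffers.)
 */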
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		/*
		 * Set pd_lower just past the end of the metadata.  This is essential,
		 * because without doing so, metadata will be lost if xlog.c
		 * compresses the page.  (We must do this here because pre-v11
		 * versions of PG did not set the metapage's pd_lower correctly, so a
		 * pg_upgraded index might contain the wrong value.)
		 */
		((PageHeader) metapage)->pd_lower =
			((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer,
							   REGBUF_WILL_INIT | REGBUF_STANDARD);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
	keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
	keys->categories = (GinNullCategory *)
		palloc(sizeof(GinNullCategory) * maxvalues);
	keys->nvalues = 0;
	keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
	if (keys->nvalues >= keys->maxvalues)
	{
		keys->maxvalues *= 2;
		keys->keys = (Datum *)
			repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
		keys->categories = (GinNullCategory *)
			repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
	}

	keys->keys[keys->nvalues] = datum;
	keys->categories[keys->nvalues] = category;
	keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but only
			 * for one heap tuple and one column.  So call it at a boundary,
			 * and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * At first glance this looks completely non-crash-safe.  But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay, because when we redo the posting later on, nothing
 * bad will happen.
 *
 * If fill_fsm is true, ginInsertCleanup adds deleted pages to the FSM;
 * otherwise the caller is responsible for putting deleted pages into the
 * FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
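/*
 * (On the remaining parameters, as used below: full_clean requests cleanup
 * of the entire pending list rather than stopping at the page that was the
 * tail when cleanup began; forceCleanup waits for any concurrent cleanup to
 * finish instead of returning immediately.)
 */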
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
				 bool fill_fsm, bool forceCleanup,
				 IndexBulkDeleteResult *stats)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer,
				buffer;
	Page		metapage,
				page;
	GinMetaPageData *metadata;
	MemoryContext opCtx,
				oldCtx;
	BuildAccumulator accum;
	KeyArray	datums;
	BlockNumber blkno,
				blknoFinish;
	bool		cleanupFinish = false;
	bool		fsm_vac = false;
	Size		workMemory;

	/*
	 * We would like to prevent concurrent cleanup processes.  For that, we
	 * lock the metapage in exclusive mode using a LockPage() call.  Nobody
	 * else uses that lock on the metapage, so concurrent insertion into the
	 * pending list remains possible.
	 */

	if (forceCleanup)
	{
		/*
		 * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
		 * and we would like to wait for any concurrent cleanup to finish.
		 */
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		workMemory =
			(IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;
	}
	else
	{
		/*
		 * We are called from a regular insert; if we see a concurrent
		 * cleanup, just exit in the hope that the concurrent process will
		 * clean up the pending list.
		 */
		if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
			return;
		workMemory = work_mem;
	}

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, GIN_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);

	if (metadata->head == InvalidBlockNumber)
	{
		/* Nothing to do */
		UnlockReleaseBuffer(metabuffer);
		UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return;
	}

	/*
	 * Remember the tail page to prevent infinite cleanup if other backends
	 * add new tuples faster than we can clean up.
	 */
	blknoFinish = metadata->tail;

	/*
	 * Read and lock head of pending list
	 */
	blkno = metadata->head;
	buffer = ReadBuffer(index, blkno);
	LockBuffer(buffer, GIN_SHARE);
	page = BufferGetPage(buffer);

	LockBuffer(metabuffer, GIN_UNLOCK);

	/*
	 * Initialize.  All temporary space will be in opCtx
	 */
	opCtx = AllocSetContextCreate(CurrentMemoryContext,
								  "GIN insert cleanup temporary context",
								  ALLOCSET_DEFAULT_SIZES);

	oldCtx = MemoryContextSwitchTo(opCtx);

	initKeyArray(&datums, 128);
	ginInitBA(&accum);
	accum.ginstate = ginstate;

	/*
	 * At the top of this loop, we have pin and lock on the current page of
	 * the pending list.  However, we'll release that before exiting the loop.
	 * Note we also have pin but not lock on the metapage.
	 */
	for (;;)
	{
		Assert(!GinPageIsDeleted(page));

		/*
		 * Have we reached the page that was the tail when we started our
		 * cleanup?  If the caller asked us to clean up the whole pending
		 * list, ignore the old tail; we will work until the list becomes
		 * empty.
		 */
		if (blkno == blknoFinish && full_clean == false)
			cleanupFinish = true;

		/*
		 * read page's datums into accum
		 */
		processPendingPage(&accum, &datums, page, FirstOffsetNumber);

		vacuum_delay_point();

		/*
		 * Is it time to flush memory to disk?	Flush if we are at the end of
		 * the pending list, or if we have a full row and memory is getting
		 * full.
		 */
		if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
			(GinPageHasFullRow(page) &&
			 (accum.allocatedMemory >= workMemory * 1024L)))
		{
			ItemPointerData *list;
			uint32		nlist;
			Datum		key;
			GinNullCategory category;
			OffsetNumber maxoff,
						attnum;

			/*
			 * Unlock the current page for better concurrency.  Any changes
			 * to the page will be detected later by comparing maxoff after
			 * the memory flush completes.
			 */
			maxoff = PageGetMaxOffsetNumber(page);
			LockBuffer(buffer, GIN_UNLOCK);

			/*
			 * Moving collected data into the regular structure can take a
			 * significant amount of time, so run it without locking the
			 * pending list.
			 */
			ginBeginBAScan(&accum);
			while ((list = ginGetBAEntry(&accum,
										 &attnum, &key, &category, &nlist)) != NULL)
			{
				ginEntryInsert(ginstate, attnum, key, category,
							   list, nlist, NULL);
				vacuum_delay_point();
			}

			/*
			 * Lock the whole list to remove pages
			 */
			LockBuffer(metabuffer, GIN_EXCLUSIVE);
			LockBuffer(buffer, GIN_SHARE);

			Assert(!GinPageIsDeleted(page));

			/*
			 * While we left the page unlocked, more stuff might have gotten
			 * added to it.  If so, process those entries immediately.  There
			 * shouldn't be very many, so we don't worry about the fact that
			 * we're doing this with exclusive lock.  The insertion algorithm
			 * guarantees that inserted row(s) will not continue on the next
			 * page.  NOTE: intentionally no vacuum_delay_point in this loop.
			 */
			if (PageGetMaxOffsetNumber(page) != maxoff)
			{
				ginInitBA(&accum);
				processPendingPage(&accum, &datums, page, maxoff + 1);

				ginBeginBAScan(&accum);
				while ((list = ginGetBAEntry(&accum,
											 &attnum, &key, &category, &nlist)) != NULL)
					ginEntryInsert(ginstate, attnum, key, category,
								   list, nlist, NULL);
			}

			/*
			 * Remember next page - it will become the new list head
			 */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);	/* shiftList will do exclusive
											 * locking */

			/*
			 * Remove the processed pages from the pending list; at this
			 * point all their content is in the regular structure.
			 */
			shiftList(index, metabuffer, blkno, fill_fsm, stats);

			/* At this point, some pending pages have been freed up */
			fsm_vac = true;

			Assert(blkno == metadata->head);
			LockBuffer(metabuffer, GIN_UNLOCK);

			/*
			 * If we removed the whole pending list, or if we reached the
			 * tail page we remembered at the start of our cleanup, then just
			 * exit.
			 */
			if (blkno == InvalidBlockNumber || cleanupFinish)
				break;

			/*
			 * release memory used so far and reinit state
			 */
			MemoryContextReset(opCtx);
			initKeyArray(&datums, datums.maxvalues);
			ginInitBA(&accum);
		}
		else
		{
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);
		}

		/*
		 * Read next page in pending list
		 */
		vacuum_delay_point();
		buffer = ReadBuffer(index, blkno);
		LockBuffer(buffer, GIN_SHARE);
		page = BufferGetPage(buffer);
	}

	UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
	ReleaseBuffer(metabuffer);

	/*
	 * As pending list pages can have a high churn rate, it is desirable to
	 * recycle them immediately to the FreeSpaceMap when ordinary backends
	 * clean the list.
	 */
	if (fsm_vac && fill_fsm)
		IndexFreeSpaceMapVacuum(index);

	/* Clean up temporary space */
	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
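/*
 * (Usage sketch, from SQL -- "gin_idx" is a hypothetical GIN index name:
 *
 *		SELECT gin_clean_pending_list('gin_idx'::regclass);
 *
 * The return value is the number of pending-list pages deleted.)
 */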
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Relation	indexRel = index_open(indexoid, RowExclusiveLock);
	IndexBulkDeleteResult stats;
	GinState	ginstate;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("GIN pending list cannot be cleaned up during recovery.")));

	/* Must be a GIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != GIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a GIN index",
						RelationGetRelationName(indexRel))));

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(indexRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary indexes of other sessions")));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	memset(&stats, 0, sizeof(stats));
	initGinState(&ginstate, indexRel);
	ginInsertCleanup(&ginstate, true, true, true, &stats);

	index_close(indexRel, RowExclusiveLock);

	PG_RETURN_INT64((int64) stats.pages_deleted);
}