/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *    Fast insert routines for the Postgres inverted index access method.
 *    Pending entries are stored in a linear list of pages.  Later on
 *    (typically during VACUUM), ginInsertCleanup() will be invoked to
 *    transfer pending entries into the regular index structure.  This
 *    wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xloginsert.h"
#include "access/xlog.h"
#include "commands/vacuum.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"

/* GUC parameter */
int         gin_pending_list_limit = 0;
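
/*
 * The GUC value is in kilobytes.  Code below does not read this variable
 * directly: it goes through GinGetPendingListCleanupSize(), which prefers
 * the index's pending_list_cleanup_size reloption when one is set.
 */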

#define GIN_PAGE_FREESIZE \
    ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
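
/*
 * For a sense of scale: on a typical 64-bit build with the default 8 kB
 * BLCKSZ, MAXALIGN(SizeOfPageHeaderData) is 24 bytes and
 * MAXALIGN(sizeof(GinPageOpaqueData)) is 8, so GIN_PAGE_FREESIZE comes to
 * about 8160 bytes of usable space per pending-list page.
 */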

typedef struct KeyArray
{
    Datum      *keys;           /* expansible array */
    GinNullCategory *categories;    /* another expansible array */
    int32       nvalues;        /* current number of valid entries */
    int32       maxvalues;      /* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
              IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
    Page        page = BufferGetPage(buffer);
    int32       i,
                freesize,
                size = 0;
    OffsetNumber l,
                off;
    PGAlignedBlock workspace;
    char       *ptr;

    START_CRIT_SECTION();

    GinInitBuffer(buffer, GIN_LIST);

    off = FirstOffsetNumber;
    ptr = workspace.data;

    for (i = 0; i < ntuples; i++)
    {
        int         this_size = IndexTupleSize(tuples[i]);

        memcpy(ptr, tuples[i], this_size);
        ptr += this_size;
        size += this_size;

        l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

        if (l == InvalidOffsetNumber)
            elog(ERROR, "failed to add item to index page in \"%s\"",
                 RelationGetRelationName(index));

        off++;
    }

    Assert(size <= BLCKSZ);     /* else we overran workspace */

    GinPageGetOpaque(page)->rightlink = rightlink;

    /*
     * The tail page may contain only whole row(s) or the final part of a row
     * placed on previous pages (a "row" here meaning all the index tuples
     * generated for one heap tuple).
     */
    if (rightlink == InvalidBlockNumber)
    {
        GinPageSetFullRow(page);
        GinPageGetOpaque(page)->maxoff = 1;
    }
    else
    {
        GinPageGetOpaque(page)->maxoff = 0;
    }

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(index))
    {
        ginxlogInsertListPage data;
        XLogRecPtr  recptr;

        data.rightlink = rightlink;
        data.ntuples = ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

        XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
        XLogRegisterBufData(0, workspace.data, size);

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
        PageSetLSN(page, recptr);
    }

    /* get free space before releasing buffer */
    freesize = PageGetExactFreeSpace(page);

    UnlockReleaseBuffer(buffer);

    END_CRIT_SECTION();

    return freesize;
}
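
/*
 * Note on the workspace copy above: the tuples are registered as buffer
 * data for the XLOG_GIN_INSERT_LISTPAGE record, and on recovery the redo
 * routine (ginRedoInsertListPage in ginxlog.c) reinitializes the page and
 * re-adds them, so replay does not depend on a full-page image.
 */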

static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
            GinMetaPageData *res)
{
    Buffer      curBuffer = InvalidBuffer;
    Buffer      prevBuffer = InvalidBuffer;
    int         i,
                size = 0,
                tupsize;
    int         startTuple = 0;

    Assert(ntuples > 0);

    /*
     * Split tuples into pages
     */
    for (i = 0; i < ntuples; i++)
    {
        if (curBuffer == InvalidBuffer)
        {
            curBuffer = GinNewBuffer(index);

            if (prevBuffer != InvalidBuffer)
            {
                res->nPendingPages++;
                writeListPage(index, prevBuffer,
                              tuples + startTuple,
                              i - startTuple,
                              BufferGetBlockNumber(curBuffer));
            }
            else
            {
                res->head = BufferGetBlockNumber(curBuffer);
            }

            prevBuffer = curBuffer;
            startTuple = i;
            size = 0;
        }

        tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

        if (size + tupsize > GinListPageSize)
        {
            /* won't fit, force a new page and reprocess */
            i--;
            curBuffer = InvalidBuffer;
        }
        else
        {
            size += tupsize;
        }
    }

    /*
     * Write last page
     */
    res->tail = BufferGetBlockNumber(curBuffer);
    res->tailFreeSize = writeListPage(index, curBuffer,
                                      tuples + startTuple,
                                      ntuples - startTuple,
                                      InvalidBlockNumber);
    res->nPendingPages++;
    /* that was only one heap tuple */
    res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving their order.
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
    Relation    index = ginstate->index;
    Buffer      metabuffer;
    Page        metapage;
    GinMetaPageData *metadata = NULL;
    Buffer      buffer = InvalidBuffer;
    Page        page = NULL;
    ginxlogUpdateMeta data;
    bool        separateList = false;
    bool        needCleanup = false;
    int         cleanupSize;
    bool        needWal;

    if (collector->ntuples == 0)
        return;

    needWal = RelationNeedsWAL(index);

    data.node = index->rd_node;
    data.ntuples = 0;
    data.newRightlink = data.prevTail = InvalidBlockNumber;

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);

    if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
    {
        /*
         * Total size is greater than one page => make sublist
         */
        separateList = true;
    }
    else
    {
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber ||
            collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
        {
            /*
             * The pending list is empty, or the total size is greater than
             * the free space on the tail page => make a sublist.
             *
             * We unlock the metabuffer to improve concurrency.
             */
            separateList = true;
            LockBuffer(metabuffer, GIN_UNLOCK);
        }
    }

    if (separateList)
    {
        /*
         * Build the sublist separately, then append it to the tail.
         */
        GinMetaPageData sublist;

        memset(&sublist, 0, sizeof(GinMetaPageData));
        makeSublist(index, collector->tuples, collector->ntuples, &sublist);

        if (needWal)
            XLogBeginInsert();

        /*
         * metapage was unlocked, see above
         */
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber)
        {
            /*
             * Main list is empty, so just insert sublist as main list
             */
            START_CRIT_SECTION();

            metadata->head = sublist.head;
            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages = sublist.nPendingPages;
            metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
        }
        else
        {
            /*
             * Merge lists
             */
            data.prevTail = metadata->tail;
            data.newRightlink = sublist.head;

            buffer = ReadBuffer(index, metadata->tail);
            LockBuffer(buffer, GIN_EXCLUSIVE);
            page = BufferGetPage(buffer);

            Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

            START_CRIT_SECTION();

            GinPageGetOpaque(page)->rightlink = sublist.head;

            MarkBufferDirty(buffer);

            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages += sublist.nPendingPages;
            metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

            if (needWal)
                XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
        }
    }
    else
    {
        /*
         * Insert into the tail page.  Metapage is already locked.
         */
        OffsetNumber l,
                    off;
        int         i,
                    tupsize;
        char       *ptr;
        char       *collectordata;

        buffer = ReadBuffer(index, metadata->tail);
        LockBuffer(buffer, GIN_EXCLUSIVE);
        page = BufferGetPage(buffer);

        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
            OffsetNumberNext(PageGetMaxOffsetNumber(page));

        collectordata = ptr = (char *) palloc(collector->sumsize);

        data.ntuples = collector->ntuples;

        if (needWal)
            XLogBeginInsert();

        START_CRIT_SECTION();

        /*
         * Increase counter of heap tuples
         */
        Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
        GinPageGetOpaque(page)->maxoff++;
        metadata->nPendingHeapTuples++;

        for (i = 0; i < collector->ntuples; i++)
        {
            tupsize = IndexTupleSize(collector->tuples[i]);
            l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

            if (l == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index page in \"%s\"",
                     RelationGetRelationName(index));

            memcpy(ptr, collector->tuples[i], tupsize);
            ptr += tupsize;

            off++;
        }

        Assert((ptr - collectordata) <= collector->sumsize);
        if (needWal)
        {
            XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
            XLogRegisterBufData(1, collectordata, collector->sumsize);
        }

        metadata->tailFreeSize = PageGetExactFreeSpace(page);

        MarkBufferDirty(buffer);
    }

    /*
     * Write metabuffer, make xlog entry
     */
    MarkBufferDirty(metabuffer);

    if (needWal)
    {
        XLogRecPtr  recptr;

        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

        XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
        XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
        PageSetLSN(metapage, recptr);

        if (buffer != InvalidBuffer)
        {
            PageSetLSN(page, recptr);
        }
    }

    if (buffer != InvalidBuffer)
        UnlockReleaseBuffer(buffer);

    /*
     * Force pending list cleanup when it becomes too long.  Also,
     * ginInsertCleanup() could take a significant amount of time, so we
     * prefer to call it when it can do all the work in a single collection
     * cycle.  In non-vacuum mode, it shouldn't require maintenance_work_mem,
     * so fire it while the pending list is still small enough to fit into
     * gin_pending_list_limit.
     *
     * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     */
    cleanupSize = GinGetPendingListCleanupSize(index);
    if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
        needCleanup = true;
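
    /*
     * Worked example: cleanupSize is in kilobytes, so with the default
     * gin_pending_list_limit of 4096 kB and GIN_PAGE_FREESIZE of roughly
     * 8160 bytes (8 kB blocks), the test above fires once about
     * 4096 * 1024 / 8160, i.e. ~514, pending pages have accumulated.
     */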

    UnlockReleaseBuffer(metabuffer);

    END_CRIT_SECTION();

    /*
     * Since this could contend with a concurrent cleanup process, we perform
     * the cleanup non-forcibly: ginInsertCleanup() will simply give up if it
     * cannot take the cleanup lock.
     */
    if (needCleanup)
        ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
                        GinTupleCollector *collector,
                        OffsetNumber attnum, Datum value, bool isNull,
                        ItemPointer ht_ctid)
{
    Datum      *entries;
    GinNullCategory *categories;
    int32       i,
                nentries;

    /*
     * Extract the key values that need to be inserted in the index
     */
    entries = ginExtractEntries(ginstate, attnum, value, isNull,
                                &nentries, &categories);

    /*
     * Allocate/reallocate memory for storing collected tuples
     */
    if (collector->tuples == NULL)
    {
        collector->lentuples = nentries * ginstate->origTupdesc->natts;
        collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
    }

    while (collector->ntuples + nentries > collector->lentuples)
    {
        collector->lentuples *= 2;
        collector->tuples = (IndexTuple *) repalloc(collector->tuples,
                                                    sizeof(IndexTuple) * collector->lentuples);
    }

    /*
     * Build an index tuple for each key value, and add to array.  In pending
     * tuples we just stick the heap TID into t_tid.
     */
    for (i = 0; i < nentries; i++)
    {
        IndexTuple  itup;

        itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
                            NULL, 0, 0, true);
        itup->t_tid = *ht_ctid;
        collector->tuples[collector->ntuples++] = itup;
        collector->sumsize += IndexTupleSize(itup);
    }
}
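
/*
 * For orientation, a sketch of the expected caller pattern (this is what
 * gininsert() in gininsert.c does on the fastupdate path): collect entries
 * for every column of one heap tuple, then write the whole batch with a
 * single ginHeapTupleFastInsert() call, as the comment above requires.
 *
 *      GinTupleCollector collector;
 *      int         i;
 *
 *      memset(&collector, 0, sizeof(GinTupleCollector));
 *      for (i = 0; i < ginstate->origTupdesc->natts; i++)
 *          ginHeapTupleFastCollect(ginstate, &collector,
 *                                  (OffsetNumber) (i + 1),
 *                                  values[i], isnull[i], ht_ctid);
 *      ginHeapTupleFastInsert(ginstate, &collector);
 */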

/*
 * Deletes pending list pages up to (not including) newHead page.
 * If newHead == InvalidBlockNumber, the function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
          bool fill_fsm, IndexBulkDeleteResult *stats)
{
    Page        metapage;
    GinMetaPageData *metadata;
    BlockNumber blknoToDelete;

    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);
    blknoToDelete = metadata->head;

    do
    {
        Page        page;
        int         i;
        int64       nDeletedHeapTuples = 0;
        ginxlogDeleteListPages data;
        Buffer      buffers[GIN_NDELETE_AT_ONCE];
        BlockNumber freespace[GIN_NDELETE_AT_ONCE];

        data.ndeleted = 0;
        while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
        {
            freespace[data.ndeleted] = blknoToDelete;
            buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
            LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
            page = BufferGetPage(buffers[data.ndeleted]);

            data.ndeleted++;

            Assert(!GinPageIsDeleted(page));

            nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
            blknoToDelete = GinPageGetOpaque(page)->rightlink;
        }

        if (stats)
            stats->pages_deleted += data.ndeleted;

        /*
         * This operation touches an unusually large number of pages, so
         * prepare the XLogInsert machinery for that before entering the
         * critical section.
         */
        if (RelationNeedsWAL(index))
            XLogEnsureRecordSpace(data.ndeleted, 0);

        START_CRIT_SECTION();

        metadata->head = blknoToDelete;

        Assert(metadata->nPendingPages >= data.ndeleted);
        metadata->nPendingPages -= data.ndeleted;
        Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
        metadata->nPendingHeapTuples -= nDeletedHeapTuples;

        if (blknoToDelete == InvalidBlockNumber)
        {
            metadata->tail = InvalidBlockNumber;
            metadata->tailFreeSize = 0;
            metadata->nPendingPages = 0;
            metadata->nPendingHeapTuples = 0;
        }

        MarkBufferDirty(metabuffer);

        for (i = 0; i < data.ndeleted; i++)
        {
            page = BufferGetPage(buffers[i]);
            GinPageGetOpaque(page)->flags = GIN_DELETED;
            MarkBufferDirty(buffers[i]);
        }

        if (RelationNeedsWAL(index))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
            for (i = 0; i < data.ndeleted; i++)
                XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

            memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

            XLogRegisterData((char *) &data,
                             sizeof(ginxlogDeleteListPages));

            recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
            PageSetLSN(metapage, recptr);

            for (i = 0; i < data.ndeleted; i++)
            {
                page = BufferGetPage(buffers[i]);
                PageSetLSN(page, recptr);
            }
        }

        for (i = 0; i < data.ndeleted; i++)
            UnlockReleaseBuffer(buffers[i]);

        END_CRIT_SECTION();

        for (i = 0; fill_fsm && i < data.ndeleted; i++)
            RecordFreeIndexPage(index, freespace[i]);

    } while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
    keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
    keys->categories = (GinNullCategory *)
        palloc(sizeof(GinNullCategory) * maxvalues);
    keys->nvalues = 0;
    keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
    if (keys->nvalues >= keys->maxvalues)
    {
        keys->maxvalues *= 2;
        keys->keys = (Datum *)
            repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
        keys->categories = (GinNullCategory *)
            repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
    }

    keys->keys[keys->nvalues] = datum;
    keys->categories[keys->nvalues] = category;
    keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
                   Page page, OffsetNumber startoff)
{
    ItemPointerData heapptr;
    OffsetNumber i,
                maxoff;
    OffsetNumber attrnum;

    /* reset *ka to empty */
    ka->nvalues = 0;

    maxoff = PageGetMaxOffsetNumber(page);
    Assert(maxoff >= FirstOffsetNumber);
    ItemPointerSetInvalid(&heapptr);
    attrnum = 0;

    for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
    {
        IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
        OffsetNumber curattnum;
        Datum       curkey;
        GinNullCategory curcategory;

        /* Check for change of heap TID or attnum */
        curattnum = gintuple_get_attrnum(accum->ginstate, itup);

        if (!ItemPointerIsValid(&heapptr))
        {
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }
        else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
                   curattnum == attrnum))
        {
            /*
             * ginInsertBAEntries can insert several datums per call, but only
             * for one heap tuple and one column.  So call it at a boundary,
             * and reset ka.
             */
            ginInsertBAEntries(accum, &heapptr, attrnum,
                               ka->keys, ka->categories, ka->nvalues);
            ka->nvalues = 0;
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }

        /* Add key to KeyArray */
        curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
        addDatum(ka, curkey, curcategory);
    }

    /* Dump out all remaining keys */
    ginInsertBAEntries(accum, &heapptr, attrnum,
                       ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into the regular GIN structure.
 *
 * At first glance this looks completely non-crash-safe.  But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay: when we redo the posting later on, nothing bad
 * will happen.
 *
 * fill_fsm indicates whether ginInsertCleanup should add deleted pages to
 * the FSM; otherwise, the caller is responsible for putting deleted pages
 * into the FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
                 bool fill_fsm, bool forceCleanup,
                 IndexBulkDeleteResult *stats)
{
    Relation    index = ginstate->index;
    Buffer      metabuffer,
                buffer;
    Page        metapage,
                page;
    GinMetaPageData *metadata;
    MemoryContext opCtx,
                oldCtx;
    BuildAccumulator accum;
    KeyArray    datums;
    BlockNumber blkno,
                blknoFinish;
    bool        cleanupFinish = false;
    bool        fsm_vac = false;
    Size        workMemory;

    /*
     * We want to prevent concurrent cleanup processes.  For that, we lock
     * the metapage in exclusive mode using a LockPage() call.  Nobody else
     * uses that lock on the metapage, so concurrent insertion into the
     * pending list remains possible.
     */

    if (forceCleanup)
    {
        /*
         * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
         * and want to wait for any concurrent cleanup to finish.
         */
        LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
        workMemory =
            (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
            autovacuum_work_mem : maintenance_work_mem;
    }
    else
    {
        /*
         * We are called from a regular insert; if we see a concurrent
         * cleanup in progress, just exit in the hope that the concurrent
         * process will clean up the pending list.
         */
        if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
            return;
        workMemory = work_mem;
    }
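
    /*
     * workMemory, like the GUCs it is taken from, is in kilobytes; the
     * flush test in the main loop below therefore compares
     * accum.allocatedMemory (bytes) against workMemory * 1024L.
     */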

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    LockBuffer(metabuffer, GIN_SHARE);
    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);

    if (metadata->head == InvalidBlockNumber)
    {
        /* Nothing to do */
        UnlockReleaseBuffer(metabuffer);
        UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
        return;
    }

    /*
     * Remember a tail page to prevent infinite cleanup if other backends add
     * new tuples faster than we can clean up.
     */
    blknoFinish = metadata->tail;

    /*
     * Read and lock head of pending list
     */
    blkno = metadata->head;
    buffer = ReadBuffer(index, blkno);
    LockBuffer(buffer, GIN_SHARE);
    page = BufferGetPage(buffer);

    LockBuffer(metabuffer, GIN_UNLOCK);

    /*
     * Initialize.  All temporary space will be in opCtx
     */
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "GIN insert cleanup temporary context",
                                  ALLOCSET_DEFAULT_SIZES);

    oldCtx = MemoryContextSwitchTo(opCtx);

    initKeyArray(&datums, 128);
    ginInitBA(&accum);
    accum.ginstate = ginstate;

    /*
     * At the top of this loop, we have pin and lock on the current page of
     * the pending list.  However, we'll release that before exiting the loop.
     * Note we also have pin but not lock on the metapage.
     */
    for (;;)
    {
        Assert(!GinPageIsDeleted(page));

        /*
         * Have we reached the page that was the tail when we started our
         * cleanup?  If the caller asked us to clean up the whole pending
         * list, ignore the old tail: we will work until the list becomes
         * empty.
         */
        if (blkno == blknoFinish && full_clean == false)
            cleanupFinish = true;

        /*
         * read page's datums into accum
         */
        processPendingPage(&accum, &datums, page, FirstOffsetNumber);

        vacuum_delay_point();

        /*
         * Is it time to flush memory to disk?  Flush if we are at the end of
         * the pending list, or if we have a full row and memory is getting
         * full.
         */
        if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
            (GinPageHasFullRow(page) &&
             (accum.allocatedMemory >= workMemory * 1024L)))
        {
            ItemPointerData *list;
            uint32      nlist;
            Datum       key;
            GinNullCategory category;
            OffsetNumber maxoff,
                        attnum;

            /*
             * Unlock the current page for better concurrency.  Any changes
             * to the page will be detected later by comparing maxoff after
             * the memory flush completes.
             */
            maxoff = PageGetMaxOffsetNumber(page);
            LockBuffer(buffer, GIN_UNLOCK);

            /*
             * Moving the collected data into the regular structure can take
             * a significant amount of time, so do it without holding a lock
             * on the pending list.
             */
            ginBeginBAScan(&accum);
            while ((list = ginGetBAEntry(&accum,
                                         &attnum, &key, &category, &nlist)) != NULL)
            {
                ginEntryInsert(ginstate, attnum, key, category,
                               list, nlist, NULL);
                vacuum_delay_point();
            }

            /*
             * Lock the whole list to remove pages
             */
            LockBuffer(metabuffer, GIN_EXCLUSIVE);
            LockBuffer(buffer, GIN_SHARE);

            Assert(!GinPageIsDeleted(page));

            /*
             * While we left the page unlocked, more stuff might have gotten
             * added to it.  If so, process those entries immediately.  There
             * shouldn't be very many, so we don't worry about the fact that
             * we're doing this with exclusive lock.  The insertion algorithm
             * guarantees that inserted row(s) will not continue on the next
             * page.  NOTE: intentionally no vacuum_delay_point in this loop.
             */
            if (PageGetMaxOffsetNumber(page) != maxoff)
            {
                ginInitBA(&accum);
                processPendingPage(&accum, &datums, page, maxoff + 1);

                ginBeginBAScan(&accum);
                while ((list = ginGetBAEntry(&accum,
                                             &attnum, &key, &category, &nlist)) != NULL)
                    ginEntryInsert(ginstate, attnum, key, category,
                                   list, nlist, NULL);
            }

            /*
             * Remember the next page - it will become the new list head
             */
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
                                             * locking */

            /*
             * Remove the processed pages from the pending list; at this
             * point all their content has been copied into the regular
             * structure.
             */
            shiftList(index, metabuffer, blkno, fill_fsm, stats);

            /* At this point, some pending pages have been freed up */
            fsm_vac = true;

            Assert(blkno == metadata->head);
            LockBuffer(metabuffer, GIN_UNLOCK);

            /*
             * If we removed the whole pending list, or we have cleaned up
             * the tail page we remembered at the start of our cleanup, then
             * just exit.
             */
            if (blkno == InvalidBlockNumber || cleanupFinish)
                break;

            /*
             * release memory used so far and reinit state
             */
            MemoryContextReset(opCtx);
            initKeyArray(&datums, datums.maxvalues);
            ginInitBA(&accum);
        }
        else
        {
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);
        }

        /*
         * Read next page in pending list
         */
        vacuum_delay_point();
        buffer = ReadBuffer(index, blkno);
        LockBuffer(buffer, GIN_SHARE);
        page = BufferGetPage(buffer);
    }

    UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    ReleaseBuffer(metabuffer);

    /*
     * As pending list pages can have a high churn rate, it is desirable to
     * recycle them immediately to the FreeSpace Map when ordinary backends
     * clean the list.
     */
    if (fsm_vac && fill_fsm)
        IndexFreeSpaceMapVacuum(index);

    /* Clean up temporary space */
    MemoryContextSwitchTo(oldCtx);
    MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
    Oid         indexoid = PG_GETARG_OID(0);
    Relation    indexRel = index_open(indexoid, AccessShareLock);
    IndexBulkDeleteResult stats;
    GinState    ginstate;

    if (RecoveryInProgress())
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("recovery is in progress"),
                 errhint("GIN pending list cannot be cleaned up during recovery.")));

    /* Must be a GIN index */
    if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
        indexRel->rd_rel->relam != GIN_AM_OID)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is not a GIN index",
                        RelationGetRelationName(indexRel))));

    /*
     * Reject attempts to read non-local temporary relations; we would be
     * likely to get wrong data since we have no visibility into the owning
     * session's local buffers.
     */
    if (RELATION_IS_OTHER_TEMP(indexRel))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot access temporary indexes of other sessions")));

    /* User must own the index (comparable to privileges needed for VACUUM) */
    if (!pg_class_ownercheck(indexoid, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
                       RelationGetRelationName(indexRel));

    memset(&stats, 0, sizeof(stats));
    initGinState(&ginstate, indexRel);
    ginInsertCleanup(&ginstate, true, true, true, &stats);

    index_close(indexRel, AccessShareLock);

    PG_RETURN_INT64((int64) stats.pages_deleted);
}
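
/*
 * Usage example (SQL, for a hypothetical index named some_gin_index):
 *
 *      SELECT gin_clean_pending_list('some_gin_index'::regclass);
 *
 * This flushes the index's pending list into the main GIN structure and
 * returns the number of pending-list pages deleted.
 */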