/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *	Fast insert routines for the Postgres inverted index access method.
 *	Pending entries are stored in a linear list of pages.  Later on
 *	(typically during VACUUM), ginInsertCleanup() will be invoked to
 *	transfer pending entries into the regular index structure.  This
 *	wins because bulk insertion is much more efficient than retail.
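 *
 *	(Whether fast insertion is used at all is governed by the index's
 *	"fastupdate" reloption, which is checked by this module's callers,
 *	e.g. gininsert(); the pending list can also be flushed manually via
 *	the SQL-callable gin_clean_pending_list() at the bottom of this file.)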
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/xloginsert.h"
#include "access/xlog.h"
#include "commands/vacuum.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"

/* GUC parameter */
int			gin_pending_list_limit = 0;

#define GIN_PAGE_FREESIZE \
	( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
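
/*
 * For illustration only: with the default 8 kB BLCKSZ and MAXALIGN = 8,
 * SizeOfPageHeaderData aligns to 24 bytes and GinPageOpaqueData to 8, so
 * GIN_PAGE_FREESIZE works out to 8192 - 24 - 8 = 8160 bytes of usable
 * space per pending-list page.  (Exact figures depend on the build.)
 */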

typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns the amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;
	char	   *ptr;

	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;

	/*
	 * The tail page may contain only whole row(s), or the final part of a
	 * row whose earlier tuples were placed on previous pages (a "row" here
	 * meaning all the index tuples generated for one heap tuple).
	 */
	if (rightlink == InvalidBlockNumber)
	{
		GinPageSetFullRow(page);
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		GinPageGetOpaque(page)->maxoff = 0;
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}

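/*
 * Build a chain of pending-list pages from the given array of tuples, and
 * fill *res with metadata describing the new sublist: head and tail block
 * numbers, free space on the tail page, and the number of pages written.
 * The tuples are assumed to all belong to a single heap tuple; see the
 * setting of nPendingHeapTuples at the bottom.
 */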
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}


/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples are inserted
 * consecutively, preserving their order.
 */
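/*
 * Typical call flow (as in gininsert()): the caller first gathers the
 * entries for each indexed column with ginHeapTupleFastCollect(), then
 * makes a single call here, so that all the tuples for one heap tuple
 * land in the pending list together.
 */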
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	needWal = RelationNeedsWAL(index);

	data.node = index->rd_node;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * The pending list is empty, or the new tuples won't fit in the
			 * free space on the tail page => make a sublist.
			 *
			 * Unlock the metabuffer first, to preserve concurrency while
			 * the sublist is built.
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * Build the sublist separately, then append it to the tail of the
		 * pending list
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		if (needWal)
			XLogBeginInsert();

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
		}
		else
		{
			/*
			 * Merge lists
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
		}
	}
	else
	{
		/*
		 * Insert into tail page.  Metapage is already locked
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		if (needWal)
			XLogBeginInsert();

		START_CRIT_SECTION();

		/*
		 * Increase counter of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force a pending-list cleanup when the list becomes too long.
	 * ginInsertCleanup() can take a significant amount of time, so we
	 * prefer to call it when it can do all the work in a single collection
	 * cycle.  In non-vacuum mode it shouldn't require maintenance_work_mem,
	 * so fire it while the pending list is still small enough to fit within
	 * gin_pending_list_limit.
	 *
	 * ginInsertCleanup() must not be called inside our critical section.
	 */
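	/*
	 * For example, with gin_pending_list_limit at its 4 MB (4096 kB)
	 * default and 8 kB pages (GIN_PAGE_FREESIZE close to 8 kB), the test
	 * below requests a cleanup once roughly 512 pending pages have
	 * accumulated.  (Figures assume default settings; the page math is
	 * approximate.)
	 */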
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Clean the pending list non-forcibly, since this could contend with a
	 * concurrent cleanup process: ginInsertCleanup() will simply give up if
	 * another backend already holds the cleanup lock.
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
						GinTupleCollector *collector,
						OffsetNumber attnum, Datum value, bool isNull,
						ItemPointer ht_ctid)
{
	Datum	   *entries;
	GinNullCategory *categories;
	int32		i,
				nentries;

	/*
	 * Extract the key values that need to be inserted in the index
	 */
	entries = ginExtractEntries(ginstate, attnum, value, isNull,
								&nentries, &categories);

	/*
	 * Allocate/reallocate memory for storing collected tuples
	 */
	if (collector->tuples == NULL)
	{
		collector->lentuples = nentries * ginstate->origTupdesc->natts;
		collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
	}

	while (collector->ntuples + nentries > collector->lentuples)
	{
		collector->lentuples *= 2;
		collector->tuples = (IndexTuple *) repalloc(collector->tuples,
								  sizeof(IndexTuple) * collector->lentuples);
	}

	/*
	 * Build an index tuple for each key value, and add to array.  In pending
	 * tuples we just stick the heap TID into t_tid.
	 */
	for (i = 0; i < nentries; i++)
	{
		IndexTuple	itup;

		itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
							NULL, 0, 0, true);
		itup->t_tid = *ht_ctid;
		collector->tuples[collector->ntuples++] = itup;
		collector->sumsize += IndexTupleSize(itup);
	}
}


/*
 * Deletes pending-list pages up to (but not including) the newHead page.
 * If newHead == InvalidBlockNumber, the function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 */
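/*
 * Pages are deleted in batches of at most GIN_NDELETE_AT_ONCE, so that only
 * a bounded number of buffers is locked, and registered in each WAL record,
 * at any one time; hence the XLogEnsureRecordSpace() call before each
 * batch's critical section below.
 */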
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}


/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
	keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
	keys->categories = (GinNullCategory *)
		palloc(sizeof(GinNullCategory) * maxvalues);
	keys->nvalues = 0;
	keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
	if (keys->nvalues >= keys->maxvalues)
	{
		keys->maxvalues *= 2;
		keys->keys = (Datum *)
			repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
		keys->categories = (GinNullCategory *)
			repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
	}

	keys->keys[keys->nvalues] = datum;
	keys->categories[keys->nvalues] = category;
	keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
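/*
 * This relies on the pending-list insertion order: all index tuples for one
 * heap tuple are written consecutively (see ginHeapTupleFastInsert), and
 * within that group the tuples for each attribute appear together, so a
 * change of heap TID or attnum marks a boundary at which the accumulated
 * keys can be flushed.
 */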
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but
			 * only for one heap tuple and one column.  So call it at a
			 * boundary, and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}


/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * At first glance this looks completely non-crash-safe.  But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay: when we redo the posting later on, nothing bad
 * will happen.
 *
 * If fill_fsm is true, ginInsertCleanup adds deleted pages to the FSM;
 * otherwise the caller is responsible for putting deleted pages into the
 * FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
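/*
 * Known callers in this file: ginHeapTupleFastInsert() (non-forced, after a
 * regular insert grows the list past gin_pending_list_limit) and
 * gin_clean_pending_list() (forced, full clean).  VACUUM presumably reaches
 * here as well, via the GIN vacuum entry points, with forceCleanup = true.
 */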
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
				 bool fill_fsm, bool forceCleanup,
				 IndexBulkDeleteResult *stats)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer,
				buffer;
	Page		metapage,
				page;
	GinMetaPageData *metadata;
	MemoryContext opCtx,
				oldCtx;
	BuildAccumulator accum;
	KeyArray	datums;
	BlockNumber blkno,
				blknoFinish;
	bool		cleanupFinish = false;
	bool		fsm_vac = false;
	Size		workMemory;

	/*
	 * We want to prevent concurrent cleanup processes.  To do that, we take
	 * an exclusive lock on the metapage via LockPage().  Nothing else uses
	 * that lock on the metapage, so concurrent insertions into the pending
	 * list remain possible.
	 */

	if (forceCleanup)
	{
		/*
		 * We are called from [auto]vacuum/analyze or
		 * gin_clean_pending_list(), and want to wait for any concurrent
		 * cleanup to finish.
		 */
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		workMemory =
			(IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;
	}
	else
	{
		/*
		 * We are called from a regular insert; if we see a concurrent
		 * cleanup, just exit and hope that the concurrent process cleans
		 * the pending list.
		 */
		if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
			return;
		workMemory = work_mem;
	}

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, GIN_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);

	if (metadata->head == InvalidBlockNumber)
	{
		/* Nothing to do */
		UnlockReleaseBuffer(metabuffer);
		UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return;
	}

	/*
	 * Remember the current tail page, to prevent infinite cleanup if other
	 * backends add new tuples faster than we can clean up.
	 */
	blknoFinish = metadata->tail;

	/*
	 * Read and lock head of pending list
	 */
	blkno = metadata->head;
	buffer = ReadBuffer(index, blkno);
	LockBuffer(buffer, GIN_SHARE);
	page = BufferGetPage(buffer);

	LockBuffer(metabuffer, GIN_UNLOCK);

	/*
	 * Initialize.  All temporary space will be in opCtx
	 */
	opCtx = AllocSetContextCreate(CurrentMemoryContext,
								  "GIN insert cleanup temporary context",
								  ALLOCSET_DEFAULT_SIZES);

	oldCtx = MemoryContextSwitchTo(opCtx);

	initKeyArray(&datums, 128);
	ginInitBA(&accum);
	accum.ginstate = ginstate;

	/*
	 * At the top of this loop, we have pin and lock on the current page of
	 * the pending list.  However, we'll release that before exiting the
	 * loop.  Note we also have pin but not lock on the metapage.
	 */
	for (;;)
	{
		Assert(!GinPageIsDeleted(page));

		/*
		 * Have we reached the page that was the list tail when we started
		 * the cleanup?  If so, plan to stop after processing it.  But if
		 * the caller asked us to clean up the whole pending list, ignore
		 * the old tail and keep going until the list is empty.
		 */
		if (blkno == blknoFinish && full_clean == false)
			cleanupFinish = true;

		/*
		 * read page's datums into accum
		 */
		processPendingPage(&accum, &datums, page, FirstOffsetNumber);

		vacuum_delay_point();

		/*
		 * Is it time to flush memory to disk?	Flush if we are at the end
		 * of the pending list, or if we have a full row and memory is
		 * getting full.
		 */
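		/*
		 * (workMemory is in kilobytes, hence the * 1024L below: e.g. with
		 * work_mem at its 4 MB default, a flush is forced once roughly 4 MB
		 * has been accumulated.  Figures assume default GUC settings.)
		 */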
		if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
			(GinPageHasFullRow(page) &&
			 (accum.allocatedMemory >= workMemory * 1024L)))
		{
			ItemPointerData *list;
			uint32		nlist;
			Datum		key;
			GinNullCategory category;
			OffsetNumber maxoff,
						attnum;

			/*
			 * Unlock the current page for better concurrency.  Any changes
			 * made to the page while it is unlocked will be detected later,
			 * by comparing maxoff after the memory flush completes.
			 */
			maxoff = PageGetMaxOffsetNumber(page);
			LockBuffer(buffer, GIN_UNLOCK);

			/*
			 * Moving the collected data into the regular structure can take
			 * a significant amount of time - so, run it without locking the
			 * pending list.
			 */
			ginBeginBAScan(&accum);
			while ((list = ginGetBAEntry(&accum,
								  &attnum, &key, &category, &nlist)) != NULL)
			{
				ginEntryInsert(ginstate, attnum, key, category,
							   list, nlist, NULL);
				vacuum_delay_point();
			}

			/*
			 * Lock the whole list to remove pages
			 */
			LockBuffer(metabuffer, GIN_EXCLUSIVE);
			LockBuffer(buffer, GIN_SHARE);

			Assert(!GinPageIsDeleted(page));

			/*
			 * While we left the page unlocked, more stuff might have gotten
			 * added to it.  If so, process those entries immediately.  There
			 * shouldn't be very many, so we don't worry about the fact that
			 * we're doing this with exclusive lock.  The insertion algorithm
			 * guarantees that inserted row(s) will not continue on the next
			 * page.  NOTE: intentionally no vacuum_delay_point in this loop.
			 */
			if (PageGetMaxOffsetNumber(page) != maxoff)
			{
				ginInitBA(&accum);
				processPendingPage(&accum, &datums, page, maxoff + 1);

				ginBeginBAScan(&accum);
				while ((list = ginGetBAEntry(&accum,
								  &attnum, &key, &category, &nlist)) != NULL)
					ginEntryInsert(ginstate, attnum, key, category,
								   list, nlist, NULL);
			}

			/*
			 * Remember next page - it will become the new list head
			 */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);	/* shiftList will do exclusive
											 * locking */

			/*
			 * Remove the processed pages from the pending list; at this
			 * point all their contents have been moved into the regular
			 * structure.
			 */
			shiftList(index, metabuffer, blkno, fill_fsm, stats);

			/* At this point, some pending pages have been freed up */
			fsm_vac = true;

			Assert(blkno == metadata->head);
			LockBuffer(metabuffer, GIN_UNLOCK);

			/*
			 * If we removed the whole pending list, or have cleaned up the
			 * tail page we remembered when the cleanup started, we're done.
			 */
			if (blkno == InvalidBlockNumber || cleanupFinish)
				break;

			/*
			 * release memory used so far and reinit state
			 */
			MemoryContextReset(opCtx);
			initKeyArray(&datums, datums.maxvalues);
			ginInitBA(&accum);
		}
		else
		{
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);
		}

		/*
		 * Read next page in pending list
		 */
		vacuum_delay_point();
		buffer = ReadBuffer(index, blkno);
		LockBuffer(buffer, GIN_SHARE);
		page = BufferGetPage(buffer);
	}

	UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
	ReleaseBuffer(metabuffer);

	/*
	 * As pending list pages can have a high churn rate, it is desirable to
	 * recycle them immediately to the FreeSpace Map when ordinary backends
	 * clean the list.
	 */
	if (fsm_vac && fill_fsm)
		IndexFreeSpaceMapVacuum(index);

	/* Clean up temporary space */
	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
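/*
 * Example usage (returns the number of pending-list pages deleted):
 *
 *		SELECT gin_clean_pending_list('some_gin_index'::regclass);
 *
 * ('some_gin_index' is a placeholder name; any GIN index the caller owns
 * will do.)
 */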
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Relation	indexRel = index_open(indexoid, AccessShareLock);
	IndexBulkDeleteResult stats;
	GinState	ginstate;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("GIN pending list cannot be cleaned up during recovery.")));

	/* Must be a GIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != GIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a GIN index",
						RelationGetRelationName(indexRel))));

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(indexRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary indexes of other sessions")));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	memset(&stats, 0, sizeof(stats));
	initGinState(&ginstate, indexRel);
	ginInsertCleanup(&ginstate, true, true, true, &stats);

	index_close(indexRel, AccessShareLock);

	PG_RETURN_INT64((int64) stats.pages_deleted);
}