/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *    Fast insert routines for the Postgres inverted index access method.
 *    Pending entries are stored in a linear list of pages.  Later on
 *    (typically during VACUUM), ginInsertCleanup() will be invoked to
 *    transfer pending entries into the regular index structure.  This
 *    wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xloginsert.h"
#include "access/xlog.h"
#include "commands/vacuum.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/builtins.h"

/* GUC parameter */
int         gin_pending_list_limit = 0;

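/*
 * Usable space on an empty pending-list page: a full block less the page
 * header and the GIN opaque (special space) data.
 */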
#define GIN_PAGE_FREESIZE \
    ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )

typedef struct KeyArray
{
    Datum      *keys;           /* expansible array */
    GinNullCategory *categories;    /* another expansible array */
    int32       nvalues;        /* current number of valid entries */
    int32       maxvalues;      /* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
              IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
    Page        page = BufferGetPage(buffer);
    int32       i,
                freesize,
                size = 0;
    OffsetNumber l,
                off;
    PGAlignedBlock workspace;
    char       *ptr;

    START_CRIT_SECTION();

    GinInitBuffer(buffer, GIN_LIST);

    off = FirstOffsetNumber;
    ptr = workspace.data;

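    /*
     * Besides being added to the page, each tuple is also accumulated in
     * workspace, so that the complete page payload can be attached to the
     * WAL record below.
     */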
    for (i = 0; i < ntuples; i++)
    {
        int         this_size = IndexTupleSize(tuples[i]);

        memcpy(ptr, tuples[i], this_size);
        ptr += this_size;
        size += this_size;

        l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

        if (l == InvalidOffsetNumber)
            elog(ERROR, "failed to add item to index page in \"%s\"",
                 RelationGetRelationName(index));

        off++;
    }

    Assert(size <= BLCKSZ);     /* else we overran workspace */

    GinPageGetOpaque(page)->rightlink = rightlink;

    /*
     * The tail page may contain only whole row(s), or the final part of a
     * row started on previous pages (a "row" here meaning all the index
     * tuples generated for one heap tuple).
     */
    if (rightlink == InvalidBlockNumber)
    {
        GinPageSetFullRow(page);
        GinPageGetOpaque(page)->maxoff = 1;
    }
    else
    {
        GinPageGetOpaque(page)->maxoff = 0;
    }

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(index))
    {
        ginxlogInsertListPage data;
        XLogRecPtr  recptr;

        data.rightlink = rightlink;
        data.ntuples = ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

        XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
        XLogRegisterBufData(0, workspace.data, size);

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
        PageSetLSN(page, recptr);
    }

    /* get free space before releasing buffer */
    freesize = PageGetExactFreeSpace(page);

    UnlockReleaseBuffer(buffer);

    END_CRIT_SECTION();

    return freesize;
}

static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
            GinMetaPageData *res)
{
    Buffer      curBuffer = InvalidBuffer;
    Buffer      prevBuffer = InvalidBuffer;
    int         i,
                size = 0,
                tupsize;
    int         startTuple = 0;

    Assert(ntuples > 0);

    /*
     * Split tuples into pages
     */
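    /*
     * Note that a page is written out only after the next page has been
     * allocated, so that its rightlink can point to the new page.
     */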
    for (i = 0; i < ntuples; i++)
    {
        if (curBuffer == InvalidBuffer)
        {
            curBuffer = GinNewBuffer(index);

            if (prevBuffer != InvalidBuffer)
            {
                res->nPendingPages++;
                writeListPage(index, prevBuffer,
                              tuples + startTuple,
                              i - startTuple,
                              BufferGetBlockNumber(curBuffer));
            }
            else
            {
                res->head = BufferGetBlockNumber(curBuffer);
            }

            prevBuffer = curBuffer;
            startTuple = i;
            size = 0;
        }

        tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

        if (size + tupsize > GinListPageSize)
        {
            /* won't fit, force a new page and reprocess */
            i--;
            curBuffer = InvalidBuffer;
        }
        else
        {
            size += tupsize;
        }
    }

    /*
     * Write last page
     */
    res->tail = BufferGetBlockNumber(curBuffer);
    res->tailFreeSize = writeListPage(index, curBuffer,
                                      tuples + startTuple,
                                      ntuples - startTuple,
                                      InvalidBlockNumber);
    res->nPendingPages++;
    /* that was only one heap tuple */
    res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving their order.
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
    Relation    index = ginstate->index;
    Buffer      metabuffer;
    Page        metapage;
    GinMetaPageData *metadata = NULL;
    Buffer      buffer = InvalidBuffer;
    Page        page = NULL;
    ginxlogUpdateMeta data;
    bool        separateList = false;
    bool        needCleanup = false;
    int         cleanupSize;
    bool        needWal;

    if (collector->ntuples == 0)
        return;

    needWal = RelationNeedsWAL(index);

    data.node = index->rd_node;
    data.ntuples = 0;
    data.newRightlink = data.prevTail = InvalidBlockNumber;

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);

    /*
     * An insertion to the pending list could logically belong anywhere in
     * the tree, so it conflicts with all serializable scans.  All scans
     * acquire a predicate lock on the metabuffer to represent that.
     */
    CheckForSerializableConflictIn(index, NULL, metabuffer);

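    /*
     * Each tuple consumes its own size plus one line pointer (ItemIdData) of
     * page space, so that is what we compare against the pending-list page
     * capacity.
     */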
    if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
    {
        /*
         * Total size is greater than one page => make sublist
         */
        separateList = true;
    }
    else
    {
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber ||
            collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
        {
            /*
             * The pending list is empty, or the total size is greater than
             * the free space on the tail page => make a sublist.
             *
             * We unlock the metabuffer to keep concurrency high.
             */
            separateList = true;
            LockBuffer(metabuffer, GIN_UNLOCK);
        }
    }

    if (separateList)
    {
        /*
         * Build the sublist separately, then append it to the tail of the
         * main list.
         */
        GinMetaPageData sublist;

        memset(&sublist, 0, sizeof(GinMetaPageData));
        makeSublist(index, collector->tuples, collector->ntuples, &sublist);

        if (needWal)
            XLogBeginInsert();

        /*
         * metapage was unlocked, see above
         */
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber)
        {
            /*
             * Main list is empty, so just insert sublist as main list
             */
            START_CRIT_SECTION();

            metadata->head = sublist.head;
            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages = sublist.nPendingPages;
            metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
        }
        else
        {
            /*
             * Merge lists
             */
            data.prevTail = metadata->tail;
            data.newRightlink = sublist.head;

            buffer = ReadBuffer(index, metadata->tail);
            LockBuffer(buffer, GIN_EXCLUSIVE);
            page = BufferGetPage(buffer);

            Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

            START_CRIT_SECTION();

            GinPageGetOpaque(page)->rightlink = sublist.head;

            MarkBufferDirty(buffer);

            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages += sublist.nPendingPages;
            metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

            if (needWal)
                XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
        }
    }
    else
    {
        /*
         * Insert into the tail page.  The metapage is already locked.
         */
        OffsetNumber l,
                    off;
        int         i,
                    tupsize;
        char       *ptr;
        char       *collectordata;

        buffer = ReadBuffer(index, metadata->tail);
        LockBuffer(buffer, GIN_EXCLUSIVE);
        page = BufferGetPage(buffer);

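        /* Append after any tuples already on the tail page */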
        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
            OffsetNumberNext(PageGetMaxOffsetNumber(page));

        collectordata = ptr = (char *) palloc(collector->sumsize);

        data.ntuples = collector->ntuples;

        if (needWal)
            XLogBeginInsert();

        START_CRIT_SECTION();

        /*
         * Increment the heap-tuple counters
         */
        Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
        GinPageGetOpaque(page)->maxoff++;
        metadata->nPendingHeapTuples++;

        for (i = 0; i < collector->ntuples; i++)
        {
            tupsize = IndexTupleSize(collector->tuples[i]);
            l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

            if (l == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index page in \"%s\"",
                     RelationGetRelationName(index));

            memcpy(ptr, collector->tuples[i], tupsize);
            ptr += tupsize;

            off++;
        }

        Assert((ptr - collectordata) <= collector->sumsize);
        if (needWal)
        {
            XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
            XLogRegisterBufData(1, collectordata, collector->sumsize);
        }

        metadata->tailFreeSize = PageGetExactFreeSpace(page);

        MarkBufferDirty(buffer);
    }

    /*
     * Set pd_lower just past the end of the metadata.  This is essential,
     * because without doing so, metadata will be lost if xlog.c compresses
     * the page.  (We must do this here because pre-v11 versions of PG did
     * not set the metapage's pd_lower correctly, so a pg_upgraded index
     * might contain the wrong value.)
     */
    ((PageHeader) metapage)->pd_lower =
        ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

    /*
     * Write metabuffer, make xlog entry
     */
    MarkBufferDirty(metabuffer);

    if (needWal)
    {
        XLogRecPtr  recptr;

        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

        XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
        XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
        PageSetLSN(metapage, recptr);

        if (buffer != InvalidBuffer)
        {
            PageSetLSN(page, recptr);
        }
    }

    if (buffer != InvalidBuffer)
        UnlockReleaseBuffer(buffer);

    /*
     * Force pending list cleanup when it becomes too long.  Also,
     * ginInsertCleanup could take a significant amount of time, so we prefer
     * to call it when it can do all the work in a single collection cycle.
     * In non-vacuum mode, it shouldn't require maintenance_work_mem, so fire
     * it while the pending list is still small enough to fit into
     * gin_pending_list_limit.
     *
     * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     */
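    /*
     * cleanupSize is measured in kilobytes, hence the 1024 multiplier below;
     * nPendingPages * GIN_PAGE_FREESIZE approximates the pending list's size
     * in bytes.
     */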
    cleanupSize = GinGetPendingListCleanupSize(index);
    if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
        needCleanup = true;

    UnlockReleaseBuffer(metabuffer);

    END_CRIT_SECTION();

    /*
     * Since this could contend with a concurrent cleanup process, we clean
     * up the pending list non-forcibly here (the call gives up rather than
     * wait if another backend already holds the cleanup lock).
     */
    if (needCleanup)
        ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
                        GinTupleCollector *collector,
                        OffsetNumber attnum, Datum value, bool isNull,
                        ItemPointer ht_ctid)
{
    Datum      *entries;
    GinNullCategory *categories;
    int32       i,
                nentries;

    /*
     * Extract the key values that need to be inserted in the index
     */
    entries = ginExtractEntries(ginstate, attnum, value, isNull,
                                &nentries, &categories);

    /*
     * Allocate/reallocate memory for storing collected tuples
     */
    if (collector->tuples == NULL)
    {
        collector->lentuples = nentries * ginstate->origTupdesc->natts;
        collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
    }

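    /* Grow the array by doubling until the new entries fit */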
    while (collector->ntuples + nentries > collector->lentuples)
    {
        collector->lentuples *= 2;
        collector->tuples = (IndexTuple *) repalloc(collector->tuples,
                                                    sizeof(IndexTuple) * collector->lentuples);
    }

    /*
     * Build an index tuple for each key value, and add to array.  In pending
     * tuples we just stick the heap TID into t_tid.
     */
    for (i = 0; i < nentries; i++)
    {
        IndexTuple  itup;

        itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
                            NULL, 0, 0, true);
        itup->t_tid = *ht_ctid;
        collector->tuples[collector->ntuples++] = itup;
        collector->sumsize += IndexTupleSize(itup);
    }
}

/*
 * Deletes pending list pages up to (not including) newHead page.
 * If newHead == InvalidBlockNumber then function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
          bool fill_fsm, IndexBulkDeleteResult *stats)
{
    Page        metapage;
    GinMetaPageData *metadata;
    BlockNumber blknoToDelete;

    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);
    blknoToDelete = metadata->head;

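    /*
     * Delete pages in batches of at most GIN_NDELETE_AT_ONCE, so that we
     * never hold too many buffer locks, or register too many buffers in a
     * single WAL record, at one time.
     */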
    do
    {
        Page        page;
        int         i;
        int64       nDeletedHeapTuples = 0;
        ginxlogDeleteListPages data;
        Buffer      buffers[GIN_NDELETE_AT_ONCE];
        BlockNumber freespace[GIN_NDELETE_AT_ONCE];

        data.ndeleted = 0;
        while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
        {
            freespace[data.ndeleted] = blknoToDelete;
            buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
            LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
            page = BufferGetPage(buffers[data.ndeleted]);

            data.ndeleted++;

            Assert(!GinPageIsDeleted(page));

            nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
            blknoToDelete = GinPageGetOpaque(page)->rightlink;
        }

        if (stats)
            stats->pages_deleted += data.ndeleted;

        /*
         * This operation touches an unusually large number of pages, so
         * prepare the XLogInsert machinery for that before entering the
         * critical section.
         */
        if (RelationNeedsWAL(index))
            XLogEnsureRecordSpace(data.ndeleted, 0);

        START_CRIT_SECTION();

        metadata->head = blknoToDelete;

        Assert(metadata->nPendingPages >= data.ndeleted);
        metadata->nPendingPages -= data.ndeleted;
        Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
        metadata->nPendingHeapTuples -= nDeletedHeapTuples;

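        /*
         * If we have emptied the entire pending list, reset the metapage's
         * tail pointer and counters as well.
         */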
        if (blknoToDelete == InvalidBlockNumber)
        {
            metadata->tail = InvalidBlockNumber;
            metadata->tailFreeSize = 0;
            metadata->nPendingPages = 0;
            metadata->nPendingHeapTuples = 0;
        }

        /*
         * Set pd_lower just past the end of the metadata.  This is
         * essential, because without doing so, metadata will be lost if
         * xlog.c compresses the page.  (We must do this here because pre-v11
         * versions of PG did not set the metapage's pd_lower correctly, so a
         * pg_upgraded index might contain the wrong value.)
         */
        ((PageHeader) metapage)->pd_lower =
            ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

        MarkBufferDirty(metabuffer);

        for (i = 0; i < data.ndeleted; i++)
        {
            page = BufferGetPage(buffers[i]);
            GinPageGetOpaque(page)->flags = GIN_DELETED;
            MarkBufferDirty(buffers[i]);
        }

        if (RelationNeedsWAL(index))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, metabuffer,
                               REGBUF_WILL_INIT | REGBUF_STANDARD);
            for (i = 0; i < data.ndeleted; i++)
                XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

            memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

            XLogRegisterData((char *) &data,
                             sizeof(ginxlogDeleteListPages));

            recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
            PageSetLSN(metapage, recptr);

            for (i = 0; i < data.ndeleted; i++)
            {
                page = BufferGetPage(buffers[i]);
                PageSetLSN(page, recptr);
            }
        }

        for (i = 0; i < data.ndeleted; i++)
            UnlockReleaseBuffer(buffers[i]);

        END_CRIT_SECTION();

        for (i = 0; fill_fsm && i < data.ndeleted; i++)
            RecordFreeIndexPage(index, freespace[i]);

    } while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
    keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
    keys->categories = (GinNullCategory *)
        palloc(sizeof(GinNullCategory) * maxvalues);
    keys->nvalues = 0;
    keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
    if (keys->nvalues >= keys->maxvalues)
    {
        keys->maxvalues *= 2;
        keys->keys = (Datum *)
            repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
        keys->categories = (GinNullCategory *)
            repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
    }

    keys->keys[keys->nvalues] = datum;
    keys->categories[keys->nvalues] = category;
    keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
                   Page page, OffsetNumber startoff)
{
    ItemPointerData heapptr;
    OffsetNumber i,
                maxoff;
    OffsetNumber attrnum;

    /* reset *ka to empty */
    ka->nvalues = 0;

    maxoff = PageGetMaxOffsetNumber(page);
    Assert(maxoff >= FirstOffsetNumber);
    ItemPointerSetInvalid(&heapptr);
    attrnum = 0;

    for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
    {
        IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
        OffsetNumber curattnum;
        Datum       curkey;
        GinNullCategory curcategory;

        /* Check for change of heap TID or attnum */
        curattnum = gintuple_get_attrnum(accum->ginstate, itup);

        if (!ItemPointerIsValid(&heapptr))
        {
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }
        else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
                   curattnum == attrnum))
        {
            /*
             * ginInsertBAEntries can insert several datums per call, but
             * only for one heap tuple and one column.  So call it at a
             * boundary, and reset ka.
             */
            ginInsertBAEntries(accum, &heapptr, attrnum,
                               ka->keys, ka->categories, ka->nvalues);
            ka->nvalues = 0;
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }

        /* Add key to KeyArray */
        curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
        addDatum(ka, curkey, curcategory);
    }

    /* Dump out all remaining keys */
    ginInsertBAEntries(accum, &heapptr, attrnum,
                       ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * At first glance this looks completely crash-unsafe.  But it is fine: if we
 * crash after posting entries to the main index and before removing them
 * from the pending list, nothing bad will happen when we redo the posting
 * later on.
 *
 * fill_fsm indicates that ginInsertCleanup should add deleted pages to the
 * FSM; otherwise the caller is responsible for putting deleted pages into
 * the FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
                 bool fill_fsm, bool forceCleanup,
                 IndexBulkDeleteResult *stats)
{
    Relation    index = ginstate->index;
    Buffer      metabuffer,
                buffer;
    Page        metapage,
                page;
    GinMetaPageData *metadata;
    MemoryContext opCtx,
                oldCtx;
    BuildAccumulator accum;
    KeyArray    datums;
    BlockNumber blkno,
                blknoFinish;
    bool        cleanupFinish = false;
    bool        fsm_vac = false;
    Size        workMemory;

    /*
     * We want to prevent concurrent cleanup processes.  For that we lock the
     * metapage in exclusive mode using a LockPage() call.  Nobody else uses
     * that lock on the metapage, so concurrent insertion into the pending
     * list remains possible.
     */

    if (forceCleanup)
    {
        /*
         * We are called from [auto]vacuum/analyze or
         * gin_clean_pending_list(), and we want to wait for any concurrent
         * cleanup to finish.
         */
        LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
        workMemory =
            (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
            autovacuum_work_mem : maintenance_work_mem;
    }
    else
    {
        /*
         * We are called from a regular insert; if we see a concurrent
         * cleanup, just exit in the hope that the concurrent process will
         * clean up the pending list.
         */
        if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
            return;
        workMemory = work_mem;
    }

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    LockBuffer(metabuffer, GIN_SHARE);
    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);

    if (metadata->head == InvalidBlockNumber)
    {
        /* Nothing to do */
        UnlockReleaseBuffer(metabuffer);
        UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
        return;
    }

    /*
     * Remember a tail page to prevent infinite cleanup if other backends add
     * new tuples faster than we can clean them up.
     */
    blknoFinish = metadata->tail;

    /*
     * Read and lock head of pending list
     */
    blkno = metadata->head;
    buffer = ReadBuffer(index, blkno);
    LockBuffer(buffer, GIN_SHARE);
    page = BufferGetPage(buffer);

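    /*
     * The metapage lock can be released now; we keep only the pin, so that
     * we can re-lock the metapage later when removing processed pages.
     */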
    LockBuffer(metabuffer, GIN_UNLOCK);

    /*
     * Initialize.  All temporary space will be in opCtx
     */
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "GIN insert cleanup temporary context",
                                  ALLOCSET_DEFAULT_SIZES);

    oldCtx = MemoryContextSwitchTo(opCtx);

    initKeyArray(&datums, 128);
    ginInitBA(&accum);
    accum.ginstate = ginstate;

    /*
     * At the top of this loop, we have pin and lock on the current page of
     * the pending list.  However, we'll release that before exiting the
     * loop.  Note we also have pin but not lock on the metapage.
     */
    for (;;)
    {
        Assert(!GinPageIsDeleted(page));

        /*
         * Have we reached the page that was the tail when we started our
         * cleanup?  If so, this will be the last page we process --- unless
         * the caller asked us to clean up the whole pending list, in which
         * case we ignore the old tail and work until the list becomes empty.
         */
        if (blkno == blknoFinish && full_clean == false)
            cleanupFinish = true;

        /*
         * read page's datums into accum
         */
        processPendingPage(&accum, &datums, page, FirstOffsetNumber);

        vacuum_delay_point();

        /*
         * Is it time to flush memory to disk?  Flush if we are at the end of
         * the pending list, or if we have a full row and memory is getting
         * full.
         */
        if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
            (GinPageHasFullRow(page) &&
             (accum.allocatedMemory >= workMemory * 1024L)))
        {
            ItemPointerData *list;
            uint32      nlist;
            Datum       key;
            GinNullCategory category;
            OffsetNumber maxoff,
                        attnum;

            /*
             * Unlock the current page for better concurrency.  Any changes
             * to the page will be detected later by comparing maxoff after
             * the memory flush is complete.
             */
            maxoff = PageGetMaxOffsetNumber(page);
            LockBuffer(buffer, GIN_UNLOCK);

            /*
             * Moving the collected data into the regular structure can take
             * a significant amount of time, so run it without holding a lock
             * on the pending list.
             */
            ginBeginBAScan(&accum);
            while ((list = ginGetBAEntry(&accum,
                                         &attnum, &key, &category, &nlist)) != NULL)
            {
                ginEntryInsert(ginstate, attnum, key, category,
                               list, nlist, NULL);
                vacuum_delay_point();
            }

            /*
             * Lock the whole list to remove pages
             */
            LockBuffer(metabuffer, GIN_EXCLUSIVE);
            LockBuffer(buffer, GIN_SHARE);

            Assert(!GinPageIsDeleted(page));

            /*
             * While we left the page unlocked, more stuff might have gotten
             * added to it.  If so, process those entries immediately.  There
             * shouldn't be very many, so we don't worry about the fact that
             * we're doing this with exclusive lock.  The insertion algorithm
             * guarantees that inserted row(s) will not continue onto the
             * next page.  NOTE: intentionally no vacuum_delay_point in this
             * loop.
             */
            if (PageGetMaxOffsetNumber(page) != maxoff)
            {
                ginInitBA(&accum);
                processPendingPage(&accum, &datums, page, maxoff + 1);

                ginBeginBAScan(&accum);
                while ((list = ginGetBAEntry(&accum,
                                             &attnum, &key, &category, &nlist)) != NULL)
                    ginEntryInsert(ginstate, attnum, key, category,
                                   list, nlist, NULL);
            }

            /*
             * Remember the next page --- it will become the new list head
             */
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
                                          * locking */

            /*
             * Remove the processed pages from the pending list; at this
             * point all their contents are in the regular structure.
             */
            shiftList(index, metabuffer, blkno, fill_fsm, stats);

            /* At this point, some pending pages have been freed up */
            fsm_vac = true;

            Assert(blkno == metadata->head);
            LockBuffer(metabuffer, GIN_UNLOCK);

            /*
             * If we removed the whole pending list, or we have cleaned up to
             * the tail page that we remembered when this cleanup started,
             * then just exit.
             */
            if (blkno == InvalidBlockNumber || cleanupFinish)
                break;

            /*
             * release memory used so far and reinit state
             */
            MemoryContextReset(opCtx);
            initKeyArray(&datums, datums.maxvalues);
            ginInitBA(&accum);
        }
        else
        {
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);
        }

        /*
         * Read next page in pending list
         */
        vacuum_delay_point();
        buffer = ReadBuffer(index, blkno);
        LockBuffer(buffer, GIN_SHARE);
        page = BufferGetPage(buffer);
    }

    UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    ReleaseBuffer(metabuffer);

    /*
     * As pending list pages can have a high churn rate, it is desirable to
     * recycle them immediately to the FreeSpace Map when ordinary backends
     * clean the list.
     */
    if (fsm_vac && fill_fsm)
        IndexFreeSpaceMapVacuum(index);

    /* Clean up temporary space */
    MemoryContextSwitchTo(oldCtx);
    MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
    Oid         indexoid = PG_GETARG_OID(0);
    Relation    indexRel = index_open(indexoid, AccessShareLock);
    IndexBulkDeleteResult stats;
    GinState    ginstate;

    if (RecoveryInProgress())
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("recovery is in progress"),
                 errhint("GIN pending list cannot be cleaned up during recovery.")));

    /* Must be a GIN index */
    if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
        indexRel->rd_rel->relam != GIN_AM_OID)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is not a GIN index",
                        RelationGetRelationName(indexRel))));

    /*
     * Reject attempts to read non-local temporary relations; we would be
     * likely to get wrong data since we have no visibility into the owning
     * session's local buffers.
     */
    if (RELATION_IS_OTHER_TEMP(indexRel))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot access temporary indexes of other sessions")));

    /* User must own the index (comparable to privileges needed for VACUUM) */
    if (!pg_class_ownercheck(indexoid, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
                       RelationGetRelationName(indexRel));

    memset(&stats, 0, sizeof(stats));
    initGinState(&ginstate, indexRel);
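    /* Do a full, forced cleanup, and return any freed pages to the FSM */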
    ginInsertCleanup(&ginstate, true, true, true, &stats);

    index_close(indexRel, AccessShareLock);

    PG_RETURN_INT64((int64) stats.pages_deleted);
}