1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  *	  Fast insert routines for the Postgres inverted index access method.
5  *	  Pending entries are stored in linear list of pages.  Later on
6  *	  (typically during VACUUM), ginInsertCleanup() will be invoked to
7  *	  transfer pending entries into the regular index structure.  This
8  *	  wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *			src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "access/gin_private.h"
22 #include "access/ginxlog.h"
23 #include "access/xloginsert.h"
24 #include "access/xlog.h"
25 #include "commands/vacuum.h"
26 #include "catalog/pg_am.h"
27 #include "miscadmin.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 #include "utils/acl.h"
31 #include "postmaster/autovacuum.h"
32 #include "storage/indexfsm.h"
33 #include "storage/lmgr.h"
34 #include "utils/builtins.h"
35 
/* GUC parameter: max size of the pending list before forced cleanup, in KB */
int			gin_pending_list_limit = 0;

/*
 * Usable space on one pending-list page: the whole block minus the page
 * header and the GIN opaque area.  Used to estimate the pending list's
 * size from its page count.
 */
#define GIN_PAGE_FREESIZE \
	( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )

/*
 * Workspace for collecting the keys of one heap tuple/column before handing
 * them to ginInsertBAEntries().  Both arrays grow in lockstep by doubling.
 */
typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;
49 
50 
/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * 'buffer' must be a freshly allocated page, already pinned and locked; it
 * is initialized, filled, WAL-logged if needed, and released here.
 * 'rightlink' becomes the page's forward link in the pending-list chain
 * (InvalidBlockNumber for the tail page).
 *
 * Returns amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;	/* collects tuple data for the WAL record */
	char	   *ptr;

	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		/* copy the tuple into the contiguous WAL payload buffer too */
		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;

	/*
	 * tail page may contain only whole row(s) or final part of row placed on
	 * previous pages (a "row" here meaning all the index tuples generated for
	 * one heap tuple)
	 */
	if (rightlink == InvalidBlockNumber)
	{
		GinPageSetFullRow(page);
		/* on pending pages, maxoff counts completed heap rows, not items */
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		GinPageGetOpaque(page)->maxoff = 0;
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		/* page is rebuilt from the record at replay, so no full-page image */
		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}
141 
/*
 * Distribute the given tuples across as many pending-list pages as needed,
 * writing each page with writeListPage().
 *
 * On exit *res describes the new sublist: head and tail block numbers,
 * remaining free space on the tail page, and page count.  nPendingPages
 * is incremented per page written, so the caller must pre-zero *res.
 * All tuples must belong to a single heap tuple (see nPendingHeapTuples).
 */
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,		/* space used so far on the current page */
				tupsize;
	int			startTuple = 0;	/* first tuple going onto the current page */

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			/* allocate next page before flushing, so we know its rightlink */
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}
208 
/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * Function guarantees that all these tuples will be inserted consecutively,
 * preserving order.
 *
 * Depending on total size, the tuples either go onto the current tail page
 * (metapage kept exclusively locked throughout), or are first written as a
 * separate sublist which is then linked onto the list tail (metapage
 * unlocked while the sublist is built, for concurrency).
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;	/* tail page, if we touch one */
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	/* decide about WAL once, before entering any critical section */
	needWal = RelationNeedsWAL(index);

	data.node = index->rd_node;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * Pending list is empty or total size is greater than freespace
			 * on tail page => make sublist
			 *
			 * We unlock metabuffer to keep high concurrency
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * We should make sublist separately and append it to the tail
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		if (needWal)
			XLogBeginInsert();

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
		}
		else
		{
			/*
			 * Merge lists
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			/* link old tail page to the head of the new sublist */
			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
		}
	}
	else
	{
		/*
		 * Insert into tail page.  Metapage is already locked
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;	/* tuple data for the WAL record */

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		if (needWal)
			XLogBeginInsert();

		START_CRIT_SECTION();

		/*
		 * Increase counter of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			/* accumulate the same data contiguously for WAL */
			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force pending list cleanup when it becomes too long. And,
	 * ginInsertCleanup could take significant amount of time, so we prefer to
	 * call it when it can do all the work in a single collection cycle. In
	 * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
	 * while pending list is still small enough to fit into
	 * gin_pending_list_limit.
	 *
	 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
	 */
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Since it could contend with concurrent cleanup process we cleanup
	 * pending list not forcibly.
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}
445 
446 /*
447  * Create temporary index tuples for a single indexable item (one index column
448  * for the heap tuple specified by ht_ctid), and append them to the array
449  * in *collector.  They will subsequently be written out using
450  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
451  * temp tuples for a given heap tuple must be written in one call to
452  * ginHeapTupleFastInsert.
453  */
454 void
ginHeapTupleFastCollect(GinState * ginstate,GinTupleCollector * collector,OffsetNumber attnum,Datum value,bool isNull,ItemPointer ht_ctid)455 ginHeapTupleFastCollect(GinState *ginstate,
456 						GinTupleCollector *collector,
457 						OffsetNumber attnum, Datum value, bool isNull,
458 						ItemPointer ht_ctid)
459 {
460 	Datum	   *entries;
461 	GinNullCategory *categories;
462 	int32		i,
463 				nentries;
464 
465 	/*
466 	 * Extract the key values that need to be inserted in the index
467 	 */
468 	entries = ginExtractEntries(ginstate, attnum, value, isNull,
469 								&nentries, &categories);
470 
471 	/*
472 	 * Allocate/reallocate memory for storing collected tuples
473 	 */
474 	if (collector->tuples == NULL)
475 	{
476 		collector->lentuples = nentries * ginstate->origTupdesc->natts;
477 		collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
478 	}
479 
480 	while (collector->ntuples + nentries > collector->lentuples)
481 	{
482 		collector->lentuples *= 2;
483 		collector->tuples = (IndexTuple *) repalloc(collector->tuples,
484 													sizeof(IndexTuple) * collector->lentuples);
485 	}
486 
487 	/*
488 	 * Build an index tuple for each key value, and add to array.  In pending
489 	 * tuples we just stick the heap TID into t_tid.
490 	 */
491 	for (i = 0; i < nentries; i++)
492 	{
493 		IndexTuple	itup;
494 
495 		itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
496 							NULL, 0, 0, true);
497 		itup->t_tid = *ht_ctid;
498 		collector->tuples[collector->ntuples++] = itup;
499 		collector->sumsize += IndexTupleSize(itup);
500 	}
501 }
502 
/*
 * Deletes pending list pages up to (not including) newHead page.
 * If newHead == InvalidBlockNumber then function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 *
 * Pages are removed in batches of at most GIN_NDELETE_AT_ONCE, each batch
 * covered by a single WAL record and critical section.
 */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

		/* collect and exclusive-lock the next batch of victim pages */
		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			/* on pending pages, maxoff is the heap-tuple count */
			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		/* the batch's successor becomes the new list head */
		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			/* whole list is gone; reset all pending-list state */
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		/* outside the critical section, report freed pages to the FSM */
		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}
617 
618 /* Initialize empty KeyArray */
619 static void
initKeyArray(KeyArray * keys,int32 maxvalues)620 initKeyArray(KeyArray *keys, int32 maxvalues)
621 {
622 	keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
623 	keys->categories = (GinNullCategory *)
624 		palloc(sizeof(GinNullCategory) * maxvalues);
625 	keys->nvalues = 0;
626 	keys->maxvalues = maxvalues;
627 }
628 
629 /* Add datum to KeyArray, resizing if needed */
630 static void
addDatum(KeyArray * keys,Datum datum,GinNullCategory category)631 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
632 {
633 	if (keys->nvalues >= keys->maxvalues)
634 	{
635 		keys->maxvalues *= 2;
636 		keys->keys = (Datum *)
637 			repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
638 		keys->categories = (GinNullCategory *)
639 			repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
640 	}
641 
642 	keys->keys[keys->nvalues] = datum;
643 	keys->categories[keys->nvalues] = category;
644 	keys->nvalues++;
645 }
646 
/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;	/* heap TID of the current group of keys */
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;		/* attribute of the current group of keys */

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			/* first tuple on the page: start the first group */
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but only
			 * for one heap tuple and one column.  So call it at a boundary,
			 * and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}
712 
/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * On first glance it looks completely not crash-safe. But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay because when we redo the posting later on, nothing
 * bad will happen.
 *
 * full_clean requests processing of the entire list rather than stopping at
 * the tail page remembered at start.  forceCleanup waits for any concurrent
 * cleanup to finish (and uses maintenance/autovacuum work mem); otherwise we
 * give up immediately if another backend is already cleaning.
 *
 * fill_fsm indicates that ginInsertCleanup should add deleted pages
 * to FSM otherwise caller is responsible to put deleted pages into
 * FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
				 bool fill_fsm, bool forceCleanup,
				 IndexBulkDeleteResult *stats)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer,
				buffer;
	Page		metapage,
				page;
	GinMetaPageData *metadata;
	MemoryContext opCtx,		/* holds all per-cleanup temporary data */
				oldCtx;
	BuildAccumulator accum;
	KeyArray	datums;
	BlockNumber blkno,
				blknoFinish;
	bool		cleanupFinish = false;
	bool		fsm_vac = false;
	Size		workMemory;

	/*
	 * We would like to prevent concurrent cleanup process. For that we will
	 * lock metapage in exclusive mode using LockPage() call. Nobody other
	 * will use that lock for metapage, so we keep possibility of concurrent
	 * insertion into pending list
	 */

	if (forceCleanup)
	{
		/*
		 * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
		 * and we would like to wait concurrent cleanup to finish.
		 */
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		workMemory =
			(IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;
	}
	else
	{
		/*
		 * We are called from regular insert and if we see concurrent cleanup
		 * just exit in hope that concurrent process will clean up pending
		 * list.
		 */
		if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
			return;
		workMemory = work_mem;
	}

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, GIN_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);

	if (metadata->head == InvalidBlockNumber)
	{
		/* Nothing to do */
		UnlockReleaseBuffer(metabuffer);
		UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return;
	}

	/*
	 * Remember a tail page to prevent infinite cleanup if other backends add
	 * new tuples faster than we can cleanup.
	 */
	blknoFinish = metadata->tail;

	/*
	 * Read and lock head of pending list
	 */
	blkno = metadata->head;
	buffer = ReadBuffer(index, blkno);
	LockBuffer(buffer, GIN_SHARE);
	page = BufferGetPage(buffer);

	/* keep only the pin on the metapage from here on */
	LockBuffer(metabuffer, GIN_UNLOCK);

	/*
	 * Initialize.  All temporary space will be in opCtx
	 */
	opCtx = AllocSetContextCreate(CurrentMemoryContext,
								  "GIN insert cleanup temporary context",
								  ALLOCSET_DEFAULT_SIZES);

	oldCtx = MemoryContextSwitchTo(opCtx);

	initKeyArray(&datums, 128);
	ginInitBA(&accum);
	accum.ginstate = ginstate;

	/*
	 * At the top of this loop, we have pin and lock on the current page of
	 * the pending list.  However, we'll release that before exiting the loop.
	 * Note we also have pin but not lock on the metapage.
	 */
	for (;;)
	{
		Assert(!GinPageIsDeleted(page));

		/*
		 * Are we walk through the page which as we remember was a tail when
		 * we start our cleanup?  But if caller asks us to clean up whole
		 * pending list then ignore old tail, we will work until list becomes
		 * empty.
		 */
		if (blkno == blknoFinish && full_clean == false)
			cleanupFinish = true;

		/*
		 * read page's datums into accum
		 */
		processPendingPage(&accum, &datums, page, FirstOffsetNumber);

		vacuum_delay_point();

		/*
		 * Is it time to flush memory to disk?	Flush if we are at the end of
		 * the pending list, or if we have a full row and memory is getting
		 * full.
		 */
		if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
			(GinPageHasFullRow(page) &&
			 (accum.allocatedMemory >= workMemory * 1024L)))
		{
			ItemPointerData *list;
			uint32		nlist;
			Datum		key;
			GinNullCategory category;
			OffsetNumber maxoff,
						attnum;

			/*
			 * Unlock current page to increase performance. Changes of page
			 * will be checked later by comparing maxoff after completion of
			 * memory flush.
			 */
			maxoff = PageGetMaxOffsetNumber(page);
			LockBuffer(buffer, GIN_UNLOCK);

			/*
			 * Moving collected data into regular structure can take
			 * significant amount of time - so, run it without locking pending
			 * list.
			 */
			ginBeginBAScan(&accum);
			while ((list = ginGetBAEntry(&accum,
										 &attnum, &key, &category, &nlist)) != NULL)
			{
				ginEntryInsert(ginstate, attnum, key, category,
							   list, nlist, NULL);
				vacuum_delay_point();
			}

			/*
			 * Lock the whole list to remove pages
			 */
			LockBuffer(metabuffer, GIN_EXCLUSIVE);
			LockBuffer(buffer, GIN_SHARE);

			Assert(!GinPageIsDeleted(page));

			/*
			 * While we left the page unlocked, more stuff might have gotten
			 * added to it.  If so, process those entries immediately.  There
			 * shouldn't be very many, so we don't worry about the fact that
			 * we're doing this with exclusive lock. Insertion algorithm
			 * guarantees that inserted row(s) will not continue on next page.
			 * NOTE: intentionally no vacuum_delay_point in this loop.
			 */
			if (PageGetMaxOffsetNumber(page) != maxoff)
			{
				ginInitBA(&accum);
				processPendingPage(&accum, &datums, page, maxoff + 1);

				ginBeginBAScan(&accum);
				while ((list = ginGetBAEntry(&accum,
											 &attnum, &key, &category, &nlist)) != NULL)
					ginEntryInsert(ginstate, attnum, key, category,
								   list, nlist, NULL);
			}

			/*
			 * Remember next page - it will become the new list head
			 */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);	/* shiftList will do exclusive
											 * locking */

			/*
			 * remove read pages from pending list, at this point all content
			 * of read pages is in regular structure
			 */
			shiftList(index, metabuffer, blkno, fill_fsm, stats);

			/* At this point, some pending pages have been freed up */
			fsm_vac = true;

			Assert(blkno == metadata->head);
			LockBuffer(metabuffer, GIN_UNLOCK);

			/*
			 * if we removed the whole pending list or we cleanup tail (which
			 * we remembered on start our cleanup process) then just exit
			 */
			if (blkno == InvalidBlockNumber || cleanupFinish)
				break;

			/*
			 * release memory used so far and reinit state
			 */
			MemoryContextReset(opCtx);
			initKeyArray(&datums, datums.maxvalues);
			ginInitBA(&accum);
		}
		else
		{
			/* not flushing yet: just advance to the next pending page */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);
		}

		/*
		 * Read next page in pending list
		 */
		vacuum_delay_point();
		buffer = ReadBuffer(index, blkno);
		LockBuffer(buffer, GIN_SHARE);
		page = BufferGetPage(buffer);
	}

	UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
	ReleaseBuffer(metabuffer);

	/*
	 * As pending list pages can have a high churn rate, it is desirable to
	 * recycle them immediately to the FreeSpace Map when ordinary backends
	 * clean the list.
	 */
	if (fsm_vac && fill_fsm)
		IndexFreeSpaceMapVacuum(index);


	/* Clean up temporary space */
	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(opCtx);
}
975 
/*
 * SQL-callable function to clean the insert pending list
 *
 * Argument: OID of a GIN index.  Performs a full, forced cleanup of its
 * pending list and returns the number of pending-list pages deleted.
 * Caller must own the index; cannot run during recovery.
 */
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Relation	indexRel = index_open(indexoid, AccessShareLock);
	IndexBulkDeleteResult stats;
	GinState	ginstate;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("GIN pending list cannot be cleaned up during recovery.")));

	/* Must be a GIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != GIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a GIN index",
						RelationGetRelationName(indexRel))));

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(indexRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary indexes of other sessions")));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	memset(&stats, 0, sizeof(stats));
	initGinState(&ginstate, indexRel);
	/* full_clean + fill_fsm + forceCleanup: process the whole list */
	ginInsertCleanup(&ginstate, true, true, true, &stats);

	index_close(indexRel, AccessShareLock);

	PG_RETURN_INT64((int64) stats.pages_deleted);
}
1024