/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *	  Fast insert routines for the Postgres inverted index access method.
 *	  Pending entries are stored in a linear list of pages.  Later on
 *	  (typically during VACUUM), ginInsertCleanup() will be invoked to
 *	  transfer pending entries into the regular index structure.  This
 *	  wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "catalog/pg_am.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/memutils.h"
#include "utils/rel.h"

/* GUC parameter */
int			gin_pending_list_limit = 0;

#define GIN_PAGE_FREESIZE \
	( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
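
/*
 * Illustrative arithmetic (explanatory note, not part of the original
 * source): on a typical 64-bit platform with the default 8 kB block size,
 * this works out to roughly
 *
 *		8192 - MAXALIGN(24) - MAXALIGN(8) = 8160 bytes
 *
 * of usable space per pending-list page.  The exact value depends on
 * BLCKSZ and on the platform's MAXALIGN.
 */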

typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;
	char	   *ptr;

	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;
	/*
	 * The tail page may contain only whole row(s), or the final part of a
	 * row whose earlier parts are on previous pages (a "row" here meaning
	 * all the index tuples generated for one heap tuple).
	 */
	if (rightlink == InvalidBlockNumber)
	{
		GinPageSetFullRow(page);
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		GinPageGetOpaque(page)->maxoff = 0;
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}

static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}
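
/*
 * Illustrative sketch (explanatory note, not part of the original source):
 * makeSublist produces a chain of pending-list pages linked through their
 * rightlink fields, e.g. for three pages:
 *
 *		res->head -> page1 -> page2 -> page3 (= res->tail)
 *
 * Only the tail page has InvalidBlockNumber as its rightlink, and only the
 * tail page gets GinPageSetFullRow in writeListPage, since a row may span
 * several pages but always ends on the tail page.
 */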

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving their order.
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
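	/*
	 * Overview (explanatory note, not part of the original source): there
	 * are two paths below.  If the collected tuples fit into the free space
	 * of the current tail page, they are added to it directly while the
	 * metapage lock is held.  Otherwise a sublist of freshly allocated pages
	 * is built without holding the metapage lock, and is then spliced onto
	 * the tail of the pending list (or becomes the whole list if the list
	 * was empty).
	 */
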
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	needWal = RelationNeedsWAL(index);

	data.node = index->rd_node;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * The pending list is empty, or the total size is greater than
			 * the free space on the tail page => make a sublist.
			 *
			 * We unlock the metabuffer to preserve concurrency.
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * Build the sublist separately, then append it to the tail of the
		 * existing pending list.
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		if (needWal)
			XLogBeginInsert();

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
		}
		else
		{
			/*
			 * Merge lists
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
		}
	}
	else
	{
		/*
		 * Insert into tail page.  Metapage is already locked
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		if (needWal)
			XLogBeginInsert();

		START_CRIT_SECTION();

		/*
		 * Increase counter of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force pending-list cleanup when the list becomes too long.  Since
	 * ginInsertCleanup() could take a significant amount of time, we prefer
	 * to call it when it can do all the work in a single collection cycle.
	 * In non-vacuum mode, it shouldn't require maintenance_work_mem, so fire
	 * it while the pending list is still small enough to fit within
	 * gin_pending_list_limit.
	 *
	 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
	 */
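
	/*
	 * Worked example (explanatory note, not part of the original source):
	 * with the default gin_pending_list_limit of 4 MB (4096 kB) and 8 kB
	 * pages, GIN_PAGE_FREESIZE is about 8160 bytes, so the test below
	 * triggers cleanup once the pending list exceeds roughly
	 * 4096 * 1024 / 8160, i.e. about 514 pages.
	 */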
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Since this could contend with a concurrent cleanup process, we don't
	 * force the cleanup here; if another backend is already cleaning the
	 * pending list, we simply skip it.
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
						GinTupleCollector *collector,
						OffsetNumber attnum, Datum value, bool isNull,
						ItemPointer ht_ctid)
{
	Datum	   *entries;
	GinNullCategory *categories;
	int32		i,
				nentries;

	/*
	 * Extract the key values that need to be inserted in the index
	 */
	entries = ginExtractEntries(ginstate, attnum, value, isNull,
								&nentries, &categories);

	/*
	 * Allocate/reallocate memory for storing collected tuples
	 */
	if (collector->tuples == NULL)
	{
		collector->lentuples = nentries * ginstate->origTupdesc->natts;
		collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
	}

	while (collector->ntuples + nentries > collector->lentuples)
	{
		collector->lentuples *= 2;
		collector->tuples = (IndexTuple *) repalloc(collector->tuples,
								  sizeof(IndexTuple) * collector->lentuples);
	}

	/*
	 * Build an index tuple for each key value, and add to array.  In pending
	 * tuples we just stick the heap TID into t_tid.
	 */
	for (i = 0; i < nentries; i++)
	{
		IndexTuple	itup;

		itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
							NULL, 0, 0, true);
		itup->t_tid = *ht_ctid;
		collector->tuples[collector->ntuples++] = itup;
		collector->sumsize += IndexTupleSize(itup);
	}
}
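
/*
 * Usage sketch (explanatory note, not part of the original source): a
 * caller on the insert path typically collects entries for every column of
 * one heap tuple and then flushes them in a single call, e.g.
 *
 *		GinTupleCollector collector;
 *
 *		memset(&collector, 0, sizeof(GinTupleCollector));
 *		for (i = 0; i < ginstate->origTupdesc->natts; i++)
 *			ginHeapTupleFastCollect(ginstate, &collector,
 *									(OffsetNumber) (i + 1),
 *									values[i], isnull[i], ht_ctid);
 *		ginHeapTupleFastInsert(ginstate, &collector);
 *
 * Writing all tuples for one heap tuple in one ginHeapTupleFastInsert call
 * is required for consistency, per the comment above.
 */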

/*
 * Deletes pending list pages up to (not including) newHead page.
 * If newHead == InvalidBlockNumber then function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
	keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
	keys->categories = (GinNullCategory *)
		palloc(sizeof(GinNullCategory) * maxvalues);
	keys->nvalues = 0;
	keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
	if (keys->nvalues >= keys->maxvalues)
	{
		keys->maxvalues *= 2;
		keys->keys = (Datum *)
			repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
		keys->categories = (GinNullCategory *)
			repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
	}

	keys->keys[keys->nvalues] = datum;
	keys->categories[keys->nvalues] = category;
	keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but only
			 * for one heap tuple and one column.  So call it at a boundary,
			 * and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}
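
/*
 * Illustrative note (explanatory, not part of the original source): tuples
 * on a pending page appear in insertion order, so all entries for one heap
 * tuple are consecutive and, within one heap tuple, grouped by attribute
 * number, e.g.
 *
 *		(tid1, att1, keyA) (tid1, att1, keyB) (tid1, att2, keyC)
 *		(tid2, att1, keyA) ...
 *
 * processPendingPage exploits this by batching consecutive tuples with
 * equal (TID, attnum) into a single ginInsertBAEntries() call.
 */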

/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * At first glance this looks completely non-crash-safe.  But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay: when we redo the posting later on, nothing bad
 * will happen.
 *
 * If fill_fsm is true, ginInsertCleanup adds deleted pages to the FSM;
 * otherwise the caller is responsible for putting deleted pages into the
 * FSM.
 *
 * If stats isn't null, we count deleted pending pages in it.
 */
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
				 bool fill_fsm, bool forceCleanup,
				 IndexBulkDeleteResult *stats)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer,
				buffer;
	Page		metapage,
				page;
	GinMetaPageData *metadata;
	MemoryContext opCtx,
				oldCtx;
	BuildAccumulator accum;
	KeyArray	datums;
	BlockNumber blkno,
				blknoFinish;
	bool		cleanupFinish = false;
	bool		fsm_vac = false;
	Size		workMemory;

	/*
	 * We want to prevent concurrent cleanup processes.  To do so, we take an
	 * exclusive lock on the metapage via LockPage().  Nobody else uses that
	 * lock on the metapage, so the possibility of concurrent insertion into
	 * the pending list is preserved.
	 */

	if (forceCleanup)
	{
		/*
		 * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
		 * and want to wait for any concurrent cleanup to finish.
		 */
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		workMemory =
			(IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;
	}
	else
	{
		/*
		 * We are called from a regular insert; if we see a concurrent
		 * cleanup, just exit in the hope that the concurrent process will
		 * clean up the pending list.
		 */
		if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
			return;
		workMemory = work_mem;
	}

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, GIN_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);

	if (metadata->head == InvalidBlockNumber)
	{
		/* Nothing to do */
		UnlockReleaseBuffer(metabuffer);
		UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return;
	}

	/*
	 * Remember the tail page, to prevent an infinite cleanup loop if other
	 * backends add new tuples faster than we can clean up.
	 */
	blknoFinish = metadata->tail;

	/*
	 * Read and lock head of pending list
	 */
	blkno = metadata->head;
	buffer = ReadBuffer(index, blkno);
	LockBuffer(buffer, GIN_SHARE);
	page = BufferGetPage(buffer);

	LockBuffer(metabuffer, GIN_UNLOCK);

	/*
	 * Initialize.  All temporary space will be in opCtx
	 */
	opCtx = AllocSetContextCreate(CurrentMemoryContext,
								  "GIN insert cleanup temporary context",
								  ALLOCSET_DEFAULT_SIZES);

	oldCtx = MemoryContextSwitchTo(opCtx);

	initKeyArray(&datums, 128);
	ginInitBA(&accum);
	accum.ginstate = ginstate;

	/*
	 * At the top of this loop, we have pin and lock on the current page of
	 * the pending list.  However, we'll release that before exiting the loop.
	 * Note we also have pin but not lock on the metapage.
	 */
	for (;;)
	{
		Assert(!GinPageIsDeleted(page));

		/*
		 * Have we reached the page that we remember was the tail when we
		 * started our cleanup?  If the caller asked us to clean up the whole
		 * pending list, ignore that old tail; we will work until the list
		 * becomes empty.
		 */
		if (blkno == blknoFinish && full_clean == false)
			cleanupFinish = true;

		/*
		 * read page's datums into accum
		 */
		processPendingPage(&accum, &datums, page, FirstOffsetNumber);

		vacuum_delay_point();

		/*
		 * Is it time to flush memory to disk?	Flush if we are at the end of
		 * the pending list, or if we have a full row and memory is getting
		 * full.
		 */
		if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
			(GinPageHasFullRow(page) &&
			 (accum.allocatedMemory >= workMemory * 1024L)))
		{
			ItemPointerData *list;
			uint32		nlist;
			Datum		key;
			GinNullCategory category;
			OffsetNumber maxoff,
						attnum;

			/*
			 * Unlock the current page for better concurrency.  Changes to
			 * the page will be detected later by re-checking its maxoff
			 * after the memory flush completes.
			 */
			maxoff = PageGetMaxOffsetNumber(page);
			LockBuffer(buffer, GIN_UNLOCK);

			/*
			 * Moving the collected data into the regular structure can take
			 * a significant amount of time, so do it without holding a lock
			 * on the pending list.
			 */
			ginBeginBAScan(&accum);
			while ((list = ginGetBAEntry(&accum,
								  &attnum, &key, &category, &nlist)) != NULL)
			{
				ginEntryInsert(ginstate, attnum, key, category,
							   list, nlist, NULL);
				vacuum_delay_point();
			}

			/*
			 * Lock the whole list to remove pages
			 */
			LockBuffer(metabuffer, GIN_EXCLUSIVE);
			LockBuffer(buffer, GIN_SHARE);

			Assert(!GinPageIsDeleted(page));

			/*
			 * While we left the page unlocked, more stuff might have gotten
			 * added to it.  If so, process those entries immediately.  There
			 * shouldn't be very many, so we don't worry about the fact that
			 * we're doing this with exclusive lock.  The insertion algorithm
			 * guarantees that inserted row(s) will not continue onto the
			 * next page.  NOTE: intentionally no vacuum_delay_point in this
			 * loop.
			 */
			if (PageGetMaxOffsetNumber(page) != maxoff)
			{
				ginInitBA(&accum);
				processPendingPage(&accum, &datums, page, maxoff + 1);

				ginBeginBAScan(&accum);
				while ((list = ginGetBAEntry(&accum,
								  &attnum, &key, &category, &nlist)) != NULL)
					ginEntryInsert(ginstate, attnum, key, category,
								   list, nlist, NULL);
			}

			/*
			 * Remember the next page --- it will become the new list head
			 */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);		/* shiftList will do exclusive
												 * locking */

			/*
			 * Remove the pages we have read from the pending list; at this
			 * point all their content has been moved into the regular
			 * structure.
			 */
			shiftList(index, metabuffer, blkno, fill_fsm, stats);

			/* At this point, some pending pages have been freed up */
			fsm_vac = true;

			Assert(blkno == metadata->head);
			LockBuffer(metabuffer, GIN_UNLOCK);

			/*
			 * If we removed the whole pending list, or have cleaned up to the
			 * tail page we remembered at the start of our cleanup, just exit.
			 */
			if (blkno == InvalidBlockNumber || cleanupFinish)
				break;

			/*
			 * release memory used so far and reinit state
			 */
			MemoryContextReset(opCtx);
			initKeyArray(&datums, datums.maxvalues);
			ginInitBA(&accum);
		}
		else
		{
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);
		}

		/*
		 * Read next page in pending list
		 */
		vacuum_delay_point();
		buffer = ReadBuffer(index, blkno);
		LockBuffer(buffer, GIN_SHARE);
		page = BufferGetPage(buffer);
	}

	UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
	ReleaseBuffer(metabuffer);

	/*
	 * As pending list pages can have a high churn rate, it is desirable to
	 * recycle them immediately to the Free Space Map when ordinary backends
	 * clean the list.
	 */
	if (fsm_vac && fill_fsm)
		IndexFreeSpaceMapVacuum(index);

	/* Clean up temporary space */
	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Relation	indexRel = index_open(indexoid, AccessShareLock);
	IndexBulkDeleteResult stats;
	GinState	ginstate;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("GIN pending list cannot be cleaned up during recovery.")));

	/* Must be a GIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != GIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a GIN index",
						RelationGetRelationName(indexRel))));

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(indexRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary indexes of other sessions")));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(indexRel));

	memset(&stats, 0, sizeof(stats));
	initGinState(&ginstate, indexRel);
	ginInsertCleanup(&ginstate, true, true, true, &stats);

	index_close(indexRel, AccessShareLock);

	PG_RETURN_INT64((int64) stats.pages_deleted);
}
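
/*
 * Usage note (explanatory, not part of the original source): this function
 * is exposed at the SQL level, where the pending list of a GIN index can be
 * flushed manually, e.g.
 *
 *		SELECT gin_clean_pending_list('some_gin_index'::regclass);
 *
 * It returns the number of pending-list pages deleted ("some_gin_index" is
 * a placeholder name).
 */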