/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *	  Fast insert routines for the Postgres inverted index access method.
 *	  Pending entries are stored in a linear list of pages.  Later on
 *	  (typically during VACUUM), ginInsertCleanup() will be invoked to
 *	  transfer pending entries into the regular index structure.  This
 *	  wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "catalog/pg_am.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"

/* GUC parameter */
int			gin_pending_list_limit = 0;

#define GIN_PAGE_FREESIZE \
	( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
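
/*
 * Added note: with the stock 8192-byte BLCKSZ, a 24-byte page header, and an
 * 8-byte GinPageOpaqueData (both MAXALIGN'd on typical 8-byte-aligned
 * builds), GIN_PAGE_FREESIZE works out to about 8160 bytes per pending-list
 * page; the exact figure depends on build-time alignment settings.
 */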

typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns the amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;
	char	   *ptr;

	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;

	/*
	 * The tail page may contain only whole row(s), or the final part of a
	 * row that began on previous pages (a "row" here meaning all the index
	 * tuples generated for one heap tuple).
	 */
	if (rightlink == InvalidBlockNumber)
	{
		GinPageSetFullRow(page);
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		GinPageGetOpaque(page)->maxoff = 0;
	}
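
	/*
	 * Added note: on pending-list pages the opaque maxoff field is reused as
	 * a count of heap tuples whose rows are completed on the page, not as a
	 * line-pointer count; shiftList() sums it when decrementing
	 * nPendingHeapTuples.
	 */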

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}

static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}
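
	/*
	 * Added note: the loop above assumes every individual tuple fits on an
	 * empty pending-list page (oversize tuples are presumed to have been
	 * rejected when they were formed); otherwise the "reprocess" step could
	 * repeat indefinitely.
	 */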

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving their order.
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	needWal = RelationNeedsWAL(index);

	data.node = index->rd_node;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	/*
	 * An insertion to the pending list could logically belong anywhere in the
	 * tree, so it conflicts with all serializable scans.  All scans acquire a
	 * predicate lock on the metabuffer to represent that.
	 */
	CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * The pending list is empty, or the total size exceeds the free
			 * space on the tail page => make a sublist.
			 *
			 * We unlock the metabuffer to preserve concurrency while we
			 * build it.
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * Build the sublist separately, then append it to the tail of the
		 * pending list.
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		if (needWal)
			XLogBeginInsert();

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
		}
		else
		{
			/*
			 * Merge lists
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
		}
	}
	else
	{
		/*
		 * Insert into the tail page.  The metapage is already locked.
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		if (needWal)
			XLogBeginInsert();

		START_CRIT_SECTION();

		/*
		 * Increase counter of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.  (We must do this here because pre-v11 versions of PG did not
	 * set the metapage's pd_lower correctly, so a pg_upgraded index might
	 * contain the wrong value.)
	 */
	((PageHeader) metapage)->pd_lower =
		((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force pending-list cleanup when the list becomes too long.  Since
	 * ginInsertCleanup() could take a significant amount of time, we prefer
	 * to call it when it can do all the work in a single collection cycle.
	 * In non-vacuum mode it works within work_mem rather than
	 * maintenance_work_mem, so fire it while the pending list is still small
	 * enough to fit within gin_pending_list_limit.
	 *
	 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
	 */
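	/*
	 * Added note: as a rough illustration, with 8kB pages and a
	 * gin_pending_list_limit of 4MB (its usual default), the test below
	 * fires once the pending list exceeds roughly 512 pages.
	 */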
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Since this could contend with a concurrent cleanup process, we clean
	 * up the pending list non-forcibly, giving up if the cleanup lock cannot
	 * be obtained.
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
						GinTupleCollector *collector,
						OffsetNumber attnum, Datum value, bool isNull,
						ItemPointer ht_ctid)
{
	Datum	   *entries;
	GinNullCategory *categories;
	int32		i,
				nentries;

	/*
	 * Extract the key values that need to be inserted in the index
	 */
	entries = ginExtractEntries(ginstate, attnum, value, isNull,
								&nentries, &categories);

	/*
	 * Protect against integer overflow in allocation calculations
	 */
	if (nentries < 0 ||
		collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
		elog(ERROR, "too many entries for GIN index");

	/*
	 * Allocate/reallocate memory for storing collected tuples
	 */
	if (collector->tuples == NULL)
	{
		/*
		 * Determine the number of elements to allocate in the tuples array
		 * initially.  Make it a power of 2 to avoid wasting memory when
		 * resizing (since palloc likes powers of 2).
		 */
		collector->lentuples = pg_nextpower2_32(Max(16, nentries));
		collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
	}
	else if (collector->lentuples < collector->ntuples + nentries)
	{
		/*
		 * Advance lentuples to the next suitable power of 2.  This won't
		 * overflow, though we could get to a value that exceeds
		 * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
		 */
		collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
		collector->tuples = (IndexTuple *) repalloc(collector->tuples,
													sizeof(IndexTuple) * collector->lentuples);
	}
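	/*
	 * Added note: for example, a collector already holding 16 tuples that
	 * must absorb 100 more entries is regrown to pg_nextpower2_32(116) =
	 * 128 slots.
	 */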

	/*
	 * Build an index tuple for each key value, and add to array.  In pending
	 * tuples we just stick the heap TID into t_tid.
	 */
	for (i = 0; i < nentries; i++)
	{
		IndexTuple	itup;

		itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
							NULL, 0, 0, true);
		itup->t_tid = *ht_ctid;
		collector->tuples[collector->ntuples++] = itup;
		collector->sumsize += IndexTupleSize(itup);
	}
}

/*
 * Deletes pending-list pages up to (but not including) the newHead page.
 * If newHead == InvalidBlockNumber, then the function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

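		/*
		 * Added note: GIN_NDELETE_AT_ONCE (16 in stock builds) bounds how
		 * many list pages are detached per outer-loop iteration, and hence
		 * per WAL record, since each page must be registered separately.
		 */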
		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		/*
		 * Set pd_lower just past the end of the metadata.  This is essential,
		 * because without doing so, metadata will be lost if xlog.c
		 * compresses the page.  (We must do this here because pre-v11
		 * versions of PG did not set the metapage's pd_lower correctly, so a
		 * pg_upgraded index might contain the wrong value.)
		 */
		((PageHeader) metapage)->pd_lower =
			((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer,
							   REGBUF_WILL_INIT | REGBUF_STANDARD);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
	keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
	keys->categories = (GinNullCategory *)
		palloc(sizeof(GinNullCategory) * maxvalues);
	keys->nvalues = 0;
	keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
	if (keys->nvalues >= keys->maxvalues)
	{
		keys->maxvalues *= 2;
		keys->keys = (Datum *)
			repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
		keys->categories = (GinNullCategory *)
			repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
	}

	keys->keys[keys->nvalues] = datum;
	keys->categories[keys->nvalues] = category;
	keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but only
			 * for one heap tuple and one column.  So call it at a boundary,
			 * and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into the regular GIN structure.
 *
 * At first glance this looks completely un-crash-safe.  But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay: when we redo the posting later on, nothing bad
 * will happen.
 *
 * fill_fsm indicates that ginInsertCleanup should add deleted pages to the
 * FSM; otherwise, the caller is responsible for putting deleted pages into
 * the FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
				 bool fill_fsm, bool forceCleanup,
				 IndexBulkDeleteResult *stats)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer,
				buffer;
	Page		metapage,
				page;
	GinMetaPageData *metadata;
	MemoryContext opCtx,
				oldCtx;
	BuildAccumulator accum;
	KeyArray	datums;
	BlockNumber blkno,
				blknoFinish;
	bool		cleanupFinish = false;
	bool		fsm_vac = false;
	Size		workMemory;

	/*
	 * We would like to prevent concurrent cleanup processes.  For that, we
	 * lock the metapage in exclusive mode using a LockPage() call.  Nothing
	 * else takes that lock on the metapage, so concurrent insertion into the
	 * pending list remains possible.
	 */

	if (forceCleanup)
	{
		/*
		 * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
		 * and would like to wait for any concurrent cleanup to finish.
		 */
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		workMemory =
			(IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;
	}
	else
	{
		/*
		 * We are called from a regular insert; if we see a concurrent
		 * cleanup, just exit in the hope that the concurrent process will
		 * clean up the pending list.
		 */
		if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
			return;
		workMemory = work_mem;
	}

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, GIN_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);

	if (metadata->head == InvalidBlockNumber)
	{
		/* Nothing to do */
		UnlockReleaseBuffer(metabuffer);
		UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return;
	}

	/*
	 * Remember the tail page to prevent infinite cleanup if other backends
	 * add new tuples faster than we can clean up.
	 */
	blknoFinish = metadata->tail;

	/*
	 * Read and lock head of pending list
	 */
	blkno = metadata->head;
	buffer = ReadBuffer(index, blkno);
	LockBuffer(buffer, GIN_SHARE);
	page = BufferGetPage(buffer);

	LockBuffer(metabuffer, GIN_UNLOCK);

	/*
	 * Initialize.  All temporary space will be in opCtx
	 */
	opCtx = AllocSetContextCreate(CurrentMemoryContext,
								  "GIN insert cleanup temporary context",
								  ALLOCSET_DEFAULT_SIZES);

	oldCtx = MemoryContextSwitchTo(opCtx);

	initKeyArray(&datums, 128);
	ginInitBA(&accum);
	accum.ginstate = ginstate;

	/*
	 * At the top of this loop, we have pin and lock on the current page of
	 * the pending list.  However, we'll release that before exiting the loop.
	 * Note we also have pin but not lock on the metapage.
	 */
	for (;;)
	{
		Assert(!GinPageIsDeleted(page));

		/*
		 * Have we reached the page that was the list tail when we started
		 * our cleanup?  If the caller asked us to clean up the whole pending
		 * list, ignore the old tail; we will work until the list becomes
		 * empty.
		 */
		if (blkno == blknoFinish && full_clean == false)
			cleanupFinish = true;

		/*
		 * read page's datums into accum
		 */
		processPendingPage(&accum, &datums, page, FirstOffsetNumber);

		vacuum_delay_point();

		/*
		 * Is it time to flush memory to disk?	Flush if we are at the end of
		 * the pending list, or if we have a full row and memory is getting
		 * full.
		 */
		if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
			(GinPageHasFullRow(page) &&
			 (accum.allocatedMemory >= workMemory * 1024L)))
		{
			ItemPointerData *list;
			uint32		nlist;
			Datum		key;
			GinNullCategory category;
			OffsetNumber maxoff,
						attnum;

			/*
			 * Unlock the current page to increase concurrency.  Changes to
			 * the page will be detected later by comparing maxoff after the
			 * memory flush completes.
			 */
			maxoff = PageGetMaxOffsetNumber(page);
			LockBuffer(buffer, GIN_UNLOCK);

			/*
			 * Moving the collected data into the regular structure can take
			 * a significant amount of time, so run it without locking the
			 * pending list.
			 */
			ginBeginBAScan(&accum);
			while ((list = ginGetBAEntry(&accum,
										 &attnum, &key, &category, &nlist)) != NULL)
			{
				ginEntryInsert(ginstate, attnum, key, category,
							   list, nlist, NULL);
				vacuum_delay_point();
			}

			/*
			 * Lock the whole list to remove pages
			 */
			LockBuffer(metabuffer, GIN_EXCLUSIVE);
			LockBuffer(buffer, GIN_SHARE);

			Assert(!GinPageIsDeleted(page));

			/*
			 * While we left the page unlocked, more stuff might have gotten
			 * added to it.  If so, process those entries immediately.  There
			 * shouldn't be very many, so we don't worry about the fact that
			 * we're doing this with exclusive lock.  The insertion algorithm
			 * guarantees that inserted row(s) will not continue onto the
			 * next page.  NOTE: intentionally no vacuum_delay_point in this
			 * loop.
			 */
			if (PageGetMaxOffsetNumber(page) != maxoff)
			{
				ginInitBA(&accum);
				processPendingPage(&accum, &datums, page, maxoff + 1);

				ginBeginBAScan(&accum);
				while ((list = ginGetBAEntry(&accum,
											 &attnum, &key, &category, &nlist)) != NULL)
					ginEntryInsert(ginstate, attnum, key, category,
								   list, nlist, NULL);
			}

			/*
			 * Remember next page - it will become the new list head
			 */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);	/* shiftList will do exclusive
											 * locking */

			/*
			 * Remove the read pages from the pending list; at this point all
			 * of their content has been copied into the regular structure.
			 */
			shiftList(index, metabuffer, blkno, fill_fsm, stats);

			/* At this point, some pending pages have been freed up */
			fsm_vac = true;

			Assert(blkno == metadata->head);
			LockBuffer(metabuffer, GIN_UNLOCK);

			/*
			 * If we removed the whole pending list, or have cleaned up to
			 * the tail we remembered at the start of cleanup, just exit.
			 */
			if (blkno == InvalidBlockNumber || cleanupFinish)
				break;

			/*
			 * release memory used so far and reinit state
			 */
			MemoryContextReset(opCtx);
			initKeyArray(&datums, datums.maxvalues);
			ginInitBA(&accum);
		}
		else
		{
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);
		}

		/*
		 * Read next page in pending list
		 */
		vacuum_delay_point();
		buffer = ReadBuffer(index, blkno);
		LockBuffer(buffer, GIN_SHARE);
		page = BufferGetPage(buffer);
	}

	UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
	ReleaseBuffer(metabuffer);

	/*
	 * As pending list pages can have a high churn rate, it is desirable to
	 * recycle them immediately to the FreeSpaceMap when ordinary backends
	 * clean the list.
	 */
	if (fsm_vac && fill_fsm)
		IndexFreeSpaceMapVacuum(index);

	/* Clean up temporary space */
	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
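/*
 * Illustrative use from SQL (the index name here is hypothetical):
 *
 *		SELECT gin_clean_pending_list('some_gin_index'::regclass);
 *
 * The return value is the number of pending-list pages deleted.
 */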
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Relation	indexRel = index_open(indexoid, RowExclusiveLock);
	IndexBulkDeleteResult stats;
	GinState	ginstate;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("GIN pending list cannot be cleaned up during recovery.")));

	/* Must be a GIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != GIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a GIN index",
						RelationGetRelationName(indexRel))));

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(indexRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary indexes of other sessions")));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	memset(&stats, 0, sizeof(stats));
	initGinState(&ginstate, indexRel);
	ginInsertCleanup(&ginstate, true, true, true, &stats);

	index_close(indexRel, RowExclusiveLock);

	PG_RETURN_INT64((int64) stats.pages_deleted);
}