/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *	  Fast insert routines for the Postgres inverted index access method.
 *	  Pending entries are stored in a linear list of pages.  Later on
 *	  (typically during VACUUM), ginInsertCleanup() will be invoked to
 *	  transfer pending entries into the regular index structure.  This
 *	  wins because bulk insertion is much more efficient than retail
 *	  insertion.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xloginsert.h"
#include "access/xlog.h"
#include "commands/vacuum.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/builtins.h"

/* GUC parameter */
int			gin_pending_list_limit = 0;

#define GIN_PAGE_FREESIZE \
	( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
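
/*
 * GIN_PAGE_FREESIZE is the usable space on a freshly initialized
 * pending-list page.  As a rough illustration (assuming the default 8 kB
 * BLCKSZ and the usual page header and opaque-data sizes), this works out
 * to a little over 8100 bytes per page.
 */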

typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;
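
/*
 * KeyArray serves as per-page scratch space for processPendingPage(): the
 * keys extracted from one run of index tuples are accumulated here before
 * being handed to ginInsertBAEntries() in a single call.
 */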


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns amount of free space left on the page.
 */
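/*
 * Note: besides being added to the page, the tuple data is also copied into
 * a local workspace buffer, so that the same bytes can be attached to the
 * WAL record emitted for the page.
 */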
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;
	char	   *ptr;

	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;

	/*
	 * The tail page may contain only whole row(s), or the final part of a
	 * row whose earlier tuples were placed on previous pages (a "row" here
	 * meaning all the index tuples generated for one heap tuple).
	 */
	if (rightlink == InvalidBlockNumber)
	{
		GinPageSetFullRow(page);
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		GinPageGetOpaque(page)->maxoff = 0;
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}

static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving their order.
 */
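/*
 * There are two cases: if everything fits into the free space on the
 * current tail page, the tuples are simply appended there; otherwise a
 * separate sublist of new pages is built by makeSublist() and then linked
 * onto the end of the pending list (or becomes the whole list if the list
 * was empty).
 */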
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	needWal = RelationNeedsWAL(index);

	data.node = index->rd_node;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	/*
	 * An insertion to the pending list could logically belong anywhere in the
	 * tree, so it conflicts with all serializable scans.  All scans acquire a
	 * predicate lock on the metabuffer to represent that.
	 */
	CheckForSerializableConflictIn(index, NULL, metabuffer);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * Pending list is empty, or the total size is greater than the
			 * free space on the tail page => make a sublist.
			 *
			 * We unlock the metabuffer to keep concurrency high.
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * We should make sublist separately and append it to the tail
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		if (needWal)
			XLogBeginInsert();

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
		}
		else
		{
			/*
			 * Merge lists
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
		}
	}
	else
	{
		/*
		 * Insert into tail page.  Metapage is already locked
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		if (needWal)
			XLogBeginInsert();

		START_CRIT_SECTION();

		/*
		 * Increase counter of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.  (We must do this here because pre-v11 versions of PG did not
	 * set the metapage's pd_lower correctly, so a pg_upgraded index might
	 * contain the wrong value.)
	 */
	((PageHeader) metapage)->pd_lower =
		((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force pending-list cleanup when the list becomes too long.  Since
	 * ginInsertCleanup() could take a significant amount of time, we prefer
	 * to call it when it can do all the work in a single collection cycle.
	 * In non-vacuum mode it shouldn't require maintenance_work_mem, so fire
	 * it while the pending list is still small enough to fit into
	 * gin_pending_list_limit.
	 *
	 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
	 */
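	/*
	 * As a rough, illustrative example (assuming the default
	 * gin_pending_list_limit of 4 MB and 8 kB pages): cleanup would be
	 * requested once roughly 500 pending pages have accumulated, since
	 * nPendingPages * GIN_PAGE_FREESIZE then exceeds 4096 * 1024 bytes.
	 */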
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Since this could contend with a concurrent cleanup process, we clean
	 * up the pending list non-forcibly here (giving up rather than waiting
	 * if another cleanup is already in progress).
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
						GinTupleCollector *collector,
						OffsetNumber attnum, Datum value, bool isNull,
						ItemPointer ht_ctid)
{
	Datum	   *entries;
	GinNullCategory *categories;
	int32		i,
				nentries;

	/*
	 * Extract the key values that need to be inserted in the index
	 */
	entries = ginExtractEntries(ginstate, attnum, value, isNull,
								&nentries, &categories);

	/*
	 * Allocate/reallocate memory for storing collected tuples
	 */
	if (collector->tuples == NULL)
	{
		collector->lentuples = nentries * ginstate->origTupdesc->natts;
		collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
	}

	while (collector->ntuples + nentries > collector->lentuples)
	{
		collector->lentuples *= 2;
		collector->tuples = (IndexTuple *) repalloc(collector->tuples,
													sizeof(IndexTuple) * collector->lentuples);
	}

	/*
	 * Build an index tuple for each key value, and add to array.  In pending
	 * tuples we just stick the heap TID into t_tid.
	 */
	for (i = 0; i < nentries; i++)
	{
		IndexTuple	itup;

		itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
							NULL, 0, 0, true);
		itup->t_tid = *ht_ctid;
		collector->tuples[collector->ntuples++] = itup;
		collector->sumsize += IndexTupleSize(itup);
	}
}

/*
 * Deletes pending-list pages up to (but not including) the newHead page.
 * If newHead == InvalidBlockNumber, the function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 */
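/*
 * Pages are unlinked and marked deleted in batches of up to
 * GIN_NDELETE_AT_ONCE; each batch is covered by a single WAL record that
 * also carries the updated metapage contents.
 */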
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		/*
		 * Set pd_lower just past the end of the metadata.  This is essential,
		 * because without doing so, metadata will be lost if xlog.c
		 * compresses the page.  (We must do this here because pre-v11
		 * versions of PG did not set the metapage's pd_lower correctly, so a
		 * pg_upgraded index might contain the wrong value.)
		 */
		((PageHeader) metapage)->pd_lower =
			((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer,
							   REGBUF_WILL_INIT | REGBUF_STANDARD);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
	keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
	keys->categories = (GinNullCategory *)
		palloc(sizeof(GinNullCategory) * maxvalues);
	keys->nvalues = 0;
	keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
	if (keys->nvalues >= keys->maxvalues)
	{
		keys->maxvalues *= 2;
		keys->keys = (Datum *)
			repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
		keys->categories = (GinNullCategory *)
			repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
	}

	keys->keys[keys->nvalues] = datum;
	keys->categories[keys->nvalues] = category;
	keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but only
			 * for one heap tuple and one column.  So call it at a boundary,
			 * and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into the regular GIN structure.
 *
 * At first glance this looks completely non-crash-safe, but it is okay: if
 * we crash after posting entries to the main index and before removing them
 * from the pending list, the entries will simply be posted again when the
 * cleanup is redone later on, and nothing bad will happen.
 *
 * If fill_fsm is true, ginInsertCleanup adds deleted pages to the FSM;
 * otherwise the caller is responsible for putting deleted pages into the
 * FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
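/*
 * In outline: pending pages are read into a BuildAccumulator; whenever we
 * hit the end of the list, or a full-row boundary once memory is getting
 * full, the accumulated entries are flushed into the main tree with
 * ginEntryInsert(), and the processed pages are then unlinked from the
 * pending list with shiftList().
 */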
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
				 bool fill_fsm, bool forceCleanup,
				 IndexBulkDeleteResult *stats)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer,
				buffer;
	Page		metapage,
				page;
	GinMetaPageData *metadata;
	MemoryContext opCtx,
				oldCtx;
	BuildAccumulator accum;
	KeyArray	datums;
	BlockNumber blkno,
				blknoFinish;
	bool		cleanupFinish = false;
	bool		fsm_vac = false;
	Size		workMemory;

	/*
	 * We want to prevent concurrent cleanup processes.  For that we lock the
	 * metapage in exclusive mode using a LockPage() call.  Nothing else
	 * takes that lock on the metapage, so concurrent insertion into the
	 * pending list remains possible.
	 */

	if (forceCleanup)
	{
		/*
		 * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
		 * and we want to wait for any concurrent cleanup to finish.
		 */
		LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		workMemory =
			(IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
			autovacuum_work_mem : maintenance_work_mem;
	}
	else
	{
		/*
		 * We are called from a regular insert; if we see a concurrent
		 * cleanup, just exit in the hope that the concurrent process will
		 * clean up the pending list.
		 */
		if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
			return;
		workMemory = work_mem;
	}

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, GIN_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);

	if (metadata->head == InvalidBlockNumber)
	{
		/* Nothing to do */
		UnlockReleaseBuffer(metabuffer);
		UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
		return;
	}

	/*
	 * Remember a tail page to prevent infinite cleanup if other backends add
	 * new tuples faster than we can clean up.
	 */
	blknoFinish = metadata->tail;

	/*
	 * Read and lock head of pending list
	 */
	blkno = metadata->head;
	buffer = ReadBuffer(index, blkno);
	LockBuffer(buffer, GIN_SHARE);
	page = BufferGetPage(buffer);

	LockBuffer(metabuffer, GIN_UNLOCK);

	/*
	 * Initialize.  All temporary space will be in opCtx
	 */
	opCtx = AllocSetContextCreate(CurrentMemoryContext,
								  "GIN insert cleanup temporary context",
								  ALLOCSET_DEFAULT_SIZES);

	oldCtx = MemoryContextSwitchTo(opCtx);

	initKeyArray(&datums, 128);
	ginInitBA(&accum);
	accum.ginstate = ginstate;

	/*
	 * At the top of this loop, we have pin and lock on the current page of
	 * the pending list.  However, we'll release that before exiting the loop.
	 * Note we also have pin but not lock on the metapage.
	 */
	for (;;)
	{
		Assert(!GinPageIsDeleted(page));

		/*
		 * Have we reached the page that we remember was the tail when we
		 * started our cleanup?  If so, we can stop after processing it.  But
		 * if the caller asked us to clean up the whole pending list, ignore
		 * the old tail; we will keep working until the list becomes empty.
		 */
		if (blkno == blknoFinish && full_clean == false)
			cleanupFinish = true;

		/*
		 * read page's datums into accum
		 */
		processPendingPage(&accum, &datums, page, FirstOffsetNumber);

		vacuum_delay_point();

		/*
		 * Is it time to flush memory to disk?	Flush if we are at the end of
		 * the pending list, or if we have a full row and memory is getting
		 * full.
		 */
		if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
			(GinPageHasFullRow(page) &&
			 (accum.allocatedMemory >= workMemory * 1024L)))
		{
			ItemPointerData *list;
			uint32		nlist;
			Datum		key;
			GinNullCategory category;
			OffsetNumber maxoff,
						attnum;

			/*
			 * Unlock the current page to improve concurrency.  Any changes
			 * made to the page meanwhile will be detected later, by comparing
			 * maxoff after the memory flush has completed.
			 */
			maxoff = PageGetMaxOffsetNumber(page);
			LockBuffer(buffer, GIN_UNLOCK);

			/*
			 * Moving the collected data into the regular structure can take
			 * a significant amount of time, so do it without holding a lock
			 * on the pending list.
			 */
			ginBeginBAScan(&accum);
			while ((list = ginGetBAEntry(&accum,
										 &attnum, &key, &category, &nlist)) != NULL)
			{
				ginEntryInsert(ginstate, attnum, key, category,
							   list, nlist, NULL);
				vacuum_delay_point();
			}

			/*
			 * Lock the whole list to remove pages
			 */
			LockBuffer(metabuffer, GIN_EXCLUSIVE);
			LockBuffer(buffer, GIN_SHARE);

			Assert(!GinPageIsDeleted(page));

			/*
			 * While we left the page unlocked, more stuff might have gotten
			 * added to it.  If so, process those entries immediately.  There
			 * shouldn't be very many, so we don't worry about the fact that
			 * we're doing this with exclusive lock.  The insertion algorithm
			 * guarantees that an inserted row will not be continued on the
			 * next page.  NOTE: intentionally no vacuum_delay_point in this
			 * loop.
			 */
			if (PageGetMaxOffsetNumber(page) != maxoff)
			{
				ginInitBA(&accum);
				processPendingPage(&accum, &datums, page, maxoff + 1);

				ginBeginBAScan(&accum);
				while ((list = ginGetBAEntry(&accum,
											 &attnum, &key, &category, &nlist)) != NULL)
					ginEntryInsert(ginstate, attnum, key, category,
								   list, nlist, NULL);
			}

			/*
			 * Remember next page - it will become the new list head
			 */
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);	/* shiftList will do exclusive
											 * locking */

			/*
			 * Remove the read pages from the pending list; at this point all
			 * of their content has been moved into the regular structure.
			 */
			shiftList(index, metabuffer, blkno, fill_fsm, stats);

			/* At this point, some pending pages have been freed up */
			fsm_vac = true;

			Assert(blkno == metadata->head);
			LockBuffer(metabuffer, GIN_UNLOCK);

			/*
			 * If we removed the whole pending list, or we have cleaned up
			 * through the tail page we remembered at the start of the
			 * cleanup, then just exit.
			 */
			if (blkno == InvalidBlockNumber || cleanupFinish)
				break;

			/*
			 * release memory used so far and reinit state
			 */
			MemoryContextReset(opCtx);
			initKeyArray(&datums, datums.maxvalues);
			ginInitBA(&accum);
		}
		else
		{
			blkno = GinPageGetOpaque(page)->rightlink;
			UnlockReleaseBuffer(buffer);
		}

		/*
		 * Read next page in pending list
		 */
		vacuum_delay_point();
		buffer = ReadBuffer(index, blkno);
		LockBuffer(buffer, GIN_SHARE);
		page = BufferGetPage(buffer);
	}

	UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
	ReleaseBuffer(metabuffer);

	/*
	 * As pending list pages can have a high churn rate, it is desirable to
	 * recycle them immediately to the FreeSpace Map when ordinary backends
	 * clean the list.
	 */
	if (fsm_vac && fill_fsm)
		IndexFreeSpaceMapVacuum(index);

	/* Clean up temporary space */
	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
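/*
 * Usage sketch (from SQL; the index name here is only an example):
 *
 *	 SELECT gin_clean_pending_list('some_gin_index'::regclass);
 *
 * The return value is the number of pending-list pages deleted.
 */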
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	Relation	indexRel = index_open(indexoid, AccessShareLock);
	IndexBulkDeleteResult stats;
	GinState	ginstate;

	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("GIN pending list cannot be cleaned up during recovery.")));

	/* Must be a GIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != GIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a GIN index",
						RelationGetRelationName(indexRel))));

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(indexRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary indexes of other sessions")));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	memset(&stats, 0, sizeof(stats));
	initGinState(&ginstate, indexRel);
	ginInsertCleanup(&ginstate, true, true, true, &stats);

	index_close(indexRel, AccessShareLock);

	PG_RETURN_INT64((int64) stats.pages_deleted);
}