1 /*-------------------------------------------------------------------------
2  *
3  * ginutil.c
4  *	  Utility routines for the Postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/ginutil.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/reloptions.h"
20 #include "access/xloginsert.h"
21 #include "catalog/pg_collation.h"
22 #include "catalog/pg_type.h"
23 #include "miscadmin.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26 #include "storage/predicate.h"
27 #include "utils/builtins.h"
28 #include "utils/index_selfuncs.h"
29 #include "utils/typcache.h"
30 
31 
32 /*
33  * GIN handler function: return IndexAmRoutine with access method parameters
34  * and callbacks.
35  */
36 Datum
ginhandler(PG_FUNCTION_ARGS)37 ginhandler(PG_FUNCTION_ARGS)
38 {
39 	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
40 
41 	amroutine->amstrategies = 0;
42 	amroutine->amsupport = GINNProcs;
43 	amroutine->amcanorder = false;
44 	amroutine->amcanorderbyop = false;
45 	amroutine->amcanbackward = false;
46 	amroutine->amcanunique = false;
47 	amroutine->amcanmulticol = true;
48 	amroutine->amoptionalkey = true;
49 	amroutine->amsearcharray = false;
50 	amroutine->amsearchnulls = false;
51 	amroutine->amstorage = true;
52 	amroutine->amclusterable = false;
53 	amroutine->ampredlocks = true;
54 	amroutine->amcanparallel = false;
55 	amroutine->amcaninclude = false;
56 	amroutine->amkeytype = InvalidOid;
57 
58 	amroutine->ambuild = ginbuild;
59 	amroutine->ambuildempty = ginbuildempty;
60 	amroutine->aminsert = gininsert;
61 	amroutine->ambulkdelete = ginbulkdelete;
62 	amroutine->amvacuumcleanup = ginvacuumcleanup;
63 	amroutine->amcanreturn = NULL;
64 	amroutine->amcostestimate = gincostestimate;
65 	amroutine->amoptions = ginoptions;
66 	amroutine->amproperty = NULL;
67 	amroutine->amvalidate = ginvalidate;
68 	amroutine->ambeginscan = ginbeginscan;
69 	amroutine->amrescan = ginrescan;
70 	amroutine->amgettuple = NULL;
71 	amroutine->amgetbitmap = gingetbitmap;
72 	amroutine->amendscan = ginendscan;
73 	amroutine->ammarkpos = NULL;
74 	amroutine->amrestrpos = NULL;
75 	amroutine->amestimateparallelscan = NULL;
76 	amroutine->aminitparallelscan = NULL;
77 	amroutine->amparallelrescan = NULL;
78 
79 	PG_RETURN_POINTER(amroutine);
80 }
81 
82 /*
83  * initGinState: fill in an empty GinState struct to describe the index
84  *
85  * Note: assorted subsidiary data is allocated in the CurrentMemoryContext.
86  */
87 void
initGinState(GinState * state,Relation index)88 initGinState(GinState *state, Relation index)
89 {
90 	TupleDesc	origTupdesc = RelationGetDescr(index);
91 	int			i;
92 
93 	MemSet(state, 0, sizeof(GinState));
94 
95 	state->index = index;
96 	state->oneCol = (origTupdesc->natts == 1) ? true : false;
97 	state->origTupdesc = origTupdesc;
98 
99 	for (i = 0; i < origTupdesc->natts; i++)
100 	{
101 		Form_pg_attribute attr = TupleDescAttr(origTupdesc, i);
102 
103 		if (state->oneCol)
104 			state->tupdesc[i] = state->origTupdesc;
105 		else
106 		{
107 			state->tupdesc[i] = CreateTemplateTupleDesc(2, false);
108 
109 			TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 1, NULL,
110 							   INT2OID, -1, 0);
111 			TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 2, NULL,
112 							   attr->atttypid,
113 							   attr->atttypmod,
114 							   attr->attndims);
115 			TupleDescInitEntryCollation(state->tupdesc[i], (AttrNumber) 2,
116 										attr->attcollation);
117 		}
118 
119 		/*
120 		 * If the compare proc isn't specified in the opclass definition, look
121 		 * up the index key type's default btree comparator.
122 		 */
123 		if (index_getprocid(index, i + 1, GIN_COMPARE_PROC) != InvalidOid)
124 		{
125 			fmgr_info_copy(&(state->compareFn[i]),
126 						   index_getprocinfo(index, i + 1, GIN_COMPARE_PROC),
127 						   CurrentMemoryContext);
128 		}
129 		else
130 		{
131 			TypeCacheEntry *typentry;
132 
133 			typentry = lookup_type_cache(attr->atttypid,
134 										 TYPECACHE_CMP_PROC_FINFO);
135 			if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
136 				ereport(ERROR,
137 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
138 						 errmsg("could not identify a comparison function for type %s",
139 								format_type_be(attr->atttypid))));
140 			fmgr_info_copy(&(state->compareFn[i]),
141 						   &(typentry->cmp_proc_finfo),
142 						   CurrentMemoryContext);
143 		}
144 
145 		/* Opclass must always provide extract procs */
146 		fmgr_info_copy(&(state->extractValueFn[i]),
147 					   index_getprocinfo(index, i + 1, GIN_EXTRACTVALUE_PROC),
148 					   CurrentMemoryContext);
149 		fmgr_info_copy(&(state->extractQueryFn[i]),
150 					   index_getprocinfo(index, i + 1, GIN_EXTRACTQUERY_PROC),
151 					   CurrentMemoryContext);
152 
153 		/*
154 		 * Check opclass capability to do tri-state or binary logic consistent
155 		 * check.
156 		 */
157 		if (index_getprocid(index, i + 1, GIN_TRICONSISTENT_PROC) != InvalidOid)
158 		{
159 			fmgr_info_copy(&(state->triConsistentFn[i]),
160 						   index_getprocinfo(index, i + 1, GIN_TRICONSISTENT_PROC),
161 						   CurrentMemoryContext);
162 		}
163 
164 		if (index_getprocid(index, i + 1, GIN_CONSISTENT_PROC) != InvalidOid)
165 		{
166 			fmgr_info_copy(&(state->consistentFn[i]),
167 						   index_getprocinfo(index, i + 1, GIN_CONSISTENT_PROC),
168 						   CurrentMemoryContext);
169 		}
170 
171 		if (state->consistentFn[i].fn_oid == InvalidOid &&
172 			state->triConsistentFn[i].fn_oid == InvalidOid)
173 		{
174 			elog(ERROR, "missing GIN support function (%d or %d) for attribute %d of index \"%s\"",
175 				 GIN_CONSISTENT_PROC, GIN_TRICONSISTENT_PROC,
176 				 i + 1, RelationGetRelationName(index));
177 		}
178 
179 		/*
180 		 * Check opclass capability to do partial match.
181 		 */
182 		if (index_getprocid(index, i + 1, GIN_COMPARE_PARTIAL_PROC) != InvalidOid)
183 		{
184 			fmgr_info_copy(&(state->comparePartialFn[i]),
185 						   index_getprocinfo(index, i + 1, GIN_COMPARE_PARTIAL_PROC),
186 						   CurrentMemoryContext);
187 			state->canPartialMatch[i] = true;
188 		}
189 		else
190 		{
191 			state->canPartialMatch[i] = false;
192 		}
193 
194 		/*
195 		 * If the index column has a specified collation, we should honor that
196 		 * while doing comparisons.  However, we may have a collatable storage
197 		 * type for a noncollatable indexed data type (for instance, hstore
198 		 * uses text index entries).  If there's no index collation then
199 		 * specify default collation in case the support functions need
200 		 * collation.  This is harmless if the support functions don't care
201 		 * about collation, so we just do it unconditionally.  (We could
202 		 * alternatively call get_typcollation, but that seems like expensive
203 		 * overkill --- there aren't going to be any cases where a GIN storage
204 		 * type has a nondefault collation.)
205 		 */
206 		if (OidIsValid(index->rd_indcollation[i]))
207 			state->supportCollation[i] = index->rd_indcollation[i];
208 		else
209 			state->supportCollation[i] = DEFAULT_COLLATION_OID;
210 	}
211 }
212 
213 /*
214  * Extract attribute (column) number of stored entry from GIN tuple
215  */
216 OffsetNumber
gintuple_get_attrnum(GinState * ginstate,IndexTuple tuple)217 gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
218 {
219 	OffsetNumber colN;
220 
221 	if (ginstate->oneCol)
222 	{
223 		/* column number is not stored explicitly */
224 		colN = FirstOffsetNumber;
225 	}
226 	else
227 	{
228 		Datum		res;
229 		bool		isnull;
230 
231 		/*
232 		 * First attribute is always int16, so we can safely use any tuple
233 		 * descriptor to obtain first attribute of tuple
234 		 */
235 		res = index_getattr(tuple, FirstOffsetNumber, ginstate->tupdesc[0],
236 							&isnull);
237 		Assert(!isnull);
238 
239 		colN = DatumGetUInt16(res);
240 		Assert(colN >= FirstOffsetNumber && colN <= ginstate->origTupdesc->natts);
241 	}
242 
243 	return colN;
244 }
245 
246 /*
247  * Extract stored datum (and possible null category) from GIN tuple
248  */
249 Datum
gintuple_get_key(GinState * ginstate,IndexTuple tuple,GinNullCategory * category)250 gintuple_get_key(GinState *ginstate, IndexTuple tuple,
251 				 GinNullCategory *category)
252 {
253 	Datum		res;
254 	bool		isnull;
255 
256 	if (ginstate->oneCol)
257 	{
258 		/*
259 		 * Single column index doesn't store attribute numbers in tuples
260 		 */
261 		res = index_getattr(tuple, FirstOffsetNumber, ginstate->origTupdesc,
262 							&isnull);
263 	}
264 	else
265 	{
266 		/*
267 		 * Since the datum type depends on which index column it's from, we
268 		 * must be careful to use the right tuple descriptor here.
269 		 */
270 		OffsetNumber colN = gintuple_get_attrnum(ginstate, tuple);
271 
272 		res = index_getattr(tuple, OffsetNumberNext(FirstOffsetNumber),
273 							ginstate->tupdesc[colN - 1],
274 							&isnull);
275 	}
276 
277 	if (isnull)
278 		*category = GinGetNullCategory(tuple, ginstate);
279 	else
280 		*category = GIN_CAT_NORM_KEY;
281 
282 	return res;
283 }
284 
285 /*
286  * Allocate a new page (either by recycling, or by extending the index file)
287  * The returned buffer is already pinned and exclusive-locked
288  * Caller is responsible for initializing the page by calling GinInitBuffer
289  */
290 Buffer
GinNewBuffer(Relation index)291 GinNewBuffer(Relation index)
292 {
293 	Buffer		buffer;
294 	bool		needLock;
295 
296 	/* First, try to get a page from FSM */
297 	for (;;)
298 	{
299 		BlockNumber blkno = GetFreeIndexPage(index);
300 
301 		if (blkno == InvalidBlockNumber)
302 			break;
303 
304 		buffer = ReadBuffer(index, blkno);
305 
306 		/*
307 		 * We have to guard against the possibility that someone else already
308 		 * recycled this page; the buffer may be locked if so.
309 		 */
310 		if (ConditionalLockBuffer(buffer))
311 		{
312 			if (GinPageIsRecyclable(BufferGetPage(buffer)))
313 				return buffer;	/* OK to use */
314 
315 			LockBuffer(buffer, GIN_UNLOCK);
316 		}
317 
318 		/* Can't use it, so release buffer and try again */
319 		ReleaseBuffer(buffer);
320 	}
321 
322 	/* Must extend the file */
323 	needLock = !RELATION_IS_LOCAL(index);
324 	if (needLock)
325 		LockRelationForExtension(index, ExclusiveLock);
326 
327 	buffer = ReadBuffer(index, P_NEW);
328 	LockBuffer(buffer, GIN_EXCLUSIVE);
329 
330 	if (needLock)
331 		UnlockRelationForExtension(index, ExclusiveLock);
332 
333 	return buffer;
334 }
335 
336 void
GinInitPage(Page page,uint32 f,Size pageSize)337 GinInitPage(Page page, uint32 f, Size pageSize)
338 {
339 	GinPageOpaque opaque;
340 
341 	PageInit(page, pageSize, sizeof(GinPageOpaqueData));
342 
343 	opaque = GinPageGetOpaque(page);
344 	memset(opaque, 0, sizeof(GinPageOpaqueData));
345 	opaque->flags = f;
346 	opaque->rightlink = InvalidBlockNumber;
347 }
348 
349 void
GinInitBuffer(Buffer b,uint32 f)350 GinInitBuffer(Buffer b, uint32 f)
351 {
352 	GinInitPage(BufferGetPage(b), f, BufferGetPageSize(b));
353 }
354 
355 void
GinInitMetabuffer(Buffer b)356 GinInitMetabuffer(Buffer b)
357 {
358 	GinMetaPageData *metadata;
359 	Page		page = BufferGetPage(b);
360 
361 	GinInitPage(page, GIN_META, BufferGetPageSize(b));
362 
363 	metadata = GinPageGetMeta(page);
364 
365 	metadata->head = metadata->tail = InvalidBlockNumber;
366 	metadata->tailFreeSize = 0;
367 	metadata->nPendingPages = 0;
368 	metadata->nPendingHeapTuples = 0;
369 	metadata->nTotalPages = 0;
370 	metadata->nEntryPages = 0;
371 	metadata->nDataPages = 0;
372 	metadata->nEntries = 0;
373 	metadata->ginVersion = GIN_CURRENT_VERSION;
374 
375 	/*
376 	 * Set pd_lower just past the end of the metadata.  This is essential,
377 	 * because without doing so, metadata will be lost if xlog.c compresses
378 	 * the page.
379 	 */
380 	((PageHeader) page)->pd_lower =
381 		((char *) metadata + sizeof(GinMetaPageData)) - (char *) page;
382 }
383 
384 /*
385  * Compare two keys of the same index column
386  */
387 int
ginCompareEntries(GinState * ginstate,OffsetNumber attnum,Datum a,GinNullCategory categorya,Datum b,GinNullCategory categoryb)388 ginCompareEntries(GinState *ginstate, OffsetNumber attnum,
389 				  Datum a, GinNullCategory categorya,
390 				  Datum b, GinNullCategory categoryb)
391 {
392 	/* if not of same null category, sort by that first */
393 	if (categorya != categoryb)
394 		return (categorya < categoryb) ? -1 : 1;
395 
396 	/* all null items in same category are equal */
397 	if (categorya != GIN_CAT_NORM_KEY)
398 		return 0;
399 
400 	/* both not null, so safe to call the compareFn */
401 	return DatumGetInt32(FunctionCall2Coll(&ginstate->compareFn[attnum - 1],
402 										   ginstate->supportCollation[attnum - 1],
403 										   a, b));
404 }
405 
406 /*
407  * Compare two keys of possibly different index columns
408  */
409 int
ginCompareAttEntries(GinState * ginstate,OffsetNumber attnuma,Datum a,GinNullCategory categorya,OffsetNumber attnumb,Datum b,GinNullCategory categoryb)410 ginCompareAttEntries(GinState *ginstate,
411 					 OffsetNumber attnuma, Datum a, GinNullCategory categorya,
412 					 OffsetNumber attnumb, Datum b, GinNullCategory categoryb)
413 {
414 	/* attribute number is the first sort key */
415 	if (attnuma != attnumb)
416 		return (attnuma < attnumb) ? -1 : 1;
417 
418 	return ginCompareEntries(ginstate, attnuma, a, categorya, b, categoryb);
419 }
420 
421 
422 /*
423  * Support for sorting key datums in ginExtractEntries
424  *
425  * Note: we only have to worry about null and not-null keys here;
426  * ginExtractEntries never generates more than one placeholder null,
427  * so it doesn't have to sort those.
428  */
429 typedef struct
430 {
431 	Datum		datum;
432 	bool		isnull;
433 } keyEntryData;
434 
435 typedef struct
436 {
437 	FmgrInfo   *cmpDatumFunc;
438 	Oid			collation;
439 	bool		haveDups;
440 } cmpEntriesArg;
441 
442 static int
cmpEntries(const void * a,const void * b,void * arg)443 cmpEntries(const void *a, const void *b, void *arg)
444 {
445 	const keyEntryData *aa = (const keyEntryData *) a;
446 	const keyEntryData *bb = (const keyEntryData *) b;
447 	cmpEntriesArg *data = (cmpEntriesArg *) arg;
448 	int			res;
449 
450 	if (aa->isnull)
451 	{
452 		if (bb->isnull)
453 			res = 0;			/* NULL "=" NULL */
454 		else
455 			res = 1;			/* NULL ">" not-NULL */
456 	}
457 	else if (bb->isnull)
458 		res = -1;				/* not-NULL "<" NULL */
459 	else
460 		res = DatumGetInt32(FunctionCall2Coll(data->cmpDatumFunc,
461 											  data->collation,
462 											  aa->datum, bb->datum));
463 
464 	/*
465 	 * Detect if we have any duplicates.  If there are equal keys, qsort must
466 	 * compare them at some point, else it wouldn't know whether one should go
467 	 * before or after the other.
468 	 */
469 	if (res == 0)
470 		data->haveDups = true;
471 
472 	return res;
473 }
474 
475 
476 /*
477  * Extract the index key values from an indexable item
478  *
479  * The resulting key values are sorted, and any duplicates are removed.
480  * This avoids generating redundant index entries.
481  */
482 Datum *
ginExtractEntries(GinState * ginstate,OffsetNumber attnum,Datum value,bool isNull,int32 * nentries,GinNullCategory ** categories)483 ginExtractEntries(GinState *ginstate, OffsetNumber attnum,
484 				  Datum value, bool isNull,
485 				  int32 *nentries, GinNullCategory **categories)
486 {
487 	Datum	   *entries;
488 	bool	   *nullFlags;
489 	int32		i;
490 
491 	/*
492 	 * We don't call the extractValueFn on a null item.  Instead generate a
493 	 * placeholder.
494 	 */
495 	if (isNull)
496 	{
497 		*nentries = 1;
498 		entries = (Datum *) palloc(sizeof(Datum));
499 		entries[0] = (Datum) 0;
500 		*categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
501 		(*categories)[0] = GIN_CAT_NULL_ITEM;
502 		return entries;
503 	}
504 
505 	/* OK, call the opclass's extractValueFn */
506 	nullFlags = NULL;			/* in case extractValue doesn't set it */
507 	entries = (Datum *)
508 		DatumGetPointer(FunctionCall3Coll(&ginstate->extractValueFn[attnum - 1],
509 										  ginstate->supportCollation[attnum - 1],
510 										  value,
511 										  PointerGetDatum(nentries),
512 										  PointerGetDatum(&nullFlags)));
513 
514 	/*
515 	 * Generate a placeholder if the item contained no keys.
516 	 */
517 	if (entries == NULL || *nentries <= 0)
518 	{
519 		*nentries = 1;
520 		entries = (Datum *) palloc(sizeof(Datum));
521 		entries[0] = (Datum) 0;
522 		*categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
523 		(*categories)[0] = GIN_CAT_EMPTY_ITEM;
524 		return entries;
525 	}
526 
527 	/*
528 	 * If the extractValueFn didn't create a nullFlags array, create one,
529 	 * assuming that everything's non-null.
530 	 */
531 	if (nullFlags == NULL)
532 		nullFlags = (bool *) palloc0(*nentries * sizeof(bool));
533 
534 	/*
535 	 * If there's more than one key, sort and unique-ify.
536 	 *
537 	 * XXX Using qsort here is notationally painful, and the overhead is
538 	 * pretty bad too.  For small numbers of keys it'd likely be better to use
539 	 * a simple insertion sort.
540 	 */
541 	if (*nentries > 1)
542 	{
543 		keyEntryData *keydata;
544 		cmpEntriesArg arg;
545 
546 		keydata = (keyEntryData *) palloc(*nentries * sizeof(keyEntryData));
547 		for (i = 0; i < *nentries; i++)
548 		{
549 			keydata[i].datum = entries[i];
550 			keydata[i].isnull = nullFlags[i];
551 		}
552 
553 		arg.cmpDatumFunc = &ginstate->compareFn[attnum - 1];
554 		arg.collation = ginstate->supportCollation[attnum - 1];
555 		arg.haveDups = false;
556 		qsort_arg(keydata, *nentries, sizeof(keyEntryData),
557 				  cmpEntries, (void *) &arg);
558 
559 		if (arg.haveDups)
560 		{
561 			/* there are duplicates, must get rid of 'em */
562 			int32		j;
563 
564 			entries[0] = keydata[0].datum;
565 			nullFlags[0] = keydata[0].isnull;
566 			j = 1;
567 			for (i = 1; i < *nentries; i++)
568 			{
569 				if (cmpEntries(&keydata[i - 1], &keydata[i], &arg) != 0)
570 				{
571 					entries[j] = keydata[i].datum;
572 					nullFlags[j] = keydata[i].isnull;
573 					j++;
574 				}
575 			}
576 			*nentries = j;
577 		}
578 		else
579 		{
580 			/* easy, no duplicates */
581 			for (i = 0; i < *nentries; i++)
582 			{
583 				entries[i] = keydata[i].datum;
584 				nullFlags[i] = keydata[i].isnull;
585 			}
586 		}
587 
588 		pfree(keydata);
589 	}
590 
591 	/*
592 	 * Create GinNullCategory representation from nullFlags.
593 	 */
594 	*categories = (GinNullCategory *) palloc0(*nentries * sizeof(GinNullCategory));
595 	for (i = 0; i < *nentries; i++)
596 		(*categories)[i] = (nullFlags[i] ? GIN_CAT_NULL_KEY : GIN_CAT_NORM_KEY);
597 
598 	return entries;
599 }
600 
601 bytea *
ginoptions(Datum reloptions,bool validate)602 ginoptions(Datum reloptions, bool validate)
603 {
604 	relopt_value *options;
605 	GinOptions *rdopts;
606 	int			numoptions;
607 	static const relopt_parse_elt tab[] = {
608 		{"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)},
609 		{"gin_pending_list_limit", RELOPT_TYPE_INT, offsetof(GinOptions,
610 															 pendingListCleanupSize)}
611 	};
612 
613 	options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN,
614 							  &numoptions);
615 
616 	/* if none set, we're done */
617 	if (numoptions == 0)
618 		return NULL;
619 
620 	rdopts = allocateReloptStruct(sizeof(GinOptions), options, numoptions);
621 
622 	fillRelOptions((void *) rdopts, sizeof(GinOptions), options, numoptions,
623 				   validate, tab, lengthof(tab));
624 
625 	pfree(options);
626 
627 	return (bytea *) rdopts;
628 }
629 
630 /*
631  * Fetch index's statistical data into *stats
632  *
633  * Note: in the result, nPendingPages can be trusted to be up-to-date,
634  * as can ginVersion; but the other fields are as of the last VACUUM.
635  */
636 void
ginGetStats(Relation index,GinStatsData * stats)637 ginGetStats(Relation index, GinStatsData *stats)
638 {
639 	Buffer		metabuffer;
640 	Page		metapage;
641 	GinMetaPageData *metadata;
642 
643 	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
644 	LockBuffer(metabuffer, GIN_SHARE);
645 	metapage = BufferGetPage(metabuffer);
646 	metadata = GinPageGetMeta(metapage);
647 
648 	stats->nPendingPages = metadata->nPendingPages;
649 	stats->nTotalPages = metadata->nTotalPages;
650 	stats->nEntryPages = metadata->nEntryPages;
651 	stats->nDataPages = metadata->nDataPages;
652 	stats->nEntries = metadata->nEntries;
653 	stats->ginVersion = metadata->ginVersion;
654 
655 	UnlockReleaseBuffer(metabuffer);
656 }
657 
658 /*
659  * Write the given statistics to the index's metapage
660  *
661  * Note: nPendingPages and ginVersion are *not* copied over
662  */
663 void
ginUpdateStats(Relation index,const GinStatsData * stats)664 ginUpdateStats(Relation index, const GinStatsData *stats)
665 {
666 	Buffer		metabuffer;
667 	Page		metapage;
668 	GinMetaPageData *metadata;
669 
670 	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
671 	LockBuffer(metabuffer, GIN_EXCLUSIVE);
672 	metapage = BufferGetPage(metabuffer);
673 	metadata = GinPageGetMeta(metapage);
674 
675 	START_CRIT_SECTION();
676 
677 	metadata->nTotalPages = stats->nTotalPages;
678 	metadata->nEntryPages = stats->nEntryPages;
679 	metadata->nDataPages = stats->nDataPages;
680 	metadata->nEntries = stats->nEntries;
681 
682 	/*
683 	 * Set pd_lower just past the end of the metadata.  This is essential,
684 	 * because without doing so, metadata will be lost if xlog.c compresses
685 	 * the page.  (We must do this here because pre-v11 versions of PG did not
686 	 * set the metapage's pd_lower correctly, so a pg_upgraded index might
687 	 * contain the wrong value.)
688 	 */
689 	((PageHeader) metapage)->pd_lower =
690 		((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
691 
692 	MarkBufferDirty(metabuffer);
693 
694 	if (RelationNeedsWAL(index))
695 	{
696 		XLogRecPtr	recptr;
697 		ginxlogUpdateMeta data;
698 
699 		data.node = index->rd_node;
700 		data.ntuples = 0;
701 		data.newRightlink = data.prevTail = InvalidBlockNumber;
702 		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
703 
704 		XLogBeginInsert();
705 		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
706 		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
707 
708 		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
709 		PageSetLSN(metapage, recptr);
710 	}
711 
712 	UnlockReleaseBuffer(metabuffer);
713 
714 	END_CRIT_SECTION();
715 }
716