1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *	  POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/cache/relcache.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *		RelationCacheInitialize			- initialize relcache (to empty)
18  *		RelationCacheInitializePhase2	- initialize shared-catalog entries
19  *		RelationCacheInitializePhase3	- finish initializing relcache
20  *		RelationIdGetRelation			- get a reldesc by relation id
21  *		RelationClose					- close an open relation
22  *
23  * NOTES
24  *		The following code contains many undocumented hacks.  Please be
25  *		careful....
26  */
27 #include "postgres.h"
28 
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/nbtree.h"
36 #include "access/reloptions.h"
37 #include "access/sysattr.h"
38 #include "access/table.h"
39 #include "access/tableam.h"
40 #include "access/tupdesc_details.h"
41 #include "access/xact.h"
42 #include "access/xlog.h"
43 #include "catalog/catalog.h"
44 #include "catalog/indexing.h"
45 #include "catalog/namespace.h"
46 #include "catalog/partition.h"
47 #include "catalog/pg_am.h"
48 #include "catalog/pg_amproc.h"
49 #include "catalog/pg_attrdef.h"
50 #include "catalog/pg_authid.h"
51 #include "catalog/pg_auth_members.h"
52 #include "catalog/pg_constraint.h"
53 #include "catalog/pg_database.h"
54 #include "catalog/pg_namespace.h"
55 #include "catalog/pg_opclass.h"
56 #include "catalog/pg_partitioned_table.h"
57 #include "catalog/pg_proc.h"
58 #include "catalog/pg_publication.h"
59 #include "catalog/pg_rewrite.h"
60 #include "catalog/pg_shseclabel.h"
61 #include "catalog/pg_statistic_ext.h"
62 #include "catalog/pg_subscription.h"
63 #include "catalog/pg_tablespace.h"
64 #include "catalog/pg_trigger.h"
65 #include "catalog/pg_type.h"
66 #include "catalog/schemapg.h"
67 #include "catalog/storage.h"
68 #include "commands/policy.h"
69 #include "commands/trigger.h"
70 #include "miscadmin.h"
71 #include "nodes/makefuncs.h"
72 #include "nodes/nodeFuncs.h"
73 #include "optimizer/optimizer.h"
74 #include "partitioning/partbounds.h"
75 #include "partitioning/partdesc.h"
76 #include "rewrite/rewriteDefine.h"
77 #include "rewrite/rowsecurity.h"
78 #include "storage/lmgr.h"
79 #include "storage/smgr.h"
80 #include "utils/array.h"
81 #include "utils/builtins.h"
82 #include "utils/datum.h"
83 #include "utils/fmgroids.h"
84 #include "utils/inval.h"
85 #include "utils/lsyscache.h"
86 #include "utils/memutils.h"
87 #include "utils/partcache.h"
88 #include "utils/relmapper.h"
89 #include "utils/resowner_private.h"
90 #include "utils/snapmgr.h"
91 #include "utils/syscache.h"
92 
93 
94 #define RELCACHE_INIT_FILEMAGIC		0x573266	/* version ID value */
95 
96 /*
97  * Default policy for whether to apply RECOVER_RELATION_BUILD_MEMORY:
98  * do so in clobber-cache builds but not otherwise.  This choice can be
99  * overridden at compile time with -DRECOVER_RELATION_BUILD_MEMORY=1 or =0.
100  */
101 #ifndef RECOVER_RELATION_BUILD_MEMORY
102 #if defined(CLOBBER_CACHE_ALWAYS) || defined(CLOBBER_CACHE_RECURSIVELY)
103 #define RECOVER_RELATION_BUILD_MEMORY 1
104 #else
105 #define RECOVER_RELATION_BUILD_MEMORY 0
106 #endif
107 #endif
108 
109 /*
110  *		hardcoded tuple descriptors, contents generated by genbki.pl
111  */
112 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
113 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
114 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
115 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
116 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
117 static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
118 static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
119 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
120 static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
121 static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
122 
123 /*
124  *		Hash tables that index the relation cache
125  *
126  *		We used to index the cache by both name and OID, but now there
127  *		is only an index by OID.
128  */
129 typedef struct relidcacheent
130 {
131 	Oid			reloid;
132 	Relation	reldesc;
133 } RelIdCacheEnt;
134 
135 static HTAB *RelationIdCache;
136 
137 /*
138  * This flag is false until we have prepared the critical relcache entries
139  * that are needed to do indexscans on the tables read by relcache building.
140  */
141 bool		criticalRelcachesBuilt = false;
142 
143 /*
144  * This flag is false until we have prepared the critical relcache entries
145  * for shared catalogs (which are the tables needed for login).
146  */
147 bool		criticalSharedRelcachesBuilt = false;
148 
149 /*
150  * This counter counts relcache inval events received since backend startup
151  * (but only for rels that are actually in cache).  Presently, we use it only
152  * to detect whether data about to be written by write_relcache_init_file()
153  * might already be obsolete.
154  */
155 static long relcacheInvalsReceived = 0L;
156 
157 /*
158  * in_progress_list is a stack of ongoing RelationBuildDesc() calls.  CREATE
159  * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
160  * It critically relies on each backend absorbing those changes no later than
161  * next transaction start.  Hence, RelationBuildDesc() loops until it finishes
162  * without accepting a relevant invalidation.  (Most invalidation consumers
163  * don't do this.)
164  */
165 typedef struct inprogressent
166 {
167 	Oid			reloid;			/* OID of relation being built */
168 	bool		invalidated;	/* whether an invalidation arrived for it */
169 } InProgressEnt;
170 
171 static InProgressEnt *in_progress_list;
172 static int	in_progress_list_len;
173 static int	in_progress_list_maxlen;
174 
175 /*
176  * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
177  * cleanup work.  This list intentionally has limited size; if it overflows,
178  * we fall back to scanning the whole hashtable.  There is no value in a very
179  * large list because (1) at some point, a hash_seq_search scan is faster than
180  * retail lookups, and (2) the value of this is to reduce EOXact work for
181  * short transactions, which can't have dirtied all that many tables anyway.
182  * EOXactListAdd() does not bother to prevent duplicate list entries, so the
183  * cleanup processing must be idempotent.
184  */
185 #define MAX_EOXACT_LIST 32
186 static Oid	eoxact_list[MAX_EOXACT_LIST];
187 static int	eoxact_list_len = 0;
188 static bool eoxact_list_overflowed = false;
189 
190 #define EOXactListAdd(rel) \
191 	do { \
192 		if (eoxact_list_len < MAX_EOXACT_LIST) \
193 			eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
194 		else \
195 			eoxact_list_overflowed = true; \
196 	} while (0)
197 
198 /*
199  * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
200  * cleanup work.  The array expands as needed; there is no hashtable because
201  * we don't need to access individual items except at EOXact.
202  */
203 static TupleDesc *EOXactTupleDescArray;
204 static int	NextEOXactTupleDescNum = 0;
205 static int	EOXactTupleDescArrayLen = 0;
206 
207 /*
208  *		macros to manipulate the lookup hashtable
209  */
210 #define RelationCacheInsert(RELATION, replace_allowed)	\
211 do { \
212 	RelIdCacheEnt *hentry; bool found; \
213 	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
214 										   (void *) &((RELATION)->rd_id), \
215 										   HASH_ENTER, &found); \
216 	if (found) \
217 	{ \
218 		/* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
219 		Relation _old_rel = hentry->reldesc; \
220 		Assert(replace_allowed); \
221 		hentry->reldesc = (RELATION); \
222 		if (RelationHasReferenceCountZero(_old_rel)) \
223 			RelationDestroyRelation(_old_rel, false); \
224 		else if (!IsBootstrapProcessingMode()) \
225 			elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
226 				 RelationGetRelationName(_old_rel)); \
227 	} \
228 	else \
229 		hentry->reldesc = (RELATION); \
230 } while(0)
231 
232 #define RelationIdCacheLookup(ID, RELATION) \
233 do { \
234 	RelIdCacheEnt *hentry; \
235 	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
236 										   (void *) &(ID), \
237 										   HASH_FIND, NULL); \
238 	if (hentry) \
239 		RELATION = hentry->reldesc; \
240 	else \
241 		RELATION = NULL; \
242 } while(0)
243 
244 #define RelationCacheDelete(RELATION) \
245 do { \
246 	RelIdCacheEnt *hentry; \
247 	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
248 										   (void *) &((RELATION)->rd_id), \
249 										   HASH_REMOVE, NULL); \
250 	if (hentry == NULL) \
251 		elog(WARNING, "failed to delete relcache entry for OID %u", \
252 			 (RELATION)->rd_id); \
253 } while(0)
254 
255 
256 /*
257  * Special cache for opclass-related information
258  *
259  * Note: only default support procs get cached, ie, those with
260  * lefttype = righttype = opcintype.
261  */
262 typedef struct opclasscacheent
263 {
264 	Oid			opclassoid;		/* lookup key: OID of opclass */
265 	bool		valid;			/* set true after successful fill-in */
266 	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
267 	Oid			opcfamily;		/* OID of opclass's family */
268 	Oid			opcintype;		/* OID of opclass's declared input type */
269 	RegProcedure *supportProcs; /* OIDs of support procedures */
270 } OpClassCacheEnt;
271 
272 static HTAB *OpClassCache = NULL;
273 
274 
275 /* non-export function prototypes */
276 
277 static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
278 static void RelationClearRelation(Relation relation, bool rebuild);
279 
280 static void RelationReloadIndexInfo(Relation relation);
281 static void RelationReloadNailed(Relation relation);
282 static void RelationFlushRelation(Relation relation);
283 static void RememberToFreeTupleDescAtEOX(TupleDesc td);
284 static void AtEOXact_cleanup(Relation relation, bool isCommit);
285 static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
286 								SubTransactionId mySubid, SubTransactionId parentSubid);
287 static bool load_relcache_init_file(bool shared);
288 static void write_relcache_init_file(bool shared);
289 static void write_item(const void *data, Size len, FILE *fp);
290 
291 static void formrdesc(const char *relationName, Oid relationReltype,
292 					  bool isshared, int natts, const FormData_pg_attribute *attrs);
293 
294 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
295 static Relation AllocateRelationDesc(Form_pg_class relp);
296 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
297 static void RelationBuildTupleDesc(Relation relation);
298 static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
299 static void RelationInitPhysicalAddr(Relation relation);
300 static void load_critical_index(Oid indexoid, Oid heapoid);
301 static TupleDesc GetPgClassDescriptor(void);
302 static TupleDesc GetPgIndexDescriptor(void);
303 static void AttrDefaultFetch(Relation relation);
304 static void CheckConstraintFetch(Relation relation);
305 static int	CheckConstraintCmp(const void *a, const void *b);
306 static List *insert_ordered_oid(List *list, Oid datum);
307 static void InitIndexAmRoutine(Relation relation);
308 static void IndexSupportInitialize(oidvector *indclass,
309 								   RegProcedure *indexSupport,
310 								   Oid *opFamily,
311 								   Oid *opcInType,
312 								   StrategyNumber maxSupportNumber,
313 								   AttrNumber maxAttributeNumber);
314 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
315 										  StrategyNumber numSupport);
316 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
317 static void unlink_initfile(const char *initfilename, int elevel);
318 
319 
320 /*
321  *		ScanPgRelation
322  *
323  *		This is used by RelationBuildDesc to find a pg_class
324  *		tuple matching targetRelId.  The caller must hold at least
325  *		AccessShareLock on the target relid to prevent concurrent-update
326  *		scenarios; it isn't guaranteed that all scans used to build the
327  *		relcache entry will use the same snapshot.  If, for example,
328  *		an attribute were to be added after scanning pg_class and before
329  *		scanning pg_attribute, relnatts wouldn't match.
330  *
331  *		NB: the returned tuple has been copied into palloc'd storage
332  *		and must eventually be freed with heap_freetuple.
333  */
334 static HeapTuple
335 ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
336 {
337 	HeapTuple	pg_class_tuple;
338 	Relation	pg_class_desc;
339 	SysScanDesc pg_class_scan;
340 	ScanKeyData key[1];
341 	Snapshot	snapshot = NULL;
342 
343 	/*
344 	 * If something goes wrong during backend startup, we might find ourselves
345 	 * trying to read pg_class before we've selected a database.  That ain't
346 	 * gonna work, so bail out with a useful error message.  If this happens,
347 	 * it probably means a relcache entry that needs to be nailed isn't.
348 	 */
349 	if (!OidIsValid(MyDatabaseId))
350 		elog(FATAL, "cannot read pg_class without having selected a database");
351 
352 	/*
353 	 * form a scan key
354 	 */
355 	ScanKeyInit(&key[0],
356 				Anum_pg_class_oid,
357 				BTEqualStrategyNumber, F_OIDEQ,
358 				ObjectIdGetDatum(targetRelId));
359 
360 	/*
361 	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
362 	 * built the critical relcache entries (this includes initdb and startup
363 	 * without a pg_internal.init file).  The caller can also force a heap
364 	 * scan by setting indexOK == false.
365 	 */
366 	pg_class_desc = table_open(RelationRelationId, AccessShareLock);
367 
368 	/*
369 	 * The caller might need a tuple that's newer than the one the historic
370 	 * snapshot; currently the only case requiring to do so is looking up the
371 	 * relfilenode of non mapped system relations during decoding. That
372 	 * snapshot cant't change in the midst of a relcache build, so there's no
373 	 * need to register the snapshot.
374 	 */
375 	if (force_non_historic)
376 		snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
377 
378 	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
379 									   indexOK && criticalRelcachesBuilt,
380 									   snapshot,
381 									   1, key);
382 
383 	pg_class_tuple = systable_getnext(pg_class_scan);
384 
385 	/*
386 	 * Must copy tuple before releasing buffer.
387 	 */
388 	if (HeapTupleIsValid(pg_class_tuple))
389 		pg_class_tuple = heap_copytuple(pg_class_tuple);
390 
391 	/* all done */
392 	systable_endscan(pg_class_scan);
393 	table_close(pg_class_desc, AccessShareLock);
394 
395 	return pg_class_tuple;
396 }
397 
398 /*
399  *		AllocateRelationDesc
400  *
401  *		This is used to allocate memory for a new relation descriptor
402  *		and initialize the rd_rel field from the given pg_class tuple.
403  */
404 static Relation
405 AllocateRelationDesc(Form_pg_class relp)
406 {
407 	Relation	relation;
408 	MemoryContext oldcxt;
409 	Form_pg_class relationForm;
410 
411 	/* Relcache entries must live in CacheMemoryContext */
412 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
413 
414 	/*
415 	 * allocate and zero space for new relation descriptor
416 	 */
417 	relation = (Relation) palloc0(sizeof(RelationData));
418 
419 	/* make sure relation is marked as having no open file yet */
420 	relation->rd_smgr = NULL;
421 
422 	/*
423 	 * Copy the relation tuple form
424 	 *
425 	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
426 	 * variable-length fields (relacl, reloptions) are NOT stored in the
427 	 * relcache --- there'd be little point in it, since we don't copy the
428 	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
429 	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
430 	 * it from the syscache if you need it.  The same goes for the original
431 	 * form of reloptions (however, we do store the parsed form of reloptions
432 	 * in rd_options).
433 	 */
434 	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
435 
436 	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
437 
438 	/* initialize relation tuple form */
439 	relation->rd_rel = relationForm;
440 
441 	/* and allocate attribute tuple form storage */
442 	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts);
443 	/* which we mark as a reference-counted tupdesc */
444 	relation->rd_att->tdrefcount = 1;
445 
446 	MemoryContextSwitchTo(oldcxt);
447 
448 	return relation;
449 }
450 
451 /*
452  * RelationParseRelOptions
453  *		Convert pg_class.reloptions into pre-parsed rd_options
454  *
455  * tuple is the real pg_class tuple (not rd_rel!) for relation
456  *
457  * Note: rd_rel and (if an index) rd_indam must be valid already
458  */
459 static void
460 RelationParseRelOptions(Relation relation, HeapTuple tuple)
461 {
462 	bytea	   *options;
463 	amoptions_function amoptsfn;
464 
465 	relation->rd_options = NULL;
466 
467 	/*
468 	 * Look up any AM-specific parse function; fall out if relkind should not
469 	 * have options.
470 	 */
471 	switch (relation->rd_rel->relkind)
472 	{
473 		case RELKIND_RELATION:
474 		case RELKIND_TOASTVALUE:
475 		case RELKIND_VIEW:
476 		case RELKIND_MATVIEW:
477 		case RELKIND_PARTITIONED_TABLE:
478 			amoptsfn = NULL;
479 			break;
480 		case RELKIND_INDEX:
481 		case RELKIND_PARTITIONED_INDEX:
482 			amoptsfn = relation->rd_indam->amoptions;
483 			break;
484 		default:
485 			return;
486 	}
487 
488 	/*
489 	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
490 	 * we might not have any other for pg_class yet (consider executing this
491 	 * code for pg_class itself)
492 	 */
493 	options = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
494 
495 	/*
496 	 * Copy parsed data into CacheMemoryContext.  To guard against the
497 	 * possibility of leaks in the reloptions code, we want to do the actual
498 	 * parsing in the caller's memory context and copy the results into
499 	 * CacheMemoryContext after the fact.
500 	 */
501 	if (options)
502 	{
503 		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
504 												  VARSIZE(options));
505 		memcpy(relation->rd_options, options, VARSIZE(options));
506 		pfree(options);
507 	}
508 }
509 
510 /*
511  *		RelationBuildTupleDesc
512  *
513  *		Form the relation's tuple descriptor from information in
514  *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
515  */
516 static void
517 RelationBuildTupleDesc(Relation relation)
518 {
519 	HeapTuple	pg_attribute_tuple;
520 	Relation	pg_attribute_desc;
521 	SysScanDesc pg_attribute_scan;
522 	ScanKeyData skey[2];
523 	int			need;
524 	TupleConstr *constr;
525 	AttrDefault *attrdef = NULL;
526 	AttrMissing *attrmiss = NULL;
527 	int			ndef = 0;
528 
529 	/* copy some fields from pg_class row to rd_att */
530 	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
531 	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
532 
533 	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
534 												sizeof(TupleConstr));
535 	constr->has_not_null = false;
536 	constr->has_generated_stored = false;
537 
538 	/*
539 	 * Form a scan key that selects only user attributes (attnum > 0).
540 	 * (Eliminating system attribute rows at the index level is lots faster
541 	 * than fetching them.)
542 	 */
543 	ScanKeyInit(&skey[0],
544 				Anum_pg_attribute_attrelid,
545 				BTEqualStrategyNumber, F_OIDEQ,
546 				ObjectIdGetDatum(RelationGetRelid(relation)));
547 	ScanKeyInit(&skey[1],
548 				Anum_pg_attribute_attnum,
549 				BTGreaterStrategyNumber, F_INT2GT,
550 				Int16GetDatum(0));
551 
552 	/*
553 	 * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
554 	 * built the critical relcache entries (this includes initdb and startup
555 	 * without a pg_internal.init file).
556 	 */
557 	pg_attribute_desc = table_open(AttributeRelationId, AccessShareLock);
558 	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
559 										   AttributeRelidNumIndexId,
560 										   criticalRelcachesBuilt,
561 										   NULL,
562 										   2, skey);
563 
564 	/*
565 	 * add attribute data to relation->rd_att
566 	 */
567 	need = RelationGetNumberOfAttributes(relation);
568 
569 	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
570 	{
571 		Form_pg_attribute attp;
572 		int			attnum;
573 		bool        atthasmissing;
574 
575 		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
576 
577 		attnum = attp->attnum;
578 		if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
579 			elog(ERROR, "invalid attribute number %d for %s",
580 				 attp->attnum, RelationGetRelationName(relation));
581 
582 
583 		memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
584 			   attp,
585 			   ATTRIBUTE_FIXED_PART_SIZE);
586 
587 		/*
588 		 * Fix atthasmissing flag - it's only for plain tables. Others
589 		 * should not have missing values set, but there may be some left from
590 		 * before when we placed that check, so this code defensively ignores
591 		 * such values.
592 		 */
593 		atthasmissing = attp->atthasmissing;
594 		if (relation->rd_rel->relkind != RELKIND_RELATION && atthasmissing)
595 		{
596 			Form_pg_attribute nattp;
597 
598 			atthasmissing = false;
599 			nattp = TupleDescAttr(relation->rd_att, attnum - 1);
600 			nattp->atthasmissing = false;
601 		}
602 
603 		/* Update constraint/default info */
604 		if (attp->attnotnull)
605 			constr->has_not_null = true;
606 		if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
607 			constr->has_generated_stored = true;
608 
609 		/* If the column has a default, fill it into the attrdef array */
610 		if (attp->atthasdef)
611 		{
612 			if (attrdef == NULL)
613 				attrdef = (AttrDefault *)
614 					MemoryContextAllocZero(CacheMemoryContext,
615 										   RelationGetNumberOfAttributes(relation) *
616 										   sizeof(AttrDefault));
617 			attrdef[ndef].adnum = attnum;
618 			attrdef[ndef].adbin = NULL;
619 
620 			ndef++;
621 		}
622 
623 		/* Likewise for a missing value */
624 		if (atthasmissing)
625 		{
626 			Datum		missingval;
627 			bool		missingNull;
628 
629 			/* Do we have a missing value? */
630 			missingval = heap_getattr(pg_attribute_tuple,
631 									  Anum_pg_attribute_attmissingval,
632 									  pg_attribute_desc->rd_att,
633 									  &missingNull);
634 			if (!missingNull)
635 			{
636 				/* Yes, fetch from the array */
637 				MemoryContext oldcxt;
638 				bool		is_null;
639 				int			one = 1;
640 				Datum		missval;
641 
642 				if (attrmiss == NULL)
643 					attrmiss = (AttrMissing *)
644 						MemoryContextAllocZero(CacheMemoryContext,
645 											   relation->rd_rel->relnatts *
646 											   sizeof(AttrMissing));
647 
648 				missval = array_get_element(missingval,
649 											1,
650 											&one,
651 											-1,
652 											attp->attlen,
653 											attp->attbyval,
654 											attp->attalign,
655 											&is_null);
656 				Assert(!is_null);
657 				if (attp->attbyval)
658 				{
659 					/* for copy by val just copy the datum direct */
660 					attrmiss[attnum - 1].am_value = missval;
661 				}
662 				else
663 				{
664 					/* otherwise copy in the correct context */
665 					oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
666 					attrmiss[attnum - 1].am_value = datumCopy(missval,
667 															  attp->attbyval,
668 															  attp->attlen);
669 					MemoryContextSwitchTo(oldcxt);
670 				}
671 				attrmiss[attnum - 1].am_present = true;
672 			}
673 		}
674 		need--;
675 		if (need == 0)
676 			break;
677 	}
678 
679 	/*
680 	 * end the scan and close the attribute relation
681 	 */
682 	systable_endscan(pg_attribute_scan);
683 	table_close(pg_attribute_desc, AccessShareLock);
684 
685 	if (need != 0)
686 		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
687 			 need, RelationGetRelid(relation));
688 
689 	/*
690 	 * The attcacheoff values we read from pg_attribute should all be -1
691 	 * ("unknown").  Verify this if assert checking is on.  They will be
692 	 * computed when and if needed during tuple access.
693 	 */
694 #ifdef USE_ASSERT_CHECKING
695 	{
696 		int			i;
697 
698 		for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
699 			Assert(TupleDescAttr(relation->rd_att, i)->attcacheoff == -1);
700 	}
701 #endif
702 
703 	/*
704 	 * However, we can easily set the attcacheoff value for the first
705 	 * attribute: it must be zero.  This eliminates the need for special cases
706 	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
707 	 */
708 	if (RelationGetNumberOfAttributes(relation) > 0)
709 		TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
710 
711 	/*
712 	 * Set up constraint/default info
713 	 */
714 	if (constr->has_not_null || ndef > 0 ||
715 		attrmiss || relation->rd_rel->relchecks)
716 	{
717 		relation->rd_att->constr = constr;
718 
719 		if (ndef > 0)			/* DEFAULTs */
720 		{
721 			if (ndef < RelationGetNumberOfAttributes(relation))
722 				constr->defval = (AttrDefault *)
723 					repalloc(attrdef, ndef * sizeof(AttrDefault));
724 			else
725 				constr->defval = attrdef;
726 			constr->num_defval = ndef;
727 			AttrDefaultFetch(relation);
728 		}
729 		else
730 			constr->num_defval = 0;
731 
732 		constr->missing = attrmiss;
733 
734 		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
735 		{
736 			constr->num_check = relation->rd_rel->relchecks;
737 			constr->check = (ConstrCheck *)
738 				MemoryContextAllocZero(CacheMemoryContext,
739 									   constr->num_check * sizeof(ConstrCheck));
740 			CheckConstraintFetch(relation);
741 		}
742 		else
743 			constr->num_check = 0;
744 	}
745 	else
746 	{
747 		pfree(constr);
748 		relation->rd_att->constr = NULL;
749 	}
750 }
751 
752 /*
753  *		RelationBuildRuleLock
754  *
755  *		Form the relation's rewrite rules from information in
756  *		the pg_rewrite system catalog.
757  *
758  * Note: The rule parsetrees are potentially very complex node structures.
759  * To allow these trees to be freed when the relcache entry is flushed,
760  * we make a private memory context to hold the RuleLock information for
761  * each relcache entry that has associated rules.  The context is used
762  * just for rule info, not for any other subsidiary data of the relcache
763  * entry, because that keeps the update logic in RelationClearRelation()
764  * manageable.  The other subsidiary data structures are simple enough
765  * to be easy to free explicitly, anyway.
766  */
767 static void
768 RelationBuildRuleLock(Relation relation)
769 {
770 	MemoryContext rulescxt;
771 	MemoryContext oldcxt;
772 	HeapTuple	rewrite_tuple;
773 	Relation	rewrite_desc;
774 	TupleDesc	rewrite_tupdesc;
775 	SysScanDesc rewrite_scan;
776 	ScanKeyData key;
777 	RuleLock   *rulelock;
778 	int			numlocks;
779 	RewriteRule **rules;
780 	int			maxlocks;
781 
782 	/*
783 	 * Make the private context.  Assume it'll not contain much data.
784 	 */
785 	rulescxt = AllocSetContextCreate(CacheMemoryContext,
786 									 "relation rules",
787 									 ALLOCSET_SMALL_SIZES);
788 	relation->rd_rulescxt = rulescxt;
789 	MemoryContextCopyAndSetIdentifier(rulescxt,
790 									  RelationGetRelationName(relation));
791 
792 	/*
793 	 * allocate an array to hold the rewrite rules (the array is extended if
794 	 * necessary)
795 	 */
796 	maxlocks = 4;
797 	rules = (RewriteRule **)
798 		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
799 	numlocks = 0;
800 
801 	/*
802 	 * form a scan key
803 	 */
804 	ScanKeyInit(&key,
805 				Anum_pg_rewrite_ev_class,
806 				BTEqualStrategyNumber, F_OIDEQ,
807 				ObjectIdGetDatum(RelationGetRelid(relation)));
808 
809 	/*
810 	 * open pg_rewrite and begin a scan
811 	 *
812 	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
813 	 * be reading the rules in name order, except possibly during
814 	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
815 	 * ensures that rules will be fired in name order.
816 	 */
817 	rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
818 	rewrite_tupdesc = RelationGetDescr(rewrite_desc);
819 	rewrite_scan = systable_beginscan(rewrite_desc,
820 									  RewriteRelRulenameIndexId,
821 									  true, NULL,
822 									  1, &key);
823 
824 	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
825 	{
826 		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
827 		bool		isnull;
828 		Datum		rule_datum;
829 		char	   *rule_str;
830 		RewriteRule *rule;
831 
832 		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
833 												  sizeof(RewriteRule));
834 
835 		rule->ruleId = rewrite_form->oid;
836 
837 		rule->event = rewrite_form->ev_type - '0';
838 		rule->enabled = rewrite_form->ev_enabled;
839 		rule->isInstead = rewrite_form->is_instead;
840 
841 		/*
842 		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
843 		 * rule strings are often large enough to be toasted.  To avoid
844 		 * leaking memory in the caller's context, do the detoasting here so
845 		 * we can free the detoasted version.
846 		 */
847 		rule_datum = heap_getattr(rewrite_tuple,
848 								  Anum_pg_rewrite_ev_action,
849 								  rewrite_tupdesc,
850 								  &isnull);
851 		Assert(!isnull);
852 		rule_str = TextDatumGetCString(rule_datum);
853 		oldcxt = MemoryContextSwitchTo(rulescxt);
854 		rule->actions = (List *) stringToNode(rule_str);
855 		MemoryContextSwitchTo(oldcxt);
856 		pfree(rule_str);
857 
858 		rule_datum = heap_getattr(rewrite_tuple,
859 								  Anum_pg_rewrite_ev_qual,
860 								  rewrite_tupdesc,
861 								  &isnull);
862 		Assert(!isnull);
863 		rule_str = TextDatumGetCString(rule_datum);
864 		oldcxt = MemoryContextSwitchTo(rulescxt);
865 		rule->qual = (Node *) stringToNode(rule_str);
866 		MemoryContextSwitchTo(oldcxt);
867 		pfree(rule_str);
868 
869 		/*
870 		 * We want the rule's table references to be checked as though by the
871 		 * table owner, not the user referencing the rule.  Therefore, scan
872 		 * through the rule's actions and set the checkAsUser field on all
873 		 * rtable entries.  We have to look at the qual as well, in case it
874 		 * contains sublinks.
875 		 *
876 		 * The reason for doing this when the rule is loaded, rather than when
877 		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
878 		 * grovel through stored rules to update checkAsUser fields. Scanning
879 		 * the rule tree during load is relatively cheap (compared to
880 		 * constructing it in the first place), so we do it here.
881 		 */
882 		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
883 		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
884 
885 		if (numlocks >= maxlocks)
886 		{
887 			maxlocks *= 2;
888 			rules = (RewriteRule **)
889 				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
890 		}
891 		rules[numlocks++] = rule;
892 	}
893 
894 	/*
895 	 * end the scan and close the attribute relation
896 	 */
897 	systable_endscan(rewrite_scan);
898 	table_close(rewrite_desc, AccessShareLock);
899 
900 	/*
901 	 * there might not be any rules (if relhasrules is out-of-date)
902 	 */
903 	if (numlocks == 0)
904 	{
905 		relation->rd_rules = NULL;
906 		relation->rd_rulescxt = NULL;
907 		MemoryContextDelete(rulescxt);
908 		return;
909 	}
910 
911 	/*
912 	 * form a RuleLock and insert into relation
913 	 */
914 	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
915 	rulelock->numLocks = numlocks;
916 	rulelock->rules = rules;
917 
918 	relation->rd_rules = rulelock;
919 }
920 
921 /*
922  *		equalRuleLocks
923  *
924  *		Determine whether two RuleLocks are equivalent
925  *
926  *		Probably this should be in the rules code someplace...
927  */
928 static bool
929 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
930 {
931 	int			i;
932 
933 	/*
934 	 * As of 7.3 we assume the rule ordering is repeatable, because
935 	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
936 	 * compare corresponding slots.
937 	 */
938 	if (rlock1 != NULL)
939 	{
940 		if (rlock2 == NULL)
941 			return false;
942 		if (rlock1->numLocks != rlock2->numLocks)
943 			return false;
944 		for (i = 0; i < rlock1->numLocks; i++)
945 		{
946 			RewriteRule *rule1 = rlock1->rules[i];
947 			RewriteRule *rule2 = rlock2->rules[i];
948 
949 			if (rule1->ruleId != rule2->ruleId)
950 				return false;
951 			if (rule1->event != rule2->event)
952 				return false;
953 			if (rule1->enabled != rule2->enabled)
954 				return false;
955 			if (rule1->isInstead != rule2->isInstead)
956 				return false;
957 			if (!equal(rule1->qual, rule2->qual))
958 				return false;
959 			if (!equal(rule1->actions, rule2->actions))
960 				return false;
961 		}
962 	}
963 	else if (rlock2 != NULL)
964 		return false;
965 	return true;
966 }
967 
968 /*
969  *		equalPolicy
970  *
971  *		Determine whether two policies are equivalent
972  */
973 static bool
974 equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
975 {
976 	int			i;
977 	Oid		   *r1,
978 			   *r2;
979 
980 	if (policy1 != NULL)
981 	{
982 		if (policy2 == NULL)
983 			return false;
984 
985 		if (policy1->polcmd != policy2->polcmd)
986 			return false;
987 		if (policy1->hassublinks != policy2->hassublinks)
988 			return false;
989 		if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
990 			return false;
991 		if (ARR_DIMS(policy1->roles)[0] != ARR_DIMS(policy2->roles)[0])
992 			return false;
993 
994 		r1 = (Oid *) ARR_DATA_PTR(policy1->roles);
995 		r2 = (Oid *) ARR_DATA_PTR(policy2->roles);
996 
997 		for (i = 0; i < ARR_DIMS(policy1->roles)[0]; i++)
998 		{
999 			if (r1[i] != r2[i])
1000 				return false;
1001 		}
1002 
1003 		if (!equal(policy1->qual, policy2->qual))
1004 			return false;
1005 		if (!equal(policy1->with_check_qual, policy2->with_check_qual))
1006 			return false;
1007 	}
1008 	else if (policy2 != NULL)
1009 		return false;
1010 
1011 	return true;
1012 }
1013 
1014 /*
1015  *		equalRSDesc
1016  *
1017  *		Determine whether two RowSecurityDesc's are equivalent
1018  */
1019 static bool
1020 equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
1021 {
1022 	ListCell   *lc,
1023 			   *rc;
1024 
1025 	if (rsdesc1 == NULL && rsdesc2 == NULL)
1026 		return true;
1027 
1028 	if ((rsdesc1 != NULL && rsdesc2 == NULL) ||
1029 		(rsdesc1 == NULL && rsdesc2 != NULL))
1030 		return false;
1031 
1032 	if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
1033 		return false;
1034 
1035 	/* RelationBuildRowSecurity should build policies in order */
1036 	forboth(lc, rsdesc1->policies, rc, rsdesc2->policies)
1037 	{
1038 		RowSecurityPolicy *l = (RowSecurityPolicy *) lfirst(lc);
1039 		RowSecurityPolicy *r = (RowSecurityPolicy *) lfirst(rc);
1040 
1041 		if (!equalPolicy(l, r))
1042 			return false;
1043 	}
1044 
1045 	return true;
1046 }
1047 
1048 /*
1049  *		RelationBuildDesc
1050  *
1051  *		Build a relation descriptor.  The caller must hold at least
1052  *		AccessShareLock on the target relid.
1053  *
1054  *		The new descriptor is inserted into the hash table if insertIt is true.
1055  *
1056  *		Returns NULL if no pg_class row could be found for the given relid
1057  *		(suggesting we are trying to access a just-deleted relation).
1058  *		Any other error is reported via elog.
1059  */
1060 static Relation
1061 RelationBuildDesc(Oid targetRelId, bool insertIt)
1062 {
1063 	int			in_progress_offset;
1064 	Relation	relation;
1065 	Oid			relid;
1066 	HeapTuple	pg_class_tuple;
1067 	Form_pg_class relp;
1068 
1069 	/*
1070 	 * This function and its subroutines can allocate a good deal of transient
1071 	 * data in CurrentMemoryContext.  Traditionally we've just leaked that
1072 	 * data, reasoning that the caller's context is at worst of transaction
1073 	 * scope, and relcache loads shouldn't happen so often that it's essential
1074 	 * to recover transient data before end of statement/transaction.  However
1075 	 * that's definitely not true in clobber-cache test builds, and perhaps
1076 	 * it's not true in other cases.  If RECOVER_RELATION_BUILD_MEMORY is not
1077 	 * zero, arrange to allocate the junk in a temporary context that we'll
1078 	 * free before returning.  Make it a child of caller's context so that it
1079 	 * will get cleaned up appropriately if we error out partway through.
1080 	 */
1081 #if RECOVER_RELATION_BUILD_MEMORY
1082 	MemoryContext tmpcxt;
1083 	MemoryContext oldcxt;
1084 
1085 	tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
1086 								   "RelationBuildDesc workspace",
1087 								   ALLOCSET_DEFAULT_SIZES);
1088 	oldcxt = MemoryContextSwitchTo(tmpcxt);
1089 #endif
1090 
1091 	/* Register to catch invalidation messages */
1092 	if (in_progress_list_len >= in_progress_list_maxlen)
1093 	{
1094 		int			allocsize;
1095 
1096 		allocsize = in_progress_list_maxlen * 2;
1097 		in_progress_list = repalloc(in_progress_list,
1098 									allocsize * sizeof(*in_progress_list));
1099 		in_progress_list_maxlen = allocsize;
1100 	}
1101 	in_progress_offset = in_progress_list_len++;
1102 	in_progress_list[in_progress_offset].reloid = targetRelId;
1103 retry:
1104 	in_progress_list[in_progress_offset].invalidated = false;
1105 
1106 	/*
1107 	 * find the tuple in pg_class corresponding to the given relation id
1108 	 */
1109 	pg_class_tuple = ScanPgRelation(targetRelId, true, false);
1110 
1111 	/*
1112 	 * if no such tuple exists, return NULL
1113 	 */
1114 	if (!HeapTupleIsValid(pg_class_tuple))
1115 	{
1116 #if RECOVER_RELATION_BUILD_MEMORY
1117 		/* Return to caller's context, and blow away the temporary context */
1118 		MemoryContextSwitchTo(oldcxt);
1119 		MemoryContextDelete(tmpcxt);
1120 #endif
1121 		Assert(in_progress_offset + 1 == in_progress_list_len);
1122 		in_progress_list_len--;
1123 		return NULL;
1124 	}
1125 
1126 	/*
1127 	 * get information from the pg_class_tuple
1128 	 */
1129 	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1130 	relid = relp->oid;
1131 	Assert(relid == targetRelId);
1132 
1133 	/*
1134 	 * allocate storage for the relation descriptor, and copy pg_class_tuple
1135 	 * to relation->rd_rel.
1136 	 */
1137 	relation = AllocateRelationDesc(relp);
1138 
1139 	/*
1140 	 * initialize the relation's relation id (relation->rd_id)
1141 	 */
1142 	RelationGetRelid(relation) = relid;
1143 
1144 	/*
1145 	 * normal relations are not nailed into the cache; nor can a pre-existing
1146 	 * relation be new.  It could be temp though.  (Actually, it could be new
1147 	 * too, but it's okay to forget that fact if forced to flush the entry.)
1148 	 */
1149 	relation->rd_refcnt = 0;
1150 	relation->rd_isnailed = false;
1151 	relation->rd_createSubid = InvalidSubTransactionId;
1152 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1153 	switch (relation->rd_rel->relpersistence)
1154 	{
1155 		case RELPERSISTENCE_UNLOGGED:
1156 		case RELPERSISTENCE_PERMANENT:
1157 			relation->rd_backend = InvalidBackendId;
1158 			relation->rd_islocaltemp = false;
1159 			break;
1160 		case RELPERSISTENCE_TEMP:
1161 			if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
1162 			{
1163 				relation->rd_backend = BackendIdForTempRelations();
1164 				relation->rd_islocaltemp = true;
1165 			}
1166 			else
1167 			{
1168 				/*
1169 				 * If it's a temp table, but not one of ours, we have to use
1170 				 * the slow, grotty method to figure out the owning backend.
1171 				 *
1172 				 * Note: it's possible that rd_backend gets set to MyBackendId
1173 				 * here, in case we are looking at a pg_class entry left over
1174 				 * from a crashed backend that coincidentally had the same
1175 				 * BackendId we're using.  We should *not* consider such a
1176 				 * table to be "ours"; this is why we need the separate
1177 				 * rd_islocaltemp flag.  The pg_class entry will get flushed
1178 				 * if/when we clean out the corresponding temp table namespace
1179 				 * in preparation for using it.
1180 				 */
1181 				relation->rd_backend =
1182 					GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
1183 				Assert(relation->rd_backend != InvalidBackendId);
1184 				relation->rd_islocaltemp = false;
1185 			}
1186 			break;
1187 		default:
1188 			elog(ERROR, "invalid relpersistence: %c",
1189 				 relation->rd_rel->relpersistence);
1190 			break;
1191 	}
1192 
1193 	/*
1194 	 * initialize the tuple descriptor (relation->rd_att).
1195 	 */
1196 	RelationBuildTupleDesc(relation);
1197 
1198 	/*
1199 	 * Fetch rules and triggers that affect this relation
1200 	 */
1201 	if (relation->rd_rel->relhasrules)
1202 		RelationBuildRuleLock(relation);
1203 	else
1204 	{
1205 		relation->rd_rules = NULL;
1206 		relation->rd_rulescxt = NULL;
1207 	}
1208 
1209 	if (relation->rd_rel->relhastriggers)
1210 		RelationBuildTriggers(relation);
1211 	else
1212 		relation->trigdesc = NULL;
1213 
1214 	if (relation->rd_rel->relrowsecurity)
1215 		RelationBuildRowSecurity(relation);
1216 	else
1217 		relation->rd_rsdesc = NULL;
1218 
1219 	/* foreign key data is not loaded till asked for */
1220 	relation->rd_fkeylist = NIL;
1221 	relation->rd_fkeyvalid = false;
1222 
1223 	/* if a partitioned table, initialize key and partition descriptor info */
1224 	if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1225 	{
1226 		RelationBuildPartitionKey(relation);
1227 		RelationBuildPartitionDesc(relation);
1228 	}
1229 	else
1230 	{
1231 		relation->rd_partkey = NULL;
1232 		relation->rd_partkeycxt = NULL;
1233 		relation->rd_partdesc = NULL;
1234 		relation->rd_pdcxt = NULL;
1235 	}
1236 	/* ... but partcheck is not loaded till asked for */
1237 	relation->rd_partcheck = NIL;
1238 	relation->rd_partcheckvalid = false;
1239 	relation->rd_partcheckcxt = NULL;
1240 
1241 	/*
1242 	 * initialize access method information
1243 	 */
1244 	switch (relation->rd_rel->relkind)
1245 	{
1246 		case RELKIND_INDEX:
1247 		case RELKIND_PARTITIONED_INDEX:
1248 			Assert(relation->rd_rel->relam != InvalidOid);
1249 			RelationInitIndexAccessInfo(relation);
1250 			break;
1251 		case RELKIND_RELATION:
1252 		case RELKIND_TOASTVALUE:
1253 		case RELKIND_MATVIEW:
1254 			Assert(relation->rd_rel->relam != InvalidOid);
1255 			RelationInitTableAccessMethod(relation);
1256 			break;
1257 		case RELKIND_SEQUENCE:
1258 			Assert(relation->rd_rel->relam == InvalidOid);
1259 			RelationInitTableAccessMethod(relation);
1260 			break;
1261 		case RELKIND_VIEW:
1262 		case RELKIND_COMPOSITE_TYPE:
1263 		case RELKIND_FOREIGN_TABLE:
1264 		case RELKIND_PARTITIONED_TABLE:
1265 			Assert(relation->rd_rel->relam == InvalidOid);
1266 			break;
1267 	}
1268 
1269 	/* extract reloptions if any */
1270 	RelationParseRelOptions(relation, pg_class_tuple);
1271 
1272 	/*
1273 	 * initialize the relation lock manager information
1274 	 */
1275 	RelationInitLockInfo(relation); /* see lmgr.c */
1276 
1277 	/*
1278 	 * initialize physical addressing information for the relation
1279 	 */
1280 	RelationInitPhysicalAddr(relation);
1281 
1282 	/* make sure relation is marked as having no open file yet */
1283 	relation->rd_smgr = NULL;
1284 
1285 	/*
1286 	 * now we can free the memory allocated for pg_class_tuple
1287 	 */
1288 	heap_freetuple(pg_class_tuple);
1289 
1290 	/*
1291 	 * If an invalidation arrived mid-build, start over.  Between here and the
1292 	 * end of this function, don't add code that does or reasonably could read
1293 	 * system catalogs.  That range must be free from invalidation processing
1294 	 * for the !insertIt case.  For the insertIt case, RelationCacheInsert()
1295 	 * will enroll this relation in ordinary relcache invalidation processing,
1296 	 */
1297 	if (in_progress_list[in_progress_offset].invalidated)
1298 	{
1299 		RelationDestroyRelation(relation, false);
1300 		goto retry;
1301 	}
1302 	Assert(in_progress_offset + 1 == in_progress_list_len);
1303 	in_progress_list_len--;
1304 
1305 	/*
1306 	 * Insert newly created relation into relcache hash table, if requested.
1307 	 *
1308 	 * There is one scenario in which we might find a hashtable entry already
1309 	 * present, even though our caller failed to find it: if the relation is a
1310 	 * system catalog or index that's used during relcache load, we might have
1311 	 * recursively created the same relcache entry during the preceding steps.
1312 	 * So allow RelationCacheInsert to delete any already-present relcache
1313 	 * entry for the same OID.  The already-present entry should have refcount
1314 	 * zero (else somebody forgot to close it); in the event that it doesn't,
1315 	 * we'll elog a WARNING and leak the already-present entry.
1316 	 */
1317 	if (insertIt)
1318 		RelationCacheInsert(relation, true);
1319 
1320 	/* It's fully valid */
1321 	relation->rd_isvalid = true;
1322 
1323 #if RECOVER_RELATION_BUILD_MEMORY
1324 	/* Return to caller's context, and blow away the temporary context */
1325 	MemoryContextSwitchTo(oldcxt);
1326 	MemoryContextDelete(tmpcxt);
1327 #endif
1328 
1329 	return relation;
1330 }
1331 
1332 /*
1333  * Initialize the physical addressing info (RelFileNode) for a relcache entry
1334  *
1335  * Note: at the physical level, relations in the pg_global tablespace must
1336  * be treated as shared, even if relisshared isn't set.  Hence we do not
1337  * look at relisshared here.
1338  */
1339 static void
1340 RelationInitPhysicalAddr(Relation relation)
1341 {
1342 	/* these relations kinds never have storage */
1343 	if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
1344 		return;
1345 
1346 	if (relation->rd_rel->reltablespace)
1347 		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
1348 	else
1349 		relation->rd_node.spcNode = MyDatabaseTableSpace;
1350 	if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
1351 		relation->rd_node.dbNode = InvalidOid;
1352 	else
1353 		relation->rd_node.dbNode = MyDatabaseId;
1354 
1355 	if (relation->rd_rel->relfilenode)
1356 	{
1357 		/*
1358 		 * Even if we are using a decoding snapshot that doesn't represent the
1359 		 * current state of the catalog we need to make sure the filenode
1360 		 * points to the current file since the older file will be gone (or
1361 		 * truncated). The new file will still contain older rows so lookups
1362 		 * in them will work correctly. This wouldn't work correctly if
1363 		 * rewrites were allowed to change the schema in an incompatible way,
1364 		 * but those are prevented both on catalog tables and on user tables
1365 		 * declared as additional catalog tables.
1366 		 */
1367 		if (HistoricSnapshotActive()
1368 			&& RelationIsAccessibleInLogicalDecoding(relation)
1369 			&& IsTransactionState())
1370 		{
1371 			HeapTuple	phys_tuple;
1372 			Form_pg_class physrel;
1373 
1374 			phys_tuple = ScanPgRelation(RelationGetRelid(relation),
1375 										RelationGetRelid(relation) != ClassOidIndexId,
1376 										true);
1377 			if (!HeapTupleIsValid(phys_tuple))
1378 				elog(ERROR, "could not find pg_class entry for %u",
1379 					 RelationGetRelid(relation));
1380 			physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
1381 
1382 			relation->rd_rel->reltablespace = physrel->reltablespace;
1383 			relation->rd_rel->relfilenode = physrel->relfilenode;
1384 			heap_freetuple(phys_tuple);
1385 		}
1386 
1387 		relation->rd_node.relNode = relation->rd_rel->relfilenode;
1388 	}
1389 	else
1390 	{
1391 		/* Consult the relation mapper */
1392 		relation->rd_node.relNode =
1393 			RelationMapOidToFilenode(relation->rd_id,
1394 									 relation->rd_rel->relisshared);
1395 		if (!OidIsValid(relation->rd_node.relNode))
1396 			elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1397 				 RelationGetRelationName(relation), relation->rd_id);
1398 	}
1399 }
1400 
1401 /*
1402  * Fill in the IndexAmRoutine for an index relation.
1403  *
1404  * relation's rd_amhandler and rd_indexcxt must be valid already.
1405  */
1406 static void
1407 InitIndexAmRoutine(Relation relation)
1408 {
1409 	IndexAmRoutine *cached,
1410 			   *tmp;
1411 
1412 	/*
1413 	 * Call the amhandler in current, short-lived memory context, just in case
1414 	 * it leaks anything (it probably won't, but let's be paranoid).
1415 	 */
1416 	tmp = GetIndexAmRoutine(relation->rd_amhandler);
1417 
1418 	/* OK, now transfer the data into relation's rd_indexcxt. */
1419 	cached = (IndexAmRoutine *) MemoryContextAlloc(relation->rd_indexcxt,
1420 												   sizeof(IndexAmRoutine));
1421 	memcpy(cached, tmp, sizeof(IndexAmRoutine));
1422 	relation->rd_indam = cached;
1423 
1424 	pfree(tmp);
1425 }
1426 
1427 /*
1428  * Initialize index-access-method support data for an index relation
1429  */
1430 void
1431 RelationInitIndexAccessInfo(Relation relation)
1432 {
1433 	HeapTuple	tuple;
1434 	Form_pg_am	aform;
1435 	Datum		indcollDatum;
1436 	Datum		indclassDatum;
1437 	Datum		indoptionDatum;
1438 	bool		isnull;
1439 	oidvector  *indcoll;
1440 	oidvector  *indclass;
1441 	int2vector *indoption;
1442 	MemoryContext indexcxt;
1443 	MemoryContext oldcontext;
1444 	int			indnatts;
1445 	int			indnkeyatts;
1446 	uint16		amsupport;
1447 
1448 	/*
1449 	 * Make a copy of the pg_index entry for the index.  Since pg_index
1450 	 * contains variable-length and possibly-null fields, we have to do this
1451 	 * honestly rather than just treating it as a Form_pg_index struct.
1452 	 */
1453 	tuple = SearchSysCache1(INDEXRELID,
1454 							ObjectIdGetDatum(RelationGetRelid(relation)));
1455 	if (!HeapTupleIsValid(tuple))
1456 		elog(ERROR, "cache lookup failed for index %u",
1457 			 RelationGetRelid(relation));
1458 	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
1459 	relation->rd_indextuple = heap_copytuple(tuple);
1460 	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
1461 	MemoryContextSwitchTo(oldcontext);
1462 	ReleaseSysCache(tuple);
1463 
1464 	/*
1465 	 * Look up the index's access method, save the OID of its handler function
1466 	 */
1467 	tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
1468 	if (!HeapTupleIsValid(tuple))
1469 		elog(ERROR, "cache lookup failed for access method %u",
1470 			 relation->rd_rel->relam);
1471 	aform = (Form_pg_am) GETSTRUCT(tuple);
1472 	relation->rd_amhandler = aform->amhandler;
1473 	ReleaseSysCache(tuple);
1474 
1475 	indnatts = RelationGetNumberOfAttributes(relation);
1476 	if (indnatts != IndexRelationGetNumberOfAttributes(relation))
1477 		elog(ERROR, "relnatts disagrees with indnatts for index %u",
1478 			 RelationGetRelid(relation));
1479 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
1480 
1481 	/*
1482 	 * Make the private context to hold index access info.  The reason we need
1483 	 * a context, and not just a couple of pallocs, is so that we won't leak
1484 	 * any subsidiary info attached to fmgr lookup records.
1485 	 */
1486 	indexcxt = AllocSetContextCreate(CacheMemoryContext,
1487 									 "index info",
1488 									 ALLOCSET_SMALL_SIZES);
1489 	relation->rd_indexcxt = indexcxt;
1490 	MemoryContextCopyAndSetIdentifier(indexcxt,
1491 									  RelationGetRelationName(relation));
1492 
1493 	/*
1494 	 * Now we can fetch the index AM's API struct
1495 	 */
1496 	InitIndexAmRoutine(relation);
1497 
1498 	/*
1499 	 * Allocate arrays to hold data. Opclasses are not used for included
1500 	 * columns, so allocate them for indnkeyatts only.
1501 	 */
1502 	relation->rd_opfamily = (Oid *)
1503 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1504 	relation->rd_opcintype = (Oid *)
1505 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1506 
1507 	amsupport = relation->rd_indam->amsupport;
1508 	if (amsupport > 0)
1509 	{
1510 		int			nsupport = indnatts * amsupport;
1511 
1512 		relation->rd_support = (RegProcedure *)
1513 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1514 		relation->rd_supportinfo = (FmgrInfo *)
1515 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1516 	}
1517 	else
1518 	{
1519 		relation->rd_support = NULL;
1520 		relation->rd_supportinfo = NULL;
1521 	}
1522 
1523 	relation->rd_indcollation = (Oid *)
1524 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1525 
1526 	relation->rd_indoption = (int16 *)
1527 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));
1528 
1529 	/*
1530 	 * indcollation cannot be referenced directly through the C struct,
1531 	 * because it comes after the variable-width indkey field.  Must extract
1532 	 * the datum the hard way...
1533 	 */
1534 	indcollDatum = fastgetattr(relation->rd_indextuple,
1535 							   Anum_pg_index_indcollation,
1536 							   GetPgIndexDescriptor(),
1537 							   &isnull);
1538 	Assert(!isnull);
1539 	indcoll = (oidvector *) DatumGetPointer(indcollDatum);
1540 	memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));
1541 
1542 	/*
1543 	 * indclass cannot be referenced directly through the C struct, because it
1544 	 * comes after the variable-width indkey field.  Must extract the datum
1545 	 * the hard way...
1546 	 */
1547 	indclassDatum = fastgetattr(relation->rd_indextuple,
1548 								Anum_pg_index_indclass,
1549 								GetPgIndexDescriptor(),
1550 								&isnull);
1551 	Assert(!isnull);
1552 	indclass = (oidvector *) DatumGetPointer(indclassDatum);
1553 
1554 	/*
1555 	 * Fill the support procedure OID array, as well as the info about
1556 	 * opfamilies and opclass input types.  (aminfo and supportinfo are left
1557 	 * as zeroes, and are filled on-the-fly when used)
1558 	 */
1559 	IndexSupportInitialize(indclass, relation->rd_support,
1560 						   relation->rd_opfamily, relation->rd_opcintype,
1561 						   amsupport, indnkeyatts);
1562 
1563 	/*
1564 	 * Similarly extract indoption and copy it to the cache entry
1565 	 */
1566 	indoptionDatum = fastgetattr(relation->rd_indextuple,
1567 								 Anum_pg_index_indoption,
1568 								 GetPgIndexDescriptor(),
1569 								 &isnull);
1570 	Assert(!isnull);
1571 	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1572 	memcpy(relation->rd_indoption, indoption->values, indnkeyatts * sizeof(int16));
1573 
1574 	/*
1575 	 * expressions, predicate, exclusion caches will be filled later
1576 	 */
1577 	relation->rd_indexprs = NIL;
1578 	relation->rd_indpred = NIL;
1579 	relation->rd_exclops = NULL;
1580 	relation->rd_exclprocs = NULL;
1581 	relation->rd_exclstrats = NULL;
1582 	relation->rd_amcache = NULL;
1583 }
1584 
1585 /*
1586  * IndexSupportInitialize
1587  *		Initializes an index's cached opclass information,
1588  *		given the index's pg_index.indclass entry.
1589  *
1590  * Data is returned into *indexSupport, *opFamily, and *opcInType,
1591  * which are arrays allocated by the caller.
1592  *
1593  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
1594  * indicate the size of the arrays it has allocated --- but in practice these
1595  * numbers must always match those obtainable from the system catalog entries
1596  * for the index and access method.
1597  */
1598 static void
1599 IndexSupportInitialize(oidvector *indclass,
1600 					   RegProcedure *indexSupport,
1601 					   Oid *opFamily,
1602 					   Oid *opcInType,
1603 					   StrategyNumber maxSupportNumber,
1604 					   AttrNumber maxAttributeNumber)
1605 {
1606 	int			attIndex;
1607 
1608 	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1609 	{
1610 		OpClassCacheEnt *opcentry;
1611 
1612 		if (!OidIsValid(indclass->values[attIndex]))
1613 			elog(ERROR, "bogus pg_index tuple");
1614 
1615 		/* look up the info for this opclass, using a cache */
1616 		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1617 									 maxSupportNumber);
1618 
1619 		/* copy cached data into relcache entry */
1620 		opFamily[attIndex] = opcentry->opcfamily;
1621 		opcInType[attIndex] = opcentry->opcintype;
1622 		if (maxSupportNumber > 0)
1623 			memcpy(&indexSupport[attIndex * maxSupportNumber],
1624 				   opcentry->supportProcs,
1625 				   maxSupportNumber * sizeof(RegProcedure));
1626 	}
1627 }
1628 
1629 /*
1630  * LookupOpclassInfo
1631  *
1632  * This routine maintains a per-opclass cache of the information needed
1633  * by IndexSupportInitialize().  This is more efficient than relying on
1634  * the catalog cache, because we can load all the info about a particular
1635  * opclass in a single indexscan of pg_amproc.
1636  *
1637  * The information from pg_am about expected range of support function
1638  * numbers is passed in, rather than being looked up, mainly because the
1639  * caller will have it already.
1640  *
1641  * Note there is no provision for flushing the cache.  This is OK at the
1642  * moment because there is no way to ALTER any interesting properties of an
1643  * existing opclass --- all you can do is drop it, which will result in
1644  * a useless but harmless dead entry in the cache.  To support altering
1645  * opclass membership (not the same as opfamily membership!), we'd need to
1646  * be able to flush this cache as well as the contents of relcache entries
1647  * for indexes.
1648  */
1649 static OpClassCacheEnt *
1650 LookupOpclassInfo(Oid operatorClassOid,
1651 				  StrategyNumber numSupport)
1652 {
1653 	OpClassCacheEnt *opcentry;
1654 	bool		found;
1655 	Relation	rel;
1656 	SysScanDesc scan;
1657 	ScanKeyData skey[3];
1658 	HeapTuple	htup;
1659 	bool		indexOK;
1660 
1661 	if (OpClassCache == NULL)
1662 	{
1663 		/* First time through: initialize the opclass cache */
1664 		HASHCTL		ctl;
1665 
1666 		/* Also make sure CacheMemoryContext exists */
1667 		if (!CacheMemoryContext)
1668 			CreateCacheMemoryContext();
1669 
1670 		MemSet(&ctl, 0, sizeof(ctl));
1671 		ctl.keysize = sizeof(Oid);
1672 		ctl.entrysize = sizeof(OpClassCacheEnt);
1673 		OpClassCache = hash_create("Operator class cache", 64,
1674 								   &ctl, HASH_ELEM | HASH_BLOBS);
1675 	}
1676 
1677 	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1678 											   (void *) &operatorClassOid,
1679 											   HASH_ENTER, &found);
1680 
1681 	if (!found)
1682 	{
1683 		/* Initialize new entry */
1684 		opcentry->valid = false;	/* until known OK */
1685 		opcentry->numSupport = numSupport;
1686 		opcentry->supportProcs = NULL;	/* filled below */
1687 	}
1688 	else
1689 	{
1690 		Assert(numSupport == opcentry->numSupport);
1691 	}
1692 
1693 	/*
1694 	 * When aggressively testing cache-flush hazards, we disable the operator
1695 	 * class cache and force reloading of the info on each call.  This models
1696 	 * no real-world behavior, since the cache entries are never invalidated
1697 	 * otherwise.  However it can be helpful for detecting bugs in the cache
1698 	 * loading logic itself, such as reliance on a non-nailed index.  Given
1699 	 * the limited use-case and the fact that this adds a great deal of
1700 	 * expense, we enable it only in CLOBBER_CACHE_RECURSIVELY mode.
1701 	 */
1702 #if defined(CLOBBER_CACHE_RECURSIVELY)
1703 	opcentry->valid = false;
1704 #endif
1705 
1706 	if (opcentry->valid)
1707 		return opcentry;
1708 
1709 	/*
1710 	 * Need to fill in new entry.  First allocate space, unless we already did
1711 	 * so in some previous attempt.
1712 	 */
1713 	if (opcentry->supportProcs == NULL && numSupport > 0)
1714 		opcentry->supportProcs = (RegProcedure *)
1715 			MemoryContextAllocZero(CacheMemoryContext,
1716 								   numSupport * sizeof(RegProcedure));
1717 
1718 	/*
1719 	 * To avoid infinite recursion during startup, force heap scans if we're
1720 	 * looking up info for the opclasses used by the indexes we would like to
1721 	 * reference here.
1722 	 */
1723 	indexOK = criticalRelcachesBuilt ||
1724 		(operatorClassOid != OID_BTREE_OPS_OID &&
1725 		 operatorClassOid != INT2_BTREE_OPS_OID);
1726 
1727 	/*
1728 	 * We have to fetch the pg_opclass row to determine its opfamily and
1729 	 * opcintype, which are needed to look up related operators and functions.
1730 	 * It'd be convenient to use the syscache here, but that probably doesn't
1731 	 * work while bootstrapping.
1732 	 */
1733 	ScanKeyInit(&skey[0],
1734 				Anum_pg_opclass_oid,
1735 				BTEqualStrategyNumber, F_OIDEQ,
1736 				ObjectIdGetDatum(operatorClassOid));
1737 	rel = table_open(OperatorClassRelationId, AccessShareLock);
1738 	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1739 							  NULL, 1, skey);
1740 
1741 	if (HeapTupleIsValid(htup = systable_getnext(scan)))
1742 	{
1743 		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1744 
1745 		opcentry->opcfamily = opclassform->opcfamily;
1746 		opcentry->opcintype = opclassform->opcintype;
1747 	}
1748 	else
1749 		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1750 
1751 	systable_endscan(scan);
1752 	table_close(rel, AccessShareLock);
1753 
1754 	/*
1755 	 * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
1756 	 * the default ones (those with lefttype = righttype = opcintype).
1757 	 */
1758 	if (numSupport > 0)
1759 	{
1760 		ScanKeyInit(&skey[0],
1761 					Anum_pg_amproc_amprocfamily,
1762 					BTEqualStrategyNumber, F_OIDEQ,
1763 					ObjectIdGetDatum(opcentry->opcfamily));
1764 		ScanKeyInit(&skey[1],
1765 					Anum_pg_amproc_amproclefttype,
1766 					BTEqualStrategyNumber, F_OIDEQ,
1767 					ObjectIdGetDatum(opcentry->opcintype));
1768 		ScanKeyInit(&skey[2],
1769 					Anum_pg_amproc_amprocrighttype,
1770 					BTEqualStrategyNumber, F_OIDEQ,
1771 					ObjectIdGetDatum(opcentry->opcintype));
1772 		rel = table_open(AccessMethodProcedureRelationId, AccessShareLock);
1773 		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1774 								  NULL, 3, skey);
1775 
1776 		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1777 		{
1778 			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1779 
1780 			if (amprocform->amprocnum <= 0 ||
1781 				(StrategyNumber) amprocform->amprocnum > numSupport)
1782 				elog(ERROR, "invalid amproc number %d for opclass %u",
1783 					 amprocform->amprocnum, operatorClassOid);
1784 
1785 			opcentry->supportProcs[amprocform->amprocnum - 1] =
1786 				amprocform->amproc;
1787 		}
1788 
1789 		systable_endscan(scan);
1790 		table_close(rel, AccessShareLock);
1791 	}
1792 
1793 	opcentry->valid = true;
1794 	return opcentry;
1795 }
1796 
1797 /*
1798  * Fill in the TableAmRoutine for a relation
1799  *
1800  * relation's rd_amhandler must be valid already.
1801  */
1802 static void
1803 InitTableAmRoutine(Relation relation)
1804 {
1805 	relation->rd_tableam = GetTableAmRoutine(relation->rd_amhandler);
1806 }
1807 
1808 /*
1809  * Initialize table access method support for a table like relation
1810  */
1811 void
1812 RelationInitTableAccessMethod(Relation relation)
1813 {
1814 	HeapTuple	tuple;
1815 	Form_pg_am	aform;
1816 
1817 	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1818 	{
1819 		/*
1820 		 * Sequences are currently accessed like heap tables, but it doesn't
1821 		 * seem prudent to show that in the catalog. So just overwrite it
1822 		 * here.
1823 		 */
1824 		relation->rd_amhandler = HEAP_TABLE_AM_HANDLER_OID;
1825 	}
1826 	else if (IsCatalogRelation(relation))
1827 	{
1828 		/*
1829 		 * Avoid doing a syscache lookup for catalog tables.
1830 		 */
1831 		Assert(relation->rd_rel->relam == HEAP_TABLE_AM_OID);
1832 		relation->rd_amhandler = HEAP_TABLE_AM_HANDLER_OID;
1833 	}
1834 	else
1835 	{
1836 		/*
1837 		 * Look up the table access method, save the OID of its handler
1838 		 * function.
1839 		 */
1840 		Assert(relation->rd_rel->relam != InvalidOid);
1841 		tuple = SearchSysCache1(AMOID,
1842 								ObjectIdGetDatum(relation->rd_rel->relam));
1843 		if (!HeapTupleIsValid(tuple))
1844 			elog(ERROR, "cache lookup failed for access method %u",
1845 				 relation->rd_rel->relam);
1846 		aform = (Form_pg_am) GETSTRUCT(tuple);
1847 		relation->rd_amhandler = aform->amhandler;
1848 		ReleaseSysCache(tuple);
1849 	}
1850 
1851 	/*
1852 	 * Now we can fetch the table AM's API struct
1853 	 */
1854 	InitTableAmRoutine(relation);
1855 }
1856 
1857 /*
1858  *		formrdesc
1859  *
1860  *		This is a special cut-down version of RelationBuildDesc(),
1861  *		used while initializing the relcache.
1862  *		The relation descriptor is built just from the supplied parameters,
1863  *		without actually looking at any system table entries.  We cheat
1864  *		quite a lot since we only need to work for a few basic system
1865  *		catalogs.
1866  *
1867  * The catalogs this is used for can't have constraints (except attnotnull),
1868  * default values, rules, or triggers, since we don't cope with any of that.
1869  * (Well, actually, this only matters for properties that need to be valid
1870  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1871  * these properties matter then...)
1872  *
1873  * NOTE: we assume we are already switched into CacheMemoryContext.
1874  */
1875 static void
1876 formrdesc(const char *relationName, Oid relationReltype,
1877 		  bool isshared,
1878 		  int natts, const FormData_pg_attribute *attrs)
1879 {
1880 	Relation	relation;
1881 	int			i;
1882 	bool		has_not_null;
1883 
1884 	/*
1885 	 * allocate new relation desc, clear all fields of reldesc
1886 	 */
1887 	relation = (Relation) palloc0(sizeof(RelationData));
1888 
1889 	/* make sure relation is marked as having no open file yet */
1890 	relation->rd_smgr = NULL;
1891 
1892 	/*
1893 	 * initialize reference count: 1 because it is nailed in cache
1894 	 */
1895 	relation->rd_refcnt = 1;
1896 
1897 	/*
1898 	 * all entries built with this routine are nailed-in-cache; none are for
1899 	 * new or temp relations.
1900 	 */
1901 	relation->rd_isnailed = true;
1902 	relation->rd_createSubid = InvalidSubTransactionId;
1903 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1904 	relation->rd_backend = InvalidBackendId;
1905 	relation->rd_islocaltemp = false;
1906 
1907 	/*
1908 	 * initialize relation tuple form
1909 	 *
1910 	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
1911 	 * get us launched.  RelationCacheInitializePhase3() will read the real
1912 	 * data from pg_class and replace what we've done here.  Note in
1913 	 * particular that relowner is left as zero; this cues
1914 	 * RelationCacheInitializePhase3 that the real data isn't there yet.
1915 	 */
1916 	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1917 
1918 	namestrcpy(&relation->rd_rel->relname, relationName);
1919 	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1920 	relation->rd_rel->reltype = relationReltype;
1921 
1922 	/*
1923 	 * It's important to distinguish between shared and non-shared relations,
1924 	 * even at bootstrap time, to make sure we know where they are stored.
1925 	 */
1926 	relation->rd_rel->relisshared = isshared;
1927 	if (isshared)
1928 		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1929 
1930 	/* formrdesc is used only for permanent relations */
1931 	relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1932 
1933 	/* ... and they're always populated, too */
1934 	relation->rd_rel->relispopulated = true;
1935 
1936 	relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
1937 	relation->rd_rel->relpages = 0;
1938 	relation->rd_rel->reltuples = 0;
1939 	relation->rd_rel->relallvisible = 0;
1940 	relation->rd_rel->relkind = RELKIND_RELATION;
1941 	relation->rd_rel->relnatts = (int16) natts;
1942 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
1943 
1944 	/*
1945 	 * initialize attribute tuple form
1946 	 *
1947 	 * Unlike the case with the relation tuple, this data had better be right
1948 	 * because it will never be replaced.  The data comes from
1949 	 * src/include/catalog/ headers via genbki.pl.
1950 	 */
1951 	relation->rd_att = CreateTemplateTupleDesc(natts);
1952 	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */
1953 
1954 	relation->rd_att->tdtypeid = relationReltype;
1955 	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
1956 
1957 	/*
1958 	 * initialize tuple desc info
1959 	 */
1960 	has_not_null = false;
1961 	for (i = 0; i < natts; i++)
1962 	{
1963 		memcpy(TupleDescAttr(relation->rd_att, i),
1964 			   &attrs[i],
1965 			   ATTRIBUTE_FIXED_PART_SIZE);
1966 		has_not_null |= attrs[i].attnotnull;
1967 		/* make sure attcacheoff is valid */
1968 		TupleDescAttr(relation->rd_att, i)->attcacheoff = -1;
1969 	}
1970 
1971 	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1972 	TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
1973 
1974 	/* mark not-null status */
1975 	if (has_not_null)
1976 	{
1977 		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1978 
1979 		constr->has_not_null = true;
1980 		relation->rd_att->constr = constr;
1981 	}
1982 
1983 	/*
1984 	 * initialize relation id from info in att array (my, this is ugly)
1985 	 */
1986 	RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;
1987 
1988 	/*
1989 	 * All relations made with formrdesc are mapped.  This is necessarily so
1990 	 * because there is no other way to know what filenode they currently
1991 	 * have.  In bootstrap mode, add them to the initial relation mapper data,
1992 	 * specifying that the initial filenode is the same as the OID.
1993 	 */
1994 	relation->rd_rel->relfilenode = InvalidOid;
1995 	if (IsBootstrapProcessingMode())
1996 		RelationMapUpdateMap(RelationGetRelid(relation),
1997 							 RelationGetRelid(relation),
1998 							 isshared, true);
1999 
2000 	/*
2001 	 * initialize the relation lock manager information
2002 	 */
2003 	RelationInitLockInfo(relation); /* see lmgr.c */
2004 
2005 	/*
2006 	 * initialize physical addressing information for the relation
2007 	 */
2008 	RelationInitPhysicalAddr(relation);
2009 
2010 	/*
2011 	 * initialize the table am handler
2012 	 */
2013 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
2014 	relation->rd_tableam = GetHeapamTableAmRoutine();
2015 
2016 	/*
2017 	 * initialize the rel-has-index flag, using hardwired knowledge
2018 	 */
2019 	if (IsBootstrapProcessingMode())
2020 	{
2021 		/* In bootstrap mode, we have no indexes */
2022 		relation->rd_rel->relhasindex = false;
2023 	}
2024 	else
2025 	{
2026 		/* Otherwise, all the rels formrdesc is used for have indexes */
2027 		relation->rd_rel->relhasindex = true;
2028 	}
2029 
2030 	/*
2031 	 * add new reldesc to relcache
2032 	 */
2033 	RelationCacheInsert(relation, false);
2034 
2035 	/* It's fully valid */
2036 	relation->rd_isvalid = true;
2037 }
2038 
2039 
2040 /* ----------------------------------------------------------------
2041  *				 Relation Descriptor Lookup Interface
2042  * ----------------------------------------------------------------
2043  */
2044 
2045 /*
2046  *		RelationIdGetRelation
2047  *
2048  *		Lookup a reldesc by OID; make one if not already in cache.
2049  *
2050  *		Returns NULL if no pg_class row could be found for the given relid
2051  *		(suggesting we are trying to access a just-deleted relation).
2052  *		Any other error is reported via elog.
2053  *
2054  *		NB: caller should already have at least AccessShareLock on the
2055  *		relation ID, else there are nasty race conditions.
2056  *
2057  *		NB: relation ref count is incremented, or set to 1 if new entry.
2058  *		Caller should eventually decrement count.  (Usually,
2059  *		that happens by calling RelationClose().)
2060  */
2061 Relation
2062 RelationIdGetRelation(Oid relationId)
2063 {
2064 	Relation	rd;
2065 
2066 	/* Make sure we're in an xact, even if this ends up being a cache hit */
2067 	Assert(IsTransactionState());
2068 
2069 	/*
2070 	 * first try to find reldesc in the cache
2071 	 */
2072 	RelationIdCacheLookup(relationId, rd);
2073 
2074 	if (RelationIsValid(rd))
2075 	{
2076 		RelationIncrementReferenceCount(rd);
2077 		/* revalidate cache entry if necessary */
2078 		if (!rd->rd_isvalid)
2079 		{
2080 			/*
2081 			 * Indexes only have a limited number of possible schema changes,
2082 			 * and we don't want to use the full-blown procedure because it's
2083 			 * a headache for indexes that reload itself depends on.
2084 			 */
2085 			if (rd->rd_rel->relkind == RELKIND_INDEX ||
2086 				rd->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2087 				RelationReloadIndexInfo(rd);
2088 			else
2089 				RelationClearRelation(rd, true);
2090 
2091 			/*
2092 			 * Normally entries need to be valid here, but before the relcache
2093 			 * has been initialized, not enough infrastructure exists to
2094 			 * perform pg_class lookups. The structure of such entries doesn't
2095 			 * change, but we still want to update the rd_rel entry. So
2096 			 * rd_isvalid = false is left in place for a later lookup.
2097 			 */
2098 			Assert(rd->rd_isvalid ||
2099 				   (rd->rd_isnailed && !criticalRelcachesBuilt));
2100 		}
2101 		return rd;
2102 	}
2103 
2104 	/*
2105 	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
2106 	 * it.
2107 	 */
2108 	rd = RelationBuildDesc(relationId, true);
2109 	if (RelationIsValid(rd))
2110 		RelationIncrementReferenceCount(rd);
2111 	return rd;
2112 }
2113 
2114 /* ----------------------------------------------------------------
2115  *				cache invalidation support routines
2116  * ----------------------------------------------------------------
2117  */
2118 
2119 /*
2120  * RelationIncrementReferenceCount
2121  *		Increments relation reference count.
2122  *
2123  * Note: bootstrap mode has its own weird ideas about relation refcount
2124  * behavior; we ought to fix it someday, but for now, just disable
2125  * reference count ownership tracking in bootstrap mode.
2126  */
2127 void
2128 RelationIncrementReferenceCount(Relation rel)
2129 {
2130 	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
2131 	rel->rd_refcnt += 1;
2132 	if (!IsBootstrapProcessingMode())
2133 		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
2134 }
2135 
2136 /*
2137  * RelationDecrementReferenceCount
2138  *		Decrements relation reference count.
2139  */
2140 void
2141 RelationDecrementReferenceCount(Relation rel)
2142 {
2143 	Assert(rel->rd_refcnt > 0);
2144 	rel->rd_refcnt -= 1;
2145 	if (!IsBootstrapProcessingMode())
2146 		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
2147 }
2148 
2149 /*
2150  * RelationClose - close an open relation
2151  *
2152  *	Actually, we just decrement the refcount.
2153  *
2154  *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
2155  *	will be freed as soon as their refcount goes to zero.  In combination
2156  *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
2157  *	to catch references to already-released relcache entries.  It slows
2158  *	things down quite a bit, however.
2159  */
2160 void
2161 RelationClose(Relation relation)
2162 {
2163 	/* Note: no locking manipulations needed */
2164 	RelationDecrementReferenceCount(relation);
2165 
2166 #ifdef RELCACHE_FORCE_RELEASE
2167 	if (RelationHasReferenceCountZero(relation) &&
2168 		relation->rd_createSubid == InvalidSubTransactionId &&
2169 		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
2170 		RelationClearRelation(relation, false);
2171 #endif
2172 }
2173 
2174 /*
2175  * RelationReloadIndexInfo - reload minimal information for an open index
2176  *
2177  *	This function is used only for indexes.  A relcache inval on an index
2178  *	can mean that its pg_class or pg_index row changed.  There are only
2179  *	very limited changes that are allowed to an existing index's schema,
2180  *	so we can update the relcache entry without a complete rebuild; which
2181  *	is fortunate because we can't rebuild an index entry that is "nailed"
2182  *	and/or in active use.  We support full replacement of the pg_class row,
2183  *	as well as updates of a few simple fields of the pg_index row.
2184  *
2185  *	We can't necessarily reread the catalog rows right away; we might be
2186  *	in a failed transaction when we receive the SI notification.  If so,
2187  *	RelationClearRelation just marks the entry as invalid by setting
2188  *	rd_isvalid to false.  This routine is called to fix the entry when it
2189  *	is next needed.
2190  *
2191  *	We assume that at the time we are called, we have at least AccessShareLock
2192  *	on the target index.  (Note: in the calls from RelationClearRelation,
2193  *	this is legitimate because we know the rel has positive refcount.)
2194  *
2195  *	If the target index is an index on pg_class or pg_index, we'd better have
2196  *	previously gotten at least AccessShareLock on its underlying catalog,
2197  *	else we are at risk of deadlock against someone trying to exclusive-lock
2198  *	the heap and index in that order.  This is ensured in current usage by
2199  *	only applying this to indexes being opened or having positive refcount.
2200  */
2201 static void
2202 RelationReloadIndexInfo(Relation relation)
2203 {
2204 	bool		indexOK;
2205 	HeapTuple	pg_class_tuple;
2206 	Form_pg_class relp;
2207 
2208 	/* Should be called only for invalidated indexes */
2209 	Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
2210 			relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2211 		   !relation->rd_isvalid);
2212 
2213 	/* Ensure it's closed at smgr level */
2214 	RelationCloseSmgr(relation);
2215 
2216 	/* Must free any AM cached data upon relcache flush */
2217 	if (relation->rd_amcache)
2218 		pfree(relation->rd_amcache);
2219 	relation->rd_amcache = NULL;
2220 
2221 	/*
2222 	 * If it's a shared index, we might be called before backend startup has
2223 	 * finished selecting a database, in which case we have no way to read
2224 	 * pg_class yet.  However, a shared index can never have any significant
2225 	 * schema updates, so it's okay to ignore the invalidation signal.  Just
2226 	 * mark it valid and return without doing anything more.
2227 	 */
2228 	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
2229 	{
2230 		relation->rd_isvalid = true;
2231 		return;
2232 	}
2233 
2234 	/*
2235 	 * Read the pg_class row
2236 	 *
2237 	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
2238 	 * for pg_class_oid_index ...
2239 	 */
2240 	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2241 	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
2242 	if (!HeapTupleIsValid(pg_class_tuple))
2243 		elog(ERROR, "could not find pg_class tuple for index %u",
2244 			 RelationGetRelid(relation));
2245 	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2246 	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2247 	/* Reload reloptions in case they changed */
2248 	if (relation->rd_options)
2249 		pfree(relation->rd_options);
2250 	RelationParseRelOptions(relation, pg_class_tuple);
2251 	/* done with pg_class tuple */
2252 	heap_freetuple(pg_class_tuple);
2253 	/* We must recalculate physical address in case it changed */
2254 	RelationInitPhysicalAddr(relation);
2255 
2256 	/*
2257 	 * For a non-system index, there are fields of the pg_index row that are
2258 	 * allowed to change, so re-read that row and update the relcache entry.
2259 	 * Most of the info derived from pg_index (such as support function lookup
2260 	 * info) cannot change, and indeed the whole point of this routine is to
2261 	 * update the relcache entry without clobbering that data; so wholesale
2262 	 * replacement is not appropriate.
2263 	 */
2264 	if (!IsSystemRelation(relation))
2265 	{
2266 		HeapTuple	tuple;
2267 		Form_pg_index index;
2268 
2269 		tuple = SearchSysCache1(INDEXRELID,
2270 								ObjectIdGetDatum(RelationGetRelid(relation)));
2271 		if (!HeapTupleIsValid(tuple))
2272 			elog(ERROR, "cache lookup failed for index %u",
2273 				 RelationGetRelid(relation));
2274 		index = (Form_pg_index) GETSTRUCT(tuple);
2275 
2276 		/*
2277 		 * Basically, let's just copy all the bool fields.  There are one or
2278 		 * two of these that can't actually change in the current code, but
2279 		 * it's not worth it to track exactly which ones they are.  None of
2280 		 * the array fields are allowed to change, though.
2281 		 */
2282 		relation->rd_index->indisunique = index->indisunique;
2283 		relation->rd_index->indisprimary = index->indisprimary;
2284 		relation->rd_index->indisexclusion = index->indisexclusion;
2285 		relation->rd_index->indimmediate = index->indimmediate;
2286 		relation->rd_index->indisclustered = index->indisclustered;
2287 		relation->rd_index->indisvalid = index->indisvalid;
2288 		relation->rd_index->indcheckxmin = index->indcheckxmin;
2289 		relation->rd_index->indisready = index->indisready;
2290 		relation->rd_index->indislive = index->indislive;
2291 
2292 		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
2293 		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
2294 							   HeapTupleHeaderGetXmin(tuple->t_data));
2295 
2296 		ReleaseSysCache(tuple);
2297 	}
2298 
2299 	/* Okay, now it's valid again */
2300 	relation->rd_isvalid = true;
2301 }
2302 
2303 /*
2304  * RelationReloadNailed - reload minimal information for nailed relations.
2305  *
2306  * The structure of a nailed relation can never change (which is good, because
2307  * we rely on knowing their structure to be able to read catalog content). But
2308  * some parts, e.g. pg_class.relfrozenxid, are still important to have
2309  * accurate content for. Therefore those need to be reloaded after the arrival
2310  * of invalidations.
2311  */
2312 static void
2313 RelationReloadNailed(Relation relation)
2314 {
2315 	Assert(relation->rd_isnailed);
2316 
2317 	/*
2318 	 * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
2319 	 * mapping changed.
2320 	 */
2321 	RelationInitPhysicalAddr(relation);
2322 
2323 	/* flag as needing to be revalidated */
2324 	relation->rd_isvalid = false;
2325 
2326 	/*
2327 	 * Can only reread catalog contents if in a transaction.  If the relation
2328 	 * is currently open (not counting the nailed refcount), do so
2329 	 * immediately. Otherwise we've already marked the entry as possibly
2330 	 * invalid, and it'll be fixed when next opened.
2331 	 */
2332 	if (!IsTransactionState() || relation->rd_refcnt <= 1)
2333 		return;
2334 
2335 	if (relation->rd_rel->relkind == RELKIND_INDEX)
2336 	{
2337 		/*
2338 		 * If it's a nailed-but-not-mapped index, then we need to re-read the
2339 		 * pg_class row to see if its relfilenode changed.
2340 		 */
2341 		RelationReloadIndexInfo(relation);
2342 	}
2343 	else
2344 	{
2345 		/*
2346 		 * Reload a non-index entry.  We can't easily do so if relcaches
2347 		 * aren't yet built, but that's fine because at that stage the
2348 		 * attributes that need to be current (like relfrozenxid) aren't yet
2349 		 * accessed.  To ensure the entry will later be revalidated, we leave
2350 		 * it in invalid state, but allow use (cf. RelationIdGetRelation()).
2351 		 */
2352 		if (criticalRelcachesBuilt)
2353 		{
2354 			HeapTuple	pg_class_tuple;
2355 			Form_pg_class relp;
2356 
2357 			/*
2358 			 * NB: Mark the entry as valid before starting to scan, to avoid
2359 			 * self-recursion when re-building pg_class.
2360 			 */
2361 			relation->rd_isvalid = true;
2362 
2363 			pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
2364 											true, false);
2365 			relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2366 			memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2367 			heap_freetuple(pg_class_tuple);
2368 
2369 			/*
2370 			 * Again mark as valid, to protect against concurrently arriving
2371 			 * invalidations.
2372 			 */
2373 			relation->rd_isvalid = true;
2374 		}
2375 	}
2376 }
2377 
2378 /*
2379  * RelationDestroyRelation
2380  *
2381  *	Physically delete a relation cache entry and all subsidiary data.
2382  *	Caller must already have unhooked the entry from the hash table.
2383  */
2384 static void
2385 RelationDestroyRelation(Relation relation, bool remember_tupdesc)
2386 {
2387 	Assert(RelationHasReferenceCountZero(relation));
2388 
2389 	/*
2390 	 * Make sure smgr and lower levels close the relation's files, if they
2391 	 * weren't closed already.  (This was probably done by caller, but let's
2392 	 * just be real sure.)
2393 	 */
2394 	RelationCloseSmgr(relation);
2395 
2396 	/*
2397 	 * Free all the subsidiary data structures of the relcache entry, then the
2398 	 * entry itself.
2399 	 */
2400 	if (relation->rd_rel)
2401 		pfree(relation->rd_rel);
2402 	/* can't use DecrTupleDescRefCount here */
2403 	Assert(relation->rd_att->tdrefcount > 0);
2404 	if (--relation->rd_att->tdrefcount == 0)
2405 	{
2406 		/*
2407 		 * If we Rebuilt a relcache entry during a transaction then its
2408 		 * possible we did that because the TupDesc changed as the result of
2409 		 * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
2410 		 * possible someone copied that TupDesc, in which case the copy would
2411 		 * point to free'd memory. So if we rebuild an entry we keep the
2412 		 * TupDesc around until end of transaction, to be safe.
2413 		 */
2414 		if (remember_tupdesc)
2415 			RememberToFreeTupleDescAtEOX(relation->rd_att);
2416 		else
2417 			FreeTupleDesc(relation->rd_att);
2418 	}
2419 	FreeTriggerDesc(relation->trigdesc);
2420 	list_free_deep(relation->rd_fkeylist);
2421 	list_free(relation->rd_indexlist);
2422 	list_free(relation->rd_statlist);
2423 	bms_free(relation->rd_indexattr);
2424 	bms_free(relation->rd_keyattr);
2425 	bms_free(relation->rd_pkattr);
2426 	bms_free(relation->rd_idattr);
2427 	if (relation->rd_pubactions)
2428 		pfree(relation->rd_pubactions);
2429 	if (relation->rd_options)
2430 		pfree(relation->rd_options);
2431 	if (relation->rd_indextuple)
2432 		pfree(relation->rd_indextuple);
2433 	if (relation->rd_amcache)
2434 		pfree(relation->rd_amcache);
2435 	if (relation->rd_fdwroutine)
2436 		pfree(relation->rd_fdwroutine);
2437 	if (relation->rd_indexcxt)
2438 		MemoryContextDelete(relation->rd_indexcxt);
2439 	if (relation->rd_rulescxt)
2440 		MemoryContextDelete(relation->rd_rulescxt);
2441 	if (relation->rd_rsdesc)
2442 		MemoryContextDelete(relation->rd_rsdesc->rscxt);
2443 	if (relation->rd_partkeycxt)
2444 		MemoryContextDelete(relation->rd_partkeycxt);
2445 	if (relation->rd_pdcxt)
2446 		MemoryContextDelete(relation->rd_pdcxt);
2447 	if (relation->rd_partcheckcxt)
2448 		MemoryContextDelete(relation->rd_partcheckcxt);
2449 	pfree(relation);
2450 }
2451 
2452 /*
2453  * RelationClearRelation
2454  *
2455  *	 Physically blow away a relation cache entry, or reset it and rebuild
2456  *	 it from scratch (that is, from catalog entries).  The latter path is
2457  *	 used when we are notified of a change to an open relation (one with
2458  *	 refcount > 0).
2459  *
2460  *	 NB: when rebuilding, we'd better hold some lock on the relation,
2461  *	 else the catalog data we need to read could be changing under us.
2462  *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
2463  *	 a sinval reset could happen while we're accessing the catalogs, and
2464  *	 the rel would get blown away underneath us by RelationCacheInvalidate
2465  *	 if it has zero refcnt.
2466  *
2467  *	 The "rebuild" parameter is redundant in current usage because it has
2468  *	 to match the relation's refcnt status, but we keep it as a crosscheck
2469  *	 that we're doing what the caller expects.
2470  */
2471 static void
2472 RelationClearRelation(Relation relation, bool rebuild)
2473 {
2474 	/*
2475 	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
2476 	 * course it would be an equally bad idea to blow away one with nonzero
2477 	 * refcnt, since that would leave someone somewhere with a dangling
2478 	 * pointer.  All callers are expected to have verified that this holds.
2479 	 */
2480 	Assert(rebuild ?
2481 		   !RelationHasReferenceCountZero(relation) :
2482 		   RelationHasReferenceCountZero(relation));
2483 
2484 	/*
2485 	 * Make sure smgr and lower levels close the relation's files, if they
2486 	 * weren't closed already.  If the relation is not getting deleted, the
2487 	 * next smgr access should reopen the files automatically.  This ensures
2488 	 * that the low-level file access state is updated after, say, a vacuum
2489 	 * truncation.
2490 	 */
2491 	RelationCloseSmgr(relation);
2492 
2493 	/* Free AM cached data, if any */
2494 	if (relation->rd_amcache)
2495 		pfree(relation->rd_amcache);
2496 	relation->rd_amcache = NULL;
2497 
2498 	/*
2499 	 * Treat nailed-in system relations separately, they always need to be
2500 	 * accessible, so we can't blow them away.
2501 	 */
2502 	if (relation->rd_isnailed)
2503 	{
2504 		RelationReloadNailed(relation);
2505 		return;
2506 	}
2507 
2508 	/*
2509 	 * Even non-system indexes should not be blown away if they are open and
2510 	 * have valid index support information.  This avoids problems with active
2511 	 * use of the index support information.  As with nailed indexes, we
2512 	 * re-read the pg_class row to handle possible physical relocation of the
2513 	 * index, and we check for pg_index updates too.
2514 	 */
2515 	if ((relation->rd_rel->relkind == RELKIND_INDEX ||
2516 		 relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2517 		relation->rd_refcnt > 0 &&
2518 		relation->rd_indexcxt != NULL)
2519 	{
2520 		relation->rd_isvalid = false;	/* needs to be revalidated */
2521 		if (IsTransactionState())
2522 			RelationReloadIndexInfo(relation);
2523 		return;
2524 	}
2525 
2526 	/* Mark it invalid until we've finished rebuild */
2527 	relation->rd_isvalid = false;
2528 
2529 	/*
2530 	 * If we're really done with the relcache entry, blow it away. But if
2531 	 * someone is still using it, reconstruct the whole deal without moving
2532 	 * the physical RelationData record (so that the someone's pointer is
2533 	 * still valid).
2534 	 */
2535 	if (!rebuild)
2536 	{
2537 		/* Remove it from the hash table */
2538 		RelationCacheDelete(relation);
2539 
2540 		/* And release storage */
2541 		RelationDestroyRelation(relation, false);
2542 	}
2543 	else if (!IsTransactionState())
2544 	{
2545 		/*
2546 		 * If we're not inside a valid transaction, we can't do any catalog
2547 		 * access so it's not possible to rebuild yet.  Just exit, leaving
2548 		 * rd_isvalid = false so that the rebuild will occur when the entry is
2549 		 * next opened.
2550 		 *
2551 		 * Note: it's possible that we come here during subtransaction abort,
2552 		 * and the reason for wanting to rebuild is that the rel is open in
2553 		 * the outer transaction.  In that case it might seem unsafe to not
2554 		 * rebuild immediately, since whatever code has the rel already open
2555 		 * will keep on using the relcache entry as-is.  However, in such a
2556 		 * case the outer transaction should be holding a lock that's
2557 		 * sufficient to prevent any significant change in the rel's schema,
2558 		 * so the existing entry contents should be good enough for its
2559 		 * purposes; at worst we might be behind on statistics updates or the
2560 		 * like.  (See also CheckTableNotInUse() and its callers.)	These same
2561 		 * remarks also apply to the cases above where we exit without having
2562 		 * done RelationReloadIndexInfo() yet.
2563 		 */
2564 		return;
2565 	}
2566 	else
2567 	{
2568 		/*
2569 		 * Our strategy for rebuilding an open relcache entry is to build a
2570 		 * new entry from scratch, swap its contents with the old entry, and
2571 		 * finally delete the new entry (along with any infrastructure swapped
2572 		 * over from the old entry).  This is to avoid trouble in case an
2573 		 * error causes us to lose control partway through.  The old entry
2574 		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
2575 		 * on next access.  Meanwhile it's not any less valid than it was
2576 		 * before, so any code that might expect to continue accessing it
2577 		 * isn't hurt by the rebuild failure.  (Consider for example a
2578 		 * subtransaction that ALTERs a table and then gets canceled partway
2579 		 * through the cache entry rebuild.  The outer transaction should
2580 		 * still see the not-modified cache entry as valid.)  The worst
2581 		 * consequence of an error is leaking the necessarily-unreferenced new
2582 		 * entry, and this shouldn't happen often enough for that to be a big
2583 		 * problem.
2584 		 *
2585 		 * When rebuilding an open relcache entry, we must preserve ref count,
2586 		 * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
2587 		 * attempt to preserve the pg_class entry (rd_rel), tupledesc,
2588 		 * rewrite-rule, partition key, and partition descriptor substructures
2589 		 * in place, because various places assume that these structures won't
2590 		 * move while they are working with an open relcache entry.  (Note:
2591 		 * the refcount mechanism for tupledescs might someday allow us to
2592 		 * remove this hack for the tupledesc.)
2593 		 *
2594 		 * Note that this process does not touch CurrentResourceOwner; which
2595 		 * is good because whatever ref counts the entry may have do not
2596 		 * necessarily belong to that resource owner.
2597 		 */
2598 		Relation	newrel;
2599 		Oid			save_relid = RelationGetRelid(relation);
2600 		bool		keep_tupdesc;
2601 		bool		keep_rules;
2602 		bool		keep_policies;
2603 		bool		keep_partkey;
2604 		bool		keep_partdesc;
2605 
2606 		/* Build temporary entry, but don't link it into hashtable */
2607 		newrel = RelationBuildDesc(save_relid, false);
2608 
2609 		/*
2610 		 * Between here and the end of the swap, don't add code that does or
2611 		 * reasonably could read system catalogs.  That range must be free
2612 		 * from invalidation processing.  See RelationBuildDesc() manipulation
2613 		 * of in_progress_list.
2614 		 */
2615 
2616 		if (newrel == NULL)
2617 		{
2618 			/*
2619 			 * We can validly get here, if we're using a historic snapshot in
2620 			 * which a relation, accessed from outside logical decoding, is
2621 			 * still invisible. In that case it's fine to just mark the
2622 			 * relation as invalid and return - it'll fully get reloaded by
2623 			 * the cache reset at the end of logical decoding (or at the next
2624 			 * access).  During normal processing we don't want to ignore this
2625 			 * case as it shouldn't happen there, as explained below.
2626 			 */
2627 			if (HistoricSnapshotActive())
2628 				return;
2629 
2630 			/*
2631 			 * This shouldn't happen as dropping a relation is intended to be
2632 			 * impossible if still referenced (cf. CheckTableNotInUse()). But
2633 			 * if we get here anyway, we can't just delete the relcache entry,
2634 			 * as it possibly could get accessed later (as e.g. the error
2635 			 * might get trapped and handled via a subtransaction rollback).
2636 			 */
2637 			elog(ERROR, "relation %u deleted while still in use", save_relid);
2638 		}
2639 
2640 		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
2641 		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2642 		keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
2643 		/* partkey is immutable once set up, so we can always keep it */
2644 		keep_partkey = (relation->rd_partkey != NULL);
2645 		keep_partdesc = equalPartitionDescs(relation->rd_partkey,
2646 											relation->rd_partdesc,
2647 											newrel->rd_partdesc);
2648 
2649 		/*
2650 		 * Perform swapping of the relcache entry contents.  Within this
2651 		 * process the old entry is momentarily invalid, so there *must* be no
2652 		 * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
2653 		 * all-in-line code for safety.
2654 		 *
2655 		 * Since the vast majority of fields should be swapped, our method is
2656 		 * to swap the whole structures and then re-swap those few fields we
2657 		 * didn't want swapped.
2658 		 */
2659 #define SWAPFIELD(fldtype, fldname) \
2660 		do { \
2661 			fldtype _tmp = newrel->fldname; \
2662 			newrel->fldname = relation->fldname; \
2663 			relation->fldname = _tmp; \
2664 		} while (0)
2665 
2666 		/* swap all Relation struct fields */
2667 		{
2668 			RelationData tmpstruct;
2669 
2670 			memcpy(&tmpstruct, newrel, sizeof(RelationData));
2671 			memcpy(newrel, relation, sizeof(RelationData));
2672 			memcpy(relation, &tmpstruct, sizeof(RelationData));
2673 		}
2674 
2675 		/* rd_smgr must not be swapped, due to back-links from smgr level */
2676 		SWAPFIELD(SMgrRelation, rd_smgr);
2677 		/* rd_refcnt must be preserved */
2678 		SWAPFIELD(int, rd_refcnt);
2679 		/* isnailed shouldn't change */
2680 		Assert(newrel->rd_isnailed == relation->rd_isnailed);
2681 		/* creation sub-XIDs must be preserved */
2682 		SWAPFIELD(SubTransactionId, rd_createSubid);
2683 		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2684 		/* un-swap rd_rel pointers, swap contents instead */
2685 		SWAPFIELD(Form_pg_class, rd_rel);
2686 		/* ... but actually, we don't have to update newrel->rd_rel */
2687 		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2688 		/* preserve old tupledesc, rules, policies if no logical change */
2689 		if (keep_tupdesc)
2690 			SWAPFIELD(TupleDesc, rd_att);
2691 		if (keep_rules)
2692 		{
2693 			SWAPFIELD(RuleLock *, rd_rules);
2694 			SWAPFIELD(MemoryContext, rd_rulescxt);
2695 		}
2696 		if (keep_policies)
2697 			SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
2698 		/* toast OID override must be preserved */
2699 		SWAPFIELD(Oid, rd_toastoid);
2700 		/* pgstat_info must be preserved */
2701 		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2702 		/* preserve old partitioning info if no logical change */
2703 		if (keep_partkey)
2704 		{
2705 			SWAPFIELD(PartitionKey, rd_partkey);
2706 			SWAPFIELD(MemoryContext, rd_partkeycxt);
2707 		}
2708 		if (keep_partdesc)
2709 		{
2710 			SWAPFIELD(PartitionDesc, rd_partdesc);
2711 			SWAPFIELD(MemoryContext, rd_pdcxt);
2712 		}
2713 		else if (rebuild && newrel->rd_pdcxt != NULL)
2714 		{
2715 			/*
2716 			 * We are rebuilding a partitioned relation with a non-zero
2717 			 * reference count, so keep the old partition descriptor around,
2718 			 * in case there's a PartitionDirectory with a pointer to it.
2719 			 * Attach it to the new rd_pdcxt so that it gets cleaned up
2720 			 * eventually.  In the case where the reference count is 0, this
2721 			 * code is not reached, which should be OK because in that case
2722 			 * there should be no PartitionDirectory with a pointer to the old
2723 			 * entry.
2724 			 *
2725 			 * Note that newrel and relation have already been swapped, so the
2726 			 * "old" partition descriptor is actually the one hanging off of
2727 			 * newrel.
2728 			 */
2729 			MemoryContextSetParent(newrel->rd_pdcxt, relation->rd_pdcxt);
2730 			newrel->rd_partdesc = NULL;
2731 			newrel->rd_pdcxt = NULL;
2732 		}
2733 
2734 #undef SWAPFIELD
2735 
2736 		/* And now we can throw away the temporary entry */
2737 		RelationDestroyRelation(newrel, !keep_tupdesc);
2738 	}
2739 }
2740 
2741 /*
2742  * RelationFlushRelation
2743  *
2744  *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
2745  *	 This is used when we receive a cache invalidation event for the rel.
2746  */
2747 static void
2748 RelationFlushRelation(Relation relation)
2749 {
2750 	if (relation->rd_createSubid != InvalidSubTransactionId ||
2751 		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2752 	{
2753 		/*
2754 		 * New relcache entries are always rebuilt, not flushed; else we'd
2755 		 * forget the "new" status of the relation, which is a useful
2756 		 * optimization to have.  Ditto for the new-relfilenode status.
2757 		 *
2758 		 * The rel could have zero refcnt here, so temporarily increment the
2759 		 * refcnt to ensure it's safe to rebuild it.  We can assume that the
2760 		 * current transaction has some lock on the rel already.
2761 		 */
2762 		RelationIncrementReferenceCount(relation);
2763 		RelationClearRelation(relation, true);
2764 		RelationDecrementReferenceCount(relation);
2765 	}
2766 	else
2767 	{
2768 		/*
2769 		 * Pre-existing rels can be dropped from the relcache if not open.
2770 		 */
2771 		bool		rebuild = !RelationHasReferenceCountZero(relation);
2772 
2773 		RelationClearRelation(relation, rebuild);
2774 	}
2775 }
2776 
2777 /*
2778  * RelationForgetRelation - unconditionally remove a relcache entry
2779  *
2780  *		   External interface for destroying a relcache entry when we
2781  *		   drop the relation.
2782  */
2783 void
2784 RelationForgetRelation(Oid rid)
2785 {
2786 	Relation	relation;
2787 
2788 	RelationIdCacheLookup(rid, relation);
2789 
2790 	if (!PointerIsValid(relation))
2791 		return;					/* not in cache, nothing to do */
2792 
2793 	if (!RelationHasReferenceCountZero(relation))
2794 		elog(ERROR, "relation %u is still open", rid);
2795 
2796 	/* Unconditionally destroy the relcache entry */
2797 	RelationClearRelation(relation, false);
2798 }
2799 
2800 /*
2801  *		RelationCacheInvalidateEntry
2802  *
2803  *		This routine is invoked for SI cache flush messages.
2804  *
2805  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2806  * already determined that the relid belongs to our database or is a shared
2807  * relation.)
2808  *
2809  * We used to skip local relations, on the grounds that they could
2810  * not be targets of cross-backend SI update messages; but it seems
2811  * safer to process them, so that our *own* SI update messages will
2812  * have the same effects during CommandCounterIncrement for both
2813  * local and nonlocal relations.
2814  */
2815 void
2816 RelationCacheInvalidateEntry(Oid relationId)
2817 {
2818 	Relation	relation;
2819 
2820 	RelationIdCacheLookup(relationId, relation);
2821 
2822 	if (PointerIsValid(relation))
2823 	{
2824 		relcacheInvalsReceived++;
2825 		RelationFlushRelation(relation);
2826 	}
2827 	else
2828 	{
2829 		int			i;
2830 
2831 		for (i = 0; i < in_progress_list_len; i++)
2832 			if (in_progress_list[i].reloid == relationId)
2833 				in_progress_list[i].invalidated = true;
2834 	}
2835 }
2836 
2837 /*
2838  * RelationCacheInvalidate
2839  *	 Blow away cached relation descriptors that have zero reference counts,
2840  *	 and rebuild those with positive reference counts.  Also reset the smgr
2841  *	 relation cache and re-read relation mapping data.
2842  *
2843  *	 This is currently used only to recover from SI message buffer overflow,
2844  *	 so we do not touch new-in-transaction relations; they cannot be targets
2845  *	 of cross-backend SI updates (and our own updates now go through a
2846  *	 separate linked list that isn't limited by the SI message buffer size).
2847  *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
2848  *	 since any invalidation of those would be a local event.
2849  *
2850  *	 We do this in two phases: the first pass deletes deletable items, and
2851  *	 the second one rebuilds the rebuildable items.  This is essential for
2852  *	 safety, because hash_seq_search only copes with concurrent deletion of
2853  *	 the element it is currently visiting.  If a second SI overflow were to
2854  *	 occur while we are walking the table, resulting in recursive entry to
2855  *	 this routine, we could crash because the inner invocation blows away
2856  *	 the entry next to be visited by the outer scan.  But this way is OK,
2857  *	 because (a) during the first pass we won't process any more SI messages,
2858  *	 so hash_seq_search will complete safely; (b) during the second pass we
2859  *	 only hold onto pointers to nondeletable entries.
2860  *
2861  *	 The two-phase approach also makes it easy to update relfilenodes for
2862  *	 mapped relations before we do anything else, and to ensure that the
2863  *	 second pass processes nailed-in-cache items before other nondeletable
2864  *	 items.  This should ensure that system catalogs are up to date before
2865  *	 we attempt to use them to reload information about other open relations.
2866  *
2867  *	 After those two phases of work having immediate effects, we normally
2868  *	 signal any RelationBuildDesc() on the stack to start over.  However, we
2869  *	 don't do this if called as part of debug_discard_caches.  Otherwise,
2870  *	 RelationBuildDesc() would become an infinite loop.
2871  */
2872 void
2873 RelationCacheInvalidate(bool debug_discard)
2874 {
2875 	HASH_SEQ_STATUS status;
2876 	RelIdCacheEnt *idhentry;
2877 	Relation	relation;
2878 	List	   *rebuildFirstList = NIL;
2879 	List	   *rebuildList = NIL;
2880 	ListCell   *l;
2881 	int			i;
2882 
2883 	/*
2884 	 * Reload relation mapping data before starting to reconstruct cache.
2885 	 */
2886 	RelationMapInvalidateAll();
2887 
2888 	/* Phase 1 */
2889 	hash_seq_init(&status, RelationIdCache);
2890 
2891 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2892 	{
2893 		relation = idhentry->reldesc;
2894 
2895 		/* Must close all smgr references to avoid leaving dangling ptrs */
2896 		RelationCloseSmgr(relation);
2897 
2898 		/*
2899 		 * Ignore new relations; no other backend will manipulate them before
2900 		 * we commit.  Likewise, before replacing a relation's relfilenode, we
2901 		 * shall have acquired AccessExclusiveLock and drained any applicable
2902 		 * pending invalidations.
2903 		 */
2904 		if (relation->rd_createSubid != InvalidSubTransactionId ||
2905 			relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2906 			continue;
2907 
2908 		relcacheInvalsReceived++;
2909 
2910 		if (RelationHasReferenceCountZero(relation))
2911 		{
2912 			/* Delete this entry immediately */
2913 			Assert(!relation->rd_isnailed);
2914 			RelationClearRelation(relation, false);
2915 		}
2916 		else
2917 		{
2918 			/*
2919 			 * If it's a mapped relation, immediately update its rd_node in
2920 			 * case its relfilenode changed.  We must do this during phase 1
2921 			 * in case the relation is consulted during rebuild of other
2922 			 * relcache entries in phase 2.  It's safe since consulting the
2923 			 * map doesn't involve any access to relcache entries.
2924 			 */
2925 			if (RelationIsMapped(relation))
2926 				RelationInitPhysicalAddr(relation);
2927 
2928 			/*
2929 			 * Add this entry to list of stuff to rebuild in second pass.
2930 			 * pg_class goes to the front of rebuildFirstList while
2931 			 * pg_class_oid_index goes to the back of rebuildFirstList, so
2932 			 * they are done first and second respectively.  Other nailed
2933 			 * relations go to the front of rebuildList, so they'll be done
2934 			 * next in no particular order; and everything else goes to the
2935 			 * back of rebuildList.
2936 			 */
2937 			if (RelationGetRelid(relation) == RelationRelationId)
2938 				rebuildFirstList = lcons(relation, rebuildFirstList);
2939 			else if (RelationGetRelid(relation) == ClassOidIndexId)
2940 				rebuildFirstList = lappend(rebuildFirstList, relation);
2941 			else if (relation->rd_isnailed)
2942 				rebuildList = lcons(relation, rebuildList);
2943 			else
2944 				rebuildList = lappend(rebuildList, relation);
2945 		}
2946 	}
2947 
2948 	/*
2949 	 * Now zap any remaining smgr cache entries.  This must happen before we
2950 	 * start to rebuild entries, since that may involve catalog fetches which
2951 	 * will re-open catalog files.
2952 	 */
2953 	smgrcloseall();
2954 
2955 	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2956 	foreach(l, rebuildFirstList)
2957 	{
2958 		relation = (Relation) lfirst(l);
2959 		RelationClearRelation(relation, true);
2960 	}
2961 	list_free(rebuildFirstList);
2962 	foreach(l, rebuildList)
2963 	{
2964 		relation = (Relation) lfirst(l);
2965 		RelationClearRelation(relation, true);
2966 	}
2967 	list_free(rebuildList);
2968 
2969 	if (!debug_discard)
2970 		/* Any RelationBuildDesc() on the stack must start over. */
2971 		for (i = 0; i < in_progress_list_len; i++)
2972 			in_progress_list[i].invalidated = true;
2973 }
2974 
2975 /*
2976  * RelationCloseSmgrByOid - close a relcache entry's smgr link
2977  *
2978  * Needed in some cases where we are changing a relation's physical mapping.
2979  * The link will be automatically reopened on next use.
2980  */
2981 void
2982 RelationCloseSmgrByOid(Oid relationId)
2983 {
2984 	Relation	relation;
2985 
2986 	RelationIdCacheLookup(relationId, relation);
2987 
2988 	if (!PointerIsValid(relation))
2989 		return;					/* not in cache, nothing to do */
2990 
2991 	RelationCloseSmgr(relation);
2992 }
2993 
2994 static void
2995 RememberToFreeTupleDescAtEOX(TupleDesc td)
2996 {
2997 	if (EOXactTupleDescArray == NULL)
2998 	{
2999 		MemoryContext oldcxt;
3000 
3001 		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3002 
3003 		EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
3004 		EOXactTupleDescArrayLen = 16;
3005 		NextEOXactTupleDescNum = 0;
3006 		MemoryContextSwitchTo(oldcxt);
3007 	}
3008 	else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
3009 	{
3010 		int32		newlen = EOXactTupleDescArrayLen * 2;
3011 
3012 		Assert(EOXactTupleDescArrayLen > 0);
3013 
3014 		EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
3015 													  newlen * sizeof(TupleDesc));
3016 		EOXactTupleDescArrayLen = newlen;
3017 	}
3018 
3019 	EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
3020 }
3021 
3022 /*
3023  * AtEOXact_RelationCache
3024  *
3025  *	Clean up the relcache at main-transaction commit or abort.
3026  *
3027  * Note: this must be called *before* processing invalidation messages.
3028  * In the case of abort, we don't want to try to rebuild any invalidated
3029  * cache entries (since we can't safely do database accesses).  Therefore
3030  * we must reset refcnts before handling pending invalidations.
3031  *
3032  * As of PostgreSQL 8.1, relcache refcnts should get released by the
3033  * ResourceOwner mechanism.  This routine just does a debugging
3034  * cross-check that no pins remain.  However, we also need to do special
3035  * cleanup when the current transaction created any relations or made use
3036  * of forced index lists.
3037  */
3038 void
3039 AtEOXact_RelationCache(bool isCommit)
3040 {
3041 	HASH_SEQ_STATUS status;
3042 	RelIdCacheEnt *idhentry;
3043 	int			i;
3044 
3045 	/*
3046 	 * Forget in_progress_list.  This is relevant when we're aborting due to
3047 	 * an error during RelationBuildDesc().
3048 	 */
3049 	Assert(in_progress_list_len == 0 || !isCommit);
3050 	in_progress_list_len = 0;
3051 
3052 	/*
3053 	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
3054 	 * listed in it.  Otherwise fall back on a hash_seq_search scan.
3055 	 *
3056 	 * For simplicity, eoxact_list[] entries are not deleted till end of
3057 	 * top-level transaction, even though we could remove them at
3058 	 * subtransaction end in some cases, or remove relations from the list if
3059 	 * they are cleared for other reasons.  Therefore we should expect the
3060 	 * case that list entries are not found in the hashtable; if not, there's
3061 	 * nothing to do for them.
3062 	 */
3063 	if (eoxact_list_overflowed)
3064 	{
3065 		hash_seq_init(&status, RelationIdCache);
3066 		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3067 		{
3068 			AtEOXact_cleanup(idhentry->reldesc, isCommit);
3069 		}
3070 	}
3071 	else
3072 	{
3073 		for (i = 0; i < eoxact_list_len; i++)
3074 		{
3075 			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3076 													 (void *) &eoxact_list[i],
3077 													 HASH_FIND,
3078 													 NULL);
3079 			if (idhentry != NULL)
3080 				AtEOXact_cleanup(idhentry->reldesc, isCommit);
3081 		}
3082 	}
3083 
3084 	if (EOXactTupleDescArrayLen > 0)
3085 	{
3086 		Assert(EOXactTupleDescArray != NULL);
3087 		for (i = 0; i < NextEOXactTupleDescNum; i++)
3088 			FreeTupleDesc(EOXactTupleDescArray[i]);
3089 		pfree(EOXactTupleDescArray);
3090 		EOXactTupleDescArray = NULL;
3091 	}
3092 
3093 	/* Now we're out of the transaction and can clear the lists */
3094 	eoxact_list_len = 0;
3095 	eoxact_list_overflowed = false;
3096 	NextEOXactTupleDescNum = 0;
3097 	EOXactTupleDescArrayLen = 0;
3098 }
3099 
3100 /*
3101  * AtEOXact_cleanup
3102  *
3103  *	Clean up a single rel at main-transaction commit or abort
3104  *
3105  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3106  * bother to prevent duplicate entries in eoxact_list[].
3107  */
3108 static void
3109 AtEOXact_cleanup(Relation relation, bool isCommit)
3110 {
3111 	/*
3112 	 * The relcache entry's ref count should be back to its normal
3113 	 * not-in-a-transaction state: 0 unless it's nailed in cache.
3114 	 *
3115 	 * In bootstrap mode, this is NOT true, so don't check it --- the
3116 	 * bootstrap code expects relations to stay open across start/commit
3117 	 * transaction calls.  (That seems bogus, but it's not worth fixing.)
3118 	 *
3119 	 * Note: ideally this check would be applied to every relcache entry, not
3120 	 * just those that have eoxact work to do.  But it's not worth forcing a
3121 	 * scan of the whole relcache just for this.  (Moreover, doing so would
3122 	 * mean that assert-enabled testing never tests the hash_search code path
3123 	 * above, which seems a bad idea.)
3124 	 */
3125 #ifdef USE_ASSERT_CHECKING
3126 	if (!IsBootstrapProcessingMode())
3127 	{
3128 		int			expected_refcnt;
3129 
3130 		expected_refcnt = relation->rd_isnailed ? 1 : 0;
3131 		Assert(relation->rd_refcnt == expected_refcnt);
3132 	}
3133 #endif
3134 
3135 	/*
3136 	 * Is it a relation created in the current transaction?
3137 	 *
3138 	 * During commit, reset the flag to zero, since we are now out of the
3139 	 * creating transaction.  During abort, simply delete the relcache entry
3140 	 * --- it isn't interesting any longer.
3141 	 */
3142 	if (relation->rd_createSubid != InvalidSubTransactionId)
3143 	{
3144 		if (isCommit)
3145 			relation->rd_createSubid = InvalidSubTransactionId;
3146 		else if (RelationHasReferenceCountZero(relation))
3147 		{
3148 			RelationClearRelation(relation, false);
3149 			return;
3150 		}
3151 		else
3152 		{
3153 			/*
3154 			 * Hmm, somewhere there's a (leaked?) reference to the relation.
3155 			 * We daren't remove the entry for fear of dereferencing a
3156 			 * dangling pointer later.  Bleat, and mark it as not belonging to
3157 			 * the current transaction.  Hopefully it'll get cleaned up
3158 			 * eventually.  This must be just a WARNING to avoid
3159 			 * error-during-error-recovery loops.
3160 			 */
3161 			relation->rd_createSubid = InvalidSubTransactionId;
3162 			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3163 				 RelationGetRelationName(relation));
3164 		}
3165 	}
3166 
3167 	/*
3168 	 * Likewise, reset the hint about the relfilenode being new.
3169 	 */
3170 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3171 }
3172 
3173 /*
3174  * AtEOSubXact_RelationCache
3175  *
3176  *	Clean up the relcache at sub-transaction commit or abort.
3177  *
3178  * Note: this must be called *before* processing invalidation messages.
3179  */
3180 void
3181 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
3182 						  SubTransactionId parentSubid)
3183 {
3184 	HASH_SEQ_STATUS status;
3185 	RelIdCacheEnt *idhentry;
3186 	int			i;
3187 
3188 	/*
3189 	 * Forget in_progress_list.  This is relevant when we're aborting due to
3190 	 * an error during RelationBuildDesc().  We don't commit subtransactions
3191 	 * during RelationBuildDesc().
3192 	 */
3193 	Assert(in_progress_list_len == 0 || !isCommit);
3194 	in_progress_list_len = 0;
3195 
3196 	/*
3197 	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
3198 	 * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
3199 	 * logic as in AtEOXact_RelationCache.
3200 	 */
3201 	if (eoxact_list_overflowed)
3202 	{
3203 		hash_seq_init(&status, RelationIdCache);
3204 		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3205 		{
3206 			AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3207 								mySubid, parentSubid);
3208 		}
3209 	}
3210 	else
3211 	{
3212 		for (i = 0; i < eoxact_list_len; i++)
3213 		{
3214 			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3215 													 (void *) &eoxact_list[i],
3216 													 HASH_FIND,
3217 													 NULL);
3218 			if (idhentry != NULL)
3219 				AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3220 									mySubid, parentSubid);
3221 		}
3222 	}
3223 
3224 	/* Don't reset the list; we still need more cleanup later */
3225 }
3226 
3227 /*
3228  * AtEOSubXact_cleanup
3229  *
3230  *	Clean up a single rel at subtransaction commit or abort
3231  *
3232  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3233  * bother to prevent duplicate entries in eoxact_list[].
3234  */
3235 static void
3236 AtEOSubXact_cleanup(Relation relation, bool isCommit,
3237 					SubTransactionId mySubid, SubTransactionId parentSubid)
3238 {
3239 	/*
3240 	 * Is it a relation created in the current subtransaction?
3241 	 *
3242 	 * During subcommit, mark it as belonging to the parent, instead. During
3243 	 * subabort, simply delete the relcache entry.
3244 	 */
3245 	if (relation->rd_createSubid == mySubid)
3246 	{
3247 		if (isCommit)
3248 			relation->rd_createSubid = parentSubid;
3249 		else if (RelationHasReferenceCountZero(relation))
3250 		{
3251 			RelationClearRelation(relation, false);
3252 			return;
3253 		}
3254 		else
3255 		{
3256 			/*
3257 			 * Hmm, somewhere there's a (leaked?) reference to the relation.
3258 			 * We daren't remove the entry for fear of dereferencing a
3259 			 * dangling pointer later.  Bleat, and transfer it to the parent
3260 			 * subtransaction so we can try again later.  This must be just a
3261 			 * WARNING to avoid error-during-error-recovery loops.
3262 			 */
3263 			relation->rd_createSubid = parentSubid;
3264 			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3265 				 RelationGetRelationName(relation));
3266 		}
3267 	}
3268 
3269 	/*
3270 	 * Likewise, update or drop any new-relfilenode-in-subtransaction hint.
3271 	 */
3272 	if (relation->rd_newRelfilenodeSubid == mySubid)
3273 	{
3274 		if (isCommit)
3275 			relation->rd_newRelfilenodeSubid = parentSubid;
3276 		else
3277 			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3278 	}
3279 }
3280 
3281 
3282 /*
3283  *		RelationBuildLocalRelation
3284  *			Build a relcache entry for an about-to-be-created relation,
3285  *			and enter it into the relcache.
3286  */
3287 Relation
3288 RelationBuildLocalRelation(const char *relname,
3289 						   Oid relnamespace,
3290 						   TupleDesc tupDesc,
3291 						   Oid relid,
3292 						   Oid accessmtd,
3293 						   Oid relfilenode,
3294 						   Oid reltablespace,
3295 						   bool shared_relation,
3296 						   bool mapped_relation,
3297 						   char relpersistence,
3298 						   char relkind)
3299 {
3300 	Relation	rel;
3301 	MemoryContext oldcxt;
3302 	int			natts = tupDesc->natts;
3303 	int			i;
3304 	bool		has_not_null;
3305 	bool		nailit;
3306 
3307 	AssertArg(natts >= 0);
3308 
3309 	/*
3310 	 * check for creation of a rel that must be nailed in cache.
3311 	 *
3312 	 * XXX this list had better match the relations specially handled in
3313 	 * RelationCacheInitializePhase2/3.
3314 	 */
3315 	switch (relid)
3316 	{
3317 		case DatabaseRelationId:
3318 		case AuthIdRelationId:
3319 		case AuthMemRelationId:
3320 		case RelationRelationId:
3321 		case AttributeRelationId:
3322 		case ProcedureRelationId:
3323 		case TypeRelationId:
3324 			nailit = true;
3325 			break;
3326 		default:
3327 			nailit = false;
3328 			break;
3329 	}
3330 
3331 	/*
3332 	 * check that hardwired list of shared rels matches what's in the
3333 	 * bootstrap .bki file.  If you get a failure here during initdb, you
3334 	 * probably need to fix IsSharedRelation() to match whatever you've done
3335 	 * to the set of shared relations.
3336 	 */
3337 	if (shared_relation != IsSharedRelation(relid))
3338 		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
3339 			 relname, relid);
3340 
3341 	/* Shared relations had better be mapped, too */
3342 	Assert(mapped_relation || !shared_relation);
3343 
3344 	/*
3345 	 * switch to the cache context to create the relcache entry.
3346 	 */
3347 	if (!CacheMemoryContext)
3348 		CreateCacheMemoryContext();
3349 
3350 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3351 
3352 	/*
3353 	 * allocate a new relation descriptor and fill in basic state fields.
3354 	 */
3355 	rel = (Relation) palloc0(sizeof(RelationData));
3356 
3357 	/* make sure relation is marked as having no open file yet */
3358 	rel->rd_smgr = NULL;
3359 
3360 	/* mark it nailed if appropriate */
3361 	rel->rd_isnailed = nailit;
3362 
3363 	rel->rd_refcnt = nailit ? 1 : 0;
3364 
3365 	/* it's being created in this transaction */
3366 	rel->rd_createSubid = GetCurrentSubTransactionId();
3367 	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3368 
3369 	/*
3370 	 * create a new tuple descriptor from the one passed in.  We do this
3371 	 * partly to copy it into the cache context, and partly because the new
3372 	 * relation can't have any defaults or constraints yet; they have to be
3373 	 * added in later steps, because they require additions to multiple system
3374 	 * catalogs.  We can copy attnotnull constraints here, however.
3375 	 */
3376 	rel->rd_att = CreateTupleDescCopy(tupDesc);
3377 	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
3378 	has_not_null = false;
3379 	for (i = 0; i < natts; i++)
3380 	{
3381 		Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
3382 		Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);
3383 
3384 		datt->attidentity = satt->attidentity;
3385 		datt->attgenerated = satt->attgenerated;
3386 		datt->attnotnull = satt->attnotnull;
3387 		has_not_null |= satt->attnotnull;
3388 	}
3389 
3390 	if (has_not_null)
3391 	{
3392 		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3393 
3394 		constr->has_not_null = true;
3395 		rel->rd_att->constr = constr;
3396 	}
3397 
3398 	/*
3399 	 * initialize relation tuple form (caller may add/override data later)
3400 	 */
3401 	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3402 
3403 	namestrcpy(&rel->rd_rel->relname, relname);
3404 	rel->rd_rel->relnamespace = relnamespace;
3405 
3406 	rel->rd_rel->relkind = relkind;
3407 	rel->rd_rel->relnatts = natts;
3408 	rel->rd_rel->reltype = InvalidOid;
3409 	/* needed when bootstrapping: */
3410 	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3411 
3412 	/* set up persistence and relcache fields dependent on it */
3413 	rel->rd_rel->relpersistence = relpersistence;
3414 	switch (relpersistence)
3415 	{
3416 		case RELPERSISTENCE_UNLOGGED:
3417 		case RELPERSISTENCE_PERMANENT:
3418 			rel->rd_backend = InvalidBackendId;
3419 			rel->rd_islocaltemp = false;
3420 			break;
3421 		case RELPERSISTENCE_TEMP:
3422 			Assert(isTempOrTempToastNamespace(relnamespace));
3423 			rel->rd_backend = BackendIdForTempRelations();
3424 			rel->rd_islocaltemp = true;
3425 			break;
3426 		default:
3427 			elog(ERROR, "invalid relpersistence: %c", relpersistence);
3428 			break;
3429 	}
3430 
3431 	/* if it's a materialized view, it's not populated initially */
3432 	if (relkind == RELKIND_MATVIEW)
3433 		rel->rd_rel->relispopulated = false;
3434 	else
3435 		rel->rd_rel->relispopulated = true;
3436 
3437 	/* set replica identity -- system catalogs and non-tables don't have one */
3438 	if (!IsCatalogNamespace(relnamespace) &&
3439 		(relkind == RELKIND_RELATION ||
3440 		 relkind == RELKIND_MATVIEW ||
3441 		 relkind == RELKIND_PARTITIONED_TABLE))
3442 		rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
3443 	else
3444 		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
3445 
3446 	/*
3447 	 * Insert relation physical and logical identifiers (OIDs) into the right
3448 	 * places.  For a mapped relation, we set relfilenode to zero and rely on
3449 	 * RelationInitPhysicalAddr to consult the map.
3450 	 */
3451 	rel->rd_rel->relisshared = shared_relation;
3452 
3453 	RelationGetRelid(rel) = relid;
3454 
3455 	for (i = 0; i < natts; i++)
3456 		TupleDescAttr(rel->rd_att, i)->attrelid = relid;
3457 
3458 	rel->rd_rel->reltablespace = reltablespace;
3459 
3460 	if (mapped_relation)
3461 	{
3462 		rel->rd_rel->relfilenode = InvalidOid;
3463 		/* Add it to the active mapping information */
3464 		RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
3465 	}
3466 	else
3467 		rel->rd_rel->relfilenode = relfilenode;
3468 
3469 	RelationInitLockInfo(rel);	/* see lmgr.c */
3470 
3471 	RelationInitPhysicalAddr(rel);
3472 
3473 	rel->rd_rel->relam = accessmtd;
3474 
3475 	/*
3476 	 * RelationInitTableAccessMethod will do syscache lookups, so we mustn't
3477 	 * run it in CacheMemoryContext.  Fortunately, the remaining steps don't
3478 	 * require a long-lived current context.
3479 	 */
3480 	MemoryContextSwitchTo(oldcxt);
3481 
3482 	if (relkind == RELKIND_RELATION ||
3483 		relkind == RELKIND_SEQUENCE ||
3484 		relkind == RELKIND_TOASTVALUE ||
3485 		relkind == RELKIND_MATVIEW)
3486 		RelationInitTableAccessMethod(rel);
3487 
3488 	/*
3489 	 * Okay to insert into the relcache hash table.
3490 	 *
3491 	 * Ordinarily, there should certainly not be an existing hash entry for
3492 	 * the same OID; but during bootstrap, when we create a "real" relcache
3493 	 * entry for one of the bootstrap relations, we'll be overwriting the
3494 	 * phony one created with formrdesc.  So allow that to happen for nailed
3495 	 * rels.
3496 	 */
3497 	RelationCacheInsert(rel, nailit);
3498 
3499 	/*
3500 	 * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
3501 	 * can't do this before storing relid in it.
3502 	 */
3503 	EOXactListAdd(rel);
3504 
3505 	/* It's fully valid */
3506 	rel->rd_isvalid = true;
3507 
3508 	/*
3509 	 * Caller expects us to pin the returned entry.
3510 	 */
3511 	RelationIncrementReferenceCount(rel);
3512 
3513 	return rel;
3514 }
3515 
3516 
3517 /*
3518  * RelationSetNewRelfilenode
3519  *
3520  * Assign a new relfilenode (physical file name), and possibly a new
3521  * persistence setting, to the relation.
3522  *
3523  * This allows a full rewrite of the relation to be done with transactional
3524  * safety (since the filenode assignment can be rolled back).  Note however
3525  * that there is no simple way to access the relation's old data for the
3526  * remainder of the current transaction.  This limits the usefulness to cases
3527  * such as TRUNCATE or rebuilding an index from scratch.
3528  *
3529  * Caller must already hold exclusive lock on the relation.
3530  */
3531 void
3532 RelationSetNewRelfilenode(Relation relation, char persistence)
3533 {
3534 	Oid			newrelfilenode;
3535 	Relation	pg_class;
3536 	HeapTuple	tuple;
3537 	Form_pg_class classform;
3538 	MultiXactId minmulti = InvalidMultiXactId;
3539 	TransactionId freezeXid = InvalidTransactionId;
3540 	RelFileNode newrnode;
3541 
3542 	/* Allocate a new relfilenode */
3543 	newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
3544 									   persistence);
3545 
3546 	/*
3547 	 * Get a writable copy of the pg_class tuple for the given relation.
3548 	 */
3549 	pg_class = table_open(RelationRelationId, RowExclusiveLock);
3550 
3551 	tuple = SearchSysCacheCopy1(RELOID,
3552 								ObjectIdGetDatum(RelationGetRelid(relation)));
3553 	if (!HeapTupleIsValid(tuple))
3554 		elog(ERROR, "could not find tuple for relation %u",
3555 			 RelationGetRelid(relation));
3556 	classform = (Form_pg_class) GETSTRUCT(tuple);
3557 
3558 	/*
3559 	 * Schedule unlinking of the old storage at transaction commit.
3560 	 */
3561 	RelationDropStorage(relation);
3562 
3563 	/*
3564 	 * Create storage for the main fork of the new relfilenode.  If it's a
3565 	 * table-like object, call into the table AM to do so, which'll also
3566 	 * create the table's init fork if needed.
3567 	 *
3568 	 * NOTE: If relevant for the AM, any conflict in relfilenode value will be
3569 	 * caught here, if GetNewRelFileNode messes up for any reason.
3570 	 */
3571 	newrnode = relation->rd_node;
3572 	newrnode.relNode = newrelfilenode;
3573 
3574 	switch (relation->rd_rel->relkind)
3575 	{
3576 		case RELKIND_INDEX:
3577 		case RELKIND_SEQUENCE:
3578 			{
3579 				/* handle these directly, at least for now */
3580 				SMgrRelation srel;
3581 
3582 				srel = RelationCreateStorage(newrnode, persistence);
3583 				smgrclose(srel);
3584 			}
3585 			break;
3586 
3587 		case RELKIND_RELATION:
3588 		case RELKIND_TOASTVALUE:
3589 		case RELKIND_MATVIEW:
3590 			table_relation_set_new_filenode(relation, &newrnode,
3591 											persistence,
3592 											&freezeXid, &minmulti);
3593 			break;
3594 
3595 		default:
3596 			/* we shouldn't be called for anything else */
3597 			elog(ERROR, "relation \"%s\" does not have storage",
3598 				 RelationGetRelationName(relation));
3599 			break;
3600 	}
3601 
3602 	/*
3603 	 * If we're dealing with a mapped index, pg_class.relfilenode doesn't
3604 	 * change; instead we have to send the update to the relation mapper.
3605 	 *
3606 	 * For mapped indexes, we don't actually change the pg_class entry at all;
3607 	 * this is essential when reindexing pg_class itself.  That leaves us with
3608 	 * possibly-inaccurate values of relpages etc, but those will be fixed up
3609 	 * later.
3610 	 */
3611 	if (RelationIsMapped(relation))
3612 	{
3613 		/* This case is only supported for indexes */
3614 		Assert(relation->rd_rel->relkind == RELKIND_INDEX);
3615 
3616 		/* Since we're not updating pg_class, these had better not change */
3617 		Assert(classform->relfrozenxid == freezeXid);
3618 		Assert(classform->relminmxid == minmulti);
3619 		Assert(classform->relpersistence == persistence);
3620 
3621 		/*
3622 		 * In some code paths it's possible that the tuple update we'd
3623 		 * otherwise do here is the only thing that would assign an XID for
3624 		 * the current transaction.  However, we must have an XID to delete
3625 		 * files, so make sure one is assigned.
3626 		 */
3627 		(void) GetCurrentTransactionId();
3628 
3629 		/* Do the deed */
3630 		RelationMapUpdateMap(RelationGetRelid(relation),
3631 							 newrelfilenode,
3632 							 relation->rd_rel->relisshared,
3633 							 false);
3634 
3635 		/* Since we're not updating pg_class, must trigger inval manually */
3636 		CacheInvalidateRelcache(relation);
3637 	}
3638 	else
3639 	{
3640 		/* Normal case, update the pg_class entry */
3641 		classform->relfilenode = newrelfilenode;
3642 
3643 		/* relpages etc. never change for sequences */
3644 		if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
3645 		{
3646 			classform->relpages = 0;	/* it's empty until further notice */
3647 			classform->reltuples = 0;
3648 			classform->relallvisible = 0;
3649 		}
3650 		classform->relfrozenxid = freezeXid;
3651 		classform->relminmxid = minmulti;
3652 		classform->relpersistence = persistence;
3653 
3654 		CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
3655 	}
3656 
3657 	heap_freetuple(tuple);
3658 
3659 	table_close(pg_class, RowExclusiveLock);
3660 
3661 	/*
3662 	 * Make the pg_class row change or relation map change visible.  This will
3663 	 * cause the relcache entry to get updated, too.
3664 	 */
3665 	CommandCounterIncrement();
3666 
3667 	/*
3668 	 * Mark the rel as having been given a new relfilenode in the current
3669 	 * (sub) transaction.  This is a hint that can be used to optimize later
3670 	 * operations on the rel in the same transaction.
3671 	 */
3672 	relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
3673 
3674 	/* Flag relation as needing eoxact cleanup (to remove the hint) */
3675 	EOXactListAdd(relation);
3676 }
3677 
3678 
3679 /*
3680  *		RelationCacheInitialize
3681  *
3682  *		This initializes the relation descriptor cache.  At the time
3683  *		that this is invoked, we can't do database access yet (mainly
3684  *		because the transaction subsystem is not up); all we are doing
3685  *		is making an empty cache hashtable.  This must be done before
3686  *		starting the initialization transaction, because otherwise
3687  *		AtEOXact_RelationCache would crash if that transaction aborts
3688  *		before we can get the relcache set up.
3689  */
3690 
3691 #define INITRELCACHESIZE		400
3692 
3693 void
3694 RelationCacheInitialize(void)
3695 {
3696 	HASHCTL		ctl;
3697 	int			allocsize;
3698 
3699 	/*
3700 	 * make sure cache memory context exists
3701 	 */
3702 	if (!CacheMemoryContext)
3703 		CreateCacheMemoryContext();
3704 
3705 	/*
3706 	 * create hashtable that indexes the relcache
3707 	 */
3708 	MemSet(&ctl, 0, sizeof(ctl));
3709 	ctl.keysize = sizeof(Oid);
3710 	ctl.entrysize = sizeof(RelIdCacheEnt);
3711 	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
3712 								  &ctl, HASH_ELEM | HASH_BLOBS);
3713 
3714 	/*
3715 	 * reserve enough in_progress_list slots for many cases
3716 	 */
3717 	allocsize = 4;
3718 	in_progress_list =
3719 		MemoryContextAlloc(CacheMemoryContext,
3720 						   allocsize * sizeof(*in_progress_list));
3721 	in_progress_list_maxlen = allocsize;
3722 
3723 	/*
3724 	 * relation mapper needs to be initialized too
3725 	 */
3726 	RelationMapInitialize();
3727 }
3728 
3729 /*
3730  *		RelationCacheInitializePhase2
3731  *
3732  *		This is called to prepare for access to shared catalogs during startup.
3733  *		We must at least set up nailed reldescs for pg_database, pg_authid,
3734  *		pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
3735  *		for their indexes, too.  We attempt to load this information from the
3736  *		shared relcache init file.  If that's missing or broken, just make
3737  *		phony entries for the catalogs themselves.
3738  *		RelationCacheInitializePhase3 will clean up as needed.
3739  */
3740 void
3741 RelationCacheInitializePhase2(void)
3742 {
3743 	MemoryContext oldcxt;
3744 
3745 	/*
3746 	 * relation mapper needs initialized too
3747 	 */
3748 	RelationMapInitializePhase2();
3749 
3750 	/*
3751 	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
3752 	 * nothing.
3753 	 */
3754 	if (IsBootstrapProcessingMode())
3755 		return;
3756 
3757 	/*
3758 	 * switch to cache memory context
3759 	 */
3760 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3761 
3762 	/*
3763 	 * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
3764 	 * the cache with pre-made descriptors for the critical shared catalogs.
3765 	 */
3766 	if (!load_relcache_init_file(true))
3767 	{
3768 		formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
3769 				  Natts_pg_database, Desc_pg_database);
3770 		formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
3771 				  Natts_pg_authid, Desc_pg_authid);
3772 		formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
3773 				  Natts_pg_auth_members, Desc_pg_auth_members);
3774 		formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
3775 				  Natts_pg_shseclabel, Desc_pg_shseclabel);
3776 		formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
3777 				  Natts_pg_subscription, Desc_pg_subscription);
3778 
3779 #define NUM_CRITICAL_SHARED_RELS	5	/* fix if you change list above */
3780 	}
3781 
3782 	MemoryContextSwitchTo(oldcxt);
3783 }
3784 
3785 /*
3786  *		RelationCacheInitializePhase3
3787  *
3788  *		This is called as soon as the catcache and transaction system
3789  *		are functional and we have determined MyDatabaseId.  At this point
3790  *		we can actually read data from the database's system catalogs.
3791  *		We first try to read pre-computed relcache entries from the local
3792  *		relcache init file.  If that's missing or broken, make phony entries
3793  *		for the minimum set of nailed-in-cache relations.  Then (unless
3794  *		bootstrapping) make sure we have entries for the critical system
3795  *		indexes.  Once we've done all this, we have enough infrastructure to
3796  *		open any system catalog or use any catcache.  The last step is to
3797  *		rewrite the cache files if needed.
3798  */
3799 void
3800 RelationCacheInitializePhase3(void)
3801 {
3802 	HASH_SEQ_STATUS status;
3803 	RelIdCacheEnt *idhentry;
3804 	MemoryContext oldcxt;
3805 	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3806 
3807 	/*
3808 	 * relation mapper needs initialized too
3809 	 */
3810 	RelationMapInitializePhase3();
3811 
3812 	/*
3813 	 * switch to cache memory context
3814 	 */
3815 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3816 
3817 	/*
3818 	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
3819 	 * the cache with pre-made descriptors for the critical "nailed-in" system
3820 	 * catalogs.
3821 	 */
3822 	if (IsBootstrapProcessingMode() ||
3823 		!load_relcache_init_file(false))
3824 	{
3825 		needNewCacheFile = true;
3826 
3827 		formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
3828 				  Natts_pg_class, Desc_pg_class);
3829 		formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
3830 				  Natts_pg_attribute, Desc_pg_attribute);
3831 		formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
3832 				  Natts_pg_proc, Desc_pg_proc);
3833 		formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
3834 				  Natts_pg_type, Desc_pg_type);
3835 
3836 #define NUM_CRITICAL_LOCAL_RELS 4	/* fix if you change list above */
3837 	}
3838 
3839 	MemoryContextSwitchTo(oldcxt);
3840 
3841 	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3842 	if (IsBootstrapProcessingMode())
3843 		return;
3844 
3845 	/*
3846 	 * If we didn't get the critical system indexes loaded into relcache, do
3847 	 * so now.  These are critical because the catcache and/or opclass cache
3848 	 * depend on them for fetches done during relcache load.  Thus, we have an
3849 	 * infinite-recursion problem.  We can break the recursion by doing
3850 	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
3851 	 * performance, we only want to do that until we have the critical indexes
3852 	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
3853 	 * decide whether to do heapscan or indexscan at the key spots, and we set
3854 	 * it true after we've loaded the critical indexes.
3855 	 *
3856 	 * The critical indexes are marked as "nailed in cache", partly to make it
3857 	 * easy for load_relcache_init_file to count them, but mainly because we
3858 	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
3859 	 * true.  (NOTE: perhaps it would be possible to reload them by
3860 	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
3861 	 * though, we just nail 'em in.)
3862 	 *
3863 	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
3864 	 * in the same way as the others, because the critical catalogs don't
3865 	 * (currently) have any rules or triggers, and so these indexes can be
3866 	 * rebuilt without inducing recursion.  However they are used during
3867 	 * relcache load when a rel does have rules or triggers, so we choose to
3868 	 * nail them for performance reasons.
3869 	 */
3870 	if (!criticalRelcachesBuilt)
3871 	{
3872 		load_critical_index(ClassOidIndexId,
3873 							RelationRelationId);
3874 		load_critical_index(AttributeRelidNumIndexId,
3875 							AttributeRelationId);
3876 		load_critical_index(IndexRelidIndexId,
3877 							IndexRelationId);
3878 		load_critical_index(OpclassOidIndexId,
3879 							OperatorClassRelationId);
3880 		load_critical_index(AccessMethodProcedureIndexId,
3881 							AccessMethodProcedureRelationId);
3882 		load_critical_index(RewriteRelRulenameIndexId,
3883 							RewriteRelationId);
3884 		load_critical_index(TriggerRelidNameIndexId,
3885 							TriggerRelationId);
3886 
3887 #define NUM_CRITICAL_LOCAL_INDEXES	7	/* fix if you change list above */
3888 
3889 		criticalRelcachesBuilt = true;
3890 	}
3891 
3892 	/*
3893 	 * Process critical shared indexes too.
3894 	 *
3895 	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
3896 	 * initial lookup of MyDatabaseId, without which we'll never find any
3897 	 * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
3898 	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
3899 	 * need to nail up some indexes on pg_authid and pg_auth_members for use
3900 	 * during client authentication.  SharedSecLabelObjectIndexId isn't
3901 	 * critical for the core system, but authentication hooks might be
3902 	 * interested in it.
3903 	 */
3904 	if (!criticalSharedRelcachesBuilt)
3905 	{
3906 		load_critical_index(DatabaseNameIndexId,
3907 							DatabaseRelationId);
3908 		load_critical_index(DatabaseOidIndexId,
3909 							DatabaseRelationId);
3910 		load_critical_index(AuthIdRolnameIndexId,
3911 							AuthIdRelationId);
3912 		load_critical_index(AuthIdOidIndexId,
3913 							AuthIdRelationId);
3914 		load_critical_index(AuthMemMemRoleIndexId,
3915 							AuthMemRelationId);
3916 		load_critical_index(SharedSecLabelObjectIndexId,
3917 							SharedSecLabelRelationId);
3918 
3919 #define NUM_CRITICAL_SHARED_INDEXES 6	/* fix if you change list above */
3920 
3921 		criticalSharedRelcachesBuilt = true;
3922 	}
3923 
3924 	/*
3925 	 * Now, scan all the relcache entries and update anything that might be
3926 	 * wrong in the results from formrdesc or the relcache cache file. If we
3927 	 * faked up relcache entries using formrdesc, then read the real pg_class
3928 	 * rows and replace the fake entries with them. Also, if any of the
3929 	 * relcache entries have rules, triggers, or security policies, load that
3930 	 * info the hard way since it isn't recorded in the cache file.
3931 	 *
3932 	 * Whenever we access the catalogs to read data, there is a possibility of
3933 	 * a shared-inval cache flush causing relcache entries to be removed.
3934 	 * Since hash_seq_search only guarantees to still work after the *current*
3935 	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
3936 	 * We handle this by restarting the scan from scratch after each access.
3937 	 * This is theoretically O(N^2), but the number of entries that actually
3938 	 * need to be fixed is small enough that it doesn't matter.
3939 	 */
3940 	hash_seq_init(&status, RelationIdCache);
3941 
3942 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3943 	{
3944 		Relation	relation = idhentry->reldesc;
3945 		bool		restart = false;
3946 
3947 		/*
3948 		 * Make sure *this* entry doesn't get flushed while we work with it.
3949 		 */
3950 		RelationIncrementReferenceCount(relation);
3951 
3952 		/*
3953 		 * If it's a faked-up entry, read the real pg_class tuple.
3954 		 */
3955 		if (relation->rd_rel->relowner == InvalidOid)
3956 		{
3957 			HeapTuple	htup;
3958 			Form_pg_class relp;
3959 
3960 			htup = SearchSysCache1(RELOID,
3961 								   ObjectIdGetDatum(RelationGetRelid(relation)));
3962 			if (!HeapTupleIsValid(htup))
3963 				elog(FATAL, "cache lookup failed for relation %u",
3964 					 RelationGetRelid(relation));
3965 			relp = (Form_pg_class) GETSTRUCT(htup);
3966 
3967 			/*
3968 			 * Copy tuple to relation->rd_rel. (See notes in
3969 			 * AllocateRelationDesc())
3970 			 */
3971 			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3972 
3973 			/* Update rd_options while we have the tuple */
3974 			if (relation->rd_options)
3975 				pfree(relation->rd_options);
3976 			RelationParseRelOptions(relation, htup);
3977 
3978 			/*
3979 			 * Check the values in rd_att were set up correctly.  (We cannot
3980 			 * just copy them over now: formrdesc must have set up the rd_att
3981 			 * data correctly to start with, because it may already have been
3982 			 * copied into one or more catcache entries.)
3983 			 */
3984 			Assert(relation->rd_att->tdtypeid == relp->reltype);
3985 			Assert(relation->rd_att->tdtypmod == -1);
3986 
3987 			ReleaseSysCache(htup);
3988 
3989 			/* relowner had better be OK now, else we'll loop forever */
3990 			if (relation->rd_rel->relowner == InvalidOid)
3991 				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
3992 					 RelationGetRelationName(relation));
3993 
3994 			restart = true;
3995 		}
3996 
3997 		/*
3998 		 * Fix data that isn't saved in relcache cache file.
3999 		 *
4000 		 * relhasrules or relhastriggers could possibly be wrong or out of
4001 		 * date.  If we don't actually find any rules or triggers, clear the
4002 		 * local copy of the flag so that we don't get into an infinite loop
4003 		 * here.  We don't make any attempt to fix the pg_class entry, though.
4004 		 */
4005 		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
4006 		{
4007 			RelationBuildRuleLock(relation);
4008 			if (relation->rd_rules == NULL)
4009 				relation->rd_rel->relhasrules = false;
4010 			restart = true;
4011 		}
4012 		if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
4013 		{
4014 			RelationBuildTriggers(relation);
4015 			if (relation->trigdesc == NULL)
4016 				relation->rd_rel->relhastriggers = false;
4017 			restart = true;
4018 		}
4019 
4020 		/*
4021 		 * Re-load the row security policies if the relation has them, since
4022 		 * they are not preserved in the cache.  Note that we can never NOT
4023 		 * have a policy while relrowsecurity is true,
4024 		 * RelationBuildRowSecurity will create a single default-deny policy
4025 		 * if there is no policy defined in pg_policy.
4026 		 */
4027 		if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
4028 		{
4029 			RelationBuildRowSecurity(relation);
4030 
4031 			Assert(relation->rd_rsdesc != NULL);
4032 			restart = true;
4033 		}
4034 
4035 		/*
4036 		 * Reload the partition key and descriptor for a partitioned table.
4037 		 */
4038 		if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
4039 			relation->rd_partkey == NULL)
4040 		{
4041 			RelationBuildPartitionKey(relation);
4042 			Assert(relation->rd_partkey != NULL);
4043 
4044 			restart = true;
4045 		}
4046 
4047 		if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
4048 			relation->rd_partdesc == NULL)
4049 		{
4050 			RelationBuildPartitionDesc(relation);
4051 			Assert(relation->rd_partdesc != NULL);
4052 
4053 			restart = true;
4054 		}
4055 
4056 		if (relation->rd_tableam == NULL &&
4057 			(relation->rd_rel->relkind == RELKIND_RELATION ||
4058 			 relation->rd_rel->relkind == RELKIND_SEQUENCE ||
4059 			 relation->rd_rel->relkind == RELKIND_TOASTVALUE ||
4060 			 relation->rd_rel->relkind == RELKIND_MATVIEW))
4061 		{
4062 			RelationInitTableAccessMethod(relation);
4063 			Assert(relation->rd_tableam != NULL);
4064 
4065 			restart = true;
4066 		}
4067 
4068 		/* Release hold on the relation */
4069 		RelationDecrementReferenceCount(relation);
4070 
4071 		/* Now, restart the hashtable scan if needed */
4072 		if (restart)
4073 		{
4074 			hash_seq_term(&status);
4075 			hash_seq_init(&status, RelationIdCache);
4076 		}
4077 	}
4078 
4079 	/*
4080 	 * Lastly, write out new relcache cache files if needed.  We don't bother
4081 	 * to distinguish cases where only one of the two needs an update.
4082 	 */
4083 	if (needNewCacheFile)
4084 	{
4085 		/*
4086 		 * Force all the catcaches to finish initializing and thereby open the
4087 		 * catalogs and indexes they use.  This will preload the relcache with
4088 		 * entries for all the most important system catalogs and indexes, so
4089 		 * that the init files will be most useful for future backends.
4090 		 */
4091 		InitCatalogCachePhase2();
4092 
4093 		/* now write the files */
4094 		write_relcache_init_file(true);
4095 		write_relcache_init_file(false);
4096 	}
4097 }
4098 
4099 /*
4100  * Load one critical system index into the relcache
4101  *
4102  * indexoid is the OID of the target index, heapoid is the OID of the catalog
4103  * it belongs to.
4104  */
4105 static void
4106 load_critical_index(Oid indexoid, Oid heapoid)
4107 {
4108 	Relation	ird;
4109 
4110 	/*
4111 	 * We must lock the underlying catalog before locking the index to avoid
4112 	 * deadlock, since RelationBuildDesc might well need to read the catalog,
4113 	 * and if anyone else is exclusive-locking this catalog and index they'll
4114 	 * be doing it in that order.
4115 	 */
4116 	LockRelationOid(heapoid, AccessShareLock);
4117 	LockRelationOid(indexoid, AccessShareLock);
4118 	ird = RelationBuildDesc(indexoid, true);
4119 	if (ird == NULL)
4120 		elog(PANIC, "could not open critical system index %u", indexoid);
4121 	ird->rd_isnailed = true;
4122 	ird->rd_refcnt = 1;
4123 	UnlockRelationOid(indexoid, AccessShareLock);
4124 	UnlockRelationOid(heapoid, AccessShareLock);
4125 }
4126 
4127 /*
4128  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
4129  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
4130  *
4131  * We need this kluge because we have to be able to access non-fixed-width
4132  * fields of pg_class and pg_index before we have the standard catalog caches
4133  * available.  We use predefined data that's set up in just the same way as
4134  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
4135  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
4136  * does it have a TupleConstr field.  But it's good enough for the purpose of
4137  * extracting fields.
4138  */
4139 static TupleDesc
4140 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
4141 {
4142 	TupleDesc	result;
4143 	MemoryContext oldcxt;
4144 	int			i;
4145 
4146 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4147 
4148 	result = CreateTemplateTupleDesc(natts);
4149 	result->tdtypeid = RECORDOID;	/* not right, but we don't care */
4150 	result->tdtypmod = -1;
4151 
4152 	for (i = 0; i < natts; i++)
4153 	{
4154 		memcpy(TupleDescAttr(result, i), &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
4155 		/* make sure attcacheoff is valid */
4156 		TupleDescAttr(result, i)->attcacheoff = -1;
4157 	}
4158 
4159 	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
4160 	TupleDescAttr(result, 0)->attcacheoff = 0;
4161 
4162 	/* Note: we don't bother to set up a TupleConstr entry */
4163 
4164 	MemoryContextSwitchTo(oldcxt);
4165 
4166 	return result;
4167 }
4168 
4169 static TupleDesc
4170 GetPgClassDescriptor(void)
4171 {
4172 	static TupleDesc pgclassdesc = NULL;
4173 
4174 	/* Already done? */
4175 	if (pgclassdesc == NULL)
4176 		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
4177 											   Desc_pg_class);
4178 
4179 	return pgclassdesc;
4180 }
4181 
4182 static TupleDesc
4183 GetPgIndexDescriptor(void)
4184 {
4185 	static TupleDesc pgindexdesc = NULL;
4186 
4187 	/* Already done? */
4188 	if (pgindexdesc == NULL)
4189 		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
4190 											   Desc_pg_index);
4191 
4192 	return pgindexdesc;
4193 }
4194 
4195 /*
4196  * Load any default attribute value definitions for the relation.
4197  */
4198 static void
4199 AttrDefaultFetch(Relation relation)
4200 {
4201 	AttrDefault *attrdef = relation->rd_att->constr->defval;
4202 	int			ndef = relation->rd_att->constr->num_defval;
4203 	Relation	adrel;
4204 	SysScanDesc adscan;
4205 	ScanKeyData skey;
4206 	HeapTuple	htup;
4207 	Datum		val;
4208 	bool		isnull;
4209 	int			found;
4210 	int			i;
4211 
4212 	ScanKeyInit(&skey,
4213 				Anum_pg_attrdef_adrelid,
4214 				BTEqualStrategyNumber, F_OIDEQ,
4215 				ObjectIdGetDatum(RelationGetRelid(relation)));
4216 
4217 	adrel = table_open(AttrDefaultRelationId, AccessShareLock);
4218 	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
4219 								NULL, 1, &skey);
4220 	found = 0;
4221 
4222 	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
4223 	{
4224 		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
4225 		Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1);
4226 
4227 		for (i = 0; i < ndef; i++)
4228 		{
4229 			if (adform->adnum != attrdef[i].adnum)
4230 				continue;
4231 			if (attrdef[i].adbin != NULL)
4232 				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
4233 					 NameStr(attr->attname),
4234 					 RelationGetRelationName(relation));
4235 			else
4236 				found++;
4237 
4238 			val = fastgetattr(htup,
4239 							  Anum_pg_attrdef_adbin,
4240 							  adrel->rd_att, &isnull);
4241 			if (isnull)
4242 				elog(WARNING, "null adbin for attr %s of rel %s",
4243 					 NameStr(attr->attname),
4244 					 RelationGetRelationName(relation));
4245 			else
4246 			{
4247 				/* detoast and convert to cstring in caller's context */
4248 				char	   *s = TextDatumGetCString(val);
4249 
4250 				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
4251 				pfree(s);
4252 			}
4253 			break;
4254 		}
4255 
4256 		if (i >= ndef)
4257 			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
4258 				 adform->adnum, RelationGetRelationName(relation));
4259 	}
4260 
4261 	systable_endscan(adscan);
4262 	table_close(adrel, AccessShareLock);
4263 }
4264 
4265 /*
4266  * Load any check constraints for the relation.
4267  */
4268 static void
4269 CheckConstraintFetch(Relation relation)
4270 {
4271 	ConstrCheck *check = relation->rd_att->constr->check;
4272 	int			ncheck = relation->rd_att->constr->num_check;
4273 	Relation	conrel;
4274 	SysScanDesc conscan;
4275 	ScanKeyData skey[1];
4276 	HeapTuple	htup;
4277 	int			found = 0;
4278 
4279 	ScanKeyInit(&skey[0],
4280 				Anum_pg_constraint_conrelid,
4281 				BTEqualStrategyNumber, F_OIDEQ,
4282 				ObjectIdGetDatum(RelationGetRelid(relation)));
4283 
4284 	conrel = table_open(ConstraintRelationId, AccessShareLock);
4285 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4286 								 NULL, 1, skey);
4287 
4288 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4289 	{
4290 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4291 		Datum		val;
4292 		bool		isnull;
4293 		char	   *s;
4294 
4295 		/* We want check constraints only */
4296 		if (conform->contype != CONSTRAINT_CHECK)
4297 			continue;
4298 
4299 		if (found >= ncheck)
4300 			elog(ERROR, "unexpected constraint record found for rel %s",
4301 				 RelationGetRelationName(relation));
4302 
4303 		check[found].ccvalid = conform->convalidated;
4304 		check[found].ccnoinherit = conform->connoinherit;
4305 		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
4306 												  NameStr(conform->conname));
4307 
4308 		/* Grab and test conbin is actually set */
4309 		val = fastgetattr(htup,
4310 						  Anum_pg_constraint_conbin,
4311 						  conrel->rd_att, &isnull);
4312 		if (isnull)
4313 			elog(ERROR, "null conbin for rel %s",
4314 				 RelationGetRelationName(relation));
4315 
4316 		/* detoast and convert to cstring in caller's context */
4317 		s = TextDatumGetCString(val);
4318 		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
4319 		pfree(s);
4320 
4321 		found++;
4322 	}
4323 
4324 	systable_endscan(conscan);
4325 	table_close(conrel, AccessShareLock);
4326 
4327 	if (found != ncheck)
4328 		elog(ERROR, "%d constraint record(s) missing for rel %s",
4329 			 ncheck - found, RelationGetRelationName(relation));
4330 
4331 	/* Sort the records so that CHECKs are applied in a deterministic order */
4332 	if (ncheck > 1)
4333 		qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp);
4334 }
4335 
4336 /*
4337  * qsort comparator to sort ConstrCheck entries by name
4338  */
4339 static int
4340 CheckConstraintCmp(const void *a, const void *b)
4341 {
4342 	const ConstrCheck *ca = (const ConstrCheck *) a;
4343 	const ConstrCheck *cb = (const ConstrCheck *) b;
4344 
4345 	return strcmp(ca->ccname, cb->ccname);
4346 }
4347 
4348 /*
4349  * RelationGetFKeyList -- get a list of foreign key info for the relation
4350  *
4351  * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
4352  * the given relation.  This data is a direct copy of relevant fields from
4353  * pg_constraint.  The list items are in no particular order.
4354  *
4355  * CAUTION: the returned list is part of the relcache's data, and could
4356  * vanish in a relcache entry reset.  Callers must inspect or copy it
4357  * before doing anything that might trigger a cache flush, such as
4358  * system catalog accesses.  copyObject() can be used if desired.
4359  * (We define it this way because current callers want to filter and
4360  * modify the list entries anyway, so copying would be a waste of time.)
4361  */
4362 List *
4363 RelationGetFKeyList(Relation relation)
4364 {
4365 	List	   *result;
4366 	Relation	conrel;
4367 	SysScanDesc conscan;
4368 	ScanKeyData skey;
4369 	HeapTuple	htup;
4370 	List	   *oldlist;
4371 	MemoryContext oldcxt;
4372 
4373 	/* Quick exit if we already computed the list. */
4374 	if (relation->rd_fkeyvalid)
4375 		return relation->rd_fkeylist;
4376 
4377 	/* Fast path: non-partitioned tables without triggers can't have FKs */
4378 	if (!relation->rd_rel->relhastriggers &&
4379 		relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
4380 		return NIL;
4381 
4382 	/*
4383 	 * We build the list we intend to return (in the caller's context) while
4384 	 * doing the scan.  After successfully completing the scan, we copy that
4385 	 * list into the relcache entry.  This avoids cache-context memory leakage
4386 	 * if we get some sort of error partway through.
4387 	 */
4388 	result = NIL;
4389 
4390 	/* Prepare to scan pg_constraint for entries having conrelid = this rel. */
4391 	ScanKeyInit(&skey,
4392 				Anum_pg_constraint_conrelid,
4393 				BTEqualStrategyNumber, F_OIDEQ,
4394 				ObjectIdGetDatum(RelationGetRelid(relation)));
4395 
4396 	conrel = table_open(ConstraintRelationId, AccessShareLock);
4397 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4398 								 NULL, 1, &skey);
4399 
4400 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4401 	{
4402 		Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
4403 		ForeignKeyCacheInfo *info;
4404 
4405 		/* consider only foreign keys */
4406 		if (constraint->contype != CONSTRAINT_FOREIGN)
4407 			continue;
4408 
4409 		info = makeNode(ForeignKeyCacheInfo);
4410 		info->conoid = constraint->oid;
4411 		info->conrelid = constraint->conrelid;
4412 		info->confrelid = constraint->confrelid;
4413 
4414 		DeconstructFkConstraintRow(htup, &info->nkeys,
4415 								   info->conkey,
4416 								   info->confkey,
4417 								   info->conpfeqop,
4418 								   NULL, NULL);
4419 
4420 		/* Add FK's node to the result list */
4421 		result = lappend(result, info);
4422 	}
4423 
4424 	systable_endscan(conscan);
4425 	table_close(conrel, AccessShareLock);
4426 
4427 	/* Now save a copy of the completed list in the relcache entry. */
4428 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4429 	oldlist = relation->rd_fkeylist;
4430 	relation->rd_fkeylist = copyObject(result);
4431 	relation->rd_fkeyvalid = true;
4432 	MemoryContextSwitchTo(oldcxt);
4433 
4434 	/* Don't leak the old list, if there is one */
4435 	list_free_deep(oldlist);
4436 
4437 	return result;
4438 }
4439 
4440 /*
4441  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
4442  *
4443  * The index list is created only if someone requests it.  We scan pg_index
4444  * to find relevant indexes, and add the list to the relcache entry so that
4445  * we won't have to compute it again.  Note that shared cache inval of a
4446  * relcache entry will delete the old list and set rd_indexvalid to false,
4447  * so that we must recompute the index list on next request.  This handles
4448  * creation or deletion of an index.
4449  *
4450  * Indexes that are marked not indislive are omitted from the returned list.
4451  * Such indexes are expected to be dropped momentarily, and should not be
4452  * touched at all by any caller of this function.
4453  *
4454  * The returned list is guaranteed to be sorted in order by OID.  This is
4455  * needed by the executor, since for index types that we obtain exclusive
4456  * locks on when updating the index, all backends must lock the indexes in
4457  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
4458  * consistent ordering would do, but ordering by OID is easy.
4459  *
4460  * Since shared cache inval causes the relcache's copy of the list to go away,
4461  * we return a copy of the list palloc'd in the caller's context.  The caller
4462  * may list_free() the returned list after scanning it. This is necessary
4463  * since the caller will typically be doing syscache lookups on the relevant
4464  * indexes, and syscache lookup could cause SI messages to be processed!
4465  *
4466  * In exactly the same way, we update rd_pkindex, which is the OID of the
4467  * relation's primary key index if any, else InvalidOid; and rd_replidindex,
4468  * which is the pg_class OID of an index to be used as the relation's
4469  * replication identity index, or InvalidOid if there is no such index.
4470  */
4471 List *
4472 RelationGetIndexList(Relation relation)
4473 {
4474 	Relation	indrel;
4475 	SysScanDesc indscan;
4476 	ScanKeyData skey;
4477 	HeapTuple	htup;
4478 	List	   *result;
4479 	List	   *oldlist;
4480 	char		replident = relation->rd_rel->relreplident;
4481 	Oid			pkeyIndex = InvalidOid;
4482 	Oid			candidateIndex = InvalidOid;
4483 	MemoryContext oldcxt;
4484 
4485 	/* Quick exit if we already computed the list. */
4486 	if (relation->rd_indexvalid)
4487 		return list_copy(relation->rd_indexlist);
4488 
4489 	/*
4490 	 * We build the list we intend to return (in the caller's context) while
4491 	 * doing the scan.  After successfully completing the scan, we copy that
4492 	 * list into the relcache entry.  This avoids cache-context memory leakage
4493 	 * if we get some sort of error partway through.
4494 	 */
4495 	result = NIL;
4496 
4497 	/* Prepare to scan pg_index for entries having indrelid = this rel. */
4498 	ScanKeyInit(&skey,
4499 				Anum_pg_index_indrelid,
4500 				BTEqualStrategyNumber, F_OIDEQ,
4501 				ObjectIdGetDatum(RelationGetRelid(relation)));
4502 
4503 	indrel = table_open(IndexRelationId, AccessShareLock);
4504 	indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
4505 								 NULL, 1, &skey);
4506 
4507 	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4508 	{
4509 		Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
4510 
4511 		/*
4512 		 * Ignore any indexes that are currently being dropped.  This will
4513 		 * prevent them from being searched, inserted into, or considered in
4514 		 * HOT-safety decisions.  It's unsafe to touch such an index at all
4515 		 * since its catalog entries could disappear at any instant.
4516 		 */
4517 		if (!index->indislive)
4518 			continue;
4519 
4520 		/* Add index's OID to result list in the proper order */
4521 		result = insert_ordered_oid(result, index->indexrelid);
4522 
4523 		/*
4524 		 * Invalid, non-unique, non-immediate or predicate indexes aren't
4525 		 * interesting for either oid indexes or replication identity indexes,
4526 		 * so don't check them.
4527 		 */
4528 		if (!index->indisvalid || !index->indisunique ||
4529 			!index->indimmediate ||
4530 			!heap_attisnull(htup, Anum_pg_index_indpred, NULL))
4531 			continue;
4532 
4533 		/* remember primary key index if any */
4534 		if (index->indisprimary)
4535 			pkeyIndex = index->indexrelid;
4536 
4537 		/* remember explicitly chosen replica index */
4538 		if (index->indisreplident)
4539 			candidateIndex = index->indexrelid;
4540 	}
4541 
4542 	systable_endscan(indscan);
4543 
4544 	table_close(indrel, AccessShareLock);
4545 
4546 	/* Now save a copy of the completed list in the relcache entry. */
4547 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4548 	oldlist = relation->rd_indexlist;
4549 	relation->rd_indexlist = list_copy(result);
4550 	relation->rd_pkindex = pkeyIndex;
4551 	if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
4552 		relation->rd_replidindex = pkeyIndex;
4553 	else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
4554 		relation->rd_replidindex = candidateIndex;
4555 	else
4556 		relation->rd_replidindex = InvalidOid;
4557 	relation->rd_indexvalid = true;
4558 	MemoryContextSwitchTo(oldcxt);
4559 
4560 	/* Don't leak the old list, if there is one */
4561 	list_free(oldlist);
4562 
4563 	return result;
4564 }
4565 
4566 /*
4567  * RelationGetStatExtList
4568  *		get a list of OIDs of statistics objects on this relation
4569  *
4570  * The statistics list is created only if someone requests it, in a way
4571  * similar to RelationGetIndexList().  We scan pg_statistic_ext to find
4572  * relevant statistics, and add the list to the relcache entry so that we
4573  * won't have to compute it again.  Note that shared cache inval of a
4574  * relcache entry will delete the old list and set rd_statvalid to 0,
4575  * so that we must recompute the statistics list on next request.  This
4576  * handles creation or deletion of a statistics object.
4577  *
4578  * The returned list is guaranteed to be sorted in order by OID, although
4579  * this is not currently needed.
4580  *
4581  * Since shared cache inval causes the relcache's copy of the list to go away,
4582  * we return a copy of the list palloc'd in the caller's context.  The caller
4583  * may list_free() the returned list after scanning it. This is necessary
4584  * since the caller will typically be doing syscache lookups on the relevant
4585  * statistics, and syscache lookup could cause SI messages to be processed!
4586  */
4587 List *
4588 RelationGetStatExtList(Relation relation)
4589 {
4590 	Relation	indrel;
4591 	SysScanDesc indscan;
4592 	ScanKeyData skey;
4593 	HeapTuple	htup;
4594 	List	   *result;
4595 	List	   *oldlist;
4596 	MemoryContext oldcxt;
4597 
4598 	/* Quick exit if we already computed the list. */
4599 	if (relation->rd_statvalid != 0)
4600 		return list_copy(relation->rd_statlist);
4601 
4602 	/*
4603 	 * We build the list we intend to return (in the caller's context) while
4604 	 * doing the scan.  After successfully completing the scan, we copy that
4605 	 * list into the relcache entry.  This avoids cache-context memory leakage
4606 	 * if we get some sort of error partway through.
4607 	 */
4608 	result = NIL;
4609 
4610 	/*
4611 	 * Prepare to scan pg_statistic_ext for entries having stxrelid = this
4612 	 * rel.
4613 	 */
4614 	ScanKeyInit(&skey,
4615 				Anum_pg_statistic_ext_stxrelid,
4616 				BTEqualStrategyNumber, F_OIDEQ,
4617 				ObjectIdGetDatum(RelationGetRelid(relation)));
4618 
4619 	indrel = table_open(StatisticExtRelationId, AccessShareLock);
4620 	indscan = systable_beginscan(indrel, StatisticExtRelidIndexId, true,
4621 								 NULL, 1, &skey);
4622 
4623 	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4624 	{
4625 		Oid			oid = ((Form_pg_statistic_ext) GETSTRUCT(htup))->oid;
4626 
4627 		result = insert_ordered_oid(result, oid);
4628 	}
4629 
4630 	systable_endscan(indscan);
4631 
4632 	table_close(indrel, AccessShareLock);
4633 
4634 	/* Now save a copy of the completed list in the relcache entry. */
4635 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4636 	oldlist = relation->rd_statlist;
4637 	relation->rd_statlist = list_copy(result);
4638 
4639 	relation->rd_statvalid = true;
4640 	MemoryContextSwitchTo(oldcxt);
4641 
4642 	/* Don't leak the old list, if there is one */
4643 	list_free(oldlist);
4644 
4645 	return result;
4646 }
4647 
4648 /*
4649  * insert_ordered_oid
4650  *		Insert a new Oid into a sorted list of Oids, preserving ordering
4651  *
4652  * Building the ordered list this way is O(N^2), but with a pretty small
4653  * constant, so for the number of entries we expect it will probably be
4654  * faster than trying to apply qsort().  Most tables don't have very many
4655  * indexes...
4656  */
4657 static List *
4658 insert_ordered_oid(List *list, Oid datum)
4659 {
4660 	ListCell   *prev;
4661 
4662 	/* Does the datum belong at the front? */
4663 	if (list == NIL || datum < linitial_oid(list))
4664 		return lcons_oid(datum, list);
4665 	/* No, so find the entry it belongs after */
4666 	prev = list_head(list);
4667 	for (;;)
4668 	{
4669 		ListCell   *curr = lnext(prev);
4670 
4671 		if (curr == NULL || datum < lfirst_oid(curr))
4672 			break;				/* it belongs after 'prev', before 'curr' */
4673 
4674 		prev = curr;
4675 	}
4676 	/* Insert datum into list after 'prev' */
4677 	lappend_cell_oid(list, prev, datum);
4678 	return list;
4679 }
4680 
4681 /*
4682  * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
4683  *
4684  * Returns InvalidOid if there is no such index.
4685  */
4686 Oid
4687 RelationGetPrimaryKeyIndex(Relation relation)
4688 {
4689 	List	   *ilist;
4690 
4691 	if (!relation->rd_indexvalid)
4692 	{
4693 		/* RelationGetIndexList does the heavy lifting. */
4694 		ilist = RelationGetIndexList(relation);
4695 		list_free(ilist);
4696 		Assert(relation->rd_indexvalid);
4697 	}
4698 
4699 	return relation->rd_pkindex;
4700 }
4701 
4702 /*
4703  * RelationGetReplicaIndex -- get OID of the relation's replica identity index
4704  *
4705  * Returns InvalidOid if there is no such index.
4706  */
4707 Oid
4708 RelationGetReplicaIndex(Relation relation)
4709 {
4710 	List	   *ilist;
4711 
4712 	if (!relation->rd_indexvalid)
4713 	{
4714 		/* RelationGetIndexList does the heavy lifting. */
4715 		ilist = RelationGetIndexList(relation);
4716 		list_free(ilist);
4717 		Assert(relation->rd_indexvalid);
4718 	}
4719 
4720 	return relation->rd_replidindex;
4721 }
4722 
4723 /*
4724  * RelationGetIndexExpressions -- get the index expressions for an index
4725  *
4726  * We cache the result of transforming pg_index.indexprs into a node tree.
4727  * If the rel is not an index or has no expressional columns, we return NIL.
4728  * Otherwise, the returned tree is copied into the caller's memory context.
4729  * (We don't want to return a pointer to the relcache copy, since it could
4730  * disappear due to relcache invalidation.)
4731  */
4732 List *
4733 RelationGetIndexExpressions(Relation relation)
4734 {
4735 	List	   *result;
4736 	Datum		exprsDatum;
4737 	bool		isnull;
4738 	char	   *exprsString;
4739 	MemoryContext oldcxt;
4740 
4741 	/* Quick exit if we already computed the result. */
4742 	if (relation->rd_indexprs)
4743 		return copyObject(relation->rd_indexprs);
4744 
4745 	/* Quick exit if there is nothing to do. */
4746 	if (relation->rd_indextuple == NULL ||
4747 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4748 		return NIL;
4749 
4750 	/*
4751 	 * We build the tree we intend to return in the caller's context. After
4752 	 * successfully completing the work, we copy it into the relcache entry.
4753 	 * This avoids problems if we get some sort of error partway through.
4754 	 */
4755 	exprsDatum = heap_getattr(relation->rd_indextuple,
4756 							  Anum_pg_index_indexprs,
4757 							  GetPgIndexDescriptor(),
4758 							  &isnull);
4759 	Assert(!isnull);
4760 	exprsString = TextDatumGetCString(exprsDatum);
4761 	result = (List *) stringToNode(exprsString);
4762 	pfree(exprsString);
4763 
4764 	/*
4765 	 * Run the expressions through eval_const_expressions. This is not just an
4766 	 * optimization, but is necessary, because the planner will be comparing
4767 	 * them to similarly-processed qual clauses, and may fail to detect valid
4768 	 * matches without this.  We must not use canonicalize_qual, however,
4769 	 * since these aren't qual expressions.
4770 	 */
4771 	result = (List *) eval_const_expressions(NULL, (Node *) result);
4772 
4773 	/* May as well fix opfuncids too */
4774 	fix_opfuncids((Node *) result);
4775 
4776 	/* Now save a copy of the completed tree in the relcache entry. */
4777 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4778 	relation->rd_indexprs = copyObject(result);
4779 	MemoryContextSwitchTo(oldcxt);
4780 
4781 	return result;
4782 }
4783 
4784 /*
4785  * RelationGetDummyIndexExpressions -- get dummy expressions for an index
4786  *
4787  * Return a list of dummy expressions (just Const nodes) with the same
4788  * types/typmods/collations as the index's real expressions.  This is
4789  * useful in situations where we don't want to run any user-defined code.
4790  */
4791 List *
4792 RelationGetDummyIndexExpressions(Relation relation)
4793 {
4794 	List	   *result;
4795 	Datum		exprsDatum;
4796 	bool		isnull;
4797 	char	   *exprsString;
4798 	List	   *rawExprs;
4799 	ListCell   *lc;
4800 
4801 	/* Quick exit if there is nothing to do. */
4802 	if (relation->rd_indextuple == NULL ||
4803 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4804 		return NIL;
4805 
4806 	/* Extract raw node tree(s) from index tuple. */
4807 	exprsDatum = heap_getattr(relation->rd_indextuple,
4808 							  Anum_pg_index_indexprs,
4809 							  GetPgIndexDescriptor(),
4810 							  &isnull);
4811 	Assert(!isnull);
4812 	exprsString = TextDatumGetCString(exprsDatum);
4813 	rawExprs = (List *) stringToNode(exprsString);
4814 	pfree(exprsString);
4815 
4816 	/* Construct null Consts; the typlen and typbyval are arbitrary. */
4817 	result = NIL;
4818 	foreach(lc, rawExprs)
4819 	{
4820 		Node	   *rawExpr = (Node *) lfirst(lc);
4821 
4822 		result = lappend(result,
4823 						 makeConst(exprType(rawExpr),
4824 								   exprTypmod(rawExpr),
4825 								   exprCollation(rawExpr),
4826 								   1,
4827 								   (Datum) 0,
4828 								   true,
4829 								   true));
4830 	}
4831 
4832 	return result;
4833 }
4834 
4835 /*
4836  * RelationGetIndexPredicate -- get the index predicate for an index
4837  *
4838  * We cache the result of transforming pg_index.indpred into an implicit-AND
4839  * node tree (suitable for use in planning).
4840  * If the rel is not an index or has no predicate, we return NIL.
4841  * Otherwise, the returned tree is copied into the caller's memory context.
4842  * (We don't want to return a pointer to the relcache copy, since it could
4843  * disappear due to relcache invalidation.)
4844  */
4845 List *
4846 RelationGetIndexPredicate(Relation relation)
4847 {
4848 	List	   *result;
4849 	Datum		predDatum;
4850 	bool		isnull;
4851 	char	   *predString;
4852 	MemoryContext oldcxt;
4853 
4854 	/* Quick exit if we already computed the result. */
4855 	if (relation->rd_indpred)
4856 		return copyObject(relation->rd_indpred);
4857 
4858 	/* Quick exit if there is nothing to do. */
4859 	if (relation->rd_indextuple == NULL ||
4860 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
4861 		return NIL;
4862 
4863 	/*
4864 	 * We build the tree we intend to return in the caller's context. After
4865 	 * successfully completing the work, we copy it into the relcache entry.
4866 	 * This avoids problems if we get some sort of error partway through.
4867 	 */
4868 	predDatum = heap_getattr(relation->rd_indextuple,
4869 							 Anum_pg_index_indpred,
4870 							 GetPgIndexDescriptor(),
4871 							 &isnull);
4872 	Assert(!isnull);
4873 	predString = TextDatumGetCString(predDatum);
4874 	result = (List *) stringToNode(predString);
4875 	pfree(predString);
4876 
4877 	/*
4878 	 * Run the expression through const-simplification and canonicalization.
4879 	 * This is not just an optimization, but is necessary, because the planner
4880 	 * will be comparing it to similarly-processed qual clauses, and may fail
4881 	 * to detect valid matches without this.  This must match the processing
4882 	 * done to qual clauses in preprocess_expression()!  (We can skip the
4883 	 * stuff involving subqueries, however, since we don't allow any in index
4884 	 * predicates.)
4885 	 */
4886 	result = (List *) eval_const_expressions(NULL, (Node *) result);
4887 
4888 	result = (List *) canonicalize_qual((Expr *) result, false);
4889 
4890 	/* Also convert to implicit-AND format */
4891 	result = make_ands_implicit((Expr *) result);
4892 
4893 	/* May as well fix opfuncids too */
4894 	fix_opfuncids((Node *) result);
4895 
4896 	/* Now save a copy of the completed tree in the relcache entry. */
4897 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4898 	relation->rd_indpred = copyObject(result);
4899 	MemoryContextSwitchTo(oldcxt);
4900 
4901 	return result;
4902 }
4903 
4904 /*
4905  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
4906  *
4907  * The result has a bit set for each attribute used anywhere in the index
4908  * definitions of all the indexes on this relation.  (This includes not only
4909  * simple index keys, but attributes used in expressions and partial-index
4910  * predicates.)
4911  *
4912  * Depending on attrKind, a bitmap covering the attnums for all index columns,
4913  * for all potential foreign key columns, or for all columns in the configured
4914  * replica identity index is returned.
4915  *
4916  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
4917  * we can include system attributes (e.g., OID) in the bitmap representation.
4918  *
4919  * Caller had better hold at least RowExclusiveLock on the target relation
4920  * to ensure it is safe (deadlock-free) for us to take locks on the relation's
4921  * indexes.  Note that since the introduction of CREATE INDEX CONCURRENTLY,
4922  * that lock level doesn't guarantee a stable set of indexes, so we have to
4923  * be prepared to retry here in case of a change in the set of indexes.
4924  *
4925  * The returned result is palloc'd in the caller's memory context and should
4926  * be bms_free'd when not needed anymore.
4927  */
4928 Bitmapset *
4929 RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
4930 {
4931 	Bitmapset  *indexattrs;		/* indexed columns */
4932 	Bitmapset  *uindexattrs;	/* columns in unique indexes */
4933 	Bitmapset  *pkindexattrs;	/* columns in the primary index */
4934 	Bitmapset  *idindexattrs;	/* columns in the replica identity */
4935 	List	   *indexoidlist;
4936 	List	   *newindexoidlist;
4937 	Oid			relpkindex;
4938 	Oid			relreplindex;
4939 	ListCell   *l;
4940 	MemoryContext oldcxt;
4941 
4942 	/* Quick exit if we already computed the result. */
4943 	if (relation->rd_indexattr != NULL)
4944 	{
4945 		switch (attrKind)
4946 		{
4947 			case INDEX_ATTR_BITMAP_ALL:
4948 				return bms_copy(relation->rd_indexattr);
4949 			case INDEX_ATTR_BITMAP_KEY:
4950 				return bms_copy(relation->rd_keyattr);
4951 			case INDEX_ATTR_BITMAP_PRIMARY_KEY:
4952 				return bms_copy(relation->rd_pkattr);
4953 			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
4954 				return bms_copy(relation->rd_idattr);
4955 			default:
4956 				elog(ERROR, "unknown attrKind %u", attrKind);
4957 		}
4958 	}
4959 
4960 	/* Fast path if definitely no indexes */
4961 	if (!RelationGetForm(relation)->relhasindex)
4962 		return NULL;
4963 
4964 	/*
4965 	 * Get cached list of index OIDs. If we have to start over, we do so here.
4966 	 */
4967 restart:
4968 	indexoidlist = RelationGetIndexList(relation);
4969 
4970 	/* Fall out if no indexes (but relhasindex was set) */
4971 	if (indexoidlist == NIL)
4972 		return NULL;
4973 
4974 	/*
4975 	 * Copy the rd_pkindex and rd_replidindex values computed by
4976 	 * RelationGetIndexList before proceeding.  This is needed because a
4977 	 * relcache flush could occur inside index_open below, resetting the
4978 	 * fields managed by RelationGetIndexList.  We need to do the work with
4979 	 * stable values of these fields.
4980 	 */
4981 	relpkindex = relation->rd_pkindex;
4982 	relreplindex = relation->rd_replidindex;
4983 
4984 	/*
4985 	 * For each index, add referenced attributes to indexattrs.
4986 	 *
4987 	 * Note: we consider all indexes returned by RelationGetIndexList, even if
4988 	 * they are not indisready or indisvalid.  This is important because an
4989 	 * index for which CREATE INDEX CONCURRENTLY has just started must be
4990 	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
4991 	 * CONCURRENTLY is far enough along that we should ignore the index, it
4992 	 * won't be returned at all by RelationGetIndexList.
4993 	 */
4994 	indexattrs = NULL;
4995 	uindexattrs = NULL;
4996 	pkindexattrs = NULL;
4997 	idindexattrs = NULL;
4998 	foreach(l, indexoidlist)
4999 	{
5000 		Oid			indexOid = lfirst_oid(l);
5001 		Relation	indexDesc;
5002 		Datum		datum;
5003 		bool		isnull;
5004 		Node	   *indexExpressions;
5005 		Node	   *indexPredicate;
5006 		int			i;
5007 		bool		isKey;		/* candidate key */
5008 		bool		isPK;		/* primary key */
5009 		bool		isIDKey;	/* replica identity index */
5010 
5011 		indexDesc = index_open(indexOid, AccessShareLock);
5012 
5013 		/*
5014 		 * Extract index expressions and index predicate.  Note: Don't use
5015 		 * RelationGetIndexExpressions()/RelationGetIndexPredicate(), because
5016 		 * those might run constant expressions evaluation, which needs a
5017 		 * snapshot, which we might not have here.  (Also, it's probably more
5018 		 * sound to collect the bitmaps before any transformations that might
5019 		 * eliminate columns, but the practical impact of this is limited.)
5020 		 */
5021 
5022 		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indexprs,
5023 							 GetPgIndexDescriptor(), &isnull);
5024 		if (!isnull)
5025 			indexExpressions = stringToNode(TextDatumGetCString(datum));
5026 		else
5027 			indexExpressions = NULL;
5028 
5029 		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indpred,
5030 							 GetPgIndexDescriptor(), &isnull);
5031 		if (!isnull)
5032 			indexPredicate = stringToNode(TextDatumGetCString(datum));
5033 		else
5034 			indexPredicate = NULL;
5035 
5036 		/* Can this index be referenced by a foreign key? */
5037 		isKey = indexDesc->rd_index->indisunique &&
5038 			indexExpressions == NULL &&
5039 			indexPredicate == NULL;
5040 
5041 		/* Is this a primary key? */
5042 		isPK = (indexOid == relpkindex);
5043 
5044 		/* Is this index the configured (or default) replica identity? */
5045 		isIDKey = (indexOid == relreplindex);
5046 
5047 		/* Collect simple attribute references */
5048 		for (i = 0; i < indexDesc->rd_index->indnatts; i++)
5049 		{
5050 			int			attrnum = indexDesc->rd_index->indkey.values[i];
5051 
5052 			/*
5053 			 * Since we have covering indexes with non-key columns, we must
5054 			 * handle them accurately here. non-key columns must be added into
5055 			 * indexattrs, since they are in index, and HOT-update shouldn't
5056 			 * miss them. Obviously, non-key columns couldn't be referenced by
5057 			 * foreign key or identity key. Hence we do not include them into
5058 			 * uindexattrs, pkindexattrs and idindexattrs bitmaps.
5059 			 */
5060 			if (attrnum != 0)
5061 			{
5062 				indexattrs = bms_add_member(indexattrs,
5063 											attrnum - FirstLowInvalidHeapAttributeNumber);
5064 
5065 				if (isKey && i < indexDesc->rd_index->indnkeyatts)
5066 					uindexattrs = bms_add_member(uindexattrs,
5067 												 attrnum - FirstLowInvalidHeapAttributeNumber);
5068 
5069 				if (isPK && i < indexDesc->rd_index->indnkeyatts)
5070 					pkindexattrs = bms_add_member(pkindexattrs,
5071 												  attrnum - FirstLowInvalidHeapAttributeNumber);
5072 
5073 				if (isIDKey && i < indexDesc->rd_index->indnkeyatts)
5074 					idindexattrs = bms_add_member(idindexattrs,
5075 												  attrnum - FirstLowInvalidHeapAttributeNumber);
5076 			}
5077 		}
5078 
5079 		/* Collect all attributes used in expressions, too */
5080 		pull_varattnos(indexExpressions, 1, &indexattrs);
5081 
5082 		/* Collect all attributes in the index predicate, too */
5083 		pull_varattnos(indexPredicate, 1, &indexattrs);
5084 
5085 		index_close(indexDesc, AccessShareLock);
5086 	}
5087 
5088 	/*
5089 	 * During one of the index_opens in the above loop, we might have received
5090 	 * a relcache flush event on this relcache entry, which might have been
5091 	 * signaling a change in the rel's index list.  If so, we'd better start
5092 	 * over to ensure we deliver up-to-date attribute bitmaps.
5093 	 */
5094 	newindexoidlist = RelationGetIndexList(relation);
5095 	if (equal(indexoidlist, newindexoidlist) &&
5096 		relpkindex == relation->rd_pkindex &&
5097 		relreplindex == relation->rd_replidindex)
5098 	{
5099 		/* Still the same index set, so proceed */
5100 		list_free(newindexoidlist);
5101 		list_free(indexoidlist);
5102 	}
5103 	else
5104 	{
5105 		/* Gotta do it over ... might as well not leak memory */
5106 		list_free(newindexoidlist);
5107 		list_free(indexoidlist);
5108 		bms_free(uindexattrs);
5109 		bms_free(pkindexattrs);
5110 		bms_free(idindexattrs);
5111 		bms_free(indexattrs);
5112 
5113 		goto restart;
5114 	}
5115 
5116 	/* Don't leak the old values of these bitmaps, if any */
5117 	bms_free(relation->rd_indexattr);
5118 	relation->rd_indexattr = NULL;
5119 	bms_free(relation->rd_keyattr);
5120 	relation->rd_keyattr = NULL;
5121 	bms_free(relation->rd_pkattr);
5122 	relation->rd_pkattr = NULL;
5123 	bms_free(relation->rd_idattr);
5124 	relation->rd_idattr = NULL;
5125 
5126 	/*
5127 	 * Now save copies of the bitmaps in the relcache entry.  We intentionally
5128 	 * set rd_indexattr last, because that's the one that signals validity of
5129 	 * the values; if we run out of memory before making that copy, we won't
5130 	 * leave the relcache entry looking like the other ones are valid but
5131 	 * empty.
5132 	 */
5133 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5134 	relation->rd_keyattr = bms_copy(uindexattrs);
5135 	relation->rd_pkattr = bms_copy(pkindexattrs);
5136 	relation->rd_idattr = bms_copy(idindexattrs);
5137 	relation->rd_indexattr = bms_copy(indexattrs);
5138 	MemoryContextSwitchTo(oldcxt);
5139 
5140 	/* We return our original working copy for caller to play with */
5141 	switch (attrKind)
5142 	{
5143 		case INDEX_ATTR_BITMAP_ALL:
5144 			return indexattrs;
5145 		case INDEX_ATTR_BITMAP_KEY:
5146 			return uindexattrs;
5147 		case INDEX_ATTR_BITMAP_PRIMARY_KEY:
5148 			return pkindexattrs;
5149 		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
5150 			return idindexattrs;
5151 		default:
5152 			elog(ERROR, "unknown attrKind %u", attrKind);
5153 			return NULL;
5154 	}
5155 }
5156 
5157 /*
5158  * RelationGetExclusionInfo -- get info about index's exclusion constraint
5159  *
5160  * This should be called only for an index that is known to have an
5161  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
5162  * context) of the exclusion operator OIDs, their underlying functions'
5163  * OIDs, and their strategy numbers in the index's opclasses.  We cache
5164  * all this information since it requires a fair amount of work to get.
5165  */
5166 void
5167 RelationGetExclusionInfo(Relation indexRelation,
5168 						 Oid **operators,
5169 						 Oid **procs,
5170 						 uint16 **strategies)
5171 {
5172 	int			indnkeyatts;
5173 	Oid		   *ops;
5174 	Oid		   *funcs;
5175 	uint16	   *strats;
5176 	Relation	conrel;
5177 	SysScanDesc conscan;
5178 	ScanKeyData skey[1];
5179 	HeapTuple	htup;
5180 	bool		found;
5181 	MemoryContext oldcxt;
5182 	int			i;
5183 
5184 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
5185 
5186 	/* Allocate result space in caller context */
5187 	*operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5188 	*procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5189 	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
5190 
5191 	/* Quick exit if we have the data cached already */
5192 	if (indexRelation->rd_exclstrats != NULL)
5193 	{
5194 		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
5195 		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
5196 		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
5197 		return;
5198 	}
5199 
5200 	/*
5201 	 * Search pg_constraint for the constraint associated with the index. To
5202 	 * make this not too painfully slow, we use the index on conrelid; that
5203 	 * will hold the parent relation's OID not the index's own OID.
5204 	 *
5205 	 * Note: if we wanted to rely on the constraint name matching the index's
5206 	 * name, we could just do a direct lookup using pg_constraint's unique
5207 	 * index.  For the moment it doesn't seem worth requiring that.
5208 	 */
5209 	ScanKeyInit(&skey[0],
5210 				Anum_pg_constraint_conrelid,
5211 				BTEqualStrategyNumber, F_OIDEQ,
5212 				ObjectIdGetDatum(indexRelation->rd_index->indrelid));
5213 
5214 	conrel = table_open(ConstraintRelationId, AccessShareLock);
5215 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
5216 								 NULL, 1, skey);
5217 	found = false;
5218 
5219 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
5220 	{
5221 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
5222 		Datum		val;
5223 		bool		isnull;
5224 		ArrayType  *arr;
5225 		int			nelem;
5226 
5227 		/* We want the exclusion constraint owning the index */
5228 		if (conform->contype != CONSTRAINT_EXCLUSION ||
5229 			conform->conindid != RelationGetRelid(indexRelation))
5230 			continue;
5231 
5232 		/* There should be only one */
5233 		if (found)
5234 			elog(ERROR, "unexpected exclusion constraint record found for rel %s",
5235 				 RelationGetRelationName(indexRelation));
5236 		found = true;
5237 
5238 		/* Extract the operator OIDS from conexclop */
5239 		val = fastgetattr(htup,
5240 						  Anum_pg_constraint_conexclop,
5241 						  conrel->rd_att, &isnull);
5242 		if (isnull)
5243 			elog(ERROR, "null conexclop for rel %s",
5244 				 RelationGetRelationName(indexRelation));
5245 
5246 		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */
5247 		nelem = ARR_DIMS(arr)[0];
5248 		if (ARR_NDIM(arr) != 1 ||
5249 			nelem != indnkeyatts ||
5250 			ARR_HASNULL(arr) ||
5251 			ARR_ELEMTYPE(arr) != OIDOID)
5252 			elog(ERROR, "conexclop is not a 1-D Oid array");
5253 
5254 		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
5255 	}
5256 
5257 	systable_endscan(conscan);
5258 	table_close(conrel, AccessShareLock);
5259 
5260 	if (!found)
5261 		elog(ERROR, "exclusion constraint record missing for rel %s",
5262 			 RelationGetRelationName(indexRelation));
5263 
5264 	/* We need the func OIDs and strategy numbers too */
5265 	for (i = 0; i < indnkeyatts; i++)
5266 	{
5267 		funcs[i] = get_opcode(ops[i]);
5268 		strats[i] = get_op_opfamily_strategy(ops[i],
5269 											 indexRelation->rd_opfamily[i]);
5270 		/* shouldn't fail, since it was checked at index creation */
5271 		if (strats[i] == InvalidStrategy)
5272 			elog(ERROR, "could not find strategy for operator %u in family %u",
5273 				 ops[i], indexRelation->rd_opfamily[i]);
5274 	}
5275 
5276 	/* Save a copy of the results in the relcache entry. */
5277 	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
5278 	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5279 	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5280 	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
5281 	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
5282 	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
5283 	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
5284 	MemoryContextSwitchTo(oldcxt);
5285 }
5286 
5287 /*
5288  * Get publication actions for the given relation.
5289  */
5290 struct PublicationActions *
5291 GetRelationPublicationActions(Relation relation)
5292 {
5293 	List	   *puboids;
5294 	ListCell   *lc;
5295 	MemoryContext oldcxt;
5296 	PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
5297 
5298 	/*
5299 	 * If not publishable, it publishes no actions.  (pgoutput_change() will
5300 	 * ignore it.)
5301 	 */
5302 	if (!is_publishable_relation(relation))
5303 		return pubactions;
5304 
5305 	if (relation->rd_pubactions)
5306 		return memcpy(pubactions, relation->rd_pubactions,
5307 					  sizeof(PublicationActions));
5308 
5309 	/* Fetch the publication membership info. */
5310 	puboids = GetRelationPublications(RelationGetRelid(relation));
5311 	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
5312 
5313 	foreach(lc, puboids)
5314 	{
5315 		Oid			pubid = lfirst_oid(lc);
5316 		HeapTuple	tup;
5317 		Form_pg_publication pubform;
5318 
5319 		tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
5320 
5321 		if (!HeapTupleIsValid(tup))
5322 			elog(ERROR, "cache lookup failed for publication %u", pubid);
5323 
5324 		pubform = (Form_pg_publication) GETSTRUCT(tup);
5325 
5326 		pubactions->pubinsert |= pubform->pubinsert;
5327 		pubactions->pubupdate |= pubform->pubupdate;
5328 		pubactions->pubdelete |= pubform->pubdelete;
5329 		pubactions->pubtruncate |= pubform->pubtruncate;
5330 
5331 		ReleaseSysCache(tup);
5332 
5333 		/*
5334 		 * If we know everything is replicated, there is no point to check for
5335 		 * other publications.
5336 		 */
5337 		if (pubactions->pubinsert && pubactions->pubupdate &&
5338 			pubactions->pubdelete && pubactions->pubtruncate)
5339 			break;
5340 	}
5341 
5342 	if (relation->rd_pubactions)
5343 	{
5344 		pfree(relation->rd_pubactions);
5345 		relation->rd_pubactions = NULL;
5346 	}
5347 
5348 	/* Now save copy of the actions in the relcache entry. */
5349 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5350 	relation->rd_pubactions = palloc(sizeof(PublicationActions));
5351 	memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
5352 	MemoryContextSwitchTo(oldcxt);
5353 
5354 	return pubactions;
5355 }
5356 
5357 /*
5358  * Routines to support ereport() reports of relation-related errors
5359  *
5360  * These could have been put into elog.c, but it seems like a module layering
5361  * violation to have elog.c calling relcache or syscache stuff --- and we
5362  * definitely don't want elog.h including rel.h.  So we put them here.
5363  */
5364 
5365 /*
5366  * errtable --- stores schema_name and table_name of a table
5367  * within the current errordata.
5368  */
5369 int
5370 errtable(Relation rel)
5371 {
5372 	err_generic_string(PG_DIAG_SCHEMA_NAME,
5373 					   get_namespace_name(RelationGetNamespace(rel)));
5374 	err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
5375 
5376 	return 0;					/* return value does not matter */
5377 }
5378 
5379 /*
5380  * errtablecol --- stores schema_name, table_name and column_name
5381  * of a table column within the current errordata.
5382  *
5383  * The column is specified by attribute number --- for most callers, this is
5384  * easier and less error-prone than getting the column name for themselves.
5385  */
5386 int
5387 errtablecol(Relation rel, int attnum)
5388 {
5389 	TupleDesc	reldesc = RelationGetDescr(rel);
5390 	const char *colname;
5391 
5392 	/* Use reldesc if it's a user attribute, else consult the catalogs */
5393 	if (attnum > 0 && attnum <= reldesc->natts)
5394 		colname = NameStr(TupleDescAttr(reldesc, attnum - 1)->attname);
5395 	else
5396 		colname = get_attname(RelationGetRelid(rel), attnum, false);
5397 
5398 	return errtablecolname(rel, colname);
5399 }
5400 
5401 /*
5402  * errtablecolname --- stores schema_name, table_name and column_name
5403  * of a table column within the current errordata, where the column name is
5404  * given directly rather than extracted from the relation's catalog data.
5405  *
5406  * Don't use this directly unless errtablecol() is inconvenient for some
5407  * reason.  This might possibly be needed during intermediate states in ALTER
5408  * TABLE, for instance.
5409  */
5410 int
5411 errtablecolname(Relation rel, const char *colname)
5412 {
5413 	errtable(rel);
5414 	err_generic_string(PG_DIAG_COLUMN_NAME, colname);
5415 
5416 	return 0;					/* return value does not matter */
5417 }
5418 
5419 /*
5420  * errtableconstraint --- stores schema_name, table_name and constraint_name
5421  * of a table-related constraint within the current errordata.
5422  */
5423 int
5424 errtableconstraint(Relation rel, const char *conname)
5425 {
5426 	errtable(rel);
5427 	err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
5428 
5429 	return 0;					/* return value does not matter */
5430 }
5431 
5432 
5433 /*
5434  *	load_relcache_init_file, write_relcache_init_file
5435  *
5436  *		In late 1992, we started regularly having databases with more than
5437  *		a thousand classes in them.  With this number of classes, it became
5438  *		critical to do indexed lookups on the system catalogs.
5439  *
5440  *		Bootstrapping these lookups is very hard.  We want to be able to
5441  *		use an index on pg_attribute, for example, but in order to do so,
5442  *		we must have read pg_attribute for the attributes in the index,
5443  *		which implies that we need to use the index.
5444  *
5445  *		In order to get around the problem, we do the following:
5446  *
5447  *		   +  When the database system is initialized (at initdb time), we
5448  *			  don't use indexes.  We do sequential scans.
5449  *
5450  *		   +  When the backend is started up in normal mode, we load an image
5451  *			  of the appropriate relation descriptors, in internal format,
5452  *			  from an initialization file in the data/base/... directory.
5453  *
5454  *		   +  If the initialization file isn't there, then we create the
5455  *			  relation descriptors using sequential scans and write 'em to
5456  *			  the initialization file for use by subsequent backends.
5457  *
5458  *		As of Postgres 9.0, there is one local initialization file in each
5459  *		database, plus one shared initialization file for shared catalogs.
5460  *
5461  *		We could dispense with the initialization files and just build the
5462  *		critical reldescs the hard way on every backend startup, but that
5463  *		slows down backend startup noticeably.
5464  *
5465  *		We can in fact go further, and save more relcache entries than
5466  *		just the ones that are absolutely critical; this allows us to speed
5467  *		up backend startup by not having to build such entries the hard way.
5468  *		Presently, all the catalog and index entries that are referred to
5469  *		by catcaches are stored in the initialization files.
5470  *
5471  *		The same mechanism that detects when catcache and relcache entries
5472  *		need to be invalidated (due to catalog updates) also arranges to
5473  *		unlink the initialization files when the contents may be out of date.
5474  *		The files will then be rebuilt during the next backend startup.
5475  */
5476 
5477 /*
5478  * load_relcache_init_file -- attempt to load cache from the shared
5479  * or local cache init file
5480  *
5481  * If successful, return true and set criticalRelcachesBuilt or
5482  * criticalSharedRelcachesBuilt to true.
5483  * If not successful, return false.
5484  *
5485  * NOTE: we assume we are already switched into CacheMemoryContext.
5486  */
5487 static bool
5488 load_relcache_init_file(bool shared)
5489 {
5490 	FILE	   *fp;
5491 	char		initfilename[MAXPGPATH];
5492 	Relation   *rels;
5493 	int			relno,
5494 				num_rels,
5495 				max_rels,
5496 				nailed_rels,
5497 				nailed_indexes,
5498 				magic;
5499 	int			i;
5500 
5501 	if (shared)
5502 		snprintf(initfilename, sizeof(initfilename), "global/%s",
5503 				 RELCACHE_INIT_FILENAME);
5504 	else
5505 		snprintf(initfilename, sizeof(initfilename), "%s/%s",
5506 				 DatabasePath, RELCACHE_INIT_FILENAME);
5507 
5508 	fp = AllocateFile(initfilename, PG_BINARY_R);
5509 	if (fp == NULL)
5510 		return false;
5511 
5512 	/*
5513 	 * Read the index relcache entries from the file.  Note we will not enter
5514 	 * any of them into the cache if the read fails partway through; this
5515 	 * helps to guard against broken init files.
5516 	 */
5517 	max_rels = 100;
5518 	rels = (Relation *) palloc(max_rels * sizeof(Relation));
5519 	num_rels = 0;
5520 	nailed_rels = nailed_indexes = 0;
5521 
5522 	/* check for correct magic number (compatible version) */
5523 	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5524 		goto read_failed;
5525 	if (magic != RELCACHE_INIT_FILEMAGIC)
5526 		goto read_failed;
5527 
5528 	for (relno = 0;; relno++)
5529 	{
5530 		Size		len;
5531 		size_t		nread;
5532 		Relation	rel;
5533 		Form_pg_class relform;
5534 		bool		has_not_null;
5535 
5536 		/* first read the relation descriptor length */
5537 		nread = fread(&len, 1, sizeof(len), fp);
5538 		if (nread != sizeof(len))
5539 		{
5540 			if (nread == 0)
5541 				break;			/* end of file */
5542 			goto read_failed;
5543 		}
5544 
5545 		/* safety check for incompatible relcache layout */
5546 		if (len != sizeof(RelationData))
5547 			goto read_failed;
5548 
5549 		/* allocate another relcache header */
5550 		if (num_rels >= max_rels)
5551 		{
5552 			max_rels *= 2;
5553 			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
5554 		}
5555 
5556 		rel = rels[num_rels++] = (Relation) palloc(len);
5557 
5558 		/* then, read the Relation structure */
5559 		if (fread(rel, 1, len, fp) != len)
5560 			goto read_failed;
5561 
5562 		/* next read the relation tuple form */
5563 		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5564 			goto read_failed;
5565 
5566 		relform = (Form_pg_class) palloc(len);
5567 		if (fread(relform, 1, len, fp) != len)
5568 			goto read_failed;
5569 
5570 		rel->rd_rel = relform;
5571 
5572 		/* initialize attribute tuple forms */
5573 		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
5574 		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
5575 
5576 		rel->rd_att->tdtypeid = relform->reltype;
5577 		rel->rd_att->tdtypmod = -1; /* unnecessary, but... */
5578 
5579 		/* next read all the attribute tuple form data entries */
5580 		has_not_null = false;
5581 		for (i = 0; i < relform->relnatts; i++)
5582 		{
5583 			Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);
5584 
5585 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5586 				goto read_failed;
5587 			if (len != ATTRIBUTE_FIXED_PART_SIZE)
5588 				goto read_failed;
5589 			if (fread(attr, 1, len, fp) != len)
5590 				goto read_failed;
5591 
5592 			has_not_null |= attr->attnotnull;
5593 		}
5594 
5595 		/* next read the access method specific field */
5596 		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5597 			goto read_failed;
5598 		if (len > 0)
5599 		{
5600 			rel->rd_options = palloc(len);
5601 			if (fread(rel->rd_options, 1, len, fp) != len)
5602 				goto read_failed;
5603 			if (len != VARSIZE(rel->rd_options))
5604 				goto read_failed;	/* sanity check */
5605 		}
5606 		else
5607 		{
5608 			rel->rd_options = NULL;
5609 		}
5610 
5611 		/* mark not-null status */
5612 		if (has_not_null)
5613 		{
5614 			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
5615 
5616 			constr->has_not_null = true;
5617 			rel->rd_att->constr = constr;
5618 		}
5619 
5620 		/*
5621 		 * If it's an index, there's more to do.  Note we explicitly ignore
5622 		 * partitioned indexes here.
5623 		 */
5624 		if (rel->rd_rel->relkind == RELKIND_INDEX)
5625 		{
5626 			MemoryContext indexcxt;
5627 			Oid		   *opfamily;
5628 			Oid		   *opcintype;
5629 			RegProcedure *support;
5630 			int			nsupport;
5631 			int16	   *indoption;
5632 			Oid		   *indcollation;
5633 
5634 			/* Count nailed indexes to ensure we have 'em all */
5635 			if (rel->rd_isnailed)
5636 				nailed_indexes++;
5637 
5638 			/* next, read the pg_index tuple */
5639 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5640 				goto read_failed;
5641 
5642 			rel->rd_indextuple = (HeapTuple) palloc(len);
5643 			if (fread(rel->rd_indextuple, 1, len, fp) != len)
5644 				goto read_failed;
5645 
5646 			/* Fix up internal pointers in the tuple -- see heap_copytuple */
5647 			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
5648 			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
5649 
5650 			/*
5651 			 * prepare index info context --- parameters should match
5652 			 * RelationInitIndexAccessInfo
5653 			 */
5654 			indexcxt = AllocSetContextCreate(CacheMemoryContext,
5655 											 "index info",
5656 											 ALLOCSET_SMALL_SIZES);
5657 			rel->rd_indexcxt = indexcxt;
5658 			MemoryContextCopyAndSetIdentifier(indexcxt,
5659 											  RelationGetRelationName(rel));
5660 
5661 			/*
5662 			 * Now we can fetch the index AM's API struct.  (We can't store
5663 			 * that in the init file, since it contains function pointers that
5664 			 * might vary across server executions.  Fortunately, it should be
5665 			 * safe to call the amhandler even while bootstrapping indexes.)
5666 			 */
5667 			InitIndexAmRoutine(rel);
5668 
5669 			/* next, read the vector of opfamily OIDs */
5670 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5671 				goto read_failed;
5672 
5673 			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
5674 			if (fread(opfamily, 1, len, fp) != len)
5675 				goto read_failed;
5676 
5677 			rel->rd_opfamily = opfamily;
5678 
5679 			/* next, read the vector of opcintype OIDs */
5680 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5681 				goto read_failed;
5682 
5683 			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
5684 			if (fread(opcintype, 1, len, fp) != len)
5685 				goto read_failed;
5686 
5687 			rel->rd_opcintype = opcintype;
5688 
5689 			/* next, read the vector of support procedure OIDs */
5690 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5691 				goto read_failed;
5692 			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
5693 			if (fread(support, 1, len, fp) != len)
5694 				goto read_failed;
5695 
5696 			rel->rd_support = support;
5697 
5698 			/* next, read the vector of collation OIDs */
5699 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5700 				goto read_failed;
5701 
5702 			indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
5703 			if (fread(indcollation, 1, len, fp) != len)
5704 				goto read_failed;
5705 
5706 			rel->rd_indcollation = indcollation;
5707 
5708 			/* finally, read the vector of indoption values */
5709 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5710 				goto read_failed;
5711 
5712 			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
5713 			if (fread(indoption, 1, len, fp) != len)
5714 				goto read_failed;
5715 
5716 			rel->rd_indoption = indoption;
5717 
5718 			/* set up zeroed fmgr-info vector */
5719 			nsupport = relform->relnatts * rel->rd_indam->amsupport;
5720 			rel->rd_supportinfo = (FmgrInfo *)
5721 				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
5722 		}
5723 		else
5724 		{
5725 			/* Count nailed rels to ensure we have 'em all */
5726 			if (rel->rd_isnailed)
5727 				nailed_rels++;
5728 
5729 			/* Load table AM data */
5730 			if (rel->rd_rel->relkind == RELKIND_RELATION ||
5731 				rel->rd_rel->relkind == RELKIND_SEQUENCE ||
5732 				rel->rd_rel->relkind == RELKIND_TOASTVALUE ||
5733 				rel->rd_rel->relkind == RELKIND_MATVIEW)
5734 				RelationInitTableAccessMethod(rel);
5735 
5736 			Assert(rel->rd_index == NULL);
5737 			Assert(rel->rd_indextuple == NULL);
5738 			Assert(rel->rd_indexcxt == NULL);
5739 			Assert(rel->rd_indam == NULL);
5740 			Assert(rel->rd_opfamily == NULL);
5741 			Assert(rel->rd_opcintype == NULL);
5742 			Assert(rel->rd_support == NULL);
5743 			Assert(rel->rd_supportinfo == NULL);
5744 			Assert(rel->rd_indoption == NULL);
5745 			Assert(rel->rd_indcollation == NULL);
5746 		}
5747 
5748 		/*
5749 		 * Rules and triggers are not saved (mainly because the internal
5750 		 * format is complex and subject to change).  They must be rebuilt if
5751 		 * needed by RelationCacheInitializePhase3.  This is not expected to
5752 		 * be a big performance hit since few system catalogs have such. Ditto
5753 		 * for RLS policy data, partition info, index expressions, predicates,
5754 		 * exclusion info, and FDW info.
5755 		 */
5756 		rel->rd_rules = NULL;
5757 		rel->rd_rulescxt = NULL;
5758 		rel->trigdesc = NULL;
5759 		rel->rd_rsdesc = NULL;
5760 		rel->rd_partkey = NULL;
5761 		rel->rd_partkeycxt = NULL;
5762 		rel->rd_partdesc = NULL;
5763 		rel->rd_pdcxt = NULL;
5764 		rel->rd_partcheck = NIL;
5765 		rel->rd_partcheckvalid = false;
5766 		rel->rd_partcheckcxt = NULL;
5767 		rel->rd_indexprs = NIL;
5768 		rel->rd_indpred = NIL;
5769 		rel->rd_exclops = NULL;
5770 		rel->rd_exclprocs = NULL;
5771 		rel->rd_exclstrats = NULL;
5772 		rel->rd_fdwroutine = NULL;
5773 
5774 		/*
5775 		 * Reset transient-state fields in the relcache entry
5776 		 */
5777 		rel->rd_smgr = NULL;
5778 		if (rel->rd_isnailed)
5779 			rel->rd_refcnt = 1;
5780 		else
5781 			rel->rd_refcnt = 0;
5782 		rel->rd_indexvalid = false;
5783 		rel->rd_indexlist = NIL;
5784 		rel->rd_pkindex = InvalidOid;
5785 		rel->rd_replidindex = InvalidOid;
5786 		rel->rd_indexattr = NULL;
5787 		rel->rd_keyattr = NULL;
5788 		rel->rd_pkattr = NULL;
5789 		rel->rd_idattr = NULL;
5790 		rel->rd_pubactions = NULL;
5791 		rel->rd_statvalid = false;
5792 		rel->rd_statlist = NIL;
5793 		rel->rd_fkeyvalid = false;
5794 		rel->rd_fkeylist = NIL;
5795 		rel->rd_createSubid = InvalidSubTransactionId;
5796 		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
5797 		rel->rd_amcache = NULL;
5798 		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
5799 
5800 		/*
5801 		 * Recompute lock and physical addressing info.  This is needed in
5802 		 * case the pg_internal.init file was copied from some other database
5803 		 * by CREATE DATABASE.
5804 		 */
5805 		RelationInitLockInfo(rel);
5806 		RelationInitPhysicalAddr(rel);
5807 	}
5808 
5809 	/*
5810 	 * We reached the end of the init file without apparent problem.  Did we
5811 	 * get the right number of nailed items?  This is a useful crosscheck in
5812 	 * case the set of critical rels or indexes changes.  However, that should
5813 	 * not happen in a normally-running system, so let's bleat if it does.
5814 	 *
5815 	 * For the shared init file, we're called before client authentication is
5816 	 * done, which means that elog(WARNING) will go only to the postmaster
5817 	 * log, where it's easily missed.  To ensure that developers notice bad
5818 	 * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
5819 	 * an Assert(false) there.
5820 	 */
5821 	if (shared)
5822 	{
5823 		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
5824 			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
5825 		{
5826 			elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
5827 				 nailed_rels, nailed_indexes,
5828 				 NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
5829 			/* Make sure we get developers' attention about this */
5830 			Assert(false);
5831 			/* In production builds, recover by bootstrapping the relcache */
5832 			goto read_failed;
5833 		}
5834 	}
5835 	else
5836 	{
5837 		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
5838 			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
5839 		{
5840 			elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
5841 				 nailed_rels, nailed_indexes,
5842 				 NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
5843 			/* We don't need an Assert() in this case */
5844 			goto read_failed;
5845 		}
5846 	}
5847 
5848 	/*
5849 	 * OK, all appears well.
5850 	 *
5851 	 * Now insert all the new relcache entries into the cache.
5852 	 */
5853 	for (relno = 0; relno < num_rels; relno++)
5854 	{
5855 		RelationCacheInsert(rels[relno], false);
5856 	}
5857 
5858 	pfree(rels);
5859 	FreeFile(fp);
5860 
5861 	if (shared)
5862 		criticalSharedRelcachesBuilt = true;
5863 	else
5864 		criticalRelcachesBuilt = true;
5865 	return true;
5866 
5867 	/*
5868 	 * init file is broken, so do it the hard way.  We don't bother trying to
5869 	 * free the clutter we just allocated; it's not in the relcache so it
5870 	 * won't hurt.
5871 	 */
5872 read_failed:
5873 	pfree(rels);
5874 	FreeFile(fp);
5875 
5876 	return false;
5877 }
5878 
5879 /*
5880  * Write out a new initialization file with the current contents
5881  * of the relcache (either shared rels or local rels, as indicated).
5882  */
5883 static void
5884 write_relcache_init_file(bool shared)
5885 {
5886 	FILE	   *fp;
5887 	char		tempfilename[MAXPGPATH];
5888 	char		finalfilename[MAXPGPATH];
5889 	int			magic;
5890 	HASH_SEQ_STATUS status;
5891 	RelIdCacheEnt *idhentry;
5892 	int			i;
5893 
5894 	/*
5895 	 * If we have already received any relcache inval events, there's no
5896 	 * chance of succeeding so we may as well skip the whole thing.
5897 	 */
5898 	if (relcacheInvalsReceived != 0L)
5899 		return;
5900 
5901 	/*
5902 	 * We must write a temporary file and rename it into place. Otherwise,
5903 	 * another backend starting at about the same time might crash trying to
5904 	 * read the partially-complete file.
5905 	 */
5906 	if (shared)
5907 	{
5908 		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
5909 				 RELCACHE_INIT_FILENAME, MyProcPid);
5910 		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
5911 				 RELCACHE_INIT_FILENAME);
5912 	}
5913 	else
5914 	{
5915 		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
5916 				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
5917 		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
5918 				 DatabasePath, RELCACHE_INIT_FILENAME);
5919 	}
5920 
5921 	unlink(tempfilename);		/* in case it exists w/wrong permissions */
5922 
5923 	fp = AllocateFile(tempfilename, PG_BINARY_W);
5924 	if (fp == NULL)
5925 	{
5926 		/*
5927 		 * We used to consider this a fatal error, but we might as well
5928 		 * continue with backend startup ...
5929 		 */
5930 		ereport(WARNING,
5931 				(errcode_for_file_access(),
5932 				 errmsg("could not create relation-cache initialization file \"%s\": %m",
5933 						tempfilename),
5934 				 errdetail("Continuing anyway, but there's something wrong.")));
5935 		return;
5936 	}
5937 
5938 	/*
5939 	 * Write a magic number to serve as a file version identifier.  We can
5940 	 * change the magic number whenever the relcache layout changes.
5941 	 */
5942 	magic = RELCACHE_INIT_FILEMAGIC;
5943 	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5944 		elog(FATAL, "could not write init file");
5945 
5946 	/*
5947 	 * Write all the appropriate reldescs (in no particular order).
5948 	 */
5949 	hash_seq_init(&status, RelationIdCache);
5950 
5951 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
5952 	{
5953 		Relation	rel = idhentry->reldesc;
5954 		Form_pg_class relform = rel->rd_rel;
5955 
5956 		/* ignore if not correct group */
5957 		if (relform->relisshared != shared)
5958 			continue;
5959 
5960 		/*
5961 		 * Ignore if not supposed to be in init file.  We can allow any shared
5962 		 * relation that's been loaded so far to be in the shared init file,
5963 		 * but unshared relations must be ones that should be in the local
5964 		 * file per RelationIdIsInInitFile.  (Note: if you want to change the
5965 		 * criterion for rels to be kept in the init file, see also inval.c.
5966 		 * The reason for filtering here is to be sure that we don't put
5967 		 * anything into the local init file for which a relcache inval would
5968 		 * not cause invalidation of that init file.)
5969 		 */
5970 		if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
5971 		{
5972 			/* Nailed rels had better get stored. */
5973 			Assert(!rel->rd_isnailed);
5974 			continue;
5975 		}
5976 
5977 		/* first write the relcache entry proper */
5978 		write_item(rel, sizeof(RelationData), fp);
5979 
5980 		/* next write the relation tuple form */
5981 		write_item(relform, CLASS_TUPLE_SIZE, fp);
5982 
5983 		/* next, do all the attribute tuple form data entries */
5984 		for (i = 0; i < relform->relnatts; i++)
5985 		{
5986 			write_item(TupleDescAttr(rel->rd_att, i),
5987 					   ATTRIBUTE_FIXED_PART_SIZE, fp);
5988 		}
5989 
5990 		/* next, do the access method specific field */
5991 		write_item(rel->rd_options,
5992 				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
5993 				   fp);
5994 
5995 		/*
5996 		 * If it's an index, there's more to do. Note we explicitly ignore
5997 		 * partitioned indexes here.
5998 		 */
5999 		if (rel->rd_rel->relkind == RELKIND_INDEX)
6000 		{
6001 			/* write the pg_index tuple */
6002 			/* we assume this was created by heap_copytuple! */
6003 			write_item(rel->rd_indextuple,
6004 					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
6005 					   fp);
6006 
6007 			/* next, write the vector of opfamily OIDs */
6008 			write_item(rel->rd_opfamily,
6009 					   relform->relnatts * sizeof(Oid),
6010 					   fp);
6011 
6012 			/* next, write the vector of opcintype OIDs */
6013 			write_item(rel->rd_opcintype,
6014 					   relform->relnatts * sizeof(Oid),
6015 					   fp);
6016 
6017 			/* next, write the vector of support procedure OIDs */
6018 			write_item(rel->rd_support,
6019 					   relform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
6020 					   fp);
6021 
6022 			/* next, write the vector of collation OIDs */
6023 			write_item(rel->rd_indcollation,
6024 					   relform->relnatts * sizeof(Oid),
6025 					   fp);
6026 
6027 			/* finally, write the vector of indoption values */
6028 			write_item(rel->rd_indoption,
6029 					   relform->relnatts * sizeof(int16),
6030 					   fp);
6031 		}
6032 	}
6033 
6034 	if (FreeFile(fp))
6035 		elog(FATAL, "could not write init file");
6036 
6037 	/*
6038 	 * Now we have to check whether the data we've so painstakingly
6039 	 * accumulated is already obsolete due to someone else's just-committed
6040 	 * catalog changes.  If so, we just delete the temp file and leave it to
6041 	 * the next backend to try again.  (Our own relcache entries will be
6042 	 * updated by SI message processing, but we can't be sure whether what we
6043 	 * wrote out was up-to-date.)
6044 	 *
6045 	 * This mustn't run concurrently with the code that unlinks an init file
6046 	 * and sends SI messages, so grab a serialization lock for the duration.
6047 	 */
6048 	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
6049 
6050 	/* Make sure we have seen all incoming SI messages */
6051 	AcceptInvalidationMessages();
6052 
6053 	/*
6054 	 * If we have received any SI relcache invals since backend start, assume
6055 	 * we may have written out-of-date data.
6056 	 */
6057 	if (relcacheInvalsReceived == 0L)
6058 	{
6059 		/*
6060 		 * OK, rename the temp file to its final name, deleting any
6061 		 * previously-existing init file.
6062 		 *
6063 		 * Note: a failure here is possible under Cygwin, if some other
6064 		 * backend is holding open an unlinked-but-not-yet-gone init file. So
6065 		 * treat this as a noncritical failure; just remove the useless temp
6066 		 * file on failure.
6067 		 */
6068 		if (rename(tempfilename, finalfilename) < 0)
6069 			unlink(tempfilename);
6070 	}
6071 	else
6072 	{
6073 		/* Delete the already-obsolete temp file */
6074 		unlink(tempfilename);
6075 	}
6076 
6077 	LWLockRelease(RelCacheInitLock);
6078 }
6079 
6080 /* write a chunk of data preceded by its length */
6081 static void
6082 write_item(const void *data, Size len, FILE *fp)
6083 {
6084 	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
6085 		elog(FATAL, "could not write init file");
6086 	if (fwrite(data, 1, len, fp) != len)
6087 		elog(FATAL, "could not write init file");
6088 }
6089 
6090 /*
6091  * Determine whether a given relation (identified by OID) is one of the ones
6092  * we should store in a relcache init file.
6093  *
6094  * We must cache all nailed rels, and for efficiency we should cache every rel
6095  * that supports a syscache.  The former set is almost but not quite a subset
6096  * of the latter. The special cases are relations where
6097  * RelationCacheInitializePhase2/3 chooses to nail for efficiency reasons, but
6098  * which do not support any syscache.
6099  */
6100 bool
6101 RelationIdIsInInitFile(Oid relationId)
6102 {
6103 	if (relationId == SharedSecLabelRelationId ||
6104 		relationId == TriggerRelidNameIndexId ||
6105 		relationId == DatabaseNameIndexId ||
6106 		relationId == SharedSecLabelObjectIndexId)
6107 	{
6108 		/*
6109 		 * If this Assert fails, we don't need the applicable special case
6110 		 * anymore.
6111 		 */
6112 		Assert(!RelationSupportsSysCache(relationId));
6113 		return true;
6114 	}
6115 	return RelationSupportsSysCache(relationId);
6116 }
6117 
6118 /*
6119  * Invalidate (remove) the init file during commit of a transaction that
6120  * changed one or more of the relation cache entries that are kept in the
6121  * local init file.
6122  *
6123  * To be safe against concurrent inspection or rewriting of the init file,
6124  * we must take RelCacheInitLock, then remove the old init file, then send
6125  * the SI messages that include relcache inval for such relations, and then
6126  * release RelCacheInitLock.  This serializes the whole affair against
6127  * write_relcache_init_file, so that we can be sure that any other process
6128  * that's concurrently trying to create a new init file won't move an
6129  * already-stale version into place after we unlink.  Also, because we unlink
6130  * before sending the SI messages, a backend that's currently starting cannot
6131  * read the now-obsolete init file and then miss the SI messages that will
6132  * force it to update its relcache entries.  (This works because the backend
6133  * startup sequence gets into the sinval array before trying to load the init
6134  * file.)
6135  *
6136  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
6137  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
6138  * send any pending SI messages between those calls.
6139  */
6140 void
6141 RelationCacheInitFilePreInvalidate(void)
6142 {
6143 	char		localinitfname[MAXPGPATH];
6144 	char		sharedinitfname[MAXPGPATH];
6145 
6146 	if (DatabasePath)
6147 		snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
6148 				 DatabasePath, RELCACHE_INIT_FILENAME);
6149 	snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
6150 			 RELCACHE_INIT_FILENAME);
6151 
6152 	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
6153 
6154 	/*
6155 	 * The files might not be there if no backend has been started since the
6156 	 * last removal.  But complain about failures other than ENOENT with
6157 	 * ERROR.  Fortunately, it's not too late to abort the transaction if we
6158 	 * can't get rid of the would-be-obsolete init file.
6159 	 */
6160 	if (DatabasePath)
6161 		unlink_initfile(localinitfname, ERROR);
6162 	unlink_initfile(sharedinitfname, ERROR);
6163 }
6164 
6165 void
6166 RelationCacheInitFilePostInvalidate(void)
6167 {
6168 	LWLockRelease(RelCacheInitLock);
6169 }
6170 
6171 /*
6172  * Remove the init files during postmaster startup.
6173  *
6174  * We used to keep the init files across restarts, but that is unsafe in PITR
6175  * scenarios, and even in simple crash-recovery cases there are windows for
6176  * the init files to become out-of-sync with the database.  So now we just
6177  * remove them during startup and expect the first backend launch to rebuild
6178  * them.  Of course, this has to happen in each database of the cluster.
6179  */
6180 void
6181 RelationCacheInitFileRemove(void)
6182 {
6183 	const char *tblspcdir = "pg_tblspc";
6184 	DIR		   *dir;
6185 	struct dirent *de;
6186 	char		path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY)];
6187 
6188 	snprintf(path, sizeof(path), "global/%s",
6189 			 RELCACHE_INIT_FILENAME);
6190 	unlink_initfile(path, LOG);
6191 
6192 	/* Scan everything in the default tablespace */
6193 	RelationCacheInitFileRemoveInDir("base");
6194 
6195 	/* Scan the tablespace link directory to find non-default tablespaces */
6196 	dir = AllocateDir(tblspcdir);
6197 
6198 	while ((de = ReadDirExtended(dir, tblspcdir, LOG)) != NULL)
6199 	{
6200 		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6201 		{
6202 			/* Scan the tablespace dir for per-database dirs */
6203 			snprintf(path, sizeof(path), "%s/%s/%s",
6204 					 tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
6205 			RelationCacheInitFileRemoveInDir(path);
6206 		}
6207 	}
6208 
6209 	FreeDir(dir);
6210 }
6211 
6212 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
6213 static void
6214 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
6215 {
6216 	DIR		   *dir;
6217 	struct dirent *de;
6218 	char		initfilename[MAXPGPATH * 2];
6219 
6220 	/* Scan the tablespace directory to find per-database directories */
6221 	dir = AllocateDir(tblspcpath);
6222 
6223 	while ((de = ReadDirExtended(dir, tblspcpath, LOG)) != NULL)
6224 	{
6225 		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6226 		{
6227 			/* Try to remove the init file in each database */
6228 			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
6229 					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
6230 			unlink_initfile(initfilename, LOG);
6231 		}
6232 	}
6233 
6234 	FreeDir(dir);
6235 }
6236 
6237 static void
6238 unlink_initfile(const char *initfilename, int elevel)
6239 {
6240 	if (unlink(initfilename) < 0)
6241 	{
6242 		/* It might not be there, but log any error other than ENOENT */
6243 		if (errno != ENOENT)
6244 			ereport(elevel,
6245 					(errcode_for_file_access(),
6246 					 errmsg("could not remove cache file \"%s\": %m",
6247 							initfilename)));
6248 	}
6249 }
6250