1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *	  POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/cache/relcache.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *		RelationCacheInitialize			- initialize relcache (to empty)
18  *		RelationCacheInitializePhase2	- initialize shared-catalog entries
19  *		RelationCacheInitializePhase3	- finish initializing relcache
20  *		RelationIdGetRelation			- get a reldesc by relation id
21  *		RelationClose					- close an open relation
22  *
23  * NOTES
24  *		The following code contains many undocumented hacks.  Please be
25  *		careful....
26  */
27 #include "postgres.h"
28 
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/nbtree.h"
36 #include "access/parallel.h"
37 #include "access/reloptions.h"
38 #include "access/sysattr.h"
39 #include "access/table.h"
40 #include "access/tableam.h"
41 #include "access/tupdesc_details.h"
42 #include "access/xact.h"
43 #include "access/xlog.h"
44 #include "catalog/catalog.h"
45 #include "catalog/indexing.h"
46 #include "catalog/namespace.h"
47 #include "catalog/partition.h"
48 #include "catalog/pg_am.h"
49 #include "catalog/pg_amproc.h"
50 #include "catalog/pg_attrdef.h"
51 #include "catalog/pg_auth_members.h"
52 #include "catalog/pg_authid.h"
53 #include "catalog/pg_constraint.h"
54 #include "catalog/pg_database.h"
55 #include "catalog/pg_namespace.h"
56 #include "catalog/pg_opclass.h"
57 #include "catalog/pg_proc.h"
58 #include "catalog/pg_publication.h"
59 #include "catalog/pg_rewrite.h"
60 #include "catalog/pg_shseclabel.h"
61 #include "catalog/pg_statistic_ext.h"
62 #include "catalog/pg_subscription.h"
63 #include "catalog/pg_tablespace.h"
64 #include "catalog/pg_trigger.h"
65 #include "catalog/pg_type.h"
66 #include "catalog/schemapg.h"
67 #include "catalog/storage.h"
68 #include "commands/policy.h"
69 #include "commands/trigger.h"
70 #include "miscadmin.h"
71 #include "nodes/makefuncs.h"
72 #include "nodes/nodeFuncs.h"
73 #include "optimizer/optimizer.h"
74 #include "rewrite/rewriteDefine.h"
75 #include "rewrite/rowsecurity.h"
76 #include "storage/lmgr.h"
77 #include "storage/smgr.h"
78 #include "utils/array.h"
79 #include "utils/builtins.h"
80 #include "utils/datum.h"
81 #include "utils/fmgroids.h"
82 #include "utils/inval.h"
83 #include "utils/lsyscache.h"
84 #include "utils/memutils.h"
85 #include "utils/relmapper.h"
86 #include "utils/resowner_private.h"
87 #include "utils/snapmgr.h"
88 #include "utils/syscache.h"
89 
/*
 * Magic number identifying the relcache init file format; it doubles as the
 * file's version ID value.  (Presumably consumed by the
 * load_relcache_init_file()/write_relcache_init_file() pair declared below;
 * the actual uses are outside this part of the file.)
 */
#define RELCACHE_INIT_FILEMAGIC		0x573266	/* version ID value */

/*
 * Whether to bother checking if relation cache memory needs to be freed
 * eagerly.  See also RelationBuildDesc() and pg_config_manual.h.
 *
 * Outcome of the conditionals below:
 *	- RECOVER_RELATION_BUILD_MEMORY defined and nonzero:
 *	  MAYBE_RECOVER_RELATION_BUILD_MEMORY is defined as 1.
 *	- otherwise: RECOVER_RELATION_BUILD_MEMORY is defined as 0, and
 *	  MAYBE_RECOVER_RELATION_BUILD_MEMORY is defined as 1 only if
 *	  DISCARD_CACHES_ENABLED is defined; else it is left undefined.
 */
#if defined(RECOVER_RELATION_BUILD_MEMORY) && (RECOVER_RELATION_BUILD_MEMORY != 0)
#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
#else
#define RECOVER_RELATION_BUILD_MEMORY 0
#ifdef DISCARD_CACHES_ENABLED
#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
#endif
#endif
104 
/*
 *		hardcoded tuple descriptors, contents generated by genbki.pl
 *
 * One read-only array per catalog.  Each has Natts_<catalog> entries and is
 * initialized from the matching Schema_<catalog> macro.
 *
 * NOTE(review): these look like the "attrs" arguments for formrdesc(),
 * whose prototype appears further down; the call sites are not in this part
 * of the file, so confirm there.
 */
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
118 
/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 *
 *		Each hash entry pairs a relation OID with the Relation descriptor
 *		cached for it.  The hashtable macros in this file pass the address
 *		of an OID (a relation's rd_id) as the lookup key.
 */
typedef struct relidcacheent
{
	/* lookup key: OID of the relation */
	Oid			reloid;
	/* the relation descriptor cached for that OID */
	Relation	reldesc;
} RelIdCacheEnt;

/* the OID-indexed hash table; accessed through the macros in this file */
static HTAB *RelationIdCache;
132 
/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 *
 * While it is false, ScanPgRelation() and RelationBuildTupleDesc() pass
 * indexOK = false to systable_beginscan(), i.e. they request heap scans of
 * pg_class and pg_attribute.
 */
bool		criticalRelcachesBuilt = false;

/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).  Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;
152 
/*
 * in_progress_list is a stack of ongoing RelationBuildDesc() calls.  CREATE
 * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
 * It critically relies on each backend absorbing those changes no later than
 * next transaction start.  Hence, RelationBuildDesc() loops until it finishes
 * without accepting a relevant invalidation.  (Most invalidation consumers
 * don't do this.)
 */
typedef struct inprogressent
{
	Oid			reloid;			/* OID of relation being built */
	bool		invalidated;	/* whether an invalidation arrived for it */
} InProgressEnt;

/*
 * Storage for the stack described above.
 *
 * NOTE(review): going by the names, in_progress_list_len is the number of
 * entries currently on the stack and in_progress_list_maxlen the allocated
 * capacity; the code that maintains them is outside this part of the file,
 * so confirm there.
 */
static InProgressEnt *in_progress_list;
static int	in_progress_list_len;
static int	in_progress_list_maxlen;
170 
/*
 * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
 * cleanup work.  This list intentionally has limited size; if it overflows,
 * we fall back to scanning the whole hashtable.  There is no value in a very
 * large list because (1) at some point, a hash_seq_search scan is faster than
 * retail lookups, and (2) the value of this is to reduce EOXact work for
 * short transactions, which can't have dirtied all that many tables anyway.
 * EOXactListAdd() does not bother to prevent duplicate list entries, so the
 * cleanup processing must be idempotent.
 */
#define MAX_EOXACT_LIST 32
static Oid	eoxact_list[MAX_EOXACT_LIST];
static int	eoxact_list_len = 0;
static bool eoxact_list_overflowed = false;

/*
 * EOXactListAdd(rel)
 *
 * Append rel->rd_id to eoxact_list[] if there is still a free slot;
 * otherwise leave the list alone and just set eoxact_list_overflowed.
 * Wrapped in do { } while (0) so it can be used as a single statement.
 */
#define EOXactListAdd(rel) \
	do { \
		if (eoxact_list_len < MAX_EOXACT_LIST) \
			eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
		else \
			eoxact_list_overflowed = true; \
	} while (0)
193 
/*
 * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
 * cleanup work.  The array expands as needed; there is no hashtable because
 * we don't need to access individual items except at EOXact.
 *
 * NOTE(review): by their names, NextEOXactTupleDescNum is the next unused
 * slot and EOXactTupleDescArrayLen the currently allocated length.  The code
 * that grows and drains the array is outside this part of the file
 * (RememberToFreeTupleDescAtEOX() is the likely writer) -- confirm there.
 */
static TupleDesc *EOXactTupleDescArray;
static int	NextEOXactTupleDescNum = 0;
static int	EOXactTupleDescArrayLen = 0;
202 
/*
 *		macros to manipulate the lookup hashtable
 */

/*
 * RelationCacheInsert(RELATION, replace_allowed)
 *
 * Enter RELATION into RelationIdCache under its rd_id.
 *
 * If an entry for that OID already exists, replace_allowed is asserted and
 * the entry is repointed at RELATION.  The previous descriptor is then
 * destroyed with RelationDestroyRelation() if its reference count is zero;
 * if it is still referenced it is left allocated, and a WARNING about the
 * leak is emitted unless we are in bootstrap processing mode.
 *
 * RELATION appears more than once in the expansion, so pass an expression
 * without side effects.
 */
#define RelationCacheInsert(RELATION, replace_allowed)	\
do { \
	RelIdCacheEnt *hentry; bool found; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_ENTER, &found); \
	if (found) \
	{ \
		/* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
		Relation _old_rel = hentry->reldesc; \
		Assert(replace_allowed); \
		hentry->reldesc = (RELATION); \
		if (RelationHasReferenceCountZero(_old_rel)) \
			RelationDestroyRelation(_old_rel, false); \
		else if (!IsBootstrapProcessingMode()) \
			elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
				 RelationGetRelationName(_old_rel)); \
	} \
	else \
		hentry->reldesc = (RELATION); \
} while(0)
227 
/*
 * RelationIdCacheLookup(ID, RELATION)
 *
 * Look ID up in RelationIdCache (HASH_FIND, so nothing is created) and
 * assign the cached descriptor to RELATION, or NULL when there is no entry.
 *
 * ID must be an lvalue, because its address is passed as the hash key;
 * RELATION must be assignable.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &(ID), \
										   HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)
239 
/*
 * RelationCacheDelete(RELATION)
 *
 * Remove the RelationIdCache entry keyed by RELATION's rd_id.  Only the
 * hash entry is removed here; the descriptor itself is not touched.  A
 * missing entry is not treated as an error: it merely draws a WARNING.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_REMOVE, NULL); \
	if (hentry == NULL) \
		elog(WARNING, "failed to delete relcache entry for OID %u", \
			 (RELATION)->rd_id); \
} while(0)
250 
251 
/*
 * Special cache for opclass-related information
 *
 * Note: only default support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set true after successful fill-in */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	RegProcedure *supportProcs; /* OIDs of support procedures */
} OpClassCacheEnt;

/*
 * The opclass cache's hash table.  It starts out NULL; the code that creates
 * and fills it (presumably LookupOpclassInfo(), declared below) is outside
 * this part of the file.
 */
static HTAB *OpClassCache = NULL;
269 
270 
/*
 * non-export function prototypes
 *
 * Forward declarations of this file's static functions, so that they can be
 * referenced ahead of their definitions.
 */

static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
static void RelationClearRelation(Relation relation, bool rebuild);

static void RelationReloadIndexInfo(Relation relation);
static void RelationReloadNailed(Relation relation);
static void RelationFlushRelation(Relation relation);
static void RememberToFreeTupleDescAtEOX(TupleDesc td);
#ifdef USE_ASSERT_CHECKING
/* declared only in assert-enabled builds */
static void AssertPendingSyncConsistency(Relation relation);
#endif
static void AtEOXact_cleanup(Relation relation, bool isCommit);
static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
								SubTransactionId mySubid, SubTransactionId parentSubid);
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
static void write_item(const void *data, Size len, FILE *fp);

static void formrdesc(const char *relationName, Oid relationReltype,
					  bool isshared, int natts, const FormData_pg_attribute *attrs);

static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
static void RelationInitPhysicalAddr(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void);
static void AttrDefaultFetch(Relation relation, int ndef);
static int	AttrDefaultCmp(const void *a, const void *b);
static void CheckConstraintFetch(Relation relation);
static int	CheckConstraintCmp(const void *a, const void *b);
static void InitIndexAmRoutine(Relation relation);
static void IndexSupportInitialize(oidvector *indclass,
								   RegProcedure *indexSupport,
								   Oid *opFamily,
								   Oid *opcInType,
								   StrategyNumber maxSupportNumber,
								   AttrNumber maxAttributeNumber);
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
										  StrategyNumber numSupport);
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename, int elevel);
317 
318 
319 /*
320  *		ScanPgRelation
321  *
322  *		This is used by RelationBuildDesc to find a pg_class
323  *		tuple matching targetRelId.  The caller must hold at least
324  *		AccessShareLock on the target relid to prevent concurrent-update
325  *		scenarios; it isn't guaranteed that all scans used to build the
326  *		relcache entry will use the same snapshot.  If, for example,
327  *		an attribute were to be added after scanning pg_class and before
328  *		scanning pg_attribute, relnatts wouldn't match.
329  *
330  *		NB: the returned tuple has been copied into palloc'd storage
331  *		and must eventually be freed with heap_freetuple.
332  */
333 static HeapTuple
ScanPgRelation(Oid targetRelId,bool indexOK,bool force_non_historic)334 ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
335 {
336 	HeapTuple	pg_class_tuple;
337 	Relation	pg_class_desc;
338 	SysScanDesc pg_class_scan;
339 	ScanKeyData key[1];
340 	Snapshot	snapshot = NULL;
341 
342 	/*
343 	 * If something goes wrong during backend startup, we might find ourselves
344 	 * trying to read pg_class before we've selected a database.  That ain't
345 	 * gonna work, so bail out with a useful error message.  If this happens,
346 	 * it probably means a relcache entry that needs to be nailed isn't.
347 	 */
348 	if (!OidIsValid(MyDatabaseId))
349 		elog(FATAL, "cannot read pg_class without having selected a database");
350 
351 	/*
352 	 * form a scan key
353 	 */
354 	ScanKeyInit(&key[0],
355 				Anum_pg_class_oid,
356 				BTEqualStrategyNumber, F_OIDEQ,
357 				ObjectIdGetDatum(targetRelId));
358 
359 	/*
360 	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
361 	 * built the critical relcache entries (this includes initdb and startup
362 	 * without a pg_internal.init file).  The caller can also force a heap
363 	 * scan by setting indexOK == false.
364 	 */
365 	pg_class_desc = table_open(RelationRelationId, AccessShareLock);
366 
367 	/*
368 	 * The caller might need a tuple that's newer than the one the historic
369 	 * snapshot; currently the only case requiring to do so is looking up the
370 	 * relfilenode of non mapped system relations during decoding. That
371 	 * snapshot can't change in the midst of a relcache build, so there's no
372 	 * need to register the snapshot.
373 	 */
374 	if (force_non_historic)
375 		snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
376 
377 	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
378 									   indexOK && criticalRelcachesBuilt,
379 									   snapshot,
380 									   1, key);
381 
382 	pg_class_tuple = systable_getnext(pg_class_scan);
383 
384 	/*
385 	 * Must copy tuple before releasing buffer.
386 	 */
387 	if (HeapTupleIsValid(pg_class_tuple))
388 		pg_class_tuple = heap_copytuple(pg_class_tuple);
389 
390 	/* all done */
391 	systable_endscan(pg_class_scan);
392 	table_close(pg_class_desc, AccessShareLock);
393 
394 	return pg_class_tuple;
395 }
396 
397 /*
398  *		AllocateRelationDesc
399  *
400  *		This is used to allocate memory for a new relation descriptor
401  *		and initialize the rd_rel field from the given pg_class tuple.
402  */
403 static Relation
AllocateRelationDesc(Form_pg_class relp)404 AllocateRelationDesc(Form_pg_class relp)
405 {
406 	Relation	relation;
407 	MemoryContext oldcxt;
408 	Form_pg_class relationForm;
409 
410 	/* Relcache entries must live in CacheMemoryContext */
411 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
412 
413 	/*
414 	 * allocate and zero space for new relation descriptor
415 	 */
416 	relation = (Relation) palloc0(sizeof(RelationData));
417 
418 	/* make sure relation is marked as having no open file yet */
419 	relation->rd_smgr = NULL;
420 
421 	/*
422 	 * Copy the relation tuple form
423 	 *
424 	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
425 	 * variable-length fields (relacl, reloptions) are NOT stored in the
426 	 * relcache --- there'd be little point in it, since we don't copy the
427 	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
428 	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
429 	 * it from the syscache if you need it.  The same goes for the original
430 	 * form of reloptions (however, we do store the parsed form of reloptions
431 	 * in rd_options).
432 	 */
433 	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
434 
435 	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
436 
437 	/* initialize relation tuple form */
438 	relation->rd_rel = relationForm;
439 
440 	/* and allocate attribute tuple form storage */
441 	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts);
442 	/* which we mark as a reference-counted tupdesc */
443 	relation->rd_att->tdrefcount = 1;
444 
445 	MemoryContextSwitchTo(oldcxt);
446 
447 	return relation;
448 }
449 
450 /*
451  * RelationParseRelOptions
452  *		Convert pg_class.reloptions into pre-parsed rd_options
453  *
454  * tuple is the real pg_class tuple (not rd_rel!) for relation
455  *
456  * Note: rd_rel and (if an index) rd_indam must be valid already
457  */
458 static void
RelationParseRelOptions(Relation relation,HeapTuple tuple)459 RelationParseRelOptions(Relation relation, HeapTuple tuple)
460 {
461 	bytea	   *options;
462 	amoptions_function amoptsfn;
463 
464 	relation->rd_options = NULL;
465 
466 	/*
467 	 * Look up any AM-specific parse function; fall out if relkind should not
468 	 * have options.
469 	 */
470 	switch (relation->rd_rel->relkind)
471 	{
472 		case RELKIND_RELATION:
473 		case RELKIND_TOASTVALUE:
474 		case RELKIND_VIEW:
475 		case RELKIND_MATVIEW:
476 		case RELKIND_PARTITIONED_TABLE:
477 			amoptsfn = NULL;
478 			break;
479 		case RELKIND_INDEX:
480 		case RELKIND_PARTITIONED_INDEX:
481 			amoptsfn = relation->rd_indam->amoptions;
482 			break;
483 		default:
484 			return;
485 	}
486 
487 	/*
488 	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
489 	 * we might not have any other for pg_class yet (consider executing this
490 	 * code for pg_class itself)
491 	 */
492 	options = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
493 
494 	/*
495 	 * Copy parsed data into CacheMemoryContext.  To guard against the
496 	 * possibility of leaks in the reloptions code, we want to do the actual
497 	 * parsing in the caller's memory context and copy the results into
498 	 * CacheMemoryContext after the fact.
499 	 */
500 	if (options)
501 	{
502 		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
503 												  VARSIZE(options));
504 		memcpy(relation->rd_options, options, VARSIZE(options));
505 		pfree(options);
506 	}
507 }
508 
/*
 *		RelationBuildTupleDesc
 *
 *		Form the relation's tuple descriptor from information in
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
 *
 *		relation->rd_rel and relation->rd_att must already be allocated
 *		(AllocateRelationDesc does that); this routine fills rd_att in
 *		place.  For every user attribute it copies the fixed part of the
 *		pg_attribute row into the matching rd_att slot, and it gathers
 *		NOT NULL / stored-generated / default / missing-value facts along
 *		the way.  At the end rd_att->constr is either a TupleConstr
 *		allocated in CacheMemoryContext or NULL if none of that
 *		information (and no CHECK constraint) applies.
 *
 *		Raises ERROR when pg_attribute yields an attnum outside
 *		1..natts, or fewer rows than the relation has attributes.
 */
static void
RelationBuildTupleDesc(Relation relation)
{
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
	/* need: number of attribute rows still to be found by the scan */
	int			need;
	TupleConstr *constr;
	/* attrmiss: per-attribute "missing" values; allocated only on demand */
	AttrMissing *attrmiss = NULL;
	/* ndef: how many attributes are flagged atthasdef */
	int			ndef = 0;

	/* fill rd_att's type ID fields (compare heap.c's AddNewRelationTuple) */
	relation->rd_att->tdtypeid =
		relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID;
	relation->rd_att->tdtypmod = -1;	/* just to be sure */

	/*
	 * Allocate the constraint struct up front, directly in
	 * CacheMemoryContext; it is pfree'd again at the bottom if it turns out
	 * not to be needed.
	 */
	constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
													sizeof(TupleConstr));
	constr->has_not_null = false;
	constr->has_generated_stored = false;

	/*
	 * Form a scan key that selects only user attributes (attnum > 0).
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));

	/*
	 * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
	 */
	pg_attribute_desc = table_open(AttributeRelationId, AccessShareLock);
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
										   AttributeRelidNumIndexId,
										   criticalRelcachesBuilt,
										   NULL,
										   2, skey);

	/*
	 * add attribute data to relation->rd_att
	 */
	need = RelationGetNumberOfAttributes(relation);

	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
	{
		Form_pg_attribute attp;
		int			attnum;

		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);

		/* sanity-check attnum before using it as an array index below */
		attnum = attp->attnum;
		if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
			elog(ERROR, "invalid attribute number %d for relation \"%s\"",
				 attp->attnum, RelationGetRelationName(relation));

		/* only the fixed-size part of the row is kept in the descriptor */
		memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
			   attp,
			   ATTRIBUTE_FIXED_PART_SIZE);

		/* Update constraint/default info */
		if (attp->attnotnull)
			constr->has_not_null = true;
		if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
			constr->has_generated_stored = true;
		if (attp->atthasdef)
			ndef++;

		/* If the column has a "missing" value, put it in the attrmiss array */
		if (attp->atthasmissing)
		{
			Datum		missingval;
			bool		missingNull;

			/* Do we have a missing value? */
			missingval = heap_getattr(pg_attribute_tuple,
									  Anum_pg_attribute_attmissingval,
									  pg_attribute_desc->rd_att,
									  &missingNull);
			if (!missingNull)
			{
				/* Yes, fetch from the array */
				MemoryContext oldcxt;
				bool		is_null;
				int			one = 1;
				Datum		missval;

				/* first missing value seen: allocate one slot per attribute */
				if (attrmiss == NULL)
					attrmiss = (AttrMissing *)
						MemoryContextAllocZero(CacheMemoryContext,
											   relation->rd_rel->relnatts *
											   sizeof(AttrMissing));

				/* attmissingval is an array; the value is its element [1] */
				missval = array_get_element(missingval,
											1,
											&one,
											-1,
											attp->attlen,
											attp->attbyval,
											attp->attalign,
											&is_null);
				Assert(!is_null);
				if (attp->attbyval)
				{
					/* for copy by val just copy the datum direct */
					attrmiss[attnum - 1].am_value = missval;
				}
				else
				{
					/* otherwise copy in the correct context */
					oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
					attrmiss[attnum - 1].am_value = datumCopy(missval,
															  attp->attbyval,
															  attp->attlen);
					MemoryContextSwitchTo(oldcxt);
				}
				attrmiss[attnum - 1].am_present = true;
			}
		}
		/* stop scanning as soon as every expected attribute has been seen */
		need--;
		if (need == 0)
			break;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	systable_endscan(pg_attribute_scan);
	table_close(pg_attribute_desc, AccessShareLock);

	/* the scan ran dry before all attributes were found */
	if (need != 0)
		elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
			 need, RelationGetRelid(relation));

	/*
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.  They will be
	 * computed when and if needed during tuple access.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
			Assert(TupleDescAttr(relation->rd_att, i)->attcacheoff == -1);
	}
#endif

	/*
	 * However, we can easily set the attcacheoff value for the first
	 * attribute: it must be zero.  This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
	 */
	if (RelationGetNumberOfAttributes(relation) > 0)
		TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;

	/*
	 * Set up constraint/default info
	 *
	 * The TupleConstr is kept only if at least one of its pieces is in use;
	 * otherwise it is released and rd_att->constr is set to NULL.
	 */
	if (constr->has_not_null ||
		constr->has_generated_stored ||
		ndef > 0 ||
		attrmiss ||
		relation->rd_rel->relchecks > 0)
	{
		/* attach first: the fetch routines below are handed only "relation" */
		relation->rd_att->constr = constr;

		if (ndef > 0)			/* DEFAULTs */
			AttrDefaultFetch(relation, ndef);
		else
			constr->num_defval = 0;

		constr->missing = attrmiss;

		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
			CheckConstraintFetch(relation);
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
	}
}
709 
710 /*
711  *		RelationBuildRuleLock
712  *
713  *		Form the relation's rewrite rules from information in
714  *		the pg_rewrite system catalog.
715  *
716  * Note: The rule parsetrees are potentially very complex node structures.
717  * To allow these trees to be freed when the relcache entry is flushed,
718  * we make a private memory context to hold the RuleLock information for
719  * each relcache entry that has associated rules.  The context is used
720  * just for rule info, not for any other subsidiary data of the relcache
721  * entry, because that keeps the update logic in RelationClearRelation()
722  * manageable.  The other subsidiary data structures are simple enough
723  * to be easy to free explicitly, anyway.
724  */
725 static void
RelationBuildRuleLock(Relation relation)726 RelationBuildRuleLock(Relation relation)
727 {
728 	MemoryContext rulescxt;
729 	MemoryContext oldcxt;
730 	HeapTuple	rewrite_tuple;
731 	Relation	rewrite_desc;
732 	TupleDesc	rewrite_tupdesc;
733 	SysScanDesc rewrite_scan;
734 	ScanKeyData key;
735 	RuleLock   *rulelock;
736 	int			numlocks;
737 	RewriteRule **rules;
738 	int			maxlocks;
739 
740 	/*
741 	 * Make the private context.  Assume it'll not contain much data.
742 	 */
743 	rulescxt = AllocSetContextCreate(CacheMemoryContext,
744 									 "relation rules",
745 									 ALLOCSET_SMALL_SIZES);
746 	relation->rd_rulescxt = rulescxt;
747 	MemoryContextCopyAndSetIdentifier(rulescxt,
748 									  RelationGetRelationName(relation));
749 
750 	/*
751 	 * allocate an array to hold the rewrite rules (the array is extended if
752 	 * necessary)
753 	 */
754 	maxlocks = 4;
755 	rules = (RewriteRule **)
756 		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
757 	numlocks = 0;
758 
759 	/*
760 	 * form a scan key
761 	 */
762 	ScanKeyInit(&key,
763 				Anum_pg_rewrite_ev_class,
764 				BTEqualStrategyNumber, F_OIDEQ,
765 				ObjectIdGetDatum(RelationGetRelid(relation)));
766 
767 	/*
768 	 * open pg_rewrite and begin a scan
769 	 *
770 	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
771 	 * be reading the rules in name order, except possibly during
772 	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
773 	 * ensures that rules will be fired in name order.
774 	 */
775 	rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
776 	rewrite_tupdesc = RelationGetDescr(rewrite_desc);
777 	rewrite_scan = systable_beginscan(rewrite_desc,
778 									  RewriteRelRulenameIndexId,
779 									  true, NULL,
780 									  1, &key);
781 
782 	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
783 	{
784 		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
785 		bool		isnull;
786 		Datum		rule_datum;
787 		char	   *rule_str;
788 		RewriteRule *rule;
789 
790 		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
791 												  sizeof(RewriteRule));
792 
793 		rule->ruleId = rewrite_form->oid;
794 
795 		rule->event = rewrite_form->ev_type - '0';
796 		rule->enabled = rewrite_form->ev_enabled;
797 		rule->isInstead = rewrite_form->is_instead;
798 
799 		/*
800 		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
801 		 * rule strings are often large enough to be toasted.  To avoid
802 		 * leaking memory in the caller's context, do the detoasting here so
803 		 * we can free the detoasted version.
804 		 */
805 		rule_datum = heap_getattr(rewrite_tuple,
806 								  Anum_pg_rewrite_ev_action,
807 								  rewrite_tupdesc,
808 								  &isnull);
809 		Assert(!isnull);
810 		rule_str = TextDatumGetCString(rule_datum);
811 		oldcxt = MemoryContextSwitchTo(rulescxt);
812 		rule->actions = (List *) stringToNode(rule_str);
813 		MemoryContextSwitchTo(oldcxt);
814 		pfree(rule_str);
815 
816 		rule_datum = heap_getattr(rewrite_tuple,
817 								  Anum_pg_rewrite_ev_qual,
818 								  rewrite_tupdesc,
819 								  &isnull);
820 		Assert(!isnull);
821 		rule_str = TextDatumGetCString(rule_datum);
822 		oldcxt = MemoryContextSwitchTo(rulescxt);
823 		rule->qual = (Node *) stringToNode(rule_str);
824 		MemoryContextSwitchTo(oldcxt);
825 		pfree(rule_str);
826 
827 		/*
828 		 * We want the rule's table references to be checked as though by the
829 		 * table owner, not the user referencing the rule.  Therefore, scan
830 		 * through the rule's actions and set the checkAsUser field on all
831 		 * rtable entries.  We have to look at the qual as well, in case it
832 		 * contains sublinks.
833 		 *
834 		 * The reason for doing this when the rule is loaded, rather than when
835 		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
836 		 * grovel through stored rules to update checkAsUser fields. Scanning
837 		 * the rule tree during load is relatively cheap (compared to
838 		 * constructing it in the first place), so we do it here.
839 		 */
840 		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
841 		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
842 
843 		if (numlocks >= maxlocks)
844 		{
845 			maxlocks *= 2;
846 			rules = (RewriteRule **)
847 				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
848 		}
849 		rules[numlocks++] = rule;
850 	}
851 
852 	/*
853 	 * end the scan and close the attribute relation
854 	 */
855 	systable_endscan(rewrite_scan);
856 	table_close(rewrite_desc, AccessShareLock);
857 
858 	/*
859 	 * there might not be any rules (if relhasrules is out-of-date)
860 	 */
861 	if (numlocks == 0)
862 	{
863 		relation->rd_rules = NULL;
864 		relation->rd_rulescxt = NULL;
865 		MemoryContextDelete(rulescxt);
866 		return;
867 	}
868 
869 	/*
870 	 * form a RuleLock and insert into relation
871 	 */
872 	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
873 	rulelock->numLocks = numlocks;
874 	rulelock->rules = rules;
875 
876 	relation->rd_rules = rulelock;
877 }
878 
879 /*
880  *		equalRuleLocks
881  *
882  *		Determine whether two RuleLocks are equivalent
883  *
884  *		Probably this should be in the rules code someplace...
885  */
886 static bool
equalRuleLocks(RuleLock * rlock1,RuleLock * rlock2)887 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
888 {
889 	int			i;
890 
891 	/*
892 	 * As of 7.3 we assume the rule ordering is repeatable, because
893 	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
894 	 * compare corresponding slots.
895 	 */
896 	if (rlock1 != NULL)
897 	{
898 		if (rlock2 == NULL)
899 			return false;
900 		if (rlock1->numLocks != rlock2->numLocks)
901 			return false;
902 		for (i = 0; i < rlock1->numLocks; i++)
903 		{
904 			RewriteRule *rule1 = rlock1->rules[i];
905 			RewriteRule *rule2 = rlock2->rules[i];
906 
907 			if (rule1->ruleId != rule2->ruleId)
908 				return false;
909 			if (rule1->event != rule2->event)
910 				return false;
911 			if (rule1->enabled != rule2->enabled)
912 				return false;
913 			if (rule1->isInstead != rule2->isInstead)
914 				return false;
915 			if (!equal(rule1->qual, rule2->qual))
916 				return false;
917 			if (!equal(rule1->actions, rule2->actions))
918 				return false;
919 		}
920 	}
921 	else if (rlock2 != NULL)
922 		return false;
923 	return true;
924 }
925 
926 /*
927  *		equalPolicy
928  *
929  *		Determine whether two policies are equivalent
930  */
931 static bool
equalPolicy(RowSecurityPolicy * policy1,RowSecurityPolicy * policy2)932 equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
933 {
934 	int			i;
935 	Oid		   *r1,
936 			   *r2;
937 
938 	if (policy1 != NULL)
939 	{
940 		if (policy2 == NULL)
941 			return false;
942 
943 		if (policy1->polcmd != policy2->polcmd)
944 			return false;
945 		if (policy1->hassublinks != policy2->hassublinks)
946 			return false;
947 		if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
948 			return false;
949 		if (ARR_DIMS(policy1->roles)[0] != ARR_DIMS(policy2->roles)[0])
950 			return false;
951 
952 		r1 = (Oid *) ARR_DATA_PTR(policy1->roles);
953 		r2 = (Oid *) ARR_DATA_PTR(policy2->roles);
954 
955 		for (i = 0; i < ARR_DIMS(policy1->roles)[0]; i++)
956 		{
957 			if (r1[i] != r2[i])
958 				return false;
959 		}
960 
961 		if (!equal(policy1->qual, policy2->qual))
962 			return false;
963 		if (!equal(policy1->with_check_qual, policy2->with_check_qual))
964 			return false;
965 	}
966 	else if (policy2 != NULL)
967 		return false;
968 
969 	return true;
970 }
971 
972 /*
973  *		equalRSDesc
974  *
975  *		Determine whether two RowSecurityDesc's are equivalent
976  */
977 static bool
equalRSDesc(RowSecurityDesc * rsdesc1,RowSecurityDesc * rsdesc2)978 equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
979 {
980 	ListCell   *lc,
981 			   *rc;
982 
983 	if (rsdesc1 == NULL && rsdesc2 == NULL)
984 		return true;
985 
986 	if ((rsdesc1 != NULL && rsdesc2 == NULL) ||
987 		(rsdesc1 == NULL && rsdesc2 != NULL))
988 		return false;
989 
990 	if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
991 		return false;
992 
993 	/* RelationBuildRowSecurity should build policies in order */
994 	forboth(lc, rsdesc1->policies, rc, rsdesc2->policies)
995 	{
996 		RowSecurityPolicy *l = (RowSecurityPolicy *) lfirst(lc);
997 		RowSecurityPolicy *r = (RowSecurityPolicy *) lfirst(rc);
998 
999 		if (!equalPolicy(l, r))
1000 			return false;
1001 	}
1002 
1003 	return true;
1004 }
1005 
1006 /*
1007  *		RelationBuildDesc
1008  *
1009  *		Build a relation descriptor.  The caller must hold at least
1010  *		AccessShareLock on the target relid.
1011  *
1012  *		The new descriptor is inserted into the hash table if insertIt is true.
1013  *
1014  *		Returns NULL if no pg_class row could be found for the given relid
1015  *		(suggesting we are trying to access a just-deleted relation).
1016  *		Any other error is reported via elog.
1017  */
1018 static Relation
RelationBuildDesc(Oid targetRelId,bool insertIt)1019 RelationBuildDesc(Oid targetRelId, bool insertIt)
1020 {
1021 	int			in_progress_offset;
1022 	Relation	relation;
1023 	Oid			relid;
1024 	HeapTuple	pg_class_tuple;
1025 	Form_pg_class relp;
1026 
1027 	/*
1028 	 * This function and its subroutines can allocate a good deal of transient
1029 	 * data in CurrentMemoryContext.  Traditionally we've just leaked that
1030 	 * data, reasoning that the caller's context is at worst of transaction
1031 	 * scope, and relcache loads shouldn't happen so often that it's essential
1032 	 * to recover transient data before end of statement/transaction.  However
1033 	 * that's definitely not true when debug_discard_caches is active, and
1034 	 * perhaps it's not true in other cases.
1035 	 *
1036 	 * When debug_discard_caches is active or when forced to by
1037 	 * RECOVER_RELATION_BUILD_MEMORY=1, arrange to allocate the junk in a
1038 	 * temporary context that we'll free before returning.  Make it a child of
1039 	 * caller's context so that it will get cleaned up appropriately if we
1040 	 * error out partway through.
1041 	 */
1042 #ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
1043 	MemoryContext tmpcxt = NULL;
1044 	MemoryContext oldcxt = NULL;
1045 
1046 	if (RECOVER_RELATION_BUILD_MEMORY || debug_discard_caches > 0)
1047 	{
1048 		tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
1049 									   "RelationBuildDesc workspace",
1050 									   ALLOCSET_DEFAULT_SIZES);
1051 		oldcxt = MemoryContextSwitchTo(tmpcxt);
1052 	}
1053 #endif
1054 
1055 	/* Register to catch invalidation messages */
1056 	if (in_progress_list_len >= in_progress_list_maxlen)
1057 	{
1058 		int			allocsize;
1059 
1060 		allocsize = in_progress_list_maxlen * 2;
1061 		in_progress_list = repalloc(in_progress_list,
1062 									allocsize * sizeof(*in_progress_list));
1063 		in_progress_list_maxlen = allocsize;
1064 	}
1065 	in_progress_offset = in_progress_list_len++;
1066 	in_progress_list[in_progress_offset].reloid = targetRelId;
1067 retry:
1068 	in_progress_list[in_progress_offset].invalidated = false;
1069 
1070 	/*
1071 	 * find the tuple in pg_class corresponding to the given relation id
1072 	 */
1073 	pg_class_tuple = ScanPgRelation(targetRelId, true, false);
1074 
1075 	/*
1076 	 * if no such tuple exists, return NULL
1077 	 */
1078 	if (!HeapTupleIsValid(pg_class_tuple))
1079 	{
1080 #ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
1081 		if (tmpcxt)
1082 		{
1083 			/* Return to caller's context, and blow away the temporary context */
1084 			MemoryContextSwitchTo(oldcxt);
1085 			MemoryContextDelete(tmpcxt);
1086 		}
1087 #endif
1088 		Assert(in_progress_offset + 1 == in_progress_list_len);
1089 		in_progress_list_len--;
1090 		return NULL;
1091 	}
1092 
1093 	/*
1094 	 * get information from the pg_class_tuple
1095 	 */
1096 	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1097 	relid = relp->oid;
1098 	Assert(relid == targetRelId);
1099 
1100 	/*
1101 	 * allocate storage for the relation descriptor, and copy pg_class_tuple
1102 	 * to relation->rd_rel.
1103 	 */
1104 	relation = AllocateRelationDesc(relp);
1105 
1106 	/*
1107 	 * initialize the relation's relation id (relation->rd_id)
1108 	 */
1109 	RelationGetRelid(relation) = relid;
1110 
1111 	/*
1112 	 * Normal relations are not nailed into the cache.  Since we don't flush
1113 	 * new relations, it won't be new.  It could be temp though.
1114 	 */
1115 	relation->rd_refcnt = 0;
1116 	relation->rd_isnailed = false;
1117 	relation->rd_createSubid = InvalidSubTransactionId;
1118 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1119 	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
1120 	relation->rd_droppedSubid = InvalidSubTransactionId;
1121 	switch (relation->rd_rel->relpersistence)
1122 	{
1123 		case RELPERSISTENCE_UNLOGGED:
1124 		case RELPERSISTENCE_PERMANENT:
1125 			relation->rd_backend = InvalidBackendId;
1126 			relation->rd_islocaltemp = false;
1127 			break;
1128 		case RELPERSISTENCE_TEMP:
1129 			if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
1130 			{
1131 				relation->rd_backend = BackendIdForTempRelations();
1132 				relation->rd_islocaltemp = true;
1133 			}
1134 			else
1135 			{
1136 				/*
1137 				 * If it's a temp table, but not one of ours, we have to use
1138 				 * the slow, grotty method to figure out the owning backend.
1139 				 *
1140 				 * Note: it's possible that rd_backend gets set to MyBackendId
1141 				 * here, in case we are looking at a pg_class entry left over
1142 				 * from a crashed backend that coincidentally had the same
1143 				 * BackendId we're using.  We should *not* consider such a
1144 				 * table to be "ours"; this is why we need the separate
1145 				 * rd_islocaltemp flag.  The pg_class entry will get flushed
1146 				 * if/when we clean out the corresponding temp table namespace
1147 				 * in preparation for using it.
1148 				 */
1149 				relation->rd_backend =
1150 					GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
1151 				Assert(relation->rd_backend != InvalidBackendId);
1152 				relation->rd_islocaltemp = false;
1153 			}
1154 			break;
1155 		default:
1156 			elog(ERROR, "invalid relpersistence: %c",
1157 				 relation->rd_rel->relpersistence);
1158 			break;
1159 	}
1160 
1161 	/*
1162 	 * initialize the tuple descriptor (relation->rd_att).
1163 	 */
1164 	RelationBuildTupleDesc(relation);
1165 
1166 	/*
1167 	 * Fetch rules and triggers that affect this relation
1168 	 */
1169 	if (relation->rd_rel->relhasrules)
1170 		RelationBuildRuleLock(relation);
1171 	else
1172 	{
1173 		relation->rd_rules = NULL;
1174 		relation->rd_rulescxt = NULL;
1175 	}
1176 
1177 	if (relation->rd_rel->relhastriggers)
1178 		RelationBuildTriggers(relation);
1179 	else
1180 		relation->trigdesc = NULL;
1181 
1182 	if (relation->rd_rel->relrowsecurity)
1183 		RelationBuildRowSecurity(relation);
1184 	else
1185 		relation->rd_rsdesc = NULL;
1186 
1187 	/* foreign key data is not loaded till asked for */
1188 	relation->rd_fkeylist = NIL;
1189 	relation->rd_fkeyvalid = false;
1190 
1191 	/* partitioning data is not loaded till asked for */
1192 	relation->rd_partkey = NULL;
1193 	relation->rd_partkeycxt = NULL;
1194 	relation->rd_partdesc = NULL;
1195 	relation->rd_partdesc_nodetached = NULL;
1196 	relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
1197 	relation->rd_pdcxt = NULL;
1198 	relation->rd_pddcxt = NULL;
1199 	relation->rd_partcheck = NIL;
1200 	relation->rd_partcheckvalid = false;
1201 	relation->rd_partcheckcxt = NULL;
1202 
1203 	/*
1204 	 * initialize access method information
1205 	 */
1206 	switch (relation->rd_rel->relkind)
1207 	{
1208 		case RELKIND_INDEX:
1209 		case RELKIND_PARTITIONED_INDEX:
1210 			Assert(relation->rd_rel->relam != InvalidOid);
1211 			RelationInitIndexAccessInfo(relation);
1212 			break;
1213 		case RELKIND_RELATION:
1214 		case RELKIND_TOASTVALUE:
1215 		case RELKIND_MATVIEW:
1216 			Assert(relation->rd_rel->relam != InvalidOid);
1217 			RelationInitTableAccessMethod(relation);
1218 			break;
1219 		case RELKIND_SEQUENCE:
1220 			Assert(relation->rd_rel->relam == InvalidOid);
1221 			RelationInitTableAccessMethod(relation);
1222 			break;
1223 		case RELKIND_VIEW:
1224 		case RELKIND_COMPOSITE_TYPE:
1225 		case RELKIND_FOREIGN_TABLE:
1226 		case RELKIND_PARTITIONED_TABLE:
1227 			Assert(relation->rd_rel->relam == InvalidOid);
1228 			break;
1229 	}
1230 
1231 	/* extract reloptions if any */
1232 	RelationParseRelOptions(relation, pg_class_tuple);
1233 
1234 	/*
1235 	 * initialize the relation lock manager information
1236 	 */
1237 	RelationInitLockInfo(relation); /* see lmgr.c */
1238 
1239 	/*
1240 	 * initialize physical addressing information for the relation
1241 	 */
1242 	RelationInitPhysicalAddr(relation);
1243 
1244 	/* make sure relation is marked as having no open file yet */
1245 	relation->rd_smgr = NULL;
1246 
1247 	/*
1248 	 * now we can free the memory allocated for pg_class_tuple
1249 	 */
1250 	heap_freetuple(pg_class_tuple);
1251 
1252 	/*
1253 	 * If an invalidation arrived mid-build, start over.  Between here and the
1254 	 * end of this function, don't add code that does or reasonably could read
1255 	 * system catalogs.  That range must be free from invalidation processing
1256 	 * for the !insertIt case.  For the insertIt case, RelationCacheInsert()
1257 	 * will enroll this relation in ordinary relcache invalidation processing,
1258 	 */
1259 	if (in_progress_list[in_progress_offset].invalidated)
1260 	{
1261 		RelationDestroyRelation(relation, false);
1262 		goto retry;
1263 	}
1264 	Assert(in_progress_offset + 1 == in_progress_list_len);
1265 	in_progress_list_len--;
1266 
1267 	/*
1268 	 * Insert newly created relation into relcache hash table, if requested.
1269 	 *
1270 	 * There is one scenario in which we might find a hashtable entry already
1271 	 * present, even though our caller failed to find it: if the relation is a
1272 	 * system catalog or index that's used during relcache load, we might have
1273 	 * recursively created the same relcache entry during the preceding steps.
1274 	 * So allow RelationCacheInsert to delete any already-present relcache
1275 	 * entry for the same OID.  The already-present entry should have refcount
1276 	 * zero (else somebody forgot to close it); in the event that it doesn't,
1277 	 * we'll elog a WARNING and leak the already-present entry.
1278 	 */
1279 	if (insertIt)
1280 		RelationCacheInsert(relation, true);
1281 
1282 	/* It's fully valid */
1283 	relation->rd_isvalid = true;
1284 
1285 #ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
1286 	if (tmpcxt)
1287 	{
1288 		/* Return to caller's context, and blow away the temporary context */
1289 		MemoryContextSwitchTo(oldcxt);
1290 		MemoryContextDelete(tmpcxt);
1291 	}
1292 #endif
1293 
1294 	return relation;
1295 }
1296 
1297 /*
1298  * Initialize the physical addressing info (RelFileNode) for a relcache entry
1299  *
1300  * Note: at the physical level, relations in the pg_global tablespace must
1301  * be treated as shared, even if relisshared isn't set.  Hence we do not
1302  * look at relisshared here.
1303  */
1304 static void
RelationInitPhysicalAddr(Relation relation)1305 RelationInitPhysicalAddr(Relation relation)
1306 {
1307 	Oid			oldnode = relation->rd_node.relNode;
1308 
1309 	/* these relations kinds never have storage */
1310 	if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
1311 		return;
1312 
1313 	if (relation->rd_rel->reltablespace)
1314 		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
1315 	else
1316 		relation->rd_node.spcNode = MyDatabaseTableSpace;
1317 	if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
1318 		relation->rd_node.dbNode = InvalidOid;
1319 	else
1320 		relation->rd_node.dbNode = MyDatabaseId;
1321 
1322 	if (relation->rd_rel->relfilenode)
1323 	{
1324 		/*
1325 		 * Even if we are using a decoding snapshot that doesn't represent the
1326 		 * current state of the catalog we need to make sure the filenode
1327 		 * points to the current file since the older file will be gone (or
1328 		 * truncated). The new file will still contain older rows so lookups
1329 		 * in them will work correctly. This wouldn't work correctly if
1330 		 * rewrites were allowed to change the schema in an incompatible way,
1331 		 * but those are prevented both on catalog tables and on user tables
1332 		 * declared as additional catalog tables.
1333 		 */
1334 		if (HistoricSnapshotActive()
1335 			&& RelationIsAccessibleInLogicalDecoding(relation)
1336 			&& IsTransactionState())
1337 		{
1338 			HeapTuple	phys_tuple;
1339 			Form_pg_class physrel;
1340 
1341 			phys_tuple = ScanPgRelation(RelationGetRelid(relation),
1342 										RelationGetRelid(relation) != ClassOidIndexId,
1343 										true);
1344 			if (!HeapTupleIsValid(phys_tuple))
1345 				elog(ERROR, "could not find pg_class entry for %u",
1346 					 RelationGetRelid(relation));
1347 			physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
1348 
1349 			relation->rd_rel->reltablespace = physrel->reltablespace;
1350 			relation->rd_rel->relfilenode = physrel->relfilenode;
1351 			heap_freetuple(phys_tuple);
1352 		}
1353 
1354 		relation->rd_node.relNode = relation->rd_rel->relfilenode;
1355 	}
1356 	else
1357 	{
1358 		/* Consult the relation mapper */
1359 		relation->rd_node.relNode =
1360 			RelationMapOidToFilenode(relation->rd_id,
1361 									 relation->rd_rel->relisshared);
1362 		if (!OidIsValid(relation->rd_node.relNode))
1363 			elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1364 				 RelationGetRelationName(relation), relation->rd_id);
1365 	}
1366 
1367 	/*
1368 	 * For RelationNeedsWAL() to answer correctly on parallel workers, restore
1369 	 * rd_firstRelfilenodeSubid.  No subtransactions start or end while in
1370 	 * parallel mode, so the specific SubTransactionId does not matter.
1371 	 */
1372 	if (IsParallelWorker() && oldnode != relation->rd_node.relNode)
1373 	{
1374 		if (RelFileNodeSkippingWAL(relation->rd_node))
1375 			relation->rd_firstRelfilenodeSubid = TopSubTransactionId;
1376 		else
1377 			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
1378 	}
1379 }
1380 
1381 /*
1382  * Fill in the IndexAmRoutine for an index relation.
1383  *
1384  * relation's rd_amhandler and rd_indexcxt must be valid already.
1385  */
1386 static void
InitIndexAmRoutine(Relation relation)1387 InitIndexAmRoutine(Relation relation)
1388 {
1389 	IndexAmRoutine *cached,
1390 			   *tmp;
1391 
1392 	/*
1393 	 * Call the amhandler in current, short-lived memory context, just in case
1394 	 * it leaks anything (it probably won't, but let's be paranoid).
1395 	 */
1396 	tmp = GetIndexAmRoutine(relation->rd_amhandler);
1397 
1398 	/* OK, now transfer the data into relation's rd_indexcxt. */
1399 	cached = (IndexAmRoutine *) MemoryContextAlloc(relation->rd_indexcxt,
1400 												   sizeof(IndexAmRoutine));
1401 	memcpy(cached, tmp, sizeof(IndexAmRoutine));
1402 	relation->rd_indam = cached;
1403 
1404 	pfree(tmp);
1405 }
1406 
1407 /*
1408  * Initialize index-access-method support data for an index relation
1409  */
1410 void
RelationInitIndexAccessInfo(Relation relation)1411 RelationInitIndexAccessInfo(Relation relation)
1412 {
1413 	HeapTuple	tuple;
1414 	Form_pg_am	aform;
1415 	Datum		indcollDatum;
1416 	Datum		indclassDatum;
1417 	Datum		indoptionDatum;
1418 	bool		isnull;
1419 	oidvector  *indcoll;
1420 	oidvector  *indclass;
1421 	int2vector *indoption;
1422 	MemoryContext indexcxt;
1423 	MemoryContext oldcontext;
1424 	int			indnatts;
1425 	int			indnkeyatts;
1426 	uint16		amsupport;
1427 
1428 	/*
1429 	 * Make a copy of the pg_index entry for the index.  Since pg_index
1430 	 * contains variable-length and possibly-null fields, we have to do this
1431 	 * honestly rather than just treating it as a Form_pg_index struct.
1432 	 */
1433 	tuple = SearchSysCache1(INDEXRELID,
1434 							ObjectIdGetDatum(RelationGetRelid(relation)));
1435 	if (!HeapTupleIsValid(tuple))
1436 		elog(ERROR, "cache lookup failed for index %u",
1437 			 RelationGetRelid(relation));
1438 	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
1439 	relation->rd_indextuple = heap_copytuple(tuple);
1440 	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
1441 	MemoryContextSwitchTo(oldcontext);
1442 	ReleaseSysCache(tuple);
1443 
1444 	/*
1445 	 * Look up the index's access method, save the OID of its handler function
1446 	 */
1447 	tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
1448 	if (!HeapTupleIsValid(tuple))
1449 		elog(ERROR, "cache lookup failed for access method %u",
1450 			 relation->rd_rel->relam);
1451 	aform = (Form_pg_am) GETSTRUCT(tuple);
1452 	relation->rd_amhandler = aform->amhandler;
1453 	ReleaseSysCache(tuple);
1454 
1455 	indnatts = RelationGetNumberOfAttributes(relation);
1456 	if (indnatts != IndexRelationGetNumberOfAttributes(relation))
1457 		elog(ERROR, "relnatts disagrees with indnatts for index %u",
1458 			 RelationGetRelid(relation));
1459 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
1460 
1461 	/*
1462 	 * Make the private context to hold index access info.  The reason we need
1463 	 * a context, and not just a couple of pallocs, is so that we won't leak
1464 	 * any subsidiary info attached to fmgr lookup records.
1465 	 */
1466 	indexcxt = AllocSetContextCreate(CacheMemoryContext,
1467 									 "index info",
1468 									 ALLOCSET_SMALL_SIZES);
1469 	relation->rd_indexcxt = indexcxt;
1470 	MemoryContextCopyAndSetIdentifier(indexcxt,
1471 									  RelationGetRelationName(relation));
1472 
1473 	/*
1474 	 * Now we can fetch the index AM's API struct
1475 	 */
1476 	InitIndexAmRoutine(relation);
1477 
1478 	/*
1479 	 * Allocate arrays to hold data. Opclasses are not used for included
1480 	 * columns, so allocate them for indnkeyatts only.
1481 	 */
1482 	relation->rd_opfamily = (Oid *)
1483 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1484 	relation->rd_opcintype = (Oid *)
1485 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1486 
1487 	amsupport = relation->rd_indam->amsupport;
1488 	if (amsupport > 0)
1489 	{
1490 		int			nsupport = indnatts * amsupport;
1491 
1492 		relation->rd_support = (RegProcedure *)
1493 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1494 		relation->rd_supportinfo = (FmgrInfo *)
1495 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1496 	}
1497 	else
1498 	{
1499 		relation->rd_support = NULL;
1500 		relation->rd_supportinfo = NULL;
1501 	}
1502 
1503 	relation->rd_indcollation = (Oid *)
1504 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1505 
1506 	relation->rd_indoption = (int16 *)
1507 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));
1508 
1509 	/*
1510 	 * indcollation cannot be referenced directly through the C struct,
1511 	 * because it comes after the variable-width indkey field.  Must extract
1512 	 * the datum the hard way...
1513 	 */
1514 	indcollDatum = fastgetattr(relation->rd_indextuple,
1515 							   Anum_pg_index_indcollation,
1516 							   GetPgIndexDescriptor(),
1517 							   &isnull);
1518 	Assert(!isnull);
1519 	indcoll = (oidvector *) DatumGetPointer(indcollDatum);
1520 	memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));
1521 
1522 	/*
1523 	 * indclass cannot be referenced directly through the C struct, because it
1524 	 * comes after the variable-width indkey field.  Must extract the datum
1525 	 * the hard way...
1526 	 */
1527 	indclassDatum = fastgetattr(relation->rd_indextuple,
1528 								Anum_pg_index_indclass,
1529 								GetPgIndexDescriptor(),
1530 								&isnull);
1531 	Assert(!isnull);
1532 	indclass = (oidvector *) DatumGetPointer(indclassDatum);
1533 
1534 	/*
1535 	 * Fill the support procedure OID array, as well as the info about
1536 	 * opfamilies and opclass input types.  (aminfo and supportinfo are left
1537 	 * as zeroes, and are filled on-the-fly when used)
1538 	 */
1539 	IndexSupportInitialize(indclass, relation->rd_support,
1540 						   relation->rd_opfamily, relation->rd_opcintype,
1541 						   amsupport, indnkeyatts);
1542 
1543 	/*
1544 	 * Similarly extract indoption and copy it to the cache entry
1545 	 */
1546 	indoptionDatum = fastgetattr(relation->rd_indextuple,
1547 								 Anum_pg_index_indoption,
1548 								 GetPgIndexDescriptor(),
1549 								 &isnull);
1550 	Assert(!isnull);
1551 	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1552 	memcpy(relation->rd_indoption, indoption->values, indnkeyatts * sizeof(int16));
1553 
1554 	(void) RelationGetIndexAttOptions(relation, false);
1555 
1556 	/*
1557 	 * expressions, predicate, exclusion caches will be filled later
1558 	 */
1559 	relation->rd_indexprs = NIL;
1560 	relation->rd_indpred = NIL;
1561 	relation->rd_exclops = NULL;
1562 	relation->rd_exclprocs = NULL;
1563 	relation->rd_exclstrats = NULL;
1564 	relation->rd_amcache = NULL;
1565 }
1566 
1567 /*
1568  * IndexSupportInitialize
1569  *		Initializes an index's cached opclass information,
1570  *		given the index's pg_index.indclass entry.
1571  *
1572  * Data is returned into *indexSupport, *opFamily, and *opcInType,
1573  * which are arrays allocated by the caller.
1574  *
1575  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
1576  * indicate the size of the arrays it has allocated --- but in practice these
1577  * numbers must always match those obtainable from the system catalog entries
1578  * for the index and access method.
1579  */
1580 static void
IndexSupportInitialize(oidvector * indclass,RegProcedure * indexSupport,Oid * opFamily,Oid * opcInType,StrategyNumber maxSupportNumber,AttrNumber maxAttributeNumber)1581 IndexSupportInitialize(oidvector *indclass,
1582 					   RegProcedure *indexSupport,
1583 					   Oid *opFamily,
1584 					   Oid *opcInType,
1585 					   StrategyNumber maxSupportNumber,
1586 					   AttrNumber maxAttributeNumber)
1587 {
1588 	int			attIndex;
1589 
1590 	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1591 	{
1592 		OpClassCacheEnt *opcentry;
1593 
1594 		if (!OidIsValid(indclass->values[attIndex]))
1595 			elog(ERROR, "bogus pg_index tuple");
1596 
1597 		/* look up the info for this opclass, using a cache */
1598 		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1599 									 maxSupportNumber);
1600 
1601 		/* copy cached data into relcache entry */
1602 		opFamily[attIndex] = opcentry->opcfamily;
1603 		opcInType[attIndex] = opcentry->opcintype;
1604 		if (maxSupportNumber > 0)
1605 			memcpy(&indexSupport[attIndex * maxSupportNumber],
1606 				   opcentry->supportProcs,
1607 				   maxSupportNumber * sizeof(RegProcedure));
1608 	}
1609 }
1610 
1611 /*
1612  * LookupOpclassInfo
1613  *
1614  * This routine maintains a per-opclass cache of the information needed
1615  * by IndexSupportInitialize().  This is more efficient than relying on
1616  * the catalog cache, because we can load all the info about a particular
1617  * opclass in a single indexscan of pg_amproc.
1618  *
1619  * The information from pg_am about expected range of support function
1620  * numbers is passed in, rather than being looked up, mainly because the
1621  * caller will have it already.
1622  *
1623  * Note there is no provision for flushing the cache.  This is OK at the
1624  * moment because there is no way to ALTER any interesting properties of an
1625  * existing opclass --- all you can do is drop it, which will result in
1626  * a useless but harmless dead entry in the cache.  To support altering
1627  * opclass membership (not the same as opfamily membership!), we'd need to
1628  * be able to flush this cache as well as the contents of relcache entries
1629  * for indexes.
1630  */
1631 static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,StrategyNumber numSupport)1632 LookupOpclassInfo(Oid operatorClassOid,
1633 				  StrategyNumber numSupport)
1634 {
1635 	OpClassCacheEnt *opcentry;
1636 	bool		found;
1637 	Relation	rel;
1638 	SysScanDesc scan;
1639 	ScanKeyData skey[3];
1640 	HeapTuple	htup;
1641 	bool		indexOK;
1642 
1643 	if (OpClassCache == NULL)
1644 	{
1645 		/* First time through: initialize the opclass cache */
1646 		HASHCTL		ctl;
1647 
1648 		/* Also make sure CacheMemoryContext exists */
1649 		if (!CacheMemoryContext)
1650 			CreateCacheMemoryContext();
1651 
1652 		ctl.keysize = sizeof(Oid);
1653 		ctl.entrysize = sizeof(OpClassCacheEnt);
1654 		OpClassCache = hash_create("Operator class cache", 64,
1655 								   &ctl, HASH_ELEM | HASH_BLOBS);
1656 	}
1657 
1658 	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1659 											   (void *) &operatorClassOid,
1660 											   HASH_ENTER, &found);
1661 
1662 	if (!found)
1663 	{
1664 		/* Initialize new entry */
1665 		opcentry->valid = false;	/* until known OK */
1666 		opcentry->numSupport = numSupport;
1667 		opcentry->supportProcs = NULL;	/* filled below */
1668 	}
1669 	else
1670 	{
1671 		Assert(numSupport == opcentry->numSupport);
1672 	}
1673 
1674 	/*
1675 	 * When aggressively testing cache-flush hazards, we disable the operator
1676 	 * class cache and force reloading of the info on each call.  This models
1677 	 * no real-world behavior, since the cache entries are never invalidated
1678 	 * otherwise.  However it can be helpful for detecting bugs in the cache
1679 	 * loading logic itself, such as reliance on a non-nailed index.  Given
1680 	 * the limited use-case and the fact that this adds a great deal of
1681 	 * expense, we enable it only for high values of debug_discard_caches.
1682 	 */
1683 #ifdef DISCARD_CACHES_ENABLED
1684 	if (debug_discard_caches > 2)
1685 		opcentry->valid = false;
1686 #endif
1687 
1688 	if (opcentry->valid)
1689 		return opcentry;
1690 
1691 	/*
1692 	 * Need to fill in new entry.  First allocate space, unless we already did
1693 	 * so in some previous attempt.
1694 	 */
1695 	if (opcentry->supportProcs == NULL && numSupport > 0)
1696 		opcentry->supportProcs = (RegProcedure *)
1697 			MemoryContextAllocZero(CacheMemoryContext,
1698 								   numSupport * sizeof(RegProcedure));
1699 
1700 	/*
1701 	 * To avoid infinite recursion during startup, force heap scans if we're
1702 	 * looking up info for the opclasses used by the indexes we would like to
1703 	 * reference here.
1704 	 */
1705 	indexOK = criticalRelcachesBuilt ||
1706 		(operatorClassOid != OID_BTREE_OPS_OID &&
1707 		 operatorClassOid != INT2_BTREE_OPS_OID);
1708 
1709 	/*
1710 	 * We have to fetch the pg_opclass row to determine its opfamily and
1711 	 * opcintype, which are needed to look up related operators and functions.
1712 	 * It'd be convenient to use the syscache here, but that probably doesn't
1713 	 * work while bootstrapping.
1714 	 */
1715 	ScanKeyInit(&skey[0],
1716 				Anum_pg_opclass_oid,
1717 				BTEqualStrategyNumber, F_OIDEQ,
1718 				ObjectIdGetDatum(operatorClassOid));
1719 	rel = table_open(OperatorClassRelationId, AccessShareLock);
1720 	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1721 							  NULL, 1, skey);
1722 
1723 	if (HeapTupleIsValid(htup = systable_getnext(scan)))
1724 	{
1725 		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1726 
1727 		opcentry->opcfamily = opclassform->opcfamily;
1728 		opcentry->opcintype = opclassform->opcintype;
1729 	}
1730 	else
1731 		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1732 
1733 	systable_endscan(scan);
1734 	table_close(rel, AccessShareLock);
1735 
1736 	/*
1737 	 * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
1738 	 * the default ones (those with lefttype = righttype = opcintype).
1739 	 */
1740 	if (numSupport > 0)
1741 	{
1742 		ScanKeyInit(&skey[0],
1743 					Anum_pg_amproc_amprocfamily,
1744 					BTEqualStrategyNumber, F_OIDEQ,
1745 					ObjectIdGetDatum(opcentry->opcfamily));
1746 		ScanKeyInit(&skey[1],
1747 					Anum_pg_amproc_amproclefttype,
1748 					BTEqualStrategyNumber, F_OIDEQ,
1749 					ObjectIdGetDatum(opcentry->opcintype));
1750 		ScanKeyInit(&skey[2],
1751 					Anum_pg_amproc_amprocrighttype,
1752 					BTEqualStrategyNumber, F_OIDEQ,
1753 					ObjectIdGetDatum(opcentry->opcintype));
1754 		rel = table_open(AccessMethodProcedureRelationId, AccessShareLock);
1755 		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1756 								  NULL, 3, skey);
1757 
1758 		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1759 		{
1760 			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1761 
1762 			if (amprocform->amprocnum <= 0 ||
1763 				(StrategyNumber) amprocform->amprocnum > numSupport)
1764 				elog(ERROR, "invalid amproc number %d for opclass %u",
1765 					 amprocform->amprocnum, operatorClassOid);
1766 
1767 			opcentry->supportProcs[amprocform->amprocnum - 1] =
1768 				amprocform->amproc;
1769 		}
1770 
1771 		systable_endscan(scan);
1772 		table_close(rel, AccessShareLock);
1773 	}
1774 
1775 	opcentry->valid = true;
1776 	return opcentry;
1777 }
1778 
1779 /*
1780  * Fill in the TableAmRoutine for a relation
1781  *
1782  * relation's rd_amhandler must be valid already.
1783  */
1784 static void
InitTableAmRoutine(Relation relation)1785 InitTableAmRoutine(Relation relation)
1786 {
1787 	relation->rd_tableam = GetTableAmRoutine(relation->rd_amhandler);
1788 }
1789 
1790 /*
1791  * Initialize table access method support for a table like relation
1792  */
1793 void
RelationInitTableAccessMethod(Relation relation)1794 RelationInitTableAccessMethod(Relation relation)
1795 {
1796 	HeapTuple	tuple;
1797 	Form_pg_am	aform;
1798 
1799 	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1800 	{
1801 		/*
1802 		 * Sequences are currently accessed like heap tables, but it doesn't
1803 		 * seem prudent to show that in the catalog. So just overwrite it
1804 		 * here.
1805 		 */
1806 		relation->rd_amhandler = F_HEAP_TABLEAM_HANDLER;
1807 	}
1808 	else if (IsCatalogRelation(relation))
1809 	{
1810 		/*
1811 		 * Avoid doing a syscache lookup for catalog tables.
1812 		 */
1813 		Assert(relation->rd_rel->relam == HEAP_TABLE_AM_OID);
1814 		relation->rd_amhandler = F_HEAP_TABLEAM_HANDLER;
1815 	}
1816 	else
1817 	{
1818 		/*
1819 		 * Look up the table access method, save the OID of its handler
1820 		 * function.
1821 		 */
1822 		Assert(relation->rd_rel->relam != InvalidOid);
1823 		tuple = SearchSysCache1(AMOID,
1824 								ObjectIdGetDatum(relation->rd_rel->relam));
1825 		if (!HeapTupleIsValid(tuple))
1826 			elog(ERROR, "cache lookup failed for access method %u",
1827 				 relation->rd_rel->relam);
1828 		aform = (Form_pg_am) GETSTRUCT(tuple);
1829 		relation->rd_amhandler = aform->amhandler;
1830 		ReleaseSysCache(tuple);
1831 	}
1832 
1833 	/*
1834 	 * Now we can fetch the table AM's API struct
1835 	 */
1836 	InitTableAmRoutine(relation);
1837 }
1838 
1839 /*
1840  *		formrdesc
1841  *
1842  *		This is a special cut-down version of RelationBuildDesc(),
1843  *		used while initializing the relcache.
1844  *		The relation descriptor is built just from the supplied parameters,
1845  *		without actually looking at any system table entries.  We cheat
1846  *		quite a lot since we only need to work for a few basic system
1847  *		catalogs.
1848  *
1849  * The catalogs this is used for can't have constraints (except attnotnull),
1850  * default values, rules, or triggers, since we don't cope with any of that.
1851  * (Well, actually, this only matters for properties that need to be valid
1852  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1853  * these properties matter then...)
1854  *
1855  * NOTE: we assume we are already switched into CacheMemoryContext.
1856  */
1857 static void
formrdesc(const char * relationName,Oid relationReltype,bool isshared,int natts,const FormData_pg_attribute * attrs)1858 formrdesc(const char *relationName, Oid relationReltype,
1859 		  bool isshared,
1860 		  int natts, const FormData_pg_attribute *attrs)
1861 {
1862 	Relation	relation;
1863 	int			i;
1864 	bool		has_not_null;
1865 
1866 	/*
1867 	 * allocate new relation desc, clear all fields of reldesc
1868 	 */
1869 	relation = (Relation) palloc0(sizeof(RelationData));
1870 
1871 	/* make sure relation is marked as having no open file yet */
1872 	relation->rd_smgr = NULL;
1873 
1874 	/*
1875 	 * initialize reference count: 1 because it is nailed in cache
1876 	 */
1877 	relation->rd_refcnt = 1;
1878 
1879 	/*
1880 	 * all entries built with this routine are nailed-in-cache; none are for
1881 	 * new or temp relations.
1882 	 */
1883 	relation->rd_isnailed = true;
1884 	relation->rd_createSubid = InvalidSubTransactionId;
1885 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1886 	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
1887 	relation->rd_droppedSubid = InvalidSubTransactionId;
1888 	relation->rd_backend = InvalidBackendId;
1889 	relation->rd_islocaltemp = false;
1890 
1891 	/*
1892 	 * initialize relation tuple form
1893 	 *
1894 	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
1895 	 * get us launched.  RelationCacheInitializePhase3() will read the real
1896 	 * data from pg_class and replace what we've done here.  Note in
1897 	 * particular that relowner is left as zero; this cues
1898 	 * RelationCacheInitializePhase3 that the real data isn't there yet.
1899 	 */
1900 	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1901 
1902 	namestrcpy(&relation->rd_rel->relname, relationName);
1903 	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1904 	relation->rd_rel->reltype = relationReltype;
1905 
1906 	/*
1907 	 * It's important to distinguish between shared and non-shared relations,
1908 	 * even at bootstrap time, to make sure we know where they are stored.
1909 	 */
1910 	relation->rd_rel->relisshared = isshared;
1911 	if (isshared)
1912 		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1913 
1914 	/* formrdesc is used only for permanent relations */
1915 	relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1916 
1917 	/* ... and they're always populated, too */
1918 	relation->rd_rel->relispopulated = true;
1919 
1920 	relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
1921 	relation->rd_rel->relpages = 0;
1922 	relation->rd_rel->reltuples = -1;
1923 	relation->rd_rel->relallvisible = 0;
1924 	relation->rd_rel->relkind = RELKIND_RELATION;
1925 	relation->rd_rel->relnatts = (int16) natts;
1926 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
1927 
1928 	/*
1929 	 * initialize attribute tuple form
1930 	 *
1931 	 * Unlike the case with the relation tuple, this data had better be right
1932 	 * because it will never be replaced.  The data comes from
1933 	 * src/include/catalog/ headers via genbki.pl.
1934 	 */
1935 	relation->rd_att = CreateTemplateTupleDesc(natts);
1936 	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */
1937 
1938 	relation->rd_att->tdtypeid = relationReltype;
1939 	relation->rd_att->tdtypmod = -1;	/* just to be sure */
1940 
1941 	/*
1942 	 * initialize tuple desc info
1943 	 */
1944 	has_not_null = false;
1945 	for (i = 0; i < natts; i++)
1946 	{
1947 		memcpy(TupleDescAttr(relation->rd_att, i),
1948 			   &attrs[i],
1949 			   ATTRIBUTE_FIXED_PART_SIZE);
1950 		has_not_null |= attrs[i].attnotnull;
1951 		/* make sure attcacheoff is valid */
1952 		TupleDescAttr(relation->rd_att, i)->attcacheoff = -1;
1953 	}
1954 
1955 	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1956 	TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
1957 
1958 	/* mark not-null status */
1959 	if (has_not_null)
1960 	{
1961 		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1962 
1963 		constr->has_not_null = true;
1964 		relation->rd_att->constr = constr;
1965 	}
1966 
1967 	/*
1968 	 * initialize relation id from info in att array (my, this is ugly)
1969 	 */
1970 	RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;
1971 
1972 	/*
1973 	 * All relations made with formrdesc are mapped.  This is necessarily so
1974 	 * because there is no other way to know what filenode they currently
1975 	 * have.  In bootstrap mode, add them to the initial relation mapper data,
1976 	 * specifying that the initial filenode is the same as the OID.
1977 	 */
1978 	relation->rd_rel->relfilenode = InvalidOid;
1979 	if (IsBootstrapProcessingMode())
1980 		RelationMapUpdateMap(RelationGetRelid(relation),
1981 							 RelationGetRelid(relation),
1982 							 isshared, true);
1983 
1984 	/*
1985 	 * initialize the relation lock manager information
1986 	 */
1987 	RelationInitLockInfo(relation); /* see lmgr.c */
1988 
1989 	/*
1990 	 * initialize physical addressing information for the relation
1991 	 */
1992 	RelationInitPhysicalAddr(relation);
1993 
1994 	/*
1995 	 * initialize the table am handler
1996 	 */
1997 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
1998 	relation->rd_tableam = GetHeapamTableAmRoutine();
1999 
2000 	/*
2001 	 * initialize the rel-has-index flag, using hardwired knowledge
2002 	 */
2003 	if (IsBootstrapProcessingMode())
2004 	{
2005 		/* In bootstrap mode, we have no indexes */
2006 		relation->rd_rel->relhasindex = false;
2007 	}
2008 	else
2009 	{
2010 		/* Otherwise, all the rels formrdesc is used for have indexes */
2011 		relation->rd_rel->relhasindex = true;
2012 	}
2013 
2014 	/*
2015 	 * add new reldesc to relcache
2016 	 */
2017 	RelationCacheInsert(relation, false);
2018 
2019 	/* It's fully valid */
2020 	relation->rd_isvalid = true;
2021 }
2022 
2023 
2024 /* ----------------------------------------------------------------
2025  *				 Relation Descriptor Lookup Interface
2026  * ----------------------------------------------------------------
2027  */
2028 
2029 /*
2030  *		RelationIdGetRelation
2031  *
2032  *		Lookup a reldesc by OID; make one if not already in cache.
2033  *
2034  *		Returns NULL if no pg_class row could be found for the given relid
2035  *		(suggesting we are trying to access a just-deleted relation).
2036  *		Any other error is reported via elog.
2037  *
2038  *		NB: caller should already have at least AccessShareLock on the
2039  *		relation ID, else there are nasty race conditions.
2040  *
2041  *		NB: relation ref count is incremented, or set to 1 if new entry.
2042  *		Caller should eventually decrement count.  (Usually,
2043  *		that happens by calling RelationClose().)
2044  */
2045 Relation
RelationIdGetRelation(Oid relationId)2046 RelationIdGetRelation(Oid relationId)
2047 {
2048 	Relation	rd;
2049 
2050 	/* Make sure we're in an xact, even if this ends up being a cache hit */
2051 	Assert(IsTransactionState());
2052 
2053 	/*
2054 	 * first try to find reldesc in the cache
2055 	 */
2056 	RelationIdCacheLookup(relationId, rd);
2057 
2058 	if (RelationIsValid(rd))
2059 	{
2060 		/* return NULL for dropped relations */
2061 		if (rd->rd_droppedSubid != InvalidSubTransactionId)
2062 		{
2063 			Assert(!rd->rd_isvalid);
2064 			return NULL;
2065 		}
2066 
2067 		RelationIncrementReferenceCount(rd);
2068 		/* revalidate cache entry if necessary */
2069 		if (!rd->rd_isvalid)
2070 		{
2071 			/*
2072 			 * Indexes only have a limited number of possible schema changes,
2073 			 * and we don't want to use the full-blown procedure because it's
2074 			 * a headache for indexes that reload itself depends on.
2075 			 */
2076 			if (rd->rd_rel->relkind == RELKIND_INDEX ||
2077 				rd->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2078 				RelationReloadIndexInfo(rd);
2079 			else
2080 				RelationClearRelation(rd, true);
2081 
2082 			/*
2083 			 * Normally entries need to be valid here, but before the relcache
2084 			 * has been initialized, not enough infrastructure exists to
2085 			 * perform pg_class lookups. The structure of such entries doesn't
2086 			 * change, but we still want to update the rd_rel entry. So
2087 			 * rd_isvalid = false is left in place for a later lookup.
2088 			 */
2089 			Assert(rd->rd_isvalid ||
2090 				   (rd->rd_isnailed && !criticalRelcachesBuilt));
2091 		}
2092 		return rd;
2093 	}
2094 
2095 	/*
2096 	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
2097 	 * it.
2098 	 */
2099 	rd = RelationBuildDesc(relationId, true);
2100 	if (RelationIsValid(rd))
2101 		RelationIncrementReferenceCount(rd);
2102 	return rd;
2103 }
2104 
2105 /* ----------------------------------------------------------------
2106  *				cache invalidation support routines
2107  * ----------------------------------------------------------------
2108  */
2109 
2110 /*
2111  * RelationIncrementReferenceCount
2112  *		Increments relation reference count.
2113  *
2114  * Note: bootstrap mode has its own weird ideas about relation refcount
2115  * behavior; we ought to fix it someday, but for now, just disable
2116  * reference count ownership tracking in bootstrap mode.
2117  */
2118 void
RelationIncrementReferenceCount(Relation rel)2119 RelationIncrementReferenceCount(Relation rel)
2120 {
2121 	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
2122 	rel->rd_refcnt += 1;
2123 	if (!IsBootstrapProcessingMode())
2124 		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
2125 }
2126 
2127 /*
2128  * RelationDecrementReferenceCount
2129  *		Decrements relation reference count.
2130  */
2131 void
RelationDecrementReferenceCount(Relation rel)2132 RelationDecrementReferenceCount(Relation rel)
2133 {
2134 	Assert(rel->rd_refcnt > 0);
2135 	rel->rd_refcnt -= 1;
2136 	if (!IsBootstrapProcessingMode())
2137 		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
2138 }
2139 
2140 /*
2141  * RelationClose - close an open relation
2142  *
2143  *	Actually, we just decrement the refcount.
2144  *
2145  *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
2146  *	will be freed as soon as their refcount goes to zero.  In combination
2147  *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
2148  *	to catch references to already-released relcache entries.  It slows
2149  *	things down quite a bit, however.
2150  */
2151 void
RelationClose(Relation relation)2152 RelationClose(Relation relation)
2153 {
2154 	/* Note: no locking manipulations needed */
2155 	RelationDecrementReferenceCount(relation);
2156 
2157 	/*
2158 	 * If the relation is no longer open in this session, we can clean up any
2159 	 * stale partition descriptors it has.  This is unlikely, so check to see
2160 	 * if there are child contexts before expending a call to mcxt.c.
2161 	 */
2162 	if (RelationHasReferenceCountZero(relation))
2163 	{
2164 		if (relation->rd_pdcxt != NULL &&
2165 			relation->rd_pdcxt->firstchild != NULL)
2166 			MemoryContextDeleteChildren(relation->rd_pdcxt);
2167 
2168 		if (relation->rd_pddcxt != NULL &&
2169 			relation->rd_pddcxt->firstchild != NULL)
2170 			MemoryContextDeleteChildren(relation->rd_pddcxt);
2171 	}
2172 
2173 #ifdef RELCACHE_FORCE_RELEASE
2174 	if (RelationHasReferenceCountZero(relation) &&
2175 		relation->rd_createSubid == InvalidSubTransactionId &&
2176 		relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
2177 		RelationClearRelation(relation, false);
2178 #endif
2179 }
2180 
2181 /*
2182  * RelationReloadIndexInfo - reload minimal information for an open index
2183  *
2184  *	This function is used only for indexes.  A relcache inval on an index
2185  *	can mean that its pg_class or pg_index row changed.  There are only
2186  *	very limited changes that are allowed to an existing index's schema,
2187  *	so we can update the relcache entry without a complete rebuild; which
2188  *	is fortunate because we can't rebuild an index entry that is "nailed"
2189  *	and/or in active use.  We support full replacement of the pg_class row,
2190  *	as well as updates of a few simple fields of the pg_index row.
2191  *
2192  *	We can't necessarily reread the catalog rows right away; we might be
2193  *	in a failed transaction when we receive the SI notification.  If so,
2194  *	RelationClearRelation just marks the entry as invalid by setting
2195  *	rd_isvalid to false.  This routine is called to fix the entry when it
2196  *	is next needed.
2197  *
2198  *	We assume that at the time we are called, we have at least AccessShareLock
2199  *	on the target index.  (Note: in the calls from RelationClearRelation,
2200  *	this is legitimate because we know the rel has positive refcount.)
2201  *
2202  *	If the target index is an index on pg_class or pg_index, we'd better have
2203  *	previously gotten at least AccessShareLock on its underlying catalog,
2204  *	else we are at risk of deadlock against someone trying to exclusive-lock
2205  *	the heap and index in that order.  This is ensured in current usage by
2206  *	only applying this to indexes being opened or having positive refcount.
2207  */
2208 static void
RelationReloadIndexInfo(Relation relation)2209 RelationReloadIndexInfo(Relation relation)
2210 {
2211 	bool		indexOK;
2212 	HeapTuple	pg_class_tuple;
2213 	Form_pg_class relp;
2214 
2215 	/* Should be called only for invalidated, live indexes */
2216 	Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
2217 			relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2218 		   !relation->rd_isvalid &&
2219 		   relation->rd_droppedSubid == InvalidSubTransactionId);
2220 
2221 	/* Ensure it's closed at smgr level */
2222 	RelationCloseSmgr(relation);
2223 
2224 	/* Must free any AM cached data upon relcache flush */
2225 	if (relation->rd_amcache)
2226 		pfree(relation->rd_amcache);
2227 	relation->rd_amcache = NULL;
2228 
2229 	/*
2230 	 * If it's a shared index, we might be called before backend startup has
2231 	 * finished selecting a database, in which case we have no way to read
2232 	 * pg_class yet.  However, a shared index can never have any significant
2233 	 * schema updates, so it's okay to ignore the invalidation signal.  Just
2234 	 * mark it valid and return without doing anything more.
2235 	 */
2236 	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
2237 	{
2238 		relation->rd_isvalid = true;
2239 		return;
2240 	}
2241 
2242 	/*
2243 	 * Read the pg_class row
2244 	 *
2245 	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
2246 	 * for pg_class_oid_index ...
2247 	 */
2248 	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2249 	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
2250 	if (!HeapTupleIsValid(pg_class_tuple))
2251 		elog(ERROR, "could not find pg_class tuple for index %u",
2252 			 RelationGetRelid(relation));
2253 	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2254 	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2255 	/* Reload reloptions in case they changed */
2256 	if (relation->rd_options)
2257 		pfree(relation->rd_options);
2258 	RelationParseRelOptions(relation, pg_class_tuple);
2259 	/* done with pg_class tuple */
2260 	heap_freetuple(pg_class_tuple);
2261 	/* We must recalculate physical address in case it changed */
2262 	RelationInitPhysicalAddr(relation);
2263 
2264 	/*
2265 	 * For a non-system index, there are fields of the pg_index row that are
2266 	 * allowed to change, so re-read that row and update the relcache entry.
2267 	 * Most of the info derived from pg_index (such as support function lookup
2268 	 * info) cannot change, and indeed the whole point of this routine is to
2269 	 * update the relcache entry without clobbering that data; so wholesale
2270 	 * replacement is not appropriate.
2271 	 */
2272 	if (!IsSystemRelation(relation))
2273 	{
2274 		HeapTuple	tuple;
2275 		Form_pg_index index;
2276 
2277 		tuple = SearchSysCache1(INDEXRELID,
2278 								ObjectIdGetDatum(RelationGetRelid(relation)));
2279 		if (!HeapTupleIsValid(tuple))
2280 			elog(ERROR, "cache lookup failed for index %u",
2281 				 RelationGetRelid(relation));
2282 		index = (Form_pg_index) GETSTRUCT(tuple);
2283 
2284 		/*
2285 		 * Basically, let's just copy all the bool fields.  There are one or
2286 		 * two of these that can't actually change in the current code, but
2287 		 * it's not worth it to track exactly which ones they are.  None of
2288 		 * the array fields are allowed to change, though.
2289 		 */
2290 		relation->rd_index->indisunique = index->indisunique;
2291 		relation->rd_index->indisprimary = index->indisprimary;
2292 		relation->rd_index->indisexclusion = index->indisexclusion;
2293 		relation->rd_index->indimmediate = index->indimmediate;
2294 		relation->rd_index->indisclustered = index->indisclustered;
2295 		relation->rd_index->indisvalid = index->indisvalid;
2296 		relation->rd_index->indcheckxmin = index->indcheckxmin;
2297 		relation->rd_index->indisready = index->indisready;
2298 		relation->rd_index->indislive = index->indislive;
2299 
2300 		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
2301 		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
2302 							   HeapTupleHeaderGetXmin(tuple->t_data));
2303 
2304 		ReleaseSysCache(tuple);
2305 	}
2306 
2307 	/* Okay, now it's valid again */
2308 	relation->rd_isvalid = true;
2309 }
2310 
2311 /*
2312  * RelationReloadNailed - reload minimal information for nailed relations.
2313  *
2314  * The structure of a nailed relation can never change (which is good, because
2315  * we rely on knowing their structure to be able to read catalog content). But
2316  * some parts, e.g. pg_class.relfrozenxid, are still important to have
2317  * accurate content for. Therefore those need to be reloaded after the arrival
2318  * of invalidations.
2319  */
2320 static void
RelationReloadNailed(Relation relation)2321 RelationReloadNailed(Relation relation)
2322 {
2323 	Assert(relation->rd_isnailed);
2324 
2325 	/*
2326 	 * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
2327 	 * mapping changed.
2328 	 */
2329 	RelationInitPhysicalAddr(relation);
2330 
2331 	/* flag as needing to be revalidated */
2332 	relation->rd_isvalid = false;
2333 
2334 	/*
2335 	 * Can only reread catalog contents if in a transaction.  If the relation
2336 	 * is currently open (not counting the nailed refcount), do so
2337 	 * immediately. Otherwise we've already marked the entry as possibly
2338 	 * invalid, and it'll be fixed when next opened.
2339 	 */
2340 	if (!IsTransactionState() || relation->rd_refcnt <= 1)
2341 		return;
2342 
2343 	if (relation->rd_rel->relkind == RELKIND_INDEX)
2344 	{
2345 		/*
2346 		 * If it's a nailed-but-not-mapped index, then we need to re-read the
2347 		 * pg_class row to see if its relfilenode changed.
2348 		 */
2349 		RelationReloadIndexInfo(relation);
2350 	}
2351 	else
2352 	{
2353 		/*
2354 		 * Reload a non-index entry.  We can't easily do so if relcaches
2355 		 * aren't yet built, but that's fine because at that stage the
2356 		 * attributes that need to be current (like relfrozenxid) aren't yet
2357 		 * accessed.  To ensure the entry will later be revalidated, we leave
2358 		 * it in invalid state, but allow use (cf. RelationIdGetRelation()).
2359 		 */
2360 		if (criticalRelcachesBuilt)
2361 		{
2362 			HeapTuple	pg_class_tuple;
2363 			Form_pg_class relp;
2364 
2365 			/*
2366 			 * NB: Mark the entry as valid before starting to scan, to avoid
2367 			 * self-recursion when re-building pg_class.
2368 			 */
2369 			relation->rd_isvalid = true;
2370 
2371 			pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
2372 											true, false);
2373 			relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2374 			memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2375 			heap_freetuple(pg_class_tuple);
2376 
2377 			/*
2378 			 * Again mark as valid, to protect against concurrently arriving
2379 			 * invalidations.
2380 			 */
2381 			relation->rd_isvalid = true;
2382 		}
2383 	}
2384 }
2385 
2386 /*
2387  * RelationDestroyRelation
2388  *
2389  *	Physically delete a relation cache entry and all subsidiary data.
2390  *	Caller must already have unhooked the entry from the hash table.
2391  */
2392 static void
RelationDestroyRelation(Relation relation,bool remember_tupdesc)2393 RelationDestroyRelation(Relation relation, bool remember_tupdesc)
2394 {
2395 	Assert(RelationHasReferenceCountZero(relation));
2396 
2397 	/*
2398 	 * Make sure smgr and lower levels close the relation's files, if they
2399 	 * weren't closed already.  (This was probably done by caller, but let's
2400 	 * just be real sure.)
2401 	 */
2402 	RelationCloseSmgr(relation);
2403 
2404 	/*
2405 	 * Free all the subsidiary data structures of the relcache entry, then the
2406 	 * entry itself.
2407 	 */
2408 	if (relation->rd_rel)
2409 		pfree(relation->rd_rel);
2410 	/* can't use DecrTupleDescRefCount here */
2411 	Assert(relation->rd_att->tdrefcount > 0);
2412 	if (--relation->rd_att->tdrefcount == 0)
2413 	{
2414 		/*
2415 		 * If we Rebuilt a relcache entry during a transaction then its
2416 		 * possible we did that because the TupDesc changed as the result of
2417 		 * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
2418 		 * possible someone copied that TupDesc, in which case the copy would
2419 		 * point to free'd memory. So if we rebuild an entry we keep the
2420 		 * TupDesc around until end of transaction, to be safe.
2421 		 */
2422 		if (remember_tupdesc)
2423 			RememberToFreeTupleDescAtEOX(relation->rd_att);
2424 		else
2425 			FreeTupleDesc(relation->rd_att);
2426 	}
2427 	FreeTriggerDesc(relation->trigdesc);
2428 	list_free_deep(relation->rd_fkeylist);
2429 	list_free(relation->rd_indexlist);
2430 	list_free(relation->rd_statlist);
2431 	bms_free(relation->rd_indexattr);
2432 	bms_free(relation->rd_keyattr);
2433 	bms_free(relation->rd_pkattr);
2434 	bms_free(relation->rd_idattr);
2435 	if (relation->rd_pubactions)
2436 		pfree(relation->rd_pubactions);
2437 	if (relation->rd_options)
2438 		pfree(relation->rd_options);
2439 	if (relation->rd_indextuple)
2440 		pfree(relation->rd_indextuple);
2441 	if (relation->rd_amcache)
2442 		pfree(relation->rd_amcache);
2443 	if (relation->rd_fdwroutine)
2444 		pfree(relation->rd_fdwroutine);
2445 	if (relation->rd_indexcxt)
2446 		MemoryContextDelete(relation->rd_indexcxt);
2447 	if (relation->rd_rulescxt)
2448 		MemoryContextDelete(relation->rd_rulescxt);
2449 	if (relation->rd_rsdesc)
2450 		MemoryContextDelete(relation->rd_rsdesc->rscxt);
2451 	if (relation->rd_partkeycxt)
2452 		MemoryContextDelete(relation->rd_partkeycxt);
2453 	if (relation->rd_pdcxt)
2454 		MemoryContextDelete(relation->rd_pdcxt);
2455 	if (relation->rd_pddcxt)
2456 		MemoryContextDelete(relation->rd_pddcxt);
2457 	if (relation->rd_partcheckcxt)
2458 		MemoryContextDelete(relation->rd_partcheckcxt);
2459 	pfree(relation);
2460 }
2461 
2462 /*
2463  * RelationClearRelation
2464  *
2465  *	 Physically blow away a relation cache entry, or reset it and rebuild
2466  *	 it from scratch (that is, from catalog entries).  The latter path is
2467  *	 used when we are notified of a change to an open relation (one with
2468  *	 refcount > 0).
2469  *
2470  *	 NB: when rebuilding, we'd better hold some lock on the relation,
2471  *	 else the catalog data we need to read could be changing under us.
2472  *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
2473  *	 a sinval reset could happen while we're accessing the catalogs, and
2474  *	 the rel would get blown away underneath us by RelationCacheInvalidate
2475  *	 if it has zero refcnt.
2476  *
2477  *	 The "rebuild" parameter is redundant in current usage because it has
2478  *	 to match the relation's refcnt status, but we keep it as a crosscheck
2479  *	 that we're doing what the caller expects.
2480  */
2481 static void
RelationClearRelation(Relation relation,bool rebuild)2482 RelationClearRelation(Relation relation, bool rebuild)
2483 {
2484 	/*
2485 	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
2486 	 * course it would be an equally bad idea to blow away one with nonzero
2487 	 * refcnt, since that would leave someone somewhere with a dangling
2488 	 * pointer.  All callers are expected to have verified that this holds.
2489 	 */
2490 	Assert(rebuild ?
2491 		   !RelationHasReferenceCountZero(relation) :
2492 		   RelationHasReferenceCountZero(relation));
2493 
2494 	/*
2495 	 * Make sure smgr and lower levels close the relation's files, if they
2496 	 * weren't closed already.  If the relation is not getting deleted, the
2497 	 * next smgr access should reopen the files automatically.  This ensures
2498 	 * that the low-level file access state is updated after, say, a vacuum
2499 	 * truncation.
2500 	 */
2501 	RelationCloseSmgr(relation);
2502 
2503 	/* Free AM cached data, if any */
2504 	if (relation->rd_amcache)
2505 		pfree(relation->rd_amcache);
2506 	relation->rd_amcache = NULL;
2507 
2508 	/*
2509 	 * Treat nailed-in system relations separately, they always need to be
2510 	 * accessible, so we can't blow them away.
2511 	 */
2512 	if (relation->rd_isnailed)
2513 	{
2514 		RelationReloadNailed(relation);
2515 		return;
2516 	}
2517 
2518 	/* Mark it invalid until we've finished rebuild */
2519 	relation->rd_isvalid = false;
2520 
2521 	/* See RelationForgetRelation(). */
2522 	if (relation->rd_droppedSubid != InvalidSubTransactionId)
2523 		return;
2524 
2525 	/*
2526 	 * Even non-system indexes should not be blown away if they are open and
2527 	 * have valid index support information.  This avoids problems with active
2528 	 * use of the index support information.  As with nailed indexes, we
2529 	 * re-read the pg_class row to handle possible physical relocation of the
2530 	 * index, and we check for pg_index updates too.
2531 	 */
2532 	if ((relation->rd_rel->relkind == RELKIND_INDEX ||
2533 		 relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2534 		relation->rd_refcnt > 0 &&
2535 		relation->rd_indexcxt != NULL)
2536 	{
2537 		if (IsTransactionState())
2538 			RelationReloadIndexInfo(relation);
2539 		return;
2540 	}
2541 
2542 	/*
2543 	 * If we're really done with the relcache entry, blow it away. But if
2544 	 * someone is still using it, reconstruct the whole deal without moving
2545 	 * the physical RelationData record (so that the someone's pointer is
2546 	 * still valid).
2547 	 */
2548 	if (!rebuild)
2549 	{
2550 		/* Remove it from the hash table */
2551 		RelationCacheDelete(relation);
2552 
2553 		/* And release storage */
2554 		RelationDestroyRelation(relation, false);
2555 	}
2556 	else if (!IsTransactionState())
2557 	{
2558 		/*
2559 		 * If we're not inside a valid transaction, we can't do any catalog
2560 		 * access so it's not possible to rebuild yet.  Just exit, leaving
2561 		 * rd_isvalid = false so that the rebuild will occur when the entry is
2562 		 * next opened.
2563 		 *
2564 		 * Note: it's possible that we come here during subtransaction abort,
2565 		 * and the reason for wanting to rebuild is that the rel is open in
2566 		 * the outer transaction.  In that case it might seem unsafe to not
2567 		 * rebuild immediately, since whatever code has the rel already open
2568 		 * will keep on using the relcache entry as-is.  However, in such a
2569 		 * case the outer transaction should be holding a lock that's
2570 		 * sufficient to prevent any significant change in the rel's schema,
2571 		 * so the existing entry contents should be good enough for its
2572 		 * purposes; at worst we might be behind on statistics updates or the
2573 		 * like.  (See also CheckTableNotInUse() and its callers.)	These same
2574 		 * remarks also apply to the cases above where we exit without having
2575 		 * done RelationReloadIndexInfo() yet.
2576 		 */
2577 		return;
2578 	}
2579 	else
2580 	{
2581 		/*
2582 		 * Our strategy for rebuilding an open relcache entry is to build a
2583 		 * new entry from scratch, swap its contents with the old entry, and
2584 		 * finally delete the new entry (along with any infrastructure swapped
2585 		 * over from the old entry).  This is to avoid trouble in case an
2586 		 * error causes us to lose control partway through.  The old entry
2587 		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
2588 		 * on next access.  Meanwhile it's not any less valid than it was
2589 		 * before, so any code that might expect to continue accessing it
2590 		 * isn't hurt by the rebuild failure.  (Consider for example a
2591 		 * subtransaction that ALTERs a table and then gets canceled partway
2592 		 * through the cache entry rebuild.  The outer transaction should
2593 		 * still see the not-modified cache entry as valid.)  The worst
2594 		 * consequence of an error is leaking the necessarily-unreferenced new
2595 		 * entry, and this shouldn't happen often enough for that to be a big
2596 		 * problem.
2597 		 *
2598 		 * When rebuilding an open relcache entry, we must preserve ref count,
2599 		 * rd_*Subid, and rd_toastoid state.  Also attempt to preserve the
2600 		 * pg_class entry (rd_rel), tupledesc, rewrite-rule, partition key,
2601 		 * and partition descriptor substructures in place, because various
2602 		 * places assume that these structures won't move while they are
2603 		 * working with an open relcache entry.  (Note:  the refcount
2604 		 * mechanism for tupledescs might someday allow us to remove this hack
2605 		 * for the tupledesc.)
2606 		 *
2607 		 * Note that this process does not touch CurrentResourceOwner; which
2608 		 * is good because whatever ref counts the entry may have do not
2609 		 * necessarily belong to that resource owner.
2610 		 */
2611 		Relation	newrel;
2612 		Oid			save_relid = RelationGetRelid(relation);
2613 		bool		keep_tupdesc;
2614 		bool		keep_rules;
2615 		bool		keep_policies;
2616 		bool		keep_partkey;
2617 
2618 		/* Build temporary entry, but don't link it into hashtable */
2619 		newrel = RelationBuildDesc(save_relid, false);
2620 
2621 		/*
2622 		 * Between here and the end of the swap, don't add code that does or
2623 		 * reasonably could read system catalogs.  That range must be free
2624 		 * from invalidation processing.  See RelationBuildDesc() manipulation
2625 		 * of in_progress_list.
2626 		 */
2627 
2628 		if (newrel == NULL)
2629 		{
2630 			/*
2631 			 * We can validly get here, if we're using a historic snapshot in
2632 			 * which a relation, accessed from outside logical decoding, is
2633 			 * still invisible. In that case it's fine to just mark the
2634 			 * relation as invalid and return - it'll fully get reloaded by
2635 			 * the cache reset at the end of logical decoding (or at the next
2636 			 * access).  During normal processing we don't want to ignore this
2637 			 * case as it shouldn't happen there, as explained below.
2638 			 */
2639 			if (HistoricSnapshotActive())
2640 				return;
2641 
2642 			/*
2643 			 * This shouldn't happen as dropping a relation is intended to be
2644 			 * impossible if still referenced (cf. CheckTableNotInUse()). But
2645 			 * if we get here anyway, we can't just delete the relcache entry,
2646 			 * as it possibly could get accessed later (as e.g. the error
2647 			 * might get trapped and handled via a subtransaction rollback).
2648 			 */
2649 			elog(ERROR, "relation %u deleted while still in use", save_relid);
2650 		}
2651 
2652 		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
2653 		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2654 		keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
2655 		/* partkey is immutable once set up, so we can always keep it */
2656 		keep_partkey = (relation->rd_partkey != NULL);
2657 
2658 		/*
2659 		 * Perform swapping of the relcache entry contents.  Within this
2660 		 * process the old entry is momentarily invalid, so there *must* be no
2661 		 * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
2662 		 * all-in-line code for safety.
2663 		 *
2664 		 * Since the vast majority of fields should be swapped, our method is
2665 		 * to swap the whole structures and then re-swap those few fields we
2666 		 * didn't want swapped.
2667 		 */
2668 #define SWAPFIELD(fldtype, fldname) \
2669 		do { \
2670 			fldtype _tmp = newrel->fldname; \
2671 			newrel->fldname = relation->fldname; \
2672 			relation->fldname = _tmp; \
2673 		} while (0)
2674 
2675 		/* swap all Relation struct fields */
2676 		{
2677 			RelationData tmpstruct;
2678 
2679 			memcpy(&tmpstruct, newrel, sizeof(RelationData));
2680 			memcpy(newrel, relation, sizeof(RelationData));
2681 			memcpy(relation, &tmpstruct, sizeof(RelationData));
2682 		}
2683 
2684 		/* rd_smgr must not be swapped, due to back-links from smgr level */
2685 		SWAPFIELD(SMgrRelation, rd_smgr);
2686 		/* rd_refcnt must be preserved */
2687 		SWAPFIELD(int, rd_refcnt);
2688 		/* isnailed shouldn't change */
2689 		Assert(newrel->rd_isnailed == relation->rd_isnailed);
2690 		/* creation sub-XIDs must be preserved */
2691 		SWAPFIELD(SubTransactionId, rd_createSubid);
2692 		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2693 		SWAPFIELD(SubTransactionId, rd_firstRelfilenodeSubid);
2694 		SWAPFIELD(SubTransactionId, rd_droppedSubid);
2695 		/* un-swap rd_rel pointers, swap contents instead */
2696 		SWAPFIELD(Form_pg_class, rd_rel);
2697 		/* ... but actually, we don't have to update newrel->rd_rel */
2698 		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2699 		/* preserve old tupledesc, rules, policies if no logical change */
2700 		if (keep_tupdesc)
2701 			SWAPFIELD(TupleDesc, rd_att);
2702 		if (keep_rules)
2703 		{
2704 			SWAPFIELD(RuleLock *, rd_rules);
2705 			SWAPFIELD(MemoryContext, rd_rulescxt);
2706 		}
2707 		if (keep_policies)
2708 			SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
2709 		/* toast OID override must be preserved */
2710 		SWAPFIELD(Oid, rd_toastoid);
2711 		/* pgstat_info must be preserved */
2712 		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2713 		/* preserve old partition key if we have one */
2714 		if (keep_partkey)
2715 		{
2716 			SWAPFIELD(PartitionKey, rd_partkey);
2717 			SWAPFIELD(MemoryContext, rd_partkeycxt);
2718 		}
2719 		if (newrel->rd_pdcxt != NULL || newrel->rd_pddcxt != NULL)
2720 		{
2721 			/*
2722 			 * We are rebuilding a partitioned relation with a non-zero
2723 			 * reference count, so we must keep the old partition descriptor
2724 			 * around, in case there's a PartitionDirectory with a pointer to
2725 			 * it.  This means we can't free the old rd_pdcxt yet.  (This is
2726 			 * necessary because RelationGetPartitionDesc hands out direct
2727 			 * pointers to the relcache's data structure, unlike our usual
2728 			 * practice which is to hand out copies.  We'd have the same
2729 			 * problem with rd_partkey, except that we always preserve that
2730 			 * once created.)
2731 			 *
2732 			 * To ensure that it's not leaked completely, re-attach it to the
2733 			 * new reldesc, or make it a child of the new reldesc's rd_pdcxt
2734 			 * in the unlikely event that there is one already.  (Compare hack
2735 			 * in RelationBuildPartitionDesc.)  RelationClose will clean up
2736 			 * any such contexts once the reference count reaches zero.
2737 			 *
2738 			 * In the case where the reference count is zero, this code is not
2739 			 * reached, which should be OK because in that case there should
2740 			 * be no PartitionDirectory with a pointer to the old entry.
2741 			 *
2742 			 * Note that newrel and relation have already been swapped, so the
2743 			 * "old" partition descriptor is actually the one hanging off of
2744 			 * newrel.
2745 			 */
2746 			relation->rd_partdesc = NULL;	/* ensure rd_partdesc is invalid */
2747 			relation->rd_partdesc_nodetached = NULL;
2748 			relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
2749 			if (relation->rd_pdcxt != NULL) /* probably never happens */
2750 				MemoryContextSetParent(newrel->rd_pdcxt, relation->rd_pdcxt);
2751 			else
2752 				relation->rd_pdcxt = newrel->rd_pdcxt;
2753 			if (relation->rd_pddcxt != NULL)
2754 				MemoryContextSetParent(newrel->rd_pddcxt, relation->rd_pddcxt);
2755 			else
2756 				relation->rd_pddcxt = newrel->rd_pddcxt;
2757 			/* drop newrel's pointers so we don't destroy it below */
2758 			newrel->rd_partdesc = NULL;
2759 			newrel->rd_partdesc_nodetached = NULL;
2760 			newrel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
2761 			newrel->rd_pdcxt = NULL;
2762 			newrel->rd_pddcxt = NULL;
2763 		}
2764 
2765 #undef SWAPFIELD
2766 
2767 		/* And now we can throw away the temporary entry */
2768 		RelationDestroyRelation(newrel, !keep_tupdesc);
2769 	}
2770 }
2771 
2772 /*
2773  * RelationFlushRelation
2774  *
2775  *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
2776  *	 This is used when we receive a cache invalidation event for the rel.
2777  */
2778 static void
RelationFlushRelation(Relation relation)2779 RelationFlushRelation(Relation relation)
2780 {
2781 	if (relation->rd_createSubid != InvalidSubTransactionId ||
2782 		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2783 	{
2784 		/*
2785 		 * New relcache entries are always rebuilt, not flushed; else we'd
2786 		 * forget the "new" status of the relation.  Ditto for the
2787 		 * new-relfilenode status.
2788 		 *
2789 		 * The rel could have zero refcnt here, so temporarily increment the
2790 		 * refcnt to ensure it's safe to rebuild it.  We can assume that the
2791 		 * current transaction has some lock on the rel already.
2792 		 */
2793 		RelationIncrementReferenceCount(relation);
2794 		RelationClearRelation(relation, true);
2795 		RelationDecrementReferenceCount(relation);
2796 	}
2797 	else
2798 	{
2799 		/*
2800 		 * Pre-existing rels can be dropped from the relcache if not open.
2801 		 */
2802 		bool		rebuild = !RelationHasReferenceCountZero(relation);
2803 
2804 		RelationClearRelation(relation, rebuild);
2805 	}
2806 }
2807 
2808 /*
2809  * RelationForgetRelation - caller reports that it dropped the relation
2810  */
2811 void
RelationForgetRelation(Oid rid)2812 RelationForgetRelation(Oid rid)
2813 {
2814 	Relation	relation;
2815 
2816 	RelationIdCacheLookup(rid, relation);
2817 
2818 	if (!PointerIsValid(relation))
2819 		return;					/* not in cache, nothing to do */
2820 
2821 	if (!RelationHasReferenceCountZero(relation))
2822 		elog(ERROR, "relation %u is still open", rid);
2823 
2824 	Assert(relation->rd_droppedSubid == InvalidSubTransactionId);
2825 	if (relation->rd_createSubid != InvalidSubTransactionId ||
2826 		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2827 	{
2828 		/*
2829 		 * In the event of subtransaction rollback, we must not forget
2830 		 * rd_*Subid.  Mark the entry "dropped" so RelationClearRelation()
2831 		 * invalidates it in lieu of destroying it.  (If we're in a top
2832 		 * transaction, we could opt to destroy the entry.)
2833 		 */
2834 		relation->rd_droppedSubid = GetCurrentSubTransactionId();
2835 	}
2836 
2837 	RelationClearRelation(relation, false);
2838 }
2839 
2840 /*
2841  *		RelationCacheInvalidateEntry
2842  *
2843  *		This routine is invoked for SI cache flush messages.
2844  *
2845  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2846  * already determined that the relid belongs to our database or is a shared
2847  * relation.)
2848  *
2849  * We used to skip local relations, on the grounds that they could
2850  * not be targets of cross-backend SI update messages; but it seems
2851  * safer to process them, so that our *own* SI update messages will
2852  * have the same effects during CommandCounterIncrement for both
2853  * local and nonlocal relations.
2854  */
2855 void
RelationCacheInvalidateEntry(Oid relationId)2856 RelationCacheInvalidateEntry(Oid relationId)
2857 {
2858 	Relation	relation;
2859 
2860 	RelationIdCacheLookup(relationId, relation);
2861 
2862 	if (PointerIsValid(relation))
2863 	{
2864 		relcacheInvalsReceived++;
2865 		RelationFlushRelation(relation);
2866 	}
2867 	else
2868 	{
2869 		int			i;
2870 
2871 		for (i = 0; i < in_progress_list_len; i++)
2872 			if (in_progress_list[i].reloid == relationId)
2873 				in_progress_list[i].invalidated = true;
2874 	}
2875 }
2876 
2877 /*
2878  * RelationCacheInvalidate
2879  *	 Blow away cached relation descriptors that have zero reference counts,
2880  *	 and rebuild those with positive reference counts.  Also reset the smgr
2881  *	 relation cache and re-read relation mapping data.
2882  *
2883  *	 Apart from debug_discard_caches, this is currently used only to recover
2884  *	 from SI message buffer overflow, so we do not touch relations having
2885  *	 new-in-transaction relfilenodes; they cannot be targets of cross-backend
2886  *	 SI updates (and our own updates now go through a separate linked list
2887  *	 that isn't limited by the SI message buffer size).
2888  *
2889  *	 We do this in two phases: the first pass deletes deletable items, and
2890  *	 the second one rebuilds the rebuildable items.  This is essential for
2891  *	 safety, because hash_seq_search only copes with concurrent deletion of
2892  *	 the element it is currently visiting.  If a second SI overflow were to
2893  *	 occur while we are walking the table, resulting in recursive entry to
2894  *	 this routine, we could crash because the inner invocation blows away
2895  *	 the entry next to be visited by the outer scan.  But this way is OK,
2896  *	 because (a) during the first pass we won't process any more SI messages,
2897  *	 so hash_seq_search will complete safely; (b) during the second pass we
2898  *	 only hold onto pointers to nondeletable entries.
2899  *
2900  *	 The two-phase approach also makes it easy to update relfilenodes for
2901  *	 mapped relations before we do anything else, and to ensure that the
2902  *	 second pass processes nailed-in-cache items before other nondeletable
2903  *	 items.  This should ensure that system catalogs are up to date before
2904  *	 we attempt to use them to reload information about other open relations.
2905  *
2906  *	 After those two phases of work having immediate effects, we normally
2907  *	 signal any RelationBuildDesc() on the stack to start over.  However, we
2908  *	 don't do this if called as part of debug_discard_caches.  Otherwise,
2909  *	 RelationBuildDesc() would become an infinite loop.
2910  */
2911 void
RelationCacheInvalidate(bool debug_discard)2912 RelationCacheInvalidate(bool debug_discard)
2913 {
2914 	HASH_SEQ_STATUS status;
2915 	RelIdCacheEnt *idhentry;
2916 	Relation	relation;
2917 	List	   *rebuildFirstList = NIL;
2918 	List	   *rebuildList = NIL;
2919 	ListCell   *l;
2920 	int			i;
2921 
2922 	/*
2923 	 * Reload relation mapping data before starting to reconstruct cache.
2924 	 */
2925 	RelationMapInvalidateAll();
2926 
2927 	/* Phase 1 */
2928 	hash_seq_init(&status, RelationIdCache);
2929 
2930 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2931 	{
2932 		relation = idhentry->reldesc;
2933 
2934 		/* Must close all smgr references to avoid leaving dangling ptrs */
2935 		RelationCloseSmgr(relation);
2936 
2937 		/*
2938 		 * Ignore new relations; no other backend will manipulate them before
2939 		 * we commit.  Likewise, before replacing a relation's relfilenode, we
2940 		 * shall have acquired AccessExclusiveLock and drained any applicable
2941 		 * pending invalidations.
2942 		 */
2943 		if (relation->rd_createSubid != InvalidSubTransactionId ||
2944 			relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2945 			continue;
2946 
2947 		relcacheInvalsReceived++;
2948 
2949 		if (RelationHasReferenceCountZero(relation))
2950 		{
2951 			/* Delete this entry immediately */
2952 			Assert(!relation->rd_isnailed);
2953 			RelationClearRelation(relation, false);
2954 		}
2955 		else
2956 		{
2957 			/*
2958 			 * If it's a mapped relation, immediately update its rd_node in
2959 			 * case its relfilenode changed.  We must do this during phase 1
2960 			 * in case the relation is consulted during rebuild of other
2961 			 * relcache entries in phase 2.  It's safe since consulting the
2962 			 * map doesn't involve any access to relcache entries.
2963 			 */
2964 			if (RelationIsMapped(relation))
2965 				RelationInitPhysicalAddr(relation);
2966 
2967 			/*
2968 			 * Add this entry to list of stuff to rebuild in second pass.
2969 			 * pg_class goes to the front of rebuildFirstList while
2970 			 * pg_class_oid_index goes to the back of rebuildFirstList, so
2971 			 * they are done first and second respectively.  Other nailed
2972 			 * relations go to the front of rebuildList, so they'll be done
2973 			 * next in no particular order; and everything else goes to the
2974 			 * back of rebuildList.
2975 			 */
2976 			if (RelationGetRelid(relation) == RelationRelationId)
2977 				rebuildFirstList = lcons(relation, rebuildFirstList);
2978 			else if (RelationGetRelid(relation) == ClassOidIndexId)
2979 				rebuildFirstList = lappend(rebuildFirstList, relation);
2980 			else if (relation->rd_isnailed)
2981 				rebuildList = lcons(relation, rebuildList);
2982 			else
2983 				rebuildList = lappend(rebuildList, relation);
2984 		}
2985 	}
2986 
2987 	/*
2988 	 * Now zap any remaining smgr cache entries.  This must happen before we
2989 	 * start to rebuild entries, since that may involve catalog fetches which
2990 	 * will re-open catalog files.
2991 	 */
2992 	smgrcloseall();
2993 
2994 	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2995 	foreach(l, rebuildFirstList)
2996 	{
2997 		relation = (Relation) lfirst(l);
2998 		RelationClearRelation(relation, true);
2999 	}
3000 	list_free(rebuildFirstList);
3001 	foreach(l, rebuildList)
3002 	{
3003 		relation = (Relation) lfirst(l);
3004 		RelationClearRelation(relation, true);
3005 	}
3006 	list_free(rebuildList);
3007 
3008 	if (!debug_discard)
3009 		/* Any RelationBuildDesc() on the stack must start over. */
3010 		for (i = 0; i < in_progress_list_len; i++)
3011 			in_progress_list[i].invalidated = true;
3012 }
3013 
3014 /*
3015  * RelationCloseSmgrByOid - close a relcache entry's smgr link
3016  *
3017  * Needed in some cases where we are changing a relation's physical mapping.
3018  * The link will be automatically reopened on next use.
3019  */
3020 void
RelationCloseSmgrByOid(Oid relationId)3021 RelationCloseSmgrByOid(Oid relationId)
3022 {
3023 	Relation	relation;
3024 
3025 	RelationIdCacheLookup(relationId, relation);
3026 
3027 	if (!PointerIsValid(relation))
3028 		return;					/* not in cache, nothing to do */
3029 
3030 	RelationCloseSmgr(relation);
3031 }
3032 
3033 static void
RememberToFreeTupleDescAtEOX(TupleDesc td)3034 RememberToFreeTupleDescAtEOX(TupleDesc td)
3035 {
3036 	if (EOXactTupleDescArray == NULL)
3037 	{
3038 		MemoryContext oldcxt;
3039 
3040 		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3041 
3042 		EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
3043 		EOXactTupleDescArrayLen = 16;
3044 		NextEOXactTupleDescNum = 0;
3045 		MemoryContextSwitchTo(oldcxt);
3046 	}
3047 	else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
3048 	{
3049 		int32		newlen = EOXactTupleDescArrayLen * 2;
3050 
3051 		Assert(EOXactTupleDescArrayLen > 0);
3052 
3053 		EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
3054 													  newlen * sizeof(TupleDesc));
3055 		EOXactTupleDescArrayLen = newlen;
3056 	}
3057 
3058 	EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
3059 }
3060 
3061 #ifdef USE_ASSERT_CHECKING
3062 static void
AssertPendingSyncConsistency(Relation relation)3063 AssertPendingSyncConsistency(Relation relation)
3064 {
3065 	bool		relcache_verdict =
3066 	RelationIsPermanent(relation) &&
3067 	((relation->rd_createSubid != InvalidSubTransactionId &&
3068 	  RELKIND_HAS_STORAGE(relation->rd_rel->relkind)) ||
3069 	 relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId);
3070 
3071 	Assert(relcache_verdict == RelFileNodeSkippingWAL(relation->rd_node));
3072 
3073 	if (relation->rd_droppedSubid != InvalidSubTransactionId)
3074 		Assert(!relation->rd_isvalid &&
3075 			   (relation->rd_createSubid != InvalidSubTransactionId ||
3076 				relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId));
3077 }
3078 
3079 /*
3080  * AssertPendingSyncs_RelationCache
3081  *
3082  *	Assert that relcache.c and storage.c agree on whether to skip WAL.
3083  */
3084 void
AssertPendingSyncs_RelationCache(void)3085 AssertPendingSyncs_RelationCache(void)
3086 {
3087 	HASH_SEQ_STATUS status;
3088 	LOCALLOCK  *locallock;
3089 	Relation   *rels;
3090 	int			maxrels;
3091 	int			nrels;
3092 	RelIdCacheEnt *idhentry;
3093 	int			i;
3094 
3095 	/*
3096 	 * Open every relation that this transaction has locked.  If, for some
3097 	 * relation, storage.c is skipping WAL and relcache.c is not skipping WAL,
3098 	 * a CommandCounterIncrement() typically yields a local invalidation
3099 	 * message that destroys the relcache entry.  By recreating such entries
3100 	 * here, we detect the problem.
3101 	 */
3102 	PushActiveSnapshot(GetTransactionSnapshot());
3103 	maxrels = 1;
3104 	rels = palloc(maxrels * sizeof(*rels));
3105 	nrels = 0;
3106 	hash_seq_init(&status, GetLockMethodLocalHash());
3107 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3108 	{
3109 		Oid			relid;
3110 		Relation	r;
3111 
3112 		if (locallock->nLocks <= 0)
3113 			continue;
3114 		if ((LockTagType) locallock->tag.lock.locktag_type !=
3115 			LOCKTAG_RELATION)
3116 			continue;
3117 		relid = ObjectIdGetDatum(locallock->tag.lock.locktag_field2);
3118 		r = RelationIdGetRelation(relid);
3119 		if (!RelationIsValid(r))
3120 			continue;
3121 		if (nrels >= maxrels)
3122 		{
3123 			maxrels *= 2;
3124 			rels = repalloc(rels, maxrels * sizeof(*rels));
3125 		}
3126 		rels[nrels++] = r;
3127 	}
3128 
3129 	hash_seq_init(&status, RelationIdCache);
3130 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3131 		AssertPendingSyncConsistency(idhentry->reldesc);
3132 
3133 	for (i = 0; i < nrels; i++)
3134 		RelationClose(rels[i]);
3135 	PopActiveSnapshot();
3136 }
3137 #endif
3138 
3139 /*
3140  * AtEOXact_RelationCache
3141  *
3142  *	Clean up the relcache at main-transaction commit or abort.
3143  *
3144  * Note: this must be called *before* processing invalidation messages.
3145  * In the case of abort, we don't want to try to rebuild any invalidated
3146  * cache entries (since we can't safely do database accesses).  Therefore
3147  * we must reset refcnts before handling pending invalidations.
3148  *
3149  * As of PostgreSQL 8.1, relcache refcnts should get released by the
3150  * ResourceOwner mechanism.  This routine just does a debugging
3151  * cross-check that no pins remain.  However, we also need to do special
3152  * cleanup when the current transaction created any relations or made use
3153  * of forced index lists.
3154  */
3155 void
AtEOXact_RelationCache(bool isCommit)3156 AtEOXact_RelationCache(bool isCommit)
3157 {
3158 	HASH_SEQ_STATUS status;
3159 	RelIdCacheEnt *idhentry;
3160 	int			i;
3161 
3162 	/*
3163 	 * Forget in_progress_list.  This is relevant when we're aborting due to
3164 	 * an error during RelationBuildDesc().
3165 	 */
3166 	Assert(in_progress_list_len == 0 || !isCommit);
3167 	in_progress_list_len = 0;
3168 
3169 	/*
3170 	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
3171 	 * listed in it.  Otherwise fall back on a hash_seq_search scan.
3172 	 *
3173 	 * For simplicity, eoxact_list[] entries are not deleted till end of
3174 	 * top-level transaction, even though we could remove them at
3175 	 * subtransaction end in some cases, or remove relations from the list if
3176 	 * they are cleared for other reasons.  Therefore we should expect the
3177 	 * case that list entries are not found in the hashtable; if not, there's
3178 	 * nothing to do for them.
3179 	 */
3180 	if (eoxact_list_overflowed)
3181 	{
3182 		hash_seq_init(&status, RelationIdCache);
3183 		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3184 		{
3185 			AtEOXact_cleanup(idhentry->reldesc, isCommit);
3186 		}
3187 	}
3188 	else
3189 	{
3190 		for (i = 0; i < eoxact_list_len; i++)
3191 		{
3192 			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3193 													 (void *) &eoxact_list[i],
3194 													 HASH_FIND,
3195 													 NULL);
3196 			if (idhentry != NULL)
3197 				AtEOXact_cleanup(idhentry->reldesc, isCommit);
3198 		}
3199 	}
3200 
3201 	if (EOXactTupleDescArrayLen > 0)
3202 	{
3203 		Assert(EOXactTupleDescArray != NULL);
3204 		for (i = 0; i < NextEOXactTupleDescNum; i++)
3205 			FreeTupleDesc(EOXactTupleDescArray[i]);
3206 		pfree(EOXactTupleDescArray);
3207 		EOXactTupleDescArray = NULL;
3208 	}
3209 
3210 	/* Now we're out of the transaction and can clear the lists */
3211 	eoxact_list_len = 0;
3212 	eoxact_list_overflowed = false;
3213 	NextEOXactTupleDescNum = 0;
3214 	EOXactTupleDescArrayLen = 0;
3215 }
3216 
3217 /*
3218  * AtEOXact_cleanup
3219  *
3220  *	Clean up a single rel at main-transaction commit or abort
3221  *
3222  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3223  * bother to prevent duplicate entries in eoxact_list[].
3224  */
3225 static void
AtEOXact_cleanup(Relation relation,bool isCommit)3226 AtEOXact_cleanup(Relation relation, bool isCommit)
3227 {
3228 	bool		clear_relcache = false;
3229 
3230 	/*
3231 	 * The relcache entry's ref count should be back to its normal
3232 	 * not-in-a-transaction state: 0 unless it's nailed in cache.
3233 	 *
3234 	 * In bootstrap mode, this is NOT true, so don't check it --- the
3235 	 * bootstrap code expects relations to stay open across start/commit
3236 	 * transaction calls.  (That seems bogus, but it's not worth fixing.)
3237 	 *
3238 	 * Note: ideally this check would be applied to every relcache entry, not
3239 	 * just those that have eoxact work to do.  But it's not worth forcing a
3240 	 * scan of the whole relcache just for this.  (Moreover, doing so would
3241 	 * mean that assert-enabled testing never tests the hash_search code path
3242 	 * above, which seems a bad idea.)
3243 	 */
3244 #ifdef USE_ASSERT_CHECKING
3245 	if (!IsBootstrapProcessingMode())
3246 	{
3247 		int			expected_refcnt;
3248 
3249 		expected_refcnt = relation->rd_isnailed ? 1 : 0;
3250 		Assert(relation->rd_refcnt == expected_refcnt);
3251 	}
3252 #endif
3253 
3254 	/*
3255 	 * Is the relation live after this transaction ends?
3256 	 *
3257 	 * During commit, clear the relcache entry if it is preserved after
3258 	 * relation drop, in order not to orphan the entry.  During rollback,
3259 	 * clear the relcache entry if the relation is created in the current
3260 	 * transaction since it isn't interesting any longer once we are out of
3261 	 * the transaction.
3262 	 */
3263 	clear_relcache =
3264 		(isCommit ?
3265 		 relation->rd_droppedSubid != InvalidSubTransactionId :
3266 		 relation->rd_createSubid != InvalidSubTransactionId);
3267 
3268 	/*
3269 	 * Since we are now out of the transaction, reset the subids to zero. That
3270 	 * also lets RelationClearRelation() drop the relcache entry.
3271 	 */
3272 	relation->rd_createSubid = InvalidSubTransactionId;
3273 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3274 	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3275 	relation->rd_droppedSubid = InvalidSubTransactionId;
3276 
3277 	if (clear_relcache)
3278 	{
3279 		if (RelationHasReferenceCountZero(relation))
3280 		{
3281 			RelationClearRelation(relation, false);
3282 			return;
3283 		}
3284 		else
3285 		{
3286 			/*
3287 			 * Hmm, somewhere there's a (leaked?) reference to the relation.
3288 			 * We daren't remove the entry for fear of dereferencing a
3289 			 * dangling pointer later.  Bleat, and mark it as not belonging to
3290 			 * the current transaction.  Hopefully it'll get cleaned up
3291 			 * eventually.  This must be just a WARNING to avoid
3292 			 * error-during-error-recovery loops.
3293 			 */
3294 			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3295 				 RelationGetRelationName(relation));
3296 		}
3297 	}
3298 }
3299 
3300 /*
3301  * AtEOSubXact_RelationCache
3302  *
3303  *	Clean up the relcache at sub-transaction commit or abort.
3304  *
3305  * Note: this must be called *before* processing invalidation messages.
3306  */
3307 void
AtEOSubXact_RelationCache(bool isCommit,SubTransactionId mySubid,SubTransactionId parentSubid)3308 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
3309 						  SubTransactionId parentSubid)
3310 {
3311 	HASH_SEQ_STATUS status;
3312 	RelIdCacheEnt *idhentry;
3313 	int			i;
3314 
3315 	/*
3316 	 * Forget in_progress_list.  This is relevant when we're aborting due to
3317 	 * an error during RelationBuildDesc().  We don't commit subtransactions
3318 	 * during RelationBuildDesc().
3319 	 */
3320 	Assert(in_progress_list_len == 0 || !isCommit);
3321 	in_progress_list_len = 0;
3322 
3323 	/*
3324 	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
3325 	 * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
3326 	 * logic as in AtEOXact_RelationCache.
3327 	 */
3328 	if (eoxact_list_overflowed)
3329 	{
3330 		hash_seq_init(&status, RelationIdCache);
3331 		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3332 		{
3333 			AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3334 								mySubid, parentSubid);
3335 		}
3336 	}
3337 	else
3338 	{
3339 		for (i = 0; i < eoxact_list_len; i++)
3340 		{
3341 			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3342 													 (void *) &eoxact_list[i],
3343 													 HASH_FIND,
3344 													 NULL);
3345 			if (idhentry != NULL)
3346 				AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3347 									mySubid, parentSubid);
3348 		}
3349 	}
3350 
3351 	/* Don't reset the list; we still need more cleanup later */
3352 }
3353 
3354 /*
3355  * AtEOSubXact_cleanup
3356  *
3357  *	Clean up a single rel at subtransaction commit or abort
3358  *
3359  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3360  * bother to prevent duplicate entries in eoxact_list[].
3361  */
3362 static void
AtEOSubXact_cleanup(Relation relation,bool isCommit,SubTransactionId mySubid,SubTransactionId parentSubid)3363 AtEOSubXact_cleanup(Relation relation, bool isCommit,
3364 					SubTransactionId mySubid, SubTransactionId parentSubid)
3365 {
3366 	/*
3367 	 * Is it a relation created in the current subtransaction?
3368 	 *
3369 	 * During subcommit, mark it as belonging to the parent, instead, as long
3370 	 * as it has not been dropped. Otherwise simply delete the relcache entry.
3371 	 * --- it isn't interesting any longer.
3372 	 */
3373 	if (relation->rd_createSubid == mySubid)
3374 	{
3375 		/*
3376 		 * Valid rd_droppedSubid means the corresponding relation is dropped
3377 		 * but the relcache entry is preserved for at-commit pending sync. We
3378 		 * need to drop it explicitly here not to make the entry orphan.
3379 		 */
3380 		Assert(relation->rd_droppedSubid == mySubid ||
3381 			   relation->rd_droppedSubid == InvalidSubTransactionId);
3382 		if (isCommit && relation->rd_droppedSubid == InvalidSubTransactionId)
3383 			relation->rd_createSubid = parentSubid;
3384 		else if (RelationHasReferenceCountZero(relation))
3385 		{
3386 			/* allow the entry to be removed */
3387 			relation->rd_createSubid = InvalidSubTransactionId;
3388 			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3389 			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3390 			relation->rd_droppedSubid = InvalidSubTransactionId;
3391 			RelationClearRelation(relation, false);
3392 			return;
3393 		}
3394 		else
3395 		{
3396 			/*
3397 			 * Hmm, somewhere there's a (leaked?) reference to the relation.
3398 			 * We daren't remove the entry for fear of dereferencing a
3399 			 * dangling pointer later.  Bleat, and transfer it to the parent
3400 			 * subtransaction so we can try again later.  This must be just a
3401 			 * WARNING to avoid error-during-error-recovery loops.
3402 			 */
3403 			relation->rd_createSubid = parentSubid;
3404 			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3405 				 RelationGetRelationName(relation));
3406 		}
3407 	}
3408 
3409 	/*
3410 	 * Likewise, update or drop any new-relfilenode-in-subtransaction record
3411 	 * or drop record.
3412 	 */
3413 	if (relation->rd_newRelfilenodeSubid == mySubid)
3414 	{
3415 		if (isCommit)
3416 			relation->rd_newRelfilenodeSubid = parentSubid;
3417 		else
3418 			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3419 	}
3420 
3421 	if (relation->rd_firstRelfilenodeSubid == mySubid)
3422 	{
3423 		if (isCommit)
3424 			relation->rd_firstRelfilenodeSubid = parentSubid;
3425 		else
3426 			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3427 	}
3428 
3429 	if (relation->rd_droppedSubid == mySubid)
3430 	{
3431 		if (isCommit)
3432 			relation->rd_droppedSubid = parentSubid;
3433 		else
3434 			relation->rd_droppedSubid = InvalidSubTransactionId;
3435 	}
3436 }
3437 
3438 
3439 /*
3440  *		RelationBuildLocalRelation
3441  *			Build a relcache entry for an about-to-be-created relation,
3442  *			and enter it into the relcache.
3443  */
3444 Relation
RelationBuildLocalRelation(const char * relname,Oid relnamespace,TupleDesc tupDesc,Oid relid,Oid accessmtd,Oid relfilenode,Oid reltablespace,bool shared_relation,bool mapped_relation,char relpersistence,char relkind)3445 RelationBuildLocalRelation(const char *relname,
3446 						   Oid relnamespace,
3447 						   TupleDesc tupDesc,
3448 						   Oid relid,
3449 						   Oid accessmtd,
3450 						   Oid relfilenode,
3451 						   Oid reltablespace,
3452 						   bool shared_relation,
3453 						   bool mapped_relation,
3454 						   char relpersistence,
3455 						   char relkind)
3456 {
3457 	Relation	rel;
3458 	MemoryContext oldcxt;
3459 	int			natts = tupDesc->natts;
3460 	int			i;
3461 	bool		has_not_null;
3462 	bool		nailit;
3463 
3464 	AssertArg(natts >= 0);
3465 
3466 	/*
3467 	 * check for creation of a rel that must be nailed in cache.
3468 	 *
3469 	 * XXX this list had better match the relations specially handled in
3470 	 * RelationCacheInitializePhase2/3.
3471 	 */
3472 	switch (relid)
3473 	{
3474 		case DatabaseRelationId:
3475 		case AuthIdRelationId:
3476 		case AuthMemRelationId:
3477 		case RelationRelationId:
3478 		case AttributeRelationId:
3479 		case ProcedureRelationId:
3480 		case TypeRelationId:
3481 			nailit = true;
3482 			break;
3483 		default:
3484 			nailit = false;
3485 			break;
3486 	}
3487 
3488 	/*
3489 	 * check that hardwired list of shared rels matches what's in the
3490 	 * bootstrap .bki file.  If you get a failure here during initdb, you
3491 	 * probably need to fix IsSharedRelation() to match whatever you've done
3492 	 * to the set of shared relations.
3493 	 */
3494 	if (shared_relation != IsSharedRelation(relid))
3495 		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
3496 			 relname, relid);
3497 
3498 	/* Shared relations had better be mapped, too */
3499 	Assert(mapped_relation || !shared_relation);
3500 
3501 	/*
3502 	 * switch to the cache context to create the relcache entry.
3503 	 */
3504 	if (!CacheMemoryContext)
3505 		CreateCacheMemoryContext();
3506 
3507 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3508 
3509 	/*
3510 	 * allocate a new relation descriptor and fill in basic state fields.
3511 	 */
3512 	rel = (Relation) palloc0(sizeof(RelationData));
3513 
3514 	/* make sure relation is marked as having no open file yet */
3515 	rel->rd_smgr = NULL;
3516 
3517 	/* mark it nailed if appropriate */
3518 	rel->rd_isnailed = nailit;
3519 
3520 	rel->rd_refcnt = nailit ? 1 : 0;
3521 
3522 	/* it's being created in this transaction */
3523 	rel->rd_createSubid = GetCurrentSubTransactionId();
3524 	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3525 	rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3526 	rel->rd_droppedSubid = InvalidSubTransactionId;
3527 
3528 	/*
3529 	 * create a new tuple descriptor from the one passed in.  We do this
3530 	 * partly to copy it into the cache context, and partly because the new
3531 	 * relation can't have any defaults or constraints yet; they have to be
3532 	 * added in later steps, because they require additions to multiple system
3533 	 * catalogs.  We can copy attnotnull constraints here, however.
3534 	 */
3535 	rel->rd_att = CreateTupleDescCopy(tupDesc);
3536 	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
3537 	has_not_null = false;
3538 	for (i = 0; i < natts; i++)
3539 	{
3540 		Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
3541 		Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);
3542 
3543 		datt->attidentity = satt->attidentity;
3544 		datt->attgenerated = satt->attgenerated;
3545 		datt->attnotnull = satt->attnotnull;
3546 		has_not_null |= satt->attnotnull;
3547 	}
3548 
3549 	if (has_not_null)
3550 	{
3551 		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3552 
3553 		constr->has_not_null = true;
3554 		rel->rd_att->constr = constr;
3555 	}
3556 
3557 	/*
3558 	 * initialize relation tuple form (caller may add/override data later)
3559 	 */
3560 	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3561 
3562 	namestrcpy(&rel->rd_rel->relname, relname);
3563 	rel->rd_rel->relnamespace = relnamespace;
3564 
3565 	rel->rd_rel->relkind = relkind;
3566 	rel->rd_rel->relnatts = natts;
3567 	rel->rd_rel->reltype = InvalidOid;
3568 	/* needed when bootstrapping: */
3569 	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3570 
3571 	/* set up persistence and relcache fields dependent on it */
3572 	rel->rd_rel->relpersistence = relpersistence;
3573 	switch (relpersistence)
3574 	{
3575 		case RELPERSISTENCE_UNLOGGED:
3576 		case RELPERSISTENCE_PERMANENT:
3577 			rel->rd_backend = InvalidBackendId;
3578 			rel->rd_islocaltemp = false;
3579 			break;
3580 		case RELPERSISTENCE_TEMP:
3581 			Assert(isTempOrTempToastNamespace(relnamespace));
3582 			rel->rd_backend = BackendIdForTempRelations();
3583 			rel->rd_islocaltemp = true;
3584 			break;
3585 		default:
3586 			elog(ERROR, "invalid relpersistence: %c", relpersistence);
3587 			break;
3588 	}
3589 
3590 	/* if it's a materialized view, it's not populated initially */
3591 	if (relkind == RELKIND_MATVIEW)
3592 		rel->rd_rel->relispopulated = false;
3593 	else
3594 		rel->rd_rel->relispopulated = true;
3595 
3596 	/* set replica identity -- system catalogs and non-tables don't have one */
3597 	if (!IsCatalogNamespace(relnamespace) &&
3598 		(relkind == RELKIND_RELATION ||
3599 		 relkind == RELKIND_MATVIEW ||
3600 		 relkind == RELKIND_PARTITIONED_TABLE))
3601 		rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
3602 	else
3603 		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
3604 
3605 	/*
3606 	 * Insert relation physical and logical identifiers (OIDs) into the right
3607 	 * places.  For a mapped relation, we set relfilenode to zero and rely on
3608 	 * RelationInitPhysicalAddr to consult the map.
3609 	 */
3610 	rel->rd_rel->relisshared = shared_relation;
3611 
3612 	RelationGetRelid(rel) = relid;
3613 
3614 	for (i = 0; i < natts; i++)
3615 		TupleDescAttr(rel->rd_att, i)->attrelid = relid;
3616 
3617 	rel->rd_rel->reltablespace = reltablespace;
3618 
3619 	if (mapped_relation)
3620 	{
3621 		rel->rd_rel->relfilenode = InvalidOid;
3622 		/* Add it to the active mapping information */
3623 		RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
3624 	}
3625 	else
3626 		rel->rd_rel->relfilenode = relfilenode;
3627 
3628 	RelationInitLockInfo(rel);	/* see lmgr.c */
3629 
3630 	RelationInitPhysicalAddr(rel);
3631 
3632 	rel->rd_rel->relam = accessmtd;
3633 
3634 	/*
3635 	 * RelationInitTableAccessMethod will do syscache lookups, so we mustn't
3636 	 * run it in CacheMemoryContext.  Fortunately, the remaining steps don't
3637 	 * require a long-lived current context.
3638 	 */
3639 	MemoryContextSwitchTo(oldcxt);
3640 
3641 	if (relkind == RELKIND_RELATION ||
3642 		relkind == RELKIND_SEQUENCE ||
3643 		relkind == RELKIND_TOASTVALUE ||
3644 		relkind == RELKIND_MATVIEW)
3645 		RelationInitTableAccessMethod(rel);
3646 
3647 	/*
3648 	 * Okay to insert into the relcache hash table.
3649 	 *
3650 	 * Ordinarily, there should certainly not be an existing hash entry for
3651 	 * the same OID; but during bootstrap, when we create a "real" relcache
3652 	 * entry for one of the bootstrap relations, we'll be overwriting the
3653 	 * phony one created with formrdesc.  So allow that to happen for nailed
3654 	 * rels.
3655 	 */
3656 	RelationCacheInsert(rel, nailit);
3657 
3658 	/*
3659 	 * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
3660 	 * can't do this before storing relid in it.
3661 	 */
3662 	EOXactListAdd(rel);
3663 
3664 	/* It's fully valid */
3665 	rel->rd_isvalid = true;
3666 
3667 	/*
3668 	 * Caller expects us to pin the returned entry.
3669 	 */
3670 	RelationIncrementReferenceCount(rel);
3671 
3672 	return rel;
3673 }
3674 
3675 
3676 /*
3677  * RelationSetNewRelfilenode
3678  *
3679  * Assign a new relfilenode (physical file name), and possibly a new
3680  * persistence setting, to the relation.
3681  *
3682  * This allows a full rewrite of the relation to be done with transactional
3683  * safety (since the filenode assignment can be rolled back).  Note however
3684  * that there is no simple way to access the relation's old data for the
3685  * remainder of the current transaction.  This limits the usefulness to cases
3686  * such as TRUNCATE or rebuilding an index from scratch.
3687  *
3688  * Caller must already hold exclusive lock on the relation.
3689  */
3690 void
RelationSetNewRelfilenode(Relation relation,char persistence)3691 RelationSetNewRelfilenode(Relation relation, char persistence)
3692 {
3693 	Oid			newrelfilenode;
3694 	Relation	pg_class;
3695 	HeapTuple	tuple;
3696 	Form_pg_class classform;
3697 	MultiXactId minmulti = InvalidMultiXactId;
3698 	TransactionId freezeXid = InvalidTransactionId;
3699 	RelFileNode newrnode;
3700 
3701 	/* Allocate a new relfilenode */
3702 	newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
3703 									   persistence);
3704 
3705 	/*
3706 	 * Get a writable copy of the pg_class tuple for the given relation.
3707 	 */
3708 	pg_class = table_open(RelationRelationId, RowExclusiveLock);
3709 
3710 	tuple = SearchSysCacheCopy1(RELOID,
3711 								ObjectIdGetDatum(RelationGetRelid(relation)));
3712 	if (!HeapTupleIsValid(tuple))
3713 		elog(ERROR, "could not find tuple for relation %u",
3714 			 RelationGetRelid(relation));
3715 	classform = (Form_pg_class) GETSTRUCT(tuple);
3716 
3717 	/*
3718 	 * Schedule unlinking of the old storage at transaction commit.
3719 	 */
3720 	RelationDropStorage(relation);
3721 
3722 	/*
3723 	 * Create storage for the main fork of the new relfilenode.  If it's a
3724 	 * table-like object, call into the table AM to do so, which'll also
3725 	 * create the table's init fork if needed.
3726 	 *
3727 	 * NOTE: If relevant for the AM, any conflict in relfilenode value will be
3728 	 * caught here, if GetNewRelFileNode messes up for any reason.
3729 	 */
3730 	newrnode = relation->rd_node;
3731 	newrnode.relNode = newrelfilenode;
3732 
3733 	switch (relation->rd_rel->relkind)
3734 	{
3735 		case RELKIND_INDEX:
3736 		case RELKIND_SEQUENCE:
3737 			{
3738 				/* handle these directly, at least for now */
3739 				SMgrRelation srel;
3740 
3741 				srel = RelationCreateStorage(newrnode, persistence);
3742 				smgrclose(srel);
3743 			}
3744 			break;
3745 
3746 		case RELKIND_RELATION:
3747 		case RELKIND_TOASTVALUE:
3748 		case RELKIND_MATVIEW:
3749 			table_relation_set_new_filenode(relation, &newrnode,
3750 											persistence,
3751 											&freezeXid, &minmulti);
3752 			break;
3753 
3754 		default:
3755 			/* we shouldn't be called for anything else */
3756 			elog(ERROR, "relation \"%s\" does not have storage",
3757 				 RelationGetRelationName(relation));
3758 			break;
3759 	}
3760 
3761 	/*
3762 	 * If we're dealing with a mapped index, pg_class.relfilenode doesn't
3763 	 * change; instead we have to send the update to the relation mapper.
3764 	 *
3765 	 * For mapped indexes, we don't actually change the pg_class entry at all;
3766 	 * this is essential when reindexing pg_class itself.  That leaves us with
3767 	 * possibly-inaccurate values of relpages etc, but those will be fixed up
3768 	 * later.
3769 	 */
3770 	if (RelationIsMapped(relation))
3771 	{
3772 		/* This case is only supported for indexes */
3773 		Assert(relation->rd_rel->relkind == RELKIND_INDEX);
3774 
3775 		/* Since we're not updating pg_class, these had better not change */
3776 		Assert(classform->relfrozenxid == freezeXid);
3777 		Assert(classform->relminmxid == minmulti);
3778 		Assert(classform->relpersistence == persistence);
3779 
3780 		/*
3781 		 * In some code paths it's possible that the tuple update we'd
3782 		 * otherwise do here is the only thing that would assign an XID for
3783 		 * the current transaction.  However, we must have an XID to delete
3784 		 * files, so make sure one is assigned.
3785 		 */
3786 		(void) GetCurrentTransactionId();
3787 
3788 		/* Do the deed */
3789 		RelationMapUpdateMap(RelationGetRelid(relation),
3790 							 newrelfilenode,
3791 							 relation->rd_rel->relisshared,
3792 							 false);
3793 
3794 		/* Since we're not updating pg_class, must trigger inval manually */
3795 		CacheInvalidateRelcache(relation);
3796 	}
3797 	else
3798 	{
3799 		/* Normal case, update the pg_class entry */
3800 		classform->relfilenode = newrelfilenode;
3801 
3802 		/* relpages etc. never change for sequences */
3803 		if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
3804 		{
3805 			classform->relpages = 0;	/* it's empty until further notice */
3806 			classform->reltuples = -1;
3807 			classform->relallvisible = 0;
3808 		}
3809 		classform->relfrozenxid = freezeXid;
3810 		classform->relminmxid = minmulti;
3811 		classform->relpersistence = persistence;
3812 
3813 		CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
3814 	}
3815 
3816 	heap_freetuple(tuple);
3817 
3818 	table_close(pg_class, RowExclusiveLock);
3819 
3820 	/*
3821 	 * Make the pg_class row change or relation map change visible.  This will
3822 	 * cause the relcache entry to get updated, too.
3823 	 */
3824 	CommandCounterIncrement();
3825 
3826 	RelationAssumeNewRelfilenode(relation);
3827 }
3828 
3829 /*
3830  * RelationAssumeNewRelfilenode
3831  *
3832  * Code that modifies pg_class.reltablespace or pg_class.relfilenode must call
3833  * this.  The call shall precede any code that might insert WAL records whose
3834  * replay would modify bytes in the new RelFileNode, and the call shall follow
3835  * any WAL modifying bytes in the prior RelFileNode.  See struct RelationData.
3836  * Ideally, call this as near as possible to the CommandCounterIncrement()
3837  * that makes the pg_class change visible (before it or after it); that
3838  * minimizes the chance of future development adding a forbidden WAL insertion
3839  * between RelationAssumeNewRelfilenode() and CommandCounterIncrement().
3840  */
3841 void
RelationAssumeNewRelfilenode(Relation relation)3842 RelationAssumeNewRelfilenode(Relation relation)
3843 {
3844 	relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
3845 	if (relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
3846 		relation->rd_firstRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
3847 
3848 	/* Flag relation as needing eoxact cleanup (to clear these fields) */
3849 	EOXactListAdd(relation);
3850 }
3851 
3852 
3853 /*
3854  *		RelationCacheInitialize
3855  *
3856  *		This initializes the relation descriptor cache.  At the time
3857  *		that this is invoked, we can't do database access yet (mainly
3858  *		because the transaction subsystem is not up); all we are doing
3859  *		is making an empty cache hashtable.  This must be done before
3860  *		starting the initialization transaction, because otherwise
3861  *		AtEOXact_RelationCache would crash if that transaction aborts
3862  *		before we can get the relcache set up.
3863  */
3864 
3865 #define INITRELCACHESIZE		400
3866 
3867 void
RelationCacheInitialize(void)3868 RelationCacheInitialize(void)
3869 {
3870 	HASHCTL		ctl;
3871 	int			allocsize;
3872 
3873 	/*
3874 	 * make sure cache memory context exists
3875 	 */
3876 	if (!CacheMemoryContext)
3877 		CreateCacheMemoryContext();
3878 
3879 	/*
3880 	 * create hashtable that indexes the relcache
3881 	 */
3882 	ctl.keysize = sizeof(Oid);
3883 	ctl.entrysize = sizeof(RelIdCacheEnt);
3884 	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
3885 								  &ctl, HASH_ELEM | HASH_BLOBS);
3886 
3887 	/*
3888 	 * reserve enough in_progress_list slots for many cases
3889 	 */
3890 	allocsize = 4;
3891 	in_progress_list =
3892 		MemoryContextAlloc(CacheMemoryContext,
3893 						   allocsize * sizeof(*in_progress_list));
3894 	in_progress_list_maxlen = allocsize;
3895 
3896 	/*
3897 	 * relation mapper needs to be initialized too
3898 	 */
3899 	RelationMapInitialize();
3900 }
3901 
3902 /*
3903  *		RelationCacheInitializePhase2
3904  *
3905  *		This is called to prepare for access to shared catalogs during startup.
3906  *		We must at least set up nailed reldescs for pg_database, pg_authid,
3907  *		pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
3908  *		for their indexes, too.  We attempt to load this information from the
3909  *		shared relcache init file.  If that's missing or broken, just make
3910  *		phony entries for the catalogs themselves.
3911  *		RelationCacheInitializePhase3 will clean up as needed.
3912  */
3913 void
RelationCacheInitializePhase2(void)3914 RelationCacheInitializePhase2(void)
3915 {
3916 	MemoryContext oldcxt;
3917 
3918 	/*
3919 	 * relation mapper needs initialized too
3920 	 */
3921 	RelationMapInitializePhase2();
3922 
3923 	/*
3924 	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
3925 	 * nothing.
3926 	 */
3927 	if (IsBootstrapProcessingMode())
3928 		return;
3929 
3930 	/*
3931 	 * switch to cache memory context
3932 	 */
3933 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3934 
3935 	/*
3936 	 * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
3937 	 * the cache with pre-made descriptors for the critical shared catalogs.
3938 	 */
3939 	if (!load_relcache_init_file(true))
3940 	{
3941 		formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
3942 				  Natts_pg_database, Desc_pg_database);
3943 		formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
3944 				  Natts_pg_authid, Desc_pg_authid);
3945 		formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
3946 				  Natts_pg_auth_members, Desc_pg_auth_members);
3947 		formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
3948 				  Natts_pg_shseclabel, Desc_pg_shseclabel);
3949 		formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
3950 				  Natts_pg_subscription, Desc_pg_subscription);
3951 
3952 #define NUM_CRITICAL_SHARED_RELS	5	/* fix if you change list above */
3953 	}
3954 
3955 	MemoryContextSwitchTo(oldcxt);
3956 }
3957 
3958 /*
3959  *		RelationCacheInitializePhase3
3960  *
3961  *		This is called as soon as the catcache and transaction system
3962  *		are functional and we have determined MyDatabaseId.  At this point
3963  *		we can actually read data from the database's system catalogs.
3964  *		We first try to read pre-computed relcache entries from the local
3965  *		relcache init file.  If that's missing or broken, make phony entries
3966  *		for the minimum set of nailed-in-cache relations.  Then (unless
3967  *		bootstrapping) make sure we have entries for the critical system
3968  *		indexes.  Once we've done all this, we have enough infrastructure to
3969  *		open any system catalog or use any catcache.  The last step is to
3970  *		rewrite the cache files if needed.
3971  */
3972 void
RelationCacheInitializePhase3(void)3973 RelationCacheInitializePhase3(void)
3974 {
3975 	HASH_SEQ_STATUS status;
3976 	RelIdCacheEnt *idhentry;
3977 	MemoryContext oldcxt;
3978 	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3979 
3980 	/*
3981 	 * relation mapper needs initialized too
3982 	 */
3983 	RelationMapInitializePhase3();
3984 
3985 	/*
3986 	 * switch to cache memory context
3987 	 */
3988 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3989 
3990 	/*
3991 	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
3992 	 * the cache with pre-made descriptors for the critical "nailed-in" system
3993 	 * catalogs.
3994 	 */
3995 	if (IsBootstrapProcessingMode() ||
3996 		!load_relcache_init_file(false))
3997 	{
3998 		needNewCacheFile = true;
3999 
4000 		formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
4001 				  Natts_pg_class, Desc_pg_class);
4002 		formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
4003 				  Natts_pg_attribute, Desc_pg_attribute);
4004 		formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
4005 				  Natts_pg_proc, Desc_pg_proc);
4006 		formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
4007 				  Natts_pg_type, Desc_pg_type);
4008 
4009 #define NUM_CRITICAL_LOCAL_RELS 4	/* fix if you change list above */
4010 	}
4011 
4012 	MemoryContextSwitchTo(oldcxt);
4013 
4014 	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
4015 	if (IsBootstrapProcessingMode())
4016 		return;
4017 
4018 	/*
4019 	 * If we didn't get the critical system indexes loaded into relcache, do
4020 	 * so now.  These are critical because the catcache and/or opclass cache
4021 	 * depend on them for fetches done during relcache load.  Thus, we have an
4022 	 * infinite-recursion problem.  We can break the recursion by doing
4023 	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
4024 	 * performance, we only want to do that until we have the critical indexes
4025 	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
4026 	 * decide whether to do heapscan or indexscan at the key spots, and we set
4027 	 * it true after we've loaded the critical indexes.
4028 	 *
4029 	 * The critical indexes are marked as "nailed in cache", partly to make it
4030 	 * easy for load_relcache_init_file to count them, but mainly because we
4031 	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
4032 	 * true.  (NOTE: perhaps it would be possible to reload them by
4033 	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
4034 	 * though, we just nail 'em in.)
4035 	 *
4036 	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
4037 	 * in the same way as the others, because the critical catalogs don't
4038 	 * (currently) have any rules or triggers, and so these indexes can be
4039 	 * rebuilt without inducing recursion.  However they are used during
4040 	 * relcache load when a rel does have rules or triggers, so we choose to
4041 	 * nail them for performance reasons.
4042 	 */
4043 	if (!criticalRelcachesBuilt)
4044 	{
4045 		load_critical_index(ClassOidIndexId,
4046 							RelationRelationId);
4047 		load_critical_index(AttributeRelidNumIndexId,
4048 							AttributeRelationId);
4049 		load_critical_index(IndexRelidIndexId,
4050 							IndexRelationId);
4051 		load_critical_index(OpclassOidIndexId,
4052 							OperatorClassRelationId);
4053 		load_critical_index(AccessMethodProcedureIndexId,
4054 							AccessMethodProcedureRelationId);
4055 		load_critical_index(RewriteRelRulenameIndexId,
4056 							RewriteRelationId);
4057 		load_critical_index(TriggerRelidNameIndexId,
4058 							TriggerRelationId);
4059 
4060 #define NUM_CRITICAL_LOCAL_INDEXES	7	/* fix if you change list above */
4061 
4062 		criticalRelcachesBuilt = true;
4063 	}
4064 
4065 	/*
4066 	 * Process critical shared indexes too.
4067 	 *
4068 	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
4069 	 * initial lookup of MyDatabaseId, without which we'll never find any
4070 	 * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
4071 	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
4072 	 * need to nail up some indexes on pg_authid and pg_auth_members for use
4073 	 * during client authentication.  SharedSecLabelObjectIndexId isn't
4074 	 * critical for the core system, but authentication hooks might be
4075 	 * interested in it.
4076 	 */
4077 	if (!criticalSharedRelcachesBuilt)
4078 	{
4079 		load_critical_index(DatabaseNameIndexId,
4080 							DatabaseRelationId);
4081 		load_critical_index(DatabaseOidIndexId,
4082 							DatabaseRelationId);
4083 		load_critical_index(AuthIdRolnameIndexId,
4084 							AuthIdRelationId);
4085 		load_critical_index(AuthIdOidIndexId,
4086 							AuthIdRelationId);
4087 		load_critical_index(AuthMemMemRoleIndexId,
4088 							AuthMemRelationId);
4089 		load_critical_index(SharedSecLabelObjectIndexId,
4090 							SharedSecLabelRelationId);
4091 
4092 #define NUM_CRITICAL_SHARED_INDEXES 6	/* fix if you change list above */
4093 
4094 		criticalSharedRelcachesBuilt = true;
4095 	}
4096 
4097 	/*
4098 	 * Now, scan all the relcache entries and update anything that might be
4099 	 * wrong in the results from formrdesc or the relcache cache file. If we
4100 	 * faked up relcache entries using formrdesc, then read the real pg_class
4101 	 * rows and replace the fake entries with them. Also, if any of the
4102 	 * relcache entries have rules, triggers, or security policies, load that
4103 	 * info the hard way since it isn't recorded in the cache file.
4104 	 *
4105 	 * Whenever we access the catalogs to read data, there is a possibility of
4106 	 * a shared-inval cache flush causing relcache entries to be removed.
4107 	 * Since hash_seq_search only guarantees to still work after the *current*
4108 	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
4109 	 * We handle this by restarting the scan from scratch after each access.
4110 	 * This is theoretically O(N^2), but the number of entries that actually
4111 	 * need to be fixed is small enough that it doesn't matter.
4112 	 */
4113 	hash_seq_init(&status, RelationIdCache);
4114 
4115 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4116 	{
4117 		Relation	relation = idhentry->reldesc;
4118 		bool		restart = false;
4119 
4120 		/*
4121 		 * Make sure *this* entry doesn't get flushed while we work with it.
4122 		 */
4123 		RelationIncrementReferenceCount(relation);
4124 
4125 		/*
4126 		 * If it's a faked-up entry, read the real pg_class tuple.
4127 		 */
4128 		if (relation->rd_rel->relowner == InvalidOid)
4129 		{
4130 			HeapTuple	htup;
4131 			Form_pg_class relp;
4132 
4133 			htup = SearchSysCache1(RELOID,
4134 								   ObjectIdGetDatum(RelationGetRelid(relation)));
4135 			if (!HeapTupleIsValid(htup))
4136 				elog(FATAL, "cache lookup failed for relation %u",
4137 					 RelationGetRelid(relation));
4138 			relp = (Form_pg_class) GETSTRUCT(htup);
4139 
4140 			/*
4141 			 * Copy tuple to relation->rd_rel. (See notes in
4142 			 * AllocateRelationDesc())
4143 			 */
4144 			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
4145 
4146 			/* Update rd_options while we have the tuple */
4147 			if (relation->rd_options)
4148 				pfree(relation->rd_options);
4149 			RelationParseRelOptions(relation, htup);
4150 
4151 			/*
4152 			 * Check the values in rd_att were set up correctly.  (We cannot
4153 			 * just copy them over now: formrdesc must have set up the rd_att
4154 			 * data correctly to start with, because it may already have been
4155 			 * copied into one or more catcache entries.)
4156 			 */
4157 			Assert(relation->rd_att->tdtypeid == relp->reltype);
4158 			Assert(relation->rd_att->tdtypmod == -1);
4159 
4160 			ReleaseSysCache(htup);
4161 
4162 			/* relowner had better be OK now, else we'll loop forever */
4163 			if (relation->rd_rel->relowner == InvalidOid)
4164 				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
4165 					 RelationGetRelationName(relation));
4166 
4167 			restart = true;
4168 		}
4169 
4170 		/*
4171 		 * Fix data that isn't saved in relcache cache file.
4172 		 *
4173 		 * relhasrules or relhastriggers could possibly be wrong or out of
4174 		 * date.  If we don't actually find any rules or triggers, clear the
4175 		 * local copy of the flag so that we don't get into an infinite loop
4176 		 * here.  We don't make any attempt to fix the pg_class entry, though.
4177 		 */
4178 		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
4179 		{
4180 			RelationBuildRuleLock(relation);
4181 			if (relation->rd_rules == NULL)
4182 				relation->rd_rel->relhasrules = false;
4183 			restart = true;
4184 		}
4185 		if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
4186 		{
4187 			RelationBuildTriggers(relation);
4188 			if (relation->trigdesc == NULL)
4189 				relation->rd_rel->relhastriggers = false;
4190 			restart = true;
4191 		}
4192 
4193 		/*
4194 		 * Re-load the row security policies if the relation has them, since
4195 		 * they are not preserved in the cache.  Note that we can never NOT
4196 		 * have a policy while relrowsecurity is true,
4197 		 * RelationBuildRowSecurity will create a single default-deny policy
4198 		 * if there is no policy defined in pg_policy.
4199 		 */
4200 		if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
4201 		{
4202 			RelationBuildRowSecurity(relation);
4203 
4204 			Assert(relation->rd_rsdesc != NULL);
4205 			restart = true;
4206 		}
4207 
4208 		/* Reload tableam data if needed */
4209 		if (relation->rd_tableam == NULL &&
4210 			(relation->rd_rel->relkind == RELKIND_RELATION ||
4211 			 relation->rd_rel->relkind == RELKIND_SEQUENCE ||
4212 			 relation->rd_rel->relkind == RELKIND_TOASTVALUE ||
4213 			 relation->rd_rel->relkind == RELKIND_MATVIEW))
4214 		{
4215 			RelationInitTableAccessMethod(relation);
4216 			Assert(relation->rd_tableam != NULL);
4217 
4218 			restart = true;
4219 		}
4220 
4221 		/* Release hold on the relation */
4222 		RelationDecrementReferenceCount(relation);
4223 
4224 		/* Now, restart the hashtable scan if needed */
4225 		if (restart)
4226 		{
4227 			hash_seq_term(&status);
4228 			hash_seq_init(&status, RelationIdCache);
4229 		}
4230 	}
4231 
4232 	/*
4233 	 * Lastly, write out new relcache cache files if needed.  We don't bother
4234 	 * to distinguish cases where only one of the two needs an update.
4235 	 */
4236 	if (needNewCacheFile)
4237 	{
4238 		/*
4239 		 * Force all the catcaches to finish initializing and thereby open the
4240 		 * catalogs and indexes they use.  This will preload the relcache with
4241 		 * entries for all the most important system catalogs and indexes, so
4242 		 * that the init files will be most useful for future backends.
4243 		 */
4244 		InitCatalogCachePhase2();
4245 
4246 		/* now write the files */
4247 		write_relcache_init_file(true);
4248 		write_relcache_init_file(false);
4249 	}
4250 }
4251 
4252 /*
4253  * Load one critical system index into the relcache
4254  *
4255  * indexoid is the OID of the target index, heapoid is the OID of the catalog
4256  * it belongs to.
4257  */
4258 static void
load_critical_index(Oid indexoid,Oid heapoid)4259 load_critical_index(Oid indexoid, Oid heapoid)
4260 {
4261 	Relation	ird;
4262 
4263 	/*
4264 	 * We must lock the underlying catalog before locking the index to avoid
4265 	 * deadlock, since RelationBuildDesc might well need to read the catalog,
4266 	 * and if anyone else is exclusive-locking this catalog and index they'll
4267 	 * be doing it in that order.
4268 	 */
4269 	LockRelationOid(heapoid, AccessShareLock);
4270 	LockRelationOid(indexoid, AccessShareLock);
4271 	ird = RelationBuildDesc(indexoid, true);
4272 	if (ird == NULL)
4273 		elog(PANIC, "could not open critical system index %u", indexoid);
4274 	ird->rd_isnailed = true;
4275 	ird->rd_refcnt = 1;
4276 	UnlockRelationOid(indexoid, AccessShareLock);
4277 	UnlockRelationOid(heapoid, AccessShareLock);
4278 
4279 	(void) RelationGetIndexAttOptions(ird, false);
4280 }
4281 
4282 /*
4283  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
4284  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
4285  *
4286  * We need this kluge because we have to be able to access non-fixed-width
4287  * fields of pg_class and pg_index before we have the standard catalog caches
4288  * available.  We use predefined data that's set up in just the same way as
4289  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
4290  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
4291  * does it have a TupleConstr field.  But it's good enough for the purpose of
4292  * extracting fields.
4293  */
4294 static TupleDesc
BuildHardcodedDescriptor(int natts,const FormData_pg_attribute * attrs)4295 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
4296 {
4297 	TupleDesc	result;
4298 	MemoryContext oldcxt;
4299 	int			i;
4300 
4301 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4302 
4303 	result = CreateTemplateTupleDesc(natts);
4304 	result->tdtypeid = RECORDOID;	/* not right, but we don't care */
4305 	result->tdtypmod = -1;
4306 
4307 	for (i = 0; i < natts; i++)
4308 	{
4309 		memcpy(TupleDescAttr(result, i), &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
4310 		/* make sure attcacheoff is valid */
4311 		TupleDescAttr(result, i)->attcacheoff = -1;
4312 	}
4313 
4314 	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
4315 	TupleDescAttr(result, 0)->attcacheoff = 0;
4316 
4317 	/* Note: we don't bother to set up a TupleConstr entry */
4318 
4319 	MemoryContextSwitchTo(oldcxt);
4320 
4321 	return result;
4322 }
4323 
4324 static TupleDesc
GetPgClassDescriptor(void)4325 GetPgClassDescriptor(void)
4326 {
4327 	static TupleDesc pgclassdesc = NULL;
4328 
4329 	/* Already done? */
4330 	if (pgclassdesc == NULL)
4331 		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
4332 											   Desc_pg_class);
4333 
4334 	return pgclassdesc;
4335 }
4336 
4337 static TupleDesc
GetPgIndexDescriptor(void)4338 GetPgIndexDescriptor(void)
4339 {
4340 	static TupleDesc pgindexdesc = NULL;
4341 
4342 	/* Already done? */
4343 	if (pgindexdesc == NULL)
4344 		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
4345 											   Desc_pg_index);
4346 
4347 	return pgindexdesc;
4348 }
4349 
4350 /*
4351  * Load any default attribute value definitions for the relation.
4352  *
4353  * ndef is the number of attributes that were marked atthasdef.
4354  *
4355  * Note: we don't make it a hard error to be missing some pg_attrdef records.
4356  * We can limp along as long as nothing needs to use the default value.  Code
4357  * that fails to find an expected AttrDefault record should throw an error.
4358  */
4359 static void
AttrDefaultFetch(Relation relation,int ndef)4360 AttrDefaultFetch(Relation relation, int ndef)
4361 {
4362 	AttrDefault *attrdef;
4363 	Relation	adrel;
4364 	SysScanDesc adscan;
4365 	ScanKeyData skey;
4366 	HeapTuple	htup;
4367 	int			found = 0;
4368 
4369 	/* Allocate array with room for as many entries as expected */
4370 	attrdef = (AttrDefault *)
4371 		MemoryContextAllocZero(CacheMemoryContext,
4372 							   ndef * sizeof(AttrDefault));
4373 
4374 	/* Search pg_attrdef for relevant entries */
4375 	ScanKeyInit(&skey,
4376 				Anum_pg_attrdef_adrelid,
4377 				BTEqualStrategyNumber, F_OIDEQ,
4378 				ObjectIdGetDatum(RelationGetRelid(relation)));
4379 
4380 	adrel = table_open(AttrDefaultRelationId, AccessShareLock);
4381 	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
4382 								NULL, 1, &skey);
4383 
4384 	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
4385 	{
4386 		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
4387 		Datum		val;
4388 		bool		isnull;
4389 
4390 		/* protect limited size of array */
4391 		if (found >= ndef)
4392 		{
4393 			elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
4394 				 adform->adnum, RelationGetRelationName(relation));
4395 			break;
4396 		}
4397 
4398 		val = fastgetattr(htup,
4399 						  Anum_pg_attrdef_adbin,
4400 						  adrel->rd_att, &isnull);
4401 		if (isnull)
4402 			elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
4403 				 adform->adnum, RelationGetRelationName(relation));
4404 		else
4405 		{
4406 			/* detoast and convert to cstring in caller's context */
4407 			char	   *s = TextDatumGetCString(val);
4408 
4409 			attrdef[found].adnum = adform->adnum;
4410 			attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s);
4411 			pfree(s);
4412 			found++;
4413 		}
4414 	}
4415 
4416 	systable_endscan(adscan);
4417 	table_close(adrel, AccessShareLock);
4418 
4419 	if (found != ndef)
4420 		elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
4421 			 ndef - found, RelationGetRelationName(relation));
4422 
4423 	/*
4424 	 * Sort the AttrDefault entries by adnum, for the convenience of
4425 	 * equalTupleDescs().  (Usually, they already will be in order, but this
4426 	 * might not be so if systable_getnext isn't using an index.)
4427 	 */
4428 	if (found > 1)
4429 		qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp);
4430 
4431 	/* Install array only after it's fully valid */
4432 	relation->rd_att->constr->defval = attrdef;
4433 	relation->rd_att->constr->num_defval = found;
4434 }
4435 
4436 /*
4437  * qsort comparator to sort AttrDefault entries by adnum
4438  */
4439 static int
AttrDefaultCmp(const void * a,const void * b)4440 AttrDefaultCmp(const void *a, const void *b)
4441 {
4442 	const AttrDefault *ada = (const AttrDefault *) a;
4443 	const AttrDefault *adb = (const AttrDefault *) b;
4444 
4445 	return ada->adnum - adb->adnum;
4446 }
4447 
4448 /*
4449  * Load any check constraints for the relation.
4450  *
4451  * As with defaults, if we don't find the expected number of them, just warn
4452  * here.  The executor should throw an error if an INSERT/UPDATE is attempted.
4453  */
4454 static void
CheckConstraintFetch(Relation relation)4455 CheckConstraintFetch(Relation relation)
4456 {
4457 	ConstrCheck *check;
4458 	int			ncheck = relation->rd_rel->relchecks;
4459 	Relation	conrel;
4460 	SysScanDesc conscan;
4461 	ScanKeyData skey[1];
4462 	HeapTuple	htup;
4463 	int			found = 0;
4464 
4465 	/* Allocate array with room for as many entries as expected */
4466 	check = (ConstrCheck *)
4467 		MemoryContextAllocZero(CacheMemoryContext,
4468 							   ncheck * sizeof(ConstrCheck));
4469 
4470 	/* Search pg_constraint for relevant entries */
4471 	ScanKeyInit(&skey[0],
4472 				Anum_pg_constraint_conrelid,
4473 				BTEqualStrategyNumber, F_OIDEQ,
4474 				ObjectIdGetDatum(RelationGetRelid(relation)));
4475 
4476 	conrel = table_open(ConstraintRelationId, AccessShareLock);
4477 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4478 								 NULL, 1, skey);
4479 
4480 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4481 	{
4482 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4483 		Datum		val;
4484 		bool		isnull;
4485 
4486 		/* We want check constraints only */
4487 		if (conform->contype != CONSTRAINT_CHECK)
4488 			continue;
4489 
4490 		/* protect limited size of array */
4491 		if (found >= ncheck)
4492 		{
4493 			elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
4494 				 RelationGetRelationName(relation));
4495 			break;
4496 		}
4497 
4498 		check[found].ccvalid = conform->convalidated;
4499 		check[found].ccnoinherit = conform->connoinherit;
4500 		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
4501 												  NameStr(conform->conname));
4502 
4503 		/* Grab and test conbin is actually set */
4504 		val = fastgetattr(htup,
4505 						  Anum_pg_constraint_conbin,
4506 						  conrel->rd_att, &isnull);
4507 		if (isnull)
4508 			elog(WARNING, "null conbin for relation \"%s\"",
4509 				 RelationGetRelationName(relation));
4510 		else
4511 		{
4512 			/* detoast and convert to cstring in caller's context */
4513 			char	   *s = TextDatumGetCString(val);
4514 
4515 			check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
4516 			pfree(s);
4517 			found++;
4518 		}
4519 	}
4520 
4521 	systable_endscan(conscan);
4522 	table_close(conrel, AccessShareLock);
4523 
4524 	if (found != ncheck)
4525 		elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
4526 			 ncheck - found, RelationGetRelationName(relation));
4527 
4528 	/*
4529 	 * Sort the records by name.  This ensures that CHECKs are applied in a
4530 	 * deterministic order, and it also makes equalTupleDescs() faster.
4531 	 */
4532 	if (found > 1)
4533 		qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);
4534 
4535 	/* Install array only after it's fully valid */
4536 	relation->rd_att->constr->check = check;
4537 	relation->rd_att->constr->num_check = found;
4538 }
4539 
4540 /*
4541  * qsort comparator to sort ConstrCheck entries by name
4542  */
4543 static int
CheckConstraintCmp(const void * a,const void * b)4544 CheckConstraintCmp(const void *a, const void *b)
4545 {
4546 	const ConstrCheck *ca = (const ConstrCheck *) a;
4547 	const ConstrCheck *cb = (const ConstrCheck *) b;
4548 
4549 	return strcmp(ca->ccname, cb->ccname);
4550 }
4551 
4552 /*
4553  * RelationGetFKeyList -- get a list of foreign key info for the relation
4554  *
4555  * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
4556  * the given relation.  This data is a direct copy of relevant fields from
4557  * pg_constraint.  The list items are in no particular order.
4558  *
4559  * CAUTION: the returned list is part of the relcache's data, and could
4560  * vanish in a relcache entry reset.  Callers must inspect or copy it
4561  * before doing anything that might trigger a cache flush, such as
4562  * system catalog accesses.  copyObject() can be used if desired.
4563  * (We define it this way because current callers want to filter and
4564  * modify the list entries anyway, so copying would be a waste of time.)
4565  */
4566 List *
RelationGetFKeyList(Relation relation)4567 RelationGetFKeyList(Relation relation)
4568 {
4569 	List	   *result;
4570 	Relation	conrel;
4571 	SysScanDesc conscan;
4572 	ScanKeyData skey;
4573 	HeapTuple	htup;
4574 	List	   *oldlist;
4575 	MemoryContext oldcxt;
4576 
4577 	/* Quick exit if we already computed the list. */
4578 	if (relation->rd_fkeyvalid)
4579 		return relation->rd_fkeylist;
4580 
4581 	/* Fast path: non-partitioned tables without triggers can't have FKs */
4582 	if (!relation->rd_rel->relhastriggers &&
4583 		relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
4584 		return NIL;
4585 
4586 	/*
4587 	 * We build the list we intend to return (in the caller's context) while
4588 	 * doing the scan.  After successfully completing the scan, we copy that
4589 	 * list into the relcache entry.  This avoids cache-context memory leakage
4590 	 * if we get some sort of error partway through.
4591 	 */
4592 	result = NIL;
4593 
4594 	/* Prepare to scan pg_constraint for entries having conrelid = this rel. */
4595 	ScanKeyInit(&skey,
4596 				Anum_pg_constraint_conrelid,
4597 				BTEqualStrategyNumber, F_OIDEQ,
4598 				ObjectIdGetDatum(RelationGetRelid(relation)));
4599 
4600 	conrel = table_open(ConstraintRelationId, AccessShareLock);
4601 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4602 								 NULL, 1, &skey);
4603 
4604 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4605 	{
4606 		Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
4607 		ForeignKeyCacheInfo *info;
4608 
4609 		/* consider only foreign keys */
4610 		if (constraint->contype != CONSTRAINT_FOREIGN)
4611 			continue;
4612 
4613 		info = makeNode(ForeignKeyCacheInfo);
4614 		info->conoid = constraint->oid;
4615 		info->conrelid = constraint->conrelid;
4616 		info->confrelid = constraint->confrelid;
4617 
4618 		DeconstructFkConstraintRow(htup, &info->nkeys,
4619 								   info->conkey,
4620 								   info->confkey,
4621 								   info->conpfeqop,
4622 								   NULL, NULL);
4623 
4624 		/* Add FK's node to the result list */
4625 		result = lappend(result, info);
4626 	}
4627 
4628 	systable_endscan(conscan);
4629 	table_close(conrel, AccessShareLock);
4630 
4631 	/* Now save a copy of the completed list in the relcache entry. */
4632 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4633 	oldlist = relation->rd_fkeylist;
4634 	relation->rd_fkeylist = copyObject(result);
4635 	relation->rd_fkeyvalid = true;
4636 	MemoryContextSwitchTo(oldcxt);
4637 
4638 	/* Don't leak the old list, if there is one */
4639 	list_free_deep(oldlist);
4640 
4641 	return result;
4642 }
4643 
4644 /*
4645  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
4646  *
4647  * The index list is created only if someone requests it.  We scan pg_index
4648  * to find relevant indexes, and add the list to the relcache entry so that
4649  * we won't have to compute it again.  Note that shared cache inval of a
4650  * relcache entry will delete the old list and set rd_indexvalid to false,
4651  * so that we must recompute the index list on next request.  This handles
4652  * creation or deletion of an index.
4653  *
4654  * Indexes that are marked not indislive are omitted from the returned list.
4655  * Such indexes are expected to be dropped momentarily, and should not be
4656  * touched at all by any caller of this function.
4657  *
4658  * The returned list is guaranteed to be sorted in order by OID.  This is
4659  * needed by the executor, since for index types that we obtain exclusive
4660  * locks on when updating the index, all backends must lock the indexes in
4661  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
4662  * consistent ordering would do, but ordering by OID is easy.
4663  *
4664  * Since shared cache inval causes the relcache's copy of the list to go away,
4665  * we return a copy of the list palloc'd in the caller's context.  The caller
4666  * may list_free() the returned list after scanning it. This is necessary
4667  * since the caller will typically be doing syscache lookups on the relevant
4668  * indexes, and syscache lookup could cause SI messages to be processed!
4669  *
4670  * In exactly the same way, we update rd_pkindex, which is the OID of the
4671  * relation's primary key index if any, else InvalidOid; and rd_replidindex,
4672  * which is the pg_class OID of an index to be used as the relation's
4673  * replication identity index, or InvalidOid if there is no such index.
4674  */
4675 List *
RelationGetIndexList(Relation relation)4676 RelationGetIndexList(Relation relation)
4677 {
4678 	Relation	indrel;
4679 	SysScanDesc indscan;
4680 	ScanKeyData skey;
4681 	HeapTuple	htup;
4682 	List	   *result;
4683 	List	   *oldlist;
4684 	char		replident = relation->rd_rel->relreplident;
4685 	Oid			pkeyIndex = InvalidOid;
4686 	Oid			candidateIndex = InvalidOid;
4687 	MemoryContext oldcxt;
4688 
4689 	/* Quick exit if we already computed the list. */
4690 	if (relation->rd_indexvalid)
4691 		return list_copy(relation->rd_indexlist);
4692 
4693 	/*
4694 	 * We build the list we intend to return (in the caller's context) while
4695 	 * doing the scan.  After successfully completing the scan, we copy that
4696 	 * list into the relcache entry.  This avoids cache-context memory leakage
4697 	 * if we get some sort of error partway through.
4698 	 */
4699 	result = NIL;
4700 
4701 	/* Prepare to scan pg_index for entries having indrelid = this rel. */
4702 	ScanKeyInit(&skey,
4703 				Anum_pg_index_indrelid,
4704 				BTEqualStrategyNumber, F_OIDEQ,
4705 				ObjectIdGetDatum(RelationGetRelid(relation)));
4706 
4707 	indrel = table_open(IndexRelationId, AccessShareLock);
4708 	indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
4709 								 NULL, 1, &skey);
4710 
4711 	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4712 	{
4713 		Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
4714 
4715 		/*
4716 		 * Ignore any indexes that are currently being dropped.  This will
4717 		 * prevent them from being searched, inserted into, or considered in
4718 		 * HOT-safety decisions.  It's unsafe to touch such an index at all
4719 		 * since its catalog entries could disappear at any instant.
4720 		 */
4721 		if (!index->indislive)
4722 			continue;
4723 
4724 		/* add index's OID to result list */
4725 		result = lappend_oid(result, index->indexrelid);
4726 
4727 		/*
4728 		 * Invalid, non-unique, non-immediate or predicate indexes aren't
4729 		 * interesting for either oid indexes or replication identity indexes,
4730 		 * so don't check them.
4731 		 */
4732 		if (!index->indisvalid || !index->indisunique ||
4733 			!index->indimmediate ||
4734 			!heap_attisnull(htup, Anum_pg_index_indpred, NULL))
4735 			continue;
4736 
4737 		/* remember primary key index if any */
4738 		if (index->indisprimary)
4739 			pkeyIndex = index->indexrelid;
4740 
4741 		/* remember explicitly chosen replica index */
4742 		if (index->indisreplident)
4743 			candidateIndex = index->indexrelid;
4744 	}
4745 
4746 	systable_endscan(indscan);
4747 
4748 	table_close(indrel, AccessShareLock);
4749 
4750 	/* Sort the result list into OID order, per API spec. */
4751 	list_sort(result, list_oid_cmp);
4752 
4753 	/* Now save a copy of the completed list in the relcache entry. */
4754 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4755 	oldlist = relation->rd_indexlist;
4756 	relation->rd_indexlist = list_copy(result);
4757 	relation->rd_pkindex = pkeyIndex;
4758 	if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
4759 		relation->rd_replidindex = pkeyIndex;
4760 	else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
4761 		relation->rd_replidindex = candidateIndex;
4762 	else
4763 		relation->rd_replidindex = InvalidOid;
4764 	relation->rd_indexvalid = true;
4765 	MemoryContextSwitchTo(oldcxt);
4766 
4767 	/* Don't leak the old list, if there is one */
4768 	list_free(oldlist);
4769 
4770 	return result;
4771 }
4772 
4773 /*
4774  * RelationGetStatExtList
4775  *		get a list of OIDs of statistics objects on this relation
4776  *
4777  * The statistics list is created only if someone requests it, in a way
4778  * similar to RelationGetIndexList().  We scan pg_statistic_ext to find
4779  * relevant statistics, and add the list to the relcache entry so that we
4780  * won't have to compute it again.  Note that shared cache inval of a
4781  * relcache entry will delete the old list and set rd_statvalid to 0,
4782  * so that we must recompute the statistics list on next request.  This
4783  * handles creation or deletion of a statistics object.
4784  *
4785  * The returned list is guaranteed to be sorted in order by OID, although
4786  * this is not currently needed.
4787  *
4788  * Since shared cache inval causes the relcache's copy of the list to go away,
4789  * we return a copy of the list palloc'd in the caller's context.  The caller
4790  * may list_free() the returned list after scanning it. This is necessary
4791  * since the caller will typically be doing syscache lookups on the relevant
4792  * statistics, and syscache lookup could cause SI messages to be processed!
4793  */
4794 List *
RelationGetStatExtList(Relation relation)4795 RelationGetStatExtList(Relation relation)
4796 {
4797 	Relation	indrel;
4798 	SysScanDesc indscan;
4799 	ScanKeyData skey;
4800 	HeapTuple	htup;
4801 	List	   *result;
4802 	List	   *oldlist;
4803 	MemoryContext oldcxt;
4804 
4805 	/* Quick exit if we already computed the list. */
4806 	if (relation->rd_statvalid != 0)
4807 		return list_copy(relation->rd_statlist);
4808 
4809 	/*
4810 	 * We build the list we intend to return (in the caller's context) while
4811 	 * doing the scan.  After successfully completing the scan, we copy that
4812 	 * list into the relcache entry.  This avoids cache-context memory leakage
4813 	 * if we get some sort of error partway through.
4814 	 */
4815 	result = NIL;
4816 
4817 	/*
4818 	 * Prepare to scan pg_statistic_ext for entries having stxrelid = this
4819 	 * rel.
4820 	 */
4821 	ScanKeyInit(&skey,
4822 				Anum_pg_statistic_ext_stxrelid,
4823 				BTEqualStrategyNumber, F_OIDEQ,
4824 				ObjectIdGetDatum(RelationGetRelid(relation)));
4825 
4826 	indrel = table_open(StatisticExtRelationId, AccessShareLock);
4827 	indscan = systable_beginscan(indrel, StatisticExtRelidIndexId, true,
4828 								 NULL, 1, &skey);
4829 
4830 	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4831 	{
4832 		Oid			oid = ((Form_pg_statistic_ext) GETSTRUCT(htup))->oid;
4833 
4834 		result = lappend_oid(result, oid);
4835 	}
4836 
4837 	systable_endscan(indscan);
4838 
4839 	table_close(indrel, AccessShareLock);
4840 
4841 	/* Sort the result list into OID order, per API spec. */
4842 	list_sort(result, list_oid_cmp);
4843 
4844 	/* Now save a copy of the completed list in the relcache entry. */
4845 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4846 	oldlist = relation->rd_statlist;
4847 	relation->rd_statlist = list_copy(result);
4848 
4849 	relation->rd_statvalid = true;
4850 	MemoryContextSwitchTo(oldcxt);
4851 
4852 	/* Don't leak the old list, if there is one */
4853 	list_free(oldlist);
4854 
4855 	return result;
4856 }
4857 
4858 /*
4859  * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
4860  *
4861  * Returns InvalidOid if there is no such index.
4862  */
4863 Oid
RelationGetPrimaryKeyIndex(Relation relation)4864 RelationGetPrimaryKeyIndex(Relation relation)
4865 {
4866 	List	   *ilist;
4867 
4868 	if (!relation->rd_indexvalid)
4869 	{
4870 		/* RelationGetIndexList does the heavy lifting. */
4871 		ilist = RelationGetIndexList(relation);
4872 		list_free(ilist);
4873 		Assert(relation->rd_indexvalid);
4874 	}
4875 
4876 	return relation->rd_pkindex;
4877 }
4878 
4879 /*
4880  * RelationGetReplicaIndex -- get OID of the relation's replica identity index
4881  *
4882  * Returns InvalidOid if there is no such index.
4883  */
4884 Oid
RelationGetReplicaIndex(Relation relation)4885 RelationGetReplicaIndex(Relation relation)
4886 {
4887 	List	   *ilist;
4888 
4889 	if (!relation->rd_indexvalid)
4890 	{
4891 		/* RelationGetIndexList does the heavy lifting. */
4892 		ilist = RelationGetIndexList(relation);
4893 		list_free(ilist);
4894 		Assert(relation->rd_indexvalid);
4895 	}
4896 
4897 	return relation->rd_replidindex;
4898 }
4899 
4900 /*
4901  * RelationGetIndexExpressions -- get the index expressions for an index
4902  *
4903  * We cache the result of transforming pg_index.indexprs into a node tree.
4904  * If the rel is not an index or has no expressional columns, we return NIL.
4905  * Otherwise, the returned tree is copied into the caller's memory context.
4906  * (We don't want to return a pointer to the relcache copy, since it could
4907  * disappear due to relcache invalidation.)
4908  */
4909 List *
RelationGetIndexExpressions(Relation relation)4910 RelationGetIndexExpressions(Relation relation)
4911 {
4912 	List	   *result;
4913 	Datum		exprsDatum;
4914 	bool		isnull;
4915 	char	   *exprsString;
4916 	MemoryContext oldcxt;
4917 
4918 	/* Quick exit if we already computed the result. */
4919 	if (relation->rd_indexprs)
4920 		return copyObject(relation->rd_indexprs);
4921 
4922 	/* Quick exit if there is nothing to do. */
4923 	if (relation->rd_indextuple == NULL ||
4924 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4925 		return NIL;
4926 
4927 	/*
4928 	 * We build the tree we intend to return in the caller's context. After
4929 	 * successfully completing the work, we copy it into the relcache entry.
4930 	 * This avoids problems if we get some sort of error partway through.
4931 	 */
4932 	exprsDatum = heap_getattr(relation->rd_indextuple,
4933 							  Anum_pg_index_indexprs,
4934 							  GetPgIndexDescriptor(),
4935 							  &isnull);
4936 	Assert(!isnull);
4937 	exprsString = TextDatumGetCString(exprsDatum);
4938 	result = (List *) stringToNode(exprsString);
4939 	pfree(exprsString);
4940 
4941 	/*
4942 	 * Run the expressions through eval_const_expressions. This is not just an
4943 	 * optimization, but is necessary, because the planner will be comparing
4944 	 * them to similarly-processed qual clauses, and may fail to detect valid
4945 	 * matches without this.  We must not use canonicalize_qual, however,
4946 	 * since these aren't qual expressions.
4947 	 */
4948 	result = (List *) eval_const_expressions(NULL, (Node *) result);
4949 
4950 	/* May as well fix opfuncids too */
4951 	fix_opfuncids((Node *) result);
4952 
4953 	/* Now save a copy of the completed tree in the relcache entry. */
4954 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4955 	relation->rd_indexprs = copyObject(result);
4956 	MemoryContextSwitchTo(oldcxt);
4957 
4958 	return result;
4959 }
4960 
4961 /*
4962  * RelationGetDummyIndexExpressions -- get dummy expressions for an index
4963  *
4964  * Return a list of dummy expressions (just Const nodes) with the same
4965  * types/typmods/collations as the index's real expressions.  This is
4966  * useful in situations where we don't want to run any user-defined code.
4967  */
4968 List *
RelationGetDummyIndexExpressions(Relation relation)4969 RelationGetDummyIndexExpressions(Relation relation)
4970 {
4971 	List	   *result;
4972 	Datum		exprsDatum;
4973 	bool		isnull;
4974 	char	   *exprsString;
4975 	List	   *rawExprs;
4976 	ListCell   *lc;
4977 
4978 	/* Quick exit if there is nothing to do. */
4979 	if (relation->rd_indextuple == NULL ||
4980 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4981 		return NIL;
4982 
4983 	/* Extract raw node tree(s) from index tuple. */
4984 	exprsDatum = heap_getattr(relation->rd_indextuple,
4985 							  Anum_pg_index_indexprs,
4986 							  GetPgIndexDescriptor(),
4987 							  &isnull);
4988 	Assert(!isnull);
4989 	exprsString = TextDatumGetCString(exprsDatum);
4990 	rawExprs = (List *) stringToNode(exprsString);
4991 	pfree(exprsString);
4992 
4993 	/* Construct null Consts; the typlen and typbyval are arbitrary. */
4994 	result = NIL;
4995 	foreach(lc, rawExprs)
4996 	{
4997 		Node	   *rawExpr = (Node *) lfirst(lc);
4998 
4999 		result = lappend(result,
5000 						 makeConst(exprType(rawExpr),
5001 								   exprTypmod(rawExpr),
5002 								   exprCollation(rawExpr),
5003 								   1,
5004 								   (Datum) 0,
5005 								   true,
5006 								   true));
5007 	}
5008 
5009 	return result;
5010 }
5011 
5012 /*
5013  * RelationGetIndexPredicate -- get the index predicate for an index
5014  *
5015  * We cache the result of transforming pg_index.indpred into an implicit-AND
5016  * node tree (suitable for use in planning).
5017  * If the rel is not an index or has no predicate, we return NIL.
5018  * Otherwise, the returned tree is copied into the caller's memory context.
5019  * (We don't want to return a pointer to the relcache copy, since it could
5020  * disappear due to relcache invalidation.)
5021  */
5022 List *
RelationGetIndexPredicate(Relation relation)5023 RelationGetIndexPredicate(Relation relation)
5024 {
5025 	List	   *result;
5026 	Datum		predDatum;
5027 	bool		isnull;
5028 	char	   *predString;
5029 	MemoryContext oldcxt;
5030 
5031 	/* Quick exit if we already computed the result. */
5032 	if (relation->rd_indpred)
5033 		return copyObject(relation->rd_indpred);
5034 
5035 	/* Quick exit if there is nothing to do. */
5036 	if (relation->rd_indextuple == NULL ||
5037 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
5038 		return NIL;
5039 
5040 	/*
5041 	 * We build the tree we intend to return in the caller's context. After
5042 	 * successfully completing the work, we copy it into the relcache entry.
5043 	 * This avoids problems if we get some sort of error partway through.
5044 	 */
5045 	predDatum = heap_getattr(relation->rd_indextuple,
5046 							 Anum_pg_index_indpred,
5047 							 GetPgIndexDescriptor(),
5048 							 &isnull);
5049 	Assert(!isnull);
5050 	predString = TextDatumGetCString(predDatum);
5051 	result = (List *) stringToNode(predString);
5052 	pfree(predString);
5053 
5054 	/*
5055 	 * Run the expression through const-simplification and canonicalization.
5056 	 * This is not just an optimization, but is necessary, because the planner
5057 	 * will be comparing it to similarly-processed qual clauses, and may fail
5058 	 * to detect valid matches without this.  This must match the processing
5059 	 * done to qual clauses in preprocess_expression()!  (We can skip the
5060 	 * stuff involving subqueries, however, since we don't allow any in index
5061 	 * predicates.)
5062 	 */
5063 	result = (List *) eval_const_expressions(NULL, (Node *) result);
5064 
5065 	result = (List *) canonicalize_qual((Expr *) result, false);
5066 
5067 	/* Also convert to implicit-AND format */
5068 	result = make_ands_implicit((Expr *) result);
5069 
5070 	/* May as well fix opfuncids too */
5071 	fix_opfuncids((Node *) result);
5072 
5073 	/* Now save a copy of the completed tree in the relcache entry. */
5074 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
5075 	relation->rd_indpred = copyObject(result);
5076 	MemoryContextSwitchTo(oldcxt);
5077 
5078 	return result;
5079 }
5080 
5081 /*
5082  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
5083  *
5084  * The result has a bit set for each attribute used anywhere in the index
5085  * definitions of all the indexes on this relation.  (This includes not only
5086  * simple index keys, but attributes used in expressions and partial-index
5087  * predicates.)
5088  *
5089  * Depending on attrKind, a bitmap covering the attnums for all index columns,
5090  * for all potential foreign key columns, or for all columns in the configured
5091  * replica identity index is returned.
5092  *
5093  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
5094  * we can include system attributes (e.g., OID) in the bitmap representation.
5095  *
5096  * Caller had better hold at least RowExclusiveLock on the target relation
5097  * to ensure it is safe (deadlock-free) for us to take locks on the relation's
5098  * indexes.  Note that since the introduction of CREATE INDEX CONCURRENTLY,
5099  * that lock level doesn't guarantee a stable set of indexes, so we have to
5100  * be prepared to retry here in case of a change in the set of indexes.
5101  *
5102  * The returned result is palloc'd in the caller's memory context and should
5103  * be bms_free'd when not needed anymore.
5104  */
5105 Bitmapset *
RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
{
	/*
	 * Overview: compute (or fetch from the relcache entry) four bitmaps of
	 * heap attribute numbers -- all indexed columns, columns of "key" unique
	 * indexes, primary-key columns, and replica-identity columns -- and
	 * return the one selected by attrKind.  Bitmap members are attribute
	 * numbers offset by FirstLowInvalidHeapAttributeNumber.  The caller
	 * always receives a bitmap it may modify: either a bms_copy of the cached
	 * value or the freshly built working copy.  NULL is returned when the
	 * relation has no indexes.
	 */
	Bitmapset  *indexattrs;		/* indexed columns */
	Bitmapset  *uindexattrs;	/* columns in unique indexes */
	Bitmapset  *pkindexattrs;	/* columns in the primary index */
	Bitmapset  *idindexattrs;	/* columns in the replica identity */
	List	   *indexoidlist;
	List	   *newindexoidlist;
	Oid			relpkindex;
	Oid			relreplindex;
	ListCell   *l;
	MemoryContext oldcxt;

	/*
	 * Quick exit if we already computed the result.  rd_indexattr doubles as
	 * the validity flag for all four cached bitmaps (it is stored last
	 * below), so only it is tested here.
	 */
	if (relation->rd_indexattr != NULL)
	{
		switch (attrKind)
		{
			case INDEX_ATTR_BITMAP_ALL:
				return bms_copy(relation->rd_indexattr);
			case INDEX_ATTR_BITMAP_KEY:
				return bms_copy(relation->rd_keyattr);
			case INDEX_ATTR_BITMAP_PRIMARY_KEY:
				return bms_copy(relation->rd_pkattr);
			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
				return bms_copy(relation->rd_idattr);
			default:
				/* elog(ERROR) is expected not to return here */
				elog(ERROR, "unknown attrKind %u", attrKind);
		}
	}

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
	 * Get cached list of index OIDs. If we have to start over, we do so here.
	 */
restart:
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

	/*
	 * Copy the rd_pkindex and rd_replidindex values computed by
	 * RelationGetIndexList before proceeding.  This is needed because a
	 * relcache flush could occur inside index_open below, resetting the
	 * fields managed by RelationGetIndexList.  We need to do the work with
	 * stable values of these fields.
	 */
	relpkindex = relation->rd_pkindex;
	relreplindex = relation->rd_replidindex;

	/*
	 * For each index, add referenced attributes to indexattrs.
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
	 * CONCURRENTLY is far enough along that we should ignore the index, it
	 * won't be returned at all by RelationGetIndexList.
	 */
	indexattrs = NULL;
	uindexattrs = NULL;
	pkindexattrs = NULL;
	idindexattrs = NULL;
	foreach(l, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(l);
		Relation	indexDesc;
		Datum		datum;
		bool		isnull;
		Node	   *indexExpressions;
		Node	   *indexPredicate;
		int			i;
		bool		isKey;		/* candidate key */
		bool		isPK;		/* primary key */
		bool		isIDKey;	/* replica identity index */

		indexDesc = index_open(indexOid, AccessShareLock);

		/*
		 * Extract index expressions and index predicate.  Note: Don't use
		 * RelationGetIndexExpressions()/RelationGetIndexPredicate(), because
		 * those might run constant expressions evaluation, which needs a
		 * snapshot, which we might not have here.  (Also, it's probably more
		 * sound to collect the bitmaps before any transformations that might
		 * eliminate columns, but the practical impact of this is limited.)
		 */

		/* Raw expression tree from pg_index.indexprs, or NULL if none */
		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indexprs,
							 GetPgIndexDescriptor(), &isnull);
		if (!isnull)
			indexExpressions = stringToNode(TextDatumGetCString(datum));
		else
			indexExpressions = NULL;

		/* Raw predicate tree from pg_index.indpred, or NULL if none */
		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indpred,
							 GetPgIndexDescriptor(), &isnull);
		if (!isnull)
			indexPredicate = stringToNode(TextDatumGetCString(datum));
		else
			indexPredicate = NULL;

		/*
		 * Can this index be referenced by a foreign key?  Only unique
		 * indexes with neither expressions nor a predicate qualify.
		 */
		isKey = indexDesc->rd_index->indisunique &&
			indexExpressions == NULL &&
			indexPredicate == NULL;

		/* Is this a primary key? */
		isPK = (indexOid == relpkindex);

		/* Is this index the configured (or default) replica identity? */
		isIDKey = (indexOid == relreplindex);

		/* Collect simple attribute references */
		for (i = 0; i < indexDesc->rd_index->indnatts; i++)
		{
			int			attrnum = indexDesc->rd_index->indkey.values[i];

			/*
			 * Since we have covering indexes with non-key columns, we must
			 * handle them accurately here. non-key columns must be added into
			 * indexattrs, since they are in index, and HOT-update shouldn't
			 * miss them. Obviously, non-key columns couldn't be referenced by
			 * foreign key or identity key. Hence we do not include them into
			 * uindexattrs, pkindexattrs and idindexattrs bitmaps.
			 *
			 * An indkey entry of 0 is skipped here; presumably it marks an
			 * expression column, whose Vars are picked up from
			 * indexExpressions below.
			 */
			if (attrnum != 0)
			{
				indexattrs = bms_add_member(indexattrs,
											attrnum - FirstLowInvalidHeapAttributeNumber);

				/* positions below indnkeyatts are key columns */
				if (isKey && i < indexDesc->rd_index->indnkeyatts)
					uindexattrs = bms_add_member(uindexattrs,
												 attrnum - FirstLowInvalidHeapAttributeNumber);

				if (isPK && i < indexDesc->rd_index->indnkeyatts)
					pkindexattrs = bms_add_member(pkindexattrs,
												  attrnum - FirstLowInvalidHeapAttributeNumber);

				if (isIDKey && i < indexDesc->rd_index->indnkeyatts)
					idindexattrs = bms_add_member(idindexattrs,
												  attrnum - FirstLowInvalidHeapAttributeNumber);
			}
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos(indexExpressions, 1, &indexattrs);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos(indexPredicate, 1, &indexattrs);

		index_close(indexDesc, AccessShareLock);
	}

	/*
	 * During one of the index_opens in the above loop, we might have received
	 * a relcache flush event on this relcache entry, which might have been
	 * signaling a change in the rel's index list.  If so, we'd better start
	 * over to ensure we deliver up-to-date attribute bitmaps.
	 */
	newindexoidlist = RelationGetIndexList(relation);
	if (equal(indexoidlist, newindexoidlist) &&
		relpkindex == relation->rd_pkindex &&
		relreplindex == relation->rd_replidindex)
	{
		/* Still the same index set, so proceed */
		list_free(newindexoidlist);
		list_free(indexoidlist);
	}
	else
	{
		/* Gotta do it over ... might as well not leak memory */
		list_free(newindexoidlist);
		list_free(indexoidlist);
		bms_free(uindexattrs);
		bms_free(pkindexattrs);
		bms_free(idindexattrs);
		bms_free(indexattrs);

		goto restart;
	}

	/* Don't leak the old values of these bitmaps, if any */
	bms_free(relation->rd_indexattr);
	relation->rd_indexattr = NULL;
	bms_free(relation->rd_keyattr);
	relation->rd_keyattr = NULL;
	bms_free(relation->rd_pkattr);
	relation->rd_pkattr = NULL;
	bms_free(relation->rd_idattr);
	relation->rd_idattr = NULL;

	/*
	 * Now save copies of the bitmaps in the relcache entry.  We intentionally
	 * set rd_indexattr last, because that's the one that signals validity of
	 * the values; if we run out of memory before making that copy, we won't
	 * leave the relcache entry looking like the other ones are valid but
	 * empty.
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_keyattr = bms_copy(uindexattrs);
	relation->rd_pkattr = bms_copy(pkindexattrs);
	relation->rd_idattr = bms_copy(idindexattrs);
	relation->rd_indexattr = bms_copy(indexattrs);
	MemoryContextSwitchTo(oldcxt);

	/*
	 * We return our original working copy for caller to play with.  The
	 * working copies were built before the switch to CacheMemoryContext, so
	 * they are separate from the cached copies stored above.
	 */
	switch (attrKind)
	{
		case INDEX_ATTR_BITMAP_ALL:
			return indexattrs;
		case INDEX_ATTR_BITMAP_KEY:
			return uindexattrs;
		case INDEX_ATTR_BITMAP_PRIMARY_KEY:
			return pkindexattrs;
		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
			return idindexattrs;
		default:
			elog(ERROR, "unknown attrKind %u", attrKind);
			return NULL;
	}
}
5333 
5334 /*
5335  * RelationGetIdentityKeyBitmap -- get a bitmap of replica identity attribute
5336  * numbers
5337  *
5338  * A bitmap of index attribute numbers for the configured replica identity
5339  * index is returned.
5340  *
5341  * See also comments of RelationGetIndexAttrBitmap().
5342  *
5343  * This is a special purpose function used during logical replication. Here,
5344  * unlike RelationGetIndexAttrBitmap(), we don't acquire a lock on the required
5345  * index as we build the cache entry using a historic snapshot and all the
5346  * later changes are absorbed while decoding WAL. Due to this reason, we don't
5347  * need to retry here in case of a change in the set of indexes.
5348  */
5349 Bitmapset *
RelationGetIdentityKeyBitmap(Relation relation)5350 RelationGetIdentityKeyBitmap(Relation relation)
5351 {
5352 	Bitmapset  *idindexattrs = NULL;	/* columns in the replica identity */
5353 	Relation	indexDesc;
5354 	int			i;
5355 	Oid			replidindex;
5356 	MemoryContext oldcxt;
5357 
5358 	/* Quick exit if we already computed the result */
5359 	if (relation->rd_idattr != NULL)
5360 		return bms_copy(relation->rd_idattr);
5361 
5362 	/* Fast path if definitely no indexes */
5363 	if (!RelationGetForm(relation)->relhasindex)
5364 		return NULL;
5365 
5366 	/* Historic snapshot must be set. */
5367 	Assert(HistoricSnapshotActive());
5368 
5369 	replidindex = RelationGetReplicaIndex(relation);
5370 
5371 	/* Fall out if there is no replica identity index */
5372 	if (!OidIsValid(replidindex))
5373 		return NULL;
5374 
5375 	/* Look up the description for the replica identity index */
5376 	indexDesc = RelationIdGetRelation(replidindex);
5377 
5378 	if (!RelationIsValid(indexDesc))
5379 		elog(ERROR, "could not open relation with OID %u",
5380 			 relation->rd_replidindex);
5381 
5382 	/* Add referenced attributes to idindexattrs */
5383 	for (i = 0; i < indexDesc->rd_index->indnatts; i++)
5384 	{
5385 		int			attrnum = indexDesc->rd_index->indkey.values[i];
5386 
5387 		/*
5388 		 * We don't include non-key columns into idindexattrs bitmaps. See
5389 		 * RelationGetIndexAttrBitmap.
5390 		 */
5391 		if (attrnum != 0)
5392 		{
5393 			if (i < indexDesc->rd_index->indnkeyatts)
5394 				idindexattrs = bms_add_member(idindexattrs,
5395 											  attrnum - FirstLowInvalidHeapAttributeNumber);
5396 		}
5397 	}
5398 
5399 	RelationClose(indexDesc);
5400 
5401 	/* Don't leak the old values of these bitmaps, if any */
5402 	bms_free(relation->rd_idattr);
5403 	relation->rd_idattr = NULL;
5404 
5405 	/* Now save copy of the bitmap in the relcache entry */
5406 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5407 	relation->rd_idattr = bms_copy(idindexattrs);
5408 	MemoryContextSwitchTo(oldcxt);
5409 
5410 	/* We return our original working copy for caller to play with */
5411 	return idindexattrs;
5412 }
5413 
5414 /*
5415  * RelationGetExclusionInfo -- get info about index's exclusion constraint
5416  *
5417  * This should be called only for an index that is known to have an
5418  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
5419  * context) of the exclusion operator OIDs, their underlying functions'
5420  * OIDs, and their strategy numbers in the index's opclasses.  We cache
5421  * all this information since it requires a fair amount of work to get.
5422  */
5423 void
RelationGetExclusionInfo(Relation indexRelation,Oid ** operators,Oid ** procs,uint16 ** strategies)5424 RelationGetExclusionInfo(Relation indexRelation,
5425 						 Oid **operators,
5426 						 Oid **procs,
5427 						 uint16 **strategies)
5428 {
5429 	int			indnkeyatts;
5430 	Oid		   *ops;
5431 	Oid		   *funcs;
5432 	uint16	   *strats;
5433 	Relation	conrel;
5434 	SysScanDesc conscan;
5435 	ScanKeyData skey[1];
5436 	HeapTuple	htup;
5437 	bool		found;
5438 	MemoryContext oldcxt;
5439 	int			i;
5440 
5441 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
5442 
5443 	/* Allocate result space in caller context */
5444 	*operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5445 	*procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5446 	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
5447 
5448 	/* Quick exit if we have the data cached already */
5449 	if (indexRelation->rd_exclstrats != NULL)
5450 	{
5451 		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
5452 		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
5453 		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
5454 		return;
5455 	}
5456 
5457 	/*
5458 	 * Search pg_constraint for the constraint associated with the index. To
5459 	 * make this not too painfully slow, we use the index on conrelid; that
5460 	 * will hold the parent relation's OID not the index's own OID.
5461 	 *
5462 	 * Note: if we wanted to rely on the constraint name matching the index's
5463 	 * name, we could just do a direct lookup using pg_constraint's unique
5464 	 * index.  For the moment it doesn't seem worth requiring that.
5465 	 */
5466 	ScanKeyInit(&skey[0],
5467 				Anum_pg_constraint_conrelid,
5468 				BTEqualStrategyNumber, F_OIDEQ,
5469 				ObjectIdGetDatum(indexRelation->rd_index->indrelid));
5470 
5471 	conrel = table_open(ConstraintRelationId, AccessShareLock);
5472 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
5473 								 NULL, 1, skey);
5474 	found = false;
5475 
5476 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
5477 	{
5478 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
5479 		Datum		val;
5480 		bool		isnull;
5481 		ArrayType  *arr;
5482 		int			nelem;
5483 
5484 		/* We want the exclusion constraint owning the index */
5485 		if (conform->contype != CONSTRAINT_EXCLUSION ||
5486 			conform->conindid != RelationGetRelid(indexRelation))
5487 			continue;
5488 
5489 		/* There should be only one */
5490 		if (found)
5491 			elog(ERROR, "unexpected exclusion constraint record found for rel %s",
5492 				 RelationGetRelationName(indexRelation));
5493 		found = true;
5494 
5495 		/* Extract the operator OIDS from conexclop */
5496 		val = fastgetattr(htup,
5497 						  Anum_pg_constraint_conexclop,
5498 						  conrel->rd_att, &isnull);
5499 		if (isnull)
5500 			elog(ERROR, "null conexclop for rel %s",
5501 				 RelationGetRelationName(indexRelation));
5502 
5503 		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */
5504 		nelem = ARR_DIMS(arr)[0];
5505 		if (ARR_NDIM(arr) != 1 ||
5506 			nelem != indnkeyatts ||
5507 			ARR_HASNULL(arr) ||
5508 			ARR_ELEMTYPE(arr) != OIDOID)
5509 			elog(ERROR, "conexclop is not a 1-D Oid array");
5510 
5511 		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
5512 	}
5513 
5514 	systable_endscan(conscan);
5515 	table_close(conrel, AccessShareLock);
5516 
5517 	if (!found)
5518 		elog(ERROR, "exclusion constraint record missing for rel %s",
5519 			 RelationGetRelationName(indexRelation));
5520 
5521 	/* We need the func OIDs and strategy numbers too */
5522 	for (i = 0; i < indnkeyatts; i++)
5523 	{
5524 		funcs[i] = get_opcode(ops[i]);
5525 		strats[i] = get_op_opfamily_strategy(ops[i],
5526 											 indexRelation->rd_opfamily[i]);
5527 		/* shouldn't fail, since it was checked at index creation */
5528 		if (strats[i] == InvalidStrategy)
5529 			elog(ERROR, "could not find strategy for operator %u in family %u",
5530 				 ops[i], indexRelation->rd_opfamily[i]);
5531 	}
5532 
5533 	/* Save a copy of the results in the relcache entry. */
5534 	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
5535 	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5536 	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5537 	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
5538 	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
5539 	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
5540 	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
5541 	MemoryContextSwitchTo(oldcxt);
5542 }
5543 
5544 /*
5545  * Get publication actions for the given relation.
5546  */
5547 struct PublicationActions *
GetRelationPublicationActions(Relation relation)5548 GetRelationPublicationActions(Relation relation)
5549 {
5550 	List	   *puboids;
5551 	ListCell   *lc;
5552 	MemoryContext oldcxt;
5553 	PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
5554 
5555 	/*
5556 	 * If not publishable, it publishes no actions.  (pgoutput_change() will
5557 	 * ignore it.)
5558 	 */
5559 	if (!is_publishable_relation(relation))
5560 		return pubactions;
5561 
5562 	if (relation->rd_pubactions)
5563 		return memcpy(pubactions, relation->rd_pubactions,
5564 					  sizeof(PublicationActions));
5565 
5566 	/* Fetch the publication membership info. */
5567 	puboids = GetRelationPublications(RelationGetRelid(relation));
5568 	if (relation->rd_rel->relispartition)
5569 	{
5570 		/* Add publications that the ancestors are in too. */
5571 		List	   *ancestors = get_partition_ancestors(RelationGetRelid(relation));
5572 		ListCell   *lc;
5573 
5574 		foreach(lc, ancestors)
5575 		{
5576 			Oid			ancestor = lfirst_oid(lc);
5577 
5578 			puboids = list_concat_unique_oid(puboids,
5579 											 GetRelationPublications(ancestor));
5580 		}
5581 	}
5582 	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
5583 
5584 	foreach(lc, puboids)
5585 	{
5586 		Oid			pubid = lfirst_oid(lc);
5587 		HeapTuple	tup;
5588 		Form_pg_publication pubform;
5589 
5590 		tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
5591 
5592 		if (!HeapTupleIsValid(tup))
5593 			elog(ERROR, "cache lookup failed for publication %u", pubid);
5594 
5595 		pubform = (Form_pg_publication) GETSTRUCT(tup);
5596 
5597 		pubactions->pubinsert |= pubform->pubinsert;
5598 		pubactions->pubupdate |= pubform->pubupdate;
5599 		pubactions->pubdelete |= pubform->pubdelete;
5600 		pubactions->pubtruncate |= pubform->pubtruncate;
5601 
5602 		ReleaseSysCache(tup);
5603 
5604 		/*
5605 		 * If we know everything is replicated, there is no point to check for
5606 		 * other publications.
5607 		 */
5608 		if (pubactions->pubinsert && pubactions->pubupdate &&
5609 			pubactions->pubdelete && pubactions->pubtruncate)
5610 			break;
5611 	}
5612 
5613 	if (relation->rd_pubactions)
5614 	{
5615 		pfree(relation->rd_pubactions);
5616 		relation->rd_pubactions = NULL;
5617 	}
5618 
5619 	/* Now save copy of the actions in the relcache entry. */
5620 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5621 	relation->rd_pubactions = palloc(sizeof(PublicationActions));
5622 	memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
5623 	MemoryContextSwitchTo(oldcxt);
5624 
5625 	return pubactions;
5626 }
5627 
5628 /*
5629  * RelationGetIndexRawAttOptions -- get AM/opclass-specific options for the index
5630  */
5631 Datum *
RelationGetIndexRawAttOptions(Relation indexrel)5632 RelationGetIndexRawAttOptions(Relation indexrel)
5633 {
5634 	Oid			indexrelid = RelationGetRelid(indexrel);
5635 	int16		natts = RelationGetNumberOfAttributes(indexrel);
5636 	Datum	   *options = NULL;
5637 	int16		attnum;
5638 
5639 	for (attnum = 1; attnum <= natts; attnum++)
5640 	{
5641 		if (indexrel->rd_indam->amoptsprocnum == 0)
5642 			continue;
5643 
5644 		if (!OidIsValid(index_getprocid(indexrel, attnum,
5645 										indexrel->rd_indam->amoptsprocnum)))
5646 			continue;
5647 
5648 		if (!options)
5649 			options = palloc0(sizeof(Datum) * natts);
5650 
5651 		options[attnum - 1] = get_attoptions(indexrelid, attnum);
5652 	}
5653 
5654 	return options;
5655 }
5656 
5657 static bytea **
CopyIndexAttOptions(bytea ** srcopts,int natts)5658 CopyIndexAttOptions(bytea **srcopts, int natts)
5659 {
5660 	bytea	  **opts = palloc(sizeof(*opts) * natts);
5661 
5662 	for (int i = 0; i < natts; i++)
5663 	{
5664 		bytea	   *opt = srcopts[i];
5665 
5666 		opts[i] = !opt ? NULL : (bytea *)
5667 			DatumGetPointer(datumCopy(PointerGetDatum(opt), false, -1));
5668 	}
5669 
5670 	return opts;
5671 }
5672 
5673 /*
5674  * RelationGetIndexAttOptions
5675  *		get AM/opclass-specific options for an index parsed into a binary form
5676  */
5677 bytea	  **
RelationGetIndexAttOptions(Relation relation,bool copy)5678 RelationGetIndexAttOptions(Relation relation, bool copy)
5679 {
5680 	MemoryContext oldcxt;
5681 	bytea	  **opts = relation->rd_opcoptions;
5682 	Oid			relid = RelationGetRelid(relation);
5683 	int			natts = RelationGetNumberOfAttributes(relation);	/* XXX
5684 																	 * IndexRelationGetNumberOfKeyAttributes */
5685 	int			i;
5686 
5687 	/* Try to copy cached options. */
5688 	if (opts)
5689 		return copy ? CopyIndexAttOptions(opts, natts) : opts;
5690 
5691 	/* Get and parse opclass options. */
5692 	opts = palloc0(sizeof(*opts) * natts);
5693 
5694 	for (i = 0; i < natts; i++)
5695 	{
5696 		if (criticalRelcachesBuilt && relid != AttributeRelidNumIndexId)
5697 		{
5698 			Datum		attoptions = get_attoptions(relid, i + 1);
5699 
5700 			opts[i] = index_opclass_options(relation, i + 1, attoptions, false);
5701 
5702 			if (attoptions != (Datum) 0)
5703 				pfree(DatumGetPointer(attoptions));
5704 		}
5705 	}
5706 
5707 	/* Copy parsed options to the cache. */
5708 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
5709 	relation->rd_opcoptions = CopyIndexAttOptions(opts, natts);
5710 	MemoryContextSwitchTo(oldcxt);
5711 
5712 	if (copy)
5713 		return opts;
5714 
5715 	for (i = 0; i < natts; i++)
5716 	{
5717 		if (opts[i])
5718 			pfree(opts[i]);
5719 	}
5720 
5721 	pfree(opts);
5722 
5723 	return relation->rd_opcoptions;
5724 }
5725 
5726 /*
5727  * Routines to support ereport() reports of relation-related errors
5728  *
5729  * These could have been put into elog.c, but it seems like a module layering
5730  * violation to have elog.c calling relcache or syscache stuff --- and we
5731  * definitely don't want elog.h including rel.h.  So we put them here.
5732  */
5733 
5734 /*
5735  * errtable --- stores schema_name and table_name of a table
5736  * within the current errordata.
5737  */
5738 int
errtable(Relation rel)5739 errtable(Relation rel)
5740 {
5741 	err_generic_string(PG_DIAG_SCHEMA_NAME,
5742 					   get_namespace_name(RelationGetNamespace(rel)));
5743 	err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
5744 
5745 	return 0;					/* return value does not matter */
5746 }
5747 
5748 /*
5749  * errtablecol --- stores schema_name, table_name and column_name
5750  * of a table column within the current errordata.
5751  *
5752  * The column is specified by attribute number --- for most callers, this is
5753  * easier and less error-prone than getting the column name for themselves.
5754  */
5755 int
errtablecol(Relation rel,int attnum)5756 errtablecol(Relation rel, int attnum)
5757 {
5758 	TupleDesc	reldesc = RelationGetDescr(rel);
5759 	const char *colname;
5760 
5761 	/* Use reldesc if it's a user attribute, else consult the catalogs */
5762 	if (attnum > 0 && attnum <= reldesc->natts)
5763 		colname = NameStr(TupleDescAttr(reldesc, attnum - 1)->attname);
5764 	else
5765 		colname = get_attname(RelationGetRelid(rel), attnum, false);
5766 
5767 	return errtablecolname(rel, colname);
5768 }
5769 
5770 /*
5771  * errtablecolname --- stores schema_name, table_name and column_name
5772  * of a table column within the current errordata, where the column name is
5773  * given directly rather than extracted from the relation's catalog data.
5774  *
5775  * Don't use this directly unless errtablecol() is inconvenient for some
5776  * reason.  This might possibly be needed during intermediate states in ALTER
5777  * TABLE, for instance.
5778  */
5779 int
errtablecolname(Relation rel,const char * colname)5780 errtablecolname(Relation rel, const char *colname)
5781 {
5782 	errtable(rel);
5783 	err_generic_string(PG_DIAG_COLUMN_NAME, colname);
5784 
5785 	return 0;					/* return value does not matter */
5786 }
5787 
5788 /*
5789  * errtableconstraint --- stores schema_name, table_name and constraint_name
5790  * of a table-related constraint within the current errordata.
5791  */
5792 int
errtableconstraint(Relation rel,const char * conname)5793 errtableconstraint(Relation rel, const char *conname)
5794 {
5795 	errtable(rel);
5796 	err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
5797 
5798 	return 0;					/* return value does not matter */
5799 }
5800 
5801 
5802 /*
5803  *	load_relcache_init_file, write_relcache_init_file
5804  *
5805  *		In late 1992, we started regularly having databases with more than
5806  *		a thousand classes in them.  With this number of classes, it became
5807  *		critical to do indexed lookups on the system catalogs.
5808  *
5809  *		Bootstrapping these lookups is very hard.  We want to be able to
5810  *		use an index on pg_attribute, for example, but in order to do so,
5811  *		we must have read pg_attribute for the attributes in the index,
5812  *		which implies that we need to use the index.
5813  *
5814  *		In order to get around the problem, we do the following:
5815  *
5816  *		   +  When the database system is initialized (at initdb time), we
5817  *			  don't use indexes.  We do sequential scans.
5818  *
5819  *		   +  When the backend is started up in normal mode, we load an image
5820  *			  of the appropriate relation descriptors, in internal format,
5821  *			  from an initialization file in the data/base/... directory.
5822  *
5823  *		   +  If the initialization file isn't there, then we create the
5824  *			  relation descriptors using sequential scans and write 'em to
5825  *			  the initialization file for use by subsequent backends.
5826  *
5827  *		As of Postgres 9.0, there is one local initialization file in each
5828  *		database, plus one shared initialization file for shared catalogs.
5829  *
5830  *		We could dispense with the initialization files and just build the
5831  *		critical reldescs the hard way on every backend startup, but that
5832  *		slows down backend startup noticeably.
5833  *
5834  *		We can in fact go further, and save more relcache entries than
5835  *		just the ones that are absolutely critical; this allows us to speed
5836  *		up backend startup by not having to build such entries the hard way.
5837  *		Presently, all the catalog and index entries that are referred to
5838  *		by catcaches are stored in the initialization files.
5839  *
5840  *		The same mechanism that detects when catcache and relcache entries
5841  *		need to be invalidated (due to catalog updates) also arranges to
5842  *		unlink the initialization files when the contents may be out of date.
5843  *		The files will then be rebuilt during the next backend startup.
5844  */
5845 
5846 /*
5847  * load_relcache_init_file -- attempt to load cache from the shared
5848  * or local cache init file
5849  *
5850  * If successful, return true and set criticalRelcachesBuilt or
5851  * criticalSharedRelcachesBuilt to true.
5852  * If not successful, return false.
5853  *
5854  * NOTE: we assume we are already switched into CacheMemoryContext.
5855  */
5856 static bool
load_relcache_init_file(bool shared)5857 load_relcache_init_file(bool shared)
5858 {
5859 	FILE	   *fp;
5860 	char		initfilename[MAXPGPATH];
5861 	Relation   *rels;
5862 	int			relno,
5863 				num_rels,
5864 				max_rels,
5865 				nailed_rels,
5866 				nailed_indexes,
5867 				magic;
5868 	int			i;
5869 
5870 	if (shared)
5871 		snprintf(initfilename, sizeof(initfilename), "global/%s",
5872 				 RELCACHE_INIT_FILENAME);
5873 	else
5874 		snprintf(initfilename, sizeof(initfilename), "%s/%s",
5875 				 DatabasePath, RELCACHE_INIT_FILENAME);
5876 
5877 	fp = AllocateFile(initfilename, PG_BINARY_R);
5878 	if (fp == NULL)
5879 		return false;
5880 
5881 	/*
5882 	 * Read the index relcache entries from the file.  Note we will not enter
5883 	 * any of them into the cache if the read fails partway through; this
5884 	 * helps to guard against broken init files.
5885 	 */
5886 	max_rels = 100;
5887 	rels = (Relation *) palloc(max_rels * sizeof(Relation));
5888 	num_rels = 0;
5889 	nailed_rels = nailed_indexes = 0;
5890 
5891 	/* check for correct magic number (compatible version) */
5892 	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5893 		goto read_failed;
5894 	if (magic != RELCACHE_INIT_FILEMAGIC)
5895 		goto read_failed;
5896 
5897 	for (relno = 0;; relno++)
5898 	{
5899 		Size		len;
5900 		size_t		nread;
5901 		Relation	rel;
5902 		Form_pg_class relform;
5903 		bool		has_not_null;
5904 
5905 		/* first read the relation descriptor length */
5906 		nread = fread(&len, 1, sizeof(len), fp);
5907 		if (nread != sizeof(len))
5908 		{
5909 			if (nread == 0)
5910 				break;			/* end of file */
5911 			goto read_failed;
5912 		}
5913 
5914 		/* safety check for incompatible relcache layout */
5915 		if (len != sizeof(RelationData))
5916 			goto read_failed;
5917 
5918 		/* allocate another relcache header */
5919 		if (num_rels >= max_rels)
5920 		{
5921 			max_rels *= 2;
5922 			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
5923 		}
5924 
5925 		rel = rels[num_rels++] = (Relation) palloc(len);
5926 
5927 		/* then, read the Relation structure */
5928 		if (fread(rel, 1, len, fp) != len)
5929 			goto read_failed;
5930 
5931 		/* next read the relation tuple form */
5932 		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5933 			goto read_failed;
5934 
5935 		relform = (Form_pg_class) palloc(len);
5936 		if (fread(relform, 1, len, fp) != len)
5937 			goto read_failed;
5938 
5939 		rel->rd_rel = relform;
5940 
5941 		/* initialize attribute tuple forms */
5942 		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
5943 		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
5944 
5945 		rel->rd_att->tdtypeid = relform->reltype ? relform->reltype : RECORDOID;
5946 		rel->rd_att->tdtypmod = -1; /* just to be sure */
5947 
5948 		/* next read all the attribute tuple form data entries */
5949 		has_not_null = false;
5950 		for (i = 0; i < relform->relnatts; i++)
5951 		{
5952 			Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);
5953 
5954 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5955 				goto read_failed;
5956 			if (len != ATTRIBUTE_FIXED_PART_SIZE)
5957 				goto read_failed;
5958 			if (fread(attr, 1, len, fp) != len)
5959 				goto read_failed;
5960 
5961 			has_not_null |= attr->attnotnull;
5962 		}
5963 
5964 		/* next read the access method specific field */
5965 		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5966 			goto read_failed;
5967 		if (len > 0)
5968 		{
5969 			rel->rd_options = palloc(len);
5970 			if (fread(rel->rd_options, 1, len, fp) != len)
5971 				goto read_failed;
5972 			if (len != VARSIZE(rel->rd_options))
5973 				goto read_failed;	/* sanity check */
5974 		}
5975 		else
5976 		{
5977 			rel->rd_options = NULL;
5978 		}
5979 
5980 		/* mark not-null status */
5981 		if (has_not_null)
5982 		{
5983 			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
5984 
5985 			constr->has_not_null = true;
5986 			rel->rd_att->constr = constr;
5987 		}
5988 
5989 		/*
5990 		 * If it's an index, there's more to do.  Note we explicitly ignore
5991 		 * partitioned indexes here.
5992 		 */
5993 		if (rel->rd_rel->relkind == RELKIND_INDEX)
5994 		{
5995 			MemoryContext indexcxt;
5996 			Oid		   *opfamily;
5997 			Oid		   *opcintype;
5998 			RegProcedure *support;
5999 			int			nsupport;
6000 			int16	   *indoption;
6001 			Oid		   *indcollation;
6002 
6003 			/* Count nailed indexes to ensure we have 'em all */
6004 			if (rel->rd_isnailed)
6005 				nailed_indexes++;
6006 
6007 			/* next, read the pg_index tuple */
6008 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6009 				goto read_failed;
6010 
6011 			rel->rd_indextuple = (HeapTuple) palloc(len);
6012 			if (fread(rel->rd_indextuple, 1, len, fp) != len)
6013 				goto read_failed;
6014 
6015 			/* Fix up internal pointers in the tuple -- see heap_copytuple */
6016 			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
6017 			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
6018 
6019 			/*
6020 			 * prepare index info context --- parameters should match
6021 			 * RelationInitIndexAccessInfo
6022 			 */
6023 			indexcxt = AllocSetContextCreate(CacheMemoryContext,
6024 											 "index info",
6025 											 ALLOCSET_SMALL_SIZES);
6026 			rel->rd_indexcxt = indexcxt;
6027 			MemoryContextCopyAndSetIdentifier(indexcxt,
6028 											  RelationGetRelationName(rel));
6029 
6030 			/*
6031 			 * Now we can fetch the index AM's API struct.  (We can't store
6032 			 * that in the init file, since it contains function pointers that
6033 			 * might vary across server executions.  Fortunately, it should be
6034 			 * safe to call the amhandler even while bootstrapping indexes.)
6035 			 */
6036 			InitIndexAmRoutine(rel);
6037 
6038 			/* next, read the vector of opfamily OIDs */
6039 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6040 				goto read_failed;
6041 
6042 			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
6043 			if (fread(opfamily, 1, len, fp) != len)
6044 				goto read_failed;
6045 
6046 			rel->rd_opfamily = opfamily;
6047 
6048 			/* next, read the vector of opcintype OIDs */
6049 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6050 				goto read_failed;
6051 
6052 			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
6053 			if (fread(opcintype, 1, len, fp) != len)
6054 				goto read_failed;
6055 
6056 			rel->rd_opcintype = opcintype;
6057 
6058 			/* next, read the vector of support procedure OIDs */
6059 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6060 				goto read_failed;
6061 			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
6062 			if (fread(support, 1, len, fp) != len)
6063 				goto read_failed;
6064 
6065 			rel->rd_support = support;
6066 
6067 			/* next, read the vector of collation OIDs */
6068 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6069 				goto read_failed;
6070 
6071 			indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
6072 			if (fread(indcollation, 1, len, fp) != len)
6073 				goto read_failed;
6074 
6075 			rel->rd_indcollation = indcollation;
6076 
6077 			/* finally, read the vector of indoption values */
6078 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6079 				goto read_failed;
6080 
6081 			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
6082 			if (fread(indoption, 1, len, fp) != len)
6083 				goto read_failed;
6084 
6085 			rel->rd_indoption = indoption;
6086 
6087 			/* finally, read the vector of opcoptions values */
6088 			rel->rd_opcoptions = (bytea **)
6089 				MemoryContextAllocZero(indexcxt, sizeof(*rel->rd_opcoptions) * relform->relnatts);
6090 
6091 			for (i = 0; i < relform->relnatts; i++)
6092 			{
6093 				if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
6094 					goto read_failed;
6095 
6096 				if (len > 0)
6097 				{
6098 					rel->rd_opcoptions[i] = (bytea *) MemoryContextAlloc(indexcxt, len);
6099 					if (fread(rel->rd_opcoptions[i], 1, len, fp) != len)
6100 						goto read_failed;
6101 				}
6102 			}
6103 
6104 			/* set up zeroed fmgr-info vector */
6105 			nsupport = relform->relnatts * rel->rd_indam->amsupport;
6106 			rel->rd_supportinfo = (FmgrInfo *)
6107 				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
6108 		}
6109 		else
6110 		{
6111 			/* Count nailed rels to ensure we have 'em all */
6112 			if (rel->rd_isnailed)
6113 				nailed_rels++;
6114 
6115 			/* Load table AM data */
6116 			if (rel->rd_rel->relkind == RELKIND_RELATION ||
6117 				rel->rd_rel->relkind == RELKIND_SEQUENCE ||
6118 				rel->rd_rel->relkind == RELKIND_TOASTVALUE ||
6119 				rel->rd_rel->relkind == RELKIND_MATVIEW)
6120 				RelationInitTableAccessMethod(rel);
6121 
6122 			Assert(rel->rd_index == NULL);
6123 			Assert(rel->rd_indextuple == NULL);
6124 			Assert(rel->rd_indexcxt == NULL);
6125 			Assert(rel->rd_indam == NULL);
6126 			Assert(rel->rd_opfamily == NULL);
6127 			Assert(rel->rd_opcintype == NULL);
6128 			Assert(rel->rd_support == NULL);
6129 			Assert(rel->rd_supportinfo == NULL);
6130 			Assert(rel->rd_indoption == NULL);
6131 			Assert(rel->rd_indcollation == NULL);
6132 			Assert(rel->rd_opcoptions == NULL);
6133 		}
6134 
6135 		/*
6136 		 * Rules and triggers are not saved (mainly because the internal
6137 		 * format is complex and subject to change).  They must be rebuilt if
6138 		 * needed by RelationCacheInitializePhase3.  This is not expected to
6139 		 * be a big performance hit since few system catalogs have such. Ditto
6140 		 * for RLS policy data, partition info, index expressions, predicates,
6141 		 * exclusion info, and FDW info.
6142 		 */
6143 		rel->rd_rules = NULL;
6144 		rel->rd_rulescxt = NULL;
6145 		rel->trigdesc = NULL;
6146 		rel->rd_rsdesc = NULL;
6147 		rel->rd_partkey = NULL;
6148 		rel->rd_partkeycxt = NULL;
6149 		rel->rd_partdesc = NULL;
6150 		rel->rd_partdesc_nodetached = NULL;
6151 		rel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
6152 		rel->rd_pdcxt = NULL;
6153 		rel->rd_pddcxt = NULL;
6154 		rel->rd_partcheck = NIL;
6155 		rel->rd_partcheckvalid = false;
6156 		rel->rd_partcheckcxt = NULL;
6157 		rel->rd_indexprs = NIL;
6158 		rel->rd_indpred = NIL;
6159 		rel->rd_exclops = NULL;
6160 		rel->rd_exclprocs = NULL;
6161 		rel->rd_exclstrats = NULL;
6162 		rel->rd_fdwroutine = NULL;
6163 
6164 		/*
6165 		 * Reset transient-state fields in the relcache entry
6166 		 */
6167 		rel->rd_smgr = NULL;
6168 		if (rel->rd_isnailed)
6169 			rel->rd_refcnt = 1;
6170 		else
6171 			rel->rd_refcnt = 0;
6172 		rel->rd_indexvalid = false;
6173 		rel->rd_indexlist = NIL;
6174 		rel->rd_pkindex = InvalidOid;
6175 		rel->rd_replidindex = InvalidOid;
6176 		rel->rd_indexattr = NULL;
6177 		rel->rd_keyattr = NULL;
6178 		rel->rd_pkattr = NULL;
6179 		rel->rd_idattr = NULL;
6180 		rel->rd_pubactions = NULL;
6181 		rel->rd_statvalid = false;
6182 		rel->rd_statlist = NIL;
6183 		rel->rd_fkeyvalid = false;
6184 		rel->rd_fkeylist = NIL;
6185 		rel->rd_createSubid = InvalidSubTransactionId;
6186 		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
6187 		rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
6188 		rel->rd_droppedSubid = InvalidSubTransactionId;
6189 		rel->rd_amcache = NULL;
6190 		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
6191 
6192 		/*
6193 		 * Recompute lock and physical addressing info.  This is needed in
6194 		 * case the pg_internal.init file was copied from some other database
6195 		 * by CREATE DATABASE.
6196 		 */
6197 		RelationInitLockInfo(rel);
6198 		RelationInitPhysicalAddr(rel);
6199 	}
6200 
6201 	/*
6202 	 * We reached the end of the init file without apparent problem.  Did we
6203 	 * get the right number of nailed items?  This is a useful crosscheck in
6204 	 * case the set of critical rels or indexes changes.  However, that should
6205 	 * not happen in a normally-running system, so let's bleat if it does.
6206 	 *
6207 	 * For the shared init file, we're called before client authentication is
6208 	 * done, which means that elog(WARNING) will go only to the postmaster
6209 	 * log, where it's easily missed.  To ensure that developers notice bad
6210 	 * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
6211 	 * an Assert(false) there.
6212 	 */
6213 	if (shared)
6214 	{
6215 		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
6216 			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
6217 		{
6218 			elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
6219 				 nailed_rels, nailed_indexes,
6220 				 NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
6221 			/* Make sure we get developers' attention about this */
6222 			Assert(false);
6223 			/* In production builds, recover by bootstrapping the relcache */
6224 			goto read_failed;
6225 		}
6226 	}
6227 	else
6228 	{
6229 		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
6230 			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
6231 		{
6232 			elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
6233 				 nailed_rels, nailed_indexes,
6234 				 NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
6235 			/* We don't need an Assert() in this case */
6236 			goto read_failed;
6237 		}
6238 	}
6239 
6240 	/*
6241 	 * OK, all appears well.
6242 	 *
6243 	 * Now insert all the new relcache entries into the cache.
6244 	 */
6245 	for (relno = 0; relno < num_rels; relno++)
6246 	{
6247 		RelationCacheInsert(rels[relno], false);
6248 	}
6249 
6250 	pfree(rels);
6251 	FreeFile(fp);
6252 
6253 	if (shared)
6254 		criticalSharedRelcachesBuilt = true;
6255 	else
6256 		criticalRelcachesBuilt = true;
6257 	return true;
6258 
6259 	/*
6260 	 * init file is broken, so do it the hard way.  We don't bother trying to
6261 	 * free the clutter we just allocated; it's not in the relcache so it
6262 	 * won't hurt.
6263 	 */
6264 read_failed:
6265 	pfree(rels);
6266 	FreeFile(fp);
6267 
6268 	return false;
6269 }
6270 
6271 /*
6272  * Write out a new initialization file with the current contents
6273  * of the relcache (either shared rels or local rels, as indicated).
6274  */
6275 static void
write_relcache_init_file(bool shared)6276 write_relcache_init_file(bool shared)
6277 {
6278 	FILE	   *fp;
6279 	char		tempfilename[MAXPGPATH];
6280 	char		finalfilename[MAXPGPATH];
6281 	int			magic;
6282 	HASH_SEQ_STATUS status;
6283 	RelIdCacheEnt *idhentry;
6284 	int			i;
6285 
6286 	/*
6287 	 * If we have already received any relcache inval events, there's no
6288 	 * chance of succeeding so we may as well skip the whole thing.
6289 	 */
6290 	if (relcacheInvalsReceived != 0L)
6291 		return;
6292 
6293 	/*
6294 	 * We must write a temporary file and rename it into place. Otherwise,
6295 	 * another backend starting at about the same time might crash trying to
6296 	 * read the partially-complete file.
6297 	 */
6298 	if (shared)
6299 	{
6300 		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
6301 				 RELCACHE_INIT_FILENAME, MyProcPid);
6302 		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
6303 				 RELCACHE_INIT_FILENAME);
6304 	}
6305 	else
6306 	{
6307 		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
6308 				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
6309 		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
6310 				 DatabasePath, RELCACHE_INIT_FILENAME);
6311 	}
6312 
6313 	unlink(tempfilename);		/* in case it exists w/wrong permissions */
6314 
6315 	fp = AllocateFile(tempfilename, PG_BINARY_W);
6316 	if (fp == NULL)
6317 	{
6318 		/*
6319 		 * We used to consider this a fatal error, but we might as well
6320 		 * continue with backend startup ...
6321 		 */
6322 		ereport(WARNING,
6323 				(errcode_for_file_access(),
6324 				 errmsg("could not create relation-cache initialization file \"%s\": %m",
6325 						tempfilename),
6326 				 errdetail("Continuing anyway, but there's something wrong.")));
6327 		return;
6328 	}
6329 
6330 	/*
6331 	 * Write a magic number to serve as a file version identifier.  We can
6332 	 * change the magic number whenever the relcache layout changes.
6333 	 */
6334 	magic = RELCACHE_INIT_FILEMAGIC;
6335 	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
6336 		elog(FATAL, "could not write init file");
6337 
6338 	/*
6339 	 * Write all the appropriate reldescs (in no particular order).
6340 	 */
6341 	hash_seq_init(&status, RelationIdCache);
6342 
6343 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
6344 	{
6345 		Relation	rel = idhentry->reldesc;
6346 		Form_pg_class relform = rel->rd_rel;
6347 
6348 		/* ignore if not correct group */
6349 		if (relform->relisshared != shared)
6350 			continue;
6351 
6352 		/*
6353 		 * Ignore if not supposed to be in init file.  We can allow any shared
6354 		 * relation that's been loaded so far to be in the shared init file,
6355 		 * but unshared relations must be ones that should be in the local
6356 		 * file per RelationIdIsInInitFile.  (Note: if you want to change the
6357 		 * criterion for rels to be kept in the init file, see also inval.c.
6358 		 * The reason for filtering here is to be sure that we don't put
6359 		 * anything into the local init file for which a relcache inval would
6360 		 * not cause invalidation of that init file.)
6361 		 */
6362 		if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
6363 		{
6364 			/* Nailed rels had better get stored. */
6365 			Assert(!rel->rd_isnailed);
6366 			continue;
6367 		}
6368 
6369 		/* first write the relcache entry proper */
6370 		write_item(rel, sizeof(RelationData), fp);
6371 
6372 		/* next write the relation tuple form */
6373 		write_item(relform, CLASS_TUPLE_SIZE, fp);
6374 
6375 		/* next, do all the attribute tuple form data entries */
6376 		for (i = 0; i < relform->relnatts; i++)
6377 		{
6378 			write_item(TupleDescAttr(rel->rd_att, i),
6379 					   ATTRIBUTE_FIXED_PART_SIZE, fp);
6380 		}
6381 
6382 		/* next, do the access method specific field */
6383 		write_item(rel->rd_options,
6384 				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
6385 				   fp);
6386 
6387 		/*
6388 		 * If it's an index, there's more to do. Note we explicitly ignore
6389 		 * partitioned indexes here.
6390 		 */
6391 		if (rel->rd_rel->relkind == RELKIND_INDEX)
6392 		{
6393 			/* write the pg_index tuple */
6394 			/* we assume this was created by heap_copytuple! */
6395 			write_item(rel->rd_indextuple,
6396 					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
6397 					   fp);
6398 
6399 			/* next, write the vector of opfamily OIDs */
6400 			write_item(rel->rd_opfamily,
6401 					   relform->relnatts * sizeof(Oid),
6402 					   fp);
6403 
6404 			/* next, write the vector of opcintype OIDs */
6405 			write_item(rel->rd_opcintype,
6406 					   relform->relnatts * sizeof(Oid),
6407 					   fp);
6408 
6409 			/* next, write the vector of support procedure OIDs */
6410 			write_item(rel->rd_support,
6411 					   relform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
6412 					   fp);
6413 
6414 			/* next, write the vector of collation OIDs */
6415 			write_item(rel->rd_indcollation,
6416 					   relform->relnatts * sizeof(Oid),
6417 					   fp);
6418 
6419 			/* finally, write the vector of indoption values */
6420 			write_item(rel->rd_indoption,
6421 					   relform->relnatts * sizeof(int16),
6422 					   fp);
6423 
6424 			Assert(rel->rd_opcoptions);
6425 
6426 			/* finally, write the vector of opcoptions values */
6427 			for (i = 0; i < relform->relnatts; i++)
6428 			{
6429 				bytea	   *opt = rel->rd_opcoptions[i];
6430 
6431 				write_item(opt, opt ? VARSIZE(opt) : 0, fp);
6432 			}
6433 		}
6434 	}
6435 
6436 	if (FreeFile(fp))
6437 		elog(FATAL, "could not write init file");
6438 
6439 	/*
6440 	 * Now we have to check whether the data we've so painstakingly
6441 	 * accumulated is already obsolete due to someone else's just-committed
6442 	 * catalog changes.  If so, we just delete the temp file and leave it to
6443 	 * the next backend to try again.  (Our own relcache entries will be
6444 	 * updated by SI message processing, but we can't be sure whether what we
6445 	 * wrote out was up-to-date.)
6446 	 *
6447 	 * This mustn't run concurrently with the code that unlinks an init file
6448 	 * and sends SI messages, so grab a serialization lock for the duration.
6449 	 */
6450 	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
6451 
6452 	/* Make sure we have seen all incoming SI messages */
6453 	AcceptInvalidationMessages();
6454 
6455 	/*
6456 	 * If we have received any SI relcache invals since backend start, assume
6457 	 * we may have written out-of-date data.
6458 	 */
6459 	if (relcacheInvalsReceived == 0L)
6460 	{
6461 		/*
6462 		 * OK, rename the temp file to its final name, deleting any
6463 		 * previously-existing init file.
6464 		 *
6465 		 * Note: a failure here is possible under Cygwin, if some other
6466 		 * backend is holding open an unlinked-but-not-yet-gone init file. So
6467 		 * treat this as a noncritical failure; just remove the useless temp
6468 		 * file on failure.
6469 		 */
6470 		if (rename(tempfilename, finalfilename) < 0)
6471 			unlink(tempfilename);
6472 	}
6473 	else
6474 	{
6475 		/* Delete the already-obsolete temp file */
6476 		unlink(tempfilename);
6477 	}
6478 
6479 	LWLockRelease(RelCacheInitLock);
6480 }
6481 
6482 /* write a chunk of data preceded by its length */
6483 static void
write_item(const void * data,Size len,FILE * fp)6484 write_item(const void *data, Size len, FILE *fp)
6485 {
6486 	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
6487 		elog(FATAL, "could not write init file");
6488 	if (fwrite(data, 1, len, fp) != len)
6489 		elog(FATAL, "could not write init file");
6490 }
6491 
6492 /*
6493  * Determine whether a given relation (identified by OID) is one of the ones
6494  * we should store in a relcache init file.
6495  *
6496  * We must cache all nailed rels, and for efficiency we should cache every rel
6497  * that supports a syscache.  The former set is almost but not quite a subset
6498  * of the latter. The special cases are relations where
6499  * RelationCacheInitializePhase2/3 chooses to nail for efficiency reasons, but
6500  * which do not support any syscache.
6501  */
6502 bool
RelationIdIsInInitFile(Oid relationId)6503 RelationIdIsInInitFile(Oid relationId)
6504 {
6505 	if (relationId == SharedSecLabelRelationId ||
6506 		relationId == TriggerRelidNameIndexId ||
6507 		relationId == DatabaseNameIndexId ||
6508 		relationId == SharedSecLabelObjectIndexId)
6509 	{
6510 		/*
6511 		 * If this Assert fails, we don't need the applicable special case
6512 		 * anymore.
6513 		 */
6514 		Assert(!RelationSupportsSysCache(relationId));
6515 		return true;
6516 	}
6517 	return RelationSupportsSysCache(relationId);
6518 }
6519 
6520 /*
6521  * Invalidate (remove) the init file during commit of a transaction that
6522  * changed one or more of the relation cache entries that are kept in the
6523  * local init file.
6524  *
6525  * To be safe against concurrent inspection or rewriting of the init file,
6526  * we must take RelCacheInitLock, then remove the old init file, then send
6527  * the SI messages that include relcache inval for such relations, and then
6528  * release RelCacheInitLock.  This serializes the whole affair against
6529  * write_relcache_init_file, so that we can be sure that any other process
6530  * that's concurrently trying to create a new init file won't move an
6531  * already-stale version into place after we unlink.  Also, because we unlink
6532  * before sending the SI messages, a backend that's currently starting cannot
6533  * read the now-obsolete init file and then miss the SI messages that will
6534  * force it to update its relcache entries.  (This works because the backend
6535  * startup sequence gets into the sinval array before trying to load the init
6536  * file.)
6537  *
6538  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
6539  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
6540  * send any pending SI messages between those calls.
6541  */
6542 void
RelationCacheInitFilePreInvalidate(void)6543 RelationCacheInitFilePreInvalidate(void)
6544 {
6545 	char		localinitfname[MAXPGPATH];
6546 	char		sharedinitfname[MAXPGPATH];
6547 
6548 	if (DatabasePath)
6549 		snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
6550 				 DatabasePath, RELCACHE_INIT_FILENAME);
6551 	snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
6552 			 RELCACHE_INIT_FILENAME);
6553 
6554 	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
6555 
6556 	/*
6557 	 * The files might not be there if no backend has been started since the
6558 	 * last removal.  But complain about failures other than ENOENT with
6559 	 * ERROR.  Fortunately, it's not too late to abort the transaction if we
6560 	 * can't get rid of the would-be-obsolete init file.
6561 	 */
6562 	if (DatabasePath)
6563 		unlink_initfile(localinitfname, ERROR);
6564 	unlink_initfile(sharedinitfname, ERROR);
6565 }
6566 
6567 void
RelationCacheInitFilePostInvalidate(void)6568 RelationCacheInitFilePostInvalidate(void)
6569 {
6570 	LWLockRelease(RelCacheInitLock);
6571 }
6572 
6573 /*
6574  * Remove the init files during postmaster startup.
6575  *
6576  * We used to keep the init files across restarts, but that is unsafe in PITR
6577  * scenarios, and even in simple crash-recovery cases there are windows for
6578  * the init files to become out-of-sync with the database.  So now we just
6579  * remove them during startup and expect the first backend launch to rebuild
6580  * them.  Of course, this has to happen in each database of the cluster.
6581  */
6582 void
RelationCacheInitFileRemove(void)6583 RelationCacheInitFileRemove(void)
6584 {
6585 	const char *tblspcdir = "pg_tblspc";
6586 	DIR		   *dir;
6587 	struct dirent *de;
6588 	char		path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY)];
6589 
6590 	snprintf(path, sizeof(path), "global/%s",
6591 			 RELCACHE_INIT_FILENAME);
6592 	unlink_initfile(path, LOG);
6593 
6594 	/* Scan everything in the default tablespace */
6595 	RelationCacheInitFileRemoveInDir("base");
6596 
6597 	/* Scan the tablespace link directory to find non-default tablespaces */
6598 	dir = AllocateDir(tblspcdir);
6599 
6600 	while ((de = ReadDirExtended(dir, tblspcdir, LOG)) != NULL)
6601 	{
6602 		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6603 		{
6604 			/* Scan the tablespace dir for per-database dirs */
6605 			snprintf(path, sizeof(path), "%s/%s/%s",
6606 					 tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
6607 			RelationCacheInitFileRemoveInDir(path);
6608 		}
6609 	}
6610 
6611 	FreeDir(dir);
6612 }
6613 
6614 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
6615 static void
RelationCacheInitFileRemoveInDir(const char * tblspcpath)6616 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
6617 {
6618 	DIR		   *dir;
6619 	struct dirent *de;
6620 	char		initfilename[MAXPGPATH * 2];
6621 
6622 	/* Scan the tablespace directory to find per-database directories */
6623 	dir = AllocateDir(tblspcpath);
6624 
6625 	while ((de = ReadDirExtended(dir, tblspcpath, LOG)) != NULL)
6626 	{
6627 		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6628 		{
6629 			/* Try to remove the init file in each database */
6630 			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
6631 					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
6632 			unlink_initfile(initfilename, LOG);
6633 		}
6634 	}
6635 
6636 	FreeDir(dir);
6637 }
6638 
/*
 * unlink_initfile
 *		Remove the named init file, tolerating its absence.
 *
 * A missing file (ENOENT) is silently accepted.  Any other unlink failure is
 * reported at the severity given by "elevel".
 */
static void
unlink_initfile(const char *initfilename, int elevel)
{
	if (unlink(initfilename) < 0 && errno != ENOENT)
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not remove cache file \"%s\": %m",
						initfilename)));
}
6652